1use super::*;
25
26#[async_trait::async_trait]
27impl RuntimePersistence for Store {
28 fn durability_tier(&self) -> DurabilityTier {
29 DurabilityTier::Durable
30 }
31
32 async fn load_session(
33 &self,
34 scope: SessionReadScope,
35 ) -> Result<Option<PersistedSessionRead>, StoreError> {
36 self.conn
37 .call(move |conn| {
38 let outcome: Result<Option<PersistedSessionRead>, StoreError> = (|| {
39 let Some(meta) = try_load_session_head_meta_from_conn(conn)? else {
40 return Ok(None);
41 };
42 let leaf_node_id = match &scope {
43 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
44 SessionReadScope::ActivePath { leaf_node_id } => {
45 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
46 }
47 };
48 let mut graph = match scope {
49 SessionReadScope::FullGraph => {
50 Self::load_session_graph_from_conn(conn, meta.leaf_node_id.clone())
51 }
52 SessionReadScope::ActivePath { .. } => {
53 Self::load_active_path_session_graph_from_conn(
54 conn,
55 leaf_node_id.clone(),
56 )
57 .map_err(sqlite_error)?
58 }
59 };
60 graph.set_leaf_node_id(leaf_node_id);
61 let checkpoint = meta
62 .checkpoint_ref
63 .as_ref()
64 .and_then(|blob_ref| Self::get_checkpoint_conn(conn, blob_ref));
65 Ok(Some(PersistedSessionRead {
66 session_id: meta.session_id,
67 head_revision: meta.head_revision,
68 config: meta.config,
69 agent_frames: meta.agent_frames,
70 current_agent_frame_id: meta.current_agent_frame_id,
71 graph,
72 checkpoint_ref: meta.checkpoint_ref,
73 checkpoint,
74 token_ledger: merge_token_ledger_entries(Self::load_usage_deltas_conn(
75 conn,
76 )),
77 }))
78 })(
79 );
80 Ok(outcome)
81 })
82 .await
83 .map_err(sqlite_error)?
84 }
85
86 async fn load_node(
87 &self,
88 node_id: &str,
89 ) -> Result<Option<lash_core::SessionNodeRecord>, StoreError> {
90 let node_id = node_id.to_string();
91 let row: Option<String> = self
92 .conn
93 .call(move |conn| {
94 conn.query_row(
95 "SELECT node_json FROM graph_nodes WHERE node_id = ?1 AND tombstoned = 0",
96 params![node_id],
97 |row| row.get(0),
98 )
99 .optional()
100 })
101 .await
102 .map_err(sqlite_error)?;
103 Ok(row.and_then(|json| serde_json::from_str(&json).ok()))
104 }
105
106 async fn commit_runtime_state(
107 &self,
108 commit: RuntimeCommit,
109 ) -> Result<RuntimeCommitResult, StoreError> {
110 let blob_profile = self.options.blob_profile;
111 let result = self
112 .conn
113 .write_flow(move |tx| {
114 let outcome: Result<RuntimeCommitResult, StoreError> = (|| {
115 let existing = try_load_session_head_meta_from_conn(tx)?;
116 if let Some(bound_session_id) =
117 existing.as_ref().map(|meta| meta.session_id.as_str())
118 && bound_session_id != commit.session_id
119 {
120 return Err(StoreError::SessionBindingMismatch {
121 bound_session_id: bound_session_id.to_string(),
122 attempted_session_id: commit.session_id.clone(),
123 });
124 }
125 if let Some(completed) = &commit.turn_commit {
126 if completed.session_id != commit.session_id {
127 return Err(StoreError::RuntimeTurnCommitConflict {
128 session_id: completed.session_id.clone(),
129 turn_id: completed.turn_id.clone(),
130 });
131 }
132 let prior: Option<(String, String)> = tx
133 .query_row(
134 "SELECT turn_commit_hash, result_json FROM runtime_turn_commits
135 WHERE session_id = ?1 AND turn_id = ?2",
136 params![completed.session_id, completed.turn_id],
137 |row| Ok((row.get(0)?, row.get(1)?)),
138 )
139 .optional()
140 .map_err(sqlite_error)?;
141 if let Some((turn_commit_hash, result_json)) = prior {
142 if turn_commit_hash == completed.turn_commit_hash {
143 let result: RuntimeCommitResult =
144 serde_json::from_str(&result_json).map_err(|err| {
145 StoreError::Backend(format!(
146 "failed to decode runtime turn commit result: {err}"
147 ))
148 })?;
149 return Ok(result);
150 }
151 return Err(StoreError::RuntimeTurnCommitConflict {
152 session_id: completed.session_id.clone(),
153 turn_id: completed.turn_id.clone(),
154 });
155 }
156 }
157 let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
158 if commit.expected_head_revision.is_some()
159 && commit.expected_head_revision != Some(actual_revision)
160 {
161 return Err(StoreError::HeadRevisionConflict {
162 expected: commit.expected_head_revision,
163 actual: actual_revision,
164 });
165 }
166 for completed in &commit.completed_queue_claims {
167 if completed.session_id != commit.session_id {
168 return Err(StoreError::QueuedWorkClaimExpired {
169 session_id: completed.session_id.clone(),
170 claim_id: completed.claim_id.clone(),
171 });
172 }
173 ensure_queued_work_completion_conn(tx, completed)?;
174 }
175
176 let stored_checkpoint =
177 Self::put_checkpoint_conn(tx, &commit.checkpoint, blob_profile)
178 .map_err(sqlite_error)?;
179
180 if !commit.usage_deltas.is_empty() {
181 let mut stmt = tx
182 .prepare(
183 "INSERT INTO usage_deltas (
184 source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
185 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
186 )
187 .map_err(sqlite_error)?;
188 for entry in &commit.usage_deltas {
189 stmt.execute(params![
190 entry.source,
191 entry.model,
192 entry.usage.input_tokens,
193 entry.usage.output_tokens,
194 entry.usage.cached_input_tokens,
195 entry.usage.reasoning_tokens,
196 ])
197 .map_err(sqlite_error)?;
198 }
199 }
200
201 let leaf_node_id = match &commit.graph {
202 GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
203 GraphCommitDelta::Append {
204 nodes,
205 leaf_node_id,
206 } => {
207 for node in nodes {
208 let node_json = encode_json(node);
209 tx.execute(
210 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
211 params![node.node_id, node_json],
212 )
213 .map_err(sqlite_error)?;
214 }
215 leaf_node_id.clone()
216 }
217 GraphCommitDelta::ReplaceFull(graph) => {
218 tx.execute("DELETE FROM graph_nodes", [])
219 .map_err(sqlite_error)?;
220 for node in &graph.nodes {
221 let node_json = encode_json(node);
222 tx.execute(
223 "INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)",
224 params![node.node_id, node_json],
225 )
226 .map_err(sqlite_error)?;
227 }
228 graph.leaf_node_id.clone()
229 }
230 };
231 let graph_node_count: usize = tx
232 .query_row(
233 "SELECT COUNT(*) FROM graph_nodes WHERE tombstoned = 0",
234 [],
235 |row| row.get::<_, i64>(0),
236 )
237 .map_err(sqlite_error)? as usize;
238 let next_revision = actual_revision + 1;
239 let meta = SessionHeadMeta {
240 session_id: commit.session_id.clone(),
241 head_revision: next_revision,
242 config: commit.config.clone(),
243 agent_frames: commit.agent_frames.clone(),
244 current_agent_frame_id: commit.current_agent_frame_id.clone(),
245 checkpoint_ref: Some(stored_checkpoint.checkpoint_ref.clone()),
246 leaf_node_id,
247 graph_node_count,
248 token_ledger: Vec::new(),
249 };
250 tx.execute(
251 "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
252 VALUES (1, ?1, ?2, ?3)",
253 params![
254 meta.session_id,
255 encode_json(&meta),
256 meta.head_revision as i64
257 ],
258 )
259 .map_err(sqlite_error)?;
260 for completed in &commit.completed_queue_claims {
261 for batch_id in &completed.batch_ids {
262 tx.execute(
263 "DELETE FROM queued_work_batches
264 WHERE session_id = ?1
265 AND batch_id = ?2
266 AND claim_id = ?3
267 AND claim_token = ?4",
268 params![
269 completed.session_id,
270 batch_id,
271 completed.claim_id,
272 completed.lease_token
273 ],
274 )
275 .map_err(sqlite_error)?;
276 }
277 }
278 if !commit.committed_attachment_ids.is_empty() {
279 let now = current_epoch_ms() as i64;
280 let mut stmt = tx
281 .prepare(
282 "UPDATE attachment_manifest
283 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
284 WHERE attachment_id = ?2 AND session_id = ?3",
285 )
286 .map_err(sqlite_error)?;
287 for id in &commit.committed_attachment_ids {
288 stmt.execute(params![now, id.as_str(), commit.session_id])
289 .map_err(sqlite_error)?;
290 }
291 }
292 let result = RuntimeCommitResult {
293 head_revision: next_revision,
294 checkpoint_ref: stored_checkpoint.checkpoint_ref,
295 manifest: stored_checkpoint.manifest,
296 };
297 if let Some(completed) = &commit.turn_commit {
298 tx.execute(
299 "INSERT INTO runtime_turn_commits (
300 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
301 )
302 VALUES (?1, ?2, ?3, ?4, ?5)",
303 params![
304 completed.session_id,
305 completed.turn_id,
306 completed.turn_commit_hash,
307 encode_json(&result),
308 current_epoch_ms() as i64
309 ],
310 )
311 .map_err(sqlite_error)?;
312 }
313 Ok(result)
314 })();
315 match outcome {
320 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
321 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
322 }
323 })
324 .await
325 .map_err(sqlite_error)??;
326 self.maybe_auto_gc().await;
327 Ok(result)
328 }
329
330 async fn enqueue_queued_work(
331 &self,
332 batch: QueuedWorkBatchDraft,
333 ) -> Result<QueuedWorkBatch, StoreError> {
334 let nonce = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed);
335 self.conn
336 .write_flow(move |tx| {
337 let outcome: Result<QueuedWorkBatch, StoreError> = (|| {
338 if let Some(source_key) = batch.source_key.as_deref() {
339 let existing_id: Option<String> = tx
340 .query_row(
341 "SELECT batch_id
342 FROM queued_work_batches
343 WHERE session_id = ?1 AND source_key = ?2",
344 params![batch.session_id, source_key],
345 |row| row.get(0),
346 )
347 .optional()
348 .map_err(sqlite_error)?;
349 if let Some(batch_id) = existing_id {
350 let existing = load_queued_batch_by_id_conn(tx, &batch_id)?
351 .ok_or_else(|| {
352 StoreError::Backend(
353 "queued work source row disappeared".to_string(),
354 )
355 })?;
356 return Ok(existing);
357 }
358 }
359 let now = current_epoch_ms();
360 let batch_id = format!(
361 "qwb:{:x}",
362 Sha256::digest(
363 format!("{}:{:?}:{now}:{nonce}", batch.session_id, batch.source_key)
364 .as_bytes()
365 )
366 );
367 tx.execute(
368 "INSERT INTO queued_work_batches (
369 batch_id, session_id, source_key, delivery_policy, slot_policy,
370 merge_key_json, available_at_ms, enqueued_at_ms
371 )
372 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
373 params![
374 batch_id,
375 batch.session_id,
376 batch.source_key.as_deref(),
377 batch.delivery_policy.as_str(),
378 batch.slot_policy.as_str(),
379 encode_json(&batch.merge_key),
380 batch.available_at_ms as i64,
381 now as i64,
382 ],
383 )
384 .map_err(sqlite_error)?;
385 for (index, payload) in batch.payloads.iter().enumerate() {
386 let item_id = format!("{batch_id}:item:{index}");
387 tx.execute(
388 "INSERT INTO queued_work_items (batch_id, item_index, item_id, payload_json)
389 VALUES (?1, ?2, ?3, ?4)",
390 params![batch_id, index as i64, item_id, encode_json(payload)],
391 )
392 .map_err(sqlite_error)?;
393 }
394 load_queued_batch_by_id_conn(tx, &batch_id)?.ok_or_else(|| {
395 StoreError::Backend("queued work insert disappeared".to_string())
396 })
397 })();
398 match outcome {
401 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
402 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
403 }
404 })
405 .await
406 .map_err(sqlite_error)?
407 }
408
409 async fn claim_ready_queued_work(
410 &self,
411 session_id: &str,
412 owner_id: &str,
413 boundary: QueuedWorkClaimBoundary,
414 lease_ttl_ms: u64,
415 max_batches: usize,
416 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
417 if max_batches == 0 {
418 return Ok(None);
419 }
420 let session_id = session_id.to_string();
421 let owner_id = owner_id.to_string();
422 self.conn
423 .write_flow(move |tx| {
424 let outcome: Result<TxOutcome<Option<QueuedWorkClaim>>, StoreError> = (|| {
425 let now = current_epoch_ms();
426 let candidate_rows = {
427 let mut stmt = tx
428 .prepare(
429 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
430 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
431 claim_fencing_token
432 FROM queued_work_batches
433 WHERE session_id = ?1
434 AND available_at_ms <= ?2
435 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
436 ORDER BY enqueue_seq ASC
437 LIMIT ?3",
438 )
439 .map_err(sqlite_error)?;
440 let rows = stmt
441 .query_map(
442 params![
443 session_id,
444 now as i64,
445 (max_batches as i64).saturating_add(32)
446 ],
447 queued_batch_row_from_sql,
448 )
449 .map_err(sqlite_error)?;
450 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
451 };
452 let Some(first_row) = candidate_rows.first() else {
453 return Ok(TxOutcome::Commit(None));
454 };
455 let first_delivery = decode_delivery_policy(first_row.delivery_policy.clone())?;
456 if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
457 && first_delivery != DeliveryPolicy::EarliestSafeBoundary
458 {
459 return Ok(TxOutcome::Commit(None));
460 }
461 let first_slot = decode_slot_policy(first_row.slot_policy.clone())?;
462 let first_merge_key = decode_merge_key(first_row.merge_key_json.clone())?;
463 let mut selected = Vec::new();
464 for row in candidate_rows {
465 if selected.len() >= max_batches {
466 break;
467 }
468 let delivery = decode_delivery_policy(row.delivery_policy.clone())?;
469 let slot = decode_slot_policy(row.slot_policy.clone())?;
470 let merge_key = decode_merge_key(row.merge_key_json.clone())?;
471 if selected.is_empty() {
472 selected.push(row);
473 if first_slot == SlotPolicy::Exclusive {
474 break;
475 }
476 continue;
477 }
478 if first_slot != SlotPolicy::Join
479 || slot != SlotPolicy::Join
480 || delivery != first_delivery
481 || merge_key != first_merge_key
482 {
483 break;
484 }
485 selected.push(row);
486 }
487 let Some(first) = selected.first() else {
488 return Ok(TxOutcome::Commit(None));
489 };
490 let fencing_token = first.claim_fencing_token.saturating_add(1);
491 let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
492 let lease_token = format!(
493 "{:x}",
494 Sha256::digest(
495 format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes()
496 )
497 );
498 let expires_at = now.saturating_add(lease_ttl_ms);
499 for row in &selected {
500 let claimed = tx
509 .execute(
510 "UPDATE queued_work_batches
511 SET claim_id = ?3,
512 claim_owner_id = ?4,
513 claim_token = ?5,
514 claim_fencing_token = claim_fencing_token + 1,
515 claim_claimed_at_ms = ?6,
516 claim_expires_at_ms = ?7
517 WHERE session_id = ?1
518 AND batch_id = ?2
519 AND (claim_token IS NULL OR claim_expires_at_ms <= ?6)",
520 params![
521 session_id,
522 row.batch_id,
523 claim_id,
524 owner_id,
525 lease_token,
526 now as i64,
527 expires_at as i64
528 ],
529 )
530 .map_err(sqlite_error)?;
531 if claimed == 0 {
532 return Ok(TxOutcome::Rollback(None));
536 }
537 }
538 let mut batches = Vec::new();
539 for row in selected {
540 batches.push(queued_work_batch_from_conn(tx, row)?);
541 }
542 Ok(TxOutcome::Commit(Some(QueuedWorkClaim {
543 session_id: session_id.clone(),
544 claim_id,
545 owner_id: owner_id.clone(),
546 lease_token,
547 fencing_token,
548 claimed_at_epoch_ms: now,
549 expires_at_epoch_ms: expires_at,
550 batches,
551 })))
552 })();
553 match outcome {
557 Ok(TxOutcome::Commit(value)) => Ok(TxOutcome::Commit(Ok(value))),
558 Ok(TxOutcome::Rollback(value)) => Ok(TxOutcome::Rollback(Ok(value))),
559 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
560 }
561 })
562 .await
563 .map_err(sqlite_error)?
564 }
565
566 async fn renew_queued_work_claim(
567 &self,
568 claim: &QueuedWorkClaim,
569 lease_ttl_ms: u64,
570 ) -> Result<QueuedWorkClaim, StoreError> {
571 let now = current_epoch_ms();
572 let expires_at = now.saturating_add(lease_ttl_ms);
573 let session_id = claim.session_id.clone();
574 let claim_id = claim.claim_id.clone();
575 let lease_token = claim.lease_token.clone();
576 let batch_count = claim.batches.len();
577 let changed = self
578 .conn
579 .write(move |tx| {
580 tx.execute(
581 "UPDATE queued_work_batches
582 SET claim_expires_at_ms = ?4
583 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
584 params![session_id, claim_id, lease_token, expires_at as i64],
585 )
586 })
587 .await
588 .map_err(sqlite_error)?;
589 if changed != batch_count {
590 return Err(StoreError::QueuedWorkClaimExpired {
591 session_id: claim.session_id.clone(),
592 claim_id: claim.claim_id.clone(),
593 });
594 }
595 Ok(QueuedWorkClaim {
596 expires_at_epoch_ms: expires_at,
597 ..claim.clone()
598 })
599 }
600
601 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
602 let session_id = claim.session_id.clone();
603 let claim_id = claim.claim_id.clone();
604 let lease_token = claim.lease_token.clone();
605 self.conn
606 .write(move |tx| {
607 tx.execute(
608 "UPDATE queued_work_batches
609 SET claim_id = NULL,
610 claim_owner_id = NULL,
611 claim_token = NULL,
612 claim_claimed_at_ms = 0,
613 claim_expires_at_ms = 0
614 WHERE session_id = ?1 AND claim_id = ?2 AND claim_token = ?3",
615 params![session_id, claim_id, lease_token],
616 )
617 })
618 .await
619 .map_err(sqlite_error)?;
620 Ok(())
621 }
622
623 async fn cancel_queued_work_batch(
624 &self,
625 session_id: &str,
626 batch_id: &str,
627 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
628 let session_id = session_id.to_string();
629 let batch_id = batch_id.to_string();
630 self.conn
631 .write_flow(move |tx| {
632 let outcome: Result<Option<QueuedWorkBatch>, StoreError> = (|| {
633 let now = current_epoch_ms() as i64;
634 let row = tx
635 .query_row(
636 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
637 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
638 claim_fencing_token
639 FROM queued_work_batches
640 WHERE session_id = ?1
641 AND batch_id = ?2
642 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
643 params![session_id, batch_id, now],
644 queued_batch_row_from_sql,
645 )
646 .optional()
647 .map_err(sqlite_error)?;
648 let Some(row) = row else {
649 return Ok(None);
650 };
651 let batch = queued_work_batch_from_conn(tx, row)?;
652 tx.execute(
653 "DELETE FROM queued_work_batches
654 WHERE session_id = ?1
655 AND batch_id = ?2
656 AND (claim_token IS NULL OR claim_expires_at_ms <= ?3)",
657 params![session_id, batch_id, now],
658 )
659 .map_err(sqlite_error)?;
660 Ok(Some(batch))
661 })();
662 match outcome {
663 Ok(value) => Ok(TxOutcome::Commit(Ok(value))),
664 Err(err) => Ok(TxOutcome::Rollback(Err(err))),
665 }
666 })
667 .await
668 .map_err(sqlite_error)?
669 }
670
671 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
672 let session_id = session_id.to_string();
673 self.conn
674 .call(move |conn| {
675 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
676 let rows = {
677 let mut stmt = conn
678 .prepare(
679 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
680 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
681 claim_fencing_token
682 FROM queued_work_batches
683 WHERE session_id = ?1
684 ORDER BY enqueue_seq ASC",
685 )
686 .map_err(sqlite_error)?;
687 let rows = stmt
688 .query_map(params![session_id], queued_batch_row_from_sql)
689 .map_err(sqlite_error)?;
690 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
691 };
692 rows.into_iter()
693 .map(|row| queued_work_batch_from_conn(conn, row))
694 .collect()
695 })();
696 Ok(outcome)
697 })
698 .await
699 .map_err(sqlite_error)?
700 }
701
702 async fn list_pending_queued_work(
703 &self,
704 session_id: &str,
705 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
706 let session_id = session_id.to_string();
707 self.conn
708 .call(move |conn| {
709 let outcome: Result<Vec<QueuedWorkBatch>, StoreError> = (|| {
710 let now = current_epoch_ms();
711 let rows = {
712 let mut stmt = conn
713 .prepare(
714 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
715 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
716 claim_fencing_token
717 FROM queued_work_batches
718 WHERE session_id = ?1
719 AND (claim_token IS NULL OR claim_expires_at_ms <= ?2)
720 ORDER BY enqueue_seq ASC",
721 )
722 .map_err(sqlite_error)?;
723 let rows = stmt
724 .query_map(
725 params![session_id, now as i64],
726 queued_batch_row_from_sql,
727 )
728 .map_err(sqlite_error)?;
729 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
730 };
731 rows.into_iter()
732 .map(|row| queued_work_batch_from_conn(conn, row))
733 .collect()
734 })();
735 Ok(outcome)
736 })
737 .await
738 .map_err(sqlite_error)?
739 }
740
741 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
742 Store::save_session_meta(self, meta).await;
743 Ok(())
744 }
745
746 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
747 Ok(Store::load_session_meta(self).await)
748 }
749
750 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
751 if ids.is_empty() {
752 return Ok(());
753 }
754 let ids = ids.to_vec();
755 self.conn
756 .write(move |tx| {
757 let mut stmt =
758 tx.prepare("UPDATE graph_nodes SET tombstoned = 1 WHERE node_id = ?1")?;
759 for id in &ids {
760 stmt.execute(params![id])?;
761 }
762 Ok(())
763 })
764 .await
765 .map_err(sqlite_error)
766 }
767
768 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
769 let removed = self
770 .conn
771 .write(move |tx| tx.execute("DELETE FROM graph_nodes WHERE tombstoned = 1", []))
772 .await
773 .map_err(sqlite_error)?;
774 Ok(VacuumReport {
775 removed_node_count: removed,
776 })
777 }
778
779 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
780 Ok(Store::gc_unreachable(self).await)
781 }
782}