1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63 RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseCompletion,
64 SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
65 SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
66};
67use rusqlite::{Connection, OptionalExtension, Transaction, params};
68use sha2::{Digest, Sha256};
69
70use conn::SqliteConnection;
71
72mod attachments;
73mod blobs;
74mod conn;
75mod effect_replay;
76mod graph;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83mod triggers;
84
85use conn::TxOutcome;
86pub use effect_replay::{
87 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
88};
89use leases::*;
90use queued_work::*;
91use schema::{
92 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
93 ensure_trigger_schema,
94};
95pub use triggers::SqliteTriggerStore;
96
97pub struct Store {
104 conn: SqliteConnection,
105 artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
106 options: StoreOptions,
107 commit_count: AtomicU64,
108}
109
110pub struct SqliteProcessRegistry {
116 conn: SqliteConnection,
117 notify: tokio::sync::Notify,
118}
119
120fn sqlite_error(err: rusqlite::Error) -> StoreError {
121 StoreError::Backend(err.to_string())
122}
123
124fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
125 lash_core::PluginError::Session(err.to_string())
126}
127
128fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
129 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
130}
131
132fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
133 serde_json::to_string(value).map_err(|err| {
134 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
135 })
136}
137
138fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
139 futures_executor::block_on(future)
140}
141
142#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
143pub enum PersistedArtifactKind {
144 GenericBlob,
145 CheckpointManifest,
146 ToolState,
147 PluginSessionSnapshot,
148 ExecutionStateSnapshot,
149 LashlangModule,
150 ProcessExecutionEnv,
151}
152
153#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
154pub enum BlobStorageHint {
155 Compressible,
156 InlinePreferred,
157 LargePayload,
158}
159
160#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
161enum BlobCompression {
162 None,
163 Zlib,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
167pub struct BlobArtifactDescriptor {
168 pub kind: PersistedArtifactKind,
169 #[serde(default, skip_serializing_if = "Vec::is_empty")]
170 pub hints: Vec<BlobStorageHint>,
171}
172
173impl BlobArtifactDescriptor {
174 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
175 Self {
176 kind,
177 hints: hints.into(),
178 }
179 }
180
181 pub fn checkpoint_manifest() -> Self {
182 Self::new(
183 PersistedArtifactKind::CheckpointManifest,
184 vec![BlobStorageHint::Compressible],
185 )
186 }
187
188 pub fn tool_state_snapshot() -> Self {
189 Self::new(
190 PersistedArtifactKind::ToolState,
191 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
192 )
193 }
194
195 pub fn plugin_session_snapshot() -> Self {
196 Self::new(
197 PersistedArtifactKind::PluginSessionSnapshot,
198 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
199 )
200 }
201
202 pub fn execution_state_snapshot() -> Self {
203 Self::new(
204 PersistedArtifactKind::ExecutionStateSnapshot,
205 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
206 )
207 }
208
209 pub fn lashlang_module() -> Self {
210 Self::new(
211 PersistedArtifactKind::LashlangModule,
212 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
213 )
214 }
215
216 pub fn process_execution_env() -> Self {
217 Self::new(
218 PersistedArtifactKind::ProcessExecutionEnv,
219 vec![BlobStorageHint::Compressible],
220 )
221 }
222}
223
224#[derive(Clone, Debug, PartialEq, Eq, Hash)]
225struct RetainedArtifactRef {
226 pub blob_ref: BlobRef,
227 pub kind: PersistedArtifactKind,
228}
229
/// Built-in compression profile for blobs; `should_compress_blob` maps each
/// profile to a size threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BuiltinBlobProfile {
    /// Never compress.
    LowLatency,
    /// Compress compressible blobs of at least 4 KiB. This is the default.
    #[default]
    Balanced,
    /// Compress compressible blobs of at least 1 KiB.
    Compact,
}
237
/// Garbage-collection policy for a store. The default leaves every field
/// unset.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StoreGcPolicy {
    /// NOTE(review): name suggests GC runs automatically every N commits and
    /// `None` disables that — confirm where this is consumed.
    pub auto_run_every_commits: Option<u64>,
}
242
243#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
244pub struct StoreOptions {
245 pub blob_profile: BuiltinBlobProfile,
246 pub gc_policy: StoreGcPolicy,
247}
248
249#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
250struct StoredBlobEnvelope {
251 descriptor: BlobArtifactDescriptor,
252 compression: BlobCompression,
253 content: Vec<u8>,
254}
255
256#[derive(Clone, Debug)]
257pub struct StoredSessionCheckpoint {
258 pub checkpoint_ref: BlobRef,
259 pub manifest: SessionCheckpoint,
260}
261
262#[derive(Clone, Debug)]
269pub struct SqliteSessionStoreFactory {
270 root: PathBuf,
271 options: StoreOptions,
272}
273
274impl SqliteSessionStoreFactory {
275 pub fn new(root: impl Into<PathBuf>) -> Self {
276 Self {
277 root: root.into(),
278 options: StoreOptions::default(),
279 }
280 }
281
282 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
283 Self {
284 root: root.into(),
285 options,
286 }
287 }
288
289 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
290 self.root.join(safe_session_db_file_name(session_id))
291 }
292}
293
294#[async_trait::async_trait]
295impl SessionStoreFactory for SqliteSessionStoreFactory {
296 fn durability_tier(&self) -> DurabilityTier {
297 DurabilityTier::Durable
298 }
299
300 async fn create_store(
301 &self,
302 request: &SessionStoreCreateRequest,
303 ) -> Result<Arc<dyn RuntimePersistence>, String> {
304 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
305 let path = self.path_for_session(&request.session_id);
306 let store = Arc::new(
307 Store::open_with_options(&path, self.options)
308 .await
309 .map_err(|err| err.to_string())?,
310 );
311 if store.load_session_meta().await.is_none() {
312 store
313 .save_session_meta(SessionMeta {
314 session_id: request.session_id.clone(),
315 session_name: request.session_id.clone(),
316 created_at: current_timestamp_string(),
317 model: request.policy.model.id.clone(),
318 cwd: std::env::current_dir()
319 .ok()
320 .and_then(|path| path.to_str().map(str::to_string)),
321 relation: request.relation.clone(),
322 })
323 .await;
324 }
325 Ok(store as Arc<dyn RuntimePersistence>)
326 }
327
328 async fn open_existing_store(
329 &self,
330 request: &SessionStoreCreateRequest,
331 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
332 let path = self.path_for_session(&request.session_id);
333 if !path.exists() {
334 return Ok(None);
335 }
336 self.create_store(request).await.map(Some)
337 }
338
339 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
340 let db_path = self.path_for_session(session_id);
341 for path in [
342 db_path.clone(),
343 sqlite_sidecar_path(&db_path, "-wal"),
344 sqlite_sidecar_path(&db_path, "-shm"),
345 ] {
346 match std::fs::remove_file(&path) {
347 Ok(()) => {}
348 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
349 Err(err) => {
350 return Err(format!("remove session store {}: {err}", path.display()));
351 }
352 }
353 }
354 Ok(())
355 }
356}
357
358fn safe_session_db_file_name(session_id: &str) -> String {
359 let mut safe = session_id
360 .chars()
361 .map(|ch| match ch {
362 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
363 _ => '_',
364 })
365 .collect::<String>();
366 safe = safe.trim_matches('_').to_string();
367 if safe.is_empty() {
368 safe.push_str("session");
369 }
370 safe.truncate(80);
371 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
372 format!("{safe}-{}.db", &hash[..16])
373}
374
/// Returns `path` with `suffix` appended to its raw OS string (for example
/// `db.sqlite` + `-wal`). No path separator is inserted.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = std::ffi::OsString::from(path.as_os_str());
    raw.push(suffix);
    raw.into()
}
380
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
/// A clock set before the epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
387
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// converted with a checked conversion that saturates at `u64::MAX`, instead
/// of an `as` cast that would silently drop the high bits.
fn current_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
394
395fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
396 let mut refs = Vec::new();
397 if let Some(blob_ref) = &checkpoint.tool_state_ref {
398 refs.push(RetainedArtifactRef {
399 blob_ref: blob_ref.clone(),
400 kind: PersistedArtifactKind::ToolState,
401 });
402 }
403 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
404 refs.push(RetainedArtifactRef {
405 blob_ref: blob_ref.clone(),
406 kind: PersistedArtifactKind::PluginSessionSnapshot,
407 });
408 }
409 if let Some(blob_ref) = &checkpoint.execution_state_ref {
410 refs.push(RetainedArtifactRef {
411 blob_ref: blob_ref.clone(),
412 kind: PersistedArtifactKind::ExecutionStateSnapshot,
413 });
414 }
415 refs
416}
417
418fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
419 SessionHeadMeta {
420 session_id: head.session_id.clone(),
421 head_revision: 0,
422 config: head.config.clone(),
423 agent_frames: head.agent_frames.clone(),
424 current_agent_frame_id: head.current_agent_frame_id.clone(),
425 checkpoint_ref: head.checkpoint_ref.clone(),
426 leaf_node_id: head.graph.leaf_node_id.clone(),
427 graph_node_count: head.graph.nodes.len(),
428 token_ledger: Vec::new(),
429 }
430}
431
432fn encode_json<T: serde::Serialize>(value: &T) -> String {
433 serde_json::to_string(value).expect("persisted state should serialize")
434}
435
436fn should_compress_blob(
437 profile: BuiltinBlobProfile,
438 descriptor: &BlobArtifactDescriptor,
439 len: usize,
440) -> bool {
441 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
442 return false;
443 }
444 match profile {
445 BuiltinBlobProfile::LowLatency => false,
446 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
447 BuiltinBlobProfile::Compact => len >= 1024,
448 }
449}
450
451fn compress_blob(content: &[u8]) -> Vec<u8> {
452 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
453 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
454 encoder.finish().expect("submit blob compression")
455}
456
457fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
458 let mut decoder = ZlibDecoder::new(content);
459 let mut out = Vec::new();
460 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
461 Some(out)
462}
463
464fn encode_artifact_blob(
465 descriptor: &BlobArtifactDescriptor,
466 profile: BuiltinBlobProfile,
467 content: &[u8],
468) -> Vec<u8> {
469 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
470 {
471 (BlobCompression::Zlib, compress_blob(content))
472 } else {
473 (BlobCompression::None, content.to_vec())
474 };
475 encode_msgpack(&StoredBlobEnvelope {
476 descriptor: descriptor.clone(),
477 compression,
478 content: stored_content,
479 })
480}
481
482fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
483 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
484 match envelope.compression {
485 BlobCompression::None => Some(envelope.content),
486 BlobCompression::Zlib => decompress_blob(&envelope.content),
487 }
488}
489
490fn try_load_session_head_meta_from_conn(
493 conn: &Connection,
494) -> Result<Option<SessionHeadMeta>, StoreError> {
495 let row = conn
496 .query_row(
497 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
498 [],
499 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
500 )
501 .optional()
502 .map_err(sqlite_error)?;
503 let Some((head_json, head_revision)) = row else {
504 return Ok(None);
505 };
506 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
507 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
508 meta.head_revision = head_revision as u64;
509 Ok(Some(meta))
510}
511
512fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
513 try_load_session_head_meta_from_conn(conn).ok().flatten()
514}
515
516fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
517 conn.query_row(
518 "SELECT session_id, session_name, created_at, model, cwd, relation_json
519 FROM session_meta WHERE singleton = 1",
520 [],
521 |row| {
522 let relation_json: Option<String> = row.get(5)?;
523 let relation = relation_json
524 .and_then(|json| serde_json::from_str(&json).ok())
525 .unwrap_or_default();
526 Ok(SessionMeta {
527 session_id: row.get(0)?,
528 session_name: row.get(1)?,
529 created_at: row.get(2)?,
530 model: row.get(3)?,
531 cwd: row.get(4)?,
532 relation,
533 })
534 },
535 )
536 .optional()
537 .ok()
538 .flatten()
539}
540
541fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
542 rmp_serde::from_slice(bytes).ok()
543}
544
545fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
546 let mut buf = Vec::with_capacity(1024);
549 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
550 buf
551}
552
553fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
554 rmp_serde::from_slice(bytes).ok()
555}
556
557fn merge_token_ledger_entries(
558 entries: Vec<lash_core::TokenLedgerEntry>,
559) -> Vec<lash_core::TokenLedgerEntry> {
560 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
561 for entry in entries {
562 if entry.usage.total() == 0 {
563 continue;
564 }
565 if let Some(existing) = merged
566 .iter_mut()
567 .find(|existing| existing.source == entry.source && existing.model == entry.model)
568 {
569 existing.usage.input_tokens += entry.usage.input_tokens;
570 existing.usage.output_tokens += entry.usage.output_tokens;
571 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
572 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
573 } else {
574 merged.push(entry);
575 }
576 }
577 merged
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds a registration with external input (null metadata) whose
    /// provenance is the `"session"` scope.
    fn registration(id: &str) -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::Value::Null,
        };
        let provenance =
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session"));
        ProcessRegistration::new(id, input, provenance)
    }

    /// A linked module artifact written to an in-memory store can be read back
    /// by module reference, with the same module and process references.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let environment = lashlang::LashlangHostEnvironment::new(
            lashlang::LashlangHostCatalog::new(),
            lashlang::LashlangAbilities::all(),
        );
        let linked = lashlang::LinkedModule::link(module, environment).expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let fetched = store.get_module_artifact(&linked.module_ref).await;
        let restored = fetched.expect("get artifact").expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// A registered, granted and completed process is still visible after the
    /// registry is dropped and reopened on the same database file.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        // Output recorded before the reopen and expected after it.
        fn success_output() -> ProcessAwaitOutput {
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        }

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");

        // First lifetime: write the rows, then drop the registry at block end.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            let descriptor = ProcessHandleDescriptor::new(Some("tool"), Some("demo"));
            registry
                .grant_handle(&scope, "proc-persist", descriptor)
                .await
                .expect("grant");
            registry
                .complete_process("proc-persist", success_output())
                .await
                .expect("complete");
        }

        // Second lifetime: everything must come back from disk.
        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(scope.clone())
        );

        let awaited = registry
            .await_process("proc-persist")
            .await
            .expect("await persisted");
        assert_eq!(awaited, success_output());

        let grants = registry.list_handle_grants(&scope).await.expect("grants");
        assert_eq!(grants.len(), 1);
    }
}