1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63 RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
64 SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod leases;
77mod lifecycle;
78mod persistence;
79mod process_registry;
80mod queued_work;
81mod schema;
82mod triggers;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88use leases::*;
89use queued_work::*;
90use schema::{
91 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
92 ensure_trigger_schema,
93};
94pub use triggers::SqliteTriggerStore;
95
/// Session store that owns a [`SqliteConnection`] together with an in-process
/// module-artifact cache.
///
/// NOTE(review): the methods that read these fields are defined in the
/// submodules declared in this file; the field notes below describe only what
/// the declarations themselves show.
pub struct Store {
    /// Connection wrapper provided by the `conn` submodule.
    conn: SqliteConnection,
    /// Mutex-guarded map from module reference to a shared module artifact.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy configured for this store.
    options: StoreOptions,
    /// Atomic counter; presumably tracks commits so that
    /// `StoreGcPolicy::auto_run_every_commits` can be honored — TODO confirm
    /// against the commit path.
    commit_count: AtomicU64,
}
108
/// Process registry that owns its own [`SqliteConnection`].
///
/// The tests in this file open it from a database path and read registered
/// processes back after reopening the same path.
pub struct SqliteProcessRegistry {
    /// Connection wrapper provided by the `conn` submodule.
    conn: SqliteConnection,
    /// Async notification primitive; presumably used to wake tasks waiting on
    /// process state changes — TODO confirm in the registry implementation.
    notify: tokio::sync::Notify,
}
118
119fn sqlite_error(err: rusqlite::Error) -> StoreError {
120 StoreError::Backend(err.to_string())
121}
122
123fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
124 lash_core::PluginError::Session(err.to_string())
125}
126
127fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
128 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
129}
130
131fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
132 serde_json::to_string(value).map_err(|err| {
133 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
134 })
135}
136
137fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
138 futures_executor::block_on(future)
139}
140
/// Tag describing what a persisted blob contains.
///
/// It is carried inside [`BlobArtifactDescriptor`] and on retained artifact
/// references built from a session checkpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Blob with no more specific classification.
    GenericBlob,
    /// Kind used by [`BlobArtifactDescriptor::checkpoint_manifest`].
    CheckpointManifest,
    /// Kind attached to a checkpoint's `tool_state_ref`.
    ToolState,
    /// Kind attached to a checkpoint's `plugin_snapshot_ref`.
    PluginSessionSnapshot,
    /// Kind attached to a checkpoint's `execution_state_ref`.
    ExecutionStateSnapshot,
    /// Kind used by [`BlobArtifactDescriptor::lashlang_module`].
    LashlangModule,
    /// Kind used by [`BlobArtifactDescriptor::process_execution_env`].
    ProcessExecutionEnv,
}
151
/// Storage hints attached to a blob descriptor.
///
/// Within this file only `Compressible` is consulted (by
/// `should_compress_blob`); the other hints are recorded on the descriptor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The blob may be zlib-compressed, subject to the profile's size threshold.
    Compressible,
    /// NOTE(review): not read in this file; presumably favors inline storage — confirm.
    InlinePreferred,
    /// NOTE(review): not read in this file; presumably flags large content — confirm.
    LargePayload,
}
158
/// How the `content` bytes of a [`StoredBlobEnvelope`] are encoded.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored exactly as provided.
    None,
    /// Content is zlib-compressed and must be inflated when decoding.
    Zlib,
}
164
/// Describes a persisted blob: what kind of artifact it is and which storage
/// hints apply to it.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Artifact classification.
    pub kind: PersistedArtifactKind,
    /// Storage hints; omitted from the serialized form when empty and
    /// defaulted to empty when absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
171
172impl BlobArtifactDescriptor {
173 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
174 Self {
175 kind,
176 hints: hints.into(),
177 }
178 }
179
180 pub fn checkpoint_manifest() -> Self {
181 Self::new(
182 PersistedArtifactKind::CheckpointManifest,
183 vec![BlobStorageHint::Compressible],
184 )
185 }
186
187 pub fn tool_state_snapshot() -> Self {
188 Self::new(
189 PersistedArtifactKind::ToolState,
190 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
191 )
192 }
193
194 pub fn plugin_session_snapshot() -> Self {
195 Self::new(
196 PersistedArtifactKind::PluginSessionSnapshot,
197 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
198 )
199 }
200
201 pub fn execution_state_snapshot() -> Self {
202 Self::new(
203 PersistedArtifactKind::ExecutionStateSnapshot,
204 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
205 )
206 }
207
208 pub fn lashlang_module() -> Self {
209 Self::new(
210 PersistedArtifactKind::LashlangModule,
211 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
212 )
213 }
214
215 pub fn process_execution_env() -> Self {
216 Self::new(
217 PersistedArtifactKind::ProcessExecutionEnv,
218 vec![BlobStorageHint::Compressible],
219 )
220 }
221}
222
/// A blob reference paired with the kind of artifact it points at, as
/// produced by `retained_artifact_refs` from a session checkpoint.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference to the retained blob.
    pub blob_ref: BlobRef,
    /// Classification of the referenced blob.
    pub kind: PersistedArtifactKind,
}
228
/// Built-in compression profiles, interpreted by `should_compress_blob`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses blobs.
    LowLatency,
    /// Default profile: compresses compressible blobs of at least 4 KiB.
    #[default]
    Balanced,
    /// Compresses compressible blobs of at least 1 KiB.
    Compact,
}
236
/// Garbage-collection policy for a store.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Commit interval for automatic GC; `None` by default.
    ///
    /// NOTE(review): the code that consumes this value is outside this view;
    /// presumably `None` disables automatic runs — confirm.
    pub auto_run_every_commits: Option<u64>,
}
241
/// Options supplied when opening a store.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile applied when encoding blobs.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
247
/// Serialized wrapper around blob content: `encode_artifact_blob` writes it
/// as MessagePack and `decode_artifact_blob` reads it back.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was stored with.
    descriptor: BlobArtifactDescriptor,
    /// Encoding applied to `content`.
    compression: BlobCompression,
    /// Blob bytes, compressed or raw according to `compression`.
    content: Vec<u8>,
}
254
/// A session checkpoint manifest paired with the blob reference it is known by.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob reference of the checkpoint.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest itself.
    pub manifest: SessionCheckpoint,
}
260
/// Factory that keeps one SQLite database file per session under a root
/// directory.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory that holds the per-session database files.
    root: PathBuf,
    /// Options passed to every store this factory opens.
    options: StoreOptions,
}
272
273impl SqliteSessionStoreFactory {
274 pub fn new(root: impl Into<PathBuf>) -> Self {
275 Self {
276 root: root.into(),
277 options: StoreOptions::default(),
278 }
279 }
280
281 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
282 Self {
283 root: root.into(),
284 options,
285 }
286 }
287
288 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
289 self.root.join(safe_session_db_file_name(session_id))
290 }
291}
292
293#[async_trait::async_trait]
294impl SessionStoreFactory for SqliteSessionStoreFactory {
295 fn durability_tier(&self) -> DurabilityTier {
296 DurabilityTier::Durable
297 }
298
299 async fn create_store(
300 &self,
301 request: &SessionStoreCreateRequest,
302 ) -> Result<Arc<dyn RuntimePersistence>, String> {
303 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
304 let path = self.path_for_session(&request.session_id);
305 let store = Arc::new(
306 Store::open_with_options(&path, self.options)
307 .await
308 .map_err(|err| err.to_string())?,
309 );
310 if store.load_session_meta().await.is_none() {
311 store
312 .save_session_meta(SessionMeta {
313 session_id: request.session_id.clone(),
314 session_name: request.session_id.clone(),
315 created_at: current_timestamp_string(),
316 model: request.policy.model.id.clone(),
317 cwd: std::env::current_dir()
318 .ok()
319 .and_then(|path| path.to_str().map(str::to_string)),
320 relation: request.relation.clone(),
321 })
322 .await;
323 }
324 Ok(store as Arc<dyn RuntimePersistence>)
325 }
326
327 async fn open_existing_store(
328 &self,
329 request: &SessionStoreCreateRequest,
330 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
331 let path = self.path_for_session(&request.session_id);
332 if !path.exists() {
333 return Ok(None);
334 }
335 self.create_store(request).await.map(Some)
336 }
337
338 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
339 let db_path = self.path_for_session(session_id);
340 for path in [
341 db_path.clone(),
342 sqlite_sidecar_path(&db_path, "-wal"),
343 sqlite_sidecar_path(&db_path, "-shm"),
344 ] {
345 match std::fs::remove_file(&path) {
346 Ok(()) => {}
347 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
348 Err(err) => {
349 return Err(format!("remove session store {}: {err}", path.display()));
350 }
351 }
352 }
353 Ok(())
354 }
355}
356
357fn safe_session_db_file_name(session_id: &str) -> String {
358 let mut safe = session_id
359 .chars()
360 .map(|ch| match ch {
361 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
362 _ => '_',
363 })
364 .collect::<String>();
365 safe = safe.trim_matches('_').to_string();
366 if safe.is_empty() {
367 safe.push_str("session");
368 }
369 safe.truncate(80);
370 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
371 format!("{safe}-{}.db", &hash[..16])
372}
373
/// Returns `path` with `suffix` appended directly to its final component
/// (for example `foo.db` + `-wal` gives `foo.db-wal`), without going through
/// a lossy string conversion.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    raw.into()
}
379
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
///
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
386
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// narrowed to `u64` with a plain cast.
fn current_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
393
394fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
395 let mut refs = Vec::new();
396 if let Some(blob_ref) = &checkpoint.tool_state_ref {
397 refs.push(RetainedArtifactRef {
398 blob_ref: blob_ref.clone(),
399 kind: PersistedArtifactKind::ToolState,
400 });
401 }
402 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
403 refs.push(RetainedArtifactRef {
404 blob_ref: blob_ref.clone(),
405 kind: PersistedArtifactKind::PluginSessionSnapshot,
406 });
407 }
408 if let Some(blob_ref) = &checkpoint.execution_state_ref {
409 refs.push(RetainedArtifactRef {
410 blob_ref: blob_ref.clone(),
411 kind: PersistedArtifactKind::ExecutionStateSnapshot,
412 });
413 }
414 refs
415}
416
417fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
418 SessionHeadMeta {
419 session_id: head.session_id.clone(),
420 head_revision: 0,
421 config: head.config.clone(),
422 agent_frames: head.agent_frames.clone(),
423 current_agent_frame_id: head.current_agent_frame_id.clone(),
424 checkpoint_ref: head.checkpoint_ref.clone(),
425 leaf_node_id: head.graph.leaf_node_id.clone(),
426 graph_node_count: head.graph.nodes.len(),
427 token_ledger: Vec::new(),
428 }
429}
430
431fn encode_json<T: serde::Serialize>(value: &T) -> String {
432 serde_json::to_string(value).expect("persisted state should serialize")
433}
434
435fn should_compress_blob(
436 profile: BuiltinBlobProfile,
437 descriptor: &BlobArtifactDescriptor,
438 len: usize,
439) -> bool {
440 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
441 return false;
442 }
443 match profile {
444 BuiltinBlobProfile::LowLatency => false,
445 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
446 BuiltinBlobProfile::Compact => len >= 1024,
447 }
448}
449
450fn compress_blob(content: &[u8]) -> Vec<u8> {
451 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
452 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
453 encoder.finish().expect("submit blob compression")
454}
455
456fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
457 let mut decoder = ZlibDecoder::new(content);
458 let mut out = Vec::new();
459 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
460 Some(out)
461}
462
463fn encode_artifact_blob(
464 descriptor: &BlobArtifactDescriptor,
465 profile: BuiltinBlobProfile,
466 content: &[u8],
467) -> Vec<u8> {
468 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
469 {
470 (BlobCompression::Zlib, compress_blob(content))
471 } else {
472 (BlobCompression::None, content.to_vec())
473 };
474 encode_msgpack(&StoredBlobEnvelope {
475 descriptor: descriptor.clone(),
476 compression,
477 content: stored_content,
478 })
479}
480
481fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
482 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
483 match envelope.compression {
484 BlobCompression::None => Some(envelope.content),
485 BlobCompression::Zlib => decompress_blob(&envelope.content),
486 }
487}
488
489fn try_load_session_head_meta_from_conn(
492 conn: &Connection,
493) -> Result<Option<SessionHeadMeta>, StoreError> {
494 let row = conn
495 .query_row(
496 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
497 [],
498 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
499 )
500 .optional()
501 .map_err(sqlite_error)?;
502 let Some((head_json, head_revision)) = row else {
503 return Ok(None);
504 };
505 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
506 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
507 meta.head_revision = head_revision as u64;
508 Ok(Some(meta))
509}
510
511fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
512 try_load_session_head_meta_from_conn(conn).ok().flatten()
513}
514
515fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
516 conn.query_row(
517 "SELECT session_id, session_name, created_at, model, cwd, relation_json
518 FROM session_meta WHERE singleton = 1",
519 [],
520 |row| {
521 let relation_json: Option<String> = row.get(5)?;
522 let relation = relation_json
523 .and_then(|json| serde_json::from_str(&json).ok())
524 .unwrap_or_default();
525 Ok(SessionMeta {
526 session_id: row.get(0)?,
527 session_name: row.get(1)?,
528 created_at: row.get(2)?,
529 model: row.get(3)?,
530 cwd: row.get(4)?,
531 relation,
532 })
533 },
534 )
535 .optional()
536 .ok()
537 .flatten()
538}
539
540fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
541 rmp_serde::from_slice(bytes).ok()
542}
543
544fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
545 let mut buf = Vec::with_capacity(1024);
548 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
549 buf
550}
551
552fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
553 rmp_serde::from_slice(bytes).ok()
554}
555
556fn merge_token_ledger_entries(
557 entries: Vec<lash_core::TokenLedgerEntry>,
558) -> Vec<lash_core::TokenLedgerEntry> {
559 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
560 for entry in entries {
561 if entry.usage.total() == 0 {
562 continue;
563 }
564 if let Some(existing) = merged
565 .iter_mut()
566 .find(|existing| existing.source == entry.source && existing.model == entry.model)
567 {
568 existing.usage.input_tokens += entry.usage.input_tokens;
569 existing.usage.output_tokens += entry.usage.output_tokens;
570 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
571 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
572 } else {
573 merged.push(entry);
574 }
575 }
576 merged
577}
578
// Integration-style tests for the SQLite-backed artifact store and process registry.
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds a registration for process `id` with external input (null
    /// metadata) and session provenance for the scope named "session".
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
        )
    }

    /// Stores a linked module artifact in an in-memory store and checks that
    /// reading it back yields the same module ref and `scan` process ref.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangHostEnvironment::new(
                lashlang::LashlangHostCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// Registers, grants a handle for, and completes a process in an on-disk
    /// registry, then reopens the same database file and checks that the
    /// record, its completion output, and the handle grant are still present.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // Inner scope: the first registry instance is dropped before reopening.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        // A fresh registry over the same file must see everything written above.
        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}