1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_leading_session_command,
52 select_turn_work_claim_prefix,
53};
54use lash_core::store::{
55 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
56 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
57};
58use lash_core::{
59 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
60 DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness, MergeKey,
61 PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest,
62 ProcessEventAppendResult, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
63 ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
64 RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseClaimOutcome,
65 SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo,
66 SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
67 StoreError, VacuumReport,
68};
69use rusqlite::{Connection, OptionalExtension, Transaction, params};
70use sha2::{Digest, Sha256};
71
72use conn::SqliteConnection;
73
74mod attachments;
75mod blobs;
76mod conn;
77mod effect_replay;
78mod graph;
79mod leases;
80mod lifecycle;
81mod persistence;
82mod process_registry;
83mod queued_work;
84mod schema;
85mod triggers;
86
87use conn::TxOutcome;
88pub use effect_replay::{
89 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
90};
91use leases::*;
92use queued_work::*;
93use schema::{
94 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
95 ensure_trigger_schema,
96};
97pub use triggers::SqliteTriggerStore;
98
/// SQLite-backed session store.
///
/// `SqliteSessionStoreFactory::create_store` opens one of these per session database file
/// and hands it out as an `Arc<dyn RuntimePersistence>`.
pub struct Store {
    conn: SqliteConnection,
    // In-memory map of lashlang module artifacts keyed by module ref (apparently a cache in
    // front of the persisted artifacts — NOTE(review): confirm).
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    // Blob profile and GC policy the store was opened with.
    options: StoreOptions,
    // NOTE(review): presumably counts commits so `StoreGcPolicy::auto_run_every_commits`
    // can trigger an automatic GC — confirm against the commit path.
    commit_count: AtomicU64,
}
111
/// Process registry whose rows live in a SQLite database file.
///
/// Rows, handle grants and completion output survive reopening the same file (exercised by
/// `sqlite_process_registry_persists_rows_after_reopen`). NOTE(review): presumably this is
/// the `ProcessRegistry` implementation provided by `mod process_registry` — confirm.
pub struct SqliteProcessRegistry {
    conn: SqliteConnection,
    // NOTE(review): presumably wakes tasks blocked in `await_process` when a process row
    // changes — confirm in `process_registry`.
    notify: tokio::sync::Notify,
}
121
122fn sqlite_error(err: rusqlite::Error) -> StoreError {
123 StoreError::Backend(err.to_string())
124}
125
126fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
127 lash_core::PluginError::Session(err.to_string())
128}
129
130fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
131 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
132}
133
134fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
135 serde_json::to_string(value).map_err(|err| {
136 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
137 })
138}
139
140fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
141 futures_executor::block_on(future)
142}
143
/// Kind of artifact carried by a stored blob.
///
/// The kind is part of [`BlobArtifactDescriptor`], which is serialized into every stored blob
/// envelope, so renaming or reordering variants may affect decoding of previously written
/// blobs — NOTE(review): confirm how the msgpack serializer encodes enum variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Presumably a catch-all for blobs without a more specific kind — TODO confirm.
    GenericBlob,
    /// Session checkpoint manifest (see `BlobArtifactDescriptor::checkpoint_manifest`).
    CheckpointManifest,
    /// Tool state snapshot, referenced by a checkpoint's `tool_state_ref`.
    ToolState,
    /// Plugin session snapshot, referenced by a checkpoint's `plugin_snapshot_ref`.
    PluginSessionSnapshot,
    /// Execution state snapshot, referenced by a checkpoint's `execution_state_ref`.
    ExecutionStateSnapshot,
    /// Lashlang module artifact (see `BlobArtifactDescriptor::lashlang_module`).
    LashlangModule,
    /// Process execution environment (see `BlobArtifactDescriptor::process_execution_env`).
    ProcessExecutionEnv,
}
154
/// Storage hint attached to a [`BlobArtifactDescriptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The payload may be compressed; `should_compress_blob` only compresses blobs that
    /// carry this hint.
    Compressible,
    /// Not consulted by the compression logic in this file — NOTE(review): confirm where
    /// (or whether) it is used.
    InlinePreferred,
    /// Attached to the snapshot and module descriptors; not consulted by the compression
    /// logic in this file — NOTE(review): confirm its consumer.
    LargePayload,
}
161
/// Compression applied to the `content` of a stored blob envelope.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored verbatim.
    None,
    /// Content is zlib-compressed (`compress_blob` / `decompress_blob`).
    Zlib,
}
167
/// Describes what a blob contains and how it may be stored.
///
/// A clone of the descriptor is embedded in every encoded blob envelope.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Kind of artifact stored in the blob.
    pub kind: PersistedArtifactKind,
    /// Storage hints; omitted from the serialized form when empty and defaulted on decode.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
174
175impl BlobArtifactDescriptor {
176 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
177 Self {
178 kind,
179 hints: hints.into(),
180 }
181 }
182
183 pub fn checkpoint_manifest() -> Self {
184 Self::new(
185 PersistedArtifactKind::CheckpointManifest,
186 vec![BlobStorageHint::Compressible],
187 )
188 }
189
190 pub fn tool_state_snapshot() -> Self {
191 Self::new(
192 PersistedArtifactKind::ToolState,
193 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
194 )
195 }
196
197 pub fn plugin_session_snapshot() -> Self {
198 Self::new(
199 PersistedArtifactKind::PluginSessionSnapshot,
200 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
201 )
202 }
203
204 pub fn execution_state_snapshot() -> Self {
205 Self::new(
206 PersistedArtifactKind::ExecutionStateSnapshot,
207 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
208 )
209 }
210
211 pub fn lashlang_module() -> Self {
212 Self::new(
213 PersistedArtifactKind::LashlangModule,
214 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
215 )
216 }
217
218 pub fn process_execution_env() -> Self {
219 Self::new(
220 PersistedArtifactKind::ProcessExecutionEnv,
221 vec![BlobStorageHint::Compressible],
222 )
223 }
224}
225
/// A blob reference held by a checkpoint, tagged with the kind of artifact it points to.
///
/// Produced by `retained_artifact_refs` from a `SessionCheckpoint`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference to the stored blob.
    pub blob_ref: BlobRef,
    /// Kind of artifact the blob holds.
    pub kind: PersistedArtifactKind,
}
231
/// Built-in trade-off between write latency and on-disk size for blob storage.
///
/// Only blobs whose descriptor carries [`BlobStorageHint::Compressible`] are ever compressed;
/// the profile decides the minimum size (see `should_compress_blob`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compress.
    LowLatency,
    /// Compress compressible blobs of at least 4 KiB (the default).
    #[default]
    Balanced,
    /// Compress compressible blobs of at least 1 KiB.
    Compact,
}
239
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// NOTE(review): presumably runs GC automatically every N commits, with `None` (the
    /// default) disabling automatic runs — confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
244
/// Options a [`Store`] is opened with.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Blob compression profile (defaults to [`BuiltinBlobProfile::Balanced`]).
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
250
/// On-disk representation of an artifact blob, encoded with `encode_msgpack`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor of the artifact the payload holds.
    descriptor: BlobArtifactDescriptor,
    /// Compression applied to `content`.
    compression: BlobCompression,
    /// Payload bytes, compressed according to `compression`.
    content: Vec<u8>,
}
257
/// A session checkpoint manifest paired with the blob reference it is stored under.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob reference of the stored checkpoint manifest.
    pub checkpoint_ref: BlobRef,
    /// The decoded checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
263
/// Creates one SQLite-backed [`Store`] per session under a root directory.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory holding one database file per session.
    root: PathBuf,
    /// Options every store created by this factory is opened with.
    options: StoreOptions,
}
275
276impl SqliteSessionStoreFactory {
277 pub fn new(root: impl Into<PathBuf>) -> Self {
278 Self {
279 root: root.into(),
280 options: StoreOptions::default(),
281 }
282 }
283
284 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
285 Self {
286 root: root.into(),
287 options,
288 }
289 }
290
291 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
292 self.root.join(safe_session_db_file_name(session_id))
293 }
294}
295
296#[async_trait::async_trait]
297impl SessionStoreFactory for SqliteSessionStoreFactory {
298 fn durability_tier(&self) -> DurabilityTier {
299 DurabilityTier::Durable
300 }
301
302 async fn create_store(
303 &self,
304 request: &SessionStoreCreateRequest,
305 ) -> Result<Arc<dyn RuntimePersistence>, String> {
306 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
307 let path = self.path_for_session(&request.session_id);
308 let store = Arc::new(
309 Store::open_with_options(&path, self.options)
310 .await
311 .map_err(|err| err.to_string())?,
312 );
313 if store.load_session_meta().await.is_none() {
314 store
315 .save_session_meta(SessionMeta {
316 session_id: request.session_id.clone(),
317 session_name: request.session_id.clone(),
318 created_at: current_timestamp_string(),
319 model: request.policy.model.id.clone(),
320 cwd: std::env::current_dir()
321 .ok()
322 .and_then(|path| path.to_str().map(str::to_string)),
323 relation: request.relation.clone(),
324 })
325 .await;
326 }
327 Ok(store as Arc<dyn RuntimePersistence>)
328 }
329
330 async fn open_existing_store(
331 &self,
332 request: &SessionStoreCreateRequest,
333 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
334 let path = self.path_for_session(&request.session_id);
335 if !path.exists() {
336 return Ok(None);
337 }
338 self.create_store(request).await.map(Some)
339 }
340
341 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
342 let db_path = self.path_for_session(session_id);
343 for path in [
344 db_path.clone(),
345 sqlite_sidecar_path(&db_path, "-wal"),
346 sqlite_sidecar_path(&db_path, "-shm"),
347 ] {
348 match std::fs::remove_file(&path) {
349 Ok(()) => {}
350 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
351 Err(err) => {
352 return Err(format!("remove session store {}: {err}", path.display()));
353 }
354 }
355 }
356 Ok(())
357 }
358}
359
360fn safe_session_db_file_name(session_id: &str) -> String {
361 let mut safe = session_id
362 .chars()
363 .map(|ch| match ch {
364 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
365 _ => '_',
366 })
367 .collect::<String>();
368 safe = safe.trim_matches('_').to_string();
369 if safe.is_empty() {
370 safe.push_str("session");
371 }
372 safe.truncate(80);
373 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
374 format!("{safe}-{}.db", &hash[..16])
375}
376
/// Returns `path` with `suffix` appended to its final component (e.g. `a.db` -> `a.db-wal`).
///
/// The suffix is appended at the OS-string level, so it is not treated as a new path component.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(suffix);
    raw.into()
}
382
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
///
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("unix:{secs}")
}
389
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`.
fn current_epoch_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    // `as_millis` is u128; the narrowing cast only truncates ~584 million years past 1970.
    elapsed.as_millis() as u64
}
396
397fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
398 let mut refs = Vec::new();
399 if let Some(blob_ref) = &checkpoint.tool_state_ref {
400 refs.push(RetainedArtifactRef {
401 blob_ref: blob_ref.clone(),
402 kind: PersistedArtifactKind::ToolState,
403 });
404 }
405 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
406 refs.push(RetainedArtifactRef {
407 blob_ref: blob_ref.clone(),
408 kind: PersistedArtifactKind::PluginSessionSnapshot,
409 });
410 }
411 if let Some(blob_ref) = &checkpoint.execution_state_ref {
412 refs.push(RetainedArtifactRef {
413 blob_ref: blob_ref.clone(),
414 kind: PersistedArtifactKind::ExecutionStateSnapshot,
415 });
416 }
417 refs
418}
419
420fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
421 SessionHeadMeta {
422 session_id: head.session_id.clone(),
423 head_revision: 0,
424 config: head.config.clone(),
425 agent_frames: head.agent_frames.clone(),
426 current_agent_frame_id: head.current_agent_frame_id.clone(),
427 checkpoint_ref: head.checkpoint_ref.clone(),
428 leaf_node_id: head.graph.leaf_node_id.clone(),
429 graph_node_count: head.graph.nodes.len(),
430 token_ledger: Vec::new(),
431 }
432}
433
434fn encode_json<T: serde::Serialize>(value: &T) -> String {
435 serde_json::to_string(value).expect("persisted state should serialize")
436}
437
438fn should_compress_blob(
439 profile: BuiltinBlobProfile,
440 descriptor: &BlobArtifactDescriptor,
441 len: usize,
442) -> bool {
443 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
444 return false;
445 }
446 match profile {
447 BuiltinBlobProfile::LowLatency => false,
448 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
449 BuiltinBlobProfile::Compact => len >= 1024,
450 }
451}
452
453fn compress_blob(content: &[u8]) -> Vec<u8> {
454 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
455 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
456 encoder.finish().expect("submit blob compression")
457}
458
459fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
460 let mut decoder = ZlibDecoder::new(content);
461 let mut out = Vec::new();
462 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
463 Some(out)
464}
465
466fn encode_artifact_blob(
467 descriptor: &BlobArtifactDescriptor,
468 profile: BuiltinBlobProfile,
469 content: &[u8],
470) -> Vec<u8> {
471 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
472 {
473 (BlobCompression::Zlib, compress_blob(content))
474 } else {
475 (BlobCompression::None, content.to_vec())
476 };
477 encode_msgpack(&StoredBlobEnvelope {
478 descriptor: descriptor.clone(),
479 compression,
480 content: stored_content,
481 })
482}
483
484fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
485 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
486 match envelope.compression {
487 BlobCompression::None => Some(envelope.content),
488 BlobCompression::Zlib => decompress_blob(&envelope.content),
489 }
490}
491
492fn try_load_session_head_meta_from_conn(
495 conn: &Connection,
496) -> Result<Option<SessionHeadMeta>, StoreError> {
497 let row = conn
498 .query_row(
499 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
500 [],
501 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
502 )
503 .optional()
504 .map_err(sqlite_error)?;
505 let Some((head_json, head_revision)) = row else {
506 return Ok(None);
507 };
508 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
509 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
510 meta.head_revision = head_revision as u64;
511 Ok(Some(meta))
512}
513
514fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
515 try_load_session_head_meta_from_conn(conn).ok().flatten()
516}
517
518fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
519 conn.query_row(
520 "SELECT session_id, session_name, created_at, model, cwd, relation_json
521 FROM session_meta WHERE singleton = 1",
522 [],
523 |row| {
524 let relation_json: Option<String> = row.get(5)?;
525 let relation = relation_json
526 .and_then(|json| serde_json::from_str(&json).ok())
527 .unwrap_or_default();
528 Ok(SessionMeta {
529 session_id: row.get(0)?,
530 session_name: row.get(1)?,
531 created_at: row.get(2)?,
532 model: row.get(3)?,
533 cwd: row.get(4)?,
534 relation,
535 })
536 },
537 )
538 .optional()
539 .ok()
540 .flatten()
541}
542
543fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
544 rmp_serde::from_slice(bytes).ok()
545}
546
547fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
548 let mut buf = Vec::with_capacity(1024);
551 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
552 buf
553}
554
555fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
556 rmp_serde::from_slice(bytes).ok()
557}
558
559fn merge_token_ledger_entries(
560 entries: Vec<lash_core::TokenLedgerEntry>,
561) -> Vec<lash_core::TokenLedgerEntry> {
562 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
563 for entry in entries {
564 if entry.usage.total() == 0 {
565 continue;
566 }
567 if let Some(existing) = merged
568 .iter_mut()
569 .find(|existing| existing.source == entry.source && existing.model == entry.model)
570 {
571 existing.usage.input_tokens += entry.usage.input_tokens;
572 existing.usage.output_tokens += entry.usage.output_tokens;
573 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
574 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
575 } else {
576 merged.push(entry);
577 }
578 }
579 merged
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    // NOTE(review): presumably brings `put_module_artifact` / `get_module_artifact` into
    // scope for `Store` — confirm.
    use lashlang::LashlangArtifactStore;

    /// Builds a minimal external-input process registration with session provenance for the
    /// `"session"` scope.
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
        )
    }

    /// A linked module artifact written to an in-memory store reads back by module ref with
    /// the same `scan` process ref.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangHostEnvironment::new(
                lashlang::LashlangHostCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// Process rows, handle grants and completion output survive closing and reopening the
    /// registry database file.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // Write phase. The registry is dropped at the end of this scope, so the reads below go
        // through a freshly opened registry on the same file.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        // Read phase: provenance, completion output and the single grant must be recovered.
        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}