1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_leading_session_command,
52 select_turn_work_claim_prefix,
53};
54use lash_core::store::{
55 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
56 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
57};
58use lash_core::{
59 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
60 DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness, MergeKey,
61 PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest,
62 ProcessEventAppendResult, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
63 ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
64 RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseClaimOutcome,
65 SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo,
66 SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
67 StoreError, VacuumReport,
68};
69use rusqlite::{Connection, OptionalExtension, Transaction, params};
70use sha2::{Digest, Sha256};
71
72use conn::SqliteConnection;
73
74mod attachments;
75mod blobs;
76mod conn;
77mod effect_replay;
78mod graph;
79mod leases;
80mod lifecycle;
81mod pending_turn_inputs;
82mod persistence;
83mod process_registry;
84mod queued_work;
85mod schema;
86mod triggers;
87
88use conn::TxOutcome;
89pub use effect_replay::{
90 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
91};
92use leases::*;
93use pending_turn_inputs::*;
94use queued_work::*;
95use schema::{
96 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
97 ensure_trigger_schema,
98};
99pub use triggers::SqliteTriggerStore;
100
101pub struct Store {
108 conn: SqliteConnection,
109 artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
110 options: StoreOptions,
111 commit_count: AtomicU64,
112}
113
114pub struct SqliteProcessRegistry {
120 conn: SqliteConnection,
121 notify: tokio::sync::Notify,
122}
123
124fn sqlite_error(err: rusqlite::Error) -> StoreError {
125 StoreError::Backend(err.to_string())
126}
127
128fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
129 lash_core::PluginError::Session(err.to_string())
130}
131
132fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
133 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
134}
135
136fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
137 serde_json::to_string(value).map_err(|err| {
138 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
139 })
140}
141
142fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
143 futures_executor::block_on(future)
144}
145
146#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
147pub enum PersistedArtifactKind {
148 GenericBlob,
149 CheckpointManifest,
150 ToolState,
151 PluginSessionSnapshot,
152 ExecutionStateSnapshot,
153 LashlangModule,
154 ProcessExecutionEnv,
155}
156
157#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
158pub enum BlobStorageHint {
159 Compressible,
160 InlinePreferred,
161 LargePayload,
162}
163
164#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
165enum BlobCompression {
166 None,
167 Zlib,
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
171pub struct BlobArtifactDescriptor {
172 pub kind: PersistedArtifactKind,
173 #[serde(default, skip_serializing_if = "Vec::is_empty")]
174 pub hints: Vec<BlobStorageHint>,
175}
176
177impl BlobArtifactDescriptor {
178 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
179 Self {
180 kind,
181 hints: hints.into(),
182 }
183 }
184
185 pub fn checkpoint_manifest() -> Self {
186 Self::new(
187 PersistedArtifactKind::CheckpointManifest,
188 vec![BlobStorageHint::Compressible],
189 )
190 }
191
192 pub fn tool_state_snapshot() -> Self {
193 Self::new(
194 PersistedArtifactKind::ToolState,
195 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
196 )
197 }
198
199 pub fn plugin_session_snapshot() -> Self {
200 Self::new(
201 PersistedArtifactKind::PluginSessionSnapshot,
202 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
203 )
204 }
205
206 pub fn execution_state_snapshot() -> Self {
207 Self::new(
208 PersistedArtifactKind::ExecutionStateSnapshot,
209 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
210 )
211 }
212
213 pub fn lashlang_module() -> Self {
214 Self::new(
215 PersistedArtifactKind::LashlangModule,
216 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
217 )
218 }
219
220 pub fn process_execution_env() -> Self {
221 Self::new(
222 PersistedArtifactKind::ProcessExecutionEnv,
223 vec![BlobStorageHint::Compressible],
224 )
225 }
226}
227
228#[derive(Clone, Debug, PartialEq, Eq, Hash)]
229struct RetainedArtifactRef {
230 pub blob_ref: BlobRef,
231 pub kind: PersistedArtifactKind,
232}
233
/// Built-in blob storage profile; `should_compress_blob` uses it to decide
/// whether a compressible blob is actually compressed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses blobs.
    LowLatency,
    /// Default profile: compresses compressible blobs of at least 4 KiB.
    #[default]
    Balanced,
    /// Compresses compressible blobs of at least 1 KiB.
    Compact,
}
241
/// Garbage-collection policy for a store.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Optional commit interval; `None` by default.
    /// NOTE(review): presumably GC runs automatically every this many commits
    /// and `None` disables automatic runs — confirm in the commit path.
    pub auto_run_every_commits: Option<u64>,
}
246
247#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
248pub struct StoreOptions {
249 pub blob_profile: BuiltinBlobProfile,
250 pub gc_policy: StoreGcPolicy,
251}
252
253#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
254struct StoredBlobEnvelope {
255 descriptor: BlobArtifactDescriptor,
256 compression: BlobCompression,
257 content: Vec<u8>,
258}
259
260#[derive(Clone, Debug)]
261pub struct StoredSessionCheckpoint {
262 pub checkpoint_ref: BlobRef,
263 pub manifest: SessionCheckpoint,
264}
265
266#[derive(Clone, Debug)]
273pub struct SqliteSessionStoreFactory {
274 root: PathBuf,
275 options: StoreOptions,
276}
277
278impl SqliteSessionStoreFactory {
279 pub fn new(root: impl Into<PathBuf>) -> Self {
280 Self {
281 root: root.into(),
282 options: StoreOptions::default(),
283 }
284 }
285
286 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
287 Self {
288 root: root.into(),
289 options,
290 }
291 }
292
293 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
294 self.root.join(safe_session_db_file_name(session_id))
295 }
296}
297
298#[async_trait::async_trait]
299impl SessionStoreFactory for SqliteSessionStoreFactory {
300 fn durability_tier(&self) -> DurabilityTier {
301 DurabilityTier::Durable
302 }
303
304 async fn create_store(
305 &self,
306 request: &SessionStoreCreateRequest,
307 ) -> Result<Arc<dyn RuntimePersistence>, String> {
308 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
309 let path = self.path_for_session(&request.session_id);
310 let store = Arc::new(
311 Store::open_with_options(&path, self.options)
312 .await
313 .map_err(|err| err.to_string())?,
314 );
315 if store.load_session_meta().await.is_none() {
316 store
317 .save_session_meta(SessionMeta {
318 session_id: request.session_id.clone(),
319 session_name: request.session_id.clone(),
320 created_at: current_timestamp_string(),
321 model: request.policy.model.id.clone(),
322 cwd: std::env::current_dir()
323 .ok()
324 .and_then(|path| path.to_str().map(str::to_string)),
325 relation: request.relation.clone(),
326 })
327 .await;
328 }
329 Ok(store as Arc<dyn RuntimePersistence>)
330 }
331
332 async fn open_existing_store(
333 &self,
334 request: &SessionStoreCreateRequest,
335 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
336 let path = self.path_for_session(&request.session_id);
337 if !path.exists() {
338 return Ok(None);
339 }
340 self.create_store(request).await.map(Some)
341 }
342
343 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
344 let db_path = self.path_for_session(session_id);
345 for path in [
346 db_path.clone(),
347 sqlite_sidecar_path(&db_path, "-wal"),
348 sqlite_sidecar_path(&db_path, "-shm"),
349 ] {
350 match std::fs::remove_file(&path) {
351 Ok(()) => {}
352 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
353 Err(err) => {
354 return Err(format!("remove session store {}: {err}", path.display()));
355 }
356 }
357 }
358 Ok(())
359 }
360}
361
362fn safe_session_db_file_name(session_id: &str) -> String {
363 let mut safe = session_id
364 .chars()
365 .map(|ch| match ch {
366 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
367 _ => '_',
368 })
369 .collect::<String>();
370 safe = safe.trim_matches('_').to_string();
371 if safe.is_empty() {
372 safe.push_str("session");
373 }
374 safe.truncate(80);
375 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
376 format!("{safe}-{}.db", &hash[..16])
377}
378
/// Returns `path` with `suffix` appended to the whole path string (not to the
/// extension), e.g. `a.db` + `-wal` gives `a.db-wal`.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(suffix);
    PathBuf::from(raw)
}
384
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
///
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or_default();
    format!("unix:{secs}")
}
391
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or `0` when the clock is set before the epoch.
fn current_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
398
399fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
400 let mut refs = Vec::new();
401 if let Some(blob_ref) = &checkpoint.tool_state_ref {
402 refs.push(RetainedArtifactRef {
403 blob_ref: blob_ref.clone(),
404 kind: PersistedArtifactKind::ToolState,
405 });
406 }
407 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
408 refs.push(RetainedArtifactRef {
409 blob_ref: blob_ref.clone(),
410 kind: PersistedArtifactKind::PluginSessionSnapshot,
411 });
412 }
413 if let Some(blob_ref) = &checkpoint.execution_state_ref {
414 refs.push(RetainedArtifactRef {
415 blob_ref: blob_ref.clone(),
416 kind: PersistedArtifactKind::ExecutionStateSnapshot,
417 });
418 }
419 refs
420}
421
422fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
423 SessionHeadMeta {
424 schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
425 session_id: head.session_id.clone(),
426 head_revision: 0,
427 config: head.config.clone(),
428 agent_frames: head.agent_frames.clone(),
429 current_agent_frame_id: head.current_agent_frame_id.clone(),
430 checkpoint_ref: head.checkpoint_ref.clone(),
431 leaf_node_id: head.graph.leaf_node_id.clone(),
432 graph_node_count: head.graph.nodes.len(),
433 token_ledger: Vec::new(),
434 }
435}
436
437fn encode_json<T: serde::Serialize>(value: &T) -> String {
438 serde_json::to_string(value).expect("persisted state should serialize")
439}
440
441fn should_compress_blob(
442 profile: BuiltinBlobProfile,
443 descriptor: &BlobArtifactDescriptor,
444 len: usize,
445) -> bool {
446 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
447 return false;
448 }
449 match profile {
450 BuiltinBlobProfile::LowLatency => false,
451 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
452 BuiltinBlobProfile::Compact => len >= 1024,
453 }
454}
455
456fn compress_blob(content: &[u8]) -> Vec<u8> {
457 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
458 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
459 encoder.finish().expect("submit blob compression")
460}
461
462fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
463 let mut decoder = ZlibDecoder::new(content);
464 let mut out = Vec::new();
465 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
466 Some(out)
467}
468
469fn encode_artifact_blob(
470 descriptor: &BlobArtifactDescriptor,
471 profile: BuiltinBlobProfile,
472 content: &[u8],
473) -> Vec<u8> {
474 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
475 {
476 (BlobCompression::Zlib, compress_blob(content))
477 } else {
478 (BlobCompression::None, content.to_vec())
479 };
480 encode_msgpack(&StoredBlobEnvelope {
481 descriptor: descriptor.clone(),
482 compression,
483 content: stored_content,
484 })
485}
486
487fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
488 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
489 match envelope.compression {
490 BlobCompression::None => Some(envelope.content),
491 BlobCompression::Zlib => decompress_blob(&envelope.content),
492 }
493}
494
495fn try_load_session_head_meta_from_conn(
498 conn: &Connection,
499) -> Result<Option<SessionHeadMeta>, StoreError> {
500 let row = conn
501 .query_row(
502 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
503 [],
504 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
505 )
506 .optional()
507 .map_err(sqlite_error)?;
508 let Some((head_json, head_revision)) = row else {
509 return Ok(None);
510 };
511 let mut meta: SessionHeadMeta = lash_core::store::decode_versioned_json_record(
512 &head_json,
513 "SessionHeadMeta",
514 lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
515 )?;
516 meta.head_revision = head_revision as u64;
517 Ok(Some(meta))
518}
519
520fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
521 try_load_session_head_meta_from_conn(conn).ok().flatten()
522}
523
524fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
525 conn.query_row(
526 "SELECT session_id, session_name, created_at, model, cwd, relation_json
527 FROM session_meta WHERE singleton = 1",
528 [],
529 |row| {
530 let relation_json: Option<String> = row.get(5)?;
531 let relation = relation_json
532 .and_then(|json| serde_json::from_str(&json).ok())
533 .unwrap_or_default();
534 Ok(SessionMeta {
535 session_id: row.get(0)?,
536 session_name: row.get(1)?,
537 created_at: row.get(2)?,
538 model: row.get(3)?,
539 cwd: row.get(4)?,
540 relation,
541 })
542 },
543 )
544 .optional()
545 .ok()
546 .flatten()
547}
548
549fn decode_checkpoint(bytes: &[u8]) -> Result<SessionCheckpoint, StoreError> {
550 let value: serde_json::Value = rmp_serde::from_slice(bytes)
551 .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))?;
552 lash_core::store::ensure_supported_record_schema_version(
553 "SessionCheckpoint",
554 &value,
555 lash_core::store::SESSION_CHECKPOINT_SCHEMA_VERSION,
556 )?;
557 rmp_serde::from_slice(bytes)
558 .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))
559}
560
561fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
562 let mut buf = Vec::with_capacity(1024);
565 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
566 buf
567}
568
569fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
570 rmp_serde::from_slice(bytes).ok()
571}
572
573fn merge_token_ledger_entries(
574 entries: Vec<lash_core::TokenLedgerEntry>,
575) -> Vec<lash_core::TokenLedgerEntry> {
576 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
577 for entry in entries {
578 if entry.usage.total() == 0 {
579 continue;
580 }
581 if let Some(existing) = merged
582 .iter_mut()
583 .find(|existing| existing.source == entry.source && existing.model == entry.model)
584 {
585 existing.usage.input_tokens += entry.usage.input_tokens;
586 existing.usage.output_tokens += entry.usage.output_tokens;
587 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
588 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
589 } else {
590 merged.push(entry);
591 }
592 }
593 merged
594}
595
596#[cfg(test)]
597mod tests {
598 use super::*;
599 use lash_core::ProcessInput;
600 use lashlang::LashlangArtifactStore;
601
602 fn registration(id: &str) -> ProcessRegistration {
603 ProcessRegistration::new(
604 id,
605 ProcessInput::External {
606 metadata: serde_json::Value::Null,
607 },
608 lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
609 )
610 }
611
612 #[tokio::test]
613 async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
614 let store = Store::memory().await.expect("memory store");
615 let module =
616 lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
617 let linked = lashlang::LinkedModule::link(
618 module,
619 lashlang::LashlangHostEnvironment::new(
620 lashlang::LashlangHostCatalog::new(),
621 lashlang::LashlangAbilities::all(),
622 ),
623 )
624 .expect("link module");
625
626 store
627 .put_module_artifact(&linked.artifact)
628 .await
629 .expect("put artifact");
630 let restored = store
631 .get_module_artifact(&linked.module_ref)
632 .await
633 .expect("get artifact")
634 .expect("artifact exists");
635
636 assert_eq!(restored.module_ref, linked.module_ref);
637 assert_eq!(
638 restored.process_ref("scan"),
639 linked.artifact.process_ref("scan")
640 );
641 }
642
643 #[tokio::test]
644 async fn sqlite_process_registry_persists_rows_after_reopen() {
645 let dir = tempfile::tempdir().expect("tempdir");
646 let path = dir.path().join("processes.db");
647 {
648 let registry = SqliteProcessRegistry::open(&path)
649 .await
650 .expect("open registry");
651 let session_scope = lash_core::SessionScope::new("session");
652 registry
653 .register_process(registration("proc-persist"))
654 .await
655 .expect("register");
656 registry
657 .grant_handle(
658 &session_scope,
659 "proc-persist",
660 ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
661 )
662 .await
663 .expect("grant");
664 registry
665 .complete_process(
666 "proc-persist",
667 ProcessAwaitOutput::Success {
668 value: serde_json::json!({"ok": true}),
669 control: None,
670 },
671 )
672 .await
673 .expect("complete");
674 }
675
676 let registry = SqliteProcessRegistry::open(&path)
677 .await
678 .expect("reopen registry");
679 let session_scope = lash_core::SessionScope::new("session");
680 let record = registry
681 .get_process("proc-persist")
682 .await
683 .expect("persisted process");
684
685 assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
686 assert_eq!(
687 record.provenance.originator,
688 lash_core::ProcessOriginator::session(session_scope.clone())
689 );
690 assert_eq!(
691 registry
692 .await_process("proc-persist")
693 .await
694 .expect("await persisted"),
695 ProcessAwaitOutput::Success {
696 value: serde_json::json!({"ok": true}),
697 control: None,
698 }
699 );
700 assert_eq!(
701 registry
702 .list_handle_grants(&session_scope)
703 .await
704 .expect("grants")
705 .len(),
706 1
707 );
708 }
709}