1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_leading_session_command,
52 select_turn_work_claim_prefix,
53};
54use lash_core::store::{
55 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
56 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
57};
58use lash_core::{
59 AbandonRequest, AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry,
60 BlobRef, DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness,
61 MergeKey, PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent,
62 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
63 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseClaimOutcome,
64 ProcessLeaseCompletion, ProcessPruneReport, ProcessRecord, ProcessRegistration,
65 ProcessRegistry, ProcessStarted, QueuedWorkStore, RuntimePersistence, SessionCommitStore,
66 SessionExecutionLease, SessionExecutionLeaseClaimOutcome, SessionExecutionLeaseCompletion,
67 SessionExecutionLeaseFence, SessionExecutionLeaseStore, SessionMeta, SessionPickerInfo,
68 SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
69 StoreError, StoreMaintenance, TurnInputStore, VacuumReport,
70};
71use rusqlite::{Connection, OptionalExtension, Transaction, params};
72use sha2::{Digest, Sha256};
73
74use conn::SqliteConnection;
75
76mod attachments;
77mod blobs;
78mod conn;
79mod effect_replay;
80mod graph;
81mod leases;
82mod lifecycle;
83mod pending_turn_inputs;
84mod persistence;
85mod process_registry;
86mod queued_work;
87mod schema;
88mod triggers;
89
90use conn::TxOutcome;
91pub use effect_replay::{
92 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
93};
94use leases::*;
95use pending_turn_inputs::*;
96use queued_work::*;
97use schema::{
98 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
99 ensure_trigger_schema,
100};
101pub use triggers::SqliteTriggerStore;
102
/// SQLite-backed session store.
///
/// Holds the session database connection, an in-memory cache of lashlang
/// module artifacts keyed by module ref, the options the store was opened
/// with, and an atomic commit counter.
pub struct Store {
    /// Connection to the session database.
    conn: SqliteConnection,
    /// Cache of lashlang module artifacts, keyed by module ref.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy supplied at open time.
    options: StoreOptions,
    /// Running commit count. NOTE(review): presumably compared against
    /// `StoreGcPolicy::auto_run_every_commits`; confirm in the commit path.
    commit_count: AtomicU64,
}
115
/// Process registry persisted in a SQLite database.
///
/// Used as an `Arc<dyn lash_core::ProcessRegistry>` by this file's tests.
pub struct SqliteProcessRegistry {
    /// Connection to the registry database.
    conn: SqliteConnection,
}
124
125fn sqlite_error(err: rusqlite::Error) -> StoreError {
126 StoreError::Backend(err.to_string())
127}
128
129fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
130 lash_core::PluginError::Session(err.to_string())
131}
132
133fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
134 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
135}
136
137fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
138 serde_json::to_string(value).map_err(|err| {
139 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
140 })
141}
142
143fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
144 futures_executor::block_on(future)
145}
146
/// Category of a persisted blob artifact, recorded in each blob's
/// [`BlobArtifactDescriptor`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Blob without a more specific category.
    GenericBlob,
    /// Session checkpoint manifest.
    CheckpointManifest,
    /// Tool state snapshot (a checkpoint's `tool_state_ref`).
    ToolState,
    /// Plugin session snapshot (a checkpoint's `plugin_snapshot_ref`).
    PluginSessionSnapshot,
    /// Execution state snapshot (a checkpoint's `execution_state_ref`).
    ExecutionStateSnapshot,
    /// Lashlang module artifact.
    LashlangModule,
    /// Process execution environment.
    ProcessExecutionEnv,
}
157
/// Storage hints attached to a blob descriptor.
///
/// Within this file, only `Compressible` is consulted (by
/// `should_compress_blob`). The other hints are recorded in the descriptor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The payload is eligible for zlib compression.
    Compressible,
    /// Hint that the payload prefers inline storage.
    InlinePreferred,
    /// Hint that the payload may be large.
    LargePayload,
}
164
/// Compression applied to a stored blob's content. It is recorded in the
/// envelope so `decode_artifact_blob` can restore the original bytes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored verbatim.
    None,
    /// Content is zlib-compressed.
    Zlib,
}
170
/// Describes a blob artifact: what kind it is and how it may be stored.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Artifact category.
    pub kind: PersistedArtifactKind,
    /// Storage hints. The field is omitted when serializing an empty list, and
    /// a missing field deserializes to an empty list.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
177
178impl BlobArtifactDescriptor {
179 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
180 Self {
181 kind,
182 hints: hints.into(),
183 }
184 }
185
186 pub fn checkpoint_manifest() -> Self {
187 Self::new(
188 PersistedArtifactKind::CheckpointManifest,
189 vec![BlobStorageHint::Compressible],
190 )
191 }
192
193 pub fn tool_state_snapshot() -> Self {
194 Self::new(
195 PersistedArtifactKind::ToolState,
196 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
197 )
198 }
199
200 pub fn plugin_session_snapshot() -> Self {
201 Self::new(
202 PersistedArtifactKind::PluginSessionSnapshot,
203 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
204 )
205 }
206
207 pub fn execution_state_snapshot() -> Self {
208 Self::new(
209 PersistedArtifactKind::ExecutionStateSnapshot,
210 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
211 )
212 }
213
214 pub fn lashlang_module() -> Self {
215 Self::new(
216 PersistedArtifactKind::LashlangModule,
217 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
218 )
219 }
220
221 pub fn process_execution_env() -> Self {
222 Self::new(
223 PersistedArtifactKind::ProcessExecutionEnv,
224 vec![BlobStorageHint::Compressible],
225 )
226 }
227}
228
/// A blob ref kept alive by a checkpoint, tagged with the kind of artifact it
/// points to. Produced by `retained_artifact_refs`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// The referenced blob.
    pub blob_ref: BlobRef,
    /// Which checkpoint artifact the blob holds.
    pub kind: PersistedArtifactKind,
}
234
/// Built-in compression profile used when writing artifact blobs.
///
/// It only affects blobs whose descriptor is hinted `Compressible` (see
/// `should_compress_blob`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses.
    LowLatency,
    /// Compresses payloads of at least 4 KiB. This is the default.
    #[default]
    Balanced,
    /// Compresses payloads of at least 1 KiB.
    Compact,
}
242
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Defaults to `None`. NOTE(review): presumably `Some(n)` runs GC
    /// automatically every `n` commits; confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
247
/// Tunables applied when opening a [`Store`].
///
/// `Default` yields the `Balanced` blob profile and a GC policy with
/// `auto_run_every_commits: None`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile for artifact blobs.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
253
/// Envelope written for each artifact blob. `encode_artifact_blob` writes it as
/// named-field MessagePack, and `decode_artifact_blob` reads it back.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was written with.
    descriptor: BlobArtifactDescriptor,
    /// How `content` is encoded.
    compression: BlobCompression,
    /// Payload bytes, compressed according to `compression`.
    content: Vec<u8>,
}
260
/// A session checkpoint manifest together with the blob ref it is stored under.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob ref of the stored manifest.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest itself.
    pub manifest: SessionCheckpoint,
}
266
/// Opens one SQLite database file per session under a root directory.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory that holds the per-session database files.
    root: PathBuf,
    /// Options passed to every store this factory opens.
    options: StoreOptions,
}
278
279impl SqliteSessionStoreFactory {
280 pub fn new(root: impl Into<PathBuf>) -> Self {
281 Self {
282 root: root.into(),
283 options: StoreOptions::default(),
284 }
285 }
286
287 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
288 Self {
289 root: root.into(),
290 options,
291 }
292 }
293
294 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
295 self.root.join(safe_session_db_file_name(session_id))
296 }
297}
298
299#[async_trait::async_trait]
300impl SessionStoreFactory for SqliteSessionStoreFactory {
301 fn durability_tier(&self) -> DurabilityTier {
302 DurabilityTier::Durable
303 }
304
305 async fn create_store(
306 &self,
307 request: &SessionStoreCreateRequest,
308 ) -> Result<Arc<dyn RuntimePersistence>, String> {
309 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
310 let path = self.path_for_session(&request.session_id);
311 let store = Arc::new(
312 Store::open_with_options(&path, self.options)
313 .await
314 .map_err(|err| err.to_string())?,
315 );
316 if store.load_session_meta().await.is_none() {
317 store
318 .save_session_meta(SessionMeta {
319 session_id: request.session_id.clone(),
320 session_name: request.session_id.clone(),
321 created_at: current_timestamp_string(),
322 model: request.policy.model.id.clone(),
323 cwd: std::env::current_dir()
324 .ok()
325 .and_then(|path| path.to_str().map(str::to_string)),
326 relation: request.relation.clone(),
327 })
328 .await;
329 }
330 Ok(store as Arc<dyn RuntimePersistence>)
331 }
332
333 async fn open_existing_store(
334 &self,
335 request: &SessionStoreCreateRequest,
336 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
337 let path = self.path_for_session(&request.session_id);
338 if !path.exists() {
339 return Ok(None);
340 }
341 self.create_store(request).await.map(Some)
342 }
343
344 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
345 let db_path = self.path_for_session(session_id);
346 for path in [
347 db_path.clone(),
348 sqlite_sidecar_path(&db_path, "-wal"),
349 sqlite_sidecar_path(&db_path, "-shm"),
350 ] {
351 match std::fs::remove_file(&path) {
352 Ok(()) => {}
353 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
354 Err(err) => {
355 return Err(format!("remove session store {}: {err}", path.display()));
356 }
357 }
358 }
359 Ok(())
360 }
361}
362
363fn safe_session_db_file_name(session_id: &str) -> String {
364 let mut safe = session_id
365 .chars()
366 .map(|ch| match ch {
367 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
368 _ => '_',
369 })
370 .collect::<String>();
371 safe = safe.trim_matches('_').to_string();
372 if safe.is_empty() {
373 safe.push_str("session");
374 }
375 safe.truncate(80);
376 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
377 format!("{safe}-{}.db", &hash[..16])
378}
379
/// Returns `path` with `suffix` appended verbatim to its final component.
///
/// For example, `a.db` + `-wal` becomes `a.db-wal`, which matches how SQLite
/// names its sidecar files.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    raw.into()
}
385
/// Returns the current wall-clock time as `"unix:<seconds since epoch>"`.
/// Uses `0` if the system clock reads earlier than the epoch.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
392
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch. If the
/// `u128` millisecond count does not fit in a `u64`, it saturates at
/// `u64::MAX` rather than silently truncating, as a bare `as` cast would.
fn current_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp before narrowing so the cast below cannot drop high bits.
    millis.min(u128::from(u64::MAX)) as u64
}
399
400fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
401 let mut refs = Vec::new();
402 if let Some(blob_ref) = &checkpoint.tool_state_ref {
403 refs.push(RetainedArtifactRef {
404 blob_ref: blob_ref.clone(),
405 kind: PersistedArtifactKind::ToolState,
406 });
407 }
408 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
409 refs.push(RetainedArtifactRef {
410 blob_ref: blob_ref.clone(),
411 kind: PersistedArtifactKind::PluginSessionSnapshot,
412 });
413 }
414 if let Some(blob_ref) = &checkpoint.execution_state_ref {
415 refs.push(RetainedArtifactRef {
416 blob_ref: blob_ref.clone(),
417 kind: PersistedArtifactKind::ExecutionStateSnapshot,
418 });
419 }
420 refs
421}
422
423fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
424 SessionHeadMeta {
425 schema_version: lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
426 session_id: head.session_id.clone(),
427 head_revision: 0,
428 config: head.config.clone(),
429 agent_frames: head.agent_frames.clone(),
430 current_agent_frame_id: head.current_agent_frame_id.clone(),
431 checkpoint_ref: head.checkpoint_ref.clone(),
432 leaf_node_id: head.graph.leaf_node_id.clone(),
433 graph_node_count: head.graph.nodes.len(),
434 token_ledger: Vec::new(),
435 }
436}
437
438fn encode_json<T: serde::Serialize>(value: &T) -> String {
439 serde_json::to_string(value).expect("persisted state should serialize")
440}
441
442fn should_compress_blob(
443 profile: BuiltinBlobProfile,
444 descriptor: &BlobArtifactDescriptor,
445 len: usize,
446) -> bool {
447 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
448 return false;
449 }
450 match profile {
451 BuiltinBlobProfile::LowLatency => false,
452 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
453 BuiltinBlobProfile::Compact => len >= 1024,
454 }
455}
456
457fn compress_blob(content: &[u8]) -> Vec<u8> {
458 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
459 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
460 encoder.finish().expect("submit blob compression")
461}
462
463fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
464 let mut decoder = ZlibDecoder::new(content);
465 let mut out = Vec::new();
466 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
467 Some(out)
468}
469
470fn encode_artifact_blob(
471 descriptor: &BlobArtifactDescriptor,
472 profile: BuiltinBlobProfile,
473 content: &[u8],
474) -> Vec<u8> {
475 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
476 {
477 (BlobCompression::Zlib, compress_blob(content))
478 } else {
479 (BlobCompression::None, content.to_vec())
480 };
481 encode_msgpack(&StoredBlobEnvelope {
482 descriptor: descriptor.clone(),
483 compression,
484 content: stored_content,
485 })
486}
487
488fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
489 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
490 match envelope.compression {
491 BlobCompression::None => Some(envelope.content),
492 BlobCompression::Zlib => decompress_blob(&envelope.content),
493 }
494}
495
496fn try_load_session_head_meta_from_conn(
499 conn: &Connection,
500) -> Result<Option<SessionHeadMeta>, StoreError> {
501 let row = conn
502 .query_row(
503 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
504 [],
505 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
506 )
507 .optional()
508 .map_err(sqlite_error)?;
509 let Some((head_json, head_revision)) = row else {
510 return Ok(None);
511 };
512 let mut meta: SessionHeadMeta = lash_core::store::decode_versioned_json_record(
513 &head_json,
514 "SessionHeadMeta",
515 lash_core::store::SESSION_HEAD_META_SCHEMA_VERSION,
516 )?;
517 meta.head_revision = head_revision as u64;
518 Ok(Some(meta))
519}
520
521fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
522 try_load_session_head_meta_from_conn(conn).ok().flatten()
523}
524
525fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
526 conn.query_row(
527 "SELECT session_id, session_name, created_at, model, cwd, relation_json
528 FROM session_meta WHERE singleton = 1",
529 [],
530 |row| {
531 let relation_json: Option<String> = row.get(5)?;
532 let relation = relation_json
533 .and_then(|json| serde_json::from_str(&json).ok())
534 .unwrap_or_default();
535 Ok(SessionMeta {
536 session_id: row.get(0)?,
537 session_name: row.get(1)?,
538 created_at: row.get(2)?,
539 model: row.get(3)?,
540 cwd: row.get(4)?,
541 relation,
542 })
543 },
544 )
545 .optional()
546 .ok()
547 .flatten()
548}
549
550fn decode_checkpoint(bytes: &[u8]) -> Result<SessionCheckpoint, StoreError> {
551 let value: serde_json::Value = rmp_serde::from_slice(bytes)
552 .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))?;
553 lash_core::store::ensure_supported_record_schema_version(
554 "SessionCheckpoint",
555 &value,
556 lash_core::store::SESSION_CHECKPOINT_SCHEMA_VERSION,
557 )?;
558 rmp_serde::from_slice(bytes)
559 .map_err(|err| StoreError::Backend(format!("failed to decode SessionCheckpoint: {err}")))
560}
561
562fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
563 let mut buf = Vec::with_capacity(1024);
566 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
567 buf
568}
569
570fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
571 rmp_serde::from_slice(bytes).ok()
572}
573
574fn merge_token_ledger_entries(
575 entries: Vec<lash_core::TokenLedgerEntry>,
576) -> Vec<lash_core::TokenLedgerEntry> {
577 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
578 for entry in entries {
579 if entry.usage.total() == 0 {
580 continue;
581 }
582 if let Some(existing) = merged
583 .iter_mut()
584 .find(|existing| existing.source == entry.source && existing.model == entry.model)
585 {
586 existing.usage.input_tokens += entry.usage.input_tokens;
587 existing.usage.output_tokens += entry.usage.output_tokens;
588 existing.usage.cache_read_input_tokens += entry.usage.cache_read_input_tokens;
589 existing.usage.cache_write_input_tokens += entry.usage.cache_write_input_tokens;
590 existing.usage.reasoning_output_tokens += entry.usage.reasoning_output_tokens;
591 } else {
592 merged.push(entry);
593 }
594 }
595 merged
596}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601 use lash_core::ProcessInput;
602 use lashlang::LashlangArtifactStore;
603
604 fn registration(id: &str) -> ProcessRegistration {
605 ProcessRegistration::new(
606 id,
607 ProcessInput::External {
608 metadata: serde_json::Value::Null,
609 },
610 lash_core::RecoveryDisposition::ExternallyOwned,
611 lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
612 )
613 }
614
615 #[tokio::test]
616 async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
617 let store = Store::memory().await.expect("memory store");
618 let module =
619 lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
620 let linked = lashlang::LinkedModule::link(
621 module,
622 lashlang::LashlangHostEnvironment::new(
623 lashlang::LashlangHostCatalog::new(),
624 lashlang::LashlangAbilities::all(),
625 ),
626 )
627 .expect("link module");
628
629 store
630 .put_module_artifact(&linked.artifact)
631 .await
632 .expect("put artifact");
633 let restored = store
634 .get_module_artifact(&linked.module_ref)
635 .await
636 .expect("get artifact")
637 .expect("artifact exists");
638
639 assert_eq!(restored.module_ref, linked.module_ref);
640 assert_eq!(
641 restored.process_ref("scan"),
642 linked.artifact.process_ref("scan")
643 );
644 }
645
646 #[tokio::test]
647 async fn sqlite_process_registry_persists_rows_after_reopen() {
648 let dir = tempfile::tempdir().expect("tempdir");
649 let path = dir.path().join("processes.db");
650 {
651 let registry = SqliteProcessRegistry::open(&path)
652 .await
653 .expect("open registry");
654 let session_scope = lash_core::SessionScope::new("session");
655 registry
656 .register_process(registration("proc-persist"))
657 .await
658 .expect("register");
659 registry
660 .grant_handle(
661 &session_scope,
662 "proc-persist",
663 ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
664 )
665 .await
666 .expect("grant");
667 registry
668 .complete_process(
669 "proc-persist",
670 ProcessAwaitOutput::Success {
671 value: serde_json::json!({"ok": true}),
672 control: None,
673 },
674 )
675 .await
676 .expect("complete");
677 }
678
679 let registry = Arc::new(
680 SqliteProcessRegistry::open(&path)
681 .await
682 .expect("reopen registry"),
683 ) as Arc<dyn lash_core::ProcessRegistry>;
684 let session_scope = lash_core::SessionScope::new("session");
685 let record = registry
686 .get_process("proc-persist")
687 .await
688 .expect("persisted process");
689
690 assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
691 assert_eq!(
692 record.provenance.originator,
693 lash_core::ProcessOriginator::session(session_scope.clone())
694 );
695 assert_eq!(
696 lash_core::ProcessAwaiter::polling(Arc::clone(®istry))
697 .await_terminal("proc-persist")
698 .await
699 .expect("await persisted"),
700 ProcessAwaitOutput::Success {
701 value: serde_json::json!({"ok": true}),
702 control: None,
703 }
704 );
705 assert_eq!(
706 registry
707 .list_handle_grants(&session_scope)
708 .await
709 .expect("grants")
710 .len(),
711 1
712 );
713 }
714}