1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59 DeliveryPolicy, DurabilityTier, GcReport, LeaseOwnerIdentity, LeaseOwnerLiveness, MergeKey,
60 PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest,
61 ProcessEventAppendResult, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
62 ProcessLease, ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63 RuntimePersistence, SessionExecutionLease, SessionExecutionLeaseClaimOutcome,
64 SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, SessionMeta, SessionPickerInfo,
65 SessionReadScope, SessionScope, SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy,
66 StoreError, VacuumReport,
67};
68use rusqlite::{Connection, OptionalExtension, Transaction, params};
69use sha2::{Digest, Sha256};
70
71use conn::SqliteConnection;
72
73mod attachments;
74mod blobs;
75mod conn;
76mod effect_replay;
77mod graph;
78mod leases;
79mod lifecycle;
80mod persistence;
81mod process_registry;
82mod queued_work;
83mod schema;
84mod triggers;
85
86use conn::TxOutcome;
87pub use effect_replay::{
88 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
89};
90use leases::*;
91use queued_work::*;
92use schema::{
93 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_process_schema, ensure_schema,
94 ensure_trigger_schema,
95};
96pub use triggers::SqliteTriggerStore;
97
/// SQLite-backed store state.
///
/// Bundles the SQLite connection handle, an in-memory map of lashlang module
/// artifacts, the options the store was opened with, and a commit counter.
pub struct Store {
    /// Handle to the underlying SQLite connection.
    conn: SqliteConnection,
    /// Mutex-guarded map from module ref to shared module artifact.
    /// NOTE(review): presumably a cache in front of the database — confirm.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob-profile and GC settings for this store.
    options: StoreOptions,
    /// Atomic counter.
    /// NOTE(review): presumably counts commits to drive
    /// `StoreGcPolicy::auto_run_every_commits` — confirm against the commit path.
    commit_count: AtomicU64,
}
110
/// Process registry state: a SQLite connection handle plus a Tokio notifier.
pub struct SqliteProcessRegistry {
    /// Handle to the underlying SQLite connection.
    conn: SqliteConnection,
    /// NOTE(review): presumably used to wake tasks waiting on process changes — confirm
    /// in the `process_registry` module.
    notify: tokio::sync::Notify,
}
120
121fn sqlite_error(err: rusqlite::Error) -> StoreError {
122 StoreError::Backend(err.to_string())
123}
124
125fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
126 lash_core::PluginError::Session(err.to_string())
127}
128
129fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
130 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
131}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134 serde_json::to_string(value).map_err(|err| {
135 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136 })
137}
138
/// Runs `future` to completion with `futures_executor::block_on` and returns its output.
///
/// NOTE(review): this blocks the calling thread until the future resolves; confirm
/// callers never invoke it from a context where blocking is unacceptable.
fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
    futures_executor::block_on(future)
}
142
/// Classifies what a persisted blob contains.
///
/// Carried by `BlobArtifactDescriptor::kind` and `RetainedArtifactRef::kind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Blob with no more specific classification.
    GenericBlob,
    /// Session checkpoint manifest.
    CheckpointManifest,
    /// Tool state snapshot.
    ToolState,
    /// Plugin session snapshot.
    PluginSessionSnapshot,
    /// Execution state snapshot.
    ExecutionStateSnapshot,
    /// Lashlang module artifact.
    LashlangModule,
    /// Process execution environment.
    ProcessExecutionEnv,
}
153
/// Storage hints attached to a `BlobArtifactDescriptor`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// The blob may be compressed; `should_compress_blob` refuses compression without it.
    Compressible,
    /// NOTE(review): not consulted in this part of the file — confirm where it is honored.
    InlinePreferred,
    /// NOTE(review): not consulted in this part of the file — confirm where it is honored.
    LargePayload,
}
160
/// Compression applied to the `content` bytes of a `StoredBlobEnvelope`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored as-is.
    None,
    /// Content was produced by `compress_blob` and is restored with `decompress_blob`.
    Zlib,
}
166
/// Describes a persisted blob: what it contains and how it may be stored.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// What the blob contains.
    pub kind: PersistedArtifactKind,
    /// Storage hints; omitted from the serialized form when empty and defaulted to
    /// empty when absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
173
174impl BlobArtifactDescriptor {
175 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
176 Self {
177 kind,
178 hints: hints.into(),
179 }
180 }
181
182 pub fn checkpoint_manifest() -> Self {
183 Self::new(
184 PersistedArtifactKind::CheckpointManifest,
185 vec![BlobStorageHint::Compressible],
186 )
187 }
188
189 pub fn tool_state_snapshot() -> Self {
190 Self::new(
191 PersistedArtifactKind::ToolState,
192 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
193 )
194 }
195
196 pub fn plugin_session_snapshot() -> Self {
197 Self::new(
198 PersistedArtifactKind::PluginSessionSnapshot,
199 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
200 )
201 }
202
203 pub fn execution_state_snapshot() -> Self {
204 Self::new(
205 PersistedArtifactKind::ExecutionStateSnapshot,
206 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
207 )
208 }
209
210 pub fn lashlang_module() -> Self {
211 Self::new(
212 PersistedArtifactKind::LashlangModule,
213 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
214 )
215 }
216
217 pub fn process_execution_env() -> Self {
218 Self::new(
219 PersistedArtifactKind::ProcessExecutionEnv,
220 vec![BlobStorageHint::Compressible],
221 )
222 }
223}
224
/// A blob reference paired with the kind of artifact it points at.
///
/// Produced by `retained_artifact_refs` for the blob refs a checkpoint holds.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Reference to the stored blob.
    pub blob_ref: BlobRef,
    /// What the referenced blob contains.
    pub kind: PersistedArtifactKind,
}
230
/// Built-in compression profiles consulted by `should_compress_blob`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses.
    LowLatency,
    /// Compresses compressible blobs of at least 4 KiB (the default profile).
    #[default]
    Balanced,
    /// Compresses compressible blobs of at least 1 KiB.
    Compact,
}
238
/// Garbage-collection settings for a store.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Defaults to `None`.
    /// NOTE(review): presumably `Some(n)` triggers an automatic GC pass every `n`
    /// commits and `None` disables it — confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
243
/// Options a store is opened with; the default is the `Balanced` blob profile and
/// a GC policy with no automatic run interval.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Compression profile applied when encoding blobs.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection settings.
    pub gc_policy: StoreGcPolicy,
}
249
/// Envelope wrapped around every artifact blob.
///
/// `encode_artifact_blob` serializes it with `encode_msgpack`; `decode_artifact_blob`
/// reads it back and undoes the recorded compression.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// Descriptor the blob was stored with.
    descriptor: BlobArtifactDescriptor,
    /// Compression applied to `content`.
    compression: BlobCompression,
    /// Payload bytes, compressed or raw according to `compression`.
    content: Vec<u8>,
}
256
/// A session checkpoint manifest together with a blob reference.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// NOTE(review): presumably the ref of the blob holding `manifest` — confirm.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
262
/// Factory that keeps one SQLite database file per session under a root directory.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory the per-session database files are joined onto.
    root: PathBuf,
    /// Options passed to every store this factory opens.
    options: StoreOptions,
}
274
275impl SqliteSessionStoreFactory {
276 pub fn new(root: impl Into<PathBuf>) -> Self {
277 Self {
278 root: root.into(),
279 options: StoreOptions::default(),
280 }
281 }
282
283 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
284 Self {
285 root: root.into(),
286 options,
287 }
288 }
289
290 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
291 self.root.join(safe_session_db_file_name(session_id))
292 }
293}
294
295#[async_trait::async_trait]
296impl SessionStoreFactory for SqliteSessionStoreFactory {
297 fn durability_tier(&self) -> DurabilityTier {
298 DurabilityTier::Durable
299 }
300
301 async fn create_store(
302 &self,
303 request: &SessionStoreCreateRequest,
304 ) -> Result<Arc<dyn RuntimePersistence>, String> {
305 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
306 let path = self.path_for_session(&request.session_id);
307 let store = Arc::new(
308 Store::open_with_options(&path, self.options)
309 .await
310 .map_err(|err| err.to_string())?,
311 );
312 if store.load_session_meta().await.is_none() {
313 store
314 .save_session_meta(SessionMeta {
315 session_id: request.session_id.clone(),
316 session_name: request.session_id.clone(),
317 created_at: current_timestamp_string(),
318 model: request.policy.model.id.clone(),
319 cwd: std::env::current_dir()
320 .ok()
321 .and_then(|path| path.to_str().map(str::to_string)),
322 relation: request.relation.clone(),
323 })
324 .await;
325 }
326 Ok(store as Arc<dyn RuntimePersistence>)
327 }
328
329 async fn open_existing_store(
330 &self,
331 request: &SessionStoreCreateRequest,
332 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
333 let path = self.path_for_session(&request.session_id);
334 if !path.exists() {
335 return Ok(None);
336 }
337 self.create_store(request).await.map(Some)
338 }
339
340 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
341 let db_path = self.path_for_session(session_id);
342 for path in [
343 db_path.clone(),
344 sqlite_sidecar_path(&db_path, "-wal"),
345 sqlite_sidecar_path(&db_path, "-shm"),
346 ] {
347 match std::fs::remove_file(&path) {
348 Ok(()) => {}
349 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
350 Err(err) => {
351 return Err(format!("remove session store {}: {err}", path.display()));
352 }
353 }
354 }
355 Ok(())
356 }
357}
358
359fn safe_session_db_file_name(session_id: &str) -> String {
360 let mut safe = session_id
361 .chars()
362 .map(|ch| match ch {
363 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
364 _ => '_',
365 })
366 .collect::<String>();
367 safe = safe.trim_matches('_').to_string();
368 if safe.is_empty() {
369 safe.push_str("session");
370 }
371 safe.truncate(80);
372 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
373 format!("{safe}-{}.db", &hash[..16])
374}
375
/// Returns `path` with `suffix` appended to its final text (no separator added),
/// e.g. `a.db` + `-wal` gives `a.db-wal`.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    PathBuf::from(raw)
}
381
/// Formats the current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// A clock set before the epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("unix:{secs}")
}
388
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// converted with a checked conversion that saturates at `u64::MAX` instead of
/// silently truncating the high bits.
fn current_epoch_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
395
396fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
397 let mut refs = Vec::new();
398 if let Some(blob_ref) = &checkpoint.tool_state_ref {
399 refs.push(RetainedArtifactRef {
400 blob_ref: blob_ref.clone(),
401 kind: PersistedArtifactKind::ToolState,
402 });
403 }
404 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
405 refs.push(RetainedArtifactRef {
406 blob_ref: blob_ref.clone(),
407 kind: PersistedArtifactKind::PluginSessionSnapshot,
408 });
409 }
410 if let Some(blob_ref) = &checkpoint.execution_state_ref {
411 refs.push(RetainedArtifactRef {
412 blob_ref: blob_ref.clone(),
413 kind: PersistedArtifactKind::ExecutionStateSnapshot,
414 });
415 }
416 refs
417}
418
419fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
420 SessionHeadMeta {
421 session_id: head.session_id.clone(),
422 head_revision: 0,
423 config: head.config.clone(),
424 agent_frames: head.agent_frames.clone(),
425 current_agent_frame_id: head.current_agent_frame_id.clone(),
426 checkpoint_ref: head.checkpoint_ref.clone(),
427 leaf_node_id: head.graph.leaf_node_id.clone(),
428 graph_node_count: head.graph.nodes.len(),
429 token_ledger: Vec::new(),
430 }
431}
432
/// Serializes `value` to a JSON string.
///
/// # Panics
/// Panics with "persisted state should serialize" if `serde_json::to_string`
/// returns an error.
fn encode_json<T: serde::Serialize>(value: &T) -> String {
    serde_json::to_string(value).expect("persisted state should serialize")
}
436
437fn should_compress_blob(
438 profile: BuiltinBlobProfile,
439 descriptor: &BlobArtifactDescriptor,
440 len: usize,
441) -> bool {
442 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
443 return false;
444 }
445 match profile {
446 BuiltinBlobProfile::LowLatency => false,
447 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
448 BuiltinBlobProfile::Compact => len >= 1024,
449 }
450}
451
452fn compress_blob(content: &[u8]) -> Vec<u8> {
453 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
454 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
455 encoder.finish().expect("submit blob compression")
456}
457
458fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
459 let mut decoder = ZlibDecoder::new(content);
460 let mut out = Vec::new();
461 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
462 Some(out)
463}
464
465fn encode_artifact_blob(
466 descriptor: &BlobArtifactDescriptor,
467 profile: BuiltinBlobProfile,
468 content: &[u8],
469) -> Vec<u8> {
470 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
471 {
472 (BlobCompression::Zlib, compress_blob(content))
473 } else {
474 (BlobCompression::None, content.to_vec())
475 };
476 encode_msgpack(&StoredBlobEnvelope {
477 descriptor: descriptor.clone(),
478 compression,
479 content: stored_content,
480 })
481}
482
483fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
484 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
485 match envelope.compression {
486 BlobCompression::None => Some(envelope.content),
487 BlobCompression::Zlib => decompress_blob(&envelope.content),
488 }
489}
490
491fn try_load_session_head_meta_from_conn(
494 conn: &Connection,
495) -> Result<Option<SessionHeadMeta>, StoreError> {
496 let row = conn
497 .query_row(
498 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
499 [],
500 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
501 )
502 .optional()
503 .map_err(sqlite_error)?;
504 let Some((head_json, head_revision)) = row else {
505 return Ok(None);
506 };
507 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
508 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
509 meta.head_revision = head_revision as u64;
510 Ok(Some(meta))
511}
512
513fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
514 try_load_session_head_meta_from_conn(conn).ok().flatten()
515}
516
517fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
518 conn.query_row(
519 "SELECT session_id, session_name, created_at, model, cwd, relation_json
520 FROM session_meta WHERE singleton = 1",
521 [],
522 |row| {
523 let relation_json: Option<String> = row.get(5)?;
524 let relation = relation_json
525 .and_then(|json| serde_json::from_str(&json).ok())
526 .unwrap_or_default();
527 Ok(SessionMeta {
528 session_id: row.get(0)?,
529 session_name: row.get(1)?,
530 created_at: row.get(2)?,
531 model: row.get(3)?,
532 cwd: row.get(4)?,
533 relation,
534 })
535 },
536 )
537 .optional()
538 .ok()
539 .flatten()
540}
541
542fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
543 rmp_serde::from_slice(bytes).ok()
544}
545
/// Serializes `value` to MessagePack bytes.
///
/// # Panics
/// Panics with "value should serialize" if encoding fails.
fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
    // Start with 1 KiB reserved; the vector grows as needed for larger values.
    let mut buf = Vec::with_capacity(1024);
    // `write_named` is the rmp_serde encoder that writes structs as maps keyed by
    // field name (see the rmp_serde documentation).
    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
    buf
}
553
554fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
555 rmp_serde::from_slice(bytes).ok()
556}
557
558fn merge_token_ledger_entries(
559 entries: Vec<lash_core::TokenLedgerEntry>,
560) -> Vec<lash_core::TokenLedgerEntry> {
561 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
562 for entry in entries {
563 if entry.usage.total() == 0 {
564 continue;
565 }
566 if let Some(existing) = merged
567 .iter_mut()
568 .find(|existing| existing.source == entry.source && existing.model == entry.model)
569 {
570 existing.usage.input_tokens += entry.usage.input_tokens;
571 existing.usage.output_tokens += entry.usage.output_tokens;
572 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
573 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
574 } else {
575 merged.push(entry);
576 }
577 }
578 merged
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    // Builds a registration for an external process with null metadata whose
    // provenance is the session scope named "session".
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(lash_core::SessionScope::new("session")),
        )
    }

    // Stores a linked lashlang module artifact in an in-memory store and checks
    // that reading it back yields the same module ref and `scan` process ref.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangHostEnvironment::new(
                lashlang::LashlangHostCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    // Registers, grants a handle for, and completes a process in a file-backed
    // registry, then reopens the same file and checks that the record, its
    // awaited output, and the handle grant are all still there.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // Scoped so the first registry is dropped before the file is reopened.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}