1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry,
63 RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope, SessionScope,
64 SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod host_events;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88pub use host_events::SqliteHostEventStore;
89use leases::*;
90use queued_work::*;
91use schema::{
92 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
93 ensure_process_schema, ensure_schema,
94};
95
/// SQLite-backed session store.
///
/// Owns the session database connection plus an in-process cache of lashlang
/// module artifacts, and carries the [`StoreOptions`] it was opened with.
pub struct Store {
    /// Connection wrapper for the session database.
    conn: SqliteConnection,
    /// In-memory cache of lashlang module artifacts keyed by module ref
    /// (presumably filled on put/get of module artifacts — confirm in the artifact code).
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy supplied at open time.
    options: StoreOptions,
    /// Running commit counter; NOTE(review): presumably compared against
    /// `StoreGcPolicy::auto_run_every_commits` to trigger automatic GC — confirm.
    commit_count: AtomicU64,
}
108
/// SQLite-backed process registry (process rows, handle grants, completion output).
pub struct SqliteProcessRegistry {
    /// Connection wrapper for the registry database.
    conn: SqliteConnection,
    /// Async notifier; NOTE(review): presumably used to wake tasks waiting on
    /// process state changes (e.g. `await_process`) — confirm in `process_registry`.
    notify: tokio::sync::Notify,
}
120
121fn sqlite_error(err: rusqlite::Error) -> StoreError {
122 StoreError::Backend(err.to_string())
123}
124
125fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
126 lash_core::PluginError::Session(err.to_string())
127}
128
129fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
130 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
131}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134 serde_json::to_string(value).map_err(|err| {
135 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136 })
137}
138
/// Drives `future` to completion on the current thread and returns its output.
///
/// Thin wrapper over `futures_executor::block_on`. It blocks the calling thread,
/// so it should not be called from inside an async task running on a runtime
/// worker thread.
fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
    futures_executor::block_on(future)
}
142
/// Category of a persisted blob artifact.
///
/// Recorded in the [`BlobArtifactDescriptor`] written into every stored blob
/// envelope. NOTE(review): because this enum is serialized with serde into stored
/// blobs, renaming or reordering variants may break decoding of existing data —
/// confirm against the msgpack enum encoding before changing it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    /// Uncategorized blob (no dedicated descriptor constructor in this file).
    GenericBlob,
    /// Session checkpoint manifest; see `BlobArtifactDescriptor::checkpoint_manifest`.
    CheckpointManifest,
    /// Tool state snapshot; see `BlobArtifactDescriptor::tool_state_snapshot`.
    ToolState,
    /// Plugin session snapshot; see `BlobArtifactDescriptor::plugin_session_snapshot`.
    PluginSessionSnapshot,
    /// Execution state snapshot; see `BlobArtifactDescriptor::execution_state_snapshot`.
    ExecutionStateSnapshot,
    /// Lashlang module artifact; see `BlobArtifactDescriptor::lashlang_module`.
    LashlangModule,
    /// Process execution environment; see `BlobArtifactDescriptor::process_execution_env`.
    ProcessExecutionEnv,
}
153
/// Advisory storage hints attached to a blob descriptor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    /// Content may be zlib-compressed; `should_compress_blob` only compresses
    /// blobs that carry this hint.
    Compressible,
    /// Not consulted by the compression logic in this file; NOTE(review): meaning
    /// presumably "store inline when small" — confirm with its users.
    InlinePreferred,
    /// Marks potentially large payloads; not consulted by `should_compress_blob`.
    LargePayload,
}
160
/// Compression applied to a stored blob's content inside its envelope.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content stored as-is.
    None,
    /// Content stored as a zlib stream (see `compress_blob` / `decompress_blob`).
    Zlib,
}
166
/// Describes what a persisted blob is and how it may be stored.
///
/// Written verbatim into each stored blob envelope alongside the content.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Artifact category.
    pub kind: PersistedArtifactKind,
    /// Storage hints. Omitted from the serialized form when empty and treated as
    /// empty when absent on decode.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
173
174impl BlobArtifactDescriptor {
175 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
176 Self {
177 kind,
178 hints: hints.into(),
179 }
180 }
181
182 pub fn checkpoint_manifest() -> Self {
183 Self::new(
184 PersistedArtifactKind::CheckpointManifest,
185 vec![BlobStorageHint::Compressible],
186 )
187 }
188
189 pub fn tool_state_snapshot() -> Self {
190 Self::new(
191 PersistedArtifactKind::ToolState,
192 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
193 )
194 }
195
196 pub fn plugin_session_snapshot() -> Self {
197 Self::new(
198 PersistedArtifactKind::PluginSessionSnapshot,
199 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
200 )
201 }
202
203 pub fn execution_state_snapshot() -> Self {
204 Self::new(
205 PersistedArtifactKind::ExecutionStateSnapshot,
206 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
207 )
208 }
209
210 pub fn lashlang_module() -> Self {
211 Self::new(
212 PersistedArtifactKind::LashlangModule,
213 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
214 )
215 }
216
217 pub fn process_execution_env() -> Self {
218 Self::new(
219 PersistedArtifactKind::ProcessExecutionEnv,
220 vec![BlobStorageHint::Compressible],
221 )
222 }
223}
224
/// A blob reference held by a checkpoint, tagged with the kind of artifact it
/// points to (built by `retained_artifact_refs`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    /// Referenced blob.
    pub blob_ref: BlobRef,
    /// Kind of artifact stored in that blob.
    pub kind: PersistedArtifactKind,
}
230
/// Blob compression profile used by `should_compress_blob`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compress.
    LowLatency,
    /// Compress compressible blobs of at least 4 KiB (the default).
    #[default]
    Balanced,
    /// Compress compressible blobs of at least 1 KiB.
    Compact,
}
238
/// Garbage-collection policy for a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// NOTE(review): presumably "run GC automatically every N commits", with `None`
    /// (the default) disabling automatic runs — confirm where this is consumed.
    pub auto_run_every_commits: Option<u64>,
}
243
/// Tunables supplied when opening a [`Store`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Blob compression profile (defaults to [`BuiltinBlobProfile::Balanced`]).
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection policy.
    pub gc_policy: StoreGcPolicy,
}
249
/// On-disk wrapper for a blob, msgpack-encoded by `encode_artifact_blob` and
/// decoded by `decode_artifact_blob`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// What the blob is and how it may be stored.
    descriptor: BlobArtifactDescriptor,
    /// Compression applied to `content`.
    compression: BlobCompression,
    /// Payload bytes, compressed according to `compression`.
    content: Vec<u8>,
}
256
/// A session checkpoint manifest paired with the blob ref it is stored under.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Blob holding the checkpoint.
    pub checkpoint_ref: BlobRef,
    /// Decoded checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
262
/// Creates one SQLite database file per session under a root directory.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory holding the per-session database files.
    root: PathBuf,
    /// Options applied to every store opened by this factory.
    options: StoreOptions,
}
275
276impl SqliteSessionStoreFactory {
277 pub fn new(root: impl Into<PathBuf>) -> Self {
278 Self {
279 root: root.into(),
280 options: StoreOptions::default(),
281 }
282 }
283
284 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
285 Self {
286 root: root.into(),
287 options,
288 }
289 }
290
291 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
292 self.root.join(safe_session_db_file_name(session_id))
293 }
294}
295
296#[async_trait::async_trait]
297impl SessionStoreFactory for SqliteSessionStoreFactory {
298 fn durability_tier(&self) -> DurabilityTier {
299 DurabilityTier::Durable
300 }
301
302 async fn create_store(
303 &self,
304 request: &SessionStoreCreateRequest,
305 ) -> Result<Arc<dyn RuntimePersistence>, String> {
306 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
307 let path = self.path_for_session(&request.session_id);
308 let store = Arc::new(
309 Store::open_with_options(&path, self.options)
310 .await
311 .map_err(|err| err.to_string())?,
312 );
313 if store.load_session_meta().await.is_none() {
314 store
315 .save_session_meta(SessionMeta {
316 session_id: request.session_id.clone(),
317 session_name: request.session_id.clone(),
318 created_at: current_timestamp_string(),
319 model: request.policy.model.id.clone(),
320 cwd: std::env::current_dir()
321 .ok()
322 .and_then(|path| path.to_str().map(str::to_string)),
323 relation: request.relation.clone(),
324 })
325 .await;
326 }
327 Ok(store as Arc<dyn RuntimePersistence>)
328 }
329
330 async fn open_existing_store(
331 &self,
332 request: &SessionStoreCreateRequest,
333 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
334 let path = self.path_for_session(&request.session_id);
335 if !path.exists() {
336 return Ok(None);
337 }
338 self.create_store(request).await.map(Some)
339 }
340
341 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
342 let db_path = self.path_for_session(session_id);
343 for path in [
344 db_path.clone(),
345 sqlite_sidecar_path(&db_path, "-wal"),
346 sqlite_sidecar_path(&db_path, "-shm"),
347 ] {
348 match std::fs::remove_file(&path) {
349 Ok(()) => {}
350 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
351 Err(err) => {
352 return Err(format!("remove session store {}: {err}", path.display()));
353 }
354 }
355 }
356 Ok(())
357 }
358}
359
360fn safe_session_db_file_name(session_id: &str) -> String {
361 let mut safe = session_id
362 .chars()
363 .map(|ch| match ch {
364 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
365 _ => '_',
366 })
367 .collect::<String>();
368 safe = safe.trim_matches('_').to_string();
369 if safe.is_empty() {
370 safe.push_str("session");
371 }
372 safe.truncate(80);
373 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
374 format!("{safe}-{}.db", &hash[..16])
375}
376
/// Returns `path` with `suffix` appended to its final component, e.g.
/// `foo.db` + `"-wal"` -> `foo.db-wal` (SQLite's sidecar naming).
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut joined = path.as_os_str().to_owned();
    joined.push(suffix);
    joined.into()
}
382
/// Returns the current wall-clock time as `"unix:<seconds since epoch>"`.
///
/// A clock set before the Unix epoch yields `"unix:0"`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
389
/// Returns milliseconds since the Unix epoch, or 0 if the clock is set before
/// the epoch.
fn current_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    // u128 -> u64 narrowing only matters ~584 million years from now.
    since_epoch.as_millis() as u64
}
396
397fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
398 let mut refs = Vec::new();
399 if let Some(blob_ref) = &checkpoint.tool_state_ref {
400 refs.push(RetainedArtifactRef {
401 blob_ref: blob_ref.clone(),
402 kind: PersistedArtifactKind::ToolState,
403 });
404 }
405 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
406 refs.push(RetainedArtifactRef {
407 blob_ref: blob_ref.clone(),
408 kind: PersistedArtifactKind::PluginSessionSnapshot,
409 });
410 }
411 if let Some(blob_ref) = &checkpoint.execution_state_ref {
412 refs.push(RetainedArtifactRef {
413 blob_ref: blob_ref.clone(),
414 kind: PersistedArtifactKind::ExecutionStateSnapshot,
415 });
416 }
417 refs
418}
419
420fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
421 SessionHeadMeta {
422 session_id: head.session_id.clone(),
423 head_revision: 0,
424 config: head.config.clone(),
425 agent_frames: head.agent_frames.clone(),
426 current_agent_frame_id: head.current_agent_frame_id.clone(),
427 checkpoint_ref: head.checkpoint_ref.clone(),
428 leaf_node_id: head.graph.leaf_node_id.clone(),
429 graph_node_count: head.graph.nodes.len(),
430 token_ledger: Vec::new(),
431 }
432}
433
434fn encode_json<T: serde::Serialize>(value: &T) -> String {
435 serde_json::to_string(value).expect("persisted state should serialize")
436}
437
438fn should_compress_blob(
439 profile: BuiltinBlobProfile,
440 descriptor: &BlobArtifactDescriptor,
441 len: usize,
442) -> bool {
443 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
444 return false;
445 }
446 match profile {
447 BuiltinBlobProfile::LowLatency => false,
448 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
449 BuiltinBlobProfile::Compact => len >= 1024,
450 }
451}
452
453fn compress_blob(content: &[u8]) -> Vec<u8> {
454 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
455 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
456 encoder.finish().expect("submit blob compression")
457}
458
459fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
460 let mut decoder = ZlibDecoder::new(content);
461 let mut out = Vec::new();
462 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
463 Some(out)
464}
465
466fn encode_artifact_blob(
467 descriptor: &BlobArtifactDescriptor,
468 profile: BuiltinBlobProfile,
469 content: &[u8],
470) -> Vec<u8> {
471 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
472 {
473 (BlobCompression::Zlib, compress_blob(content))
474 } else {
475 (BlobCompression::None, content.to_vec())
476 };
477 encode_msgpack(&StoredBlobEnvelope {
478 descriptor: descriptor.clone(),
479 compression,
480 content: stored_content,
481 })
482}
483
484fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
485 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
486 match envelope.compression {
487 BlobCompression::None => Some(envelope.content),
488 BlobCompression::Zlib => decompress_blob(&envelope.content),
489 }
490}
491
492fn try_load_session_head_meta_from_conn(
495 conn: &Connection,
496) -> Result<Option<SessionHeadMeta>, StoreError> {
497 let row = conn
498 .query_row(
499 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
500 [],
501 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
502 )
503 .optional()
504 .map_err(sqlite_error)?;
505 let Some((head_json, head_revision)) = row else {
506 return Ok(None);
507 };
508 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
509 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
510 meta.head_revision = head_revision as u64;
511 Ok(Some(meta))
512}
513
514fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
515 try_load_session_head_meta_from_conn(conn).ok().flatten()
516}
517
518fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
519 conn.query_row(
520 "SELECT session_id, session_name, created_at, model, cwd, relation_json
521 FROM session_meta WHERE singleton = 1",
522 [],
523 |row| {
524 let relation_json: Option<String> = row.get(5)?;
525 let relation = relation_json
526 .and_then(|json| serde_json::from_str(&json).ok())
527 .unwrap_or_default();
528 Ok(SessionMeta {
529 session_id: row.get(0)?,
530 session_name: row.get(1)?,
531 created_at: row.get(2)?,
532 model: row.get(3)?,
533 cwd: row.get(4)?,
534 relation,
535 })
536 },
537 )
538 .optional()
539 .ok()
540 .flatten()
541}
542
543fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
544 rmp_serde::from_slice(bytes).ok()
545}
546
547fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
548 let mut buf = Vec::with_capacity(1024);
551 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
552 buf
553}
554
555fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
556 rmp_serde::from_slice(bytes).ok()
557}
558
559fn merge_token_ledger_entries(
560 entries: Vec<lash_core::TokenLedgerEntry>,
561) -> Vec<lash_core::TokenLedgerEntry> {
562 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
563 for entry in entries {
564 if entry.usage.total() == 0 {
565 continue;
566 }
567 if let Some(existing) = merged
568 .iter_mut()
569 .find(|existing| existing.source == entry.source && existing.model == entry.model)
570 {
571 existing.usage.input_tokens += entry.usage.input_tokens;
572 existing.usage.output_tokens += entry.usage.output_tokens;
573 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
574 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
575 } else {
576 merged.push(entry);
577 }
578 }
579 merged
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds a minimal external-input process registration with the given id,
    /// whose provenance is the `"session"` scope and `"test-host"` (presumably
    /// the host id — confirm against `ProcessProvenance::session`).
    fn registration(id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            id,
            ProcessInput::External {
                metadata: serde_json::Value::Null,
            },
            lash_core::ProcessProvenance::session(
                lash_core::SessionScope::new("session"),
                "test-host",
            ),
        )
    }

    /// A linked lashlang module artifact written to an in-memory store can be
    /// read back by module ref and exposes the same process refs.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let module =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let linked = lashlang::LinkedModule::link(
            module,
            lashlang::LashlangSurface::new(
                lashlang::ResourceCatalog::new(),
                lashlang::LashlangAbilities::all(),
            ),
        )
        .expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let restored = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(restored.module_ref, linked.module_ref);
        assert_eq!(
            restored.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// Process rows, handle grants and completion output written through one
    /// registry instance are visible after reopening the same database file.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("processes.db");
        // Inner scope: the first registry is dropped before the file is reopened.
        {
            let registry = SqliteProcessRegistry::open(&path)
                .await
                .expect("open registry");
            let session_scope = lash_core::SessionScope::new("session");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            registry
                .grant_handle(
                    &session_scope,
                    "proc-persist",
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            registry
                .complete_process(
                    "proc-persist",
                    ProcessAwaitOutput::Success {
                        value: serde_json::json!({"ok": true}),
                        control: None,
                    },
                )
                .await
                .expect("complete");
        }

        // Fresh registry over the same file: everything above must be read back.
        let registry = SqliteProcessRegistry::open(&path)
            .await
            .expect("reopen registry");
        let session_scope = lash_core::SessionScope::new("session");
        let record = registry
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.originator_scope_id(), session_scope.id().as_str());
        assert_eq!(
            record.provenance.originator,
            lash_core::ProcessOriginator::session(session_scope.clone())
        );
        assert_eq!(
            registry
                .await_process("proc-persist")
                .await
                .expect("await persisted"),
            ProcessAwaitOutput::Success {
                value: serde_json::json!({"ok": true}),
                control: None,
            }
        );
        assert_eq!(
            registry
                .list_handle_grants(&session_scope)
                .await
                .expect("grants")
                .len(),
            1
        );
    }
}