1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::{
50 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
51 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
52};
53use lash_core::{
54 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
55 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
56 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
57 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
58 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope,
59 RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope,
60 SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
61};
62use rusqlite::{Connection, OptionalExtension, Transaction, params};
63use sha2::{Digest, Sha256};
64
65use conn::SqliteConnection;
66
67mod attachments;
68mod blobs;
69mod conn;
70mod effect_replay;
71mod graph;
72mod host_events;
73mod leases;
74mod lifecycle;
75mod persistence;
76mod process_registry;
77mod queued_work;
78mod schema;
79
80use conn::TxOutcome;
81pub use effect_replay::{
82 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
83};
84pub use host_events::SqliteHostEventStore;
85use leases::*;
86use queued_work::*;
87use schema::{
88 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
89 ensure_process_schema, ensure_schema,
90};
91
/// SQLite-backed session store handle.
pub struct Store {
    /// Connection wrapper provided by the `conn` module.
    conn: SqliteConnection,
    /// Mutex-guarded map from Lashlang module reference to a shared module artifact.
    /// NOTE(review): presumably an in-memory cache in front of persisted artifacts — confirm.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob-profile and GC settings for this store.
    options: StoreOptions,
    /// Atomic counter.
    /// NOTE(review): presumably counts commits to drive
    /// `StoreGcPolicy::auto_run_every_commits` — confirm against the commit path.
    commit_count: AtomicU64,
}
104
/// Process registry whose rows live in a SQLite database.
pub struct SqliteProcessRegistry {
    /// Connection wrapper provided by the `conn` module.
    conn: SqliteConnection,
    /// Tokio notification handle.
    /// NOTE(review): presumably wakes waiters when process state changes — confirm.
    notify: tokio::sync::Notify,
}
116
117fn sqlite_error(err: rusqlite::Error) -> StoreError {
118 StoreError::Backend(err.to_string())
119}
120
121fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
122 lash_core::PluginError::Session(err.to_string())
123}
124
125fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
126 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
127}
128
129fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
130 serde_json::to_string(value).map_err(|err| {
131 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
132 })
133}
134
135fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
136 futures_executor::block_on(future)
137}
138
139#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
140pub enum PersistedArtifactKind {
141 GenericBlob,
142 CheckpointManifest,
143 ToolState,
144 PluginSessionSnapshot,
145 ExecutionStateSnapshot,
146 LashlangModule,
147}
148
149#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
150pub enum BlobStorageHint {
151 Compressible,
152 InlinePreferred,
153 LargePayload,
154}
155
156#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
157enum BlobCompression {
158 None,
159 Zlib,
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
163pub struct BlobArtifactDescriptor {
164 pub kind: PersistedArtifactKind,
165 #[serde(default, skip_serializing_if = "Vec::is_empty")]
166 pub hints: Vec<BlobStorageHint>,
167}
168
169impl BlobArtifactDescriptor {
170 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
171 Self {
172 kind,
173 hints: hints.into(),
174 }
175 }
176
177 pub fn checkpoint_manifest() -> Self {
178 Self::new(
179 PersistedArtifactKind::CheckpointManifest,
180 vec![BlobStorageHint::Compressible],
181 )
182 }
183
184 pub fn tool_state_snapshot() -> Self {
185 Self::new(
186 PersistedArtifactKind::ToolState,
187 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
188 )
189 }
190
191 pub fn plugin_session_snapshot() -> Self {
192 Self::new(
193 PersistedArtifactKind::PluginSessionSnapshot,
194 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
195 )
196 }
197
198 pub fn execution_state_snapshot() -> Self {
199 Self::new(
200 PersistedArtifactKind::ExecutionStateSnapshot,
201 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
202 )
203 }
204
205 pub fn lashlang_module() -> Self {
206 Self::new(
207 PersistedArtifactKind::LashlangModule,
208 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
209 )
210 }
211}
212
213#[derive(Clone, Debug, PartialEq, Eq, Hash)]
214struct RetainedArtifactRef {
215 pub blob_ref: BlobRef,
216 pub kind: PersistedArtifactKind,
217}
218
/// Built-in compression policy applied to compressible blobs (see `should_compress_blob`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compresses.
    LowLatency,
    /// Compresses blobs of at least 4 KiB. This is the default profile.
    #[default]
    Balanced,
    /// Compresses blobs of at least 1 KiB.
    Compact,
}
226
/// Garbage-collection policy for a store.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Commit interval for automatic GC; `None` (the default) leaves it unset.
    /// NOTE(review): presumably `None` disables automatic GC — confirm at the use site.
    pub auto_run_every_commits: Option<u64>,
}
231
232#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
233pub struct StoreOptions {
234 pub blob_profile: BuiltinBlobProfile,
235 pub gc_policy: StoreGcPolicy,
236}
237
238#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
239struct StoredBlobEnvelope {
240 descriptor: BlobArtifactDescriptor,
241 compression: BlobCompression,
242 content: Vec<u8>,
243}
244
245#[derive(Clone, Debug)]
246pub struct StoredSessionCheckpoint {
247 pub checkpoint_ref: BlobRef,
248 pub manifest: SessionCheckpoint,
249}
250
251#[derive(Clone, Debug)]
259pub struct SqliteSessionStoreFactory {
260 root: PathBuf,
261 options: StoreOptions,
262}
263
264impl SqliteSessionStoreFactory {
265 pub fn new(root: impl Into<PathBuf>) -> Self {
266 Self {
267 root: root.into(),
268 options: StoreOptions::default(),
269 }
270 }
271
272 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
273 Self {
274 root: root.into(),
275 options,
276 }
277 }
278
279 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
280 self.root.join(safe_session_db_file_name(session_id))
281 }
282}
283
284#[async_trait::async_trait]
285impl SessionStoreFactory for SqliteSessionStoreFactory {
286 fn durability_tier(&self) -> DurabilityTier {
287 DurabilityTier::Durable
288 }
289
290 async fn create_store(
291 &self,
292 request: &SessionStoreCreateRequest,
293 ) -> Result<Arc<dyn RuntimePersistence>, String> {
294 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
295 let path = self.path_for_session(&request.session_id);
296 let store = Arc::new(
297 Store::open_with_options(&path, self.options)
298 .await
299 .map_err(|err| err.to_string())?,
300 );
301 if store.load_session_meta().await.is_none() {
302 store
303 .save_session_meta(SessionMeta {
304 session_id: request.session_id.clone(),
305 session_name: request.session_id.clone(),
306 created_at: current_timestamp_string(),
307 model: request.policy.model.id.clone(),
308 cwd: std::env::current_dir()
309 .ok()
310 .and_then(|path| path.to_str().map(str::to_string)),
311 relation: request.relation.clone(),
312 })
313 .await;
314 }
315 Ok(store as Arc<dyn RuntimePersistence>)
316 }
317
318 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
319 let db_path = self.path_for_session(session_id);
320 for path in [
321 db_path.clone(),
322 sqlite_sidecar_path(&db_path, "-wal"),
323 sqlite_sidecar_path(&db_path, "-shm"),
324 ] {
325 match std::fs::remove_file(&path) {
326 Ok(()) => {}
327 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
328 Err(err) => {
329 return Err(format!("remove session store {}: {err}", path.display()));
330 }
331 }
332 }
333 Ok(())
334 }
335}
336
337fn safe_session_db_file_name(session_id: &str) -> String {
338 let mut safe = session_id
339 .chars()
340 .map(|ch| match ch {
341 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
342 _ => '_',
343 })
344 .collect::<String>();
345 safe = safe.trim_matches('_').to_string();
346 if safe.is_empty() {
347 safe.push_str("session");
348 }
349 safe.truncate(80);
350 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
351 format!("{safe}-{}.db", &hash[..16])
352}
353
/// Returns `path` with `suffix` appended directly to its final component
/// (e.g. `a.db` + `-wal` → `a.db-wal`), without touching the extension.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut name = path.as_os_str().to_owned();
    name.push(suffix);
    name.into()
}
359
/// Formats the current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// A clock set before the epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
366
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. The `u128` millisecond count is
/// converted with a checked conversion that saturates at `u64::MAX` instead of
/// silently truncating.
fn current_epoch_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
373
374fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
375 let mut refs = Vec::new();
376 if let Some(blob_ref) = &checkpoint.tool_state_ref {
377 refs.push(RetainedArtifactRef {
378 blob_ref: blob_ref.clone(),
379 kind: PersistedArtifactKind::ToolState,
380 });
381 }
382 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
383 refs.push(RetainedArtifactRef {
384 blob_ref: blob_ref.clone(),
385 kind: PersistedArtifactKind::PluginSessionSnapshot,
386 });
387 }
388 if let Some(blob_ref) = &checkpoint.execution_state_ref {
389 refs.push(RetainedArtifactRef {
390 blob_ref: blob_ref.clone(),
391 kind: PersistedArtifactKind::ExecutionStateSnapshot,
392 });
393 }
394 refs
395}
396
397fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
398 SessionHeadMeta {
399 session_id: head.session_id.clone(),
400 head_revision: 0,
401 config: head.config.clone(),
402 agent_frames: head.agent_frames.clone(),
403 current_agent_frame_id: head.current_agent_frame_id.clone(),
404 checkpoint_ref: head.checkpoint_ref.clone(),
405 leaf_node_id: head.graph.leaf_node_id.clone(),
406 graph_node_count: head.graph.nodes.len(),
407 token_ledger: Vec::new(),
408 }
409}
410
411fn encode_json<T: serde::Serialize>(value: &T) -> String {
412 serde_json::to_string(value).expect("persisted state should serialize")
413}
414
415fn should_compress_blob(
416 profile: BuiltinBlobProfile,
417 descriptor: &BlobArtifactDescriptor,
418 len: usize,
419) -> bool {
420 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
421 return false;
422 }
423 match profile {
424 BuiltinBlobProfile::LowLatency => false,
425 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
426 BuiltinBlobProfile::Compact => len >= 1024,
427 }
428}
429
430fn compress_blob(content: &[u8]) -> Vec<u8> {
431 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
432 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
433 encoder.finish().expect("submit blob compression")
434}
435
436fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
437 let mut decoder = ZlibDecoder::new(content);
438 let mut out = Vec::new();
439 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
440 Some(out)
441}
442
443fn encode_artifact_blob(
444 descriptor: &BlobArtifactDescriptor,
445 profile: BuiltinBlobProfile,
446 content: &[u8],
447) -> Vec<u8> {
448 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
449 {
450 (BlobCompression::Zlib, compress_blob(content))
451 } else {
452 (BlobCompression::None, content.to_vec())
453 };
454 encode_msgpack(&StoredBlobEnvelope {
455 descriptor: descriptor.clone(),
456 compression,
457 content: stored_content,
458 })
459}
460
461fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
462 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
463 match envelope.compression {
464 BlobCompression::None => Some(envelope.content),
465 BlobCompression::Zlib => decompress_blob(&envelope.content),
466 }
467}
468
469fn try_load_session_head_meta_from_conn(
472 conn: &Connection,
473) -> Result<Option<SessionHeadMeta>, StoreError> {
474 let row = conn
475 .query_row(
476 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
477 [],
478 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
479 )
480 .optional()
481 .map_err(sqlite_error)?;
482 let Some((head_json, head_revision)) = row else {
483 return Ok(None);
484 };
485 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
486 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
487 meta.head_revision = head_revision as u64;
488 Ok(Some(meta))
489}
490
491fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
492 try_load_session_head_meta_from_conn(conn).ok().flatten()
493}
494
495fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
496 conn.query_row(
497 "SELECT session_id, session_name, created_at, model, cwd, relation_json
498 FROM session_meta WHERE singleton = 1",
499 [],
500 |row| {
501 let relation_json: Option<String> = row.get(5)?;
502 let relation = relation_json
503 .and_then(|json| serde_json::from_str(&json).ok())
504 .unwrap_or_default();
505 Ok(SessionMeta {
506 session_id: row.get(0)?,
507 session_name: row.get(1)?,
508 created_at: row.get(2)?,
509 model: row.get(3)?,
510 cwd: row.get(4)?,
511 relation,
512 })
513 },
514 )
515 .optional()
516 .ok()
517 .flatten()
518}
519
520fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
521 rmp_serde::from_slice(bytes).ok()
522}
523
524fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
525 let mut buf = Vec::with_capacity(1024);
528 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
529 buf
530}
531
532fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
533 rmp_serde::from_slice(bytes).ok()
534}
535
536fn merge_token_ledger_entries(
537 entries: Vec<lash_core::TokenLedgerEntry>,
538) -> Vec<lash_core::TokenLedgerEntry> {
539 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
540 for entry in entries {
541 if entry.usage.total() == 0 {
542 continue;
543 }
544 if let Some(existing) = merged
545 .iter_mut()
546 .find(|existing| existing.source == entry.source && existing.model == entry.model)
547 {
548 existing.usage.input_tokens += entry.usage.input_tokens;
549 existing.usage.output_tokens += entry.usage.output_tokens;
550 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
551 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
552 } else {
553 merged.push(entry);
554 }
555 }
556 merged
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Builds an external-process registration with null metadata, owned by the
    /// `"session"` scope and attributed to `"test-host"`.
    fn registration(id: &str) -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::Value::Null,
        };
        let provenance = lash_core::ProcessProvenance::new(
            lash_core::ProcessScope::new("session"),
            "test-host",
        );
        ProcessRegistration::new(id, input).with_process_provenance(provenance)
    }

    /// The success output used both to complete and to verify the persisted process.
    fn ok_output() -> ProcessAwaitOutput {
        ProcessAwaitOutput::Success {
            value: serde_json::json!({"ok": true}),
            control: None,
        }
    }

    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let parsed =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let surface = lashlang::LashlangSurface::new(
            lashlang::ResourceCatalog::new(),
            lashlang::LashlangAbilities::all(),
        );
        let linked = lashlang::LinkedModule::link(parsed, surface).expect("link module");

        // Write the artifact, then read it back by module reference.
        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let fetched = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(fetched.module_ref, linked.module_ref);
        assert_eq!(
            fetched.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("processes.db");
        let owner_scope = lash_core::ProcessScope::new("session");

        // First session: register, grant a handle, complete; the registry is dropped
        // at the end of this scope before the database is reopened.
        {
            let registry = SqliteProcessRegistry::open(&db_path)
                .await
                .expect("open registry");
            registry
                .register_process(registration("proc-persist"))
                .await
                .expect("register");
            let descriptor = ProcessHandleDescriptor::new(Some("tool"), Some("demo"));
            registry
                .grant_handle(&owner_scope, "proc-persist", descriptor)
                .await
                .expect("grant");
            registry
                .complete_process("proc-persist", ok_output())
                .await
                .expect("complete");
        }

        // Second session: everything written above must still be visible.
        let reopened = SqliteProcessRegistry::open(&db_path)
            .await
            .expect("reopen registry");
        let record = reopened
            .get_process("proc-persist")
            .await
            .expect("persisted process");

        assert_eq!(record.owner_scope_id(), owner_scope.id());
        assert_eq!(record.provenance.owner_scope.session_id.as_str(), "session");

        let awaited = reopened
            .await_process("proc-persist")
            .await
            .expect("await persisted");
        assert_eq!(awaited, ok_output());

        let grants = reopened
            .list_handle_grants(&owner_scope)
            .await
            .expect("grants");
        assert_eq!(grants.len(), 1);
    }
}