1use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45 QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46 QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47 prepare_process_registration,
48};
49use lash_core::store::queued_work::{
50 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id,
51 ensure_completion_owns_all_batches, renewed_claim, select_claim_prefix,
52};
53use lash_core::store::{
54 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
55 RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
56};
57use lash_core::{
58 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
59 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
60 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
61 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
62 ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope,
63 RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope,
64 SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
65};
66use rusqlite::{Connection, OptionalExtension, Transaction, params};
67use sha2::{Digest, Sha256};
68
69use conn::SqliteConnection;
70
71mod attachments;
72mod blobs;
73mod conn;
74mod effect_replay;
75mod graph;
76mod host_events;
77mod leases;
78mod lifecycle;
79mod persistence;
80mod process_registry;
81mod queued_work;
82mod schema;
83
84use conn::TxOutcome;
85pub use effect_replay::{
86 SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
87};
88pub use host_events::SqliteHostEventStore;
89use leases::*;
90use queued_work::*;
91use schema::{
92 StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
93 ensure_process_schema, ensure_schema,
94};
95
/// SQLite-backed session store.
///
/// Holds the shared SQLite connection wrapper together with store-wide configuration and
/// in-memory bookkeeping.
pub struct Store {
    /// Connection wrapper from the `conn` submodule used for all SQL access.
    conn: SqliteConnection,
    /// Linked lashlang module artifacts keyed by module ref, shared behind `Arc`.
    /// NOTE(review): presumably an in-memory cache in front of the persisted artifacts
    /// read/written by `put_module_artifact`/`get_module_artifact` — confirm.
    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
    /// Blob profile and GC policy this store was opened with.
    options: StoreOptions,
    /// Commit counter. NOTE(review): presumably compared against
    /// `StoreGcPolicy::auto_run_every_commits` to trigger automatic GC — confirm.
    commit_count: AtomicU64,
}
108
/// Process registry whose rows are stored in SQLite, so registrations, handle grants and
/// completions survive reopening the same database file.
pub struct SqliteProcessRegistry {
    /// Connection wrapper from the `conn` submodule used for all SQL access.
    conn: SqliteConnection,
    /// Async notification primitive. NOTE(review): presumably used to wake tasks waiting
    /// on process state changes (e.g. `await_process`) — confirm in `process_registry`.
    notify: tokio::sync::Notify,
}
120
121fn sqlite_error(err: rusqlite::Error) -> StoreError {
122 StoreError::Backend(err.to_string())
123}
124
125fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
126 lash_core::PluginError::Session(err.to_string())
127}
128
129fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
130 lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
131}
132
133fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
134 serde_json::to_string(value).map_err(|err| {
135 lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
136 })
137}
138
139fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
140 futures_executor::block_on(future)
141}
142
/// Category of an artifact persisted as a blob; recorded in each blob's descriptor.
///
/// NOTE(review): this enum is serialized with serde into stored blob envelopes, so variant
/// names (and possibly order, depending on the msgpack encoding) are likely part of the
/// on-disk format — confirm before renaming or reordering variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum PersistedArtifactKind {
    GenericBlob,
    CheckpointManifest,
    ToolState,
    PluginSessionSnapshot,
    ExecutionStateSnapshot,
    LashlangModule,
}
152
/// Advisory hints about how a blob should be stored.
///
/// `should_compress_blob` consults only `Compressible`. The other hints are recorded in
/// the descriptor; NOTE(review): check whether other modules act on them.
/// Serialized with serde — treat variant names as part of the stored format.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum BlobStorageHint {
    Compressible,
    InlinePreferred,
    LargePayload,
}
159
/// Compression applied to a stored blob's content.
///
/// Written into `StoredBlobEnvelope`; `decode_artifact_blob` dispatches on it to decide
/// whether to inflate the content.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
enum BlobCompression {
    /// Content is stored verbatim.
    None,
    /// Content is zlib-compressed (see `compress_blob` / `decompress_blob`).
    Zlib,
}
165
/// Describes what a stored blob contains and how it prefers to be stored.
///
/// Stored alongside the blob content inside `StoredBlobEnvelope`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobArtifactDescriptor {
    /// Category of the artifact.
    pub kind: PersistedArtifactKind,
    /// Storage hints; omitted from the serialized form when empty and defaulted to empty
    /// when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hints: Vec<BlobStorageHint>,
}
172
173impl BlobArtifactDescriptor {
174 pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
175 Self {
176 kind,
177 hints: hints.into(),
178 }
179 }
180
181 pub fn checkpoint_manifest() -> Self {
182 Self::new(
183 PersistedArtifactKind::CheckpointManifest,
184 vec![BlobStorageHint::Compressible],
185 )
186 }
187
188 pub fn tool_state_snapshot() -> Self {
189 Self::new(
190 PersistedArtifactKind::ToolState,
191 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
192 )
193 }
194
195 pub fn plugin_session_snapshot() -> Self {
196 Self::new(
197 PersistedArtifactKind::PluginSessionSnapshot,
198 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
199 )
200 }
201
202 pub fn execution_state_snapshot() -> Self {
203 Self::new(
204 PersistedArtifactKind::ExecutionStateSnapshot,
205 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
206 )
207 }
208
209 pub fn lashlang_module() -> Self {
210 Self::new(
211 PersistedArtifactKind::LashlangModule,
212 vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
213 )
214 }
215}
216
/// A blob reference kept alive by a checkpoint, tagged with the kind of artifact it points to.
///
/// Produced by `retained_artifact_refs`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct RetainedArtifactRef {
    pub blob_ref: BlobRef,
    pub kind: PersistedArtifactKind,
}
222
/// Compression trade-off applied to blobs that carry the `Compressible` hint.
///
/// Thresholds as implemented by `should_compress_blob`:
/// - `LowLatency`: never compress.
/// - `Balanced` (default): compress blobs of at least 4 KiB.
/// - `Compact`: compress blobs of at least 1 KiB.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    LowLatency,
    #[default]
    Balanced,
    Compact,
}
230
/// Garbage-collection policy for a `Store`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Defaults to `None`. NOTE(review): presumably the number of commits between automatic
    /// GC runs, with `None` disabling automatic GC — confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
235
/// Tuning options for opening a `Store`.
///
/// `Default` yields the `Balanced` blob profile and a GC policy with no automatic interval.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StoreOptions {
    /// Controls when compressible blobs are zlib-compressed.
    pub blob_profile: BuiltinBlobProfile,
    /// Garbage-collection settings.
    pub gc_policy: StoreGcPolicy,
}
241
/// On-disk wrapper for an artifact blob, encoded with `encode_msgpack` by
/// `encode_artifact_blob` and decoded by `decode_artifact_blob`.
///
/// Field names are part of the stored (named msgpack) format.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoredBlobEnvelope {
    /// What the blob holds and its storage hints.
    descriptor: BlobArtifactDescriptor,
    /// How `content` is encoded.
    compression: BlobCompression,
    /// Payload bytes, compressed according to `compression`.
    content: Vec<u8>,
}
248
/// A session checkpoint manifest paired with the blob reference it is stored under.
#[derive(Clone, Debug)]
pub struct StoredSessionCheckpoint {
    /// Reference to the stored checkpoint blob.
    pub checkpoint_ref: BlobRef,
    /// The decoded checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
254
/// Creates one SQLite database file per session under a root directory.
///
/// File names come from `safe_session_db_file_name`.
#[derive(Clone, Debug)]
pub struct SqliteSessionStoreFactory {
    /// Directory that holds the per-session database files.
    root: PathBuf,
    /// Options passed to every store this factory opens.
    options: StoreOptions,
}
267
268impl SqliteSessionStoreFactory {
269 pub fn new(root: impl Into<PathBuf>) -> Self {
270 Self {
271 root: root.into(),
272 options: StoreOptions::default(),
273 }
274 }
275
276 pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
277 Self {
278 root: root.into(),
279 options,
280 }
281 }
282
283 pub fn path_for_session(&self, session_id: &str) -> PathBuf {
284 self.root.join(safe_session_db_file_name(session_id))
285 }
286}
287
288#[async_trait::async_trait]
289impl SessionStoreFactory for SqliteSessionStoreFactory {
290 fn durability_tier(&self) -> DurabilityTier {
291 DurabilityTier::Durable
292 }
293
294 async fn create_store(
295 &self,
296 request: &SessionStoreCreateRequest,
297 ) -> Result<Arc<dyn RuntimePersistence>, String> {
298 std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
299 let path = self.path_for_session(&request.session_id);
300 let store = Arc::new(
301 Store::open_with_options(&path, self.options)
302 .await
303 .map_err(|err| err.to_string())?,
304 );
305 if store.load_session_meta().await.is_none() {
306 store
307 .save_session_meta(SessionMeta {
308 session_id: request.session_id.clone(),
309 session_name: request.session_id.clone(),
310 created_at: current_timestamp_string(),
311 model: request.policy.model.id.clone(),
312 cwd: std::env::current_dir()
313 .ok()
314 .and_then(|path| path.to_str().map(str::to_string)),
315 relation: request.relation.clone(),
316 })
317 .await;
318 }
319 Ok(store as Arc<dyn RuntimePersistence>)
320 }
321
322 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
323 let db_path = self.path_for_session(session_id);
324 for path in [
325 db_path.clone(),
326 sqlite_sidecar_path(&db_path, "-wal"),
327 sqlite_sidecar_path(&db_path, "-shm"),
328 ] {
329 match std::fs::remove_file(&path) {
330 Ok(()) => {}
331 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
332 Err(err) => {
333 return Err(format!("remove session store {}: {err}", path.display()));
334 }
335 }
336 }
337 Ok(())
338 }
339}
340
341fn safe_session_db_file_name(session_id: &str) -> String {
342 let mut safe = session_id
343 .chars()
344 .map(|ch| match ch {
345 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
346 _ => '_',
347 })
348 .collect::<String>();
349 safe = safe.trim_matches('_').to_string();
350 if safe.is_empty() {
351 safe.push_str("session");
352 }
353 safe.truncate(80);
354 let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
355 format!("{safe}-{}.db", &hash[..16])
356}
357
/// Returns `path` with `suffix` appended to its final component, without inserting a separator.
///
/// For example, `db.sqlite` + `-wal` gives `db.sqlite-wal`. Works on the raw OS string, so
/// non-UTF-8 paths are preserved.
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut joined = path.as_os_str().to_owned();
    joined.push(suffix);
    joined.into()
}
363
/// Formats the current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// A system clock set before the epoch yields `unix:0` instead of failing.
fn current_timestamp_string() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
370
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of failing.
fn current_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    // `as_millis` is u128; u64 milliseconds cover ~584 million years, so this cast cannot
    // truncate for any realistic clock value.
    since_epoch.as_millis() as u64
}
377
378fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
379 let mut refs = Vec::new();
380 if let Some(blob_ref) = &checkpoint.tool_state_ref {
381 refs.push(RetainedArtifactRef {
382 blob_ref: blob_ref.clone(),
383 kind: PersistedArtifactKind::ToolState,
384 });
385 }
386 if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
387 refs.push(RetainedArtifactRef {
388 blob_ref: blob_ref.clone(),
389 kind: PersistedArtifactKind::PluginSessionSnapshot,
390 });
391 }
392 if let Some(blob_ref) = &checkpoint.execution_state_ref {
393 refs.push(RetainedArtifactRef {
394 blob_ref: blob_ref.clone(),
395 kind: PersistedArtifactKind::ExecutionStateSnapshot,
396 });
397 }
398 refs
399}
400
401fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
402 SessionHeadMeta {
403 session_id: head.session_id.clone(),
404 head_revision: 0,
405 config: head.config.clone(),
406 agent_frames: head.agent_frames.clone(),
407 current_agent_frame_id: head.current_agent_frame_id.clone(),
408 checkpoint_ref: head.checkpoint_ref.clone(),
409 leaf_node_id: head.graph.leaf_node_id.clone(),
410 graph_node_count: head.graph.nodes.len(),
411 token_ledger: Vec::new(),
412 }
413}
414
415fn encode_json<T: serde::Serialize>(value: &T) -> String {
416 serde_json::to_string(value).expect("persisted state should serialize")
417}
418
419fn should_compress_blob(
420 profile: BuiltinBlobProfile,
421 descriptor: &BlobArtifactDescriptor,
422 len: usize,
423) -> bool {
424 if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
425 return false;
426 }
427 match profile {
428 BuiltinBlobProfile::LowLatency => false,
429 BuiltinBlobProfile::Balanced => len >= 4 * 1024,
430 BuiltinBlobProfile::Compact => len >= 1024,
431 }
432}
433
434fn compress_blob(content: &[u8]) -> Vec<u8> {
435 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
436 std::io::Write::write_all(&mut encoder, content).expect("compress blob");
437 encoder.finish().expect("submit blob compression")
438}
439
440fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
441 let mut decoder = ZlibDecoder::new(content);
442 let mut out = Vec::new();
443 std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
444 Some(out)
445}
446
447fn encode_artifact_blob(
448 descriptor: &BlobArtifactDescriptor,
449 profile: BuiltinBlobProfile,
450 content: &[u8],
451) -> Vec<u8> {
452 let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
453 {
454 (BlobCompression::Zlib, compress_blob(content))
455 } else {
456 (BlobCompression::None, content.to_vec())
457 };
458 encode_msgpack(&StoredBlobEnvelope {
459 descriptor: descriptor.clone(),
460 compression,
461 content: stored_content,
462 })
463}
464
465fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
466 let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
467 match envelope.compression {
468 BlobCompression::None => Some(envelope.content),
469 BlobCompression::Zlib => decompress_blob(&envelope.content),
470 }
471}
472
473fn try_load_session_head_meta_from_conn(
476 conn: &Connection,
477) -> Result<Option<SessionHeadMeta>, StoreError> {
478 let row = conn
479 .query_row(
480 "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
481 [],
482 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
483 )
484 .optional()
485 .map_err(sqlite_error)?;
486 let Some((head_json, head_revision)) = row else {
487 return Ok(None);
488 };
489 let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
490 .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
491 meta.head_revision = head_revision as u64;
492 Ok(Some(meta))
493}
494
495fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
496 try_load_session_head_meta_from_conn(conn).ok().flatten()
497}
498
499fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
500 conn.query_row(
501 "SELECT session_id, session_name, created_at, model, cwd, relation_json
502 FROM session_meta WHERE singleton = 1",
503 [],
504 |row| {
505 let relation_json: Option<String> = row.get(5)?;
506 let relation = relation_json
507 .and_then(|json| serde_json::from_str(&json).ok())
508 .unwrap_or_default();
509 Ok(SessionMeta {
510 session_id: row.get(0)?,
511 session_name: row.get(1)?,
512 created_at: row.get(2)?,
513 model: row.get(3)?,
514 cwd: row.get(4)?,
515 relation,
516 })
517 },
518 )
519 .optional()
520 .ok()
521 .flatten()
522}
523
524fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
525 rmp_serde::from_slice(bytes).ok()
526}
527
528fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
529 let mut buf = Vec::with_capacity(1024);
532 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
533 buf
534}
535
536fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
537 rmp_serde::from_slice(bytes).ok()
538}
539
540fn merge_token_ledger_entries(
541 entries: Vec<lash_core::TokenLedgerEntry>,
542) -> Vec<lash_core::TokenLedgerEntry> {
543 let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
544 for entry in entries {
545 if entry.usage.total() == 0 {
546 continue;
547 }
548 if let Some(existing) = merged
549 .iter_mut()
550 .find(|existing| existing.source == entry.source && existing.model == entry.model)
551 {
552 existing.usage.input_tokens += entry.usage.input_tokens;
553 existing.usage.output_tokens += entry.usage.output_tokens;
554 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
555 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
556 } else {
557 merged.push(entry);
558 }
559 }
560 merged
561}
562
563#[cfg(test)]
564mod tests {
565 use super::*;
566 use lash_core::ProcessInput;
567 use lashlang::LashlangArtifactStore;
568
569 fn registration(id: &str) -> ProcessRegistration {
570 ProcessRegistration::new(
571 id,
572 ProcessInput::External {
573 metadata: serde_json::Value::Null,
574 },
575 )
576 .with_process_provenance(lash_core::ProcessProvenance::new(
577 lash_core::ProcessScope::new("session"),
578 "test-host",
579 ))
580 }
581
582 #[tokio::test]
583 async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
584 let store = Store::memory().await.expect("memory store");
585 let module =
586 lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
587 let linked = lashlang::LinkedModule::link(
588 module,
589 lashlang::LashlangSurface::new(
590 lashlang::ResourceCatalog::new(),
591 lashlang::LashlangAbilities::all(),
592 ),
593 )
594 .expect("link module");
595
596 store
597 .put_module_artifact(&linked.artifact)
598 .await
599 .expect("put artifact");
600 let restored = store
601 .get_module_artifact(&linked.module_ref)
602 .await
603 .expect("get artifact")
604 .expect("artifact exists");
605
606 assert_eq!(restored.module_ref, linked.module_ref);
607 assert_eq!(
608 restored.process_ref("scan"),
609 linked.artifact.process_ref("scan")
610 );
611 }
612
613 #[tokio::test]
614 async fn sqlite_process_registry_persists_rows_after_reopen() {
615 let dir = tempfile::tempdir().expect("tempdir");
616 let path = dir.path().join("processes.db");
617 {
618 let registry = SqliteProcessRegistry::open(&path)
619 .await
620 .expect("open registry");
621 let owner_scope = lash_core::ProcessScope::new("session");
622 registry
623 .register_process(registration("proc-persist"))
624 .await
625 .expect("register");
626 registry
627 .grant_handle(
628 &owner_scope,
629 "proc-persist",
630 ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
631 )
632 .await
633 .expect("grant");
634 registry
635 .complete_process(
636 "proc-persist",
637 ProcessAwaitOutput::Success {
638 value: serde_json::json!({"ok": true}),
639 control: None,
640 },
641 )
642 .await
643 .expect("complete");
644 }
645
646 let registry = SqliteProcessRegistry::open(&path)
647 .await
648 .expect("reopen registry");
649 let owner_scope = lash_core::ProcessScope::new("session");
650 let record = registry
651 .get_process("proc-persist")
652 .await
653 .expect("persisted process");
654
655 assert_eq!(record.owner_scope_id(), owner_scope.id());
656 assert_eq!(record.provenance.owner_scope.session_id.as_str(), "session");
657 assert_eq!(
658 registry
659 .await_process("proc-persist")
660 .await
661 .expect("await persisted"),
662 ProcessAwaitOutput::Success {
663 value: serde_json::json!({"ok": true}),
664 control: None,
665 }
666 );
667 assert_eq!(
668 registry
669 .list_handle_grants(&owner_scope)
670 .await
671 .expect("grants")
672 .len(),
673 1
674 );
675 }
676}