Skip to main content

lash_sqlite_store/
lib.rs

1//! # lash-sqlite-store
2//!
3//! The high-performance local **durable** persistence backend for the lash
4//! agent runtime. One SQLite database per session, opened in WAL journal mode
5//! with a 15-second busy timeout, satisfying the full [`RuntimePersistence`] +
6//! [`AttachmentManifest`] contract from `lash-core`.
7//!
8//! This crate is a drop-in replacement for `lash-sqlite-store`: it exposes the
9//! same public surface (`Store`, `SqliteProcessRegistry`,
10//! `SqliteSessionStoreFactory`, `SqliteEffectHost`, the option/descriptor types)
11//! with identical async signatures, so a consumer swaps backends by renaming
12//! the crate path only. The difference is the engine underneath: tokio-rusqlite
13//! over a statically-linked SQLite with real WAL (`-wal`/`-shm` sidecars,
14//! multi-process readers + single writer) instead of the prior store's experimental mvcc.
15//!
16//! ## Why this is "the durable backend" not just "an option"
17//!
18//! Lash's runtime layer treats persistence as a first-class boundary, not a
19//! debug-only convenience. Every primitive that lets the runtime survive a
20//! crash — head-revision CAS, final turn-commit idempotency, attachment
21//! write-ahead manifests, blob content-addressing with optional compression —
22//! is implemented in this crate against SQLite for one reason: SQLite is the
23//! simplest backend that gives us *atomic multi-statement transactions on a
24//! single file* with durability guarantees we can reason about.
25//!
26//! ## Schema cutover, not migrations
27//!
28//! There is exactly one supported schema (see [`schema::SCHEMA`]). Older
29//! databases must be deleted before opening — we do not carry migration code.
30//!
31//! [`RuntimePersistence`]: lash_core::RuntimePersistence
32//! [`AttachmentManifest`]: lash_core::AttachmentManifest
33
34use std::collections::BTreeMap;
35use std::path::{Path, PathBuf};
36use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
37use std::sync::{Arc, Mutex};
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39
40use flate2::Compression;
41use flate2::read::ZlibDecoder;
42use flate2::write::ZlibEncoder;
43use lash_core::runtime::ProcessHandleGrantEntry;
44use lash_core::runtime::{
45    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
46    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, prepare_process_event_append,
47    prepare_process_registration,
48};
49use lash_core::store::{
50    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
51    RuntimeCommitResult, SessionCheckpoint, SessionHead, SessionHeadMeta,
52};
53use lash_core::{
54    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
55    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, PROCESS_LEASE_SCHEMA_VERSION,
56    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
57    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease,
58    ProcessLeaseCompletion, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope,
59    RuntimePersistence, SessionMeta, SessionPickerInfo, SessionReadScope,
60    SessionStoreCreateRequest, SessionStoreFactory, SlotPolicy, StoreError, VacuumReport,
61};
62use rusqlite::{Connection, OptionalExtension, Transaction, params};
63use sha2::{Digest, Sha256};
64
65use conn::SqliteConnection;
66
67mod attachments;
68mod blobs;
69mod conn;
70mod effect_replay;
71mod graph;
72mod host_events;
73mod leases;
74mod lifecycle;
75mod persistence;
76mod process_registry;
77mod queued_work;
78mod schema;
79
80use conn::TxOutcome;
81pub use effect_replay::{
82    SqliteEffectHost, SqliteEffectReplayOptions, SqliteRuntimeEffectController,
83};
84pub use host_events::SqliteHostEventStore;
85use leases::*;
86use queued_work::*;
87use schema::{
88    StoreBacking, apply_pragmas, ensure_effect_schema, ensure_host_event_schema,
89    ensure_process_schema, ensure_schema,
90};
91
92/// SQLite-backed store for checkpoint blobs and the canonical session head.
93///
94/// The struct name and every public method match `lash_sqlite_store::Store`
95/// exactly so consumers swap backends with a path rename. Internally it holds a
96/// single cloneable [`SqliteConnection`] (a tokio-rusqlite handle to one
97/// database thread) rather than the prior store's `tokio::sync::Mutex<rusqlite::Connection>`.
98pub struct Store {
99    conn: SqliteConnection,
100    artifact_cache: Mutex<BTreeMap<lashlang::ModuleRef, Arc<lashlang::ModuleArtifact>>>,
101    options: StoreOptions,
102    commit_count: AtomicU64,
103}
104
105/// SQLite-backed process registry for one configured runtime deployment.
106///
107/// Named `SqliteProcessRegistry` so the path-rename swap keeps compiling; this is
108/// the SQLite implementation. It is intentionally separate from [`Store`]:
109/// session databases persist one conversation, while this registry persists
110/// background process state and handle visibility across all sessions in the
111/// same host profile.
112pub struct SqliteProcessRegistry {
113    conn: SqliteConnection,
114    notify: tokio::sync::Notify,
115}
116
117fn sqlite_error(err: rusqlite::Error) -> StoreError {
118    StoreError::Backend(err.to_string())
119}
120
121fn process_sqlite_error(err: rusqlite::Error) -> lash_core::PluginError {
122    lash_core::PluginError::Session(err.to_string())
123}
124
125fn process_decode_error(err: serde_json::Error) -> lash_core::PluginError {
126    lash_core::PluginError::Session(format!("failed to decode process registry row: {err}"))
127}
128
129fn process_encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
130    serde_json::to_string(value).map_err(|err| {
131        lash_core::PluginError::Session(format!("failed to encode process row: {err}"))
132    })
133}
134
135fn block_on_store<T>(future: impl std::future::Future<Output = T>) -> T {
136    futures_executor::block_on(future)
137}
138
139#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
140pub enum PersistedArtifactKind {
141    GenericBlob,
142    CheckpointManifest,
143    ToolState,
144    PluginSessionSnapshot,
145    ExecutionStateSnapshot,
146    LashlangModule,
147}
148
149#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
150pub enum BlobStorageHint {
151    Compressible,
152    InlinePreferred,
153    LargePayload,
154}
155
156#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
157enum BlobCompression {
158    None,
159    Zlib,
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
163pub struct BlobArtifactDescriptor {
164    pub kind: PersistedArtifactKind,
165    #[serde(default, skip_serializing_if = "Vec::is_empty")]
166    pub hints: Vec<BlobStorageHint>,
167}
168
169impl BlobArtifactDescriptor {
170    pub fn new(kind: PersistedArtifactKind, hints: impl Into<Vec<BlobStorageHint>>) -> Self {
171        Self {
172            kind,
173            hints: hints.into(),
174        }
175    }
176
177    pub fn checkpoint_manifest() -> Self {
178        Self::new(
179            PersistedArtifactKind::CheckpointManifest,
180            vec![BlobStorageHint::Compressible],
181        )
182    }
183
184    pub fn tool_state_snapshot() -> Self {
185        Self::new(
186            PersistedArtifactKind::ToolState,
187            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
188        )
189    }
190
191    pub fn plugin_session_snapshot() -> Self {
192        Self::new(
193            PersistedArtifactKind::PluginSessionSnapshot,
194            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
195        )
196    }
197
198    pub fn execution_state_snapshot() -> Self {
199        Self::new(
200            PersistedArtifactKind::ExecutionStateSnapshot,
201            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
202        )
203    }
204
205    pub fn lashlang_module() -> Self {
206        Self::new(
207            PersistedArtifactKind::LashlangModule,
208            vec![BlobStorageHint::Compressible, BlobStorageHint::LargePayload],
209        )
210    }
211}
212
213#[derive(Clone, Debug, PartialEq, Eq, Hash)]
214struct RetainedArtifactRef {
215    pub blob_ref: BlobRef,
216    pub kind: PersistedArtifactKind,
217}
218
/// Built-in trade-off between write latency and stored size for blobs hinted
/// as compressible.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinBlobProfile {
    /// Never compress.
    LowLatency,
    /// Compress compressible blobs of at least 4 KiB (the default).
    #[default]
    Balanced,
    /// Compress compressible blobs of at least 1 KiB.
    Compact,
}
226
/// Garbage-collection policy for a store.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct StoreGcPolicy {
    /// Commit interval for automatic GC; defaults to `None`.
    // NOTE(review): presumably `Some(n)` runs GC every `n` commits and `None`
    // disables automatic runs — confirm against the commit path.
    pub auto_run_every_commits: Option<u64>,
}
231
232#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
233pub struct StoreOptions {
234    pub blob_profile: BuiltinBlobProfile,
235    pub gc_policy: StoreGcPolicy,
236}
237
238#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
239struct StoredBlobEnvelope {
240    descriptor: BlobArtifactDescriptor,
241    compression: BlobCompression,
242    content: Vec<u8>,
243}
244
245#[derive(Clone, Debug)]
246pub struct StoredSessionCheckpoint {
247    pub checkpoint_ref: BlobRef,
248    pub manifest: SessionCheckpoint,
249}
250
251/// Explicit first-party factory for one SQLite session database per Lash
252/// session.
253///
254/// Named `SqliteSessionStoreFactory` so the path-rename swap keeps compiling.
255/// Hosts opt into this by passing it to `lash::LashCoreBuilder::store_factory`.
256/// The factory never becomes a default: app storage and runtime storage remain
257/// host-owned decisions.
258#[derive(Clone, Debug)]
259pub struct SqliteSessionStoreFactory {
260    root: PathBuf,
261    options: StoreOptions,
262}
263
264impl SqliteSessionStoreFactory {
265    pub fn new(root: impl Into<PathBuf>) -> Self {
266        Self {
267            root: root.into(),
268            options: StoreOptions::default(),
269        }
270    }
271
272    pub fn with_options(root: impl Into<PathBuf>, options: StoreOptions) -> Self {
273        Self {
274            root: root.into(),
275            options,
276        }
277    }
278
279    pub fn path_for_session(&self, session_id: &str) -> PathBuf {
280        self.root.join(safe_session_db_file_name(session_id))
281    }
282}
283
284#[async_trait::async_trait]
285impl SessionStoreFactory for SqliteSessionStoreFactory {
286    fn durability_tier(&self) -> DurabilityTier {
287        DurabilityTier::Durable
288    }
289
290    async fn create_store(
291        &self,
292        request: &SessionStoreCreateRequest,
293    ) -> Result<Arc<dyn RuntimePersistence>, String> {
294        std::fs::create_dir_all(&self.root).map_err(|err| err.to_string())?;
295        let path = self.path_for_session(&request.session_id);
296        let store = Arc::new(
297            Store::open_with_options(&path, self.options)
298                .await
299                .map_err(|err| err.to_string())?,
300        );
301        if store.load_session_meta().await.is_none() {
302            store
303                .save_session_meta(SessionMeta {
304                    session_id: request.session_id.clone(),
305                    session_name: request.session_id.clone(),
306                    created_at: current_timestamp_string(),
307                    model: request.policy.model.id.clone(),
308                    cwd: std::env::current_dir()
309                        .ok()
310                        .and_then(|path| path.to_str().map(str::to_string)),
311                    relation: request.relation.clone(),
312                })
313                .await;
314        }
315        Ok(store as Arc<dyn RuntimePersistence>)
316    }
317
318    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
319        let db_path = self.path_for_session(session_id);
320        for path in [
321            db_path.clone(),
322            sqlite_sidecar_path(&db_path, "-wal"),
323            sqlite_sidecar_path(&db_path, "-shm"),
324        ] {
325            match std::fs::remove_file(&path) {
326                Ok(()) => {}
327                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
328                Err(err) => {
329                    return Err(format!("remove session store {}: {err}", path.display()));
330                }
331            }
332        }
333        Ok(())
334    }
335}
336
337fn safe_session_db_file_name(session_id: &str) -> String {
338    let mut safe = session_id
339        .chars()
340        .map(|ch| match ch {
341            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
342            _ => '_',
343        })
344        .collect::<String>();
345    safe = safe.trim_matches('_').to_string();
346    if safe.is_empty() {
347        safe.push_str("session");
348    }
349    safe.truncate(80);
350    let hash = format!("{:x}", Sha256::digest(session_id.as_bytes()));
351    format!("{safe}-{}.db", &hash[..16])
352}
353
/// Returns `path` with `suffix` appended verbatim to its final component
/// (for example `foo.db` + `-wal` gives `foo.db-wal`).
fn sqlite_sidecar_path(path: &Path, suffix: &str) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(suffix);
    raw.into()
}
359
/// Formats the current wall-clock time as `unix:<seconds since the epoch>`.
///
/// A clock set before the Unix epoch yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{secs}")
}
366
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or `0` when the clock is set before the epoch.
fn current_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Narrowing from u128 as in the original; lossless for any realistic date.
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
373
374fn retained_artifact_refs(checkpoint: &SessionCheckpoint) -> Vec<RetainedArtifactRef> {
375    let mut refs = Vec::new();
376    if let Some(blob_ref) = &checkpoint.tool_state_ref {
377        refs.push(RetainedArtifactRef {
378            blob_ref: blob_ref.clone(),
379            kind: PersistedArtifactKind::ToolState,
380        });
381    }
382    if let Some(blob_ref) = &checkpoint.plugin_snapshot_ref {
383        refs.push(RetainedArtifactRef {
384            blob_ref: blob_ref.clone(),
385            kind: PersistedArtifactKind::PluginSessionSnapshot,
386        });
387    }
388    if let Some(blob_ref) = &checkpoint.execution_state_ref {
389        refs.push(RetainedArtifactRef {
390            blob_ref: blob_ref.clone(),
391            kind: PersistedArtifactKind::ExecutionStateSnapshot,
392        });
393    }
394    refs
395}
396
397fn session_head_meta(head: &SessionHead) -> SessionHeadMeta {
398    SessionHeadMeta {
399        session_id: head.session_id.clone(),
400        head_revision: 0,
401        config: head.config.clone(),
402        agent_frames: head.agent_frames.clone(),
403        current_agent_frame_id: head.current_agent_frame_id.clone(),
404        checkpoint_ref: head.checkpoint_ref.clone(),
405        leaf_node_id: head.graph.leaf_node_id.clone(),
406        graph_node_count: head.graph.nodes.len(),
407        token_ledger: Vec::new(),
408    }
409}
410
411fn encode_json<T: serde::Serialize>(value: &T) -> String {
412    serde_json::to_string(value).expect("persisted state should serialize")
413}
414
415fn should_compress_blob(
416    profile: BuiltinBlobProfile,
417    descriptor: &BlobArtifactDescriptor,
418    len: usize,
419) -> bool {
420    if !descriptor.hints.contains(&BlobStorageHint::Compressible) {
421        return false;
422    }
423    match profile {
424        BuiltinBlobProfile::LowLatency => false,
425        BuiltinBlobProfile::Balanced => len >= 4 * 1024,
426        BuiltinBlobProfile::Compact => len >= 1024,
427    }
428}
429
430fn compress_blob(content: &[u8]) -> Vec<u8> {
431    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
432    std::io::Write::write_all(&mut encoder, content).expect("compress blob");
433    encoder.finish().expect("submit blob compression")
434}
435
436fn decompress_blob(content: &[u8]) -> Option<Vec<u8>> {
437    let mut decoder = ZlibDecoder::new(content);
438    let mut out = Vec::new();
439    std::io::Read::read_to_end(&mut decoder, &mut out).ok()?;
440    Some(out)
441}
442
443fn encode_artifact_blob(
444    descriptor: &BlobArtifactDescriptor,
445    profile: BuiltinBlobProfile,
446    content: &[u8],
447) -> Vec<u8> {
448    let (compression, stored_content) = if should_compress_blob(profile, descriptor, content.len())
449    {
450        (BlobCompression::Zlib, compress_blob(content))
451    } else {
452        (BlobCompression::None, content.to_vec())
453    };
454    encode_msgpack(&StoredBlobEnvelope {
455        descriptor: descriptor.clone(),
456        compression,
457        content: stored_content,
458    })
459}
460
461fn decode_artifact_blob(bytes: &[u8]) -> Option<Vec<u8>> {
462    let envelope = decode_msgpack::<StoredBlobEnvelope>(bytes)?;
463    match envelope.compression {
464        BlobCompression::None => Some(envelope.content),
465        BlobCompression::Zlib => decompress_blob(&envelope.content),
466    }
467}
468
469/// Read the session head meta off a raw connection. Synchronous because it runs
470/// inside a `conn.call`/`conn.write` closure on the connection thread.
471fn try_load_session_head_meta_from_conn(
472    conn: &Connection,
473) -> Result<Option<SessionHeadMeta>, StoreError> {
474    let row = conn
475        .query_row(
476            "SELECT head_json, head_revision FROM session_head WHERE singleton = 1",
477            [],
478            |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
479        )
480        .optional()
481        .map_err(sqlite_error)?;
482    let Some((head_json, head_revision)) = row else {
483        return Ok(None);
484    };
485    let mut meta: SessionHeadMeta = serde_json::from_str(&head_json)
486        .map_err(|err| StoreError::Backend(format!("decode session head: {err}")))?;
487    meta.head_revision = head_revision as u64;
488    Ok(Some(meta))
489}
490
491fn load_session_head_meta_from_conn(conn: &Connection) -> Option<SessionHeadMeta> {
492    try_load_session_head_meta_from_conn(conn).ok().flatten()
493}
494
495fn load_session_meta_from_conn(conn: &Connection) -> Option<SessionMeta> {
496    conn.query_row(
497        "SELECT session_id, session_name, created_at, model, cwd, relation_json
498         FROM session_meta WHERE singleton = 1",
499        [],
500        |row| {
501            let relation_json: Option<String> = row.get(5)?;
502            let relation = relation_json
503                .and_then(|json| serde_json::from_str(&json).ok())
504                .unwrap_or_default();
505            Ok(SessionMeta {
506                session_id: row.get(0)?,
507                session_name: row.get(1)?,
508                created_at: row.get(2)?,
509                model: row.get(3)?,
510                cwd: row.get(4)?,
511                relation,
512            })
513        },
514    )
515    .optional()
516    .ok()
517    .flatten()
518}
519
520fn decode_checkpoint(bytes: &[u8]) -> Option<SessionCheckpoint> {
521    rmp_serde::from_slice(bytes).ok()
522}
523
524fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
525    // Pre-size the buffer so the per-byte writes inside rmp_serde don't
526    // walk the Vec through 0→4→8→16→32… reallocations on every call.
527    let mut buf = Vec::with_capacity(1024);
528    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
529    buf
530}
531
532fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
533    rmp_serde::from_slice(bytes).ok()
534}
535
536fn merge_token_ledger_entries(
537    entries: Vec<lash_core::TokenLedgerEntry>,
538) -> Vec<lash_core::TokenLedgerEntry> {
539    let mut merged: Vec<lash_core::TokenLedgerEntry> = Vec::new();
540    for entry in entries {
541        if entry.usage.total() == 0 {
542            continue;
543        }
544        if let Some(existing) = merged
545            .iter_mut()
546            .find(|existing| existing.source == entry.source && existing.model == entry.model)
547        {
548            existing.usage.input_tokens += entry.usage.input_tokens;
549            existing.usage.output_tokens += entry.usage.output_tokens;
550            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
551            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
552        } else {
553            merged.push(entry);
554        }
555    }
556    merged
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use lash_core::ProcessInput;
    use lashlang::LashlangArtifactStore;

    /// Session id used as the owner scope by every fixture.
    const SESSION: &str = "session";
    /// Process id used by the reopen test.
    const PERSISTED_ID: &str = "proc-persist";

    /// Builds an external-input registration owned by the test session.
    fn registration(id: &str) -> ProcessRegistration {
        let input = ProcessInput::External {
            metadata: serde_json::Value::Null,
        };
        let provenance =
            lash_core::ProcessProvenance::new(lash_core::ProcessScope::new(SESSION), "test-host");
        ProcessRegistration::new(id, input).with_process_provenance(provenance)
    }

    /// The completion payload written and later read back by the reopen test.
    fn ok_output() -> ProcessAwaitOutput {
        ProcessAwaitOutput::Success {
            value: serde_json::json!({"ok": true}),
            control: None,
        }
    }

    /// A linked module artifact written to the store can be fetched back by
    /// its module reference.
    #[tokio::test]
    async fn sqlite_lashlang_artifact_store_round_trips_verified_module_artifacts() {
        let store = Store::memory().await.expect("memory store");
        let parsed =
            lashlang::parse("process scan(root: str) { finish root }").expect("parse module");
        let surface = lashlang::LashlangSurface::new(
            lashlang::ResourceCatalog::new(),
            lashlang::LashlangAbilities::all(),
        );
        let linked = lashlang::LinkedModule::link(parsed, surface).expect("link module");

        store
            .put_module_artifact(&linked.artifact)
            .await
            .expect("put artifact");
        let fetched = store
            .get_module_artifact(&linked.module_ref)
            .await
            .expect("get artifact")
            .expect("artifact exists");

        assert_eq!(fetched.module_ref, linked.module_ref);
        assert_eq!(
            fetched.process_ref("scan"),
            linked.artifact.process_ref("scan")
        );
    }

    /// Process rows, their completion output and handle grants survive
    /// closing and reopening the registry database.
    #[tokio::test]
    async fn sqlite_process_registry_persists_rows_after_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let db_path = dir.path().join("processes.db");

        // The first registry lives only inside this scope so it is dropped
        // before the database is opened again.
        {
            let first = SqliteProcessRegistry::open(&db_path)
                .await
                .expect("open registry");
            let scope = lash_core::ProcessScope::new(SESSION);
            first
                .register_process(registration(PERSISTED_ID))
                .await
                .expect("register");
            first
                .grant_handle(
                    &scope,
                    PERSISTED_ID,
                    ProcessHandleDescriptor::new(Some("tool"), Some("demo")),
                )
                .await
                .expect("grant");
            first
                .complete_process(PERSISTED_ID, ok_output())
                .await
                .expect("complete");
        }

        let reopened = SqliteProcessRegistry::open(&db_path)
            .await
            .expect("reopen registry");
        let scope = lash_core::ProcessScope::new(SESSION);
        let record = reopened
            .get_process(PERSISTED_ID)
            .await
            .expect("persisted process");

        assert_eq!(record.owner_scope_id(), scope.id());
        assert_eq!(record.provenance.owner_scope.session_id.as_str(), SESSION);

        let awaited = reopened
            .await_process(PERSISTED_ID)
            .await
            .expect("await persisted");
        assert_eq!(awaited, ok_output());

        let grants = reopened.list_handle_grants(&scope).await.expect("grants");
        assert_eq!(grants.len(), 1);
    }
}
672}