lash-sqlite-store 0.1.0-alpha.40

SQLite-backed session store for the lash agent runtime.
Documentation
//! [`Store`] open/memory lifecycle plus session head/meta accessors.
//!
//! This is one of the two reference modules (with `blobs.rs`) establishing the
//! tokio-rusqlite translation pattern every other module follows:
//!
//! * The async public methods keep the *exact* the prior store signatures.
//! * A read goes through `self.conn.call(move |c| { ... })`, where the closure
//!   is a *synchronous* rusqlite body returning `rusqlite::Result<T>`.
//! * A read-then-write goes through `self.conn.write(move |tx| { ... })`.
//! * The shared `*_from_conn` helpers in `lib.rs` are synchronous and take a
//!   `&rusqlite::Connection`, so they can be called from inside either closure.
//! * Closures must be `'static` + `Send`: capture owned values (clone strings,
//!   move them in), not borrows of `self`.

use super::*;

impl Store {
    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
        Self::open_with_options(path, StoreOptions::default()).await
    }

    pub async fn open_with_options(
        path: &Path,
        options: StoreOptions,
    ) -> tokio_rusqlite::Result<Self> {
        let conn = SqliteConnection::open(path).await?;
        ensure_schema(&conn).await?;
        apply_pragmas(&conn, StoreBacking::File).await?;
        Ok(Self {
            conn,
            artifact_cache: Mutex::new(BTreeMap::new()),
            options,
            commit_count: AtomicU64::new(0),
        })
    }

    /// Open the local database read-only. Used by export/resume call sites that
    /// must never mutate the source.
    pub async fn open_readonly(path: &Path) -> tokio_rusqlite::Result<Self> {
        let conn = SqliteConnection::open_readonly(path).await?;
        Ok(Self {
            conn,
            artifact_cache: Mutex::new(BTreeMap::new()),
            options: StoreOptions::default(),
            commit_count: AtomicU64::new(0),
        })
    }

    pub async fn load_picker_info(&self) -> Option<SessionPickerInfo> {
        self.conn
            .call(|conn| {
                let meta = conn
                    .query_row(
                        "SELECT session_id, cwd, relation_json
                         FROM session_meta WHERE singleton = 1",
                        [],
                        |row| {
                            let relation_json: Option<String> = row.get(2)?;
                            let relation = relation_json
                                .and_then(|json| serde_json::from_str(&json).ok())
                                .unwrap_or_default();
                            Ok((
                                row.get::<_, String>(0)?,
                                row.get::<_, Option<String>>(1)?,
                                relation,
                            ))
                        },
                    )
                    .optional()?;
                let Some((session_id, cwd, relation)) = meta else {
                    return Ok(None);
                };

                let head_json: String = conn
                    .query_row(
                        "SELECT head_json FROM session_head WHERE singleton = 1",
                        [],
                        |row| row.get(0),
                    )
                    .optional()?
                    .unwrap_or_else(|| "{}".to_string());
                let head_meta =
                    serde_json::from_str::<SessionHeadMeta>(&head_json).unwrap_or_default();
                let graph = Self::load_session_graph_from_conn(conn, head_meta.leaf_node_id);

                Ok(Some(SessionPickerInfo {
                    session_id,
                    cwd,
                    relation,
                    first_user_message: graph.first_user_message(),
                    user_message_count: graph.user_message_count(),
                }))
            })
            .await
            .ok()
            .flatten()
    }

    pub async fn memory() -> tokio_rusqlite::Result<Self> {
        Self::memory_with_options(StoreOptions {
            blob_profile: BuiltinBlobProfile::LowLatency,
            gc_policy: StoreGcPolicy::default(),
        })
        .await
    }

    pub async fn memory_with_options(options: StoreOptions) -> tokio_rusqlite::Result<Self> {
        let conn = SqliteConnection::open_in_memory().await?;
        ensure_schema(&conn).await?;
        apply_pragmas(&conn, StoreBacking::Memory).await?;
        Ok(Self {
            conn,
            artifact_cache: Mutex::new(BTreeMap::new()),
            options,
            commit_count: AtomicU64::new(0),
        })
    }

    pub async fn save_session_head_meta(&self, meta: SessionHeadMeta) {
        let head_json = encode_json(&meta);
        let session_id = meta.session_id.clone();
        let head_revision = meta.head_revision as i64;
        let result = self
            .conn
            .call(move |conn| {
                conn.execute(
                    "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
                     VALUES (1, ?1, ?2, ?3)",
                    params![session_id, head_json, head_revision],
                )
            })
            .await;
        if let Err(err) = result {
            tracing::warn!(error = %err, "failed to persist session head");
        }
    }

    pub async fn load_session_head_meta(&self) -> Option<SessionHeadMeta> {
        self.conn
            .call(|conn| Ok(load_session_head_meta_from_conn(conn)))
            .await
            .ok()
            .flatten()
    }

    pub async fn save_session_head(&self, head: SessionHead) {
        self.replace_session_graph(&head.graph).await;
        self.save_session_head_meta(session_head_meta(&head)).await;
    }

    pub async fn load_session_head(&self) -> Option<SessionHead> {
        let meta = self.load_session_head_meta().await?;
        let mut graph = self.load_session_graph().await;
        graph.set_leaf_node_id(meta.leaf_node_id.clone());
        Some(SessionHead {
            session_id: meta.session_id,
            head_revision: meta.head_revision,
            agent_frames: meta.agent_frames,
            current_agent_frame_id: meta.current_agent_frame_id,
            graph,
            config: meta.config,
            checkpoint_ref: meta.checkpoint_ref,
            token_ledger: merge_token_ledger_entries(self.load_usage_deltas().await),
        })
    }

    pub async fn head_copy_from_store(&self, source: &Store) {
        if let Some(head) = source.load_session_head().await {
            if let Some(checkpoint_ref) = &head.checkpoint_ref
                && let Some(record) = source
                    .get_typed_blob::<SessionCheckpoint>(checkpoint_ref)
                    .await
            {
                for blob_ref in [
                    record.tool_state_ref.as_ref(),
                    record.plugin_snapshot_ref.as_ref(),
                ]
                .into_iter()
                .flatten()
                {
                    if let Some(blob) = source.get_blob(blob_ref).await {
                        let descriptor = match record
                            .tool_state_ref
                            .as_ref()
                            .filter(|candidate| *candidate == blob_ref)
                        {
                            Some(_) => BlobArtifactDescriptor::tool_state_snapshot(),
                            None => BlobArtifactDescriptor::plugin_session_snapshot(),
                        };
                        let _ = self.put_artifact_blob(descriptor, &blob).await;
                    }
                }
                if let Some(blob) = source.get_blob(checkpoint_ref).await {
                    let _ = self
                        .put_artifact_blob(BlobArtifactDescriptor::checkpoint_manifest(), &blob)
                        .await;
                }
            }
            self.replace_session_graph(&head.graph).await;
            self.save_session_head_meta(session_head_meta(&head)).await;
        }
    }

    pub async fn save_session_meta(&self, meta: SessionMeta) {
        let relation_json = serde_json::to_string(&meta.relation).ok();
        let session_id_for_log = meta.session_id.clone();
        let result = self
            .conn
            .call(move |conn| {
                conn.execute(
                    "INSERT OR REPLACE INTO session_meta
                     (singleton, session_id, session_name, created_at, model, cwd, relation_json)
                     VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6)",
                    params![
                        meta.session_id,
                        meta.session_name,
                        meta.created_at,
                        meta.model,
                        meta.cwd,
                        relation_json
                    ],
                )
            })
            .await;
        if let Err(err) = result {
            tracing::warn!(
                error = %err,
                session_id = session_id_for_log,
                "failed to persist session metadata"
            );
        }
    }

    pub async fn load_session_meta(&self) -> Option<SessionMeta> {
        self.conn
            .call(|conn| Ok(load_session_meta_from_conn(conn)))
            .await
            .ok()
            .flatten()
    }
}