Skip to main content

lash_sqlite_store/
lifecycle.rs

1//! [`Store`] open/memory lifecycle plus session head/meta accessors.
2//!
3//! This is one of the two reference modules (with `blobs.rs`) establishing the
4//! tokio-rusqlite translation pattern every other module follows:
5//!
//! * The async public methods keep the *exact* prior store signatures.
7//! * A read goes through `self.conn.call(move |c| { ... })`, where the closure
8//!   is a *synchronous* rusqlite body returning `rusqlite::Result<T>`.
9//! * A read-then-write goes through `self.conn.write(move |tx| { ... })`.
10//! * The shared `*_from_conn` helpers in `lib.rs` are synchronous and take a
11//!   `&rusqlite::Connection`, so they can be called from inside either closure.
12//! * Closures must be `'static` + `Send`: capture owned values (clone strings,
13//!   move them in), not borrows of `self`.
14
15use super::*;
16
17impl Store {
18    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
19        Self::open_with_options(path, StoreOptions::default()).await
20    }
21
22    pub async fn open_with_options(
23        path: &Path,
24        options: StoreOptions,
25    ) -> tokio_rusqlite::Result<Self> {
26        let conn = SqliteConnection::open(path).await?;
27        ensure_schema(&conn).await?;
28        apply_pragmas(&conn, StoreBacking::File).await?;
29        Ok(Self {
30            conn,
31            artifact_cache: Mutex::new(BTreeMap::new()),
32            options,
33            commit_count: AtomicU64::new(0),
34        })
35    }
36
37    /// Open the local database read-only. Used by export/resume call sites that
38    /// must never mutate the source.
39    pub async fn open_readonly(path: &Path) -> tokio_rusqlite::Result<Self> {
40        let conn = SqliteConnection::open_readonly(path).await?;
41        Ok(Self {
42            conn,
43            artifact_cache: Mutex::new(BTreeMap::new()),
44            options: StoreOptions::default(),
45            commit_count: AtomicU64::new(0),
46        })
47    }
48
49    pub async fn load_picker_info(&self) -> Option<SessionPickerInfo> {
50        self.conn
51            .call(|conn| {
52                let meta = conn
53                    .query_row(
54                        "SELECT session_id, cwd, relation_json
55                         FROM session_meta WHERE singleton = 1",
56                        [],
57                        |row| {
58                            let relation_json: Option<String> = row.get(2)?;
59                            let relation = relation_json
60                                .and_then(|json| serde_json::from_str(&json).ok())
61                                .unwrap_or_default();
62                            Ok((
63                                row.get::<_, String>(0)?,
64                                row.get::<_, Option<String>>(1)?,
65                                relation,
66                            ))
67                        },
68                    )
69                    .optional()?;
70                let Some((session_id, cwd, relation)) = meta else {
71                    return Ok(None);
72                };
73
74                let head_json: String = conn
75                    .query_row(
76                        "SELECT head_json FROM session_head WHERE singleton = 1",
77                        [],
78                        |row| row.get(0),
79                    )
80                    .optional()?
81                    .unwrap_or_else(|| "{}".to_string());
82                let head_meta =
83                    serde_json::from_str::<SessionHeadMeta>(&head_json).unwrap_or_default();
84                let graph = Self::load_session_graph_from_conn(conn, head_meta.leaf_node_id);
85
86                Ok(Some(SessionPickerInfo {
87                    session_id,
88                    cwd,
89                    relation,
90                    first_user_message: graph.first_user_message(),
91                    user_message_count: graph.user_message_count(),
92                }))
93            })
94            .await
95            .ok()
96            .flatten()
97    }
98
99    pub async fn memory() -> tokio_rusqlite::Result<Self> {
100        Self::memory_with_options(StoreOptions {
101            blob_profile: BuiltinBlobProfile::LowLatency,
102            gc_policy: StoreGcPolicy::default(),
103        })
104        .await
105    }
106
107    pub async fn memory_with_options(options: StoreOptions) -> tokio_rusqlite::Result<Self> {
108        let conn = SqliteConnection::open_in_memory().await?;
109        ensure_schema(&conn).await?;
110        apply_pragmas(&conn, StoreBacking::Memory).await?;
111        Ok(Self {
112            conn,
113            artifact_cache: Mutex::new(BTreeMap::new()),
114            options,
115            commit_count: AtomicU64::new(0),
116        })
117    }
118
119    pub async fn save_session_head_meta(&self, meta: SessionHeadMeta) {
120        let head_json = encode_json(&meta);
121        let session_id = meta.session_id.clone();
122        let head_revision = meta.head_revision as i64;
123        let result = self
124            .conn
125            .call(move |conn| {
126                conn.execute(
127                    "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
128                     VALUES (1, ?1, ?2, ?3)",
129                    params![session_id, head_json, head_revision],
130                )
131            })
132            .await;
133        if let Err(err) = result {
134            tracing::warn!(error = %err, "failed to persist session head");
135        }
136    }
137
138    pub async fn load_session_head_meta(&self) -> Option<SessionHeadMeta> {
139        self.conn
140            .call(|conn| Ok(load_session_head_meta_from_conn(conn)))
141            .await
142            .ok()
143            .flatten()
144    }
145
146    pub async fn save_session_head(&self, head: SessionHead) {
147        self.replace_session_graph(&head.graph).await;
148        self.save_session_head_meta(session_head_meta(&head)).await;
149    }
150
151    pub async fn load_session_head(&self) -> Option<SessionHead> {
152        let meta = self.load_session_head_meta().await?;
153        let mut graph = self.load_session_graph().await;
154        graph.set_leaf_node_id(meta.leaf_node_id.clone());
155        Some(SessionHead {
156            session_id: meta.session_id,
157            head_revision: meta.head_revision,
158            agent_frames: meta.agent_frames,
159            current_agent_frame_id: meta.current_agent_frame_id,
160            graph,
161            config: meta.config,
162            checkpoint_ref: meta.checkpoint_ref,
163            token_ledger: merge_token_ledger_entries(self.load_usage_deltas().await),
164        })
165    }
166
167    pub async fn head_copy_from_store(&self, source: &Store) {
168        if let Some(head) = source.load_session_head().await {
169            if let Some(checkpoint_ref) = &head.checkpoint_ref
170                && let Some(record) = source
171                    .get_typed_blob::<SessionCheckpoint>(checkpoint_ref)
172                    .await
173            {
174                for blob_ref in [
175                    record.tool_state_ref.as_ref(),
176                    record.plugin_snapshot_ref.as_ref(),
177                ]
178                .into_iter()
179                .flatten()
180                {
181                    if let Some(blob) = source.get_blob(blob_ref).await {
182                        let descriptor = match record
183                            .tool_state_ref
184                            .as_ref()
185                            .filter(|candidate| *candidate == blob_ref)
186                        {
187                            Some(_) => BlobArtifactDescriptor::tool_state_snapshot(),
188                            None => BlobArtifactDescriptor::plugin_session_snapshot(),
189                        };
190                        let _ = self.put_artifact_blob(descriptor, &blob).await;
191                    }
192                }
193                if let Some(blob) = source.get_blob(checkpoint_ref).await {
194                    let _ = self
195                        .put_artifact_blob(BlobArtifactDescriptor::checkpoint_manifest(), &blob)
196                        .await;
197                }
198            }
199            self.replace_session_graph(&head.graph).await;
200            self.save_session_head_meta(session_head_meta(&head)).await;
201        }
202    }
203
204    pub async fn save_session_meta(&self, meta: SessionMeta) {
205        let relation_json = serde_json::to_string(&meta.relation).ok();
206        let session_id_for_log = meta.session_id.clone();
207        let result = self
208            .conn
209            .call(move |conn| {
210                conn.execute(
211                    "INSERT OR REPLACE INTO session_meta
212                     (singleton, session_id, session_name, created_at, model, cwd, relation_json)
213                     VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6)",
214                    params![
215                        meta.session_id,
216                        meta.session_name,
217                        meta.created_at,
218                        meta.model,
219                        meta.cwd,
220                        relation_json
221                    ],
222                )
223            })
224            .await;
225        if let Err(err) = result {
226            tracing::warn!(
227                error = %err,
228                session_id = session_id_for_log,
229                "failed to persist session metadata"
230            );
231        }
232    }
233
234    pub async fn load_session_meta(&self) -> Option<SessionMeta> {
235        self.conn
236            .call(|conn| Ok(load_session_meta_from_conn(conn)))
237            .await
238            .ok()
239            .flatten()
240    }
241}