1use super::*;
16
17impl Store {
18 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
19 Self::open_with_options(path, StoreOptions::default()).await
20 }
21
22 pub async fn open_with_options(
23 path: &Path,
24 options: StoreOptions,
25 ) -> tokio_rusqlite::Result<Self> {
26 let conn = SqliteConnection::open(path).await?;
27 ensure_schema(&conn).await?;
28 apply_pragmas(&conn, StoreBacking::File).await?;
29 Ok(Self {
30 conn,
31 artifact_cache: Mutex::new(BTreeMap::new()),
32 options,
33 commit_count: AtomicU64::new(0),
34 })
35 }
36
37 pub async fn open_readonly(path: &Path) -> tokio_rusqlite::Result<Self> {
40 let conn = SqliteConnection::open_readonly(path).await?;
41 Ok(Self {
42 conn,
43 artifact_cache: Mutex::new(BTreeMap::new()),
44 options: StoreOptions::default(),
45 commit_count: AtomicU64::new(0),
46 })
47 }
48
49 pub async fn load_picker_info(&self) -> Option<SessionPickerInfo> {
50 self.conn
51 .call(|conn| {
52 let meta = conn
53 .query_row(
54 "SELECT session_id, cwd, relation_json
55 FROM session_meta WHERE singleton = 1",
56 [],
57 |row| {
58 let relation_json: Option<String> = row.get(2)?;
59 let relation = relation_json
60 .and_then(|json| serde_json::from_str(&json).ok())
61 .unwrap_or_default();
62 Ok((
63 row.get::<_, String>(0)?,
64 row.get::<_, Option<String>>(1)?,
65 relation,
66 ))
67 },
68 )
69 .optional()?;
70 let Some((session_id, cwd, relation)) = meta else {
71 return Ok(None);
72 };
73
74 let head_json: String = conn
75 .query_row(
76 "SELECT head_json FROM session_head WHERE singleton = 1",
77 [],
78 |row| row.get(0),
79 )
80 .optional()?
81 .unwrap_or_else(|| "{}".to_string());
82 let head_meta =
83 serde_json::from_str::<SessionHeadMeta>(&head_json).unwrap_or_default();
84 let graph = Self::load_session_graph_from_conn(conn, head_meta.leaf_node_id);
85
86 Ok(Some(SessionPickerInfo {
87 session_id,
88 cwd,
89 relation,
90 first_user_message: graph.first_user_message(),
91 user_message_count: graph.user_message_count(),
92 }))
93 })
94 .await
95 .ok()
96 .flatten()
97 }
98
99 pub async fn memory() -> tokio_rusqlite::Result<Self> {
100 Self::memory_with_options(StoreOptions {
101 blob_profile: BuiltinBlobProfile::LowLatency,
102 gc_policy: StoreGcPolicy::default(),
103 })
104 .await
105 }
106
107 pub async fn memory_with_options(options: StoreOptions) -> tokio_rusqlite::Result<Self> {
108 let conn = SqliteConnection::open_in_memory().await?;
109 ensure_schema(&conn).await?;
110 apply_pragmas(&conn, StoreBacking::Memory).await?;
111 Ok(Self {
112 conn,
113 artifact_cache: Mutex::new(BTreeMap::new()),
114 options,
115 commit_count: AtomicU64::new(0),
116 })
117 }
118
119 pub async fn save_session_head_meta(&self, meta: SessionHeadMeta) {
120 let head_json = encode_json(&meta);
121 let session_id = meta.session_id.clone();
122 let head_revision = meta.head_revision as i64;
123 let result = self
124 .conn
125 .call(move |conn| {
126 conn.execute(
127 "INSERT OR REPLACE INTO session_head (singleton, session_id, head_json, head_revision)
128 VALUES (1, ?1, ?2, ?3)",
129 params![session_id, head_json, head_revision],
130 )
131 })
132 .await;
133 if let Err(err) = result {
134 tracing::warn!(error = %err, "failed to persist session head");
135 }
136 }
137
138 pub async fn load_session_head_meta(&self) -> Option<SessionHeadMeta> {
139 self.conn
140 .call(|conn| Ok(load_session_head_meta_from_conn(conn)))
141 .await
142 .ok()
143 .flatten()
144 }
145
146 pub async fn save_session_head(&self, head: SessionHead) {
147 self.replace_session_graph(&head.graph).await;
148 self.save_session_head_meta(session_head_meta(&head)).await;
149 }
150
151 pub async fn load_session_head(&self) -> Option<SessionHead> {
152 let meta = self.load_session_head_meta().await?;
153 let mut graph = self.load_session_graph().await;
154 graph.set_leaf_node_id(meta.leaf_node_id.clone());
155 Some(SessionHead {
156 session_id: meta.session_id,
157 head_revision: meta.head_revision,
158 agent_frames: meta.agent_frames,
159 current_agent_frame_id: meta.current_agent_frame_id,
160 graph,
161 config: meta.config,
162 checkpoint_ref: meta.checkpoint_ref,
163 token_ledger: merge_token_ledger_entries(self.load_usage_deltas().await),
164 })
165 }
166
167 pub async fn head_copy_from_store(&self, source: &Store) {
168 if let Some(head) = source.load_session_head().await {
169 if let Some(checkpoint_ref) = &head.checkpoint_ref
170 && let Some(record) = source
171 .get_typed_blob::<SessionCheckpoint>(checkpoint_ref)
172 .await
173 {
174 for blob_ref in [
175 record.tool_state_ref.as_ref(),
176 record.plugin_snapshot_ref.as_ref(),
177 ]
178 .into_iter()
179 .flatten()
180 {
181 if let Some(blob) = source.get_blob(blob_ref).await {
182 let descriptor = match record
183 .tool_state_ref
184 .as_ref()
185 .filter(|candidate| *candidate == blob_ref)
186 {
187 Some(_) => BlobArtifactDescriptor::tool_state_snapshot(),
188 None => BlobArtifactDescriptor::plugin_session_snapshot(),
189 };
190 let _ = self.put_artifact_blob(descriptor, &blob).await;
191 }
192 }
193 if let Some(blob) = source.get_blob(checkpoint_ref).await {
194 let _ = self
195 .put_artifact_blob(BlobArtifactDescriptor::checkpoint_manifest(), &blob)
196 .await;
197 }
198 }
199 self.replace_session_graph(&head.graph).await;
200 self.save_session_head_meta(session_head_meta(&head)).await;
201 }
202 }
203
204 pub async fn save_session_meta(&self, meta: SessionMeta) {
205 let relation_json = serde_json::to_string(&meta.relation).ok();
206 let session_id_for_log = meta.session_id.clone();
207 let result = self
208 .conn
209 .call(move |conn| {
210 conn.execute(
211 "INSERT OR REPLACE INTO session_meta
212 (singleton, session_id, session_name, created_at, model, cwd, relation_json)
213 VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6)",
214 params![
215 meta.session_id,
216 meta.session_name,
217 meta.created_at,
218 meta.model,
219 meta.cwd,
220 relation_json
221 ],
222 )
223 })
224 .await;
225 if let Err(err) = result {
226 tracing::warn!(
227 error = %err,
228 session_id = session_id_for_log,
229 "failed to persist session metadata"
230 );
231 }
232 }
233
234 pub async fn load_session_meta(&self) -> Option<SessionMeta> {
235 self.conn
236 .call(|conn| Ok(load_session_meta_from_conn(conn)))
237 .await
238 .ok()
239 .flatten()
240 }
241}