Skip to main content

lash_sqlite_store/
attachments.rs

1//! The lashlang module-artifact store and the attachment write-ahead manifest.
2//!
3//! Both traits in this module have synchronous-looking call sites in their
4//! consumers but bridge to the async [`SqliteConnection`] underneath:
5//!
6//! * [`lashlang::LashlangArtifactStore`] is itself an `#[async_trait]`, so its
7//!   methods `.await` the connection wrapper directly (matching the prior
8//!   store's async surface byte-for-byte).
9//! * [`AttachmentManifest`] is a *synchronous* trait. Its bodies therefore wrap
10//!   the async store work in [`block_on_store`], exactly as the prior store did.
11//!
12//! Every DB body is a synchronous rusqlite closure handed to `conn.call`
13//! (reads) or `conn.write` (read-then-write); only the wrapper call is awaited.
14
15use super::*;
16
17impl Store {
18    async fn put_artifact_ref_blob(
19        &self,
20        artifact_ref: String,
21        descriptor: BlobArtifactDescriptor,
22        bytes: Vec<u8>,
23    ) -> Result<(), StoreError> {
24        let blob_profile = self.options.blob_profile;
25        self.conn
26            .write(move |tx| {
27                let blob_ref =
28                    Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
29                tx.execute(
30                    "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
31                    params![artifact_ref, blob_ref.as_str()],
32                )?;
33                Ok(())
34            })
35            .await
36            .map_err(sqlite_error)
37    }
38
39    async fn get_artifact_ref_blob(
40        &self,
41        artifact_ref: String,
42        missing_diagnostic: String,
43    ) -> Result<Option<Vec<u8>>, StoreError> {
44        let resolved = self
45            .conn
46            .call(move |conn| {
47                let blob_ref: Option<String> = conn
48                    .query_row(
49                        "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
50                        params![artifact_ref],
51                        |row| row.get::<_, String>(0),
52                    )
53                    .optional()?;
54                let Some(blob_ref) = blob_ref else {
55                    return Ok(None);
56                };
57                Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
58            })
59            .await
60            .map_err(sqlite_error)?;
61        let Some(blob) = resolved else {
62            return Ok(None);
63        };
64        blob.ok_or_else(|| {
65            StoreError::Backend(format!("{missing_diagnostic} points at a missing blob"))
66        })
67        .map(Some)
68    }
69}
70
71#[async_trait::async_trait]
72impl lashlang::LashlangArtifactStore for Store {
73    fn durability_tier(&self) -> lashlang::DurabilityTier {
74        lashlang::DurabilityTier::Durable
75    }
76
77    async fn put_module_artifact(
78        &self,
79        artifact: &lashlang::ModuleArtifact,
80    ) -> Result<(), lashlang::ArtifactStoreError> {
81        let bytes = artifact
82            .to_store_bytes()
83            .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
84        let artifact_ref = artifact.module_ref.as_str().to_string();
85        self.put_artifact_ref_blob(
86            artifact_ref,
87            BlobArtifactDescriptor::lashlang_module(),
88            bytes,
89        )
90        .await
91        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
92        self.artifact_cache
93            .lock()
94            .map_err(|_| {
95                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
96            })?
97            .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
98        Ok(())
99    }
100
101    async fn get_module_artifact(
102        &self,
103        module_ref: &lashlang::ModuleRef,
104    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
105        if let Some(artifact) = self
106            .artifact_cache
107            .lock()
108            .map_err(|_| {
109                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
110            })?
111            .get(module_ref)
112            .cloned()
113        {
114            return Ok(Some(artifact));
115        }
116
117        let artifact_ref = module_ref.as_str().to_string();
118        let Some(bytes) = self
119            .get_artifact_ref_blob(
120                artifact_ref,
121                format!("lashlang module artifact `{module_ref}`"),
122            )
123            .await
124            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?
125        else {
126            return Ok(None);
127        };
128        let artifact = Arc::new(
129            lashlang::ModuleArtifact::from_store_bytes(&bytes)
130                .map_err(lashlang::ArtifactStoreError::from)?,
131        );
132        self.artifact_cache
133            .lock()
134            .map_err(|_| {
135                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
136            })?
137            .insert(module_ref.clone(), artifact.clone());
138        Ok(Some(artifact))
139    }
140
141    async fn put_artifact_bytes(
142        &self,
143        artifact_ref: &str,
144        descriptor: &str,
145        bytes: &[u8],
146    ) -> Result<(), lashlang::ArtifactStoreError> {
147        let artifact_ref = artifact_ref.to_string();
148        let descriptor = match descriptor {
149            "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
150            _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
151        };
152        self.put_artifact_ref_blob(artifact_ref, descriptor, bytes.to_vec())
153            .await
154            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
155    }
156
157    async fn get_artifact_bytes(
158        &self,
159        artifact_ref: &str,
160    ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
161        let artifact_ref = artifact_ref.to_string();
162        self.get_artifact_ref_blob(artifact_ref.clone(), format!("artifact `{artifact_ref}`"))
163            .await
164            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
165    }
166}
167
168#[async_trait::async_trait]
169impl lash_core::ProcessExecutionEnvStore for Store {
170    fn durability_tier(&self) -> DurabilityTier {
171        DurabilityTier::Durable
172    }
173
174    async fn put_process_execution_env(
175        &self,
176        env_ref: &lash_core::ProcessExecutionEnvRef,
177        bytes: &[u8],
178    ) -> Result<(), lash_core::PluginError> {
179        let artifact_ref = env_ref.as_str().to_string();
180        self.put_artifact_ref_blob(
181            artifact_ref,
182            BlobArtifactDescriptor::process_execution_env(),
183            bytes.to_vec(),
184        )
185        .await
186        .map_err(|err| lash_core::PluginError::Session(err.to_string()))
187    }
188
189    async fn get_process_execution_env(
190        &self,
191        env_ref: &lash_core::ProcessExecutionEnvRef,
192    ) -> Result<Option<Vec<u8>>, lash_core::PluginError> {
193        let artifact_ref = env_ref.as_str().to_string();
194        self.get_artifact_ref_blob(
195            artifact_ref.clone(),
196            format!("process execution env `{artifact_ref}`"),
197        )
198        .await
199        .map_err(|err| lash_core::PluginError::Session(err.to_string()))
200    }
201}
202
203impl AttachmentManifest for Store {
204    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
205        block_on_store(async {
206            let attachment_id = intent.attachment_id.as_str().to_string();
207            let session_id = intent.session_id.as_str().to_string();
208            let canonical_uri = intent.canonical_uri.as_str().to_string();
209            let intent_at_ms = intent.intent_at_epoch_ms as i64;
210            self.conn
211                .call(move |conn| {
212                    conn.execute(
213                        "INSERT INTO attachment_manifest
214                            (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
215                         VALUES (?1, ?2, ?3, ?4, NULL)
216                         ON CONFLICT(attachment_id) DO NOTHING",
217                        params![attachment_id, session_id, canonical_uri, intent_at_ms],
218                    )
219                })
220                .await
221                .map_err(sqlite_error)?;
222            Ok(())
223        })
224    }
225
226    fn commit_refs(
227        &self,
228        session_id: &str,
229        attachment_ids: &[AttachmentId],
230    ) -> Result<(), StoreError> {
231        if attachment_ids.is_empty() {
232            return Ok(());
233        }
234        block_on_store(async {
235            let session_id = session_id.to_string();
236            let attachment_ids: Vec<String> = attachment_ids
237                .iter()
238                .map(|id| id.as_str().to_string())
239                .collect();
240            self.conn
241                .write(move |tx| {
242                    let now = current_epoch_ms() as i64;
243                    let mut stmt = tx.prepare(
244                        "UPDATE attachment_manifest
245                         SET committed_at_ms = COALESCE(committed_at_ms, ?1)
246                         WHERE attachment_id = ?2 AND session_id = ?3",
247                    )?;
248                    for id in &attachment_ids {
249                        stmt.execute(params![now, id, session_id])?;
250                    }
251                    Ok(())
252                })
253                .await
254                .map_err(sqlite_error)?;
255            Ok(())
256        })
257    }
258
259    fn list_uncommitted(
260        &self,
261        older_than_epoch_ms: u64,
262    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
263        block_on_store(async {
264            let older_than = older_than_epoch_ms as i64;
265            self.conn
266                .call(move |conn| {
267                    let mut stmt = conn.prepare(
268                        "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
269                         FROM attachment_manifest
270                         WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
271                         ORDER BY intent_at_ms ASC",
272                    )?;
273                    let rows = stmt.query_map(params![older_than], |row| {
274                        let id: String = row.get(0)?;
275                        let session_id: String = row.get(1)?;
276                        let canonical_uri: String = row.get(2)?;
277                        let intent_at_ms: i64 = row.get(3)?;
278                        let committed_at_ms: Option<i64> = row.get(4)?;
279                        Ok(AttachmentManifestEntry {
280                            attachment_id: AttachmentId::new(id),
281                            session_id,
282                            canonical_uri,
283                            intent_at_epoch_ms: intent_at_ms as u64,
284                            committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
285                        })
286                    })?;
287                    Ok(rows.filter_map(Result::ok).collect())
288                })
289                .await
290                .map_err(sqlite_error)
291        })
292    }
293
294    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
295        block_on_store(async {
296            let attachment_id = attachment_id.as_str().to_string();
297            self.conn
298                .call(move |conn| {
299                    conn.execute(
300                        "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
301                        params![attachment_id],
302                    )
303                })
304                .await
305                .map_err(sqlite_error)?;
306            Ok(())
307        })
308    }
309}