Skip to main content

lash_sqlite_store/
attachments.rs

1//! The lashlang module-artifact store and the attachment write-ahead manifest.
2//!
3//! Both traits in this module have synchronous-looking call sites in their
4//! consumers but bridge to the async [`SqliteConnection`] underneath:
5//!
6//! * [`lashlang::LashlangArtifactStore`] is itself an `#[async_trait]`, so its
7//!   methods `.await` the connection wrapper directly (matching the the prior store
8//!   store's async surface byte-for-byte).
9//! * [`AttachmentManifest`] is a *synchronous* trait. Its bodies therefore wrap
10//!   the async store work in [`block_on_store`], exactly as the prior store did.
11//!
12//! Every DB body is a synchronous rusqlite closure handed to `conn.call`
13//! (reads) or `conn.write` (read-then-write); only the wrapper call is awaited.
14
15use super::*;
16
17/// Logical keyspaces multiplexed onto the `artifact_refs` pointer table. Each
18/// namespace owns its own half of the `(namespace, artifact_ref)` composite
19/// primary key. The `blobs` table is content-addressed, but the `artifact_refs`
20/// pointer is *not*: without the namespace column, a module ref that collides
21/// with a process-execution-env ref would rewrite the same pointer row under
22/// `INSERT OR REPLACE`, so content-addressing alone does not keep the namespaces
23/// disjoint. The composite key does.
24pub(crate) const MODULE_ARTIFACT_NAMESPACE: &str = "lashlang_module";
25pub(crate) const RAW_ARTIFACT_NAMESPACE: &str = "lashlang_artifact";
26pub(crate) const PROCESS_ENV_NAMESPACE: &str = "process_execution_env";
27
28impl Store {
29    async fn put_artifact_ref_blob(
30        &self,
31        namespace: &'static str,
32        artifact_ref: String,
33        descriptor: BlobArtifactDescriptor,
34        bytes: Vec<u8>,
35    ) -> Result<(), StoreError> {
36        let blob_profile = self.options.blob_profile;
37        self.conn
38            .write(move |tx| {
39                let blob_ref =
40                    Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
41                tx.execute(
42                    "INSERT OR REPLACE INTO artifact_refs (namespace, artifact_ref, blob_ref)
43                     VALUES (?1, ?2, ?3)",
44                    params![namespace, artifact_ref, blob_ref.as_str()],
45                )?;
46                Ok(())
47            })
48            .await
49            .map_err(sqlite_error)
50    }
51
52    async fn get_artifact_ref_blob(
53        &self,
54        namespace: &'static str,
55        artifact_ref: String,
56        missing_diagnostic: String,
57    ) -> Result<Option<Vec<u8>>, StoreError> {
58        let resolved = self
59            .conn
60            .call(move |conn| {
61                let blob_ref: Option<String> = conn
62                    .query_row(
63                        "SELECT blob_ref FROM artifact_refs
64                         WHERE namespace = ?1 AND artifact_ref = ?2",
65                        params![namespace, artifact_ref],
66                        |row| row.get::<_, String>(0),
67                    )
68                    .optional()?;
69                let Some(blob_ref) = blob_ref else {
70                    return Ok(None);
71                };
72                Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
73            })
74            .await
75            .map_err(sqlite_error)?;
76        let Some(blob) = resolved else {
77            return Ok(None);
78        };
79        blob.ok_or_else(|| {
80            StoreError::Backend(format!("{missing_diagnostic} points at a missing blob"))
81        })
82        .map(Some)
83    }
84}
85
86#[async_trait::async_trait]
87impl lashlang::LashlangArtifactStore for Store {
88    fn durability_tier(&self) -> lashlang::DurabilityTier {
89        lashlang::DurabilityTier::Durable
90    }
91
92    async fn put_module_artifact(
93        &self,
94        artifact: &lashlang::ModuleArtifact,
95    ) -> Result<(), lashlang::ArtifactStoreError> {
96        let bytes = artifact
97            .to_store_bytes()
98            .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
99        let artifact_ref = artifact.module_ref.as_str().to_string();
100        self.put_artifact_ref_blob(
101            MODULE_ARTIFACT_NAMESPACE,
102            artifact_ref,
103            BlobArtifactDescriptor::lashlang_module(),
104            bytes,
105        )
106        .await
107        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
108        self.artifact_cache
109            .lock()
110            .map_err(|_| {
111                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
112            })?
113            .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
114        Ok(())
115    }
116
117    async fn get_module_artifact(
118        &self,
119        module_ref: &lashlang::ModuleRef,
120    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
121        if let Some(artifact) = self
122            .artifact_cache
123            .lock()
124            .map_err(|_| {
125                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
126            })?
127            .get(module_ref)
128            .cloned()
129        {
130            return Ok(Some(artifact));
131        }
132
133        let artifact_ref = module_ref.as_str().to_string();
134        let Some(bytes) = self
135            .get_artifact_ref_blob(
136                MODULE_ARTIFACT_NAMESPACE,
137                artifact_ref,
138                format!("lashlang module artifact `{module_ref}`"),
139            )
140            .await
141            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?
142        else {
143            return Ok(None);
144        };
145        let artifact = Arc::new(
146            lashlang::ModuleArtifact::from_store_bytes(&bytes)
147                .map_err(lashlang::ArtifactStoreError::from)?,
148        );
149        self.artifact_cache
150            .lock()
151            .map_err(|_| {
152                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
153            })?
154            .insert(module_ref.clone(), artifact.clone());
155        Ok(Some(artifact))
156    }
157
158    async fn put_artifact_bytes(
159        &self,
160        artifact_ref: &str,
161        descriptor: &str,
162        bytes: &[u8],
163    ) -> Result<(), lashlang::ArtifactStoreError> {
164        let artifact_ref = artifact_ref.to_string();
165        let descriptor = match descriptor {
166            "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
167            _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
168        };
169        self.put_artifact_ref_blob(
170            RAW_ARTIFACT_NAMESPACE,
171            artifact_ref,
172            descriptor,
173            bytes.to_vec(),
174        )
175        .await
176        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
177    }
178
179    async fn get_artifact_bytes(
180        &self,
181        artifact_ref: &str,
182    ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
183        let artifact_ref = artifact_ref.to_string();
184        self.get_artifact_ref_blob(
185            RAW_ARTIFACT_NAMESPACE,
186            artifact_ref.clone(),
187            format!("artifact `{artifact_ref}`"),
188        )
189        .await
190        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
191    }
192}
193
194#[async_trait::async_trait]
195impl lash_core::ProcessExecutionEnvStore for Store {
196    fn durability_tier(&self) -> DurabilityTier {
197        DurabilityTier::Durable
198    }
199
200    async fn put_process_execution_env(
201        &self,
202        env_ref: &lash_core::ProcessExecutionEnvRef,
203        bytes: &[u8],
204    ) -> Result<(), lash_core::PluginError> {
205        let artifact_ref = env_ref.as_str().to_string();
206        self.put_artifact_ref_blob(
207            PROCESS_ENV_NAMESPACE,
208            artifact_ref,
209            BlobArtifactDescriptor::process_execution_env(),
210            bytes.to_vec(),
211        )
212        .await
213        .map_err(|err| lash_core::PluginError::Session(err.to_string()))
214    }
215
216    async fn get_process_execution_env(
217        &self,
218        env_ref: &lash_core::ProcessExecutionEnvRef,
219    ) -> Result<Option<Vec<u8>>, lash_core::PluginError> {
220        let artifact_ref = env_ref.as_str().to_string();
221        self.get_artifact_ref_blob(
222            PROCESS_ENV_NAMESPACE,
223            artifact_ref.clone(),
224            format!("process execution env `{artifact_ref}`"),
225        )
226        .await
227        .map_err(|err| lash_core::PluginError::Session(err.to_string()))
228    }
229}
230
231impl AttachmentManifest for Store {
232    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
233        block_on_store(async {
234            let attachment_id = intent.attachment_id.as_str().to_string();
235            let session_id = intent.session_id.as_str().to_string();
236            let canonical_uri = intent.canonical_uri.as_str().to_string();
237            let intent_at_ms = intent.intent_at_epoch_ms as i64;
238            self.conn
239                .call(move |conn| {
240                    conn.execute(
241                        "INSERT INTO attachment_manifest
242                            (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
243                         VALUES (?1, ?2, ?3, ?4, NULL)
244                         ON CONFLICT(attachment_id) DO NOTHING",
245                        params![attachment_id, session_id, canonical_uri, intent_at_ms],
246                    )
247                })
248                .await
249                .map_err(sqlite_error)?;
250            Ok(())
251        })
252    }
253
254    fn commit_refs(
255        &self,
256        session_id: &str,
257        attachment_ids: &[AttachmentId],
258    ) -> Result<(), StoreError> {
259        if attachment_ids.is_empty() {
260            return Ok(());
261        }
262        block_on_store(async {
263            let session_id = session_id.to_string();
264            let attachment_ids: Vec<String> = attachment_ids
265                .iter()
266                .map(|id| id.as_str().to_string())
267                .collect();
268            self.conn
269                .write(move |tx| {
270                    let now = current_epoch_ms() as i64;
271                    let mut stmt = tx.prepare(
272                        "UPDATE attachment_manifest
273                         SET committed_at_ms = COALESCE(committed_at_ms, ?1)
274                         WHERE attachment_id = ?2 AND session_id = ?3",
275                    )?;
276                    for id in &attachment_ids {
277                        stmt.execute(params![now, id, session_id])?;
278                    }
279                    Ok(())
280                })
281                .await
282                .map_err(sqlite_error)?;
283            Ok(())
284        })
285    }
286
287    fn list_uncommitted(
288        &self,
289        older_than_epoch_ms: u64,
290    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
291        block_on_store(async {
292            let older_than = older_than_epoch_ms as i64;
293            self.conn
294                .call(move |conn| {
295                    let mut stmt = conn.prepare(
296                        "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
297                         FROM attachment_manifest
298                         WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
299                         ORDER BY intent_at_ms ASC",
300                    )?;
301                    let rows = stmt.query_map(params![older_than], |row| {
302                        let id: String = row.get(0)?;
303                        let session_id: String = row.get(1)?;
304                        let canonical_uri: String = row.get(2)?;
305                        let intent_at_ms: i64 = row.get(3)?;
306                        let committed_at_ms: Option<i64> = row.get(4)?;
307                        Ok(AttachmentManifestEntry {
308                            attachment_id: AttachmentId::new(id),
309                            session_id,
310                            canonical_uri,
311                            intent_at_epoch_ms: intent_at_ms as u64,
312                            committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
313                        })
314                    })?;
315                    Ok(rows.filter_map(Result::ok).collect())
316                })
317                .await
318                .map_err(sqlite_error)
319        })
320    }
321
322    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
323        block_on_store(async {
324            let attachment_id = attachment_id.as_str().to_string();
325            self.conn
326                .call(move |conn| {
327                    conn.execute(
328                        "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
329                        params![attachment_id],
330                    )
331                })
332                .await
333                .map_err(sqlite_error)?;
334            Ok(())
335        })
336    }
337}