Skip to main content

lash_sqlite_store/
attachments.rs

1//! The lashlang module-artifact store and the attachment write-ahead manifest.
2//!
3//! Both traits in this module have synchronous-looking call sites in their
4//! consumers but bridge to the async [`SqliteConnection`] underneath:
5//!
6//! * [`lashlang::LashlangArtifactStore`] is itself an `#[async_trait]`, so its
7//!   methods `.await` the connection wrapper directly (matching the the prior store
8//!   store's async surface byte-for-byte).
9//! * [`AttachmentManifest`] is a *synchronous* trait. Its bodies therefore wrap
10//!   the async store work in [`block_on_store`], exactly as the prior store did.
11//!
12//! Every DB body is a synchronous rusqlite closure handed to `conn.call`
13//! (reads) or `conn.write` (read-then-write); only the wrapper call is awaited.
14
15use super::*;
16
17#[async_trait::async_trait]
18impl lashlang::LashlangArtifactStore for Store {
19    fn durability_tier(&self) -> lashlang::DurabilityTier {
20        lashlang::DurabilityTier::Durable
21    }
22
23    async fn put_module_artifact(
24        &self,
25        artifact: &lashlang::ModuleArtifact,
26    ) -> Result<(), lashlang::ArtifactStoreError> {
27        let bytes = artifact
28            .to_store_bytes()
29            .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
30        let blob_profile = self.options.blob_profile;
31        let artifact_ref = artifact.module_ref.as_str().to_string();
32        self.conn
33            .write(move |tx| {
34                let blob_ref = Self::insert_artifact_blob_conn(
35                    tx,
36                    BlobArtifactDescriptor::lashlang_module(),
37                    &bytes,
38                    blob_profile,
39                )?;
40                tx.execute(
41                    "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
42                    params![artifact_ref, blob_ref.as_str()],
43                )?;
44                Ok(())
45            })
46            .await
47            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
48        self.artifact_cache
49            .lock()
50            .map_err(|_| {
51                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
52            })?
53            .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
54        Ok(())
55    }
56
57    async fn get_module_artifact(
58        &self,
59        module_ref: &lashlang::ModuleRef,
60    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
61        if let Some(artifact) = self
62            .artifact_cache
63            .lock()
64            .map_err(|_| {
65                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
66            })?
67            .get(module_ref)
68            .cloned()
69        {
70            return Ok(Some(artifact));
71        }
72
73        let artifact_ref = module_ref.as_str().to_string();
74        // `Option<Option<Vec<u8>>>`: the outer `None` means no `artifact_refs`
75        // row exists (return `Ok(None)`); the inner `None` means the row points
76        // at a missing blob (a hard error, matching the prior store).
77        let resolved = self
78            .conn
79            .call(move |conn| {
80                let blob_ref: Option<String> = conn
81                    .query_row(
82                        "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
83                        params![artifact_ref],
84                        |row| row.get::<_, String>(0),
85                    )
86                    .optional()?;
87                let Some(blob_ref) = blob_ref else {
88                    return Ok(None);
89                };
90                Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
91            })
92            .await
93            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
94        let Some(blob) = resolved else {
95            return Ok(None);
96        };
97        let bytes = blob.ok_or_else(|| {
98            lashlang::ArtifactStoreError::Backend(format!(
99                "lashlang module artifact `{}` points at a missing blob",
100                module_ref
101            ))
102        })?;
103        let artifact = Arc::new(
104            lashlang::ModuleArtifact::from_store_bytes(&bytes)
105                .map_err(lashlang::ArtifactStoreError::from)?,
106        );
107        self.artifact_cache
108            .lock()
109            .map_err(|_| {
110                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
111            })?
112            .insert(module_ref.clone(), artifact.clone());
113        Ok(Some(artifact))
114    }
115
116    async fn put_artifact_bytes(
117        &self,
118        artifact_ref: &str,
119        descriptor: &str,
120        bytes: &[u8],
121    ) -> Result<(), lashlang::ArtifactStoreError> {
122        let blob_profile = self.options.blob_profile;
123        let artifact_ref = artifact_ref.to_string();
124        let descriptor = match descriptor {
125            "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
126            _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
127        };
128        let bytes = bytes.to_vec();
129        self.conn
130            .write(move |tx| {
131                let blob_ref =
132                    Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
133                tx.execute(
134                    "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
135                    params![artifact_ref, blob_ref.as_str()],
136                )?;
137                Ok(())
138            })
139            .await
140            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
141    }
142
143    async fn get_artifact_bytes(
144        &self,
145        artifact_ref: &str,
146    ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
147        let artifact_ref = artifact_ref.to_string();
148        let diagnostic_ref = artifact_ref.clone();
149        let resolved = self
150            .conn
151            .call(move |conn| {
152                let blob_ref: Option<String> = conn
153                    .query_row(
154                        "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
155                        params![artifact_ref],
156                        |row| row.get::<_, String>(0),
157                    )
158                    .optional()?;
159                let Some(blob_ref) = blob_ref else {
160                    return Ok(None);
161                };
162                Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
163            })
164            .await
165            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
166        let Some(blob) = resolved else {
167            return Ok(None);
168        };
169        let bytes = blob.ok_or_else(|| {
170            lashlang::ArtifactStoreError::Backend(format!(
171                "artifact `{diagnostic_ref}` points at a missing blob"
172            ))
173        })?;
174        Ok(Some(bytes))
175    }
176}
177
178impl AttachmentManifest for Store {
179    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
180        block_on_store(async {
181            let attachment_id = intent.attachment_id.as_str().to_string();
182            let session_id = intent.session_id.as_str().to_string();
183            let canonical_uri = intent.canonical_uri.as_str().to_string();
184            let intent_at_ms = intent.intent_at_epoch_ms as i64;
185            self.conn
186                .call(move |conn| {
187                    conn.execute(
188                        "INSERT INTO attachment_manifest
189                            (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
190                         VALUES (?1, ?2, ?3, ?4, NULL)
191                         ON CONFLICT(attachment_id) DO NOTHING",
192                        params![attachment_id, session_id, canonical_uri, intent_at_ms],
193                    )
194                })
195                .await
196                .map_err(sqlite_error)?;
197            Ok(())
198        })
199    }
200
201    fn commit_refs(
202        &self,
203        session_id: &str,
204        attachment_ids: &[AttachmentId],
205    ) -> Result<(), StoreError> {
206        if attachment_ids.is_empty() {
207            return Ok(());
208        }
209        block_on_store(async {
210            let session_id = session_id.to_string();
211            let attachment_ids: Vec<String> = attachment_ids
212                .iter()
213                .map(|id| id.as_str().to_string())
214                .collect();
215            self.conn
216                .write(move |tx| {
217                    let now = current_epoch_ms() as i64;
218                    let mut stmt = tx.prepare(
219                        "UPDATE attachment_manifest
220                         SET committed_at_ms = COALESCE(committed_at_ms, ?1)
221                         WHERE attachment_id = ?2 AND session_id = ?3",
222                    )?;
223                    for id in &attachment_ids {
224                        stmt.execute(params![now, id, session_id])?;
225                    }
226                    Ok(())
227                })
228                .await
229                .map_err(sqlite_error)?;
230            Ok(())
231        })
232    }
233
234    fn list_uncommitted(
235        &self,
236        older_than_epoch_ms: u64,
237    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
238        block_on_store(async {
239            let older_than = older_than_epoch_ms as i64;
240            self.conn
241                .call(move |conn| {
242                    let mut stmt = conn.prepare(
243                        "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
244                         FROM attachment_manifest
245                         WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
246                         ORDER BY intent_at_ms ASC",
247                    )?;
248                    let rows = stmt.query_map(params![older_than], |row| {
249                        let id: String = row.get(0)?;
250                        let session_id: String = row.get(1)?;
251                        let canonical_uri: String = row.get(2)?;
252                        let intent_at_ms: i64 = row.get(3)?;
253                        let committed_at_ms: Option<i64> = row.get(4)?;
254                        Ok(AttachmentManifestEntry {
255                            attachment_id: AttachmentId::new(id),
256                            session_id,
257                            canonical_uri,
258                            intent_at_epoch_ms: intent_at_ms as u64,
259                            committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
260                        })
261                    })?;
262                    Ok(rows.filter_map(Result::ok).collect())
263                })
264                .await
265                .map_err(sqlite_error)
266        })
267    }
268
269    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
270        block_on_store(async {
271            let attachment_id = attachment_id.as_str().to_string();
272            self.conn
273                .call(move |conn| {
274                    conn.execute(
275                        "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
276                        params![attachment_id],
277                    )
278                })
279                .await
280                .map_err(sqlite_error)?;
281            Ok(())
282        })
283    }
284}