Skip to main content

lash_sqlite_store/
attachments.rs

1//! The lashlang module-artifact store and the attachment write-ahead manifest.
2//!
3//! Both traits in this module have synchronous-looking call sites in their
4//! consumers but bridge to the async [`SqliteConnection`] underneath:
5//!
6//! * [`lashlang::LashlangArtifactStore`] is itself an `#[async_trait]`, so its
7//!   methods `.await` the connection wrapper directly (matching the the prior store
8//!   store's async surface byte-for-byte).
9//! * [`AttachmentManifest`] is a *synchronous* trait. Its bodies therefore wrap
10//!   the async store work in [`block_on_store`], exactly as the prior store did.
11//!
12//! Every DB body is a synchronous rusqlite closure handed to `conn.call`
13//! (reads) or `conn.write` (read-then-write); only the wrapper call is awaited.
14
15use super::*;
16
17#[async_trait::async_trait]
18impl lashlang::LashlangArtifactStore for Store {
19    fn durability_tier(&self) -> lashlang::DurabilityTier {
20        lashlang::DurabilityTier::Durable
21    }
22
23    async fn put_module_artifact(
24        &self,
25        artifact: &lashlang::ModuleArtifact,
26    ) -> Result<(), lashlang::ArtifactStoreError> {
27        let bytes = artifact
28            .to_store_bytes()
29            .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
30        let blob_profile = self.options.blob_profile;
31        let artifact_ref = artifact.module_ref.as_str().to_string();
32        self.conn
33            .write(move |tx| {
34                let blob_ref = Self::insert_artifact_blob_conn(
35                    tx,
36                    BlobArtifactDescriptor::lashlang_module(),
37                    &bytes,
38                    blob_profile,
39                )?;
40                tx.execute(
41                    "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
42                    params![artifact_ref, blob_ref.as_str()],
43                )?;
44                Ok(())
45            })
46            .await
47            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
48        self.artifact_cache
49            .lock()
50            .map_err(|_| {
51                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
52            })?
53            .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
54        Ok(())
55    }
56
57    async fn get_module_artifact(
58        &self,
59        module_ref: &lashlang::ModuleRef,
60    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
61        if let Some(artifact) = self
62            .artifact_cache
63            .lock()
64            .map_err(|_| {
65                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
66            })?
67            .get(module_ref)
68            .cloned()
69        {
70            return Ok(Some(artifact));
71        }
72
73        let artifact_ref = module_ref.as_str().to_string();
74        // `Option<Option<Vec<u8>>>`: the outer `None` means no `artifact_refs`
75        // row exists (return `Ok(None)`); the inner `None` means the row points
76        // at a missing blob (a hard error, matching the prior store).
77        let resolved = self
78            .conn
79            .call(move |conn| {
80                let blob_ref: Option<String> = conn
81                    .query_row(
82                        "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
83                        params![artifact_ref],
84                        |row| row.get::<_, String>(0),
85                    )
86                    .optional()?;
87                let Some(blob_ref) = blob_ref else {
88                    return Ok(None);
89                };
90                Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
91            })
92            .await
93            .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
94        let Some(blob) = resolved else {
95            return Ok(None);
96        };
97        let bytes = blob.ok_or_else(|| {
98            lashlang::ArtifactStoreError::Backend(format!(
99                "lashlang module artifact `{}` points at a missing blob",
100                module_ref
101            ))
102        })?;
103        let artifact = Arc::new(
104            lashlang::ModuleArtifact::from_store_bytes(&bytes)
105                .map_err(lashlang::ArtifactStoreError::from)?,
106        );
107        self.artifact_cache
108            .lock()
109            .map_err(|_| {
110                lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
111            })?
112            .insert(module_ref.clone(), artifact.clone());
113        Ok(Some(artifact))
114    }
115}
116
117impl AttachmentManifest for Store {
118    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
119        block_on_store(async {
120            let attachment_id = intent.attachment_id.as_str().to_string();
121            let session_id = intent.session_id.as_str().to_string();
122            let canonical_uri = intent.canonical_uri.as_str().to_string();
123            let intent_at_ms = intent.intent_at_epoch_ms as i64;
124            self.conn
125                .call(move |conn| {
126                    conn.execute(
127                        "INSERT INTO attachment_manifest
128                            (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
129                         VALUES (?1, ?2, ?3, ?4, NULL)
130                         ON CONFLICT(attachment_id) DO NOTHING",
131                        params![attachment_id, session_id, canonical_uri, intent_at_ms],
132                    )
133                })
134                .await
135                .map_err(sqlite_error)?;
136            Ok(())
137        })
138    }
139
140    fn commit_refs(
141        &self,
142        session_id: &str,
143        attachment_ids: &[AttachmentId],
144    ) -> Result<(), StoreError> {
145        if attachment_ids.is_empty() {
146            return Ok(());
147        }
148        block_on_store(async {
149            let session_id = session_id.to_string();
150            let attachment_ids: Vec<String> = attachment_ids
151                .iter()
152                .map(|id| id.as_str().to_string())
153                .collect();
154            self.conn
155                .write(move |tx| {
156                    let now = current_epoch_ms() as i64;
157                    let mut stmt = tx.prepare(
158                        "UPDATE attachment_manifest
159                         SET committed_at_ms = COALESCE(committed_at_ms, ?1)
160                         WHERE attachment_id = ?2 AND session_id = ?3",
161                    )?;
162                    for id in &attachment_ids {
163                        stmt.execute(params![now, id, session_id])?;
164                    }
165                    Ok(())
166                })
167                .await
168                .map_err(sqlite_error)?;
169            Ok(())
170        })
171    }
172
173    fn list_uncommitted(
174        &self,
175        older_than_epoch_ms: u64,
176    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
177        block_on_store(async {
178            let older_than = older_than_epoch_ms as i64;
179            self.conn
180                .call(move |conn| {
181                    let mut stmt = conn.prepare(
182                        "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
183                         FROM attachment_manifest
184                         WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
185                         ORDER BY intent_at_ms ASC",
186                    )?;
187                    let rows = stmt.query_map(params![older_than], |row| {
188                        let id: String = row.get(0)?;
189                        let session_id: String = row.get(1)?;
190                        let canonical_uri: String = row.get(2)?;
191                        let intent_at_ms: i64 = row.get(3)?;
192                        let committed_at_ms: Option<i64> = row.get(4)?;
193                        Ok(AttachmentManifestEntry {
194                            attachment_id: AttachmentId::new(id),
195                            session_id,
196                            canonical_uri,
197                            intent_at_epoch_ms: intent_at_ms as u64,
198                            committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
199                        })
200                    })?;
201                    Ok(rows.filter_map(Result::ok).collect())
202                })
203                .await
204                .map_err(sqlite_error)
205        })
206    }
207
208    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
209        block_on_store(async {
210            let attachment_id = attachment_id.as_str().to_string();
211            self.conn
212                .call(move |conn| {
213                    conn.execute(
214                        "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
215                        params![attachment_id],
216                    )
217                })
218                .await
219                .map_err(sqlite_error)?;
220            Ok(())
221        })
222    }
223}