1use super::*;
16
17#[async_trait::async_trait]
18impl lashlang::LashlangArtifactStore for Store {
19 fn durability_tier(&self) -> lashlang::DurabilityTier {
20 lashlang::DurabilityTier::Durable
21 }
22
23 async fn put_module_artifact(
24 &self,
25 artifact: &lashlang::ModuleArtifact,
26 ) -> Result<(), lashlang::ArtifactStoreError> {
27 let bytes = artifact
28 .to_store_bytes()
29 .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
30 let blob_profile = self.options.blob_profile;
31 let artifact_ref = artifact.module_ref.as_str().to_string();
32 self.conn
33 .write(move |tx| {
34 let blob_ref = Self::insert_artifact_blob_conn(
35 tx,
36 BlobArtifactDescriptor::lashlang_module(),
37 &bytes,
38 blob_profile,
39 )?;
40 tx.execute(
41 "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
42 params![artifact_ref, blob_ref.as_str()],
43 )?;
44 Ok(())
45 })
46 .await
47 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
48 self.artifact_cache
49 .lock()
50 .map_err(|_| {
51 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
52 })?
53 .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
54 Ok(())
55 }
56
57 async fn get_module_artifact(
58 &self,
59 module_ref: &lashlang::ModuleRef,
60 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
61 if let Some(artifact) = self
62 .artifact_cache
63 .lock()
64 .map_err(|_| {
65 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
66 })?
67 .get(module_ref)
68 .cloned()
69 {
70 return Ok(Some(artifact));
71 }
72
73 let artifact_ref = module_ref.as_str().to_string();
74 let resolved = self
78 .conn
79 .call(move |conn| {
80 let blob_ref: Option<String> = conn
81 .query_row(
82 "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
83 params![artifact_ref],
84 |row| row.get::<_, String>(0),
85 )
86 .optional()?;
87 let Some(blob_ref) = blob_ref else {
88 return Ok(None);
89 };
90 Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
91 })
92 .await
93 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
94 let Some(blob) = resolved else {
95 return Ok(None);
96 };
97 let bytes = blob.ok_or_else(|| {
98 lashlang::ArtifactStoreError::Backend(format!(
99 "lashlang module artifact `{}` points at a missing blob",
100 module_ref
101 ))
102 })?;
103 let artifact = Arc::new(
104 lashlang::ModuleArtifact::from_store_bytes(&bytes)
105 .map_err(lashlang::ArtifactStoreError::from)?,
106 );
107 self.artifact_cache
108 .lock()
109 .map_err(|_| {
110 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
111 })?
112 .insert(module_ref.clone(), artifact.clone());
113 Ok(Some(artifact))
114 }
115
116 async fn put_artifact_bytes(
117 &self,
118 artifact_ref: &str,
119 descriptor: &str,
120 bytes: &[u8],
121 ) -> Result<(), lashlang::ArtifactStoreError> {
122 let blob_profile = self.options.blob_profile;
123 let artifact_ref = artifact_ref.to_string();
124 let descriptor = match descriptor {
125 "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
126 _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
127 };
128 let bytes = bytes.to_vec();
129 self.conn
130 .write(move |tx| {
131 let blob_ref =
132 Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
133 tx.execute(
134 "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
135 params![artifact_ref, blob_ref.as_str()],
136 )?;
137 Ok(())
138 })
139 .await
140 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
141 }
142
143 async fn get_artifact_bytes(
144 &self,
145 artifact_ref: &str,
146 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
147 let artifact_ref = artifact_ref.to_string();
148 let diagnostic_ref = artifact_ref.clone();
149 let resolved = self
150 .conn
151 .call(move |conn| {
152 let blob_ref: Option<String> = conn
153 .query_row(
154 "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
155 params![artifact_ref],
156 |row| row.get::<_, String>(0),
157 )
158 .optional()?;
159 let Some(blob_ref) = blob_ref else {
160 return Ok(None);
161 };
162 Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
163 })
164 .await
165 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
166 let Some(blob) = resolved else {
167 return Ok(None);
168 };
169 let bytes = blob.ok_or_else(|| {
170 lashlang::ArtifactStoreError::Backend(format!(
171 "artifact `{diagnostic_ref}` points at a missing blob"
172 ))
173 })?;
174 Ok(Some(bytes))
175 }
176}
177
178impl AttachmentManifest for Store {
179 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
180 block_on_store(async {
181 let attachment_id = intent.attachment_id.as_str().to_string();
182 let session_id = intent.session_id.as_str().to_string();
183 let canonical_uri = intent.canonical_uri.as_str().to_string();
184 let intent_at_ms = intent.intent_at_epoch_ms as i64;
185 self.conn
186 .call(move |conn| {
187 conn.execute(
188 "INSERT INTO attachment_manifest
189 (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
190 VALUES (?1, ?2, ?3, ?4, NULL)
191 ON CONFLICT(attachment_id) DO NOTHING",
192 params![attachment_id, session_id, canonical_uri, intent_at_ms],
193 )
194 })
195 .await
196 .map_err(sqlite_error)?;
197 Ok(())
198 })
199 }
200
201 fn commit_refs(
202 &self,
203 session_id: &str,
204 attachment_ids: &[AttachmentId],
205 ) -> Result<(), StoreError> {
206 if attachment_ids.is_empty() {
207 return Ok(());
208 }
209 block_on_store(async {
210 let session_id = session_id.to_string();
211 let attachment_ids: Vec<String> = attachment_ids
212 .iter()
213 .map(|id| id.as_str().to_string())
214 .collect();
215 self.conn
216 .write(move |tx| {
217 let now = current_epoch_ms() as i64;
218 let mut stmt = tx.prepare(
219 "UPDATE attachment_manifest
220 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
221 WHERE attachment_id = ?2 AND session_id = ?3",
222 )?;
223 for id in &attachment_ids {
224 stmt.execute(params![now, id, session_id])?;
225 }
226 Ok(())
227 })
228 .await
229 .map_err(sqlite_error)?;
230 Ok(())
231 })
232 }
233
234 fn list_uncommitted(
235 &self,
236 older_than_epoch_ms: u64,
237 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
238 block_on_store(async {
239 let older_than = older_than_epoch_ms as i64;
240 self.conn
241 .call(move |conn| {
242 let mut stmt = conn.prepare(
243 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
244 FROM attachment_manifest
245 WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
246 ORDER BY intent_at_ms ASC",
247 )?;
248 let rows = stmt.query_map(params![older_than], |row| {
249 let id: String = row.get(0)?;
250 let session_id: String = row.get(1)?;
251 let canonical_uri: String = row.get(2)?;
252 let intent_at_ms: i64 = row.get(3)?;
253 let committed_at_ms: Option<i64> = row.get(4)?;
254 Ok(AttachmentManifestEntry {
255 attachment_id: AttachmentId::new(id),
256 session_id,
257 canonical_uri,
258 intent_at_epoch_ms: intent_at_ms as u64,
259 committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
260 })
261 })?;
262 Ok(rows.filter_map(Result::ok).collect())
263 })
264 .await
265 .map_err(sqlite_error)
266 })
267 }
268
269 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
270 block_on_store(async {
271 let attachment_id = attachment_id.as_str().to_string();
272 self.conn
273 .call(move |conn| {
274 conn.execute(
275 "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
276 params![attachment_id],
277 )
278 })
279 .await
280 .map_err(sqlite_error)?;
281 Ok(())
282 })
283 }
284}