1use super::*;
16
17pub(crate) const MODULE_ARTIFACT_NAMESPACE: &str = "lashlang_module";
25pub(crate) const RAW_ARTIFACT_NAMESPACE: &str = "lashlang_artifact";
26pub(crate) const PROCESS_ENV_NAMESPACE: &str = "process_execution_env";
27
28impl Store {
29 async fn put_artifact_ref_blob(
30 &self,
31 namespace: &'static str,
32 artifact_ref: String,
33 descriptor: BlobArtifactDescriptor,
34 bytes: Vec<u8>,
35 ) -> Result<(), StoreError> {
36 let blob_profile = self.options.blob_profile;
37 self.conn
38 .write(move |tx| {
39 let blob_ref =
40 Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
41 tx.execute(
42 "INSERT OR REPLACE INTO artifact_refs (namespace, artifact_ref, blob_ref)
43 VALUES (?1, ?2, ?3)",
44 params![namespace, artifact_ref, blob_ref.as_str()],
45 )?;
46 Ok(())
47 })
48 .await
49 .map_err(sqlite_error)
50 }
51
52 async fn get_artifact_ref_blob(
53 &self,
54 namespace: &'static str,
55 artifact_ref: String,
56 missing_diagnostic: String,
57 ) -> Result<Option<Vec<u8>>, StoreError> {
58 let resolved = self
59 .conn
60 .call(move |conn| {
61 let blob_ref: Option<String> = conn
62 .query_row(
63 "SELECT blob_ref FROM artifact_refs
64 WHERE namespace = ?1 AND artifact_ref = ?2",
65 params![namespace, artifact_ref],
66 |row| row.get::<_, String>(0),
67 )
68 .optional()?;
69 let Some(blob_ref) = blob_ref else {
70 return Ok(None);
71 };
72 Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
73 })
74 .await
75 .map_err(sqlite_error)?;
76 let Some(blob) = resolved else {
77 return Ok(None);
78 };
79 blob.ok_or_else(|| {
80 StoreError::Backend(format!("{missing_diagnostic} points at a missing blob"))
81 })
82 .map(Some)
83 }
84}
85
86#[async_trait::async_trait]
87impl lashlang::LashlangArtifactStore for Store {
88 fn durability_tier(&self) -> lashlang::DurabilityTier {
89 lashlang::DurabilityTier::Durable
90 }
91
92 async fn put_module_artifact(
93 &self,
94 artifact: &lashlang::ModuleArtifact,
95 ) -> Result<(), lashlang::ArtifactStoreError> {
96 let bytes = artifact
97 .to_store_bytes()
98 .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
99 let artifact_ref = artifact.module_ref.as_str().to_string();
100 self.put_artifact_ref_blob(
101 MODULE_ARTIFACT_NAMESPACE,
102 artifact_ref,
103 BlobArtifactDescriptor::lashlang_module(),
104 bytes,
105 )
106 .await
107 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
108 self.artifact_cache
109 .lock()
110 .map_err(|_| {
111 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
112 })?
113 .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
114 Ok(())
115 }
116
117 async fn get_module_artifact(
118 &self,
119 module_ref: &lashlang::ModuleRef,
120 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
121 if let Some(artifact) = self
122 .artifact_cache
123 .lock()
124 .map_err(|_| {
125 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
126 })?
127 .get(module_ref)
128 .cloned()
129 {
130 return Ok(Some(artifact));
131 }
132
133 let artifact_ref = module_ref.as_str().to_string();
134 let Some(bytes) = self
135 .get_artifact_ref_blob(
136 MODULE_ARTIFACT_NAMESPACE,
137 artifact_ref,
138 format!("lashlang module artifact `{module_ref}`"),
139 )
140 .await
141 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?
142 else {
143 return Ok(None);
144 };
145 let artifact = Arc::new(
146 lashlang::ModuleArtifact::from_store_bytes(&bytes)
147 .map_err(lashlang::ArtifactStoreError::from)?,
148 );
149 self.artifact_cache
150 .lock()
151 .map_err(|_| {
152 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
153 })?
154 .insert(module_ref.clone(), artifact.clone());
155 Ok(Some(artifact))
156 }
157
158 async fn put_artifact_bytes(
159 &self,
160 artifact_ref: &str,
161 descriptor: &str,
162 bytes: &[u8],
163 ) -> Result<(), lashlang::ArtifactStoreError> {
164 let artifact_ref = artifact_ref.to_string();
165 let descriptor = match descriptor {
166 "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
167 _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
168 };
169 self.put_artifact_ref_blob(
170 RAW_ARTIFACT_NAMESPACE,
171 artifact_ref,
172 descriptor,
173 bytes.to_vec(),
174 )
175 .await
176 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
177 }
178
179 async fn get_artifact_bytes(
180 &self,
181 artifact_ref: &str,
182 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
183 let artifact_ref = artifact_ref.to_string();
184 self.get_artifact_ref_blob(
185 RAW_ARTIFACT_NAMESPACE,
186 artifact_ref.clone(),
187 format!("artifact `{artifact_ref}`"),
188 )
189 .await
190 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
191 }
192}
193
194#[async_trait::async_trait]
195impl lash_core::ProcessExecutionEnvStore for Store {
196 fn durability_tier(&self) -> DurabilityTier {
197 DurabilityTier::Durable
198 }
199
200 async fn put_process_execution_env(
201 &self,
202 env_ref: &lash_core::ProcessExecutionEnvRef,
203 bytes: &[u8],
204 ) -> Result<(), lash_core::PluginError> {
205 let artifact_ref = env_ref.as_str().to_string();
206 self.put_artifact_ref_blob(
207 PROCESS_ENV_NAMESPACE,
208 artifact_ref,
209 BlobArtifactDescriptor::process_execution_env(),
210 bytes.to_vec(),
211 )
212 .await
213 .map_err(|err| lash_core::PluginError::Session(err.to_string()))
214 }
215
216 async fn get_process_execution_env(
217 &self,
218 env_ref: &lash_core::ProcessExecutionEnvRef,
219 ) -> Result<Option<Vec<u8>>, lash_core::PluginError> {
220 let artifact_ref = env_ref.as_str().to_string();
221 self.get_artifact_ref_blob(
222 PROCESS_ENV_NAMESPACE,
223 artifact_ref.clone(),
224 format!("process execution env `{artifact_ref}`"),
225 )
226 .await
227 .map_err(|err| lash_core::PluginError::Session(err.to_string()))
228 }
229}
230
231impl AttachmentManifest for Store {
232 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
233 block_on_store(async {
234 let attachment_id = intent.attachment_id.as_str().to_string();
235 let session_id = intent.session_id.as_str().to_string();
236 let canonical_uri = intent.canonical_uri.as_str().to_string();
237 let intent_at_ms = intent.intent_at_epoch_ms as i64;
238 self.conn
239 .call(move |conn| {
240 conn.execute(
241 "INSERT INTO attachment_manifest
242 (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
243 VALUES (?1, ?2, ?3, ?4, NULL)
244 ON CONFLICT(attachment_id) DO NOTHING",
245 params![attachment_id, session_id, canonical_uri, intent_at_ms],
246 )
247 })
248 .await
249 .map_err(sqlite_error)?;
250 Ok(())
251 })
252 }
253
254 fn commit_refs(
255 &self,
256 session_id: &str,
257 attachment_ids: &[AttachmentId],
258 ) -> Result<(), StoreError> {
259 if attachment_ids.is_empty() {
260 return Ok(());
261 }
262 block_on_store(async {
263 let session_id = session_id.to_string();
264 let attachment_ids: Vec<String> = attachment_ids
265 .iter()
266 .map(|id| id.as_str().to_string())
267 .collect();
268 self.conn
269 .write(move |tx| {
270 let now = current_epoch_ms() as i64;
271 let mut stmt = tx.prepare(
272 "UPDATE attachment_manifest
273 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
274 WHERE attachment_id = ?2 AND session_id = ?3",
275 )?;
276 for id in &attachment_ids {
277 stmt.execute(params![now, id, session_id])?;
278 }
279 Ok(())
280 })
281 .await
282 .map_err(sqlite_error)?;
283 Ok(())
284 })
285 }
286
287 fn list_uncommitted(
288 &self,
289 older_than_epoch_ms: u64,
290 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
291 block_on_store(async {
292 let older_than = older_than_epoch_ms as i64;
293 self.conn
294 .call(move |conn| {
295 let mut stmt = conn.prepare(
296 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
297 FROM attachment_manifest
298 WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
299 ORDER BY intent_at_ms ASC",
300 )?;
301 let rows = stmt.query_map(params![older_than], |row| {
302 let id: String = row.get(0)?;
303 let session_id: String = row.get(1)?;
304 let canonical_uri: String = row.get(2)?;
305 let intent_at_ms: i64 = row.get(3)?;
306 let committed_at_ms: Option<i64> = row.get(4)?;
307 Ok(AttachmentManifestEntry {
308 attachment_id: AttachmentId::new(id),
309 session_id,
310 canonical_uri,
311 intent_at_epoch_ms: intent_at_ms as u64,
312 committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
313 })
314 })?;
315 Ok(rows.filter_map(Result::ok).collect())
316 })
317 .await
318 .map_err(sqlite_error)
319 })
320 }
321
322 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
323 block_on_store(async {
324 let attachment_id = attachment_id.as_str().to_string();
325 self.conn
326 .call(move |conn| {
327 conn.execute(
328 "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
329 params![attachment_id],
330 )
331 })
332 .await
333 .map_err(sqlite_error)?;
334 Ok(())
335 })
336 }
337}