1use super::*;
16
17impl Store {
18 async fn put_artifact_ref_blob(
19 &self,
20 artifact_ref: String,
21 descriptor: BlobArtifactDescriptor,
22 bytes: Vec<u8>,
23 ) -> Result<(), StoreError> {
24 let blob_profile = self.options.blob_profile;
25 self.conn
26 .write(move |tx| {
27 let blob_ref =
28 Self::insert_artifact_blob_conn(tx, descriptor, &bytes, blob_profile)?;
29 tx.execute(
30 "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
31 params![artifact_ref, blob_ref.as_str()],
32 )?;
33 Ok(())
34 })
35 .await
36 .map_err(sqlite_error)
37 }
38
39 async fn get_artifact_ref_blob(
40 &self,
41 artifact_ref: String,
42 missing_diagnostic: String,
43 ) -> Result<Option<Vec<u8>>, StoreError> {
44 let resolved = self
45 .conn
46 .call(move |conn| {
47 let blob_ref: Option<String> = conn
48 .query_row(
49 "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
50 params![artifact_ref],
51 |row| row.get::<_, String>(0),
52 )
53 .optional()?;
54 let Some(blob_ref) = blob_ref else {
55 return Ok(None);
56 };
57 Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
58 })
59 .await
60 .map_err(sqlite_error)?;
61 let Some(blob) = resolved else {
62 return Ok(None);
63 };
64 blob.ok_or_else(|| {
65 StoreError::Backend(format!("{missing_diagnostic} points at a missing blob"))
66 })
67 .map(Some)
68 }
69}
70
71#[async_trait::async_trait]
72impl lashlang::LashlangArtifactStore for Store {
73 fn durability_tier(&self) -> lashlang::DurabilityTier {
74 lashlang::DurabilityTier::Durable
75 }
76
77 async fn put_module_artifact(
78 &self,
79 artifact: &lashlang::ModuleArtifact,
80 ) -> Result<(), lashlang::ArtifactStoreError> {
81 let bytes = artifact
82 .to_store_bytes()
83 .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
84 let artifact_ref = artifact.module_ref.as_str().to_string();
85 self.put_artifact_ref_blob(
86 artifact_ref,
87 BlobArtifactDescriptor::lashlang_module(),
88 bytes,
89 )
90 .await
91 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
92 self.artifact_cache
93 .lock()
94 .map_err(|_| {
95 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
96 })?
97 .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
98 Ok(())
99 }
100
101 async fn get_module_artifact(
102 &self,
103 module_ref: &lashlang::ModuleRef,
104 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
105 if let Some(artifact) = self
106 .artifact_cache
107 .lock()
108 .map_err(|_| {
109 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
110 })?
111 .get(module_ref)
112 .cloned()
113 {
114 return Ok(Some(artifact));
115 }
116
117 let artifact_ref = module_ref.as_str().to_string();
118 let Some(bytes) = self
119 .get_artifact_ref_blob(
120 artifact_ref,
121 format!("lashlang module artifact `{module_ref}`"),
122 )
123 .await
124 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?
125 else {
126 return Ok(None);
127 };
128 let artifact = Arc::new(
129 lashlang::ModuleArtifact::from_store_bytes(&bytes)
130 .map_err(lashlang::ArtifactStoreError::from)?,
131 );
132 self.artifact_cache
133 .lock()
134 .map_err(|_| {
135 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
136 })?
137 .insert(module_ref.clone(), artifact.clone());
138 Ok(Some(artifact))
139 }
140
141 async fn put_artifact_bytes(
142 &self,
143 artifact_ref: &str,
144 descriptor: &str,
145 bytes: &[u8],
146 ) -> Result<(), lashlang::ArtifactStoreError> {
147 let artifact_ref = artifact_ref.to_string();
148 let descriptor = match descriptor {
149 "process_execution_env" => BlobArtifactDescriptor::process_execution_env(),
150 _ => BlobArtifactDescriptor::new(PersistedArtifactKind::GenericBlob, Vec::new()),
151 };
152 self.put_artifact_ref_blob(artifact_ref, descriptor, bytes.to_vec())
153 .await
154 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
155 }
156
157 async fn get_artifact_bytes(
158 &self,
159 artifact_ref: &str,
160 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
161 let artifact_ref = artifact_ref.to_string();
162 self.get_artifact_ref_blob(artifact_ref.clone(), format!("artifact `{artifact_ref}`"))
163 .await
164 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
165 }
166}
167
168#[async_trait::async_trait]
169impl lash_core::ProcessExecutionEnvStore for Store {
170 fn durability_tier(&self) -> DurabilityTier {
171 DurabilityTier::Durable
172 }
173
174 async fn put_process_execution_env(
175 &self,
176 env_ref: &lash_core::ProcessExecutionEnvRef,
177 bytes: &[u8],
178 ) -> Result<(), lash_core::PluginError> {
179 let artifact_ref = env_ref.as_str().to_string();
180 self.put_artifact_ref_blob(
181 artifact_ref,
182 BlobArtifactDescriptor::process_execution_env(),
183 bytes.to_vec(),
184 )
185 .await
186 .map_err(|err| lash_core::PluginError::Session(err.to_string()))
187 }
188
189 async fn get_process_execution_env(
190 &self,
191 env_ref: &lash_core::ProcessExecutionEnvRef,
192 ) -> Result<Option<Vec<u8>>, lash_core::PluginError> {
193 let artifact_ref = env_ref.as_str().to_string();
194 self.get_artifact_ref_blob(
195 artifact_ref.clone(),
196 format!("process execution env `{artifact_ref}`"),
197 )
198 .await
199 .map_err(|err| lash_core::PluginError::Session(err.to_string()))
200 }
201}
202
203impl AttachmentManifest for Store {
204 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
205 block_on_store(async {
206 let attachment_id = intent.attachment_id.as_str().to_string();
207 let session_id = intent.session_id.as_str().to_string();
208 let canonical_uri = intent.canonical_uri.as_str().to_string();
209 let intent_at_ms = intent.intent_at_epoch_ms as i64;
210 self.conn
211 .call(move |conn| {
212 conn.execute(
213 "INSERT INTO attachment_manifest
214 (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
215 VALUES (?1, ?2, ?3, ?4, NULL)
216 ON CONFLICT(attachment_id) DO NOTHING",
217 params![attachment_id, session_id, canonical_uri, intent_at_ms],
218 )
219 })
220 .await
221 .map_err(sqlite_error)?;
222 Ok(())
223 })
224 }
225
226 fn commit_refs(
227 &self,
228 session_id: &str,
229 attachment_ids: &[AttachmentId],
230 ) -> Result<(), StoreError> {
231 if attachment_ids.is_empty() {
232 return Ok(());
233 }
234 block_on_store(async {
235 let session_id = session_id.to_string();
236 let attachment_ids: Vec<String> = attachment_ids
237 .iter()
238 .map(|id| id.as_str().to_string())
239 .collect();
240 self.conn
241 .write(move |tx| {
242 let now = current_epoch_ms() as i64;
243 let mut stmt = tx.prepare(
244 "UPDATE attachment_manifest
245 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
246 WHERE attachment_id = ?2 AND session_id = ?3",
247 )?;
248 for id in &attachment_ids {
249 stmt.execute(params![now, id, session_id])?;
250 }
251 Ok(())
252 })
253 .await
254 .map_err(sqlite_error)?;
255 Ok(())
256 })
257 }
258
259 fn list_uncommitted(
260 &self,
261 older_than_epoch_ms: u64,
262 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
263 block_on_store(async {
264 let older_than = older_than_epoch_ms as i64;
265 self.conn
266 .call(move |conn| {
267 let mut stmt = conn.prepare(
268 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
269 FROM attachment_manifest
270 WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
271 ORDER BY intent_at_ms ASC",
272 )?;
273 let rows = stmt.query_map(params![older_than], |row| {
274 let id: String = row.get(0)?;
275 let session_id: String = row.get(1)?;
276 let canonical_uri: String = row.get(2)?;
277 let intent_at_ms: i64 = row.get(3)?;
278 let committed_at_ms: Option<i64> = row.get(4)?;
279 Ok(AttachmentManifestEntry {
280 attachment_id: AttachmentId::new(id),
281 session_id,
282 canonical_uri,
283 intent_at_epoch_ms: intent_at_ms as u64,
284 committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
285 })
286 })?;
287 Ok(rows.filter_map(Result::ok).collect())
288 })
289 .await
290 .map_err(sqlite_error)
291 })
292 }
293
294 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
295 block_on_store(async {
296 let attachment_id = attachment_id.as_str().to_string();
297 self.conn
298 .call(move |conn| {
299 conn.execute(
300 "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
301 params![attachment_id],
302 )
303 })
304 .await
305 .map_err(sqlite_error)?;
306 Ok(())
307 })
308 }
309}