lash_sqlite_store/
attachments.rs1use super::*;
16
17#[async_trait::async_trait]
18impl lashlang::LashlangArtifactStore for Store {
19 fn durability_tier(&self) -> lashlang::DurabilityTier {
20 lashlang::DurabilityTier::Durable
21 }
22
23 async fn put_module_artifact(
24 &self,
25 artifact: &lashlang::ModuleArtifact,
26 ) -> Result<(), lashlang::ArtifactStoreError> {
27 let bytes = artifact
28 .to_store_bytes()
29 .map_err(|err| lashlang::ArtifactStoreError::Encode(err.to_string()))?;
30 let blob_profile = self.options.blob_profile;
31 let artifact_ref = artifact.module_ref.as_str().to_string();
32 self.conn
33 .write(move |tx| {
34 let blob_ref = Self::insert_artifact_blob_conn(
35 tx,
36 BlobArtifactDescriptor::lashlang_module(),
37 &bytes,
38 blob_profile,
39 )?;
40 tx.execute(
41 "INSERT OR REPLACE INTO artifact_refs (artifact_ref, blob_ref) VALUES (?1, ?2)",
42 params![artifact_ref, blob_ref.as_str()],
43 )?;
44 Ok(())
45 })
46 .await
47 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
48 self.artifact_cache
49 .lock()
50 .map_err(|_| {
51 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
52 })?
53 .insert(artifact.module_ref.clone(), Arc::new(artifact.clone()));
54 Ok(())
55 }
56
57 async fn get_module_artifact(
58 &self,
59 module_ref: &lashlang::ModuleRef,
60 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
61 if let Some(artifact) = self
62 .artifact_cache
63 .lock()
64 .map_err(|_| {
65 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
66 })?
67 .get(module_ref)
68 .cloned()
69 {
70 return Ok(Some(artifact));
71 }
72
73 let artifact_ref = module_ref.as_str().to_string();
74 let resolved = self
78 .conn
79 .call(move |conn| {
80 let blob_ref: Option<String> = conn
81 .query_row(
82 "SELECT blob_ref FROM artifact_refs WHERE artifact_ref = ?1",
83 params![artifact_ref],
84 |row| row.get::<_, String>(0),
85 )
86 .optional()?;
87 let Some(blob_ref) = blob_ref else {
88 return Ok(None);
89 };
90 Ok(Some(Self::get_blob_conn(conn, &BlobRef(blob_ref))))
91 })
92 .await
93 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
94 let Some(blob) = resolved else {
95 return Ok(None);
96 };
97 let bytes = blob.ok_or_else(|| {
98 lashlang::ArtifactStoreError::Backend(format!(
99 "lashlang module artifact `{}` points at a missing blob",
100 module_ref
101 ))
102 })?;
103 let artifact = Arc::new(
104 lashlang::ModuleArtifact::from_store_bytes(&bytes)
105 .map_err(lashlang::ArtifactStoreError::from)?,
106 );
107 self.artifact_cache
108 .lock()
109 .map_err(|_| {
110 lashlang::ArtifactStoreError::Backend("artifact cache lock poisoned".to_string())
111 })?
112 .insert(module_ref.clone(), artifact.clone());
113 Ok(Some(artifact))
114 }
115}
116
117impl AttachmentManifest for Store {
118 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
119 block_on_store(async {
120 let attachment_id = intent.attachment_id.as_str().to_string();
121 let session_id = intent.session_id.as_str().to_string();
122 let canonical_uri = intent.canonical_uri.as_str().to_string();
123 let intent_at_ms = intent.intent_at_epoch_ms as i64;
124 self.conn
125 .call(move |conn| {
126 conn.execute(
127 "INSERT INTO attachment_manifest
128 (attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms)
129 VALUES (?1, ?2, ?3, ?4, NULL)
130 ON CONFLICT(attachment_id) DO NOTHING",
131 params![attachment_id, session_id, canonical_uri, intent_at_ms],
132 )
133 })
134 .await
135 .map_err(sqlite_error)?;
136 Ok(())
137 })
138 }
139
140 fn commit_refs(
141 &self,
142 session_id: &str,
143 attachment_ids: &[AttachmentId],
144 ) -> Result<(), StoreError> {
145 if attachment_ids.is_empty() {
146 return Ok(());
147 }
148 block_on_store(async {
149 let session_id = session_id.to_string();
150 let attachment_ids: Vec<String> = attachment_ids
151 .iter()
152 .map(|id| id.as_str().to_string())
153 .collect();
154 self.conn
155 .write(move |tx| {
156 let now = current_epoch_ms() as i64;
157 let mut stmt = tx.prepare(
158 "UPDATE attachment_manifest
159 SET committed_at_ms = COALESCE(committed_at_ms, ?1)
160 WHERE attachment_id = ?2 AND session_id = ?3",
161 )?;
162 for id in &attachment_ids {
163 stmt.execute(params![now, id, session_id])?;
164 }
165 Ok(())
166 })
167 .await
168 .map_err(sqlite_error)?;
169 Ok(())
170 })
171 }
172
173 fn list_uncommitted(
174 &self,
175 older_than_epoch_ms: u64,
176 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
177 block_on_store(async {
178 let older_than = older_than_epoch_ms as i64;
179 self.conn
180 .call(move |conn| {
181 let mut stmt = conn.prepare(
182 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
183 FROM attachment_manifest
184 WHERE committed_at_ms IS NULL AND intent_at_ms <= ?1
185 ORDER BY intent_at_ms ASC",
186 )?;
187 let rows = stmt.query_map(params![older_than], |row| {
188 let id: String = row.get(0)?;
189 let session_id: String = row.get(1)?;
190 let canonical_uri: String = row.get(2)?;
191 let intent_at_ms: i64 = row.get(3)?;
192 let committed_at_ms: Option<i64> = row.get(4)?;
193 Ok(AttachmentManifestEntry {
194 attachment_id: AttachmentId::new(id),
195 session_id,
196 canonical_uri,
197 intent_at_epoch_ms: intent_at_ms as u64,
198 committed_at_epoch_ms: committed_at_ms.map(|v| v as u64),
199 })
200 })?;
201 Ok(rows.filter_map(Result::ok).collect())
202 })
203 .await
204 .map_err(sqlite_error)
205 })
206 }
207
208 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
209 block_on_store(async {
210 let attachment_id = attachment_id.as_str().to_string();
211 self.conn
212 .call(move |conn| {
213 conn.execute(
214 "DELETE FROM attachment_manifest WHERE attachment_id = ?1",
215 params![attachment_id],
216 )
217 })
218 .await
219 .map_err(sqlite_error)?;
220 Ok(())
221 })
222 }
223}