1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
14#[derive(Debug, thiserror::Error)]
15pub enum AttachmentStoreError {
16 #[error("attachment `{0}` was not found")]
17 NotFound(AttachmentId),
18 #[error("attachment store I/O failed at {path}: {source}")]
19 Io {
20 path: PathBuf,
21 #[source]
22 source: std::io::Error,
23 },
24 #[error("attachment store metadata is unavailable for `{0}`")]
25 MissingMeta(AttachmentId),
26 #[error("attachment store metadata decode failed for `{id}`: {source}")]
27 MetadataDecode {
28 id: AttachmentId,
29 #[source]
30 source: serde_json::Error,
31 },
32 #[error("attachment manifest write failed: {0}")]
33 ManifestRecordFailed(String),
34 #[error("attachment store backend failed: {0}")]
35 Backend(String),
36}
37
38#[derive(Clone, Debug)]
39pub struct StoredAttachment {
40 pub meta: AttachmentMeta,
41 pub bytes: Vec<u8>,
42}
43
/// Persistence level an [`AttachmentStore`] reports for itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// The default reported by [`AttachmentStore::persistence`].
    Ephemeral,
    /// Reported by stores that override the default.
    Durable,
}
49
50impl AttachmentStorePersistence {
51 pub fn durability_tier(self) -> crate::DurabilityTier {
56 match self {
57 Self::Ephemeral => crate::DurabilityTier::Inline,
58 Self::Durable => crate::DurabilityTier::Durable,
59 }
60 }
61}
62
63#[async_trait::async_trait]
64pub trait AttachmentStore: Send + Sync {
65 fn persistence(&self) -> AttachmentStorePersistence {
66 AttachmentStorePersistence::Ephemeral
67 }
68
69 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
78 Vec::new()
79 }
80
81 fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
84
85 async fn put(
86 &self,
87 bytes: Vec<u8>,
88 meta: AttachmentCreateMeta,
89 ) -> Result<AttachmentRef, AttachmentStoreError>;
90
91 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
92}
93
94#[derive(Default)]
95pub struct InMemoryAttachmentStore {
96 attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
97}
98
99impl InMemoryAttachmentStore {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105#[async_trait::async_trait]
106impl AttachmentStore for InMemoryAttachmentStore {
107 async fn put(
108 &self,
109 bytes: Vec<u8>,
110 meta: AttachmentCreateMeta,
111 ) -> Result<AttachmentRef, AttachmentStoreError> {
112 let meta = stored_meta(&bytes, meta);
113 let reference = meta.as_ref();
114 let stored = StoredAttachment { meta, bytes };
115 self.attachments
116 .lock()
117 .expect("attachment store lock")
118 .insert(reference.id.clone(), stored);
119 Ok(reference)
120 }
121
122 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
123 self.attachments
124 .lock()
125 .expect("attachment store lock")
126 .get(id)
127 .cloned()
128 .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
129 }
130}
131
132pub fn content_id(bytes: &[u8]) -> AttachmentId {
133 AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
134}
135
136pub struct SessionScopedAttachmentStore {
150 inner: Arc<dyn AttachmentStore>,
151 manifest: Arc<dyn AttachmentManifest>,
152 session_id: String,
153 pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
154}
155
156impl SessionScopedAttachmentStore {
157 pub fn new(
158 inner: Arc<dyn AttachmentStore>,
159 manifest: Arc<dyn AttachmentManifest>,
160 session_id: impl Into<String>,
161 ) -> Self {
162 Self {
163 inner,
164 manifest,
165 session_id: session_id.into(),
166 pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
167 }
168 }
169
170 pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
171 &self.inner
172 }
173
174 pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
175 &self.manifest
176 }
177}
178
179#[async_trait::async_trait]
180impl AttachmentStore for SessionScopedAttachmentStore {
181 fn persistence(&self) -> AttachmentStorePersistence {
182 self.inner.persistence()
183 }
184
185 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
186 self.pending_manifest_commit_ids
187 .lock()
188 .expect("attachment manifest commit tracker lock")
189 .iter()
190 .cloned()
191 .collect()
192 }
193
194 fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
195 if ids.is_empty() {
196 return;
197 }
198 let mut pending = self
199 .pending_manifest_commit_ids
200 .lock()
201 .expect("attachment manifest commit tracker lock");
202 for id in ids {
203 pending.remove(id);
204 }
205 }
206
207 async fn put(
208 &self,
209 bytes: Vec<u8>,
210 meta: AttachmentCreateMeta,
211 ) -> Result<AttachmentRef, AttachmentStoreError> {
212 let attachment_id = content_id(&bytes);
213 let intent = AttachmentIntent {
214 attachment_id: attachment_id.clone(),
215 session_id: self.session_id.clone(),
216 canonical_uri: format!("sha256:{attachment_id}"),
217 intent_at_epoch_ms: now_epoch_ms(),
218 };
219 self.manifest.record_intent(intent).map_err(|err| {
222 AttachmentStoreError::ManifestRecordFailed(format!(
223 "failed to record attachment intent for `{attachment_id}`: {err}"
224 ))
225 })?;
226 let reference = self.inner.put(bytes, meta).await?;
227 if reference.id != attachment_id {
228 return Err(AttachmentStoreError::Backend(format!(
229 "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
230 reference.id
231 )));
232 }
233 self.pending_manifest_commit_ids
234 .lock()
235 .expect("attachment manifest commit tracker lock")
236 .insert(reference.id.clone());
237 Ok(reference)
238 }
239
240 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
241 self.inner.get(id).await
242 }
243}
244
245fn now_epoch_ms() -> u64 {
246 <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
247}
248
249pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
254
255impl AttachmentManifest for PersistenceManifestAdapter {
256 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
257 AttachmentManifest::record_intent(&*self.0, intent)
258 }
259
260 fn commit_refs(
261 &self,
262 session_id: &str,
263 attachment_ids: &[AttachmentId],
264 ) -> Result<(), crate::StoreError> {
265 AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
266 }
267
268 fn list_uncommitted(
269 &self,
270 older_than_epoch_ms: u64,
271 ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
272 AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
273 }
274
275 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
276 AttachmentManifest::forget(&*self.0, attachment_id)
277 }
278}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281 AttachmentMeta::new(
282 content_id(bytes),
283 meta.media_type,
284 bytes.len() as u64,
285 meta.width,
286 meta.height,
287 meta.label,
288 )
289}
290
291pub async fn resolve_llm_request_attachments(
292 mut request: crate::llm::types::LlmRequest,
293 store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295 for attachment in &mut request.attachments {
296 let Some(reference) = attachment.reference.as_ref() else {
297 continue;
298 };
299 if !attachment.data.is_empty() {
300 continue;
301 }
302 let stored = store.get(&reference.id).await?;
303 attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304 attachment.data = stored.bytes;
305 }
306 Ok(request)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double that remembers every recorded intent and accepts all
    /// other calls without doing anything.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            let mut recorded = self.intents.lock().expect("lock intents");
            recorded.push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(vec![])
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// Creation metadata for a 1x1 PNG labelled "pixel".
    fn meta() -> AttachmentCreateMeta {
        let media_type = MediaType::Image(ImageMediaType::Png);
        let label = Some("pixel".to_string());
        AttachmentCreateMeta::new(media_type, Some(1), Some(1), label)
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload = vec![1, 2, 3];

        let first = store.put(payload.clone(), meta()).await.expect("put a");
        let second = store.put(payload.clone(), meta()).await.expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);
        let fetched = store.get(&first.id).await.expect("get");
        assert_eq!(fetched.bytes, payload);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload = [4u8, 5, 6, 7];

        let reference = store.put(payload.to_vec(), meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&payload));
        assert_eq!(reference.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_attachment_store_satisfies_conformance() {
        crate::testing::conformance::attachment_store(
            || -> Arc<dyn AttachmentStore> {
                let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
                Arc::new(SessionScopedAttachmentStore::new(
                    Arc::new(InMemoryAttachmentStore::new()),
                    manifest,
                    "session-scoped-conformance",
                ))
            },
            AttachmentStorePersistence::Ephemeral,
        )
        .await;
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let recording = Arc::new(RecordingManifest::default());
        let as_manifest: Arc<dyn AttachmentManifest> = recording.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            as_manifest,
            "session-1",
        );

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        // The intent was recorded for the id that the put returned.
        let first_intent_id = recording.intents.lock().expect("lock intents")[0]
            .attachment_id
            .clone();
        assert_eq!(first_intent_id, reference.id);

        let expected_pending = vec![reference.id.clone()];
        assert_eq!(store.pending_manifest_commit_ids(), expected_pending);

        // Marking an unrelated id leaves the pending set unchanged.
        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(store.pending_manifest_commit_ids(), expected_pending);

        // Marking the stored id clears it.
        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }
}