1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
11use sha2::{Digest, Sha256};
12
13use crate::store::{AttachmentIntent, AttachmentManifest};
14
15#[derive(Debug, thiserror::Error)]
16pub enum AttachmentStoreError {
17 #[error("attachment `{0}` was not found")]
18 NotFound(AttachmentId),
19 #[error("attachment store I/O failed at {path}: {source}")]
20 Io {
21 path: PathBuf,
22 #[source]
23 source: std::io::Error,
24 },
25 #[error("attachment store metadata is unavailable for `{0}`")]
26 MissingMeta(AttachmentId),
27 #[error("attachment store metadata decode failed for `{id}`: {source}")]
28 MetadataDecode {
29 id: AttachmentId,
30 #[source]
31 source: serde_json::Error,
32 },
33 #[error("attachment manifest write failed: {0}")]
34 ManifestRecordFailed(String),
35 #[error("attachment store backend failed: {0}")]
36 Backend(String),
37}
38
39#[derive(Clone, Debug)]
40pub struct StoredAttachment {
41 pub meta: AttachmentMeta,
42 pub bytes: Vec<u8>,
43}
44
/// Persistence class an [`AttachmentStore`] reports for itself.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// The store reports itself as ephemeral.
    Ephemeral,
    /// The store reports itself as durable.
    Durable,
}
50
51impl AttachmentStorePersistence {
52 pub fn durability_tier(self) -> crate::DurabilityTier {
57 match self {
58 Self::Ephemeral => crate::DurabilityTier::Inline,
59 Self::Durable => crate::DurabilityTier::Durable,
60 }
61 }
62}
63
64#[async_trait::async_trait]
65pub trait AttachmentStore: Send + Sync {
66 fn persistence(&self) -> AttachmentStorePersistence {
67 AttachmentStorePersistence::Ephemeral
68 }
69
70 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
79 Vec::new()
80 }
81
82 fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
85
86 async fn put(
87 &self,
88 bytes: Vec<u8>,
89 meta: AttachmentCreateMeta,
90 ) -> Result<AttachmentRef, AttachmentStoreError>;
91
92 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
93}
94
95#[derive(Default)]
96pub struct InMemoryAttachmentStore {
97 attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
98}
99
100impl InMemoryAttachmentStore {
101 pub fn new() -> Self {
102 Self::default()
103 }
104}
105
106#[async_trait::async_trait]
107impl AttachmentStore for InMemoryAttachmentStore {
108 async fn put(
109 &self,
110 bytes: Vec<u8>,
111 meta: AttachmentCreateMeta,
112 ) -> Result<AttachmentRef, AttachmentStoreError> {
113 let meta = stored_meta(&bytes, meta);
114 let reference = meta.as_ref();
115 let stored = StoredAttachment { meta, bytes };
116 self.attachments
117 .lock()
118 .expect("attachment store lock")
119 .insert(reference.id.clone(), stored);
120 Ok(reference)
121 }
122
123 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
124 self.attachments
125 .lock()
126 .expect("attachment store lock")
127 .get(id)
128 .cloned()
129 .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
130 }
131}
132
133pub fn content_id(bytes: &[u8]) -> AttachmentId {
134 AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
135}
136
137pub struct SessionScopedAttachmentStore {
151 inner: Arc<dyn AttachmentStore>,
152 manifest: Arc<dyn AttachmentManifest>,
153 session_id: String,
154 pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
155}
156
157impl SessionScopedAttachmentStore {
158 pub fn new(
159 inner: Arc<dyn AttachmentStore>,
160 manifest: Arc<dyn AttachmentManifest>,
161 session_id: impl Into<String>,
162 ) -> Self {
163 Self {
164 inner,
165 manifest,
166 session_id: session_id.into(),
167 pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
168 }
169 }
170
171 pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
172 &self.inner
173 }
174
175 pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
176 &self.manifest
177 }
178}
179
180#[async_trait::async_trait]
181impl AttachmentStore for SessionScopedAttachmentStore {
182 fn persistence(&self) -> AttachmentStorePersistence {
183 self.inner.persistence()
184 }
185
186 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
187 self.pending_manifest_commit_ids
188 .lock()
189 .expect("attachment manifest commit tracker lock")
190 .iter()
191 .cloned()
192 .collect()
193 }
194
195 fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
196 if ids.is_empty() {
197 return;
198 }
199 let mut pending = self
200 .pending_manifest_commit_ids
201 .lock()
202 .expect("attachment manifest commit tracker lock");
203 for id in ids {
204 pending.remove(id);
205 }
206 }
207
208 async fn put(
209 &self,
210 bytes: Vec<u8>,
211 meta: AttachmentCreateMeta,
212 ) -> Result<AttachmentRef, AttachmentStoreError> {
213 let attachment_id = content_id(&bytes);
214 let intent = AttachmentIntent {
215 attachment_id: attachment_id.clone(),
216 session_id: self.session_id.clone(),
217 canonical_uri: format!("sha256:{attachment_id}"),
218 intent_at_epoch_ms: now_epoch_ms(),
219 };
220 self.manifest.record_intent(intent).map_err(|err| {
223 AttachmentStoreError::ManifestRecordFailed(format!(
224 "failed to record attachment intent for `{attachment_id}`: {err}"
225 ))
226 })?;
227 let reference = self.inner.put(bytes, meta).await?;
228 if reference.id != attachment_id {
229 return Err(AttachmentStoreError::Backend(format!(
230 "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
231 reference.id
232 )));
233 }
234 self.pending_manifest_commit_ids
235 .lock()
236 .expect("attachment manifest commit tracker lock")
237 .insert(reference.id.clone());
238 Ok(reference)
239 }
240
241 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
242 self.inner.get(id).await
243 }
244}
245
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock that reads earlier than the epoch yields `0`. The `u128`
/// millisecond count is converted with a checked conversion and saturates at
/// `u64::MAX` rather than being silently truncated by an `as` cast.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
252
253pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
258
259impl AttachmentManifest for PersistenceManifestAdapter {
260 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
261 AttachmentManifest::record_intent(&*self.0, intent)
262 }
263
264 fn commit_refs(
265 &self,
266 session_id: &str,
267 attachment_ids: &[AttachmentId],
268 ) -> Result<(), crate::StoreError> {
269 AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
270 }
271
272 fn list_uncommitted(
273 &self,
274 older_than_epoch_ms: u64,
275 ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
276 AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
277 }
278
279 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
280 AttachmentManifest::forget(&*self.0, attachment_id)
281 }
282}
283
284fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
285 AttachmentMeta::new(
286 content_id(bytes),
287 meta.media_type,
288 bytes.len() as u64,
289 meta.width,
290 meta.height,
291 meta.label,
292 )
293}
294
295pub async fn resolve_llm_request_attachments(
296 mut request: crate::llm::types::LlmRequest,
297 store: &dyn AttachmentStore,
298) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
299 for attachment in &mut request.attachments {
300 let Some(reference) = attachment.reference.as_ref() else {
301 continue;
302 };
303 if !attachment.data.is_empty() {
304 continue;
305 }
306 let stored = store.get(&reference.id).await?;
307 attachment.mime = stored.meta.media_type.canonical_mime().to_string();
308 attachment.data = stored.bytes;
309 }
310 Ok(request)
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double that remembers every recorded intent and accepts all
    /// other calls without doing anything.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            let mut recorded = self.intents.lock().expect("lock intents");
            recorded.push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(vec![])
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// Create-metadata for a 1x1 PNG labelled "pixel".
    fn meta() -> AttachmentCreateMeta {
        let media_type = MediaType::Image(ImageMediaType::Png);
        let label = Some("pixel".to_string());
        AttachmentCreateMeta::new(media_type, Some(1), Some(1), label)
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let payload: Vec<u8> = vec![1, 2, 3];
        let store = InMemoryAttachmentStore::new();

        let first = store.put(payload.clone(), meta()).await.expect("put a");
        let second = store.put(payload.clone(), meta()).await.expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);
        let fetched = store.get(&first.id).await.expect("get");
        assert_eq!(fetched.bytes, payload);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let payload: Vec<u8> = vec![4, 5, 6, 7];
        let store = InMemoryAttachmentStore::new();

        let reference = store.put(payload.clone(), meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&payload));
        assert_eq!(reference.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let inner: Arc<dyn AttachmentStore> = Arc::new(InMemoryAttachmentStore::new());
        let store = SessionScopedAttachmentStore::new(inner, manifest_for_store, "session-1");

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        // The intent must have been recorded for the id that was stored.
        {
            let recorded = manifest.intents.lock().expect("lock intents");
            assert_eq!(recorded[0].attachment_id, reference.id);
        }
        let expected_pending = vec![reference.id.clone()];
        assert_eq!(store.pending_manifest_commit_ids(), expected_pending);

        // Marking an unrelated id leaves the pending set alone.
        let unrelated = [AttachmentId::new("other")];
        store.mark_manifest_committed(&unrelated);
        assert_eq!(store.pending_manifest_commit_ids(), expected_pending);

        // Marking the stored id clears it.
        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }
}