1use std::collections::{BTreeSet, HashMap};
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
7use sha2::{Digest, Sha256};
8
9use crate::store::{AttachmentIntent, AttachmentManifest};
10
/// Errors produced by [`AttachmentStore`] implementations and the helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
    /// No attachment with this id exists (returned by the in-memory store's
    /// `get` for an unknown id).
    #[error("attachment `{0}` was not found")]
    NotFound(AttachmentId),
    /// An I/O operation failed at `path`.
    // NOTE(review): not constructed in this file; presumably used by
    // filesystem-backed stores — confirm.
    #[error("attachment store I/O failed at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// Metadata for the attachment is unavailable.
    // NOTE(review): not constructed in this file — confirm which backend emits it.
    #[error("attachment store metadata is unavailable for `{0}`")]
    MissingMeta(AttachmentId),
    /// Stored metadata for `id` failed to decode (wraps a `serde_json` error).
    // NOTE(review): not constructed in this file — confirm which backend emits it.
    #[error("attachment store metadata decode failed for `{id}`: {source}")]
    MetadataDecode {
        id: AttachmentId,
        #[source]
        source: serde_json::Error,
    },
    /// Recording an attachment intent in the manifest failed. The payload is a
    /// human-readable description that includes the manifest's own error.
    #[error("attachment manifest write failed: {0}")]
    ManifestRecordFailed(String),
    /// The backing store misbehaved, e.g. it returned an id that differs from
    /// the content id the manifest intent was recorded under.
    #[error("attachment store backend failed: {0}")]
    Backend(String),
}
34
/// An attachment as returned by a store: its metadata plus the raw payload bytes.
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata describing the attachment (see `stored_meta` for how the
    /// in-memory store builds it).
    pub meta: AttachmentMeta,
    /// The attachment payload.
    pub bytes: Vec<u8>,
}
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub enum AttachmentStorePersistence {
43 Ephemeral,
44 Durable,
45}
46
47impl AttachmentStorePersistence {
48 pub fn durability_tier(self) -> crate::DurabilityTier {
53 match self {
54 Self::Ephemeral => crate::DurabilityTier::Inline,
55 Self::Durable => crate::DurabilityTier::Durable,
56 }
57 }
58}
59
/// A store for attachment payloads, addressed by [`AttachmentId`].
///
/// Implementations must be `Send + Sync`. The async methods are expanded by
/// `async_trait`.
#[async_trait::async_trait]
pub trait AttachmentStore: Send + Sync {
    /// Reports whether this store persists attachments. Defaults to
    /// [`AttachmentStorePersistence::Ephemeral`].
    fn persistence(&self) -> AttachmentStorePersistence {
        AttachmentStorePersistence::Ephemeral
    }

    /// Ids of attachments written through this store whose manifest reference
    /// has not yet been marked committed. Defaults to an empty list, for stores
    /// that do not track manifest commits.
    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
        Vec::new()
    }

    /// Removes `ids` from the set reported by `pending_manifest_commit_ids`.
    /// The default does nothing.
    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}

    /// Stores `bytes` with the supplied creation metadata and returns a reference
    /// to the stored attachment.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError>;

    /// Fetches a previously stored attachment by id.
    // NOTE(review): the in-memory store returns `AttachmentStoreError::NotFound`
    // for unknown ids; other implementations presumably do the same — confirm.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
}
90
91#[derive(Default)]
92pub struct InMemoryAttachmentStore {
93 attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
94}
95
96impl InMemoryAttachmentStore {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102#[async_trait::async_trait]
103impl AttachmentStore for InMemoryAttachmentStore {
104 async fn put(
105 &self,
106 bytes: Vec<u8>,
107 meta: AttachmentCreateMeta,
108 ) -> Result<AttachmentRef, AttachmentStoreError> {
109 let meta = stored_meta(&bytes, meta);
110 let reference = meta.as_ref();
111 let stored = StoredAttachment { meta, bytes };
112 self.attachments
113 .lock()
114 .expect("attachment store lock")
115 .insert(reference.id.clone(), stored);
116 Ok(reference)
117 }
118
119 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
120 self.attachments
121 .lock()
122 .expect("attachment store lock")
123 .get(id)
124 .cloned()
125 .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
126 }
127}
128
129pub fn content_id(bytes: &[u8]) -> AttachmentId {
130 AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
131}
132
133pub struct SessionScopedAttachmentStore {
147 inner: Arc<dyn AttachmentStore>,
148 manifest: Arc<dyn AttachmentManifest>,
149 session_id: String,
150 pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
151}
152
153impl SessionScopedAttachmentStore {
154 pub fn new(
155 inner: Arc<dyn AttachmentStore>,
156 manifest: Arc<dyn AttachmentManifest>,
157 session_id: impl Into<String>,
158 ) -> Self {
159 Self {
160 inner,
161 manifest,
162 session_id: session_id.into(),
163 pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
164 }
165 }
166
167 pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
168 &self.inner
169 }
170
171 pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
172 &self.manifest
173 }
174}
175
176#[async_trait::async_trait]
177impl AttachmentStore for SessionScopedAttachmentStore {
178 fn persistence(&self) -> AttachmentStorePersistence {
179 self.inner.persistence()
180 }
181
182 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
183 self.pending_manifest_commit_ids
184 .lock()
185 .expect("attachment manifest commit tracker lock")
186 .iter()
187 .cloned()
188 .collect()
189 }
190
191 fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
192 if ids.is_empty() {
193 return;
194 }
195 let mut pending = self
196 .pending_manifest_commit_ids
197 .lock()
198 .expect("attachment manifest commit tracker lock");
199 for id in ids {
200 pending.remove(id);
201 }
202 }
203
204 async fn put(
205 &self,
206 bytes: Vec<u8>,
207 meta: AttachmentCreateMeta,
208 ) -> Result<AttachmentRef, AttachmentStoreError> {
209 let attachment_id = content_id(&bytes);
210 let intent = AttachmentIntent {
211 attachment_id: attachment_id.clone(),
212 session_id: self.session_id.clone(),
213 canonical_uri: format!("sha256:{attachment_id}"),
214 intent_at_epoch_ms: now_epoch_ms(),
215 };
216 self.manifest.record_intent(intent).map_err(|err| {
219 AttachmentStoreError::ManifestRecordFailed(format!(
220 "failed to record attachment intent for `{attachment_id}`: {err}"
221 ))
222 })?;
223 let reference = self.inner.put(bytes, meta).await?;
224 if reference.id != attachment_id {
225 return Err(AttachmentStoreError::Backend(format!(
226 "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
227 reference.id
228 )));
229 }
230 self.pending_manifest_commit_ids
231 .lock()
232 .expect("attachment manifest commit tracker lock")
233 .insert(reference.id.clone());
234 Ok(reference)
235 }
236
237 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
238 self.inner.get(id).await
239 }
240}
241
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. Saturates at
/// `u64::MAX` rather than silently truncating the `u128` millisecond count.
fn now_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamping first makes the narrowing cast lossless.
    millis.min(u128::from(u64::MAX)) as u64
}
248
/// Adapts a shared [`crate::RuntimePersistence`] handle into an
/// [`AttachmentManifest`]. Every call is forwarded to the wrapped persistence
/// object's own `AttachmentManifest` implementation.
pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);

// Each method calls `AttachmentManifest::...(&*self.0, ...)`, dispatching
// explicitly on the `dyn RuntimePersistence` behind the `Arc`. This is presumably
// to avoid ambiguity with same-named methods during ordinary method resolution.
impl AttachmentManifest for PersistenceManifestAdapter {
    /// Forwards to the wrapped persistence object's `record_intent`.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
        AttachmentManifest::record_intent(&*self.0, intent)
    }

    /// Forwards to the wrapped persistence object's `commit_refs`.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[AttachmentId],
    ) -> Result<(), crate::StoreError> {
        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
    }

    /// Forwards to the wrapped persistence object's `list_uncommitted`.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
    }

    /// Forwards to the wrapped persistence object's `forget`.
    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
        AttachmentManifest::forget(&*self.0, attachment_id)
    }
}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281 AttachmentMeta::new(
282 content_id(bytes),
283 meta.media_type,
284 bytes.len() as u64,
285 meta.width,
286 meta.height,
287 meta.label,
288 )
289}
290
291pub async fn resolve_llm_request_attachments(
292 mut request: crate::llm::types::LlmRequest,
293 store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295 for attachment in &mut request.attachments {
296 let Some(reference) = attachment.reference.as_ref() else {
297 continue;
298 };
299 if !attachment.data.is_empty() {
300 continue;
301 }
302 let stored = store.get(&reference.id).await?;
303 attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304 attachment.data = stored.bytes;
305 }
306 Ok(request)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double that records every intent and accepts all other calls.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            self.intents.lock().expect("lock intents").push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(Vec::new())
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// Creation metadata for a 1x1 PNG labelled "pixel".
    fn meta() -> AttachmentCreateMeta {
        AttachmentCreateMeta::new(
            MediaType::Image(ImageMediaType::Png),
            Some(1),
            Some(1),
            Some("pixel".to_string()),
        )
    }

    #[test]
    fn content_id_is_lowercase_hex_sha256() {
        // Known SHA-256 test vectors. The id format is also embedded in the
        // manifest's `sha256:` canonical URI, so it must stay stable.
        assert_eq!(
            content_id(b""),
            AttachmentId::new("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
        );
        assert_eq!(
            content_id(b"abc"),
            AttachmentId::new("ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad")
        );
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
        let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
        assert_eq!(a.id, b.id);
        assert_eq!(a.byte_len, 3);
        assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
        assert_eq!(reference.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-1",
        );

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        {
            let intents = manifest.intents.lock().expect("lock intents");
            assert_eq!(intents.len(), 1);
            assert_eq!(intents[0].attachment_id, reference.id);
            assert_eq!(intents[0].session_id, "session-1");
            assert_eq!(intents[0].canonical_uri, format!("sha256:{}", reference.id));
        }
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }

    #[tokio::test]
    async fn session_scoped_store_delegates_persistence_and_reads_to_inner() {
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            Arc::new(RecordingManifest::default()),
            "session-2",
        );
        assert_eq!(store.persistence(), AttachmentStorePersistence::Ephemeral);

        let reference = store.put(vec![11, 12], meta()).await.expect("put");
        assert_eq!(
            store.get(&reference.id).await.expect("get").bytes,
            vec![11, 12]
        );

        // Marking an empty slice is a no-op.
        store.mark_manifest_committed(&[]);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );
    }
}