1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
14#[derive(Debug, thiserror::Error)]
15pub enum AttachmentStoreError {
16 #[error("attachment `{0}` was not found")]
17 NotFound(AttachmentId),
18 #[error("attachment store I/O failed at {path}: {source}")]
19 Io {
20 path: PathBuf,
21 #[source]
22 source: std::io::Error,
23 },
24 #[error("attachment store metadata is unavailable for `{0}`")]
25 MissingMeta(AttachmentId),
26 #[error("attachment store metadata decode failed for `{id}`: {source}")]
27 MetadataDecode {
28 id: AttachmentId,
29 #[source]
30 source: serde_json::Error,
31 },
32 #[error("attachment manifest write failed: {0}")]
33 ManifestRecordFailed(String),
34 #[error("attachment store backend failed: {0}")]
35 Backend(String),
36}
37
38#[derive(Clone, Debug)]
39pub struct StoredAttachment {
40 pub meta: AttachmentMeta,
41 pub bytes: Vec<u8>,
42}
43
/// How long attachments written to a store are expected to survive.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// Data is not persisted beyond the store instance (the trait default).
    Ephemeral,
    /// Data is written to lasting storage.
    Durable,
}
49
50impl AttachmentStorePersistence {
51 pub fn durability_tier(self) -> crate::DurabilityTier {
56 match self {
57 Self::Ephemeral => crate::DurabilityTier::Inline,
58 Self::Durable => crate::DurabilityTier::Durable,
59 }
60 }
61}
62
63#[async_trait::async_trait]
64pub trait AttachmentStore: Send + Sync {
65 fn persistence(&self) -> AttachmentStorePersistence {
66 AttachmentStorePersistence::Ephemeral
67 }
68
69 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
78 Vec::new()
79 }
80
81 fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
84
85 async fn put(
86 &self,
87 bytes: Vec<u8>,
88 meta: AttachmentCreateMeta,
89 ) -> Result<AttachmentRef, AttachmentStoreError>;
90
91 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
92}
93
94#[derive(Default)]
95pub struct InMemoryAttachmentStore {
96 attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
97}
98
99impl InMemoryAttachmentStore {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105#[async_trait::async_trait]
106impl AttachmentStore for InMemoryAttachmentStore {
107 async fn put(
108 &self,
109 bytes: Vec<u8>,
110 meta: AttachmentCreateMeta,
111 ) -> Result<AttachmentRef, AttachmentStoreError> {
112 let meta = stored_meta(&bytes, meta);
113 let reference = meta.as_ref();
114 let stored = StoredAttachment { meta, bytes };
115 self.attachments
116 .lock()
117 .expect("attachment store lock")
118 .insert(reference.id.clone(), stored);
119 Ok(reference)
120 }
121
122 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
123 self.attachments
124 .lock()
125 .expect("attachment store lock")
126 .get(id)
127 .cloned()
128 .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
129 }
130}
131
132pub fn content_id(bytes: &[u8]) -> AttachmentId {
133 AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
134}
135
136pub struct SessionScopedAttachmentStore {
150 inner: Arc<dyn AttachmentStore>,
151 manifest: Arc<dyn AttachmentManifest>,
152 session_id: String,
153 pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
154}
155
156impl SessionScopedAttachmentStore {
157 pub fn new(
158 inner: Arc<dyn AttachmentStore>,
159 manifest: Arc<dyn AttachmentManifest>,
160 session_id: impl Into<String>,
161 ) -> Self {
162 Self {
163 inner,
164 manifest,
165 session_id: session_id.into(),
166 pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
167 }
168 }
169
170 pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
171 &self.inner
172 }
173
174 pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
175 &self.manifest
176 }
177}
178
179#[async_trait::async_trait]
180impl AttachmentStore for SessionScopedAttachmentStore {
181 fn persistence(&self) -> AttachmentStorePersistence {
182 self.inner.persistence()
183 }
184
185 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
186 self.pending_manifest_commit_ids
187 .lock()
188 .expect("attachment manifest commit tracker lock")
189 .iter()
190 .cloned()
191 .collect()
192 }
193
194 fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
195 if ids.is_empty() {
196 return;
197 }
198 let mut pending = self
199 .pending_manifest_commit_ids
200 .lock()
201 .expect("attachment manifest commit tracker lock");
202 for id in ids {
203 pending.remove(id);
204 }
205 }
206
207 async fn put(
208 &self,
209 bytes: Vec<u8>,
210 meta: AttachmentCreateMeta,
211 ) -> Result<AttachmentRef, AttachmentStoreError> {
212 let attachment_id = content_id(&bytes);
213 let intent = AttachmentIntent {
214 attachment_id: attachment_id.clone(),
215 session_id: self.session_id.clone(),
216 canonical_uri: format!("sha256:{attachment_id}"),
217 intent_at_epoch_ms: now_epoch_ms(),
218 };
219 self.manifest.record_intent(intent).map_err(|err| {
222 AttachmentStoreError::ManifestRecordFailed(format!(
223 "failed to record attachment intent for `{attachment_id}`: {err}"
224 ))
225 })?;
226 let reference = self.inner.put(bytes, meta).await?;
227 if reference.id != attachment_id {
228 return Err(AttachmentStoreError::Backend(format!(
229 "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
230 reference.id
231 )));
232 }
233 self.pending_manifest_commit_ids
234 .lock()
235 .expect("attachment manifest commit tracker lock")
236 .insert(reference.id.clone());
237 Ok(reference)
238 }
239
240 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
241 self.inner.get(id).await
242 }
243}
244
245fn now_epoch_ms() -> u64 {
246 <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
247}
248
249pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
254
255impl AttachmentManifest for PersistenceManifestAdapter {
256 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
257 AttachmentManifest::record_intent(&*self.0, intent)
258 }
259
260 fn commit_refs(
261 &self,
262 session_id: &str,
263 attachment_ids: &[AttachmentId],
264 ) -> Result<(), crate::StoreError> {
265 AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
266 }
267
268 fn list_uncommitted(
269 &self,
270 older_than_epoch_ms: u64,
271 ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
272 AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
273 }
274
275 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
276 AttachmentManifest::forget(&*self.0, attachment_id)
277 }
278}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281 AttachmentMeta::new(
282 content_id(bytes),
283 meta.media_type,
284 bytes.len() as u64,
285 meta.width,
286 meta.height,
287 meta.label,
288 )
289}
290
291pub async fn resolve_llm_request_attachments(
292 mut request: crate::llm::types::LlmRequest,
293 store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295 for attachment in &mut request.attachments {
296 let Some(reference) = attachment.reference.as_ref() else {
297 continue;
298 };
299 if !attachment.data.is_empty() {
300 continue;
301 }
302 let stored = store.get(&reference.id).await?;
303 attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304 attachment.data = stored.bytes;
305 }
306 Ok(request)
307}
308
#[cfg(test)]
mod tests {
    // Unit tests for the in-memory store, the session-scoped wrapper and the
    // persistence manifest adapter, including their error paths.
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};
    use std::collections::HashSet;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Manifest double that keeps every recorded intent and every committed
    /// `(session_id, attachment_id)` pair in memory for later inspection.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
        committed: Mutex<Vec<(String, AttachmentId)>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            self.intents.lock().expect("lock intents").push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            session_id: &str,
            attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            let mut committed = self.committed.lock().expect("lock committed attachments");
            committed.extend(
                attachment_ids
                    .iter()
                    .cloned()
                    .map(|attachment_id| (session_id.to_string(), attachment_id)),
            );
            Ok(())
        }

        /// Lists intents at or before `older_than_epoch_ms` whose
        /// `(session, id)` pair has not been committed.
        fn list_uncommitted(
            &self,
            older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            let committed = self
                .committed
                .lock()
                .expect("lock committed attachments")
                .iter()
                .cloned()
                .collect::<HashSet<_>>();
            Ok(self
                .intents
                .lock()
                .expect("lock intents")
                .iter()
                .filter(|intent| intent.intent_at_epoch_ms <= older_than_epoch_ms)
                .filter(|intent| {
                    !committed.contains(&(intent.session_id.clone(), intent.attachment_id.clone()))
                })
                .map(|intent| crate::AttachmentManifestEntry {
                    attachment_id: intent.attachment_id.clone(),
                    session_id: intent.session_id.clone(),
                    canonical_uri: intent.canonical_uri.clone(),
                    intent_at_epoch_ms: intent.intent_at_epoch_ms,
                    committed_at_epoch_ms: None,
                })
                .collect())
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// `RuntimePersistence` double: session operations go to an in-memory session
    /// store, manifest operations go to a [`RecordingManifest`].
    #[derive(Default)]
    struct RecordingRuntimePersistence {
        inner: crate::InMemorySessionStore,
        manifest: RecordingManifest,
    }

    impl AttachmentManifest for RecordingRuntimePersistence {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            self.manifest.record_intent(intent)
        }

        fn commit_refs(
            &self,
            session_id: &str,
            attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            self.manifest.commit_refs(session_id, attachment_ids)
        }

        fn list_uncommitted(
            &self,
            older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            self.manifest.list_uncommitted(older_than_epoch_ms)
        }

        fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            self.manifest.forget(attachment_id)
        }
    }

    #[async_trait::async_trait]
    impl crate::RuntimePersistence for RecordingRuntimePersistence {
        async fn load_session(
            &self,
            scope: crate::SessionReadScope,
        ) -> Result<Option<crate::PersistedSessionRead>, crate::StoreError> {
            crate::RuntimePersistence::load_session(&self.inner, scope).await
        }

        async fn load_node(
            &self,
            node_id: &str,
        ) -> Result<Option<crate::SessionNodeRecord>, crate::StoreError> {
            crate::RuntimePersistence::load_node(&self.inner, node_id).await
        }

        async fn commit_runtime_state(
            &self,
            commit: crate::RuntimeCommit,
        ) -> Result<crate::RuntimeCommitResult, crate::StoreError> {
            crate::RuntimePersistence::commit_runtime_state(&self.inner, commit).await
        }

        async fn try_claim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::RuntimePersistence::try_claim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                lease_ttl_ms,
            )
            .await
        }

        async fn reclaim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            observed_holder: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::RuntimePersistence::reclaim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                observed_holder,
                lease_ttl_ms,
            )
            .await
        }

        async fn renew_session_execution_lease(
            &self,
            fence: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLease, crate::StoreError> {
            crate::RuntimePersistence::renew_session_execution_lease(
                &self.inner,
                fence,
                lease_ttl_ms,
            )
            .await
        }

        async fn release_session_execution_lease(
            &self,
            completion: &crate::SessionExecutionLeaseCompletion,
        ) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::release_session_execution_lease(&self.inner, completion)
                .await
        }

        async fn save_session_meta(
            &self,
            meta: crate::SessionMeta,
        ) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::save_session_meta(&self.inner, meta).await
        }

        async fn load_session_meta(&self) -> Result<Option<crate::SessionMeta>, crate::StoreError> {
            crate::RuntimePersistence::load_session_meta(&self.inner).await
        }

        async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::tombstone_nodes(&self.inner, ids).await
        }

        async fn vacuum(&self) -> Result<crate::VacuumReport, crate::StoreError> {
            crate::RuntimePersistence::vacuum(&self.inner).await
        }

        async fn gc_unreachable(&self) -> Result<crate::GcReport, crate::StoreError> {
            crate::RuntimePersistence::gc_unreachable(&self.inner).await
        }
    }

    /// Misbehaving backend whose `put` always reports the id of different bytes,
    /// used to exercise the wrapper's id-consistency check.
    struct MismatchedIdStore;

    #[async_trait::async_trait]
    impl AttachmentStore for MismatchedIdStore {
        async fn put(
            &self,
            _bytes: Vec<u8>,
            meta: AttachmentCreateMeta,
        ) -> Result<AttachmentRef, AttachmentStoreError> {
            Ok(stored_meta(b"not-the-submitted-bytes", meta).as_ref())
        }

        async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
            Err(AttachmentStoreError::NotFound(id.clone()))
        }
    }

    /// Metadata for a 1x1 PNG labelled "pixel".
    fn meta() -> AttachmentCreateMeta {
        AttachmentCreateMeta::new(
            MediaType::Image(ImageMediaType::Png),
            Some(1),
            Some(1),
            Some("pixel".to_string()),
        )
    }

    /// Wall-clock epoch milliseconds computed independently of the crate's clock.
    fn system_epoch_ms_for_test() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock must be after Unix epoch")
            .as_millis() as u64
    }

    #[test]
    fn persistence_maps_to_matching_durability_tier() {
        assert!(matches!(
            AttachmentStorePersistence::Ephemeral.durability_tier(),
            crate::DurabilityTier::Inline
        ));
        assert!(matches!(
            AttachmentStorePersistence::Durable.durability_tier(),
            crate::DurabilityTier::Durable
        ));
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
        let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
        assert_eq!(a.id, b.id);
        assert_eq!(a.byte_len, 3);
        assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
        assert_eq!(reference.byte_len, 4);
    }

    #[tokio::test]
    async fn memory_store_reports_missing_attachment_as_not_found() {
        let store = InMemoryAttachmentStore::new();
        let missing = AttachmentId::new("missing");

        let Err(error) = store.get(&missing).await else {
            panic!("an unknown attachment id must not resolve");
        };
        assert!(
            matches!(error, AttachmentStoreError::NotFound(ref id) if *id == missing),
            "expected NotFound for the requested id, got {error}"
        );
    }

    #[tokio::test]
    async fn session_scoped_attachment_store_satisfies_conformance() {
        crate::testing::conformance::attachment_store(
            || {
                let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
                Arc::new(SessionScopedAttachmentStore::new(
                    Arc::new(InMemoryAttachmentStore::new()),
                    manifest,
                    "session-scoped-conformance",
                )) as Arc<dyn AttachmentStore>
            },
            AttachmentStorePersistence::Ephemeral,
        )
        .await;
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-1",
        );

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        assert_eq!(
            manifest.intents.lock().expect("lock intents")[0].attachment_id,
            reference.id
        );
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }

    #[tokio::test]
    async fn session_scoped_store_rejects_backend_id_mismatch_without_tracking_it() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(MismatchedIdStore),
            manifest_for_store,
            "session-mismatch",
        );

        let Err(error) = store.put(vec![21, 22, 23], meta()).await else {
            panic!("a backend that reports a different id must be rejected");
        };
        assert!(
            matches!(error, AttachmentStoreError::Backend(_)),
            "expected a backend error, got {error}"
        );
        // The intent is recorded before the backend write, so it remains in the
        // manifest (uncommitted) even though the write was rejected.
        assert_eq!(manifest.intents.lock().expect("lock intents").len(), 1);
        assert!(store.pending_manifest_commit_ids().is_empty());
    }

    #[tokio::test]
    async fn session_scoped_store_records_intent_timestamp_from_system_clock() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-clock",
        );

        let before_put_epoch_ms = system_epoch_ms_for_test();
        let reference = store.put(vec![11, 12, 13], meta()).await.expect("put");
        let after_put_epoch_ms = system_epoch_ms_for_test();

        let intents = manifest.intents.lock().expect("lock intents");
        assert_eq!(intents.len(), 1);
        let intent = &intents[0];
        assert_eq!(intent.attachment_id, reference.id);
        assert!(
            intent.intent_at_epoch_ms > 1_000_000_000_000,
            "intent timestamp should be a real epoch millis value, got {}",
            intent.intent_at_epoch_ms
        );
        assert!(
            intent.intent_at_epoch_ms >= before_put_epoch_ms.saturating_sub(1000),
            "intent timestamp {} should be close to or after put start {}",
            intent.intent_at_epoch_ms,
            before_put_epoch_ms
        );
        assert!(
            intent.intent_at_epoch_ms <= after_put_epoch_ms.saturating_add(1000),
            "intent timestamp {} should be close to or before put finish {}",
            intent.intent_at_epoch_ms,
            after_put_epoch_ms
        );
    }

    #[test]
    fn persistence_manifest_adapter_forwards_to_wrapped_runtime_persistence() {
        let runtime = Arc::new(RecordingRuntimePersistence::default());
        let persistence: Arc<dyn crate::RuntimePersistence> = runtime.clone();
        let adapter = PersistenceManifestAdapter(persistence);
        let attachment_id = AttachmentId::new("adapter-forwarding");
        let intent = AttachmentIntent {
            attachment_id: attachment_id.clone(),
            session_id: "adapter-session".to_string(),
            canonical_uri: "sha256:adapter-forwarding".to_string(),
            intent_at_epoch_ms: 10,
        };

        adapter.record_intent(intent).expect("record intent");
        let uncommitted = adapter
            .list_uncommitted(10)
            .expect("list uncommitted through adapter");
        assert_eq!(uncommitted.len(), 1);
        assert_eq!(uncommitted[0].attachment_id, attachment_id);
        assert_eq!(uncommitted[0].session_id, "adapter-session");
        assert_eq!(uncommitted[0].canonical_uri, "sha256:adapter-forwarding");
        assert_eq!(uncommitted[0].intent_at_epoch_ms, 10);
        assert!(uncommitted[0].committed_at_epoch_ms.is_none());

        adapter
            .commit_refs("adapter-session", std::slice::from_ref(&attachment_id))
            .expect("commit refs through adapter");
        assert!(
            adapter
                .list_uncommitted(10)
                .expect("list after commit through adapter")
                .is_empty()
        );
        assert_eq!(
            runtime
                .manifest
                .committed
                .lock()
                .expect("lock committed attachments")
                .as_slice(),
            &[("adapter-session".to_string(), attachment_id)]
        );
    }
}