Skip to main content

lash_core/
attachments.rs

1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
14#[derive(Debug, thiserror::Error)]
15pub enum AttachmentStoreError {
16    #[error("attachment `{0}` was not found")]
17    NotFound(AttachmentId),
18    #[error("attachment store I/O failed at {path}: {source}")]
19    Io {
20        path: PathBuf,
21        #[source]
22        source: std::io::Error,
23    },
24    #[error("attachment store metadata is unavailable for `{0}`")]
25    MissingMeta(AttachmentId),
26    #[error("attachment store metadata decode failed for `{id}`: {source}")]
27    MetadataDecode {
28        id: AttachmentId,
29        #[source]
30        source: serde_json::Error,
31    },
32    #[error("attachment manifest write failed: {0}")]
33    ManifestRecordFailed(String),
34    #[error("attachment store backend failed: {0}")]
35    Backend(String),
36}
37
/// An attachment's metadata together with its raw bytes, as returned by
/// [`AttachmentStore::get`].
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata describing the attachment (id, media type, byte length,
    /// optional dimensions and label).
    pub meta: AttachmentMeta,
    /// The attachment's raw content.
    pub bytes: Vec<u8>,
}
43
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45pub enum AttachmentStorePersistence {
46    Ephemeral,
47    Durable,
48}
49
50impl AttachmentStorePersistence {
51    /// Map the attachment-store persistence signal onto the shared
52    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
53    /// `Durable -> Durable`. Lets consistency checks read every wired store's
54    /// tier uniformly without a separate `durability_tier()` method here.
55    pub fn durability_tier(self) -> crate::DurabilityTier {
56        match self {
57            Self::Ephemeral => crate::DurabilityTier::Inline,
58            Self::Durable => crate::DurabilityTier::Durable,
59        }
60    }
61}
62
63#[async_trait::async_trait]
64pub trait AttachmentStore: Send + Sync {
65    fn persistence(&self) -> AttachmentStorePersistence {
66        AttachmentStorePersistence::Ephemeral
67    }
68
69    /// Attachment refs written by this store that still need their
70    /// write-ahead manifest rows stamped by the next runtime commit.
71    ///
72    /// Plain stores return an empty set. [`SessionScopedAttachmentStore`]
73    /// overrides this so attachments created through downstream tools,
74    /// process execution, and other runtime services are committed by the
75    /// same final turn transaction that makes them reachable from session
76    /// state.
77    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
78        Vec::new()
79    }
80
81    /// Clear attachment refs that were stamped committed by a successful
82    /// runtime commit.
83    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
84
85    async fn put(
86        &self,
87        bytes: Vec<u8>,
88        meta: AttachmentCreateMeta,
89    ) -> Result<AttachmentRef, AttachmentStoreError>;
90
91    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
92}
93
94#[derive(Default)]
95pub struct InMemoryAttachmentStore {
96    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
97}
98
99impl InMemoryAttachmentStore {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105#[async_trait::async_trait]
106impl AttachmentStore for InMemoryAttachmentStore {
107    async fn put(
108        &self,
109        bytes: Vec<u8>,
110        meta: AttachmentCreateMeta,
111    ) -> Result<AttachmentRef, AttachmentStoreError> {
112        let meta = stored_meta(&bytes, meta);
113        let reference = meta.as_ref();
114        let stored = StoredAttachment { meta, bytes };
115        self.attachments
116            .lock()
117            .expect("attachment store lock")
118            .insert(reference.id.clone(), stored);
119        Ok(reference)
120    }
121
122    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
123        self.attachments
124            .lock()
125            .expect("attachment store lock")
126            .get(id)
127            .cloned()
128            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
129    }
130}
131
132pub fn content_id(bytes: &[u8]) -> AttachmentId {
133    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
134}
135
/// Session-scoped wrapper that records a write-ahead intent in
/// [`AttachmentManifest`] before delegating each `put` to the backing
/// [`AttachmentStore`]. The intent row durably captures "this session
/// is about to write these bytes," so if the process dies between
/// `put` and the next committed runtime state, a later GC sweep can
/// reconcile the orphaned bytes by walking
/// [`AttachmentManifest::list_uncommitted`].
///
/// Ids of successful `put`s are also remembered in memory until
/// [`AttachmentStore::mark_manifest_committed`] clears them, so the next
/// runtime commit can stamp their manifest rows.
///
/// Constructed by the runtime when both a durable [`AttachmentStore`]
/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
/// (which also implements [`AttachmentManifest`]) are wired up. Other
/// callers — tests, hosts using only ephemeral storage — keep the
/// plain inner store and skip the manifest entirely.
pub struct SessionScopedAttachmentStore {
    // Store that actually holds the bytes.
    inner: Arc<dyn AttachmentStore>,
    // Receives an intent row before every delegated `put`.
    manifest: Arc<dyn AttachmentManifest>,
    // Session id written into every intent row.
    session_id: String,
    // Ids from successful `put`s awaiting a commit mark; a BTreeSet, so
    // repeated puts of identical bytes are tracked once.
    pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
}
155
156impl SessionScopedAttachmentStore {
157    pub fn new(
158        inner: Arc<dyn AttachmentStore>,
159        manifest: Arc<dyn AttachmentManifest>,
160        session_id: impl Into<String>,
161    ) -> Self {
162        Self {
163            inner,
164            manifest,
165            session_id: session_id.into(),
166            pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
167        }
168    }
169
170    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
171        &self.inner
172    }
173
174    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
175        &self.manifest
176    }
177}
178
179#[async_trait::async_trait]
180impl AttachmentStore for SessionScopedAttachmentStore {
181    fn persistence(&self) -> AttachmentStorePersistence {
182        self.inner.persistence()
183    }
184
185    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
186        self.pending_manifest_commit_ids
187            .lock()
188            .expect("attachment manifest commit tracker lock")
189            .iter()
190            .cloned()
191            .collect()
192    }
193
194    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
195        if ids.is_empty() {
196            return;
197        }
198        let mut pending = self
199            .pending_manifest_commit_ids
200            .lock()
201            .expect("attachment manifest commit tracker lock");
202        for id in ids {
203            pending.remove(id);
204        }
205    }
206
207    async fn put(
208        &self,
209        bytes: Vec<u8>,
210        meta: AttachmentCreateMeta,
211    ) -> Result<AttachmentRef, AttachmentStoreError> {
212        let attachment_id = content_id(&bytes);
213        let intent = AttachmentIntent {
214            attachment_id: attachment_id.clone(),
215            session_id: self.session_id.clone(),
216            canonical_uri: format!("sha256:{attachment_id}"),
217            intent_at_epoch_ms: now_epoch_ms(),
218        };
219        // Record intent first. If this fails the bytes never land,
220        // matching the write-ahead guarantee.
221        self.manifest.record_intent(intent).map_err(|err| {
222            AttachmentStoreError::ManifestRecordFailed(format!(
223                "failed to record attachment intent for `{attachment_id}`: {err}"
224            ))
225        })?;
226        let reference = self.inner.put(bytes, meta).await?;
227        if reference.id != attachment_id {
228            return Err(AttachmentStoreError::Backend(format!(
229                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
230                reference.id
231            )));
232        }
233        self.pending_manifest_commit_ids
234            .lock()
235            .expect("attachment manifest commit tracker lock")
236            .insert(reference.id.clone());
237        Ok(reference)
238    }
239
240    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
241        self.inner.get(id).await
242    }
243}
244
245fn now_epoch_ms() -> u64 {
246    <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
247}
248
249/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
250/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
251/// Rust's trait-object upcasting does not yet allow direct coercion
252/// between the two; this thin forwarder is the bridge.
253pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
254
255impl AttachmentManifest for PersistenceManifestAdapter {
256    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
257        AttachmentManifest::record_intent(&*self.0, intent)
258    }
259
260    fn commit_refs(
261        &self,
262        session_id: &str,
263        attachment_ids: &[AttachmentId],
264    ) -> Result<(), crate::StoreError> {
265        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
266    }
267
268    fn list_uncommitted(
269        &self,
270        older_than_epoch_ms: u64,
271    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
272        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
273    }
274
275    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
276        AttachmentManifest::forget(&*self.0, attachment_id)
277    }
278}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281    AttachmentMeta::new(
282        content_id(bytes),
283        meta.media_type,
284        bytes.len() as u64,
285        meta.width,
286        meta.height,
287        meta.label,
288    )
289}
290
291pub async fn resolve_llm_request_attachments(
292    mut request: crate::llm::types::LlmRequest,
293    store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295    for attachment in &mut request.attachments {
296        let Some(reference) = attachment.reference.as_ref() else {
297            continue;
298        };
299        if !attachment.data.is_empty() {
300            continue;
301        }
302        let stored = store.get(&reference.id).await?;
303        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304        attachment.data = stored.bytes;
305    }
306    Ok(request)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};
    use std::collections::HashSet;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// In-memory [`AttachmentManifest`] double that records every intent and
    /// every committed `(session_id, attachment_id)` pair for inspection.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
        committed: Mutex<Vec<(String, AttachmentId)>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            self.intents.lock().expect("lock intents").push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            session_id: &str,
            attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            let mut committed = self.committed.lock().expect("lock committed attachments");
            committed.extend(
                attachment_ids
                    .iter()
                    .cloned()
                    .map(|attachment_id| (session_id.to_string(), attachment_id)),
            );
            Ok(())
        }

        /// Intents recorded at or before `older_than_epoch_ms` whose
        /// `(session_id, attachment_id)` pair has not been committed.
        fn list_uncommitted(
            &self,
            older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            let committed = self
                .committed
                .lock()
                .expect("lock committed attachments")
                .iter()
                .cloned()
                .collect::<HashSet<_>>();
            Ok(self
                .intents
                .lock()
                .expect("lock intents")
                .iter()
                .filter(|intent| intent.intent_at_epoch_ms <= older_than_epoch_ms)
                .filter(|intent| {
                    !committed.contains(&(intent.session_id.clone(), intent.attachment_id.clone()))
                })
                .map(|intent| crate::AttachmentManifestEntry {
                    attachment_id: intent.attachment_id.clone(),
                    session_id: intent.session_id.clone(),
                    canonical_uri: intent.canonical_uri.clone(),
                    intent_at_epoch_ms: intent.intent_at_epoch_ms,
                    committed_at_epoch_ms: None,
                })
                .collect())
        }

        // No-op: none of the tests in this module exercise `forget`.
        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// `RuntimePersistence` double: session/lease/GC calls go to an
    /// `InMemorySessionStore`, while manifest calls go to a
    /// [`RecordingManifest`]. Tests can therefore observe what reaches the
    /// manifest through [`PersistenceManifestAdapter`].
    #[derive(Default)]
    struct RecordingRuntimePersistence {
        inner: crate::InMemorySessionStore,
        manifest: RecordingManifest,
    }

    impl AttachmentManifest for RecordingRuntimePersistence {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            self.manifest.record_intent(intent)
        }

        fn commit_refs(
            &self,
            session_id: &str,
            attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            self.manifest.commit_refs(session_id, attachment_ids)
        }

        fn list_uncommitted(
            &self,
            older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            self.manifest.list_uncommitted(older_than_epoch_ms)
        }

        fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            self.manifest.forget(attachment_id)
        }
    }

    // Pure forwarding to `InMemorySessionStore`; fully qualified calls pin
    // each call to the `RuntimePersistence` trait method.
    #[async_trait::async_trait]
    impl crate::RuntimePersistence for RecordingRuntimePersistence {
        async fn load_session(
            &self,
            scope: crate::SessionReadScope,
        ) -> Result<Option<crate::PersistedSessionRead>, crate::StoreError> {
            crate::RuntimePersistence::load_session(&self.inner, scope).await
        }

        async fn load_node(
            &self,
            node_id: &str,
        ) -> Result<Option<crate::SessionNodeRecord>, crate::StoreError> {
            crate::RuntimePersistence::load_node(&self.inner, node_id).await
        }

        async fn commit_runtime_state(
            &self,
            commit: crate::RuntimeCommit,
        ) -> Result<crate::RuntimeCommitResult, crate::StoreError> {
            crate::RuntimePersistence::commit_runtime_state(&self.inner, commit).await
        }

        async fn try_claim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::RuntimePersistence::try_claim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                lease_ttl_ms,
            )
            .await
        }

        async fn reclaim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            observed_holder: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::RuntimePersistence::reclaim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                observed_holder,
                lease_ttl_ms,
            )
            .await
        }

        async fn renew_session_execution_lease(
            &self,
            fence: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLease, crate::StoreError> {
            crate::RuntimePersistence::renew_session_execution_lease(
                &self.inner,
                fence,
                lease_ttl_ms,
            )
            .await
        }

        async fn release_session_execution_lease(
            &self,
            completion: &crate::SessionExecutionLeaseCompletion,
        ) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::release_session_execution_lease(&self.inner, completion)
                .await
        }

        async fn save_session_meta(
            &self,
            meta: crate::SessionMeta,
        ) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::save_session_meta(&self.inner, meta).await
        }

        async fn load_session_meta(&self) -> Result<Option<crate::SessionMeta>, crate::StoreError> {
            crate::RuntimePersistence::load_session_meta(&self.inner).await
        }

        async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), crate::StoreError> {
            crate::RuntimePersistence::tombstone_nodes(&self.inner, ids).await
        }

        async fn vacuum(&self) -> Result<crate::VacuumReport, crate::StoreError> {
            crate::RuntimePersistence::vacuum(&self.inner).await
        }

        async fn gc_unreachable(&self) -> Result<crate::GcReport, crate::StoreError> {
            crate::RuntimePersistence::gc_unreachable(&self.inner).await
        }
    }

    /// Create-metadata for a 1x1 PNG labelled "pixel", shared by every put in
    /// these tests.
    fn meta() -> AttachmentCreateMeta {
        AttachmentCreateMeta::new(
            MediaType::Image(ImageMediaType::Png),
            Some(1),
            Some(1),
            Some("pixel".to_string()),
        )
    }

    /// Epoch milliseconds read directly from `SystemTime`, independent of
    /// `crate::SystemClock`, used to bracket the timestamp recorded by `put`.
    fn system_epoch_ms_for_test() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock must be after Unix epoch")
            .as_millis() as u64
    }

    // Identical bytes map to the same content id.
    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
        let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
        assert_eq!(a.id, b.id);
        assert_eq!(a.byte_len, 3);
        assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
    }

    // Id and byte length come from the bytes, not from the create-metadata.
    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
        assert_eq!(reference.byte_len, 4);
    }

    // The wrapper over an in-memory store must pass the shared store
    // conformance suite and report the inner store's Ephemeral persistence.
    #[tokio::test]
    async fn session_scoped_attachment_store_satisfies_conformance() {
        crate::testing::conformance::attachment_store(
            || {
                let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
                Arc::new(SessionScopedAttachmentStore::new(
                    Arc::new(InMemoryAttachmentStore::new()),
                    manifest,
                    "session-scoped-conformance",
                )) as Arc<dyn AttachmentStore>
            },
            AttachmentStorePersistence::Ephemeral,
        )
        .await;
    }

    // A successful put records an intent and stays pending until its own id
    // (not an unrelated one) is marked committed.
    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-1",
        );

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        assert_eq!(
            manifest.intents.lock().expect("lock intents")[0].attachment_id,
            reference.id
        );
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![reference.id.clone()]
        );

        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }

    // The intent timestamp is a real epoch-ms value within a 1 s tolerance of
    // the window around the put call.
    #[tokio::test]
    async fn session_scoped_store_records_intent_timestamp_from_system_clock() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-clock",
        );

        let before_put_epoch_ms = system_epoch_ms_for_test();
        let reference = store.put(vec![11, 12, 13], meta()).await.expect("put");
        let after_put_epoch_ms = system_epoch_ms_for_test();

        let intents = manifest.intents.lock().expect("lock intents");
        assert_eq!(intents.len(), 1);
        let intent = &intents[0];
        assert_eq!(intent.attachment_id, reference.id);
        assert!(
            intent.intent_at_epoch_ms > 1_000_000_000_000,
            "intent timestamp should be a real epoch millis value, got {}",
            intent.intent_at_epoch_ms
        );
        assert!(
            intent.intent_at_epoch_ms >= before_put_epoch_ms.saturating_sub(1000),
            "intent timestamp {} should be close to or after put start {}",
            intent.intent_at_epoch_ms,
            before_put_epoch_ms
        );
        assert!(
            intent.intent_at_epoch_ms <= after_put_epoch_ms.saturating_add(1000),
            "intent timestamp {} should be close to or before put finish {}",
            intent.intent_at_epoch_ms,
            after_put_epoch_ms
        );
    }

    // Intent recording, listing and committing through the adapter all reach
    // the wrapped persistence's manifest.
    #[test]
    fn persistence_manifest_adapter_forwards_to_wrapped_runtime_persistence() {
        let runtime = Arc::new(RecordingRuntimePersistence::default());
        let persistence: Arc<dyn crate::RuntimePersistence> = runtime.clone();
        let adapter = PersistenceManifestAdapter(persistence);
        let attachment_id = AttachmentId::new("adapter-forwarding");
        let intent = AttachmentIntent {
            attachment_id: attachment_id.clone(),
            session_id: "adapter-session".to_string(),
            canonical_uri: "sha256:adapter-forwarding".to_string(),
            intent_at_epoch_ms: 10,
        };

        adapter.record_intent(intent).expect("record intent");
        let uncommitted = adapter
            .list_uncommitted(10)
            .expect("list uncommitted through adapter");
        assert_eq!(uncommitted.len(), 1);
        assert_eq!(uncommitted[0].attachment_id, attachment_id);
        assert_eq!(uncommitted[0].session_id, "adapter-session");
        assert_eq!(uncommitted[0].canonical_uri, "sha256:adapter-forwarding");
        assert_eq!(uncommitted[0].intent_at_epoch_ms, 10);
        assert!(uncommitted[0].committed_at_epoch_ms.is_none());

        adapter
            .commit_refs("adapter-session", std::slice::from_ref(&attachment_id))
            .expect("commit refs through adapter");
        assert!(
            adapter
                .list_uncommitted(10)
                .expect("list after commit through adapter")
                .is_empty()
        );
        assert_eq!(
            runtime
                .manifest
                .committed
                .lock()
                .expect("lock committed attachments")
                .as_slice(),
            &[("adapter-session".to_string(), attachment_id)]
        );
    }
}