Skip to main content

lash_core/
attachments.rs

1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
14#[derive(Debug, thiserror::Error)]
15pub enum AttachmentStoreError {
16    #[error("attachment `{0}` was not found")]
17    NotFound(AttachmentId),
18    #[error("attachment store I/O failed at {path}: {source}")]
19    Io {
20        path: PathBuf,
21        #[source]
22        source: std::io::Error,
23    },
24    #[error("attachment store metadata is unavailable for `{0}`")]
25    MissingMeta(AttachmentId),
26    #[error("attachment store metadata decode failed for `{id}`: {source}")]
27    MetadataDecode {
28        id: AttachmentId,
29        #[source]
30        source: serde_json::Error,
31    },
32    #[error("attachment manifest write failed: {0}")]
33    ManifestRecordFailed(String),
34    #[error("attachment store backend failed: {0}")]
35    Backend(String),
36}
37
38#[derive(Clone, Debug)]
39pub struct StoredAttachment {
40    pub meta: AttachmentMeta,
41    pub bytes: Vec<u8>,
42}
43
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45pub enum AttachmentStorePersistence {
46    Ephemeral,
47    Durable,
48}
49
50impl AttachmentStorePersistence {
51    /// Map the attachment-store persistence signal onto the shared
52    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
53    /// `Durable -> Durable`. Lets consistency checks read every wired store's
54    /// tier uniformly without a separate `durability_tier()` method here.
55    pub fn durability_tier(self) -> crate::DurabilityTier {
56        match self {
57            Self::Ephemeral => crate::DurabilityTier::Inline,
58            Self::Durable => crate::DurabilityTier::Durable,
59        }
60    }
61}
62
63#[async_trait::async_trait]
64pub trait AttachmentStore: Send + Sync {
65    fn persistence(&self) -> AttachmentStorePersistence {
66        AttachmentStorePersistence::Ephemeral
67    }
68
69    /// Attachment refs written by this store that still need their
70    /// write-ahead manifest rows stamped by the next runtime commit.
71    ///
72    /// Plain stores return an empty set. [`SessionScopedAttachmentStore`]
73    /// overrides this so attachments created through downstream tools,
74    /// process execution, and other runtime services are committed by the
75    /// same final turn transaction that makes them reachable from session
76    /// state.
77    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
78        Vec::new()
79    }
80
81    /// Clear attachment refs that were stamped committed by a successful
82    /// runtime commit.
83    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
84
85    async fn put(
86        &self,
87        bytes: Vec<u8>,
88        meta: AttachmentCreateMeta,
89    ) -> Result<AttachmentRef, AttachmentStoreError>;
90
91    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
92}
93
94#[derive(Default)]
95pub struct InMemoryAttachmentStore {
96    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
97}
98
99impl InMemoryAttachmentStore {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105#[async_trait::async_trait]
106impl AttachmentStore for InMemoryAttachmentStore {
107    async fn put(
108        &self,
109        bytes: Vec<u8>,
110        meta: AttachmentCreateMeta,
111    ) -> Result<AttachmentRef, AttachmentStoreError> {
112        let meta = stored_meta(&bytes, meta);
113        let reference = meta.as_ref();
114        let stored = StoredAttachment { meta, bytes };
115        self.attachments
116            .lock()
117            .expect("attachment store lock")
118            .insert(reference.id.clone(), stored);
119        Ok(reference)
120    }
121
122    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
123        self.attachments
124            .lock()
125            .expect("attachment store lock")
126            .get(id)
127            .cloned()
128            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
129    }
130}
131
132pub fn content_id(bytes: &[u8]) -> AttachmentId {
133    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
134}
135
136/// Session-scoped wrapper that records a write-ahead intent in
137/// [`AttachmentManifest`] before delegating each `put` to the backing
138/// [`AttachmentStore`]. The intent row durably captures "this session
139/// is about to write these bytes," so if the process dies between
140/// `put` and the next committed runtime state, a later GC sweep can
141/// reconcile the orphaned bytes by walking
142/// [`AttachmentManifest::list_uncommitted`].
143///
144/// Constructed by the runtime when both a durable [`AttachmentStore`]
145/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
146/// (which also implements [`AttachmentManifest`]) are wired up. Other
147/// callers — tests, hosts using only ephemeral storage — keep the
148/// plain inner store and skip the manifest entirely.
149pub struct SessionScopedAttachmentStore {
150    inner: Arc<dyn AttachmentStore>,
151    manifest: Arc<dyn AttachmentManifest>,
152    session_id: String,
153    pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
154}
155
156impl SessionScopedAttachmentStore {
157    pub fn new(
158        inner: Arc<dyn AttachmentStore>,
159        manifest: Arc<dyn AttachmentManifest>,
160        session_id: impl Into<String>,
161    ) -> Self {
162        Self {
163            inner,
164            manifest,
165            session_id: session_id.into(),
166            pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
167        }
168    }
169
170    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
171        &self.inner
172    }
173
174    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
175        &self.manifest
176    }
177}
178
179#[async_trait::async_trait]
180impl AttachmentStore for SessionScopedAttachmentStore {
181    fn persistence(&self) -> AttachmentStorePersistence {
182        self.inner.persistence()
183    }
184
185    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
186        self.pending_manifest_commit_ids
187            .lock()
188            .expect("attachment manifest commit tracker lock")
189            .iter()
190            .cloned()
191            .collect()
192    }
193
194    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
195        if ids.is_empty() {
196            return;
197        }
198        let mut pending = self
199            .pending_manifest_commit_ids
200            .lock()
201            .expect("attachment manifest commit tracker lock");
202        for id in ids {
203            pending.remove(id);
204        }
205    }
206
207    async fn put(
208        &self,
209        bytes: Vec<u8>,
210        meta: AttachmentCreateMeta,
211    ) -> Result<AttachmentRef, AttachmentStoreError> {
212        let attachment_id = content_id(&bytes);
213        let intent = AttachmentIntent {
214            attachment_id: attachment_id.clone(),
215            session_id: self.session_id.clone(),
216            canonical_uri: format!("sha256:{attachment_id}"),
217            intent_at_epoch_ms: now_epoch_ms(),
218        };
219        // Record intent first. If this fails the bytes never land,
220        // matching the write-ahead guarantee.
221        self.manifest.record_intent(intent).map_err(|err| {
222            AttachmentStoreError::ManifestRecordFailed(format!(
223                "failed to record attachment intent for `{attachment_id}`: {err}"
224            ))
225        })?;
226        let reference = self.inner.put(bytes, meta).await?;
227        if reference.id != attachment_id {
228            return Err(AttachmentStoreError::Backend(format!(
229                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
230                reference.id
231            )));
232        }
233        self.pending_manifest_commit_ids
234            .lock()
235            .expect("attachment manifest commit tracker lock")
236            .insert(reference.id.clone());
237        Ok(reference)
238    }
239
240    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
241        self.inner.get(id).await
242    }
243}
244
245fn now_epoch_ms() -> u64 {
246    <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
247}
248
249/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
250/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
251/// Rust's trait-object upcasting does not yet allow direct coercion
252/// between the two; this thin forwarder is the bridge.
253pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
254
255impl AttachmentManifest for PersistenceManifestAdapter {
256    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
257        AttachmentManifest::record_intent(&*self.0, intent)
258    }
259
260    fn commit_refs(
261        &self,
262        session_id: &str,
263        attachment_ids: &[AttachmentId],
264    ) -> Result<(), crate::StoreError> {
265        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
266    }
267
268    fn list_uncommitted(
269        &self,
270        older_than_epoch_ms: u64,
271    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
272        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
273    }
274
275    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
276        AttachmentManifest::forget(&*self.0, attachment_id)
277    }
278}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281    AttachmentMeta::new(
282        content_id(bytes),
283        meta.media_type,
284        bytes.len() as u64,
285        meta.width,
286        meta.height,
287        meta.label,
288    )
289}
290
291pub async fn resolve_llm_request_attachments(
292    mut request: crate::llm::types::LlmRequest,
293    store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295    for attachment in &mut request.attachments {
296        let Some(reference) = attachment.reference.as_ref() else {
297            continue;
298        };
299        if !attachment.data.is_empty() {
300            continue;
301        }
302        let stored = store.get(&reference.id).await?;
303        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304        attachment.data = stored.bytes;
305    }
306    Ok(request)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double: remembers every recorded intent and treats all
    /// other operations as successful no-ops.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            let mut recorded = self.intents.lock().expect("lock intents");
            recorded.push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(vec![])
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// PNG creation metadata with `Some(1)` for both numeric arguments and
    /// the label "pixel".
    fn meta() -> AttachmentCreateMeta {
        let media_type = MediaType::Image(ImageMediaType::Png);
        let label = "pixel".to_string();
        AttachmentCreateMeta::new(media_type, Some(1), Some(1), Some(label))
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload = vec![1, 2, 3];

        let first = store.put(payload.clone(), meta()).await.expect("put a");
        let second = store.put(payload.clone(), meta()).await.expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);
        let fetched = store.get(&first.id).await.expect("get");
        assert_eq!(fetched.bytes, payload);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let stored_ref = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");

        let expected_id = content_id(&[4, 5, 6, 7]);
        assert_eq!(stored_ref.id, expected_id);
        assert_eq!(stored_ref.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_attachment_store_satisfies_conformance() {
        crate::testing::conformance::attachment_store(
            || {
                let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
                let backing = Arc::new(InMemoryAttachmentStore::new());
                Arc::new(SessionScopedAttachmentStore::new(
                    backing,
                    manifest,
                    "session-scoped-conformance",
                )) as Arc<dyn AttachmentStore>
            },
            AttachmentStorePersistence::Ephemeral,
        )
        .await;
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest_for_store,
            "session-1",
        );

        let stored_ref = store.put(vec![8, 9, 10], meta()).await.expect("put");

        // The intent was recorded for the id the store handed back.
        let first_intent_id = manifest.intents.lock().expect("lock intents")[0]
            .attachment_id
            .clone();
        assert_eq!(first_intent_id, stored_ref.id);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![stored_ref.id.clone()]
        );

        // Marking an unrelated id leaves the pending set untouched.
        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(
            store.pending_manifest_commit_ids(),
            vec![stored_ref.id.clone()]
        );

        // Marking the real id clears it.
        store.mark_manifest_committed(std::slice::from_ref(&stored_ref.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }
}