Skip to main content

lash_core/
attachments.rs

1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
11use sha2::{Digest, Sha256};
12
13use crate::store::{AttachmentIntent, AttachmentManifest};
14
15#[derive(Debug, thiserror::Error)]
16pub enum AttachmentStoreError {
17    #[error("attachment `{0}` was not found")]
18    NotFound(AttachmentId),
19    #[error("attachment store I/O failed at {path}: {source}")]
20    Io {
21        path: PathBuf,
22        #[source]
23        source: std::io::Error,
24    },
25    #[error("attachment store metadata is unavailable for `{0}`")]
26    MissingMeta(AttachmentId),
27    #[error("attachment store metadata decode failed for `{id}`: {source}")]
28    MetadataDecode {
29        id: AttachmentId,
30        #[source]
31        source: serde_json::Error,
32    },
33    #[error("attachment manifest write failed: {0}")]
34    ManifestRecordFailed(String),
35    #[error("attachment store backend failed: {0}")]
36    Backend(String),
37}
38
39#[derive(Clone, Debug)]
40pub struct StoredAttachment {
41    pub meta: AttachmentMeta,
42    pub bytes: Vec<u8>,
43}
44
/// Persistence class advertised by an [`AttachmentStore`] via
/// [`AttachmentStore::persistence`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AttachmentStorePersistence {
    /// Non-durable storage; the default returned by
    /// [`AttachmentStore::persistence`].
    Ephemeral,
    /// Durable storage.
    Durable,
}
50
51impl AttachmentStorePersistence {
52    /// Map the attachment-store persistence signal onto the shared
53    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
54    /// `Durable -> Durable`. Lets consistency checks read every wired store's
55    /// tier uniformly without a separate `durability_tier()` method here.
56    pub fn durability_tier(self) -> crate::DurabilityTier {
57        match self {
58            Self::Ephemeral => crate::DurabilityTier::Inline,
59            Self::Durable => crate::DurabilityTier::Durable,
60        }
61    }
62}
63
64#[async_trait::async_trait]
65pub trait AttachmentStore: Send + Sync {
66    fn persistence(&self) -> AttachmentStorePersistence {
67        AttachmentStorePersistence::Ephemeral
68    }
69
70    /// Attachment refs written by this store that still need their
71    /// write-ahead manifest rows stamped by the next runtime commit.
72    ///
73    /// Plain stores return an empty set. [`SessionScopedAttachmentStore`]
74    /// overrides this so attachments created through downstream tools,
75    /// Lashlang execution, and other runtime services are committed by the
76    /// same final turn transaction that makes them reachable from session
77    /// state.
78    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
79        Vec::new()
80    }
81
82    /// Clear attachment refs that were stamped committed by a successful
83    /// runtime commit.
84    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
85
86    async fn put(
87        &self,
88        bytes: Vec<u8>,
89        meta: AttachmentCreateMeta,
90    ) -> Result<AttachmentRef, AttachmentStoreError>;
91
92    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
93}
94
95#[derive(Default)]
96pub struct InMemoryAttachmentStore {
97    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
98}
99
100impl InMemoryAttachmentStore {
101    pub fn new() -> Self {
102        Self::default()
103    }
104}
105
106#[async_trait::async_trait]
107impl AttachmentStore for InMemoryAttachmentStore {
108    async fn put(
109        &self,
110        bytes: Vec<u8>,
111        meta: AttachmentCreateMeta,
112    ) -> Result<AttachmentRef, AttachmentStoreError> {
113        let meta = stored_meta(&bytes, meta);
114        let reference = meta.as_ref();
115        let stored = StoredAttachment { meta, bytes };
116        self.attachments
117            .lock()
118            .expect("attachment store lock")
119            .insert(reference.id.clone(), stored);
120        Ok(reference)
121    }
122
123    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
124        self.attachments
125            .lock()
126            .expect("attachment store lock")
127            .get(id)
128            .cloned()
129            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
130    }
131}
132
133pub fn content_id(bytes: &[u8]) -> AttachmentId {
134    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
135}
136
137/// Session-scoped wrapper that records a write-ahead intent in
138/// [`AttachmentManifest`] before delegating each `put` to the backing
139/// [`AttachmentStore`]. The intent row durably captures "this session
140/// is about to write these bytes," so if the process dies between
141/// `put` and the next committed runtime state, a later GC sweep can
142/// reconcile the orphaned bytes by walking
143/// [`AttachmentManifest::list_uncommitted`].
144///
145/// Constructed by the runtime when both a durable [`AttachmentStore`]
146/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
147/// (which also implements [`AttachmentManifest`]) are wired up. Other
148/// callers — tests, hosts using only ephemeral storage — keep the
149/// plain inner store and skip the manifest entirely.
150pub struct SessionScopedAttachmentStore {
151    inner: Arc<dyn AttachmentStore>,
152    manifest: Arc<dyn AttachmentManifest>,
153    session_id: String,
154    pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
155}
156
157impl SessionScopedAttachmentStore {
158    pub fn new(
159        inner: Arc<dyn AttachmentStore>,
160        manifest: Arc<dyn AttachmentManifest>,
161        session_id: impl Into<String>,
162    ) -> Self {
163        Self {
164            inner,
165            manifest,
166            session_id: session_id.into(),
167            pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
168        }
169    }
170
171    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
172        &self.inner
173    }
174
175    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
176        &self.manifest
177    }
178}
179
180#[async_trait::async_trait]
181impl AttachmentStore for SessionScopedAttachmentStore {
182    fn persistence(&self) -> AttachmentStorePersistence {
183        self.inner.persistence()
184    }
185
186    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
187        self.pending_manifest_commit_ids
188            .lock()
189            .expect("attachment manifest commit tracker lock")
190            .iter()
191            .cloned()
192            .collect()
193    }
194
195    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
196        if ids.is_empty() {
197            return;
198        }
199        let mut pending = self
200            .pending_manifest_commit_ids
201            .lock()
202            .expect("attachment manifest commit tracker lock");
203        for id in ids {
204            pending.remove(id);
205        }
206    }
207
208    async fn put(
209        &self,
210        bytes: Vec<u8>,
211        meta: AttachmentCreateMeta,
212    ) -> Result<AttachmentRef, AttachmentStoreError> {
213        let attachment_id = content_id(&bytes);
214        let intent = AttachmentIntent {
215            attachment_id: attachment_id.clone(),
216            session_id: self.session_id.clone(),
217            canonical_uri: format!("sha256:{attachment_id}"),
218            intent_at_epoch_ms: now_epoch_ms(),
219        };
220        // Record intent first. If this fails the bytes never land,
221        // matching the write-ahead guarantee.
222        self.manifest.record_intent(intent).map_err(|err| {
223            AttachmentStoreError::ManifestRecordFailed(format!(
224                "failed to record attachment intent for `{attachment_id}`: {err}"
225            ))
226        })?;
227        let reference = self.inner.put(bytes, meta).await?;
228        if reference.id != attachment_id {
229            return Err(AttachmentStoreError::Backend(format!(
230                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
231                reference.id
232            )));
233        }
234        self.pending_manifest_commit_ids
235            .lock()
236            .expect("attachment manifest commit tracker lock")
237            .insert(reference.id.clone());
238        Ok(reference)
239    }
240
241    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
242        self.inner.get(id).await
243    }
244}
245
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch reads as `0`. The `u128` millisecond
/// count from [`std::time::Duration::as_millis`] is clamped to `u64::MAX`
/// rather than silently truncated when narrowed to `u64`.
fn now_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp first so the narrowing cast below can never drop high bits.
    millis.min(u128::from(u64::MAX)) as u64
}
252
/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
///
/// Every method forwards unchanged to the wrapped persistence backend. The
/// calls use fully-qualified `AttachmentManifest::...` syntax, presumably so
/// resolution stays unambiguous if other traits in scope have methods with
/// the same names.
///
/// NOTE(review): the original rationale was that trait-object upcasting
/// cannot coerce between the two trait objects. Trait upcasting was
/// stabilized in Rust 1.86. If this crate's MSRV is at least 1.86, a direct
/// coercion may replace this forwarder. Confirm the MSRV before removing it.
pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);

impl AttachmentManifest for PersistenceManifestAdapter {
    // Forwards to the backend's `AttachmentManifest::record_intent`.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
        AttachmentManifest::record_intent(&*self.0, intent)
    }

    // Forwards to the backend's `AttachmentManifest::commit_refs`.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[AttachmentId],
    ) -> Result<(), crate::StoreError> {
        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
    }

    // Forwards to the backend's `AttachmentManifest::list_uncommitted`.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
    }

    // Forwards to the backend's `AttachmentManifest::forget`.
    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
        AttachmentManifest::forget(&*self.0, attachment_id)
    }
}
283
284fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
285    AttachmentMeta::new(
286        content_id(bytes),
287        meta.media_type,
288        bytes.len() as u64,
289        meta.width,
290        meta.height,
291        meta.label,
292    )
293}
294
295pub async fn resolve_llm_request_attachments(
296    mut request: crate::llm::types::LlmRequest,
297    store: &dyn AttachmentStore,
298) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
299    for attachment in &mut request.attachments {
300        let Some(reference) = attachment.reference.as_ref() else {
301            continue;
302        };
303        if !attachment.data.is_empty() {
304            continue;
305        }
306        let stored = store.get(&reference.id).await?;
307        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
308        attachment.data = stored.bytes;
309    }
310    Ok(request)
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double that remembers every recorded intent and accepts all
    /// other calls as no-ops.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            let mut recorded = self.intents.lock().expect("lock intents");
            recorded.push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(vec![])
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// Create-metadata for a 1x1 PNG labelled "pixel".
    fn meta() -> AttachmentCreateMeta {
        let label = Some("pixel".to_string());
        AttachmentCreateMeta::new(
            MediaType::Image(ImageMediaType::Png),
            Some(1),
            Some(1),
            label,
        )
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload: Vec<u8> = vec![1, 2, 3];

        let first = store.put(payload.clone(), meta()).await.expect("put a");
        let second = store.put(payload.clone(), meta()).await.expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);
        let fetched = store.get(&first.id).await.expect("get");
        assert_eq!(fetched.bytes, payload);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload: Vec<u8> = vec![4, 5, 6, 7];

        let reference = store.put(payload.clone(), meta()).await.expect("put");

        assert_eq!(reference.id, content_id(&payload));
        assert_eq!(reference.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            Arc::clone(&manifest) as Arc<dyn AttachmentManifest>,
            "session-1",
        );

        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

        {
            let intents = manifest.intents.lock().expect("lock intents");
            assert_eq!(intents[0].attachment_id, reference.id);
        }
        let expected = vec![reference.id.clone()];
        assert_eq!(store.pending_manifest_commit_ids(), expected);

        // Committing an unrelated id must leave the tracked one pending.
        store.mark_manifest_committed(&[AttachmentId::new("other")]);
        assert_eq!(store.pending_manifest_commit_ids(), expected);

        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }
}