Skip to main content

lash_core/
attachments.rs

1use std::collections::{BTreeSet, HashMap};
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
7use sha2::{Digest, Sha256};
8
9use crate::store::{AttachmentIntent, AttachmentManifest};
10
/// Errors returned by [`AttachmentStore`] operations and by the attachment
/// helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
    /// No attachment is stored under the given id.
    #[error("attachment `{0}` was not found")]
    NotFound(AttachmentId),
    /// An I/O operation failed; `path` is the location reported in the
    /// message and `source` is the underlying error.
    #[error("attachment store I/O failed at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// Metadata for the given attachment id is unavailable.
    #[error("attachment store metadata is unavailable for `{0}`")]
    MissingMeta(AttachmentId),
    /// Metadata for attachment `id` could not be decoded; `source` is the
    /// `serde_json` error that was raised.
    #[error("attachment store metadata decode failed for `{id}`: {source}")]
    MetadataDecode {
        id: AttachmentId,
        #[source]
        source: serde_json::Error,
    },
    /// Writing to the attachment manifest failed. The payload is a
    /// human-readable description of the failure.
    #[error("attachment manifest write failed: {0}")]
    ManifestRecordFailed(String),
    /// The backing store misbehaved. The payload is a human-readable
    /// description of the failure.
    #[error("attachment store backend failed: {0}")]
    Backend(String),
}
34
/// An attachment as returned by [`AttachmentStore::get`]: its metadata
/// together with the raw bytes.
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata describing the stored payload.
    pub meta: AttachmentMeta,
    /// The attachment's raw content.
    pub bytes: Vec<u8>,
}
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub enum AttachmentStorePersistence {
43    Ephemeral,
44    Durable,
45}
46
47impl AttachmentStorePersistence {
48    /// Map the attachment-store persistence signal onto the shared
49    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
50    /// `Durable -> Durable`. Lets consistency checks read every wired store's
51    /// tier uniformly without a separate `durability_tier()` method here.
52    pub fn durability_tier(self) -> crate::DurabilityTier {
53        match self {
54            Self::Ephemeral => crate::DurabilityTier::Inline,
55            Self::Durable => crate::DurabilityTier::Durable,
56        }
57    }
58}
59
/// Storage backend for attachment bytes and their metadata.
///
/// Implementations must be `Send + Sync`. The async methods are provided
/// through `async_trait`.
#[async_trait::async_trait]
pub trait AttachmentStore: Send + Sync {
    /// Persistence signal for this store.
    ///
    /// The default implementation reports
    /// [`AttachmentStorePersistence::Ephemeral`].
    fn persistence(&self) -> AttachmentStorePersistence {
        AttachmentStorePersistence::Ephemeral
    }

    /// Attachment refs written by this store that still need their
    /// write-ahead manifest rows stamped by the next runtime commit.
    ///
    /// Plain stores return an empty set. [`SessionScopedAttachmentStore`]
    /// overrides this so attachments created through downstream tools,
    /// Lashlang execution, and other runtime services are committed by the
    /// same final turn transaction that makes them reachable from session
    /// state.
    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
        Vec::new()
    }

    /// Clear attachment refs that were stamped committed by a successful
    /// runtime commit.
    ///
    /// The default implementation does nothing.
    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}

    /// Store `bytes` described by `meta` and return a reference to the
    /// stored attachment.
    ///
    /// # Errors
    ///
    /// Returns an [`AttachmentStoreError`] when the store cannot accept the
    /// attachment.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError>;

    /// Fetch the attachment stored under `id`, bytes and metadata included.
    ///
    /// # Errors
    ///
    /// Returns an [`AttachmentStoreError`] when the attachment cannot be
    /// produced; the in-memory store in this module reports
    /// [`AttachmentStoreError::NotFound`] for an unknown id.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
}
90
91#[derive(Default)]
92pub struct InMemoryAttachmentStore {
93    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
94}
95
96impl InMemoryAttachmentStore {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102#[async_trait::async_trait]
103impl AttachmentStore for InMemoryAttachmentStore {
104    async fn put(
105        &self,
106        bytes: Vec<u8>,
107        meta: AttachmentCreateMeta,
108    ) -> Result<AttachmentRef, AttachmentStoreError> {
109        let meta = stored_meta(&bytes, meta);
110        let reference = meta.as_ref();
111        let stored = StoredAttachment { meta, bytes };
112        self.attachments
113            .lock()
114            .expect("attachment store lock")
115            .insert(reference.id.clone(), stored);
116        Ok(reference)
117    }
118
119    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
120        self.attachments
121            .lock()
122            .expect("attachment store lock")
123            .get(id)
124            .cloned()
125            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
126    }
127}
128
129pub fn content_id(bytes: &[u8]) -> AttachmentId {
130    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
131}
132
133/// Session-scoped wrapper that records a write-ahead intent in
134/// [`AttachmentManifest`] before delegating each `put` to the backing
135/// [`AttachmentStore`]. The intent row durably captures "this session
136/// is about to write these bytes," so if the process dies between
137/// `put` and the next committed runtime state, a later GC sweep can
138/// reconcile the orphaned bytes by walking
139/// [`AttachmentManifest::list_uncommitted`].
140///
141/// Constructed by the runtime when both a durable [`AttachmentStore`]
142/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
143/// (which also implements [`AttachmentManifest`]) are wired up. Other
144/// callers — tests, hosts using only ephemeral storage — keep the
145/// plain inner store and skip the manifest entirely.
146pub struct SessionScopedAttachmentStore {
147    inner: Arc<dyn AttachmentStore>,
148    manifest: Arc<dyn AttachmentManifest>,
149    session_id: String,
150    pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
151}
152
153impl SessionScopedAttachmentStore {
154    pub fn new(
155        inner: Arc<dyn AttachmentStore>,
156        manifest: Arc<dyn AttachmentManifest>,
157        session_id: impl Into<String>,
158    ) -> Self {
159        Self {
160            inner,
161            manifest,
162            session_id: session_id.into(),
163            pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
164        }
165    }
166
167    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
168        &self.inner
169    }
170
171    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
172        &self.manifest
173    }
174}
175
176#[async_trait::async_trait]
177impl AttachmentStore for SessionScopedAttachmentStore {
178    fn persistence(&self) -> AttachmentStorePersistence {
179        self.inner.persistence()
180    }
181
182    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
183        self.pending_manifest_commit_ids
184            .lock()
185            .expect("attachment manifest commit tracker lock")
186            .iter()
187            .cloned()
188            .collect()
189    }
190
191    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
192        if ids.is_empty() {
193            return;
194        }
195        let mut pending = self
196            .pending_manifest_commit_ids
197            .lock()
198            .expect("attachment manifest commit tracker lock");
199        for id in ids {
200            pending.remove(id);
201        }
202    }
203
204    async fn put(
205        &self,
206        bytes: Vec<u8>,
207        meta: AttachmentCreateMeta,
208    ) -> Result<AttachmentRef, AttachmentStoreError> {
209        let attachment_id = content_id(&bytes);
210        let intent = AttachmentIntent {
211            attachment_id: attachment_id.clone(),
212            session_id: self.session_id.clone(),
213            canonical_uri: format!("sha256:{attachment_id}"),
214            intent_at_epoch_ms: now_epoch_ms(),
215        };
216        // Record intent first. If this fails the bytes never land,
217        // matching the write-ahead guarantee.
218        self.manifest.record_intent(intent).map_err(|err| {
219            AttachmentStoreError::ManifestRecordFailed(format!(
220                "failed to record attachment intent for `{attachment_id}`: {err}"
221            ))
222        })?;
223        let reference = self.inner.put(bytes, meta).await?;
224        if reference.id != attachment_id {
225            return Err(AttachmentStoreError::Backend(format!(
226                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
227                reference.id
228            )));
229        }
230        self.pending_manifest_commit_ids
231            .lock()
232            .expect("attachment manifest commit tracker lock")
233            .insert(reference.id.clone());
234        Ok(reference)
235    }
236
237    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
238        self.inner.get(id).await
239    }
240}
241
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. The value is assembled
/// from the whole-second and sub-second parts with saturating `u64`
/// arithmetic, so a count that does not fit in `u64` clamps to `u64::MAX`
/// instead of being silently truncated by a `u128 as u64` cast.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Same value as `as_millis()` whenever the result fits in 64 bits.
    since_epoch
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(since_epoch.subsec_millis()))
}
248
249/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
250/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
251/// Rust's trait-object upcasting does not yet allow direct coercion
252/// between the two; this thin forwarder is the bridge.
253pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
254
255impl AttachmentManifest for PersistenceManifestAdapter {
256    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
257        AttachmentManifest::record_intent(&*self.0, intent)
258    }
259
260    fn commit_refs(
261        &self,
262        session_id: &str,
263        attachment_ids: &[AttachmentId],
264    ) -> Result<(), crate::StoreError> {
265        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
266    }
267
268    fn list_uncommitted(
269        &self,
270        older_than_epoch_ms: u64,
271    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
272        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
273    }
274
275    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
276        AttachmentManifest::forget(&*self.0, attachment_id)
277    }
278}
279
280fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
281    AttachmentMeta::new(
282        content_id(bytes),
283        meta.media_type,
284        bytes.len() as u64,
285        meta.width,
286        meta.height,
287        meta.label,
288    )
289}
290
291pub async fn resolve_llm_request_attachments(
292    mut request: crate::llm::types::LlmRequest,
293    store: &dyn AttachmentStore,
294) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
295    for attachment in &mut request.attachments {
296        let Some(reference) = attachment.reference.as_ref() else {
297            continue;
298        };
299        if !attachment.data.is_empty() {
300            continue;
301        }
302        let stored = store.get(&reference.id).await?;
303        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
304        attachment.data = stored.bytes;
305    }
306    Ok(request)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Manifest double that remembers every recorded intent and accepts all
    /// other calls without doing anything.
    #[derive(Default)]
    struct RecordingManifest {
        intents: Mutex<Vec<AttachmentIntent>>,
    }

    impl AttachmentManifest for RecordingManifest {
        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
            let mut log = self.intents.lock().expect("lock intents");
            log.push(intent);
            Ok(())
        }

        fn commit_refs(
            &self,
            _session_id: &str,
            _attachment_ids: &[AttachmentId],
        ) -> Result<(), crate::StoreError> {
            Ok(())
        }

        fn list_uncommitted(
            &self,
            _older_than_epoch_ms: u64,
        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
            Ok(Vec::new())
        }

        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
            Ok(())
        }
    }

    /// Creation metadata for a 1x1 PNG labelled "pixel".
    fn pixel_meta() -> AttachmentCreateMeta {
        let media_type = MediaType::Image(ImageMediaType::Png);
        let label = "pixel".to_string();
        AttachmentCreateMeta::new(media_type, Some(1), Some(1), Some(label))
    }

    #[tokio::test]
    async fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload = vec![1, 2, 3];

        let first = store
            .put(payload.clone(), pixel_meta())
            .await
            .expect("put a");
        let second = store
            .put(payload.clone(), pixel_meta())
            .await
            .expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);

        let fetched = store.get(&first.id).await.expect("get");
        assert_eq!(fetched.bytes, payload);
    }

    #[tokio::test]
    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let payload = [4, 5, 6, 7];
        let store = InMemoryAttachmentStore::new();

        let stored = store
            .put(payload.to_vec(), pixel_meta())
            .await
            .expect("put");

        assert_eq!(stored.id, content_id(&payload));
        assert_eq!(stored.byte_len, 4);
    }

    #[tokio::test]
    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
        let manifest = Arc::new(RecordingManifest::default());
        let store = SessionScopedAttachmentStore::new(
            Arc::new(InMemoryAttachmentStore::new()),
            manifest.clone(),
            "session-1",
        );

        let stored = store.put(vec![8, 9, 10], pixel_meta()).await.expect("put");

        // The intent reached the manifest under the id the store returned.
        {
            let recorded = manifest.intents.lock().expect("lock intents");
            assert_eq!(recorded[0].attachment_id, stored.id);
        }
        assert_eq!(store.pending_manifest_commit_ids(), vec![stored.id.clone()]);

        // Committing an unrelated id must leave the pending entry in place.
        let unrelated = AttachmentId::new("other");
        store.mark_manifest_committed(&[unrelated]);
        assert_eq!(store.pending_manifest_commit_ids(), vec![stored.id.clone()]);

        // Committing the real id clears the tracker.
        store.mark_manifest_committed(std::slice::from_ref(&stored.id));
        assert!(store.pending_manifest_commit_ids().is_empty());
    }
}