Skip to main content

lash_core/
attachments.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
7use sha2::{Digest, Sha256};
8
9use crate::store::{AttachmentIntent, AttachmentManifest};
10
/// Errors produced by [`AttachmentStore`] implementations and wrappers in
/// this module.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
    /// No attachment is stored under the requested id. The in-memory store
    /// returns this from `get` when the key is absent.
    #[error("attachment `{0}` was not found")]
    NotFound(AttachmentId),
    /// An I/O operation failed at `path`. Not constructed in this file —
    /// NOTE(review): presumably raised by on-disk backends; confirm.
    #[error("attachment store I/O failed at {path}: {source}")]
    Io {
        /// Path the failing I/O operation targeted.
        path: PathBuf,
        /// Underlying OS error, exposed via `Error::source`.
        #[source]
        source: std::io::Error,
    },
    /// Metadata for the given attachment is unavailable. Not constructed in
    /// this file — NOTE(review): confirm which backend raises it.
    #[error("attachment store metadata is unavailable for `{0}`")]
    MissingMeta(AttachmentId),
    /// Recording a write-ahead intent in the [`AttachmentManifest`] failed.
    /// The payload is a human-readable description that embeds the
    /// attachment id and the underlying store error.
    #[error("attachment manifest write failed: {0}")]
    ManifestRecordFailed(String),
}
26
/// An attachment's metadata together with its full byte payload, as
/// returned by [`AttachmentStore::get`].
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata recorded when the attachment was stored.
    pub meta: AttachmentMeta,
    /// The raw attachment payload.
    pub bytes: Vec<u8>,
}
32
/// Persistence class reported by [`AttachmentStore::persistence`].
///
/// Maps onto the crate-wide durability tier via `durability_tier()`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// The trait's default. Presumably contents do not outlive the
    /// process (NOTE(review): confirm against backends).
    Ephemeral,
    /// Reported by backends that persist their contents.
    Durable,
}
38
39impl AttachmentStorePersistence {
40    /// Map the attachment-store persistence signal onto the shared
41    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
42    /// `Durable -> Durable`. Lets consistency checks read every wired store's
43    /// tier uniformly without a separate `durability_tier()` method here.
44    pub fn durability_tier(self) -> crate::DurabilityTier {
45        match self {
46            Self::Ephemeral => crate::DurabilityTier::Inline,
47            Self::Durable => crate::DurabilityTier::Durable,
48        }
49    }
50}
51
52pub trait AttachmentStore: Send + Sync {
53    fn persistence(&self) -> AttachmentStorePersistence {
54        AttachmentStorePersistence::Ephemeral
55    }
56
57    fn put(
58        &self,
59        bytes: Vec<u8>,
60        meta: AttachmentCreateMeta,
61    ) -> Result<AttachmentRef, AttachmentStoreError>;
62
63    fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
64}
65
/// Process-local [`AttachmentStore`] that keeps every payload in a
/// mutex-guarded `HashMap` keyed by attachment id.
///
/// Nothing is written to disk. It does not override `persistence`, so it
/// reports [`AttachmentStorePersistence::Ephemeral`].
#[derive(Default)]
pub struct InMemoryAttachmentStore {
    // Stored attachments keyed by id; the mutex lets `&self` methods
    // mutate the map from multiple threads.
    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
}
70
71impl InMemoryAttachmentStore {
72    pub fn new() -> Self {
73        Self::default()
74    }
75}
76
77impl AttachmentStore for InMemoryAttachmentStore {
78    fn put(
79        &self,
80        bytes: Vec<u8>,
81        meta: AttachmentCreateMeta,
82    ) -> Result<AttachmentRef, AttachmentStoreError> {
83        let meta = stored_meta(&bytes, meta);
84        let reference = meta.as_ref();
85        let stored = StoredAttachment { meta, bytes };
86        self.attachments
87            .lock()
88            .expect("attachment store lock")
89            .insert(reference.id.clone(), stored);
90        Ok(reference)
91    }
92
93    fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
94        self.attachments
95            .lock()
96            .expect("attachment store lock")
97            .get(id)
98            .cloned()
99            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
100    }
101}
102
103pub fn content_id(bytes: &[u8]) -> AttachmentId {
104    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
105}
106
/// Session-scoped wrapper that records a write-ahead intent in
/// [`AttachmentManifest`] before delegating each `put` to the backing
/// [`AttachmentStore`].
///
/// The intent row durably captures "this session is about to write these
/// bytes". If the process dies between `put` and the next committed
/// runtime state, a later GC sweep can reconcile the orphaned bytes by
/// walking [`AttachmentManifest::list_uncommitted`]. If recording the
/// intent fails, `put` returns [`AttachmentStoreError::ManifestRecordFailed`]
/// and the backing store is never called. `get` and `persistence` are
/// forwarded unchanged to the backing store.
///
/// Constructed by the runtime when both a durable [`AttachmentStore`]
/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
/// (which also implements [`AttachmentManifest`]) are wired up. Other
/// callers — tests, hosts using only ephemeral storage — keep the
/// plain inner store and skip the manifest entirely.
pub struct SessionScopedAttachmentStore {
    // Store that actually holds the bytes.
    inner: Arc<dyn AttachmentStore>,
    // Receives one intent row per `put`, written before the bytes.
    manifest: Arc<dyn AttachmentManifest>,
    // Session tag copied into every recorded intent.
    session_id: String,
}
125
126impl SessionScopedAttachmentStore {
127    pub fn new(
128        inner: Arc<dyn AttachmentStore>,
129        manifest: Arc<dyn AttachmentManifest>,
130        session_id: impl Into<String>,
131    ) -> Self {
132        Self {
133            inner,
134            manifest,
135            session_id: session_id.into(),
136        }
137    }
138
139    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
140        &self.inner
141    }
142
143    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
144        &self.manifest
145    }
146}
147
148impl AttachmentStore for SessionScopedAttachmentStore {
149    fn persistence(&self) -> AttachmentStorePersistence {
150        self.inner.persistence()
151    }
152
153    fn put(
154        &self,
155        bytes: Vec<u8>,
156        meta: AttachmentCreateMeta,
157    ) -> Result<AttachmentRef, AttachmentStoreError> {
158        let attachment_id = content_id(&bytes);
159        let intent = AttachmentIntent {
160            attachment_id: attachment_id.clone(),
161            session_id: self.session_id.clone(),
162            canonical_uri: format!("sha256:{attachment_id}"),
163            intent_at_epoch_ms: now_epoch_ms(),
164        };
165        // Record intent first. If this fails the bytes never land,
166        // matching the write-ahead guarantee.
167        self.manifest.record_intent(intent).map_err(|err| {
168            AttachmentStoreError::ManifestRecordFailed(format!(
169                "failed to record attachment intent for `{attachment_id}`: {err}"
170            ))
171        })?;
172        self.inner.put(bytes, meta)
173    }
174
175    fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
176        self.inner.get(id)
177    }
178}
179
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` rather than an error.
/// A value too large for `u64` saturates at `u64::MAX`, instead of being
/// silently truncated the way an `as` cast from `u128` would be.
fn now_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
186
187/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
188/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
189/// Rust's trait-object upcasting does not yet allow direct coercion
190/// between the two; this thin forwarder is the bridge.
191pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
192
193impl AttachmentManifest for PersistenceManifestAdapter {
194    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
195        AttachmentManifest::record_intent(&*self.0, intent)
196    }
197
198    fn commit_refs(
199        &self,
200        session_id: &str,
201        attachment_ids: &[AttachmentId],
202    ) -> Result<(), crate::StoreError> {
203        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
204    }
205
206    fn list_uncommitted(
207        &self,
208        older_than_epoch_ms: u64,
209    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
210        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
211    }
212
213    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
214        AttachmentManifest::forget(&*self.0, attachment_id)
215    }
216}
217
218fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
219    AttachmentMeta::new(
220        content_id(bytes),
221        meta.media_type,
222        bytes.len() as u64,
223        meta.width,
224        meta.height,
225        meta.label,
226    )
227}
228
229pub fn resolve_llm_request_attachments(
230    mut request: crate::llm::types::LlmRequest,
231    store: &dyn AttachmentStore,
232) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
233    for attachment in &mut request.attachments {
234        let Some(reference) = attachment.reference.as_ref() else {
235            continue;
236        };
237        if !attachment.data.is_empty() {
238            continue;
239        }
240        let stored = store.get(&reference.id)?;
241        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
242        attachment.data = stored.bytes;
243    }
244    Ok(request)
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use lash_sansio::{ImageMediaType, MediaType};

    /// Metadata for a 1x1 PNG labelled "pixel", shared by every test here.
    fn meta() -> AttachmentCreateMeta {
        let png = MediaType::Image(ImageMediaType::Png);
        let label = Some(String::from("pixel"));
        AttachmentCreateMeta::new(png, Some(1), Some(1), label)
    }

    #[test]
    fn memory_store_dedupes_by_bytes() {
        let store = InMemoryAttachmentStore::new();
        let payload = [1_u8, 2, 3];

        let first = store.put(payload.to_vec(), meta()).expect("put a");
        let second = store.put(payload.to_vec(), meta()).expect("put b");

        assert_eq!(first.id, second.id);
        assert_eq!(first.byte_len, 3);
        let fetched = store.get(&first.id).expect("get");
        assert_eq!(fetched.bytes, payload.to_vec());
    }

    #[test]
    fn memory_store_assigns_identity_and_byte_len_from_bytes() {
        let payload = vec![4_u8, 5, 6, 7];
        let expected_id = content_id(&payload);

        let reference = InMemoryAttachmentStore::new()
            .put(payload, meta())
            .expect("put");

        assert_eq!(reference.id, expected_id);
        assert_eq!(reference.byte_len, 4);
    }
}