Skip to main content

lash_core/
attachments.rs

1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
/// Failure modes reported by [`AttachmentStore`] implementations and by the
/// attachment helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
    /// No attachment is stored under the requested id.
    #[error("attachment `{0}` was not found")]
    NotFound(AttachmentId),
    /// An I/O operation failed at `path`; `source` is the underlying
    /// `std::io::Error`.
    #[error("attachment store I/O failed at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The store has no metadata available for the given attachment id.
    #[error("attachment store metadata is unavailable for `{0}`")]
    MissingMeta(AttachmentId),
    /// Metadata for attachment `id` could not be decoded; `source` is the
    /// JSON decoding error.
    #[error("attachment store metadata decode failed for `{id}`: {source}")]
    MetadataDecode {
        id: AttachmentId,
        #[source]
        source: serde_json::Error,
    },
    /// Writing to the attachment manifest failed. In this module it is
    /// produced by [`SessionScopedAttachmentStore`] when recording the
    /// write-ahead intent fails; the payload is a human-readable description.
    #[error("attachment manifest write failed: {0}")]
    ManifestRecordFailed(String),
    /// Any other backend failure, carried as a human-readable description.
    #[error("attachment store backend failed: {0}")]
    Backend(String),
}
37
/// An attachment as returned by [`AttachmentStore::get`]: its metadata plus
/// the raw payload bytes.
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata describing the payload.
    pub meta: AttachmentMeta,
    /// The attachment payload itself.
    pub bytes: Vec<u8>,
}
43
/// Persistence signal reported by [`AttachmentStore::persistence`].
///
/// [`AttachmentStorePersistence::durability_tier`] maps each variant onto the
/// crate-wide durability tier.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// The value returned by the default [`AttachmentStore::persistence`]
    /// implementation. Presumably means stored content does not outlive the
    /// process — confirm against the concrete stores.
    Ephemeral,
    /// Presumably means stored content survives process restarts — confirm
    /// against the concrete stores.
    Durable,
}
49
50impl AttachmentStorePersistence {
51    /// Map the attachment-store persistence signal onto the shared
52    /// [`DurabilityTier`](crate::DurabilityTier): `Ephemeral -> Inline`,
53    /// `Durable -> Durable`. Lets consistency checks read every wired store's
54    /// tier uniformly without a separate `durability_tier()` method here.
55    pub fn durability_tier(self) -> crate::DurabilityTier {
56        match self {
57            Self::Ephemeral => crate::DurabilityTier::Inline,
58            Self::Durable => crate::DurabilityTier::Durable,
59        }
60    }
61}
62
/// Storage backend for attachment payloads and their metadata.
///
/// Implementations must be `Send + Sync`; the asynchronous methods are
/// declared through `async_trait`.
#[async_trait::async_trait]
pub trait AttachmentStore: Send + Sync {
    /// Persistence signal for this store. The default implementation reports
    /// [`AttachmentStorePersistence::Ephemeral`]; implementations override it
    /// to report something else.
    fn persistence(&self) -> AttachmentStorePersistence {
        AttachmentStorePersistence::Ephemeral
    }

    /// Attachment refs written by this store that still need their
    /// write-ahead manifest rows stamped by the next runtime commit.
    ///
    /// Plain stores return an empty set. [`SessionScopedAttachmentStore`]
    /// overrides this so attachments created through downstream tools,
    /// process execution, and other runtime services are committed by the
    /// same final turn transaction that makes them reachable from session
    /// state.
    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
        Vec::new()
    }

    /// Clear attachment refs that were stamped committed by a successful
    /// runtime commit.
    ///
    /// The default implementation does nothing, matching the empty default of
    /// [`AttachmentStore::pending_manifest_commit_ids`].
    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}

    /// Store `bytes` together with the caller-supplied creation metadata and
    /// return a reference to the stored attachment.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError>;

    /// Load the attachment stored under `id`: its metadata and payload bytes.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;

    /// Unconditionally remove the content addressed by `id` (its payload bytes
    /// and any metadata sidecar). Idempotent: deleting content that is already
    /// absent returns `Ok(())`.
    ///
    /// # Sharing safety — read before calling
    ///
    /// This is a **content-addressed** delete, not a reference-aware one. The id
    /// is a SHA-256 of the bytes, so *any* session that writes identical bytes
    /// produces the same id and shares one stored object. Deleting by bare id
    /// therefore removes the bytes for **every** session that references them.
    /// The store holds no reference information — that lives in the write-ahead
    /// [`AttachmentManifest`](crate::AttachmentManifest), which is likewise keyed
    /// by content id, so at most one row exists per distinct content and a set
    /// `committed_at` means *some* durable session state references it.
    ///
    /// Callers MUST establish that no session references the content before
    /// deleting — for example, only delete ids returned by
    /// [`AttachmentManifest::list_uncommitted`](crate::AttachmentManifest::list_uncommitted),
    /// which are never committed by any session and are thus provably
    /// unreferenced. [`reclaim_orphaned_attachments`] is the intended safe
    /// caller; prefer it over calling `delete` directly.
    async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError>;
}
116
/// Outcome of a host-invoked orphan-attachment reclamation sweep.
///
/// See [`reclaim_orphaned_attachments`] for the full contract. Returned so
/// hosts can emit metrics the same way [`GcReport`](crate::GcReport) and
/// [`VacuumReport`](crate::VacuumReport) do for the store-side levers.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AttachmentReclamationReport {
    /// Uncommitted manifest intents aged past the threshold that the sweep
    /// examined.
    pub scanned_intent_count: usize,
    /// Orphans reclaimed: bytes deleted from the store and the manifest row
    /// forgotten. Equal to `scanned_intent_count` unless a delete failed.
    ///
    /// NOTE(review): [`reclaim_orphaned_attachments`] currently returns `Err`
    /// as soon as a delete or forget fails, so every report it actually hands
    /// back has `reclaimed_count == scanned_intent_count`. Confirm whether the
    /// sweep was meant to continue past failures.
    pub reclaimed_count: usize,
}
131
132/// Reclaim attachment bytes left orphaned by a crash between `put` and the next
133/// durable commit — the host-invocable counterpart to
134/// [`StoreMaintenance::gc_unreachable`](crate::StoreMaintenance::gc_unreachable)
135/// for attachment payloads.
136///
137/// The sweep asks the write-ahead `manifest` for every intent aged past
138/// `older_than_epoch_ms` that was recorded but never committed
139/// ([`AttachmentManifest::list_uncommitted`](crate::AttachmentManifest::list_uncommitted)),
140/// deletes each one's bytes via [`AttachmentStore::delete`], then forgets the
141/// manifest row ([`AttachmentManifest::forget`](crate::AttachmentManifest::forget)).
142///
143/// # Sharing safety
144///
145/// This is the *safe* delete path. Because the manifest is keyed by content id,
146/// an uncommitted intent proves no durable session state references that
147/// content: shared/committed content carries a set `committed_at` and is
148/// excluded from `list_uncommitted`. Deleting an aged uncommitted intent's
149/// bytes is therefore safe under cross-session sharing by construction.
150///
151/// # Policy is the host's (ADR-0014)
152///
153/// This is a lever, not a scheduler: the host chooses `older_than_epoch_ms`
154/// (typically `now - grace_period`, where the grace period exceeds any live
155/// turn's duration so an in-flight `put` is never swept) and when to run it. It
156/// does no background work of its own.
157pub async fn reclaim_orphaned_attachments<M, S>(
158    manifest: &M,
159    store: &S,
160    older_than_epoch_ms: u64,
161) -> Result<AttachmentReclamationReport, AttachmentStoreError>
162where
163    M: AttachmentManifest + ?Sized,
164    S: AttachmentStore + ?Sized,
165{
166    let orphans = manifest
167        .list_uncommitted(older_than_epoch_ms)
168        .map_err(|err| {
169            AttachmentStoreError::Backend(format!(
170                "failed to list uncommitted attachment intents: {err}"
171            ))
172        })?;
173    let scanned_intent_count = orphans.len();
174    let mut reclaimed_count = 0;
175    for orphan in orphans {
176        store.delete(&orphan.attachment_id).await?;
177        manifest.forget(&orphan.attachment_id).map_err(|err| {
178            AttachmentStoreError::Backend(format!(
179                "failed to forget reclaimed attachment `{}`: {err}",
180                orphan.attachment_id
181            ))
182        })?;
183        reclaimed_count += 1;
184    }
185    Ok(AttachmentReclamationReport {
186        scanned_intent_count,
187        reclaimed_count,
188    })
189}
190
/// [`AttachmentStore`] that keeps every attachment in a process-local map
/// guarded by a mutex.
///
/// It does not override [`AttachmentStore::persistence`], so it reports
/// [`AttachmentStorePersistence::Ephemeral`].
#[derive(Default)]
pub struct InMemoryAttachmentStore {
    /// Stored attachments keyed by attachment id.
    attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
}
195
196impl InMemoryAttachmentStore {
197    pub fn new() -> Self {
198        Self::default()
199    }
200}
201
202#[async_trait::async_trait]
203impl AttachmentStore for InMemoryAttachmentStore {
204    async fn put(
205        &self,
206        bytes: Vec<u8>,
207        meta: AttachmentCreateMeta,
208    ) -> Result<AttachmentRef, AttachmentStoreError> {
209        let meta = stored_meta(&bytes, meta);
210        let reference = meta.as_ref();
211        let stored = StoredAttachment { meta, bytes };
212        self.attachments
213            .lock()
214            .expect("attachment store lock")
215            .insert(reference.id.clone(), stored);
216        Ok(reference)
217    }
218
219    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
220        self.attachments
221            .lock()
222            .expect("attachment store lock")
223            .get(id)
224            .cloned()
225            .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
226    }
227
228    async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
229        self.attachments
230            .lock()
231            .expect("attachment store lock")
232            .remove(id);
233        Ok(())
234    }
235}
236
237pub fn content_id(bytes: &[u8]) -> AttachmentId {
238    AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
239}
240
/// Session-scoped wrapper that records a write-ahead intent in
/// [`AttachmentManifest`] before delegating each `put` to the backing
/// [`AttachmentStore`]. The intent row durably captures "this session
/// is about to write these bytes," so if the process dies between
/// `put` and the next committed runtime state, a later GC sweep can
/// reconcile the orphaned bytes by walking
/// [`AttachmentManifest::list_uncommitted`].
///
/// Constructed by the runtime when both a durable [`AttachmentStore`]
/// and a [`RuntimePersistence`](crate::RuntimePersistence) backend
/// (which also implements [`AttachmentManifest`]) are wired up. Other
/// callers — tests, hosts using only ephemeral storage — keep the
/// plain inner store and skip the manifest entirely.
pub struct SessionScopedAttachmentStore {
    /// Backing store that actually holds the bytes.
    inner: Arc<dyn AttachmentStore>,
    /// Manifest that receives the write-ahead intent for every `put`.
    manifest: Arc<dyn AttachmentManifest>,
    /// Session id stamped onto each recorded intent.
    session_id: String,
    /// Ids of successful `put`s that have not yet been cleared through
    /// `mark_manifest_committed`.
    pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
}
260
261impl SessionScopedAttachmentStore {
262    pub fn new(
263        inner: Arc<dyn AttachmentStore>,
264        manifest: Arc<dyn AttachmentManifest>,
265        session_id: impl Into<String>,
266    ) -> Self {
267        Self {
268            inner,
269            manifest,
270            session_id: session_id.into(),
271            pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
272        }
273    }
274
275    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
276        &self.inner
277    }
278
279    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
280        &self.manifest
281    }
282}
283
284#[async_trait::async_trait]
285impl AttachmentStore for SessionScopedAttachmentStore {
286    fn persistence(&self) -> AttachmentStorePersistence {
287        self.inner.persistence()
288    }
289
290    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
291        self.pending_manifest_commit_ids
292            .lock()
293            .expect("attachment manifest commit tracker lock")
294            .iter()
295            .cloned()
296            .collect()
297    }
298
299    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
300        if ids.is_empty() {
301            return;
302        }
303        let mut pending = self
304            .pending_manifest_commit_ids
305            .lock()
306            .expect("attachment manifest commit tracker lock");
307        for id in ids {
308            pending.remove(id);
309        }
310    }
311
312    async fn put(
313        &self,
314        bytes: Vec<u8>,
315        meta: AttachmentCreateMeta,
316    ) -> Result<AttachmentRef, AttachmentStoreError> {
317        let attachment_id = content_id(&bytes);
318        let intent = AttachmentIntent {
319            attachment_id: attachment_id.clone(),
320            session_id: self.session_id.clone(),
321            canonical_uri: format!("sha256:{attachment_id}"),
322            intent_at_epoch_ms: now_epoch_ms(),
323        };
324        // Record intent first. If this fails the bytes never land,
325        // matching the write-ahead guarantee.
326        self.manifest.record_intent(intent).map_err(|err| {
327            AttachmentStoreError::ManifestRecordFailed(format!(
328                "failed to record attachment intent for `{attachment_id}`: {err}"
329            ))
330        })?;
331        let reference = self.inner.put(bytes, meta).await?;
332        if reference.id != attachment_id {
333            return Err(AttachmentStoreError::Backend(format!(
334                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
335                reference.id
336            )));
337        }
338        self.pending_manifest_commit_ids
339            .lock()
340            .expect("attachment manifest commit tracker lock")
341            .insert(reference.id.clone());
342        Ok(reference)
343    }
344
345    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
346        self.inner.get(id).await
347    }
348
349    async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
350        // Content-addressed delete forwards straight to the backing store. The
351        // write-ahead manifest that this wrapper stamps is reconciled separately
352        // by `reclaim_orphaned_attachments`, which is the reference-safe caller
353        // of delete; the wrapper adds no reference bookkeeping of its own here.
354        self.inner.delete(id).await
355    }
356}
357
358fn now_epoch_ms() -> u64 {
359    <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
360}
361
/// Adapter that exposes the [`AttachmentManifest`] supertrait of an
/// `Arc<dyn RuntimePersistence>` as an `Arc<dyn AttachmentManifest>`.
/// Rust's trait-object upcasting does not yet allow direct coercion
/// between the two; this thin forwarder is the bridge.
///
/// NOTE(review): trait-object upcasting was stabilized in Rust 1.86. If the
/// crate's minimum supported toolchain allows it, a direct coercion may be
/// able to replace this adapter — confirm before removing.
pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
367
368impl AttachmentManifest for PersistenceManifestAdapter {
369    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
370        AttachmentManifest::record_intent(&*self.0, intent)
371    }
372
373    fn commit_refs(
374        &self,
375        session_id: &str,
376        attachment_ids: &[AttachmentId],
377    ) -> Result<(), crate::StoreError> {
378        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
379    }
380
381    fn list_uncommitted(
382        &self,
383        older_than_epoch_ms: u64,
384    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
385        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
386    }
387
388    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
389        AttachmentManifest::forget(&*self.0, attachment_id)
390    }
391}
392
393fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
394    AttachmentMeta::new(
395        content_id(bytes),
396        meta.media_type,
397        bytes.len() as u64,
398        meta.width,
399        meta.height,
400        meta.label,
401    )
402}
403
404pub async fn resolve_llm_request_attachments(
405    mut request: crate::llm::types::LlmRequest,
406    store: &dyn AttachmentStore,
407) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
408    for attachment in &mut request.attachments {
409        let Some(reference) = attachment.reference.as_ref() else {
410            continue;
411        };
412        if !attachment.data.is_empty() {
413            continue;
414        }
415        let stored = store.get(&reference.id).await?;
416        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
417        attachment.data = stored.bytes;
418    }
419    Ok(request)
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425    use lash_sansio::{ImageMediaType, MediaType};
426    use std::collections::HashSet;
427    use std::time::{SystemTime, UNIX_EPOCH};
428
    /// In-memory [`AttachmentManifest`] test double that records the calls it
    /// receives.
    #[derive(Default)]
    struct RecordingManifest {
        /// Every intent passed to `record_intent`, in call order.
        intents: Mutex<Vec<AttachmentIntent>>,
        /// `(session_id, attachment_id)` pairs passed to `commit_refs`.
        committed: Mutex<Vec<(String, AttachmentId)>>,
    }
434
435    impl AttachmentManifest for RecordingManifest {
436        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
437            self.intents.lock().expect("lock intents").push(intent);
438            Ok(())
439        }
440
441        fn commit_refs(
442            &self,
443            session_id: &str,
444            attachment_ids: &[AttachmentId],
445        ) -> Result<(), crate::StoreError> {
446            let mut committed = self.committed.lock().expect("lock committed attachments");
447            committed.extend(
448                attachment_ids
449                    .iter()
450                    .cloned()
451                    .map(|attachment_id| (session_id.to_string(), attachment_id)),
452            );
453            Ok(())
454        }
455
456        fn list_uncommitted(
457            &self,
458            older_than_epoch_ms: u64,
459        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
460            let committed = self
461                .committed
462                .lock()
463                .expect("lock committed attachments")
464                .iter()
465                .cloned()
466                .collect::<HashSet<_>>();
467            Ok(self
468                .intents
469                .lock()
470                .expect("lock intents")
471                .iter()
472                .filter(|intent| intent.intent_at_epoch_ms <= older_than_epoch_ms)
473                .filter(|intent| {
474                    !committed.contains(&(intent.session_id.clone(), intent.attachment_id.clone()))
475                })
476                .map(|intent| crate::AttachmentManifestEntry {
477                    attachment_id: intent.attachment_id.clone(),
478                    session_id: intent.session_id.clone(),
479                    canonical_uri: intent.canonical_uri.clone(),
480                    intent_at_epoch_ms: intent.intent_at_epoch_ms,
481                    committed_at_epoch_ms: None,
482                })
483                .collect())
484        }
485
486        fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
487            Ok(())
488        }
489    }
490
    /// Runtime-persistence test double: the in-memory session store for every
    /// persistence concern, paired with a [`RecordingManifest`] that serves
    /// the attachment-manifest calls.
    #[derive(Default)]
    struct RecordingRuntimePersistence {
        /// Store that the non-manifest trait impls below forward to.
        inner: crate::InMemorySessionStore,
        /// Recording double that receives all `AttachmentManifest` calls.
        manifest: RecordingManifest,
    }
496
497    impl AttachmentManifest for RecordingRuntimePersistence {
498        fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
499            self.manifest.record_intent(intent)
500        }
501
502        fn commit_refs(
503            &self,
504            session_id: &str,
505            attachment_ids: &[AttachmentId],
506        ) -> Result<(), crate::StoreError> {
507            self.manifest.commit_refs(session_id, attachment_ids)
508        }
509
510        fn list_uncommitted(
511            &self,
512            older_than_epoch_ms: u64,
513        ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
514            self.manifest.list_uncommitted(older_than_epoch_ms)
515        }
516
517        fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
518            self.manifest.forget(attachment_id)
519        }
520    }
521
    // Pass-through wrapper: every persistence segment delegates to the inner
    // in-memory store; only the attachment manifest is replaced with the
    // recording double above.
    //
    // Each call is written in fully qualified `Trait::method(&self.inner, ..)`
    // form, so it names the trait explicitly instead of relying on the trait
    // being imported into this module.
    #[async_trait::async_trait]
    impl crate::SessionCommitStore for RecordingRuntimePersistence {
        /// Forwarded unchanged to the in-memory store.
        async fn load_session(
            &self,
            scope: crate::SessionReadScope,
        ) -> Result<Option<crate::PersistedSessionRead>, crate::StoreError> {
            crate::SessionCommitStore::load_session(&self.inner, scope).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn load_node(
            &self,
            node_id: &str,
        ) -> Result<Option<crate::SessionNodeRecord>, crate::StoreError> {
            crate::SessionCommitStore::load_node(&self.inner, node_id).await
        }

        /// Forwarded unchanged to the in-memory store; the recording manifest
        /// is not consulted here.
        async fn commit_runtime_state(
            &self,
            commit: crate::RuntimeCommit,
        ) -> Result<crate::RuntimeCommitResult, crate::StoreError> {
            crate::SessionCommitStore::commit_runtime_state(&self.inner, commit).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn save_session_meta(
            &self,
            meta: crate::SessionMeta,
        ) -> Result<(), crate::StoreError> {
            crate::SessionCommitStore::save_session_meta(&self.inner, meta).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn load_session_meta(&self) -> Result<Option<crate::SessionMeta>, crate::StoreError> {
            crate::SessionCommitStore::load_session_meta(&self.inner).await
        }
    }
559
    // Session execution leases: pure pass-through to the in-memory store.
    #[async_trait::async_trait]
    impl crate::SessionExecutionLeaseStore for RecordingRuntimePersistence {
        /// Forwarded unchanged to the in-memory store.
        async fn try_claim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::SessionExecutionLeaseStore::try_claim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                lease_ttl_ms,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn reclaim_session_execution_lease(
            &self,
            session_id: &str,
            owner: &crate::LeaseOwnerIdentity,
            observed_holder: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
            crate::SessionExecutionLeaseStore::reclaim_session_execution_lease(
                &self.inner,
                session_id,
                owner,
                observed_holder,
                lease_ttl_ms,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn renew_session_execution_lease(
            &self,
            fence: &crate::SessionExecutionLeaseFence,
            lease_ttl_ms: u64,
        ) -> Result<crate::SessionExecutionLease, crate::StoreError> {
            crate::SessionExecutionLeaseStore::renew_session_execution_lease(
                &self.inner,
                fence,
                lease_ttl_ms,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn release_session_execution_lease(
            &self,
            completion: &crate::SessionExecutionLeaseCompletion,
        ) -> Result<(), crate::StoreError> {
            crate::SessionExecutionLeaseStore::release_session_execution_lease(
                &self.inner,
                completion,
            )
            .await
        }
    }
618
    // Pending turn inputs: pure pass-through to the in-memory store.
    #[async_trait::async_trait]
    impl crate::TurnInputStore for RecordingRuntimePersistence {
        /// Forwarded unchanged to the in-memory store.
        async fn enqueue_pending_turn_input(
            &self,
            input: crate::PendingTurnInputDraft,
        ) -> Result<crate::PendingTurnInput, crate::StoreError> {
            crate::TurnInputStore::enqueue_pending_turn_input(&self.inner, input).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn list_pending_turn_inputs(
            &self,
            session_id: &str,
        ) -> Result<Vec<crate::PendingTurnInput>, crate::StoreError> {
            crate::TurnInputStore::list_pending_turn_inputs(&self.inner, session_id).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn cancel_pending_turn_inputs(
            &self,
            session_id: &str,
            targets: &[crate::PendingTurnInputCancelTarget],
        ) -> Result<Vec<crate::PendingTurnInputCancelResult>, crate::StoreError> {
            crate::TurnInputStore::cancel_pending_turn_inputs(&self.inner, session_id, targets)
                .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn cancel_pending_turn_input_suffix(
            &self,
            session_id: &str,
            anchor: &crate::PendingTurnInputCancelTarget,
        ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, crate::StoreError> {
            crate::TurnInputStore::cancel_pending_turn_input_suffix(&self.inner, session_id, anchor)
                .await
        }

        /// Forwarded unchanged to the in-memory store; the argument order
        /// mirrors the trait signature exactly.
        async fn claim_active_turn_inputs(
            &self,
            session_id: &str,
            session_execution_lease: &crate::SessionExecutionLeaseFence,
            owner: &crate::LeaseOwnerIdentity,
            turn_id: &str,
            checkpoint: crate::CheckpointKind,
            lease_ttl_ms: u64,
            max_inputs: usize,
        ) -> Result<Option<crate::TurnInputClaim>, crate::StoreError> {
            crate::TurnInputStore::claim_active_turn_inputs(
                &self.inner,
                session_id,
                session_execution_lease,
                owner,
                turn_id,
                checkpoint,
                lease_ttl_ms,
                max_inputs,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn claim_next_turn_inputs(
            &self,
            session_id: &str,
            session_execution_lease: &crate::SessionExecutionLeaseFence,
            owner: &crate::LeaseOwnerIdentity,
            lease_ttl_ms: u64,
            max_inputs: usize,
        ) -> Result<Option<crate::TurnInputClaim>, crate::StoreError> {
            crate::TurnInputStore::claim_next_turn_inputs(
                &self.inner,
                session_id,
                session_execution_lease,
                owner,
                lease_ttl_ms,
                max_inputs,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn abandon_turn_input_claim(
            &self,
            claim: &crate::TurnInputClaim,
        ) -> Result<(), crate::StoreError> {
            crate::TurnInputStore::abandon_turn_input_claim(&self.inner, claim).await
        }
    }
702
    // Queued work: pure pass-through to the in-memory store.
    #[async_trait::async_trait]
    impl crate::QueuedWorkStore for RecordingRuntimePersistence {
        /// Forwarded unchanged to the in-memory store.
        async fn enqueue_queued_work(
            &self,
            batch: crate::QueuedWorkBatchDraft,
        ) -> Result<crate::QueuedWorkBatch, crate::StoreError> {
            crate::QueuedWorkStore::enqueue_queued_work(&self.inner, batch).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn claim_leading_ready_session_command(
            &self,
            session_id: &str,
            session_execution_lease: &crate::SessionExecutionLeaseFence,
            owner: &crate::LeaseOwnerIdentity,
            lease_ttl_ms: u64,
        ) -> Result<Option<crate::QueuedWorkClaim>, crate::StoreError> {
            crate::QueuedWorkStore::claim_leading_ready_session_command(
                &self.inner,
                session_id,
                session_execution_lease,
                owner,
                lease_ttl_ms,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn claim_ready_queued_work(
            &self,
            session_id: &str,
            session_execution_lease: &crate::SessionExecutionLeaseFence,
            owner: &crate::LeaseOwnerIdentity,
            boundary: crate::QueuedWorkClaimBoundary,
            lease_ttl_ms: u64,
            max_batches: usize,
        ) -> Result<Option<crate::QueuedWorkClaim>, crate::StoreError> {
            crate::QueuedWorkStore::claim_ready_queued_work(
                &self.inner,
                session_id,
                session_execution_lease,
                owner,
                boundary,
                lease_ttl_ms,
                max_batches,
            )
            .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn renew_queued_work_claim(
            &self,
            claim: &crate::QueuedWorkClaim,
            lease_ttl_ms: u64,
        ) -> Result<crate::QueuedWorkClaim, crate::StoreError> {
            crate::QueuedWorkStore::renew_queued_work_claim(&self.inner, claim, lease_ttl_ms).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn abandon_queued_work_claim(
            &self,
            claim: &crate::QueuedWorkClaim,
        ) -> Result<(), crate::StoreError> {
            crate::QueuedWorkStore::abandon_queued_work_claim(&self.inner, claim).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn cancel_queued_work_batch(
            &self,
            session_id: &str,
            batch_id: &str,
        ) -> Result<Option<crate::QueuedWorkBatch>, crate::StoreError> {
            crate::QueuedWorkStore::cancel_queued_work_batch(&self.inner, session_id, batch_id)
                .await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn list_queued_work(
            &self,
            session_id: &str,
        ) -> Result<Vec<crate::QueuedWorkBatch>, crate::StoreError> {
            crate::QueuedWorkStore::list_queued_work(&self.inner, session_id).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn list_pending_queued_work(
            &self,
            session_id: &str,
        ) -> Result<Vec<crate::QueuedWorkBatch>, crate::StoreError> {
            crate::QueuedWorkStore::list_pending_queued_work(&self.inner, session_id).await
        }
    }
788
    // Store maintenance levers: pure pass-through to the in-memory store.
    #[async_trait::async_trait]
    impl crate::StoreMaintenance for RecordingRuntimePersistence {
        /// Forwarded unchanged to the in-memory store.
        async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), crate::StoreError> {
            crate::StoreMaintenance::tombstone_nodes(&self.inner, ids).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn vacuum(&self) -> Result<crate::VacuumReport, crate::StoreError> {
            crate::StoreMaintenance::vacuum(&self.inner).await
        }

        /// Forwarded unchanged to the in-memory store.
        async fn gc_unreachable(&self) -> Result<crate::GcReport, crate::StoreError> {
            crate::StoreMaintenance::gc_unreachable(&self.inner).await
        }
    }
803
804    fn meta() -> AttachmentCreateMeta {
805        AttachmentCreateMeta::new(
806            MediaType::Image(ImageMediaType::Png),
807            Some(1),
808            Some(1),
809            Some("pixel".to_string()),
810        )
811    }
812
813    fn system_epoch_ms_for_test() -> u64 {
814        SystemTime::now()
815            .duration_since(UNIX_EPOCH)
816            .expect("system clock must be after Unix epoch")
817            .as_millis() as u64
818    }
819
820    #[tokio::test]
821    async fn memory_store_dedupes_by_bytes() {
822        let store = InMemoryAttachmentStore::new();
823        let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
824        let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
825        assert_eq!(a.id, b.id);
826        assert_eq!(a.byte_len, 3);
827        assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
828    }
829
830    #[tokio::test]
831    async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
832        let store = InMemoryAttachmentStore::new();
833        let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");
834
835        assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
836        assert_eq!(reference.byte_len, 4);
837    }
838
839    #[tokio::test]
840    async fn session_scoped_attachment_store_satisfies_conformance() {
841        crate::testing::conformance::attachment_store(
842            || {
843                let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
844                Arc::new(SessionScopedAttachmentStore::new(
845                    Arc::new(InMemoryAttachmentStore::new()),
846                    manifest,
847                    "session-scoped-conformance",
848                )) as Arc<dyn AttachmentStore>
849            },
850            AttachmentStorePersistence::Ephemeral,
851        )
852        .await;
853    }
854
855    #[tokio::test]
856    async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
857        let manifest = Arc::new(RecordingManifest::default());
858        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
859        let store = SessionScopedAttachmentStore::new(
860            Arc::new(InMemoryAttachmentStore::new()),
861            manifest_for_store,
862            "session-1",
863        );
864
865        let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");
866
867        assert_eq!(
868            manifest.intents.lock().expect("lock intents")[0].attachment_id,
869            reference.id
870        );
871        assert_eq!(
872            store.pending_manifest_commit_ids(),
873            vec![reference.id.clone()]
874        );
875
876        store.mark_manifest_committed(&[AttachmentId::new("other")]);
877        assert_eq!(
878            store.pending_manifest_commit_ids(),
879            vec![reference.id.clone()]
880        );
881
882        store.mark_manifest_committed(std::slice::from_ref(&reference.id));
883        assert!(store.pending_manifest_commit_ids().is_empty());
884    }
885
886    #[tokio::test]
887    async fn session_scoped_store_records_intent_timestamp_from_system_clock() {
888        let manifest = Arc::new(RecordingManifest::default());
889        let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
890        let store = SessionScopedAttachmentStore::new(
891            Arc::new(InMemoryAttachmentStore::new()),
892            manifest_for_store,
893            "session-clock",
894        );
895
896        let before_put_epoch_ms = system_epoch_ms_for_test();
897        let reference = store.put(vec![11, 12, 13], meta()).await.expect("put");
898        let after_put_epoch_ms = system_epoch_ms_for_test();
899
900        let intents = manifest.intents.lock().expect("lock intents");
901        assert_eq!(intents.len(), 1);
902        let intent = &intents[0];
903        assert_eq!(intent.attachment_id, reference.id);
904        assert!(
905            intent.intent_at_epoch_ms > 1_000_000_000_000,
906            "intent timestamp should be a real epoch millis value, got {}",
907            intent.intent_at_epoch_ms
908        );
909        assert!(
910            intent.intent_at_epoch_ms >= before_put_epoch_ms.saturating_sub(1000),
911            "intent timestamp {} should be close to or after put start {}",
912            intent.intent_at_epoch_ms,
913            before_put_epoch_ms
914        );
915        assert!(
916            intent.intent_at_epoch_ms <= after_put_epoch_ms.saturating_add(1000),
917            "intent timestamp {} should be close to or before put finish {}",
918            intent.intent_at_epoch_ms,
919            after_put_epoch_ms
920        );
921    }
922
923    #[test]
924    fn persistence_manifest_adapter_forwards_to_wrapped_runtime_persistence() {
925        let runtime = Arc::new(RecordingRuntimePersistence::default());
926        let persistence: Arc<dyn crate::RuntimePersistence> = runtime.clone();
927        let adapter = PersistenceManifestAdapter(persistence);
928        let attachment_id = AttachmentId::new("adapter-forwarding");
929        let intent = AttachmentIntent {
930            attachment_id: attachment_id.clone(),
931            session_id: "adapter-session".to_string(),
932            canonical_uri: "sha256:adapter-forwarding".to_string(),
933            intent_at_epoch_ms: 10,
934        };
935
936        adapter.record_intent(intent).expect("record intent");
937        let uncommitted = adapter
938            .list_uncommitted(10)
939            .expect("list uncommitted through adapter");
940        assert_eq!(uncommitted.len(), 1);
941        assert_eq!(uncommitted[0].attachment_id, attachment_id);
942        assert_eq!(uncommitted[0].session_id, "adapter-session");
943        assert_eq!(uncommitted[0].canonical_uri, "sha256:adapter-forwarding");
944        assert_eq!(uncommitted[0].intent_at_epoch_ms, 10);
945        assert!(uncommitted[0].committed_at_epoch_ms.is_none());
946
947        adapter
948            .commit_refs("adapter-session", std::slice::from_ref(&attachment_id))
949            .expect("commit refs through adapter");
950        assert!(
951            adapter
952                .list_uncommitted(10)
953                .expect("list after commit through adapter")
954                .is_empty()
955        );
956        assert_eq!(
957            runtime
958                .manifest
959                .committed
960                .lock()
961                .expect("lock committed attachments")
962                .as_slice(),
963            &[("adapter-session".to_string(), attachment_id)]
964        );
965    }
966}