1mod file_store;
2
3pub use file_store::FileAttachmentStore;
4
5use std::collections::{BTreeSet, HashMap};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
10use sha2::{Digest, Sha256};
11
12use crate::store::{AttachmentIntent, AttachmentManifest};
13
/// Errors returned by attachment stores and by the helper functions in this module.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
    /// No attachment is stored under the requested id.
    #[error("attachment `{0}` was not found")]
    NotFound(AttachmentId),
    /// An I/O operation failed; `path` is the location named in the message and
    /// `source` is the underlying `std::io::Error`.
    #[error("attachment store I/O failed at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The store has no metadata available for the given attachment id.
    #[error("attachment store metadata is unavailable for `{0}`")]
    MissingMeta(AttachmentId),
    /// Metadata for `id` could not be decoded; `source` is the `serde_json` error.
    #[error("attachment store metadata decode failed for `{id}`: {source}")]
    MetadataDecode {
        id: AttachmentId,
        #[source]
        source: serde_json::Error,
    },
    /// Writing to the attachment manifest failed; the payload is a human-readable description.
    #[error("attachment manifest write failed: {0}")]
    ManifestRecordFailed(String),
    /// Any other backend failure, described by the contained message.
    #[error("attachment store backend failed: {0}")]
    Backend(String),
}
37
/// An attachment as held by a store: its metadata together with the raw bytes.
#[derive(Clone, Debug)]
pub struct StoredAttachment {
    /// Metadata describing the attachment.
    pub meta: AttachmentMeta,
    /// The attachment's raw content.
    pub bytes: Vec<u8>,
}
43
/// Persistence level an [`AttachmentStore`] reports for itself.
///
/// Each variant maps onto a `crate::DurabilityTier` through
/// [`AttachmentStorePersistence::durability_tier`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
    /// The value [`AttachmentStore::persistence`] returns by default; maps to
    /// `DurabilityTier::Inline`.
    Ephemeral,
    /// Maps to `DurabilityTier::Durable`.
    Durable,
}
49
50impl AttachmentStorePersistence {
51 pub fn durability_tier(self) -> crate::DurabilityTier {
56 match self {
57 Self::Ephemeral => crate::DurabilityTier::Inline,
58 Self::Durable => crate::DurabilityTier::Durable,
59 }
60 }
61}
62
/// Storage interface for attachment bytes and their metadata.
///
/// Implementations must be `Send + Sync`. The three synchronous methods have default
/// bodies, so a minimal implementation only needs to provide `put`, `get` and `delete`.
#[async_trait::async_trait]
pub trait AttachmentStore: Send + Sync {
    /// Reports how persistent this store is. Defaults to
    /// [`AttachmentStorePersistence::Ephemeral`].
    fn persistence(&self) -> AttachmentStorePersistence {
        AttachmentStorePersistence::Ephemeral
    }

    /// Returns the ids whose manifest commit is still pending. The default implementation
    /// tracks nothing and returns an empty list.
    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
        Vec::new()
    }

    /// Tells the store that the given ids no longer need a manifest commit. The default
    /// implementation does nothing.
    fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}

    /// Stores `bytes` described by `meta` and returns a reference to the stored attachment.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError>;

    /// Fetches the attachment stored under `id`.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;

    /// Removes the attachment stored under `id`.
    // NOTE(review): whether deleting an unknown id is an error is not specified here; the
    // in-memory store in this file returns `Ok(())` in that case — confirm for other backends.
    async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError>;
}
116
/// Summary returned by [`reclaim_orphaned_attachments`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AttachmentReclamationReport {
    /// Number of uncommitted intents the manifest returned for the sweep.
    pub scanned_intent_count: usize,
    /// Number of those intents that were reclaimed before the sweep finished.
    pub reclaimed_count: usize,
}
131
132pub async fn reclaim_orphaned_attachments<M, S>(
158 manifest: &M,
159 store: &S,
160 older_than_epoch_ms: u64,
161) -> Result<AttachmentReclamationReport, AttachmentStoreError>
162where
163 M: AttachmentManifest + ?Sized,
164 S: AttachmentStore + ?Sized,
165{
166 let orphans = manifest
167 .list_uncommitted(older_than_epoch_ms)
168 .map_err(|err| {
169 AttachmentStoreError::Backend(format!(
170 "failed to list uncommitted attachment intents: {err}"
171 ))
172 })?;
173 let scanned_intent_count = orphans.len();
174 let mut reclaimed_count = 0;
175 for orphan in orphans {
176 store.delete(&orphan.attachment_id).await?;
177 manifest.forget(&orphan.attachment_id).map_err(|err| {
178 AttachmentStoreError::Backend(format!(
179 "failed to forget reclaimed attachment `{}`: {err}",
180 orphan.attachment_id
181 ))
182 })?;
183 reclaimed_count += 1;
184 }
185 Ok(AttachmentReclamationReport {
186 scanned_intent_count,
187 reclaimed_count,
188 })
189}
190
191#[derive(Default)]
192pub struct InMemoryAttachmentStore {
193 attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
194}
195
196impl InMemoryAttachmentStore {
197 pub fn new() -> Self {
198 Self::default()
199 }
200}
201
202#[async_trait::async_trait]
203impl AttachmentStore for InMemoryAttachmentStore {
204 async fn put(
205 &self,
206 bytes: Vec<u8>,
207 meta: AttachmentCreateMeta,
208 ) -> Result<AttachmentRef, AttachmentStoreError> {
209 let meta = stored_meta(&bytes, meta);
210 let reference = meta.as_ref();
211 let stored = StoredAttachment { meta, bytes };
212 self.attachments
213 .lock()
214 .expect("attachment store lock")
215 .insert(reference.id.clone(), stored);
216 Ok(reference)
217 }
218
219 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
220 self.attachments
221 .lock()
222 .expect("attachment store lock")
223 .get(id)
224 .cloned()
225 .ok_or_else(|| AttachmentStoreError::NotFound(id.clone()))
226 }
227
228 async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
229 self.attachments
230 .lock()
231 .expect("attachment store lock")
232 .remove(id);
233 Ok(())
234 }
235}
236
237pub fn content_id(bytes: &[u8]) -> AttachmentId {
238 AttachmentId::new(format!("{:x}", Sha256::digest(bytes)))
239}
240
241pub struct SessionScopedAttachmentStore {
255 inner: Arc<dyn AttachmentStore>,
256 manifest: Arc<dyn AttachmentManifest>,
257 session_id: String,
258 pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
259}
260
261impl SessionScopedAttachmentStore {
262 pub fn new(
263 inner: Arc<dyn AttachmentStore>,
264 manifest: Arc<dyn AttachmentManifest>,
265 session_id: impl Into<String>,
266 ) -> Self {
267 Self {
268 inner,
269 manifest,
270 session_id: session_id.into(),
271 pending_manifest_commit_ids: Mutex::new(BTreeSet::new()),
272 }
273 }
274
275 pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
276 &self.inner
277 }
278
279 pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
280 &self.manifest
281 }
282}
283
284#[async_trait::async_trait]
285impl AttachmentStore for SessionScopedAttachmentStore {
286 fn persistence(&self) -> AttachmentStorePersistence {
287 self.inner.persistence()
288 }
289
290 fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
291 self.pending_manifest_commit_ids
292 .lock()
293 .expect("attachment manifest commit tracker lock")
294 .iter()
295 .cloned()
296 .collect()
297 }
298
299 fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
300 if ids.is_empty() {
301 return;
302 }
303 let mut pending = self
304 .pending_manifest_commit_ids
305 .lock()
306 .expect("attachment manifest commit tracker lock");
307 for id in ids {
308 pending.remove(id);
309 }
310 }
311
312 async fn put(
313 &self,
314 bytes: Vec<u8>,
315 meta: AttachmentCreateMeta,
316 ) -> Result<AttachmentRef, AttachmentStoreError> {
317 let attachment_id = content_id(&bytes);
318 let intent = AttachmentIntent {
319 attachment_id: attachment_id.clone(),
320 session_id: self.session_id.clone(),
321 canonical_uri: format!("sha256:{attachment_id}"),
322 intent_at_epoch_ms: now_epoch_ms(),
323 };
324 self.manifest.record_intent(intent).map_err(|err| {
327 AttachmentStoreError::ManifestRecordFailed(format!(
328 "failed to record attachment intent for `{attachment_id}`: {err}"
329 ))
330 })?;
331 let reference = self.inner.put(bytes, meta).await?;
332 if reference.id != attachment_id {
333 return Err(AttachmentStoreError::Backend(format!(
334 "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
335 reference.id
336 )));
337 }
338 self.pending_manifest_commit_ids
339 .lock()
340 .expect("attachment manifest commit tracker lock")
341 .insert(reference.id.clone());
342 Ok(reference)
343 }
344
345 async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
346 self.inner.get(id).await
347 }
348
349 async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
350 self.inner.delete(id).await
355 }
356}
357
358fn now_epoch_ms() -> u64 {
359 <crate::SystemClock as crate::Clock>::timestamp_ms(&crate::SystemClock)
360}
361
/// Adapter that exposes a shared `dyn RuntimePersistence` as an [`AttachmentManifest`].
pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);

/// Every method forwards to the wrapped value's own `AttachmentManifest` implementation.
/// The calls are fully qualified and go through `&*self.0`, i.e. the trait object itself.
impl AttachmentManifest for PersistenceManifestAdapter {
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
        AttachmentManifest::record_intent(&*self.0, intent)
    }

    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[AttachmentId],
    ) -> Result<(), crate::StoreError> {
        AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
    }

    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
        AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
    }

    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
        AttachmentManifest::forget(&*self.0, attachment_id)
    }
}
392
393fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
394 AttachmentMeta::new(
395 content_id(bytes),
396 meta.media_type,
397 bytes.len() as u64,
398 meta.width,
399 meta.height,
400 meta.label,
401 )
402}
403
404pub async fn resolve_llm_request_attachments(
405 mut request: crate::llm::types::LlmRequest,
406 store: &dyn AttachmentStore,
407) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
408 for attachment in &mut request.attachments {
409 let Some(reference) = attachment.reference.as_ref() else {
410 continue;
411 };
412 if !attachment.data.is_empty() {
413 continue;
414 }
415 let stored = store.get(&reference.id).await?;
416 attachment.mime = stored.meta.media_type.canonical_mime().to_string();
417 attachment.data = stored.bytes;
418 }
419 Ok(request)
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425 use lash_sansio::{ImageMediaType, MediaType};
426 use std::collections::HashSet;
427 use std::time::{SystemTime, UNIX_EPOCH};
428
/// Test double for `AttachmentManifest` that keeps everything it is told in memory.
#[derive(Default)]
struct RecordingManifest {
    /// Every intent passed to `record_intent`, in call order.
    intents: Mutex<Vec<AttachmentIntent>>,
    /// Every `(session_id, attachment_id)` pair passed to `commit_refs`.
    committed: Mutex<Vec<(String, AttachmentId)>>,
}
434
435 impl AttachmentManifest for RecordingManifest {
436 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
437 self.intents.lock().expect("lock intents").push(intent);
438 Ok(())
439 }
440
441 fn commit_refs(
442 &self,
443 session_id: &str,
444 attachment_ids: &[AttachmentId],
445 ) -> Result<(), crate::StoreError> {
446 let mut committed = self.committed.lock().expect("lock committed attachments");
447 committed.extend(
448 attachment_ids
449 .iter()
450 .cloned()
451 .map(|attachment_id| (session_id.to_string(), attachment_id)),
452 );
453 Ok(())
454 }
455
456 fn list_uncommitted(
457 &self,
458 older_than_epoch_ms: u64,
459 ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
460 let committed = self
461 .committed
462 .lock()
463 .expect("lock committed attachments")
464 .iter()
465 .cloned()
466 .collect::<HashSet<_>>();
467 Ok(self
468 .intents
469 .lock()
470 .expect("lock intents")
471 .iter()
472 .filter(|intent| intent.intent_at_epoch_ms <= older_than_epoch_ms)
473 .filter(|intent| {
474 !committed.contains(&(intent.session_id.clone(), intent.attachment_id.clone()))
475 })
476 .map(|intent| crate::AttachmentManifestEntry {
477 attachment_id: intent.attachment_id.clone(),
478 session_id: intent.session_id.clone(),
479 canonical_uri: intent.canonical_uri.clone(),
480 intent_at_epoch_ms: intent.intent_at_epoch_ms,
481 committed_at_epoch_ms: None,
482 })
483 .collect())
484 }
485
486 fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
487 Ok(())
488 }
489 }
490
/// Test double combining an in-memory session store with a `RecordingManifest`, so it can
/// stand in for a `dyn RuntimePersistence` in the adapter test.
#[derive(Default)]
struct RecordingRuntimePersistence {
    /// Receives every non-manifest store call.
    inner: crate::InMemorySessionStore,
    /// Receives every `AttachmentManifest` call.
    manifest: RecordingManifest,
}
496
497 impl AttachmentManifest for RecordingRuntimePersistence {
498 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
499 self.manifest.record_intent(intent)
500 }
501
502 fn commit_refs(
503 &self,
504 session_id: &str,
505 attachment_ids: &[AttachmentId],
506 ) -> Result<(), crate::StoreError> {
507 self.manifest.commit_refs(session_id, attachment_ids)
508 }
509
510 fn list_uncommitted(
511 &self,
512 older_than_epoch_ms: u64,
513 ) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
514 self.manifest.list_uncommitted(older_than_epoch_ms)
515 }
516
517 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
518 self.manifest.forget(attachment_id)
519 }
520 }
521
// Pure pass-through: every `SessionCommitStore` call is forwarded unchanged to `inner`.
// The calls are fully qualified so the trait method on `inner` is what gets invoked.
#[async_trait::async_trait]
impl crate::SessionCommitStore for RecordingRuntimePersistence {
    async fn load_session(
        &self,
        scope: crate::SessionReadScope,
    ) -> Result<Option<crate::PersistedSessionRead>, crate::StoreError> {
        crate::SessionCommitStore::load_session(&self.inner, scope).await
    }

    async fn load_node(
        &self,
        node_id: &str,
    ) -> Result<Option<crate::SessionNodeRecord>, crate::StoreError> {
        crate::SessionCommitStore::load_node(&self.inner, node_id).await
    }

    async fn commit_runtime_state(
        &self,
        commit: crate::RuntimeCommit,
    ) -> Result<crate::RuntimeCommitResult, crate::StoreError> {
        crate::SessionCommitStore::commit_runtime_state(&self.inner, commit).await
    }

    async fn save_session_meta(
        &self,
        meta: crate::SessionMeta,
    ) -> Result<(), crate::StoreError> {
        crate::SessionCommitStore::save_session_meta(&self.inner, meta).await
    }

    async fn load_session_meta(&self) -> Result<Option<crate::SessionMeta>, crate::StoreError> {
        crate::SessionCommitStore::load_session_meta(&self.inner).await
    }
}
559
// Pure pass-through: every `SessionExecutionLeaseStore` call is forwarded unchanged to
// `inner`, with arguments in the same order as received.
#[async_trait::async_trait]
impl crate::SessionExecutionLeaseStore for RecordingRuntimePersistence {
    async fn try_claim_session_execution_lease(
        &self,
        session_id: &str,
        owner: &crate::LeaseOwnerIdentity,
        lease_ttl_ms: u64,
    ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
        crate::SessionExecutionLeaseStore::try_claim_session_execution_lease(
            &self.inner,
            session_id,
            owner,
            lease_ttl_ms,
        )
        .await
    }

    async fn reclaim_session_execution_lease(
        &self,
        session_id: &str,
        owner: &crate::LeaseOwnerIdentity,
        observed_holder: &crate::SessionExecutionLeaseFence,
        lease_ttl_ms: u64,
    ) -> Result<crate::SessionExecutionLeaseClaimOutcome, crate::StoreError> {
        crate::SessionExecutionLeaseStore::reclaim_session_execution_lease(
            &self.inner,
            session_id,
            owner,
            observed_holder,
            lease_ttl_ms,
        )
        .await
    }

    async fn renew_session_execution_lease(
        &self,
        fence: &crate::SessionExecutionLeaseFence,
        lease_ttl_ms: u64,
    ) -> Result<crate::SessionExecutionLease, crate::StoreError> {
        crate::SessionExecutionLeaseStore::renew_session_execution_lease(
            &self.inner,
            fence,
            lease_ttl_ms,
        )
        .await
    }

    async fn release_session_execution_lease(
        &self,
        completion: &crate::SessionExecutionLeaseCompletion,
    ) -> Result<(), crate::StoreError> {
        crate::SessionExecutionLeaseStore::release_session_execution_lease(
            &self.inner,
            completion,
        )
        .await
    }
}
618
// Pure pass-through: every `TurnInputStore` call is forwarded unchanged to `inner`,
// with arguments in the same order as received.
#[async_trait::async_trait]
impl crate::TurnInputStore for RecordingRuntimePersistence {
    async fn enqueue_pending_turn_input(
        &self,
        input: crate::PendingTurnInputDraft,
    ) -> Result<crate::PendingTurnInput, crate::StoreError> {
        crate::TurnInputStore::enqueue_pending_turn_input(&self.inner, input).await
    }

    async fn list_pending_turn_inputs(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::PendingTurnInput>, crate::StoreError> {
        crate::TurnInputStore::list_pending_turn_inputs(&self.inner, session_id).await
    }

    async fn cancel_pending_turn_inputs(
        &self,
        session_id: &str,
        targets: &[crate::PendingTurnInputCancelTarget],
    ) -> Result<Vec<crate::PendingTurnInputCancelResult>, crate::StoreError> {
        crate::TurnInputStore::cancel_pending_turn_inputs(&self.inner, session_id, targets)
            .await
    }

    async fn cancel_pending_turn_input_suffix(
        &self,
        session_id: &str,
        anchor: &crate::PendingTurnInputCancelTarget,
    ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, crate::StoreError> {
        crate::TurnInputStore::cancel_pending_turn_input_suffix(&self.inner, session_id, anchor)
            .await
    }

    async fn claim_active_turn_inputs(
        &self,
        session_id: &str,
        session_execution_lease: &crate::SessionExecutionLeaseFence,
        owner: &crate::LeaseOwnerIdentity,
        turn_id: &str,
        checkpoint: crate::CheckpointKind,
        lease_ttl_ms: u64,
        max_inputs: usize,
    ) -> Result<Option<crate::TurnInputClaim>, crate::StoreError> {
        crate::TurnInputStore::claim_active_turn_inputs(
            &self.inner,
            session_id,
            session_execution_lease,
            owner,
            turn_id,
            checkpoint,
            lease_ttl_ms,
            max_inputs,
        )
        .await
    }

    async fn claim_next_turn_inputs(
        &self,
        session_id: &str,
        session_execution_lease: &crate::SessionExecutionLeaseFence,
        owner: &crate::LeaseOwnerIdentity,
        lease_ttl_ms: u64,
        max_inputs: usize,
    ) -> Result<Option<crate::TurnInputClaim>, crate::StoreError> {
        crate::TurnInputStore::claim_next_turn_inputs(
            &self.inner,
            session_id,
            session_execution_lease,
            owner,
            lease_ttl_ms,
            max_inputs,
        )
        .await
    }

    async fn abandon_turn_input_claim(
        &self,
        claim: &crate::TurnInputClaim,
    ) -> Result<(), crate::StoreError> {
        crate::TurnInputStore::abandon_turn_input_claim(&self.inner, claim).await
    }
}
702
// Pure pass-through: every `QueuedWorkStore` call is forwarded unchanged to `inner`,
// with arguments in the same order as received.
#[async_trait::async_trait]
impl crate::QueuedWorkStore for RecordingRuntimePersistence {
    async fn enqueue_queued_work(
        &self,
        batch: crate::QueuedWorkBatchDraft,
    ) -> Result<crate::QueuedWorkBatch, crate::StoreError> {
        crate::QueuedWorkStore::enqueue_queued_work(&self.inner, batch).await
    }

    async fn claim_leading_ready_session_command(
        &self,
        session_id: &str,
        session_execution_lease: &crate::SessionExecutionLeaseFence,
        owner: &crate::LeaseOwnerIdentity,
        lease_ttl_ms: u64,
    ) -> Result<Option<crate::QueuedWorkClaim>, crate::StoreError> {
        crate::QueuedWorkStore::claim_leading_ready_session_command(
            &self.inner,
            session_id,
            session_execution_lease,
            owner,
            lease_ttl_ms,
        )
        .await
    }

    async fn claim_ready_queued_work(
        &self,
        session_id: &str,
        session_execution_lease: &crate::SessionExecutionLeaseFence,
        owner: &crate::LeaseOwnerIdentity,
        boundary: crate::QueuedWorkClaimBoundary,
        lease_ttl_ms: u64,
        max_batches: usize,
    ) -> Result<Option<crate::QueuedWorkClaim>, crate::StoreError> {
        crate::QueuedWorkStore::claim_ready_queued_work(
            &self.inner,
            session_id,
            session_execution_lease,
            owner,
            boundary,
            lease_ttl_ms,
            max_batches,
        )
        .await
    }

    async fn renew_queued_work_claim(
        &self,
        claim: &crate::QueuedWorkClaim,
        lease_ttl_ms: u64,
    ) -> Result<crate::QueuedWorkClaim, crate::StoreError> {
        crate::QueuedWorkStore::renew_queued_work_claim(&self.inner, claim, lease_ttl_ms).await
    }

    async fn abandon_queued_work_claim(
        &self,
        claim: &crate::QueuedWorkClaim,
    ) -> Result<(), crate::StoreError> {
        crate::QueuedWorkStore::abandon_queued_work_claim(&self.inner, claim).await
    }

    async fn cancel_queued_work_batch(
        &self,
        session_id: &str,
        batch_id: &str,
    ) -> Result<Option<crate::QueuedWorkBatch>, crate::StoreError> {
        crate::QueuedWorkStore::cancel_queued_work_batch(&self.inner, session_id, batch_id)
            .await
    }

    async fn list_queued_work(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::QueuedWorkBatch>, crate::StoreError> {
        crate::QueuedWorkStore::list_queued_work(&self.inner, session_id).await
    }

    async fn list_pending_queued_work(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::QueuedWorkBatch>, crate::StoreError> {
        crate::QueuedWorkStore::list_pending_queued_work(&self.inner, session_id).await
    }
}
788
// Pure pass-through: every `StoreMaintenance` call is forwarded unchanged to `inner`.
#[async_trait::async_trait]
impl crate::StoreMaintenance for RecordingRuntimePersistence {
    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), crate::StoreError> {
        crate::StoreMaintenance::tombstone_nodes(&self.inner, ids).await
    }

    async fn vacuum(&self) -> Result<crate::VacuumReport, crate::StoreError> {
        crate::StoreMaintenance::vacuum(&self.inner).await
    }

    async fn gc_unreachable(&self) -> Result<crate::GcReport, crate::StoreError> {
        crate::StoreMaintenance::gc_unreachable(&self.inner).await
    }
}
803
804 fn meta() -> AttachmentCreateMeta {
805 AttachmentCreateMeta::new(
806 MediaType::Image(ImageMediaType::Png),
807 Some(1),
808 Some(1),
809 Some("pixel".to_string()),
810 )
811 }
812
// Wall-clock milliseconds since the Unix epoch, read straight from `SystemTime` so the
// tests have a reference that does not go through the crate's own clock.
fn system_epoch_ms_for_test() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("system clock must be after Unix epoch");
    since_epoch.as_millis() as u64
}
819
820 #[tokio::test]
821 async fn memory_store_dedupes_by_bytes() {
822 let store = InMemoryAttachmentStore::new();
823 let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
824 let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
825 assert_eq!(a.id, b.id);
826 assert_eq!(a.byte_len, 3);
827 assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
828 }
829
830 #[tokio::test]
831 async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
832 let store = InMemoryAttachmentStore::new();
833 let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");
834
835 assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
836 assert_eq!(reference.byte_len, 4);
837 }
838
// Runs the shared attachment-store conformance helper against a session-scoped wrapper
// around a fresh in-memory store; the factory closure builds a new store on each call.
#[tokio::test]
async fn session_scoped_attachment_store_satisfies_conformance() {
    crate::testing::conformance::attachment_store(
        || {
            let manifest: Arc<dyn AttachmentManifest> = Arc::new(RecordingManifest::default());
            Arc::new(SessionScopedAttachmentStore::new(
                Arc::new(InMemoryAttachmentStore::new()),
                manifest,
                "session-scoped-conformance",
            )) as Arc<dyn AttachmentStore>
        },
        // The wrapper reports the persistence of the in-memory store it wraps.
        AttachmentStorePersistence::Ephemeral,
    )
    .await;
}
854
// A successful `put` must record an intent and keep the id pending until
// `mark_manifest_committed` is called with that exact id.
#[tokio::test]
async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
    let manifest = Arc::new(RecordingManifest::default());
    let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
    let store = SessionScopedAttachmentStore::new(
        Arc::new(InMemoryAttachmentStore::new()),
        manifest_for_store,
        "session-1",
    );

    let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");

    // The intent was recorded for the id the store returned.
    assert_eq!(
        manifest.intents.lock().expect("lock intents")[0].attachment_id,
        reference.id
    );
    assert_eq!(
        store.pending_manifest_commit_ids(),
        vec![reference.id.clone()]
    );

    // Marking an unrelated id must leave the pending set unchanged.
    store.mark_manifest_committed(&[AttachmentId::new("other")]);
    assert_eq!(
        store.pending_manifest_commit_ids(),
        vec![reference.id.clone()]
    );

    // Marking the real id clears it.
    store.mark_manifest_committed(std::slice::from_ref(&reference.id));
    assert!(store.pending_manifest_commit_ids().is_empty());
}
885
// The intent timestamp written by `put` must be a plausible epoch-millisecond value that
// falls within one second of the wall-clock readings taken around the call.
#[tokio::test]
async fn session_scoped_store_records_intent_timestamp_from_system_clock() {
    let manifest = Arc::new(RecordingManifest::default());
    let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
    let store = SessionScopedAttachmentStore::new(
        Arc::new(InMemoryAttachmentStore::new()),
        manifest_for_store,
        "session-clock",
    );

    // Bracket the call with independent wall-clock readings.
    let before_put_epoch_ms = system_epoch_ms_for_test();
    let reference = store.put(vec![11, 12, 13], meta()).await.expect("put");
    let after_put_epoch_ms = system_epoch_ms_for_test();

    let intents = manifest.intents.lock().expect("lock intents");
    assert_eq!(intents.len(), 1);
    let intent = &intents[0];
    assert_eq!(intent.attachment_id, reference.id);
    // Guards against a zero or seconds-based timestamp.
    assert!(
        intent.intent_at_epoch_ms > 1_000_000_000_000,
        "intent timestamp should be a real epoch millis value, got {}",
        intent.intent_at_epoch_ms
    );
    // Both bounds allow 1000 ms of slack.
    assert!(
        intent.intent_at_epoch_ms >= before_put_epoch_ms.saturating_sub(1000),
        "intent timestamp {} should be close to or after put start {}",
        intent.intent_at_epoch_ms,
        before_put_epoch_ms
    );
    assert!(
        intent.intent_at_epoch_ms <= after_put_epoch_ms.saturating_add(1000),
        "intent timestamp {} should be close to or before put finish {}",
        intent.intent_at_epoch_ms,
        after_put_epoch_ms
    );
}
922
// Drives `PersistenceManifestAdapter` end to end and checks that recording, listing and
// committing all reach the `RecordingManifest` inside the wrapped runtime persistence.
#[test]
fn persistence_manifest_adapter_forwards_to_wrapped_runtime_persistence() {
    let runtime = Arc::new(RecordingRuntimePersistence::default());
    let persistence: Arc<dyn crate::RuntimePersistence> = runtime.clone();
    let adapter = PersistenceManifestAdapter(persistence);
    let attachment_id = AttachmentId::new("adapter-forwarding");
    let intent = AttachmentIntent {
        attachment_id: attachment_id.clone(),
        session_id: "adapter-session".to_string(),
        canonical_uri: "sha256:adapter-forwarding".to_string(),
        intent_at_epoch_ms: 10,
    };

    // The recorded intent is listed while uncommitted (cutoff equals its timestamp).
    adapter.record_intent(intent).expect("record intent");
    let uncommitted = adapter
        .list_uncommitted(10)
        .expect("list uncommitted through adapter");
    assert_eq!(uncommitted.len(), 1);
    assert_eq!(uncommitted[0].attachment_id, attachment_id);
    assert_eq!(uncommitted[0].session_id, "adapter-session");
    assert_eq!(uncommitted[0].canonical_uri, "sha256:adapter-forwarding");
    assert_eq!(uncommitted[0].intent_at_epoch_ms, 10);
    assert!(uncommitted[0].committed_at_epoch_ms.is_none());

    // After committing through the adapter the intent no longer shows up as uncommitted.
    adapter
        .commit_refs("adapter-session", std::slice::from_ref(&attachment_id))
        .expect("commit refs through adapter");
    assert!(
        adapter
            .list_uncommitted(10)
            .expect("list after commit through adapter")
            .is_empty()
    );
    // The commit landed in the wrapped runtime's manifest, not in some adapter-local state.
    assert_eq!(
        runtime
            .manifest
            .committed
            .lock()
            .expect("lock committed attachments")
            .as_slice(),
        &[("adapter-session".to_string(), attachment_id)]
    );
}
965 }
966}