use std::collections::{BTreeSet, HashMap};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
use sha2::{Digest, Sha256};
use crate::store::{AttachmentIntent, AttachmentManifest};
/// Errors reported by attachment stores and the helpers built on top of them.
#[derive(Debug, thiserror::Error)]
pub enum AttachmentStoreError {
/// No attachment is stored under the requested id.
#[error("attachment `{0}` was not found")]
NotFound(AttachmentId),
/// An I/O operation performed by the store failed.
#[error("attachment store I/O failed at {path}: {source}")]
Io {
/// Path reported in the error message as the location of the failure.
path: PathBuf,
/// Underlying I/O error, exposed as this error's source.
#[source]
source: std::io::Error,
},
/// Metadata for the given attachment id is unavailable.
#[error("attachment store metadata is unavailable for `{0}`")]
MissingMeta(AttachmentId),
/// Metadata for the given attachment could not be decoded.
#[error("attachment store metadata decode failed for `{id}`: {source}")]
MetadataDecode {
/// Id of the attachment whose metadata failed to decode.
id: AttachmentId,
/// Underlying `serde_json` error, exposed as this error's source.
#[source]
source: serde_json::Error,
},
/// Writing to the attachment manifest failed; the payload is a
/// human-readable description of the failure.
#[error("attachment manifest write failed: {0}")]
ManifestRecordFailed(String),
/// The backing store misbehaved; the payload is a human-readable
/// description of the failure.
#[error("attachment store backend failed: {0}")]
Backend(String),
}
/// An attachment as returned by [`AttachmentStore::get`]: its metadata plus
/// the raw bytes.
#[derive(Clone, Debug)]
pub struct StoredAttachment {
/// Metadata describing the attachment.
pub meta: AttachmentMeta,
/// Raw attachment payload.
pub bytes: Vec<u8>,
}
/// Persistence level advertised by an [`AttachmentStore`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentStorePersistence {
/// Maps to `DurabilityTier::Inline`; this is the level the trait's default
/// `persistence()` implementation reports.
Ephemeral,
/// Maps to `DurabilityTier::Durable`.
Durable,
}
impl AttachmentStorePersistence {
pub fn durability_tier(self) -> crate::DurabilityTier {
match self {
Self::Ephemeral => crate::DurabilityTier::Inline,
Self::Durable => crate::DurabilityTier::Durable,
}
}
}
/// Storage backend for attachment bytes and their metadata.
///
/// Implementations must be `Send + Sync`; the async methods are provided via
/// `async_trait`.
#[async_trait::async_trait]
pub trait AttachmentStore: Send + Sync {
/// Reports the persistence level of this store.
///
/// The default implementation returns
/// [`AttachmentStorePersistence::Ephemeral`].
fn persistence(&self) -> AttachmentStorePersistence {
AttachmentStorePersistence::Ephemeral
}
/// Returns the ids of attachments that still await a manifest commit.
///
/// The default implementation returns an empty list.
fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
Vec::new()
}
/// Informs the store that the given ids have been committed to the manifest.
///
/// The default implementation does nothing.
fn mark_manifest_committed(&self, _ids: &[AttachmentId]) {}
/// Stores `bytes` described by `meta` and returns a reference to the stored
/// attachment.
async fn put(
&self,
bytes: Vec<u8>,
meta: AttachmentCreateMeta,
) -> Result<AttachmentRef, AttachmentStoreError>;
/// Loads the attachment stored under `id`, including its bytes.
async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError>;
}
/// [`AttachmentStore`] that keeps every attachment in a mutex-guarded,
/// in-process `HashMap` keyed by attachment id.
#[derive(Default)]
pub struct InMemoryAttachmentStore {
// Stored attachments keyed by their id.
attachments: Mutex<HashMap<AttachmentId, StoredAttachment>>,
}
impl InMemoryAttachmentStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait::async_trait]
impl AttachmentStore for InMemoryAttachmentStore {
    /// Stores `bytes` under the id derived from their content and returns a
    /// reference to the stored attachment.
    ///
    /// Storing the same bytes again replaces the existing entry under the
    /// same id.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError> {
        let meta = stored_meta(&bytes, meta);
        let reference = meta.as_ref();
        let key = reference.id.clone();
        {
            let mut attachments = self.attachments.lock().expect("attachment store lock");
            attachments.insert(key, StoredAttachment { meta, bytes });
        }
        Ok(reference)
    }

    /// Returns a clone of the attachment stored under `id`.
    ///
    /// # Errors
    ///
    /// Returns [`AttachmentStoreError::NotFound`] when nothing is stored
    /// under `id`.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
        let attachments = self.attachments.lock().expect("attachment store lock");
        match attachments.get(id) {
            Some(stored) => Ok(stored.clone()),
            None => Err(AttachmentStoreError::NotFound(id.clone())),
        }
    }
}
/// Derives the attachment id for `bytes`: the lowercase hexadecimal SHA-256
/// digest of the content.
pub fn content_id(bytes: &[u8]) -> AttachmentId {
    let digest = Sha256::digest(bytes);
    AttachmentId::new(format!("{digest:x}"))
}
/// [`AttachmentStore`] wrapper bound to one session: it records a manifest
/// intent before every put to the inner store and tracks which stored ids
/// have not yet been marked as committed.
pub struct SessionScopedAttachmentStore {
// Store that actually holds the attachment bytes.
inner: Arc<dyn AttachmentStore>,
// Manifest that receives an intent record before each put.
manifest: Arc<dyn AttachmentManifest>,
// Session id written into every recorded intent.
session_id: String,
// Ids of successful puts not yet passed to `mark_manifest_committed`.
pending_manifest_commit_ids: Mutex<BTreeSet<AttachmentId>>,
}
impl SessionScopedAttachmentStore {
    /// Builds a wrapper around `inner` that records intents for `session_id`
    /// in `manifest`. The set of pending commit ids starts out empty.
    pub fn new(
        inner: Arc<dyn AttachmentStore>,
        manifest: Arc<dyn AttachmentManifest>,
        session_id: impl Into<String>,
    ) -> Self {
        let session_id = session_id.into();
        let pending_manifest_commit_ids = Mutex::new(BTreeSet::new());
        Self {
            inner,
            manifest,
            session_id,
            pending_manifest_commit_ids,
        }
    }

    /// Borrows the wrapped store.
    pub fn inner(&self) -> &Arc<dyn AttachmentStore> {
        &self.inner
    }

    /// Borrows the manifest that receives intent records.
    pub fn manifest(&self) -> &Arc<dyn AttachmentManifest> {
        &self.manifest
    }
}
#[async_trait::async_trait]
impl AttachmentStore for SessionScopedAttachmentStore {
    /// Reports the persistence level of the wrapped store.
    fn persistence(&self) -> AttachmentStorePersistence {
        self.inner.persistence()
    }

    /// Returns a snapshot of the ids stored through this wrapper that have
    /// not yet been marked as committed, in the set's sorted order.
    ///
    /// # Panics
    ///
    /// Panics if the tracker lock is poisoned.
    fn pending_manifest_commit_ids(&self) -> Vec<AttachmentId> {
        let tracker = self
            .pending_manifest_commit_ids
            .lock()
            .expect("attachment manifest commit tracker lock");
        tracker.iter().cloned().collect()
    }

    /// Removes `ids` from the pending set; ids that are not pending are
    /// ignored.
    ///
    /// # Panics
    ///
    /// Panics if `ids` is non-empty and the tracker lock is poisoned.
    fn mark_manifest_committed(&self, ids: &[AttachmentId]) {
        // Nothing to remove: do not even touch the lock.
        if !ids.is_empty() {
            let mut tracker = self
                .pending_manifest_commit_ids
                .lock()
                .expect("attachment manifest commit tracker lock");
            ids.iter().for_each(|id| {
                tracker.remove(id);
            });
        }
    }

    /// Records a manifest intent for the content id of `bytes`, stores the
    /// bytes in the inner store, and marks the resulting id as pending a
    /// manifest commit.
    ///
    /// # Errors
    ///
    /// * [`AttachmentStoreError::ManifestRecordFailed`] when the intent cannot
    ///   be recorded; the inner store is not touched in that case.
    /// * Any error from the inner store's `put`.
    /// * [`AttachmentStoreError::Backend`] when the inner store returns an id
    ///   different from the one the intent was recorded for.
    ///
    /// # Panics
    ///
    /// Panics if the tracker lock is poisoned.
    async fn put(
        &self,
        bytes: Vec<u8>,
        meta: AttachmentCreateMeta,
    ) -> Result<AttachmentRef, AttachmentStoreError> {
        let attachment_id = content_id(&bytes);

        // The intent is written before the bytes are stored.
        let intent = AttachmentIntent {
            attachment_id: attachment_id.clone(),
            session_id: self.session_id.clone(),
            canonical_uri: format!("sha256:{attachment_id}"),
            intent_at_epoch_ms: now_epoch_ms(),
        };
        if let Err(err) = self.manifest.record_intent(intent) {
            return Err(AttachmentStoreError::ManifestRecordFailed(format!(
                "failed to record attachment intent for `{attachment_id}`: {err}"
            )));
        }

        let reference = self.inner.put(bytes, meta).await?;
        if reference.id == attachment_id {
            self.pending_manifest_commit_ids
                .lock()
                .expect("attachment manifest commit tracker lock")
                .insert(reference.id.clone());
            Ok(reference)
        } else {
            Err(AttachmentStoreError::Backend(format!(
                "attachment store returned id `{}` after manifest intent for `{attachment_id}`",
                reference.id
            )))
        }
    }

    /// Loads the attachment from the wrapped store; reads are not tracked.
    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
        self.inner.get(id).await
    }
}
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch.
///
/// A clock that reads earlier than the epoch yields `0` (the default, zero
/// duration). A millisecond count that does not fit in `u64` saturates at
/// `u64::MAX` rather than silently wrapping.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a `u128`; clamp before narrowing so the cast below
    // can never truncate.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
/// Newtype around a shared `RuntimePersistence` so it can be used where an
/// [`AttachmentManifest`] implementation is required.
pub(crate) struct PersistenceManifestAdapter(pub Arc<dyn crate::RuntimePersistence>);
impl AttachmentManifest for PersistenceManifestAdapter {
fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
AttachmentManifest::record_intent(&*self.0, intent)
}
fn commit_refs(
&self,
session_id: &str,
attachment_ids: &[AttachmentId],
) -> Result<(), crate::StoreError> {
AttachmentManifest::commit_refs(&*self.0, session_id, attachment_ids)
}
fn list_uncommitted(
&self,
older_than_epoch_ms: u64,
) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
AttachmentManifest::list_uncommitted(&*self.0, older_than_epoch_ms)
}
fn forget(&self, attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
AttachmentManifest::forget(&*self.0, attachment_id)
}
}
/// Builds the stored metadata for `bytes`: the id and byte length are derived
/// from the content itself, while media type, dimensions and label are taken
/// from the caller-supplied `meta`.
fn stored_meta(bytes: &[u8], meta: AttachmentCreateMeta) -> AttachmentMeta {
    let id = content_id(bytes);
    let byte_len = bytes.len() as u64;
    AttachmentMeta::new(
        id,
        meta.media_type,
        byte_len,
        meta.width,
        meta.height,
        meta.label,
    )
}
/// Fills in the payload of every by-reference attachment in `request`.
///
/// An attachment is resolved only when it carries a reference and its inline
/// `data` is still empty. Its `mime` is then replaced by the canonical MIME
/// of the stored media type and its `data` by the stored bytes. Every other
/// attachment is left untouched.
///
/// # Errors
///
/// Returns the first error produced by `store.get`.
pub async fn resolve_llm_request_attachments(
    mut request: crate::llm::types::LlmRequest,
    store: &dyn AttachmentStore,
) -> Result<crate::llm::types::LlmRequest, AttachmentStoreError> {
    for attachment in &mut request.attachments {
        // Inline data is already present: nothing to fetch.
        if !attachment.data.is_empty() {
            continue;
        }
        let stored = match attachment.reference.as_ref() {
            Some(reference) => store.get(&reference.id).await?,
            None => continue,
        };
        attachment.mime = stored.meta.media_type.canonical_mime().to_string();
        attachment.data = stored.bytes;
    }
    Ok(request)
}
// Unit tests for the in-memory store and the session-scoped wrapper.
#[cfg(test)]
mod tests {
use super::*;
use lash_sansio::{ImageMediaType, MediaType};
// Manifest double that records every intent and accepts all other calls
// without doing anything.
#[derive(Default)]
struct RecordingManifest {
intents: Mutex<Vec<AttachmentIntent>>,
}
impl AttachmentManifest for RecordingManifest {
// Appends the intent so tests can inspect what was recorded.
fn record_intent(&self, intent: AttachmentIntent) -> Result<(), crate::StoreError> {
self.intents.lock().expect("lock intents").push(intent);
Ok(())
}
fn commit_refs(
&self,
_session_id: &str,
_attachment_ids: &[AttachmentId],
) -> Result<(), crate::StoreError> {
Ok(())
}
fn list_uncommitted(
&self,
_older_than_epoch_ms: u64,
) -> Result<Vec<crate::AttachmentManifestEntry>, crate::StoreError> {
Ok(Vec::new())
}
fn forget(&self, _attachment_id: &AttachmentId) -> Result<(), crate::StoreError> {
Ok(())
}
}
// Creation metadata for a 1x1 PNG labelled "pixel".
fn meta() -> AttachmentCreateMeta {
AttachmentCreateMeta::new(
MediaType::Image(ImageMediaType::Png),
Some(1),
Some(1),
Some("pixel".to_string()),
)
}
// Putting identical bytes twice yields the same id, and the bytes round-trip.
#[tokio::test]
async fn memory_store_dedupes_by_bytes() {
let store = InMemoryAttachmentStore::new();
let a = store.put(vec![1, 2, 3], meta()).await.expect("put a");
let b = store.put(vec![1, 2, 3], meta()).await.expect("put b");
assert_eq!(a.id, b.id);
assert_eq!(a.byte_len, 3);
assert_eq!(store.get(&a.id).await.expect("get").bytes, vec![1, 2, 3]);
}
// The returned reference carries the content-derived id and the byte count.
#[tokio::test]
async fn memory_store_assigns_identity_and_byte_len_from_bytes() {
let store = InMemoryAttachmentStore::new();
let reference = store.put(vec![4, 5, 6, 7], meta()).await.expect("put");
assert_eq!(reference.id, content_id(&[4, 5, 6, 7]));
assert_eq!(reference.byte_len, 4);
}
// A successful put records an intent and stays pending until its own id is
// marked committed; marking an unrelated id changes nothing.
#[tokio::test]
async fn session_scoped_store_tracks_successful_puts_until_commit_mark() {
let manifest = Arc::new(RecordingManifest::default());
let manifest_for_store: Arc<dyn AttachmentManifest> = manifest.clone();
let store = SessionScopedAttachmentStore::new(
Arc::new(InMemoryAttachmentStore::new()),
manifest_for_store,
"session-1",
);
let reference = store.put(vec![8, 9, 10], meta()).await.expect("put");
// The recorded intent refers to the id the store returned.
assert_eq!(
manifest.intents.lock().expect("lock intents")[0].attachment_id,
reference.id
);
assert_eq!(
store.pending_manifest_commit_ids(),
vec![reference.id.clone()]
);
// Marking an id that was never pending leaves the pending set unchanged.
store.mark_manifest_committed(&[AttachmentId::new("other")]);
assert_eq!(
store.pending_manifest_commit_ids(),
vec![reference.id.clone()]
);
// Marking the stored id clears it from the pending set.
store.mark_manifest_committed(std::slice::from_ref(&reference.id));
assert!(store.pending_manifest_commit_ids().is_empty());
}
}