use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;
/// Errors returned by [`OutputStore`] operations.
#[derive(Debug, Error)]
pub enum OutputStoreError {
/// No record matched the lookup. The payload is the key that was searched for
/// (an output id string or a name).
#[error("output not found: {0}")]
NotFound(String),
/// The store detected an internal inconsistency. The payload is a diagnostic message.
#[error("internal: {0}")]
Internal(String),
}
/// Identifier of a stored [`OutputRecord`].
///
/// A newtype over `String`; it derives `Eq` and `Hash`, so it can be used as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OutputRef(
/// The raw identifier text.
pub String,
);
impl OutputRef {
pub fn new() -> Self {
OutputRef(format!("out-{}", crate::types::uid_hex(5)))
}
}
impl Default for OutputRef {
fn default() -> Self {
Self::new()
}
}
/// An event attached to a stored output record.
///
/// Serialized by serde as an internally tagged object: the variant is carried in a `type`
/// field using snake_case names (`progress`, `partial`, `artifact`, `final`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OutputEvent {
/// A progress report.
Progress {
/// Label of the stage being reported.
stage: String,
/// Optional free-form note accompanying the stage.
note: Option<String>,
},
/// A partial piece of content.
/// NOTE(review): presumably one chunk of a streamed result — confirm with producers.
Partial {
/// The content of this piece.
chunk: ContentRef,
},
/// A named piece of content.
Artifact {
/// Name given to the artifact by its producer.
name: String,
/// The artifact's content.
content: ContentRef,
},
/// A final result.
Final {
/// The result content.
content: ContentRef,
/// NOTE(review): presumably `true` when the producer succeeded — confirm with producers.
ok: bool,
},
}
/// Content carried by an [`OutputEvent`], either embedded directly or referenced by path.
///
/// Serialized by serde as an internally tagged object: the variant is carried in a `kind`
/// field using snake_case names (`inline`, `file_ref`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ContentRef {
/// Content embedded directly as a JSON value.
Inline {
/// The embedded JSON value.
value: Value,
},
/// Content referenced by a filesystem path rather than embedded.
FileRef {
/// Path of the referenced content.
path: PathBuf,
/// Optional type label for the content.
/// NOTE(review): presumably a MIME type such as `text/plain` — confirm with producers.
mime: Option<String>,
/// Optional size of the content; a hint only.
/// NOTE(review): presumably in bytes — confirm with producers.
size_hint: Option<u64>,
},
}
impl ContentRef {
    /// Wraps an arbitrary JSON value as [`ContentRef::Inline`].
    pub fn inline(value: Value) -> Self {
        Self::Inline { value }
    }

    /// Wraps text as [`ContentRef::Inline`], stored as a JSON string value.
    pub fn inline_text(text: impl Into<String>) -> Self {
        let text: String = text.into();
        Self::inline(Value::String(text))
    }

    /// Builds a [`ContentRef::FileRef`] for `path`, passing `mime` and `size_hint` through
    /// unchanged.
    pub fn file_ref(
        path: impl Into<PathBuf>,
        mime: Option<String>,
        size_hint: Option<u64>,
    ) -> Self {
        let path: PathBuf = path.into();
        Self::FileRef {
            path,
            mime,
            size_hint,
        }
    }
}
/// One stored output: an [`OutputEvent`] together with the metadata it was appended with.
///
/// The in-memory store in this file builds a record in `append`: `id` is freshly generated
/// and every other field is copied from the call's arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputRecord {
/// Identifier under which this record is stored.
pub id: OutputRef,
/// Task the output belongs to.
pub task_id: String,
/// Attempt number of the task; together with `task_id` it keys `list_for_attempt`.
pub attempt: u32,
/// Name of the agent that produced the output.
pub producer_agent: String,
/// The event payload.
pub event: OutputEvent,
/// References supplied by the caller at append time, stored as given.
pub parent_refs: Vec<OutputRef>,
}
/// Storage interface for [`OutputRecord`]s: one append operation plus three lookups.
///
/// Implementors must be `Send + Sync`. Methods are async via `async_trait`.
#[async_trait]
pub trait OutputStore: Send + Sync {
/// Stores `event` for the given task attempt and producer, returning the reference of the
/// newly stored record.
///
/// `parent_refs` is kept on the record as given.
async fn append(
&self,
task_id: &str,
attempt: u32,
producer_agent: &str,
event: OutputEvent,
parent_refs: Vec<OutputRef>,
) -> Result<OutputRef, OutputStoreError>;
/// Fetches the record stored under `id`.
///
/// The in-memory implementation in this file returns [`OutputStoreError::NotFound`] when
/// `id` is unknown.
async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError>;
/// Fetches the most recently appended record for `name`.
///
/// NOTE(review): in the in-memory implementation in this file, `name` is matched against
/// the `producer_agent` passed to `append`, not against an `Artifact` event's `name`.
/// Confirm that other implementations follow the same rule.
async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError>;
/// Lists the records appended for `(task_id, attempt)`.
///
/// The in-memory implementation in this file returns them in insertion order and yields
/// an empty list when nothing was appended for that pair.
async fn list_for_attempt(
&self,
task_id: &str,
attempt: u32,
) -> Result<Vec<OutputRecord>, OutputStoreError>;
}
/// [`OutputStore`] implementation that keeps every record in process memory.
///
/// The state lives behind an `Arc<Mutex<..>>`, so cloning the store yields another handle
/// to the same underlying data rather than a copy of it.
#[derive(Debug, Default, Clone)]
pub struct InMemoryOutputStore {
// Shared, mutex-guarded state; the mutex is tokio's async `Mutex`.
inner: Arc<Mutex<InMemoryInner>>,
}
/// Mutable state of [`InMemoryOutputStore`]: the records plus two secondary indexes.
#[derive(Debug, Default)]
struct InMemoryInner {
// Primary storage: every record, keyed by its id.
by_id: HashMap<OutputRef, OutputRecord>,
// Ids per `(task_id, attempt)`, in the order they were appended.
by_attempt: HashMap<(String, u32), Vec<OutputRef>>,
// Ids per `producer_agent` (the "name" used by `get_latest_by_name`), in append order.
by_name: HashMap<String, Vec<OutputRef>>,
}
impl InMemoryOutputStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl OutputStore for InMemoryOutputStore {
    /// Stores a new record built from the arguments and returns its freshly generated id.
    ///
    /// The record is registered in all three indexes: by id, by `(task_id, attempt)`, and by
    /// `producer_agent` (the key that `get_latest_by_name` looks up).
    ///
    /// # Errors
    ///
    /// Returns [`OutputStoreError::Internal`] if no unused id could be generated within a
    /// small, fixed number of attempts.
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError> {
        // `HashMap::insert` silently replaces an existing entry, so a colliding id would
        // overwrite an earlier record while the secondary indexes list that id twice. Ids
        // are short, so re-roll on collision. The attempt count is bounded so that a
        // misbehaving generator cannot spin forever while the lock is held.
        const MAX_ID_ATTEMPTS: usize = 8;

        let mut guard = self.inner.lock().await;

        // The id is chosen under the lock so the uniqueness check and the insert are atomic.
        let id = std::iter::repeat_with(OutputRef::new)
            .take(MAX_ID_ATTEMPTS)
            .find(|candidate| !guard.by_id.contains_key(candidate))
            .ok_or_else(|| {
                OutputStoreError::Internal(format!(
                    "could not allocate a unique output id after {} attempts",
                    MAX_ID_ATTEMPTS
                ))
            })?;

        let record = OutputRecord {
            id: id.clone(),
            task_id: task_id.to_string(),
            attempt,
            producer_agent: producer_agent.to_string(),
            event,
            parent_refs,
        };

        guard
            .by_attempt
            .entry((record.task_id.clone(), attempt))
            .or_default()
            .push(id.clone());
        guard
            .by_name
            .entry(record.producer_agent.clone())
            .or_default()
            .push(id.clone());
        guard.by_id.insert(id.clone(), record);
        Ok(id)
    }

    /// Returns a clone of the record stored under `id`.
    ///
    /// # Errors
    ///
    /// Returns [`OutputStoreError::NotFound`] carrying the id text when `id` is unknown.
    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        guard
            .by_id
            .get(id)
            .cloned()
            .ok_or_else(|| OutputStoreError::NotFound(id.0.clone()))
    }

    /// Returns a clone of the most recently appended record whose `producer_agent` equals
    /// `name`.
    ///
    /// # Errors
    ///
    /// Returns [`OutputStoreError::NotFound`] when nothing was appended under `name`, and
    /// [`OutputStoreError::Internal`] if the name index points at an id that is missing
    /// from the primary map.
    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        // Ids are pushed in append order, so the last one is the newest.
        let latest = guard
            .by_name
            .get(name)
            .and_then(|ids| ids.last())
            .ok_or_else(|| OutputStoreError::NotFound(name.to_string()))?;
        guard
            .by_id
            .get(latest)
            .cloned()
            .ok_or_else(|| OutputStoreError::Internal(format!("name index dangling: {name}")))
    }

    /// Returns clones of all records appended for `(task_id, attempt)`, in insertion order.
    ///
    /// An unknown `(task_id, attempt)` pair yields an empty list, not an error. Ids that
    /// are present in the attempt index but missing from the primary map are skipped.
    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError> {
        let guard = self.inner.lock().await;
        let key = (task_id.to_string(), attempt);
        // Iterate over the indexed ids by reference; copying the id list first would only
        // add an allocation while the lock is held.
        let records = match guard.by_attempt.get(&key) {
            Some(ids) => {
                let mut records = Vec::with_capacity(ids.len());
                records.extend(ids.iter().filter_map(|id| guard.by_id.get(id).cloned()));
                records
            }
            None => Vec::new(),
        };
        Ok(records)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Progress` event for `stage` with no note.
    fn progress(stage: &str) -> OutputEvent {
        OutputEvent::Progress {
            stage: stage.into(),
            note: None,
        }
    }

    /// Builds a `Final` event with `ok = true` carrying `value` inline.
    fn final_ok(value: Value) -> OutputEvent {
        OutputEvent::Final {
            content: ContentRef::Inline { value },
            ok: true,
        }
    }

    /// A record read back by id carries exactly what was appended.
    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let store = InMemoryOutputStore::new();
        let id = store
            .append(
                "task-1",
                1,
                "agent-a",
                final_ok(Value::String("hello".into())),
                vec![],
            )
            .await
            .expect("append");

        let got = store.get(&id).await.expect("get");
        assert_eq!(got.id, id);
        assert_eq!(got.task_id, "task-1");
        assert_eq!(got.attempt, 1);
        assert_eq!(got.producer_agent, "agent-a");
        match got.event {
            OutputEvent::Final { ok, .. } => assert!(ok),
            _ => panic!("wrong event variant"),
        }
    }

    /// Records for one attempt come back in the order they were appended.
    #[tokio::test]
    async fn list_for_attempt_orders_by_insertion() {
        let store = InMemoryOutputStore::new();
        let id1 = store
            .append("t", 1, "a", progress("s1"), vec![])
            .await
            .expect("append");
        let id2 = store
            .append("t", 1, "a", progress("s2"), vec![])
            .await
            .expect("append");

        let listed = store.list_for_attempt("t", 1).await.expect("list");
        assert_eq!(listed.len(), 2);
        let ids: Vec<OutputRef> = listed.into_iter().map(|record| record.id).collect();
        assert_eq!(ids, vec![id1, id2]);
    }

    /// Generated ids look like `out-` followed by ten hex characters.
    #[tokio::test]
    async fn out_ref_is_short_prefixed_form() {
        let r = OutputRef::new();
        let hex = r
            .0
            .strip_prefix("out-")
            .unwrap_or_else(|| panic!("prefix: {}", r.0));
        assert_eq!(hex.len(), 10, "10 hex chars: {}", r.0);
        assert!(hex.bytes().all(|b| b.is_ascii_hexdigit()), "hex: {}", r.0);
    }

    /// The name lookup returns the newest record of that producer, across tasks.
    #[tokio::test]
    async fn get_latest_by_name_returns_newest_emit() {
        let store = InMemoryOutputStore::new();
        store
            .append("t", 1, "agent-a", progress("first"), vec![])
            .await
            .expect("append 1");
        let id2 = store
            .append("t2", 1, "agent-a", progress("second"), vec![])
            .await
            .expect("append 2");
        store
            .append("t", 1, "agent-b", progress("other"), vec![])
            .await
            .expect("append 3");

        let got = store.get_latest_by_name("agent-a").await.expect("by name");
        assert_eq!(got.id, id2, "latest emit wins");
        assert_eq!(got.task_id, "t2");
    }

    /// Looking up a name that never produced anything is a `NotFound` error.
    #[tokio::test]
    async fn get_latest_by_name_unknown_returns_not_found() {
        let store = InMemoryOutputStore::new();
        let outcome = store.get_latest_by_name("nobody").await;
        assert!(matches!(outcome, Err(OutputStoreError::NotFound(_))));
    }

    /// Looking up an id that was never stored is a `NotFound` error.
    #[tokio::test]
    async fn get_not_found_returns_error() {
        let store = InMemoryOutputStore::new();
        let outcome = store.get(&OutputRef("missing".into())).await;
        assert!(matches!(outcome, Err(OutputStoreError::NotFound(_))));
    }

    /// Parent references passed to `append` are stored on the record unchanged.
    #[tokio::test]
    async fn parent_refs_are_persisted() {
        let store = InMemoryOutputStore::new();
        let parent = OutputRef::new();
        let id = store
            .append("t", 1, "a", final_ok(Value::Null), vec![parent.clone()])
            .await
            .expect("append");

        let got = store.get(&id).await.expect("get");
        assert_eq!(got.parent_refs, vec![parent]);
    }
}