mod types;
pub use types::*;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use async_trait::async_trait;
use crate::error::Result;
use crate::graph::status::GraphRunStatus;
use crate::graph::stream::{GraphEvent, GraphEventSink};
use crate::harness::ids::{CheckpointId, EventId, GraphId, RunId, ThreadId};
use crate::harness::store::AppendStore;
impl InMemoryGraphEventJournal {
    /// Creates an empty journal using the type's `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns how many observations are currently stored for `run_id`.
    ///
    /// A run that has no recorded entries reports `0`.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn len(&self, run_id: &str) -> usize {
        let runs = self
            .runs
            .lock()
            .expect("InMemoryGraphEventJournal lock poisoned");
        match runs.get(run_id) {
            Some(entries) => entries.len(),
            None => 0,
        }
    }

    /// Returns `true` when no observations are stored for `run_id`.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`Self::len`].
    pub fn is_empty(&self, run_id: &str) -> bool {
        let stored = self.len(run_id);
        stored == 0
    }
}
#[async_trait]
impl GraphEventJournal for InMemoryGraphEventJournal {
    /// Appends `obs` to the entry list of its run and returns the position it was stored at.
    ///
    /// The returned offset is the length of the run's list before the push, so the first
    /// observation recorded for a run gets offset `0`.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn append(&self, obs: GraphObservation) -> Result<u64> {
        let mut runs = self
            .runs
            .lock()
            .map_err(|e| poisoned("InMemoryGraphEventJournal", e))?;
        let entries = runs.entry(obs.run_id.as_str().to_string()).or_default();
        let offset = entries.len() as u64;
        entries.push(obs);
        Ok(offset)
    }

    /// Returns clones of the observations stored for `run_id`, starting at position `offset`.
    ///
    /// An unknown `run_id`, or an `offset` at or beyond the end of the list, yields an
    /// empty vector.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn read_from(&self, run_id: &str, offset: u64) -> Result<Vec<GraphObservation>> {
        let runs = self
            .runs
            .lock()
            .map_err(|e| poisoned("InMemoryGraphEventJournal", e))?;
        let Some(entries) = runs.get(run_id) else {
            return Ok(Vec::new());
        };
        // Saturate instead of `as usize`: where `usize` is narrower than `u64`, a plain
        // cast would wrap a huge offset to a small index and return entries that should
        // have been skipped. Saturating simply skips past the end of the list.
        let start = usize::try_from(offset).unwrap_or(usize::MAX);
        Ok(entries.iter().skip(start).cloned().collect())
    }
}
impl<A: AppendStore> StoreGraphEventJournal<A> {
pub fn new(store: A) -> Self {
Self { store }
}
pub fn store(&self) -> &A {
&self.store
}
}
#[async_trait]
impl<A: AppendStore + 'static> GraphEventJournal for StoreGraphEventJournal<A> {
    /// Serializes `obs` to a JSON value and appends it to the store stream named after
    /// the observation's run id, returning whatever offset the store reports.
    ///
    /// # Errors
    ///
    /// Propagates serialization failures and any error returned by the store.
    async fn append(&self, obs: GraphObservation) -> Result<u64> {
        let payload = serde_json::to_value(&obs)?;
        let stream_name = obs.run_id.as_str().to_string();
        self.store.append(&stream_name, payload).await
    }

    /// Reads the stream for `run_id` from `offset` onward and decodes every stored JSON
    /// value back into a `GraphObservation`.
    ///
    /// The per-record offsets returned by the store are ignored.
    ///
    /// # Errors
    ///
    /// Propagates store read errors and stops at the first record that fails to decode.
    async fn read_from(&self, run_id: &str, offset: u64) -> Result<Vec<GraphObservation>> {
        let records = self.store.read_from(run_id, offset).await?;
        let mut observations = Vec::with_capacity(records.len());
        for (_, payload) in records {
            let decoded = serde_json::from_value(payload)?;
            observations.push(decoded);
        }
        Ok(observations)
    }
}
impl InMemoryGraphStatusStore {
    /// Creates an empty status store using the type's `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the number of statuses currently held.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn len(&self) -> usize {
        let statuses = self
            .statuses
            .lock()
            .expect("InMemoryGraphStatusStore lock poisoned");
        statuses.len()
    }

    /// Returns `true` when the store holds no statuses.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`Self::len`].
    pub fn is_empty(&self) -> bool {
        let held = self.len();
        held == 0
    }
}
#[async_trait]
impl GraphStatusStore for InMemoryGraphStatusStore {
    /// Stores `status` under its run id, replacing any status previously stored for
    /// that run.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn put_status(&self, status: GraphRunStatus) -> Result<()> {
        let mut guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryGraphStatusStore", e))?;
        let key = status.run_id.as_str().to_string();
        guard.insert(key, status);
        Ok(())
    }

    /// Returns a clone of the status stored for `run_id`, or `None` if there is none.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn get_status(&self, run_id: &str) -> Result<Option<GraphRunStatus>> {
        let guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryGraphStatusStore", e))?;
        let found = guard.get(run_id).cloned();
        Ok(found)
    }

    /// Returns clones of every stored status whose thread id equals `thread_id`.
    ///
    /// Statuses without a thread id never match. Results come back in the map's own
    /// iteration order.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn list_by_thread(&self, thread_id: &str) -> Result<Vec<GraphRunStatus>> {
        let guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryGraphStatusStore", e))?;
        let mut matching = Vec::new();
        for status in guard.values() {
            let on_thread =
                matches!(status.thread_id.as_ref(), Some(t) if t.as_str() == thread_id);
            if on_thread {
                matching.push(status.clone());
            }
        }
        Ok(matching)
    }
}
impl JournalGraphSink {
    /// Builds a sink that records events for `run_id` / `graph_id` into `journal`.
    ///
    /// The new sink treats `run_id` as its own root run, has no parent run, no thread,
    /// an empty namespace, no inner sink, and both counters starting at zero.
    pub fn new(journal: Arc<dyn GraphEventJournal>, run_id: RunId, graph_id: GraphId) -> Self {
        let root_run_id = run_id.clone();
        let zeroed_counter = || Arc::new(std::sync::atomic::AtomicU64::new(0));
        Self {
            journal,
            inner: None,
            root_run_id,
            run_id,
            parent_run_id: None,
            thread_id: None,
            graph_id,
            namespace: Vec::new(),
            offset: zeroed_counter(),
            step: zeroed_counter(),
        }
    }

    /// Overrides the parent and root run ids stamped onto every observation.
    pub fn with_lineage(mut self, parent_run_id: Option<RunId>, root_run_id: RunId) -> Self {
        self.root_run_id = root_run_id;
        self.parent_run_id = parent_run_id;
        self
    }

    /// Sets the thread id stamped onto every observation.
    pub fn with_thread(mut self, thread_id: Option<ThreadId>) -> Self {
        self.thread_id = thread_id;
        self
    }

    /// Sets the namespace stamped onto every observation.
    pub fn with_namespace(mut self, namespace: Vec<String>) -> Self {
        self.namespace = namespace;
        self
    }

    /// Sets a sink that receives every event after it has been journaled.
    pub fn with_inner(mut self, inner: Arc<dyn GraphEventSink>) -> Self {
        self.inner = Some(inner);
        self
    }

    /// Wraps `event` in a `GraphObservation` carrying this sink's identifiers.
    ///
    /// Each call consumes the next value of the sink's offset counter; the event id is
    /// `"<run_id>-<offset>"`. When the event reports a step it is remembered and used;
    /// otherwise the most recently remembered step is reused.
    fn observe(&self, event: &GraphEvent) -> GraphObservation {
        let offset = self.offset.fetch_add(1, Ordering::Relaxed);
        let step = if let Some(reported) = event.step() {
            // Remember the latest step so later step-less events inherit it.
            self.step.store(reported as u64, Ordering::Relaxed);
            reported
        } else {
            self.step.load(Ordering::Relaxed) as usize
        };
        let event_id = EventId::new(format!("{}-{offset}", self.run_id.as_str()));
        GraphObservation {
            event_id,
            run_id: self.run_id.clone(),
            root_run_id: self.root_run_id.clone(),
            parent_run_id: self.parent_run_id.clone(),
            thread_id: self.thread_id.clone(),
            graph_id: self.graph_id.clone(),
            checkpoint_id: checkpoint_of(event),
            namespace: self.namespace.clone(),
            step,
            offset,
            ts_ms: now_ms(),
            event: event.clone(),
        }
    }
}
impl GraphEventSink for JournalGraphSink {
    /// Journals `event` and then forwards it to the inner sink, if one is configured.
    ///
    /// The append is driven to completion on the calling thread with
    /// `futures::executor::block_on`, and its result is discarded: a failed append
    /// does not stop the event from being forwarded.
    fn emit(&self, event: GraphEvent) {
        let observation = self.observe(&event);
        // Best-effort journaling: the outcome of the append is intentionally ignored.
        let _ = futures::executor::block_on(self.journal.append(observation));
        if let Some(downstream) = self.inner.as_ref() {
            downstream.emit(event);
        }
    }
}
/// Extracts the checkpoint id carried by a `GraphEvent::CheckpointSaved` event.
///
/// Every other event kind yields `None`.
fn checkpoint_of(event: &GraphEvent) -> Option<CheckpointId> {
    if let GraphEvent::CheckpointSaved { checkpoint_id } = event {
        Some(checkpoint_id.clone())
    } else {
        None
    }
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch.
pub(crate) fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
fn poisoned<E: std::fmt::Display>(what: &str, err: E) -> crate::error::TinyAgentsError {
crate::error::TinyAgentsError::Validation(format!("{what} lock poisoned: {err}"))
}
#[cfg(test)]
mod test;