mod types;
pub use types::*;
use std::sync::Arc;
use async_trait::async_trait;
use crate::error::Result;
use crate::harness::events::{AgentEvent, EventListener, EventRecord, HarnessRunStatus};
use crate::harness::ids::RunId;
use crate::harness::store::{AppendStore, JsonlAppendStore};
impl InMemoryEventJournal {
    /// Builds a journal from its `Default` value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns how many observations are currently stored for `run_id`,
    /// or `0` when the journal has no entry for that run.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn len(&self, run_id: &str) -> usize {
        let runs = self
            .runs
            .lock()
            .expect("InMemoryEventJournal lock poisoned");
        runs.get(run_id).map_or(0, |entries| entries.len())
    }

    /// Returns `true` when [`Self::len`] reports zero observations for `run_id`.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn is_empty(&self, run_id: &str) -> bool {
        let stored = self.len(run_id);
        stored == 0
    }
}
#[async_trait]
impl HarnessEventJournal for InMemoryEventJournal {
    /// Appends `obs` to the list kept for its run and returns the zero-based
    /// position at which it was stored.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn append(&self, obs: AgentObservation) -> Result<u64> {
        let mut runs = self
            .runs
            .lock()
            .map_err(|e| poisoned("InMemoryEventJournal", e))?;
        let entries = runs.entry(obs.run_id.as_str().to_string()).or_default();
        // The new entry lands at the current length, so that is its offset.
        let offset = entries.len() as u64;
        entries.push(obs);
        Ok(offset)
    }

    /// Returns clones of the observations stored for `run_id`, skipping the
    /// first `offset` entries. An unknown run, or an offset at or past the end,
    /// yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn read_from(&self, run_id: &str, offset: u64) -> Result<Vec<AgentObservation>> {
        let runs = self
            .runs
            .lock()
            .map_err(|e| poisoned("InMemoryEventJournal", e))?;
        let Some(entries) = runs.get(run_id) else {
            return Ok(Vec::new());
        };
        // A plain `offset as usize` wraps on targets where `usize` is narrower
        // than `u64`, which would hand back entries the caller asked to skip.
        // Saturating is correct: no in-memory list can hold `usize::MAX` items,
        // so an unrepresentable offset is always past the end.
        let start = usize::try_from(offset).unwrap_or(usize::MAX);
        Ok(entries.iter().skip(start).cloned().collect())
    }
}
impl<A: AppendStore> StoreEventJournal<A> {
pub fn new(store: A) -> Self {
Self { store }
}
pub fn store(&self) -> &A {
&self.store
}
}
#[async_trait]
impl<A: AppendStore + 'static> HarnessEventJournal for StoreEventJournal<A> {
    /// Serializes `obs` to a JSON value and appends it to the store stream
    /// named after the observation's run id, returning the offset reported by
    /// the store.
    ///
    /// # Errors
    ///
    /// Fails if serialization fails or the store's `append` fails.
    async fn append(&self, obs: AgentObservation) -> Result<u64> {
        let stream_name = obs.run_id.as_str().to_string();
        let payload = serde_json::to_value(&obs)?;
        self.store.append(&stream_name, payload).await
    }

    /// Reads the raw entries of stream `run_id` starting at `offset` and
    /// deserializes each one into an [`AgentObservation`]. The per-entry
    /// offsets returned by the store are discarded.
    ///
    /// # Errors
    ///
    /// Fails if the store read fails, or on the first entry that does not
    /// deserialize.
    async fn read_from(&self, run_id: &str, offset: u64) -> Result<Vec<AgentObservation>> {
        let rows = self.store.read_from(run_id, offset).await?;
        rows.into_iter()
            .map(|(_, payload)| -> Result<AgentObservation> {
                Ok(serde_json::from_value(payload)?)
            })
            .collect()
    }
}
impl InMemoryStatusStore {
    /// Builds a status store from its `Default` value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the number of statuses currently held.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn len(&self) -> usize {
        let statuses = self
            .statuses
            .lock()
            .expect("InMemoryStatusStore lock poisoned");
        statuses.len()
    }

    /// Returns `true` when [`Self::len`] reports zero statuses.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn is_empty(&self) -> bool {
        let held = self.len();
        held == 0
    }
}
#[async_trait]
impl HarnessStatusStore for InMemoryStatusStore {
    /// Stores `status`, keyed by the string form of its run id.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn put_status(&self, status: HarnessRunStatus) -> Result<()> {
        let mut guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryStatusStore", e))?;
        let key = status.run_id.as_str().to_string();
        guard.insert(key, status);
        Ok(())
    }

    /// Returns a clone of the status stored under `run_id`, if any.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn get_status(&self, run_id: &str) -> Result<Option<HarnessRunStatus>> {
        let guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryStatusStore", e))?;
        let found = guard.get(run_id).cloned();
        Ok(found)
    }

    /// Returns clones of every stored status whose `thread_id` is set and
    /// equal to `thread_id`. Statuses without a thread id never match.
    ///
    /// # Errors
    ///
    /// Returns the error built by `poisoned` when the internal lock is poisoned.
    async fn list_by_thread(&self, thread_id: &str) -> Result<Vec<HarnessRunStatus>> {
        let guard = self
            .statuses
            .lock()
            .map_err(|e| poisoned("InMemoryStatusStore", e))?;
        let matching = guard
            .values()
            .filter(|status| {
                matches!(status.thread_id.as_ref(), Some(t) if t.as_str() == thread_id)
            })
            .cloned()
            .collect();
        Ok(matching)
    }
}
impl FanOutSink {
    /// Builds a sink from its `Default` value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Builder-style variant of [`Self::add`]: registers `listener` and
    /// returns the sink by value so calls can be chained.
    pub fn with(mut self, listener: Arc<dyn EventListener>) -> Self {
        self.add(listener);
        self
    }

    /// Registers `listener` at the end of the listener list and returns
    /// `&mut self` for chaining.
    pub fn add(&mut self, listener: Arc<dyn EventListener>) -> &mut Self {
        self.listeners.push(listener);
        self
    }

    /// Number of registered listeners.
    pub fn len(&self) -> usize {
        self.listeners.len()
    }

    /// Returns `true` when no listener is registered.
    pub fn is_empty(&self) -> bool {
        self.listeners.is_empty()
    }
}
impl EventListener for FanOutSink {
    /// Forwards `record` to every registered listener, in registration order.
    fn on_event(&self, record: &EventRecord) {
        self.listeners
            .iter()
            .for_each(|listener| listener.on_event(record));
    }
}
impl RedactingSink {
    /// Replacement text used for redacted secrets unless overridden with
    /// [`Self::with_mask`].
    pub const DEFAULT_MASK: &'static str = "[REDACTED]";

    /// Creates a sink that forwards events to `inner` after masking
    /// `secrets`, using [`Self::DEFAULT_MASK`] as the replacement text.
    pub fn new(inner: Arc<dyn EventListener>, secrets: Vec<String>) -> Self {
        let mask = String::from(Self::DEFAULT_MASK);
        Self {
            inner,
            secrets,
            mask,
        }
    }

    /// Overrides the replacement text and returns the sink for chaining.
    pub fn with_mask(mut self, mask: impl Into<String>) -> Self {
        let replacement: String = mask.into();
        self.mask = replacement;
        self
    }
}
impl EventListener for RedactingSink {
    /// Round-trips the record's event through JSON, masks the configured
    /// secrets in the JSON form, and forwards a record carrying the redacted
    /// event (same `id` and `offset`) to the inner listener.
    ///
    /// If either the serialization or the deserialization step fails, the
    /// ORIGINAL record is forwarded unchanged.
    // NOTE(review): that fallback forwards the event without redaction —
    // confirm this fail-open behavior is intended.
    fn on_event(&self, record: &EventRecord) {
        let round_trip = serde_json::to_value(&record.event).and_then(|mut json| {
            redact_value(&mut json, &self.secrets, &self.mask);
            serde_json::from_value::<AgentEvent>(json)
        });
        match round_trip {
            Ok(event) => {
                let scrubbed = EventRecord {
                    id: record.id.clone(),
                    offset: record.offset,
                    event,
                };
                self.inner.on_event(&scrubbed);
            }
            Err(_) => self.inner.on_event(record),
        }
    }
}
/// Recursively replaces every occurrence of each non-empty secret with `mask`
/// in all string values reachable from `value` (array items and object
/// values; object keys are not touched).
///
/// Secrets are applied longest-first. Applying them in caller order lets a
/// short secret that is a substring of a longer one break the longer match
/// and leave its remainder in clear text: with `["abc", "abcdef"]`, the text
/// `"abcdef"` would become `"<mask>def"`. Secrets of equal length keep their
/// original relative order.
///
/// Secrets that merely overlap (neither contains the other) can still leave
/// a fragment of one of them behind.
fn redact_value(value: &mut serde_json::Value, secrets: &[String], mask: &str) {
    // Drop empty secrets once up front instead of re-checking per string.
    let mut ordered: Vec<&str> = secrets
        .iter()
        .map(String::as_str)
        .filter(|secret| !secret.is_empty())
        .collect();
    if ordered.is_empty() {
        return;
    }
    // Stable sort: ties preserve the caller's order.
    ordered.sort_by_key(|secret| std::cmp::Reverse(secret.len()));
    redact_with_ordered_secrets(value, &ordered, mask);
}

/// Walks `value` and masks `secrets` (already filtered and ordered by
/// `redact_value`) in every string it contains.
fn redact_with_ordered_secrets(value: &mut serde_json::Value, secrets: &[&str], mask: &str) {
    match value {
        serde_json::Value::String(text) => {
            for &secret in secrets {
                // Only reallocate when there is something to replace.
                if text.contains(secret) {
                    *text = text.replace(secret, mask);
                }
            }
        }
        serde_json::Value::Array(items) => {
            for item in items {
                redact_with_ordered_secrets(item, secrets, mask);
            }
        }
        serde_json::Value::Object(map) => {
            for entry in map.values_mut() {
                redact_with_ordered_secrets(entry, secrets, mask);
            }
        }
        // Null, booleans and numbers cannot contain secret text.
        _ => {}
    }
}
impl JournalSink {
    /// Creates a sink that journals events for `run_id`. The run starts with
    /// no parent and is its own root until [`Self::with_lineage`] says
    /// otherwise.
    pub fn new(journal: Arc<dyn HarnessEventJournal>, run_id: RunId) -> Self {
        let root_run_id = run_id.clone();
        Self {
            root_run_id,
            run_id,
            parent_run_id: None,
            journal,
        }
    }

    /// Overrides the parent and root run ids attached to journaled
    /// observations and returns the sink for chaining.
    pub fn with_lineage(mut self, parent_run_id: Option<RunId>, root_run_id: RunId) -> Self {
        self.root_run_id = root_run_id;
        self.parent_run_id = parent_run_id;
        self
    }
}
impl EventListener for JournalSink {
    /// Converts `record` into an [`AgentObservation`] tagged with this sink's
    /// run, parent and root ids, then appends it to the journal.
    ///
    /// The append future is driven to completion on the calling thread via
    /// `futures::executor::block_on`, and its result is deliberately
    /// discarded, so journal failures are not reported to the caller.
    fn on_event(&self, record: &EventRecord) {
        let observation = AgentObservation::from_record(
            record,
            self.run_id.clone(),
            self.parent_run_id.clone(),
            self.root_run_id.clone(),
        );
        let pending = self.journal.append(observation);
        let _ = futures::executor::block_on(pending);
    }
}
impl JsonlSink {
    /// Creates a sink that appends event records to `stream` in `store`.
    pub fn new(store: JsonlAppendStore, stream: impl Into<String>) -> Self {
        let stream: String = stream.into();
        Self { store, stream }
    }
}
impl EventListener for JsonlSink {
    /// Serializes `record` to JSON and appends it to this sink's stream.
    ///
    /// A record that fails to serialize is skipped. The append is driven to
    /// completion on the calling thread via `futures::executor::block_on`,
    /// and its result is deliberately discarded.
    fn on_event(&self, record: &EventRecord) {
        if let Ok(payload) = serde_json::to_value(record) {
            let pending = self.store.append(&self.stream, payload);
            let _ = futures::executor::block_on(pending);
        }
    }
}
fn poisoned<E: std::fmt::Display>(what: &str, err: E) -> crate::error::TinyAgentsError {
crate::error::TinyAgentsError::Validation(format!("{what} lock poisoned: {err}"))
}
#[cfg(test)]
mod test;