use std::sync::Arc;
use async_trait::async_trait;
use entelix_core::error::{Error, Result};
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc};
use crate::agent::event::AgentEvent;
#[async_trait]
pub trait AgentEventSink<S>: Send + Sync
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()>;
}
#[derive(Clone, Copy, Debug, Default)]
pub struct DroppingSink;
#[async_trait]
impl<S> AgentEventSink<S> for DroppingSink
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, _event: AgentEvent<S>) -> Result<()> {
warn_dropped_first_event();
Ok(())
}
}
fn warn_dropped_first_event() {
use std::sync::atomic::{AtomicBool, Ordering};
static WARNED: AtomicBool = AtomicBool::new(false);
if WARNED
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
tracing::debug!(
target: "entelix_agents",
"DroppingSink dropped first agent event — telemetry is not wired. \
Pass an explicit sink via Agent::builder().add_sink(...) — see \
ChannelSink, BroadcastSink, CaptureSink, FanOutSink, or wire OtelLayer."
);
}
}
pub struct ChannelSink<S>
where
S: Clone + Send + Sync + 'static,
{
tx: mpsc::Sender<AgentEvent<S>>,
}
impl<S> ChannelSink<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new(capacity: usize) -> (Self, mpsc::Receiver<AgentEvent<S>>) {
let (tx, rx) = mpsc::channel(capacity);
(Self { tx }, rx)
}
}
impl<S> Clone for ChannelSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
impl<S> std::fmt::Debug for ChannelSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ChannelSink")
.field("capacity", &self.tx.max_capacity())
.field("closed", &self.tx.is_closed())
.finish()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for ChannelSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
self.tx.send(event).await.map_err(|_| Error::Cancelled)
}
}
pub struct BroadcastSink<S>
where
S: Clone + Send + Sync + 'static,
{
tx: broadcast::Sender<AgentEvent<S>>,
}
impl<S> BroadcastSink<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new(capacity: usize) -> Self {
let (tx, _rx_drop) = broadcast::channel(capacity);
Self { tx }
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<AgentEvent<S>> {
self.tx.subscribe()
}
#[must_use]
pub fn receiver_count(&self) -> usize {
self.tx.receiver_count()
}
}
impl<S> Clone for BroadcastSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
impl<S> std::fmt::Debug for BroadcastSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BroadcastSink")
.field("subscribers", &self.tx.receiver_count())
.finish()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for BroadcastSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
if self.tx.send(event).is_err() {
warn_no_subscribers_once();
}
Ok(())
}
}
fn warn_no_subscribers_once() {
use std::sync::atomic::{AtomicBool, Ordering};
static WARNED: AtomicBool = AtomicBool::new(false);
if WARNED
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
tracing::debug!(
target: "entelix_agents::sink",
"BroadcastSink: no active subscribers; event dropped (further drops will be silent — \
investigate whether the consumer crashed or was detached)"
);
}
}
pub struct CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
events: Arc<Mutex<Vec<AgentEvent<S>>>>,
}
impl<S> CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new() -> Self {
Self {
events: Arc::new(Mutex::new(Vec::new())),
}
}
#[must_use]
pub fn events(&self) -> Vec<AgentEvent<S>> {
self.events.lock().clone()
}
#[must_use]
pub fn len(&self) -> usize {
self.events.lock().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.events.lock().is_empty()
}
}
impl<S> Default for CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Clone for CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
events: Arc::clone(&self.events),
}
}
}
impl<S> std::fmt::Debug for CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CaptureSink")
.field("captured", &self.len())
.finish()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for CaptureSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
self.events.lock().push(event);
Ok(())
}
}
pub struct FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
sinks: Vec<Arc<dyn AgentEventSink<S>>>,
}
impl<S> FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new() -> Self {
Self { sinks: Vec::new() }
}
#[must_use]
pub fn push(mut self, sink: Arc<dyn AgentEventSink<S>>) -> Self {
self.sinks.push(sink);
self
}
#[must_use]
pub fn len(&self) -> usize {
self.sinks.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.sinks.is_empty()
}
}
impl<S> Default for FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Clone for FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
sinks: self.sinks.clone(),
}
}
}
impl<S> std::fmt::Debug for FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FanOutSink")
.field("sinks", &self.sinks.len())
.finish()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for FanOutSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
for sink in &self.sinks {
sink.send(event.clone()).await?;
}
Ok(())
}
}
pub struct FailOpenSink<S>
where
S: Clone + Send + Sync + 'static,
{
inner: Arc<dyn AgentEventSink<S>>,
}
impl<S> FailOpenSink<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn new(inner: Arc<dyn AgentEventSink<S>>) -> Self {
Self { inner }
}
}
impl<S> Clone for FailOpenSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<S> std::fmt::Debug for FailOpenSink<S>
where
S: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FailOpenSink").finish_non_exhaustive()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for FailOpenSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
if let Err(err) = self.inner.send(event).await {
tracing::warn!(
target: "entelix_agents::sink",
error = %err,
"FailOpenSink: inner sink errored — discarding event and continuing"
);
}
Ok(())
}
}
pub struct StateErasureSink<S> {
inner: Arc<dyn AgentEventSink<()>>,
_phantom: std::marker::PhantomData<S>,
}
impl<S> StateErasureSink<S> {
#[must_use]
pub fn new(inner: Arc<dyn AgentEventSink<()>>) -> Self {
Self {
inner,
_phantom: std::marker::PhantomData,
}
}
}
impl<S> Clone for StateErasureSink<S> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
_phantom: std::marker::PhantomData,
}
}
}
impl<S> std::fmt::Debug for StateErasureSink<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StateErasureSink").finish_non_exhaustive()
}
}
#[async_trait]
impl<S> AgentEventSink<S> for StateErasureSink<S>
where
S: Clone + Send + Sync + 'static,
{
async fn send(&self, event: AgentEvent<S>) -> Result<()> {
self.inner.send(event.erase_state()).await
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
use super::*;
type TestEvent = AgentEvent<i32>;
fn started(agent: impl Into<String>) -> TestEvent {
TestEvent::Started {
run_id: "test-run".into(),
tenant_id: entelix_core::TenantId::new("t-test"),
parent_run_id: None,
agent: agent.into(),
}
}
fn complete(state: i32) -> TestEvent {
TestEvent::Complete {
run_id: "test-run".into(),
tenant_id: entelix_core::TenantId::new("t-test"),
state,
usage: None,
}
}
#[tokio::test]
async fn dropping_sink_silently_discards_events() {
let sink = DroppingSink;
for i in 0..10 {
sink.send(started(format!("a{i}"))).await.unwrap();
}
}
#[tokio::test]
async fn channel_sink_round_trips_in_order() {
let (sink, mut rx) = ChannelSink::<i32>::new(4);
for i in 0..3 {
sink.send(started(format!("a{i}"))).await.unwrap();
}
drop(sink);
let mut received = Vec::new();
while let Some(event) = rx.recv().await {
received.push(event);
}
assert_eq!(received.len(), 3);
assert!(matches!(&received[0], AgentEvent::Started { agent, .. } if agent == "a0"));
}
#[tokio::test]
async fn channel_sink_returns_cancelled_when_receiver_dropped() {
let (sink, rx) = ChannelSink::<i32>::new(1);
sink.send(started("a")).await.unwrap();
drop(rx);
let err = sink.send(complete(0)).await.unwrap_err();
assert!(matches!(err, Error::Cancelled));
}
#[tokio::test]
async fn broadcast_sink_fans_out_to_multiple_subscribers() {
let sink = BroadcastSink::<i32>::new(8);
let mut a = sink.subscribe();
let mut b = sink.subscribe();
assert_eq!(sink.receiver_count(), 2);
sink.send(complete(7)).await.unwrap();
let ea = a.recv().await.unwrap();
let eb = b.recv().await.unwrap();
assert!(matches!(ea, AgentEvent::Complete { state: 7, .. }));
assert!(matches!(eb, AgentEvent::Complete { state: 7, .. }));
}
#[tokio::test]
async fn broadcast_sink_no_subscribers_does_not_error() {
let sink = BroadcastSink::<i32>::new(8);
sink.send(complete(0)).await.unwrap();
}
#[tokio::test]
async fn capture_sink_preserves_order_and_content() {
let sink = CaptureSink::<i32>::new();
sink.send(started("test")).await.unwrap();
sink.send(complete(42)).await.unwrap();
assert_eq!(sink.len(), 2);
let events = sink.events();
assert!(matches!(events[0], AgentEvent::Started { .. }));
assert!(matches!(events[1], AgentEvent::Complete { state: 42, .. }));
}
#[tokio::test]
async fn capture_sink_clones_share_underlying_buffer() {
let sink_a = CaptureSink::<i32>::new();
let sink_b = sink_a.clone();
sink_a.send(complete(1)).await.unwrap();
sink_b.send(complete(2)).await.unwrap();
assert_eq!(sink_a.len(), 2);
assert_eq!(sink_b.len(), 2);
}
#[derive(Default)]
struct FailingSink {
calls: parking_lot::Mutex<u32>,
}
#[async_trait]
impl AgentEventSink<i32> for FailingSink {
async fn send(&self, _event: AgentEvent<i32>) -> Result<()> {
*self.calls.lock() += 1;
Err(Error::Cancelled)
}
}
#[tokio::test]
async fn fan_out_sink_dispatches_in_registration_order() {
let a = CaptureSink::<i32>::new();
let b = CaptureSink::<i32>::new();
let fan = FanOutSink::<i32>::new()
.push(Arc::new(a.clone()))
.push(Arc::new(b.clone()));
fan.send(complete(1)).await.unwrap();
fan.send(complete(2)).await.unwrap();
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 2);
assert!(matches!(
a.events()[0],
AgentEvent::Complete { state: 1, .. }
));
}
#[tokio::test]
async fn fan_out_sink_propagates_first_error_and_stops() {
let recorded = CaptureSink::<i32>::new();
let failing = Arc::new(FailingSink::default());
let fan = FanOutSink::<i32>::new()
.push(Arc::clone(&failing) as Arc<dyn AgentEventSink<i32>>)
.push(Arc::new(recorded.clone()));
let err = fan.send(complete(1)).await.unwrap_err();
assert!(matches!(err, Error::Cancelled));
assert_eq!(*failing.calls.lock(), 1, "failing sink saw the event");
assert_eq!(
recorded.len(),
0,
"downstream sinks must not see the event after an upstream failure"
);
}
#[tokio::test]
async fn fail_open_sink_swallows_inner_error() {
let failing: Arc<dyn AgentEventSink<i32>> = Arc::new(FailingSink::default());
let lifted = FailOpenSink::new(failing);
for _ in 0..3 {
lifted.send(complete(0)).await.unwrap();
}
}
#[tokio::test]
async fn fail_open_sink_lifts_into_fan_out_to_isolate_observe_only() {
let recorded = CaptureSink::<i32>::new();
let failing: Arc<dyn AgentEventSink<i32>> = Arc::new(FailingSink::default());
let fan = FanOutSink::<i32>::new()
.push(Arc::new(FailOpenSink::new(failing))) .push(Arc::new(recorded.clone())); fan.send(complete(7)).await.unwrap();
assert_eq!(
recorded.len(),
1,
"lifting the failing sink with FailOpenSink must keep downstream sinks reachable"
);
}
#[tokio::test]
async fn state_erasure_sink_fans_in_heterogeneous_agents_to_one_unit_sink() {
let audit: Arc<CaptureSink<()>> = Arc::new(CaptureSink::<()>::new());
let audit_dyn: Arc<dyn AgentEventSink<()>> = audit.clone();
let a_adapter: StateErasureSink<i32> = StateErasureSink::new(Arc::clone(&audit_dyn));
a_adapter
.send(AgentEvent::Started {
run_id: "a-run".into(),
tenant_id: entelix_core::TenantId::new("t-shared"),
parent_run_id: None,
agent: "agent-a".into(),
})
.await
.unwrap();
a_adapter
.send(AgentEvent::Complete {
run_id: "a-run".into(),
tenant_id: entelix_core::TenantId::new("t-shared"),
state: 42_i32,
usage: None,
})
.await
.unwrap();
let b_adapter: StateErasureSink<String> = StateErasureSink::new(audit_dyn);
b_adapter
.send(AgentEvent::Started {
run_id: "b-run".into(),
tenant_id: entelix_core::TenantId::new("t-shared"),
parent_run_id: Some("a-run".into()),
agent: "agent-b".into(),
})
.await
.unwrap();
b_adapter
.send(AgentEvent::Complete {
run_id: "b-run".into(),
tenant_id: entelix_core::TenantId::new("t-shared"),
state: "done".to_owned(),
usage: None,
})
.await
.unwrap();
let events = audit.events();
assert_eq!(events.len(), 4);
match &events[0] {
AgentEvent::Started {
run_id,
tenant_id,
parent_run_id,
agent,
} => {
assert_eq!(run_id, "a-run");
assert_eq!(tenant_id.as_str(), "t-shared");
assert_eq!(parent_run_id.as_deref(), None);
assert_eq!(agent, "agent-a");
}
other => panic!("unexpected event: {other:?}"),
}
match &events[2] {
AgentEvent::Started {
parent_run_id,
agent,
..
} => {
assert_eq!(parent_run_id.as_deref(), Some("a-run"));
assert_eq!(agent, "agent-b");
}
other => panic!("unexpected event: {other:?}"),
}
match &events[1] {
AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
other => panic!("unexpected event: {other:?}"),
}
match &events[3] {
AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
other => panic!("unexpected event: {other:?}"),
}
}
#[tokio::test]
async fn state_erasure_sink_pins_first_party_recipe_state_types() {
use crate::state::{ChatState, ReActState, SupervisorState};
let audit: Arc<CaptureSink<()>> = Arc::new(CaptureSink::<()>::new());
let audit_dyn: Arc<dyn AgentEventSink<()>> = audit.clone();
let react_adapter: StateErasureSink<ReActState> =
StateErasureSink::new(Arc::clone(&audit_dyn));
let chat_adapter: StateErasureSink<ChatState> =
StateErasureSink::new(Arc::clone(&audit_dyn));
let supervisor_adapter: StateErasureSink<SupervisorState> =
StateErasureSink::new(audit_dyn);
let tenant = entelix_core::TenantId::new("t-multi");
react_adapter
.send(AgentEvent::Complete {
run_id: "react-run".into(),
tenant_id: tenant.clone(),
state: ReActState::from_user("hello react"),
usage: None,
})
.await
.unwrap();
chat_adapter
.send(AgentEvent::Complete {
run_id: "chat-run".into(),
tenant_id: tenant.clone(),
state: ChatState::from_user("hello chat"),
usage: None,
})
.await
.unwrap();
supervisor_adapter
.send(AgentEvent::Complete {
run_id: "super-run".into(),
tenant_id: tenant,
state: SupervisorState::from_user("hello supervisor"),
usage: None,
})
.await
.unwrap();
let events = audit.events();
assert_eq!(events.len(), 3);
for event in &events {
match event {
AgentEvent::Complete { state, .. } => assert_eq!(*state, ()),
other => panic!("unexpected event: {other:?}"),
}
}
}
}