use crate::event::AgentEvent;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc;
pub struct EventTapState {
pub tx: mpsc::Sender<AgentEvent>,
pub truncated: AtomicBool,
}
pub type EventTap = Arc<parking_lot::Mutex<Option<EventTapState>>>;
pub fn new_event_tap() -> EventTap {
Arc::new(parking_lot::Mutex::new(None))
}
pub fn tap_try_send(tap: &EventTap, event: &AgentEvent) {
let guard = tap.lock();
let Some(state) = guard.as_ref() else {
return;
};
match state.tx.try_send(event.clone()) {
Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => {
}
Err(mpsc::error::TrySendError::Full(_)) => {
if !state.truncated.swap(true, Ordering::Relaxed) {
let _ = state.tx.try_send(AgentEvent::StreamTruncated {
reason: "channel full".to_string(),
});
}
}
}
}
pub async fn tap_send_terminal(tap: &EventTap, event: AgentEvent) {
let tx = {
let guard = tap.lock();
match guard.as_ref() {
Some(state) => state.tx.clone(),
None => return,
}
};
match tokio::time::timeout(std::time::Duration::from_secs(5), tx.send(event)).await {
Ok(Ok(()) | Err(_)) => {
}
Err(_) => {
tracing::warn!("tap_send_terminal timed out after 5s; continuing");
}
}
}
pub async fn tap_emit(
tap: &EventTap,
tx: Option<&mpsc::Sender<AgentEvent>>,
event: AgentEvent,
) -> bool {
tap_try_send(tap, &event);
if let Some(tx) = tx {
return tx.send(event).await.is_ok();
}
true
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn tap_try_send_none_is_noop() {
let tap = new_event_tap();
tap_try_send(
&tap,
&AgentEvent::TextDelta {
delta: "hello".to_string(),
},
);
}
#[test]
fn tap_try_send_delivers_to_active_subscriber() {
let tap = new_event_tap();
let (tx, mut rx) = mpsc::channel(16);
{
let mut guard = tap.lock();
*guard = Some(EventTapState {
tx,
truncated: AtomicBool::new(false),
});
}
tap_try_send(
&tap,
&AgentEvent::TextDelta {
delta: "hello".to_string(),
},
);
let event = rx.try_recv().unwrap();
let AgentEvent::TextDelta { delta } = event else {
unreachable!("Expected TextDelta, got {:?}", event);
};
assert_eq!(delta, "hello");
}
#[test]
fn tap_try_send_full_channel_sets_truncated_and_sends_marker() {
let tap = new_event_tap();
let (tx, mut rx) = mpsc::channel(1);
{
let mut guard = tap.lock();
*guard = Some(EventTapState {
tx,
truncated: AtomicBool::new(false),
});
}
tap_try_send(
&tap,
&AgentEvent::TextDelta {
delta: "first".to_string(),
},
);
tap_try_send(
&tap,
&AgentEvent::TextDelta {
delta: "second".to_string(),
},
);
{
let guard = tap.lock();
assert!(guard.as_ref().unwrap().truncated.load(Ordering::Relaxed));
}
let first = rx.try_recv().unwrap();
assert!(
matches!(&first, AgentEvent::TextDelta { delta } if delta == "first"),
"Expected first TextDelta, got {first:?}"
);
}
#[tokio::test]
async fn tap_send_terminal_delivers_within_timeout() {
let tap = new_event_tap();
let (tx, mut rx) = mpsc::channel(16);
{
let mut guard = tap.lock();
*guard = Some(EventTapState {
tx,
truncated: AtomicBool::new(false),
});
}
tap_send_terminal(
&tap,
AgentEvent::RunCompleted {
session_id: crate::types::SessionId::new(),
result: "done".to_string(),
usage: crate::types::Usage::default(),
},
)
.await;
let event = rx.try_recv().unwrap();
assert!(matches!(event, AgentEvent::RunCompleted { .. }));
}
#[tokio::test]
async fn tap_send_terminal_no_subscriber_is_noop() {
let tap = new_event_tap();
tap_send_terminal(
&tap,
AgentEvent::RunCompleted {
session_id: crate::types::SessionId::new(),
result: "done".to_string(),
usage: crate::types::Usage::default(),
},
)
.await;
}
#[tokio::test]
async fn tap_emit_sends_to_both_tap_and_primary() {
let tap = new_event_tap();
let (tap_tx, mut tap_rx) = mpsc::channel(16);
let (primary_tx, mut primary_rx) = mpsc::channel(16);
{
let mut guard = tap.lock();
*guard = Some(EventTapState {
tx: tap_tx,
truncated: AtomicBool::new(false),
});
}
let ok = tap_emit(
&tap,
Some(&primary_tx),
AgentEvent::TextDelta {
delta: "both".to_string(),
},
)
.await;
assert!(ok);
let tap_event = tap_rx.try_recv().unwrap();
let primary_event = primary_rx.try_recv().unwrap();
assert!(matches!(tap_event, AgentEvent::TextDelta { delta } if delta == "both"));
assert!(matches!(primary_event, AgentEvent::TextDelta { delta } if delta == "both"));
}
#[tokio::test]
async fn tap_emit_primary_none_returns_true() {
let tap = new_event_tap();
let ok = tap_emit(
&tap,
None,
AgentEvent::TextDelta {
delta: "x".to_string(),
},
)
.await;
assert!(ok);
}
}