use cap_rs::core::{AgentEvent, TextChannel};
use tokio::sync::broadcast;
use super::runtime::NotifTarget;
#[allow(dead_code)]
pub struct Sinks<'a> {
pub notif: Option<&'a NotifTarget>,
pub agent_event: Option<&'a broadcast::Sender<rsclaw_events::AgentEvent>>,
pub reply: Option<&'a mut String>,
pub session_id: &'a str,
pub agent_id: &'a str,
}
#[allow(dead_code)]
pub fn dispatch(event: &AgentEvent, sinks: &mut Sinks<'_>) -> bool {
match event {
AgentEvent::TextChunk { text, channel, .. } => {
let (delta_channel, write_to_reply) = match channel {
TextChannel::Assistant => (rsclaw_events::TextChannel::Assistant, true),
TextChannel::Thought => (rsclaw_events::TextChannel::Thought, false),
TextChannel::System => (rsclaw_events::TextChannel::System, false),
_ => (rsclaw_events::TextChannel::System, false),
};
if write_to_reply {
if let Some(buf) = sinks.reply.as_deref_mut() {
buf.push_str(text);
}
}
if let Some(bus) = sinks.agent_event {
let _ = bus.send(rsclaw_events::AgentEvent {
session_id: sinks.session_id.to_owned(),
agent_id: sinks.agent_id.to_owned(),
delta: text.clone(),
done: false,
files: Vec::new(),
images: Vec::new(),
tool_log: Vec::new(),
question: None,
channel: Some(delta_channel),
});
}
false
}
AgentEvent::Thought { text, .. } => {
tracing::debug!(
target: "cap",
agent = sinks.agent_id,
thought_len = text.len(),
"cap thought event"
);
if let Some(bus) = sinks.agent_event {
let _ = bus.send(rsclaw_events::AgentEvent {
session_id: sinks.session_id.to_owned(),
agent_id: sinks.agent_id.to_owned(),
delta: text.clone(),
done: false,
files: Vec::new(),
images: Vec::new(),
tool_log: Vec::new(),
question: None,
channel: Some(rsclaw_events::TextChannel::Thought),
});
}
false
}
AgentEvent::ToolCallStart { name, .. } => {
tracing::debug!(target: "cap", agent = sinks.agent_id, tool = %name, "cap tool start");
false
}
AgentEvent::ToolCallEnd { is_error, .. } => {
tracing::debug!(target: "cap", agent = sinks.agent_id, is_error, "cap tool end");
false
}
AgentEvent::Done { .. } => true,
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
use cap_rs::core::{StopReason, TextChannel};
use tokio::sync::broadcast;
fn assistant_chunk(text: &str) -> AgentEvent {
AgentEvent::TextChunk {
msg_id: "m1".into(),
text: text.into(),
channel: TextChannel::Assistant,
}
}
#[test]
fn text_chunk_accumulates_into_reply_and_bus() {
let mut reply = String::new();
let (tx, mut rx) = broadcast::channel(8);
let mut sinks = Sinks {
notif: None,
agent_event: Some(&tx),
reply: Some(&mut reply),
session_id: "sess",
agent_id: "claudecode",
};
let done = dispatch(&assistant_chunk("hello "), &mut sinks);
assert!(!done);
let done = dispatch(&assistant_chunk("world"), &mut sinks);
assert!(!done);
assert_eq!(reply, "hello world");
assert!(matches!(rx.try_recv(), Ok(ev) if ev.delta == "hello "));
assert!(matches!(rx.try_recv(), Ok(ev) if ev.delta == "world"));
}
#[test]
fn done_returns_true() {
let mut sinks = Sinks {
notif: None,
agent_event: None,
reply: None,
session_id: "sess",
agent_id: "claudecode",
};
let done = dispatch(
&AgentEvent::Done {
stop_reason: StopReason::EndTurn,
usage: Default::default(),
},
&mut sinks,
);
assert!(done);
}
#[test]
fn thought_is_swallowed_not_relayed() {
let mut reply = String::new();
let mut sinks = Sinks {
notif: None,
agent_event: None,
reply: Some(&mut reply),
session_id: "sess",
agent_id: "x",
};
let done = dispatch(
&AgentEvent::Thought {
msg_id: "t1".into(),
text: "internal".into(),
},
&mut sinks,
);
assert!(!done);
assert!(reply.is_empty());
}
}