use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::domain::{ToolCallId, TurnId};
use crate::models::tool_call::ToolCall as ModelToolCall;
use crate::models::{ChatMessage, ReasoningChunk, TokenUsage};
/// Handles bundled together for one streaming operation.
///
/// Holds a cancellation token, the sending half of a [`StreamEvent`] channel
/// and a [`TurnId`]. The type derives only `Debug`, so it is not `Clone`.
#[derive(Debug)]
pub struct StreamContext {
    /// Cancellation token associated with this stream.
    pub token: CancellationToken,
    /// Sending half of the channel that carries [`StreamEvent`] values.
    pub sink: mpsc::Sender<StreamEvent>,
    /// Turn identifier attached to this context.
    pub turn: TurnId,
}
impl StreamContext {
pub fn new(token: CancellationToken, sink: mpsc::Sender<StreamEvent>, turn: TurnId) -> Self {
Self { token, sink, turn }
}
}
/// Event type carried by the channel held in `StreamContext::sink`.
///
/// NOTE(review): the per-variant meanings below are read off the variant and
/// payload names only; confirm against the code that produces these events.
#[derive(Clone, Debug)]
pub enum StreamEvent {
    /// A text payload.
    Text(String),
    /// A reasoning payload, wrapped in a [`ReasoningChunk`].
    Reasoning(ReasoningChunk),
    /// A tool call, using the model-layer tool call type.
    ToolCall(ModelToolCall),
    /// A thinking-signature string.
    ThinkingSignature(String),
    /// Presumably the terminal event of a stream (name-based; verify).
    Done {
        /// Token usage, when available.
        usage: Option<TokenUsage>,
        /// Thinking signature, when available.
        thinking_signature: Option<String>,
    },
}
/// Aggregate describing a response: optional usage, optional thinking
/// signature and a list of tool calls.
///
/// NOTE(review): `usage` and `thinking_signature` have the same names and
/// types as the fields of `StreamEvent::Done`; how this struct is populated
/// is not visible in this file.
#[derive(Clone, Debug)]
pub struct FinalResponse {
    /// Token usage, when available.
    pub usage: Option<TokenUsage>,
    /// Thinking signature, when available.
    pub thinking_signature: Option<String>,
    /// Tool calls attached to the response; may be empty.
    pub tool_calls: Vec<ModelToolCall>,
}
/// Handles and metadata bundled together for one tool execution.
///
/// The type derives only `Debug`, so it is not `Clone`.
#[derive(Debug)]
pub struct ExecContext {
    /// Cancellation token associated with this execution.
    pub token: CancellationToken,
    /// Sending half of the channel that carries [`ProgressEvent`] values.
    pub progress: mpsc::Sender<ProgressEvent>,
    /// Identifier of the tool call attached to this context.
    pub call_id: ToolCallId,
    /// Turn identifier attached to this context.
    pub turn: TurnId,
    /// Working directory path (presumably where the tool operates; verify
    /// against the tools that read it).
    pub workdir: PathBuf,
    /// Shared, reference-counted application configuration.
    pub config: Arc<crate::app::Config>,
    /// Model identifier string; the test helper in this file passes an empty
    /// string, so consumers should not assume it is non-empty.
    pub model_id: String,
}
impl ExecContext {
pub fn new(
token: CancellationToken,
progress: mpsc::Sender<ProgressEvent>,
call_id: ToolCallId,
turn: TurnId,
workdir: PathBuf,
config: Arc<crate::app::Config>,
model_id: String,
) -> Self {
Self {
token,
progress,
call_id,
turn,
workdir,
config,
model_id,
}
}
}
/// Event type carried by the channel held in `ExecContext::progress`.
///
/// NOTE(review): the per-variant meanings below are read off the variant and
/// field names only; confirm against the tools that emit these events.
#[derive(Clone, Debug)]
pub enum ProgressEvent {
    /// An output string.
    Output(String),
    /// A status string.
    Status(String),
    /// A byte-count update.
    Bytes {
        /// Count completed so far.
        done: u64,
        /// Overall count, when known.
        total: Option<u64>,
    },
    /// A binary artifact with a type label and optional caption.
    Artifact {
        /// Type label for `data` (presumably a MIME type; verify).
        mime: String,
        /// Raw artifact bytes.
        data: Vec<u8>,
        /// Optional caption.
        caption: Option<String>,
    },
    /// A tool-call notification tagged with a [`SubagentPhase`].
    SubagentToolCall {
        /// Identifier of the child tool call.
        child_call_id: ToolCallId,
        /// Name of the tool.
        tool_name: String,
        /// Phase tag for this notification.
        phase: SubagentPhase,
    },
    /// A text payload (presumably text produced by a sub-agent; verify).
    SubagentText(String),
}
/// Phase tag carried by `ProgressEvent::SubagentToolCall`.
///
/// A plain field-less enum that is `Copy` and comparable with `==`.
/// NOTE(review): the exact point at which each phase is emitted is not
/// visible in this file; the variant docs restate the names only.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubagentPhase {
    /// The "started" phase.
    Started,
    /// The "finished" phase.
    Finished,
    /// The "errored" phase.
    Errored,
}
/// Returns a newly allocated `Vec` holding a clone of every message in
/// `msgs`, in the same order. An empty slice yields an empty vector.
pub fn clone_messages(msgs: &[ChatMessage]) -> Vec<ChatMessage> {
    Vec::from(msgs)
}
/// Builds a [`StreamContext`] for the given `turn`, wired to a fresh
/// cancellation token and a new channel of capacity 64.
///
/// Returns the context together with the receiving half of that channel so
/// the caller can observe the events sent through `sink`.
pub fn test_stream_context(turn: TurnId) -> (StreamContext, mpsc::Receiver<StreamEvent>) {
    let (sink, events) = mpsc::channel(64);
    let ctx = StreamContext::new(CancellationToken::new(), sink, turn);
    (ctx, events)
}
/// Builds an [`ExecContext`] for the given `turn`, `call_id` and `workdir`.
///
/// The remaining fields are filled with a fresh cancellation token, a new
/// progress channel of capacity 64, a default `Config` and an empty model id.
/// Returns the context together with the receiving half of the progress
/// channel.
pub fn test_exec_context(
    turn: TurnId,
    call_id: ToolCallId,
    workdir: PathBuf,
) -> (ExecContext, mpsc::Receiver<ProgressEvent>) {
    let (progress_tx, progress_rx) = mpsc::channel(64);
    let ctx = ExecContext::new(
        CancellationToken::new(),
        progress_tx,
        call_id,
        turn,
        workdir,
        Arc::new(crate::app::Config::default()),
        String::new(),
    );
    (ctx, progress_rx)
}
/// Moves the given [`StreamEvent`] sender into a new `Arc` and returns it.
pub fn arc_sink(tx: mpsc::Sender<StreamEvent>) -> Arc<mpsc::Sender<StreamEvent>> {
    Arc::from(tx)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A context from `test_stream_context` reports the turn it was given and
    /// a token that has not been cancelled.
    #[tokio::test]
    async fn stream_context_carries_token_and_turn() {
        // `_events` keeps the receiving half alive for the whole test.
        let (stream_ctx, _events) = test_stream_context(TurnId(5));
        assert_eq!(stream_ctx.turn, TurnId(5));
        assert!(!stream_ctx.token.is_cancelled());
    }

    /// Cancelling a clone of the context's token from another task is
    /// observed through the context's own token.
    #[tokio::test]
    async fn exec_context_propagates_cancel_signal() {
        let (exec_ctx, _progress) =
            test_exec_context(TurnId(1), ToolCallId(2), PathBuf::from("/tmp"));
        let remote = exec_ctx.token.clone();
        tokio::spawn(async move { remote.cancel() });
        exec_ctx.token.cancelled().await;
        assert!(exec_ctx.token.is_cancelled());
    }

    /// A `Status` event sent through `progress` arrives intact on the
    /// receiver returned by `test_exec_context`.
    #[tokio::test]
    async fn progress_event_round_trips_through_channel() {
        let (exec_ctx, mut progress) =
            test_exec_context(TurnId(1), ToolCallId(2), PathBuf::from("/tmp"));
        let outgoing = ProgressEvent::Status("halfway".to_string());
        exec_ctx.progress.send(outgoing).await.expect("send");
        let received = progress.recv().await.expect("recv");
        if let ProgressEvent::Status(text) = received {
            assert_eq!(text, "halfway");
        } else {
            panic!("wrong variant");
        }
    }
}