1use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::sync::Arc;
12
13use crate::stream::{AssistantStreamChunk, ToolSchema};
14use crate::tool::ToolResult;
15use crate::types::{
16 AgentMessage, AssistantBlock, RunIdentity, ToolResultBlock, UserBlock, UserContent,
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum AgentEvent {
29 AgentStart,
31
32 RunIdentified { identity: RunIdentity },
43
44 AgentEnd { messages: Vec<AgentMessage> },
48
49 TurnStart,
52
53 TurnEnd {
56 message: AgentMessage,
57 tool_results: Vec<AgentMessage>,
58 },
59
60 MessageStart { message: AgentMessage },
64
65 MessageUpdate {
67 partial: AgentMessage,
68 chunk: AssistantStreamChunk,
69 },
70
71 MessageEnd { message: AgentMessage },
73
74 ToolExecutionStart {
76 tool_call_id: String,
77 tool_name: String,
78 args: Value,
79 },
80
81 ToolExecutionUpdate {
84 tool_call_id: String,
85 tool_name: String,
86 partial: ToolResult,
87 },
88
89 ToolExecutionEnd {
91 tool_call_id: String,
92 tool_name: String,
93 result: ToolResult,
94 is_error: bool,
95 },
96
97 OutputTokensEscalation {
103 attempt: u8,
105 prev_cap: u32,
107 new_cap: u32,
109 },
110
111 ContextTransformApplied {
118 iteration: usize,
121 plugin: &'static str,
123 before: Vec<AgentMessage>,
125 after: Vec<AgentMessage>,
127 },
128
129 ToolGateApplied {
135 iteration: usize,
137 plugin: &'static str,
139 allow: Option<Vec<String>>,
143 },
144
145 ToolGateConflictResolved {
149 iteration: usize,
151 plugins: Vec<String>,
153 chosen_plugin: Option<String>,
155 allow: Vec<String>,
157 reason: String,
159 },
160
161 ProviderRequestPrepared {
168 iteration: usize,
170 model_id: Option<String>,
174 system_prompt: String,
177 messages: Vec<AgentMessage>,
179 tools: Vec<ToolSchema>,
181 temperature: Option<f32>,
184 max_output_tokens: Option<u32>,
186 },
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
196pub struct ProviderRequestSummary {
197 pub iteration: usize,
198 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub model_id: Option<String>,
200 #[serde(default, skip_serializing_if = "Option::is_none")]
201 pub temperature: Option<f32>,
202 #[serde(default, skip_serializing_if = "Option::is_none")]
203 pub max_output_tokens: Option<u32>,
204 pub system_prompt_bytes: usize,
205 pub system_prompt_chars: usize,
206 pub message_count: usize,
207 pub message_counts: ProviderMessageCounts,
208 pub content_counts: ProviderContentCounts,
209 pub tool_count: usize,
210 pub tool_names: Vec<String>,
211 pub tool_schema_bytes: usize,
212 #[serde(default, skip_serializing_if = "Option::is_none")]
213 pub last_message_role: Option<String>,
214}
215
216#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
217pub struct ProviderMessageCounts {
218 pub system: usize,
219 pub user: usize,
220 pub assistant: usize,
221 pub tool_result: usize,
222 pub custom: usize,
223}
224
225#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
226pub struct ProviderContentCounts {
227 pub system_message_bytes: usize,
228 pub user_text_blocks: usize,
229 pub user_text_bytes: usize,
230 pub user_image_blocks: usize,
231 pub user_image_with_media_type: usize,
232 pub assistant_text_blocks: usize,
233 pub assistant_text_bytes: usize,
234 pub assistant_thinking_blocks: usize,
235 pub assistant_thinking_bytes: usize,
236 pub assistant_reasoning_blocks: usize,
237 pub assistant_reasoning_bytes: usize,
238 pub assistant_reasoning_detail_blocks: usize,
239 pub assistant_reasoning_detail_bytes: usize,
240 pub assistant_tool_call_blocks: usize,
241 pub assistant_error_messages: usize,
242 pub tool_result_text_blocks: usize,
243 pub tool_result_text_bytes: usize,
244 pub tool_result_image_blocks: usize,
245 pub tool_result_error_messages: usize,
246 pub custom_payload_bytes: usize,
247}
248
249impl ProviderRequestSummary {
250 #[allow(clippy::too_many_arguments)]
253 pub fn from_parts(
254 iteration: usize,
255 model_id: Option<&str>,
256 temperature: Option<f32>,
257 max_output_tokens: Option<u32>,
258 system_prompt: &str,
259 messages: &[AgentMessage],
260 tools: &[ToolSchema],
261 ) -> Self {
262 let mut message_counts = ProviderMessageCounts::default();
263 let mut content_counts = ProviderContentCounts::default();
264
265 for message in messages {
266 match message {
267 AgentMessage::System { content, .. } => {
268 message_counts.system += 1;
269 content_counts.system_message_bytes += content.len();
270 }
271 AgentMessage::User { content, .. } => {
272 message_counts.user += 1;
273 count_user_content(content, &mut content_counts);
274 }
275 AgentMessage::Assistant {
276 content,
277 error_message,
278 ..
279 } => {
280 message_counts.assistant += 1;
281 if error_message.is_some() {
282 content_counts.assistant_error_messages += 1;
283 }
284 count_assistant_content(content, &mut content_counts);
285 }
286 AgentMessage::ToolResult {
287 content, is_error, ..
288 } => {
289 message_counts.tool_result += 1;
290 if *is_error {
291 content_counts.tool_result_error_messages += 1;
292 }
293 count_tool_result_content(content, &mut content_counts);
294 }
295 AgentMessage::Custom { payload, .. } => {
296 message_counts.custom += 1;
297 content_counts.custom_payload_bytes += json_size(payload);
298 }
299 }
300 }
301
302 let tool_names = tools
303 .iter()
304 .map(|tool| tool.name.clone())
305 .collect::<Vec<_>>();
306 let tool_schema_bytes = tools.iter().map(tool_schema_size).sum();
307
308 Self {
309 iteration,
310 model_id: model_id
311 .map(str::trim)
312 .filter(|id| !id.is_empty())
313 .map(str::to_string),
314 temperature,
315 max_output_tokens,
316 system_prompt_bytes: system_prompt.len(),
317 system_prompt_chars: system_prompt.chars().count(),
318 message_count: messages.len(),
319 message_counts,
320 content_counts,
321 tool_count: tools.len(),
322 tool_names,
323 tool_schema_bytes,
324 last_message_role: messages.last().map(message_role).map(str::to_string),
325 }
326 }
327}
328
329fn count_user_content(content: &UserContent, counts: &mut ProviderContentCounts) {
330 match content {
331 UserContent::Text(text) => {
332 counts.user_text_blocks += 1;
333 counts.user_text_bytes += text.len();
334 }
335 UserContent::Blocks(blocks) => {
336 for block in blocks {
337 match block {
338 UserBlock::Text(text) => {
339 counts.user_text_blocks += 1;
340 counts.user_text_bytes += text.text.len();
341 }
342 UserBlock::Image(image) => {
343 counts.user_image_blocks += 1;
344 if image.media_type.is_some() {
345 counts.user_image_with_media_type += 1;
346 }
347 }
348 }
349 }
350 }
351 }
352}
353
354fn count_assistant_content(
355 content: &crate::types::AssistantContent,
356 counts: &mut ProviderContentCounts,
357) {
358 for block in &content.blocks {
359 match block {
360 AssistantBlock::Text(text) => {
361 counts.assistant_text_blocks += 1;
362 counts.assistant_text_bytes += text.text.len();
363 }
364 AssistantBlock::Thinking(text) => {
365 counts.assistant_thinking_blocks += 1;
366 counts.assistant_thinking_bytes += text.text.len();
367 }
368 AssistantBlock::Reasoning(text) => {
369 counts.assistant_reasoning_blocks += 1;
370 counts.assistant_reasoning_bytes += text.text.len();
371 }
372 AssistantBlock::ReasoningDetails(details) => {
373 counts.assistant_reasoning_detail_blocks += 1;
374 counts.assistant_reasoning_detail_bytes += json_size(&details.details);
375 }
376 AssistantBlock::ToolCall(_) => {
377 counts.assistant_tool_call_blocks += 1;
378 }
379 }
380 }
381}
382
383fn count_tool_result_content(
384 content: &crate::types::ToolResultContent,
385 counts: &mut ProviderContentCounts,
386) {
387 for block in &content.blocks {
388 match block {
389 ToolResultBlock::Text(text) => {
390 counts.tool_result_text_blocks += 1;
391 counts.tool_result_text_bytes += text.text.len();
392 }
393 ToolResultBlock::Image(_) => {
394 counts.tool_result_image_blocks += 1;
395 }
396 }
397 }
398}
399
400fn message_role(message: &AgentMessage) -> &'static str {
401 match message {
402 AgentMessage::System { .. } => "system",
403 AgentMessage::User { .. } => "user",
404 AgentMessage::Assistant { .. } => "assistant",
405 AgentMessage::ToolResult { .. } => "tool_result",
406 AgentMessage::Custom { .. } => "custom",
407 }
408}
409
410fn tool_schema_size(tool: &ToolSchema) -> usize {
411 tool.name.len()
412 + tool.description.len()
413 + serde_json::to_vec(&tool.parameters)
414 .map(|bytes| bytes.len())
415 .unwrap_or(0)
416}
417
418fn json_size(value: &impl Serialize) -> usize {
419 serde_json::to_vec(value)
420 .map(|bytes| bytes.len())
421 .unwrap_or(0)
422}
423
424#[async_trait]
431pub trait EventSink: Send + Sync {
432 async fn emit(&self, event: AgentEvent);
433}
434
435pub struct NoopSink;
438
439#[async_trait]
440impl EventSink for NoopSink {
441 async fn emit(&self, _event: AgentEvent) {}
442}
443
444pub struct ChannelSink {
450 tx: tokio::sync::mpsc::UnboundedSender<AgentEvent>,
451}
452
453impl ChannelSink {
454 pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<AgentEvent>) {
455 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
456 (Self { tx }, rx)
457 }
458}
459
460#[async_trait]
461impl EventSink for ChannelSink {
462 async fn emit(&self, event: AgentEvent) {
463 if self.tx.send(event).is_err() {
464 }
466 }
467}
468
469pub struct FanOutSink {
473 sinks: Vec<Arc<dyn EventSink>>,
474}
475
476impl FanOutSink {
477 pub fn new(sinks: Vec<Arc<dyn EventSink>>) -> Self {
478 Self { sinks }
479 }
480}
481
482#[async_trait]
483impl EventSink for FanOutSink {
484 async fn emit(&self, event: AgentEvent) {
485 for sink in &self.sinks {
486 sink.emit(event.clone()).await;
487 }
488 }
489}
490
491#[cfg(test)]
492mod tests {
493 use super::*;
494
495 #[tokio::test]
496 async fn channel_sink_forwards_events() {
497 let (sink, mut rx) = ChannelSink::new();
498 sink.emit(AgentEvent::AgentStart).await;
499 sink.emit(AgentEvent::TurnStart).await;
500 drop(sink);
501
502 let mut received = Vec::new();
503 while let Some(e) = rx.recv().await {
504 received.push(e);
505 }
506 assert_eq!(received.len(), 2);
507 assert!(matches!(received[0], AgentEvent::AgentStart));
508 assert!(matches!(received[1], AgentEvent::TurnStart));
509 }
510
511 #[tokio::test]
512 async fn fan_out_sink_replicates() {
513 let (a, mut a_rx) = ChannelSink::new();
514 let (b, mut b_rx) = ChannelSink::new();
515 let fanout = FanOutSink::new(vec![Arc::new(a), Arc::new(b)]);
516 fanout.emit(AgentEvent::AgentStart).await;
517 drop(fanout);
518
519 assert!(matches!(a_rx.recv().await, Some(AgentEvent::AgentStart)));
520 assert!(matches!(b_rx.recv().await, Some(AgentEvent::AgentStart)));
521 }
522
523 #[test]
524 fn provider_request_summary_counts_shape_without_text() {
525 let messages = vec![
526 AgentMessage::User {
527 content: UserContent::Blocks(vec![
528 UserBlock::Text(crate::types::TextContent {
529 text: "secret user request".into(),
530 }),
531 UserBlock::Image(crate::types::ImageContent {
532 source: "data:image/png;base64,secret".into(),
533 media_type: Some("image/png".into()),
534 alt: Some("screenshot".into()),
535 }),
536 ]),
537 timestamp: None,
538 },
539 AgentMessage::Assistant {
540 content: crate::types::AssistantContent {
541 blocks: vec![
542 AssistantBlock::Thinking(crate::types::TextContent {
543 text: "private scratch".into(),
544 }),
545 AssistantBlock::ToolCall(crate::tool::ToolCall {
546 id: "call-1".into(),
547 name: "web_search".into(),
548 arguments: serde_json::json!({"q": "secret"}),
549 }),
550 ],
551 },
552 stop_reason: crate::types::StopReason::ToolUse,
553 error_message: None,
554 timestamp: None,
555 usage: None,
556 },
557 AgentMessage::ToolResult {
558 tool_call_id: "call-1".into(),
559 tool_name: "web_search".into(),
560 content: crate::types::ToolResultContent::text("secret result"),
561 is_error: false,
562 narration: None,
563 details: None,
564 timestamp: None,
565 },
566 ];
567 let tools = vec![ToolSchema {
568 name: "web_search".into(),
569 description: "Search the web".into(),
570 parameters: serde_json::json!({"type": "object", "properties": {"q": {"type": "string"}}}),
571 }];
572
573 let summary = ProviderRequestSummary::from_parts(
574 2,
575 Some("google/gemini-3.1-flash-lite-preview"),
576 Some(0.2),
577 Some(4096),
578 "system prompt secret",
579 &messages,
580 &tools,
581 );
582
583 assert_eq!(summary.iteration, 2);
584 assert_eq!(
585 summary.model_id.as_deref(),
586 Some("google/gemini-3.1-flash-lite-preview")
587 );
588 assert_eq!(summary.message_counts.user, 1);
589 assert_eq!(summary.message_counts.assistant, 1);
590 assert_eq!(summary.message_counts.tool_result, 1);
591 assert_eq!(summary.content_counts.user_text_blocks, 1);
592 assert_eq!(
593 summary.content_counts.user_text_bytes,
594 "secret user request".len()
595 );
596 assert_eq!(summary.content_counts.user_image_blocks, 1);
597 assert_eq!(summary.content_counts.assistant_thinking_blocks, 1);
598 assert_eq!(summary.content_counts.assistant_tool_call_blocks, 1);
599 assert_eq!(summary.content_counts.tool_result_text_blocks, 1);
600 assert_eq!(summary.tool_names, vec!["web_search"]);
601 assert_eq!(summary.last_message_role.as_deref(), Some("tool_result"));
602
603 let serialized = serde_json::to_string(&summary).unwrap();
604 assert!(!serialized.contains("secret user request"));
605 assert!(!serialized.contains("private scratch"));
606 assert!(!serialized.contains("secret result"));
607 assert!(!serialized.contains("data:image"));
608 }
609}