Skip to main content

clark_agent/
event.rs

1//! Typed events emitted by the loop.
2//!
3//! Single sink, single enum. Streaming consumers pattern-match on the
4//! event kind. Events are observation-only — they cannot change loop
5//! state. Plugins that need to mutate state use the dedicated capability
6//! traits in [`crate::plugin`].
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::sync::Arc;
12
13use crate::stream::{AssistantStreamChunk, ToolSchema};
14use crate::tool::ToolResult;
15use crate::types::{
16    AgentMessage, AssistantBlock, RunIdentity, ToolResultBlock, UserBlock, UserContent,
17};
18
/// All events the loop emits.
///
/// Lifecycle events (`AgentStart`, `AgentEnd`, `TurnStart`, `TurnEnd`)
/// bracket the run. Message events (`MessageStart`, `MessageUpdate`,
/// `MessageEnd`) bracket each individual message. Tool events
/// (`ToolExecutionStart`, `ToolExecutionUpdate`, `ToolExecutionEnd`)
/// bracket each tool call.
///
/// The remaining variants are observability records: `RunIdentified`,
/// `OutputTokensEscalation`, `ContextTransformApplied`,
/// `ToolGateApplied`, `ToolGateConflictResolved`, and
/// `ProviderRequestPrepared`.
///
/// The serialized form is internally tagged: every event is an object
/// whose `type` field holds the variant name in `snake_case` (for
/// example `agent_start` or `tool_execution_end`).
///
/// NOTE(review): the `plugin: &'static str` fields sit next to
/// `#[derive(Deserialize)]`; presumably that restricts deserialization
/// to `'static` input. Confirm no consumer needs to deserialize events
/// from a short-lived buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// First event in a run. Emitted once.
    AgentStart,

    /// Run identity, emitted immediately after [`AgentEvent::AgentStart`]
    /// when the context carries a [`RunIdentity`]. Trajectory sinks key
    /// every subsequent event of the same run on `identity.run_id`;
    /// child runs surface their `parent_run_id` so the spawn tree
    /// rebuilds without external bookkeeping.
    ///
    /// Existing observers that don't care about identity ignore this
    /// variant (every match arm in the tree already has a wildcard
    /// fallback). Plugins and sinks that want identity pattern-match
    /// directly.
    RunIdentified { identity: RunIdentity },

    /// Last event in a run. Carries the messages produced *during this run*
    /// (not the full transcript). Listeners that want the full transcript
    /// should fold prior messages into a state of their own.
    AgentEnd { messages: Vec<AgentMessage> },

    /// Bracket: a new turn begins. A turn is one assistant response plus
    /// any tool calls/results it spawned.
    TurnStart,

    /// Bracket: a turn ends. Carries the assistant message and the tool
    /// results for that turn (empty if the model didn't call any tools).
    TurnEnd {
        /// The assistant message for this turn.
        message: AgentMessage,
        /// Tool results for this turn; empty when no tools were called.
        tool_results: Vec<AgentMessage>,
    },

    /// A message has been added to the transcript (user, assistant, or
    /// tool result). For assistant messages, this fires before streaming
    /// begins; subsequent `MessageUpdate` events carry deltas.
    MessageStart { message: AgentMessage },

    /// Streaming delta for the in-progress assistant message.
    MessageUpdate {
        /// NOTE(review): presumably the assistant message as assembled
        /// so far — confirm whether it already includes `chunk`.
        partial: AgentMessage,
        /// The streaming delta itself.
        chunk: AssistantStreamChunk,
    },

    /// The message has been fully assembled (final content, stop reason).
    MessageEnd { message: AgentMessage },

    /// A tool execution has begun.
    ToolExecutionStart {
        tool_call_id: String,
        tool_name: String,
        args: Value,
    },

    /// Partial progress from a long-running tool. The tool calls
    /// `update.send(...)` to surface intermediate state without ending.
    ToolExecutionUpdate {
        tool_call_id: String,
        tool_name: String,
        partial: ToolResult,
    },

    /// A tool execution has finished.
    ToolExecutionEnd {
        tool_call_id: String,
        tool_name: String,
        result: ToolResult,
        is_error: bool,
    },

    /// The loop discarded a truncated assistant turn and re-streamed
    /// with a higher `max_output_tokens` cap. Emitted once per
    /// retry attempt; multiple events for the same turn signal the
    /// recovery walked the configured ladder. See
    /// [`crate::config::MaxTokensRecovery`].
    OutputTokensEscalation {
        /// 1-indexed retry counter within the current turn.
        attempt: u8,
        /// Cap that produced the truncated turn we're discarding.
        prev_cap: u32,
        /// Cap we're re-streaming with.
        new_cap: u32,
    },

    /// A `ContextTransform` plugin ran on this turn's transcript.
    /// Emitted once per active transform per turn, in registration
    /// order. Carries the full before/after message slices so observers
    /// can reconstruct exactly which messages each transform removed,
    /// added, or rewrote — the canonical answer to "which compaction
    /// stripped that tool result we expected the model to still see?".
    ContextTransformApplied {
        /// Zero-indexed turn within the current run. Same semantics as
        /// [`crate::plugin::TransformContext::iteration`].
        iteration: usize,
        /// `Plugin::name` of the transform that just ran.
        plugin: &'static str,
        /// Transcript handed to the transform.
        before: Vec<AgentMessage>,
        /// Transcript the transform returned.
        after: Vec<AgentMessage>,
    },

    /// A `ToolGate` plugin contributed to this turn's allowlist.
    /// Emitted once per gate per turn. Multiple gates compose by
    /// intersection downstream; this event records the gate's own
    /// decision before composition so observers can attribute the
    /// final allowlist to specific plugins.
    ToolGateApplied {
        /// Zero-indexed turn within the current run.
        iteration: usize,
        /// `Plugin::name` of the gate.
        plugin: &'static str,
        /// `None` when the gate declined to constrain;
        /// `Some(names)` when it returned an allowlist (sorted for
        /// stable diffing).
        allow: Option<Vec<String>>,
    },

    /// Multiple `ToolGate` plugins narrowed the same turn to disjoint
    /// non-empty allowlists. The loop repaired the composition to avoid
    /// advertising an empty tool catalog to the model.
    ToolGateConflictResolved {
        /// Zero-indexed turn within the current run.
        iteration: usize,
        /// Gate names that returned a non-empty allowlist.
        plugins: Vec<String>,
        /// Gate whose allowlist won the deterministic repair policy.
        chosen_plugin: Option<String>,
        /// Final repaired allowlist, sorted for stable diffing.
        allow: Vec<String>,
        /// Human-readable policy reason for trajectory/debug inspection.
        reason: String,
    },

    /// Snapshot of the request the loop is about to send to the
    /// provider on this turn, taken after every `ContextTransform`
    /// has run and every `ToolGate` has filtered. This is the typed
    /// view of "what the model sees" — wire-format conversion
    /// (provider-specific shapes) happens downstream inside the
    /// `StreamFn`. Emitted once per turn, just before the stream call.
    ProviderRequestPrepared {
        /// Zero-indexed turn within the current run.
        iteration: usize,
        /// Model identifier the host associated with this loop, when
        /// known. Provider transports still own their wire conversion,
        /// so this is observability metadata only.
        model_id: Option<String>,
        /// System prompt for this turn. May include ephemeral system
        /// reminders injected by `ContextTransform` plugins.
        system_prompt: String,
        /// Full message history the loop is about to send.
        messages: Vec<AgentMessage>,
        /// Tool schemas advertised this turn, post-`ToolGate` filtering.
        tools: Vec<ToolSchema>,
        /// Sampling temperature forwarded to the provider stream, when
        /// configured.
        temperature: Option<f32>,
        /// Resolved per-turn output cap.
        max_output_tokens: Option<u32>,
    },
}
188
/// Redacted, durable metadata for one provider request.
///
/// This deliberately excludes free-form prompt, message, image URL,
/// tool-description, and schema content. It keeps the dimensions needed
/// to debug "what shape did we send?" without leaking user text or
/// hidden/private reasoning.
///
/// Built by [`ProviderRequestSummary::from_parts`]. Optional fields are
/// omitted from the serialized output when `None` and default to `None`
/// when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProviderRequestSummary {
    /// Turn index, stored exactly as passed to `from_parts`.
    pub iteration: usize,
    /// Trimmed model identifier; `None` when missing or blank.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_id: Option<String>,
    /// Sampling temperature, stored exactly as passed to `from_parts`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub temperature: Option<f32>,
    /// Output-token cap, stored exactly as passed to `from_parts`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_output_tokens: Option<u32>,
    /// Length of the system prompt in UTF-8 bytes.
    pub system_prompt_bytes: usize,
    /// Length of the system prompt in `char`s (Unicode scalar values).
    pub system_prompt_chars: usize,
    /// Total number of messages in the request.
    pub message_count: usize,
    /// Message tallies broken down by role.
    pub message_counts: ProviderMessageCounts,
    /// Block counts and byte sizes broken down by content kind.
    pub content_counts: ProviderContentCounts,
    /// Number of tool schemas in the request.
    pub tool_count: usize,
    /// Tool names, in the order the schemas were supplied.
    pub tool_names: Vec<String>,
    /// Sum over all tools of name bytes, description bytes, and the
    /// JSON-serialized size of the parameter schema.
    pub tool_schema_bytes: usize,
    /// Role label of the final message (`system`, `user`, `assistant`,
    /// `tool_result`, or `custom`); `None` when there are no messages.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_message_role: Option<String>,
}
215
/// Number of messages in one provider request, tallied per
/// [`AgentMessage`] variant. All counters start at zero via `Default`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ProviderMessageCounts {
    /// `AgentMessage::System` messages.
    pub system: usize,
    /// `AgentMessage::User` messages.
    pub user: usize,
    /// `AgentMessage::Assistant` messages.
    pub assistant: usize,
    /// `AgentMessage::ToolResult` messages.
    pub tool_result: usize,
    /// `AgentMessage::Custom` messages.
    pub custom: usize,
}
224
/// Block counts and byte sizes for the content of one provider request,
/// accumulated across all messages. Only sizes and counts are kept; the
/// content itself is never stored here.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ProviderContentCounts {
    /// Total content length of all system messages, in bytes.
    pub system_message_bytes: usize,
    /// User text blocks; a plain-text user message counts as one block.
    pub user_text_blocks: usize,
    /// Total length of user text, in bytes.
    pub user_text_bytes: usize,
    /// User image blocks (counted only; the image source is not measured).
    pub user_image_blocks: usize,
    /// User image blocks that carry a media type.
    pub user_image_with_media_type: usize,
    /// Assistant text blocks.
    pub assistant_text_blocks: usize,
    /// Total length of assistant text, in bytes.
    pub assistant_text_bytes: usize,
    /// Assistant thinking blocks.
    pub assistant_thinking_blocks: usize,
    /// Total length of assistant thinking text, in bytes.
    pub assistant_thinking_bytes: usize,
    /// Assistant reasoning blocks.
    pub assistant_reasoning_blocks: usize,
    /// Total length of assistant reasoning text, in bytes.
    pub assistant_reasoning_bytes: usize,
    /// Assistant reasoning-details blocks.
    pub assistant_reasoning_detail_blocks: usize,
    /// Total JSON-serialized size of reasoning details, in bytes.
    pub assistant_reasoning_detail_bytes: usize,
    /// Assistant tool-call blocks (counted only; arguments are not measured).
    pub assistant_tool_call_blocks: usize,
    /// Assistant messages that carry an error message.
    pub assistant_error_messages: usize,
    /// Tool-result text blocks.
    pub tool_result_text_blocks: usize,
    /// Total length of tool-result text, in bytes.
    pub tool_result_text_bytes: usize,
    /// Tool-result image blocks (counted only).
    pub tool_result_image_blocks: usize,
    /// Tool-result messages flagged as errors.
    pub tool_result_error_messages: usize,
    /// Total JSON-serialized size of custom message payloads, in bytes.
    pub custom_payload_bytes: usize,
}
248
249impl ProviderRequestSummary {
250    // Mirrors the provider-request fields directly; grouping would only hide
251    // the shape this summary is meant to expose.
252    #[allow(clippy::too_many_arguments)]
253    pub fn from_parts(
254        iteration: usize,
255        model_id: Option<&str>,
256        temperature: Option<f32>,
257        max_output_tokens: Option<u32>,
258        system_prompt: &str,
259        messages: &[AgentMessage],
260        tools: &[ToolSchema],
261    ) -> Self {
262        let mut message_counts = ProviderMessageCounts::default();
263        let mut content_counts = ProviderContentCounts::default();
264
265        for message in messages {
266            match message {
267                AgentMessage::System { content, .. } => {
268                    message_counts.system += 1;
269                    content_counts.system_message_bytes += content.len();
270                }
271                AgentMessage::User { content, .. } => {
272                    message_counts.user += 1;
273                    count_user_content(content, &mut content_counts);
274                }
275                AgentMessage::Assistant {
276                    content,
277                    error_message,
278                    ..
279                } => {
280                    message_counts.assistant += 1;
281                    if error_message.is_some() {
282                        content_counts.assistant_error_messages += 1;
283                    }
284                    count_assistant_content(content, &mut content_counts);
285                }
286                AgentMessage::ToolResult {
287                    content, is_error, ..
288                } => {
289                    message_counts.tool_result += 1;
290                    if *is_error {
291                        content_counts.tool_result_error_messages += 1;
292                    }
293                    count_tool_result_content(content, &mut content_counts);
294                }
295                AgentMessage::Custom { payload, .. } => {
296                    message_counts.custom += 1;
297                    content_counts.custom_payload_bytes += json_size(payload);
298                }
299            }
300        }
301
302        let tool_names = tools
303            .iter()
304            .map(|tool| tool.name.clone())
305            .collect::<Vec<_>>();
306        let tool_schema_bytes = tools.iter().map(tool_schema_size).sum();
307
308        Self {
309            iteration,
310            model_id: model_id
311                .map(str::trim)
312                .filter(|id| !id.is_empty())
313                .map(str::to_string),
314            temperature,
315            max_output_tokens,
316            system_prompt_bytes: system_prompt.len(),
317            system_prompt_chars: system_prompt.chars().count(),
318            message_count: messages.len(),
319            message_counts,
320            content_counts,
321            tool_count: tools.len(),
322            tool_names,
323            tool_schema_bytes,
324            last_message_role: messages.last().map(message_role).map(str::to_string),
325        }
326    }
327}
328
329fn count_user_content(content: &UserContent, counts: &mut ProviderContentCounts) {
330    match content {
331        UserContent::Text(text) => {
332            counts.user_text_blocks += 1;
333            counts.user_text_bytes += text.len();
334        }
335        UserContent::Blocks(blocks) => {
336            for block in blocks {
337                match block {
338                    UserBlock::Text(text) => {
339                        counts.user_text_blocks += 1;
340                        counts.user_text_bytes += text.text.len();
341                    }
342                    UserBlock::Image(image) => {
343                        counts.user_image_blocks += 1;
344                        if image.media_type.is_some() {
345                            counts.user_image_with_media_type += 1;
346                        }
347                    }
348                }
349            }
350        }
351    }
352}
353
354fn count_assistant_content(
355    content: &crate::types::AssistantContent,
356    counts: &mut ProviderContentCounts,
357) {
358    for block in &content.blocks {
359        match block {
360            AssistantBlock::Text(text) => {
361                counts.assistant_text_blocks += 1;
362                counts.assistant_text_bytes += text.text.len();
363            }
364            AssistantBlock::Thinking(text) => {
365                counts.assistant_thinking_blocks += 1;
366                counts.assistant_thinking_bytes += text.text.len();
367            }
368            AssistantBlock::Reasoning(text) => {
369                counts.assistant_reasoning_blocks += 1;
370                counts.assistant_reasoning_bytes += text.text.len();
371            }
372            AssistantBlock::ReasoningDetails(details) => {
373                counts.assistant_reasoning_detail_blocks += 1;
374                counts.assistant_reasoning_detail_bytes += json_size(&details.details);
375            }
376            AssistantBlock::ToolCall(_) => {
377                counts.assistant_tool_call_blocks += 1;
378            }
379        }
380    }
381}
382
383fn count_tool_result_content(
384    content: &crate::types::ToolResultContent,
385    counts: &mut ProviderContentCounts,
386) {
387    for block in &content.blocks {
388        match block {
389            ToolResultBlock::Text(text) => {
390                counts.tool_result_text_blocks += 1;
391                counts.tool_result_text_bytes += text.text.len();
392            }
393            ToolResultBlock::Image(_) => {
394                counts.tool_result_image_blocks += 1;
395            }
396        }
397    }
398}
399
400fn message_role(message: &AgentMessage) -> &'static str {
401    match message {
402        AgentMessage::System { .. } => "system",
403        AgentMessage::User { .. } => "user",
404        AgentMessage::Assistant { .. } => "assistant",
405        AgentMessage::ToolResult { .. } => "tool_result",
406        AgentMessage::Custom { .. } => "custom",
407    }
408}
409
410fn tool_schema_size(tool: &ToolSchema) -> usize {
411    tool.name.len()
412        + tool.description.len()
413        + serde_json::to_vec(&tool.parameters)
414            .map(|bytes| bytes.len())
415            .unwrap_or(0)
416}
417
418fn json_size(value: &impl Serialize) -> usize {
419    serde_json::to_vec(value)
420        .map(|bytes| bytes.len())
421        .unwrap_or(0)
422}
423
/// Sink the loop publishes events to.
///
/// Implementations buffer, log, forward, persist, etc. The loop awaits
/// `emit` so backpressure flows naturally. Failures inside the sink must
/// not propagate out of the loop — observers that fail are logged and
/// skipped.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait EventSink: Send + Sync {
    /// Receives one event by value.
    ///
    /// The signature returns nothing, so an implementation has no way to
    /// report a failure to the caller and must handle errors internally.
    async fn emit(&self, event: AgentEvent);
}
434
435/// Trivial discard sink, useful when the caller only cares about the
436/// final result of `run`.
437pub struct NoopSink;
438
439#[async_trait]
440impl EventSink for NoopSink {
441    async fn emit(&self, _event: AgentEvent) {}
442}
443
444/// Sink that forwards events into a `tokio::sync::mpsc::UnboundedSender`.
445///
446/// The loop is the producer; the consumer drains the channel and renders /
447/// persists / forwards each event. Drops events silently when the receiver
448/// is gone.
449pub struct ChannelSink {
450    tx: tokio::sync::mpsc::UnboundedSender<AgentEvent>,
451}
452
453impl ChannelSink {
454    pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<AgentEvent>) {
455        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
456        (Self { tx }, rx)
457    }
458}
459
460#[async_trait]
461impl EventSink for ChannelSink {
462    async fn emit(&self, event: AgentEvent) {
463        if self.tx.send(event).is_err() {
464            // Receiver shutdown is an accepted best-effort sink outcome.
465        }
466    }
467}
468
469/// Composite sink that fans events out to multiple downstream sinks in
470/// declaration order. Useful for mixing a logger, a UI forwarder, and a
471/// persistence layer.
472pub struct FanOutSink {
473    sinks: Vec<Arc<dyn EventSink>>,
474}
475
476impl FanOutSink {
477    pub fn new(sinks: Vec<Arc<dyn EventSink>>) -> Self {
478        Self { sinks }
479    }
480}
481
482#[async_trait]
483impl EventSink for FanOutSink {
484    async fn emit(&self, event: AgentEvent) {
485        for sink in &self.sinks {
486            sink.emit(event.clone()).await;
487        }
488    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    /// Events sent through a `ChannelSink` arrive in order, and the channel
    /// closes once the sink is dropped.
    #[tokio::test]
    async fn channel_sink_forwards_events() {
        let (sink, mut rx) = ChannelSink::new();
        sink.emit(AgentEvent::AgentStart).await;
        sink.emit(AgentEvent::TurnStart).await;
        drop(sink);

        let mut received = Vec::new();
        while let Some(e) = rx.recv().await {
            received.push(e);
        }
        assert_eq!(received.len(), 2);
        assert!(matches!(received[0], AgentEvent::AgentStart));
        assert!(matches!(received[1], AgentEvent::TurnStart));
    }

    /// Every downstream sink receives the event exactly once.
    #[tokio::test]
    async fn fan_out_sink_replicates() {
        let (a, mut a_rx) = ChannelSink::new();
        let (b, mut b_rx) = ChannelSink::new();
        let fanout = FanOutSink::new(vec![Arc::new(a), Arc::new(b)]);
        fanout.emit(AgentEvent::AgentStart).await;
        // Dropping the fan-out drops both senders, which closes the channels.
        drop(fanout);

        assert!(matches!(a_rx.recv().await, Some(AgentEvent::AgentStart)));
        assert!(matches!(b_rx.recv().await, Some(AgentEvent::AgentStart)));
        // No duplicates: after the single event each channel is exhausted.
        assert!(a_rx.recv().await.is_none());
        assert!(b_rx.recv().await.is_none());
    }

    /// A fan-out with no downstream sinks accepts events without panicking.
    #[tokio::test]
    async fn fan_out_sink_without_sinks_is_a_noop() {
        let fanout = FanOutSink::new(Vec::new());
        fanout.emit(AgentEvent::AgentStart).await;
    }

    /// The summary records shape and sizes but none of the free-form text.
    #[test]
    fn provider_request_summary_counts_shape_without_text() {
        let messages = vec![
            AgentMessage::User {
                content: UserContent::Blocks(vec![
                    UserBlock::Text(crate::types::TextContent {
                        text: "secret user request".into(),
                    }),
                    UserBlock::Image(crate::types::ImageContent {
                        source: "data:image/png;base64,secret".into(),
                        media_type: Some("image/png".into()),
                        alt: Some("screenshot".into()),
                    }),
                ]),
                timestamp: None,
            },
            AgentMessage::Assistant {
                content: crate::types::AssistantContent {
                    blocks: vec![
                        AssistantBlock::Thinking(crate::types::TextContent {
                            text: "private scratch".into(),
                        }),
                        AssistantBlock::ToolCall(crate::tool::ToolCall {
                            id: "call-1".into(),
                            name: "web_search".into(),
                            arguments: serde_json::json!({"q": "secret"}),
                        }),
                    ],
                },
                stop_reason: crate::types::StopReason::ToolUse,
                error_message: None,
                timestamp: None,
                usage: None,
            },
            AgentMessage::ToolResult {
                tool_call_id: "call-1".into(),
                tool_name: "web_search".into(),
                content: crate::types::ToolResultContent::text("secret result"),
                is_error: false,
                narration: None,
                details: None,
                timestamp: None,
            },
        ];
        let tools = vec![ToolSchema {
            name: "web_search".into(),
            description: "Search the web".into(),
            parameters: serde_json::json!({"type": "object", "properties": {"q": {"type": "string"}}}),
        }];

        let summary = ProviderRequestSummary::from_parts(
            2,
            Some("google/gemini-3.1-flash-lite-preview"),
            Some(0.2),
            Some(4096),
            "system prompt secret",
            &messages,
            &tools,
        );

        // Pass-through request metadata.
        assert_eq!(summary.iteration, 2);
        assert_eq!(
            summary.model_id.as_deref(),
            Some("google/gemini-3.1-flash-lite-preview")
        );
        assert_eq!(summary.temperature, Some(0.2));
        assert_eq!(summary.max_output_tokens, Some(4096));

        // The system prompt is measured, never stored.
        assert_eq!(summary.system_prompt_bytes, "system prompt secret".len());
        assert_eq!(
            summary.system_prompt_chars,
            "system prompt secret".chars().count()
        );

        // Per-role message tallies.
        assert_eq!(summary.message_count, 3);
        assert_eq!(summary.message_counts.system, 0);
        assert_eq!(summary.message_counts.user, 1);
        assert_eq!(summary.message_counts.assistant, 1);
        assert_eq!(summary.message_counts.tool_result, 1);
        assert_eq!(summary.message_counts.custom, 0);

        // Per-block content tallies.
        assert_eq!(summary.content_counts.user_text_blocks, 1);
        assert_eq!(
            summary.content_counts.user_text_bytes,
            "secret user request".len()
        );
        assert_eq!(summary.content_counts.user_image_blocks, 1);
        assert_eq!(summary.content_counts.user_image_with_media_type, 1);
        assert_eq!(summary.content_counts.assistant_thinking_blocks, 1);
        assert_eq!(
            summary.content_counts.assistant_thinking_bytes,
            "private scratch".len()
        );
        assert_eq!(summary.content_counts.assistant_tool_call_blocks, 1);
        assert_eq!(summary.content_counts.assistant_error_messages, 0);
        assert_eq!(summary.content_counts.tool_result_text_blocks, 1);
        assert_eq!(summary.content_counts.tool_result_error_messages, 0);

        // Tool catalog shape.
        assert_eq!(summary.tool_count, 1);
        assert_eq!(summary.tool_names, vec!["web_search"]);
        let expected_schema_bytes = "web_search".len()
            + "Search the web".len()
            + serde_json::to_vec(&tools[0].parameters).unwrap().len();
        assert_eq!(summary.tool_schema_bytes, expected_schema_bytes);
        assert_eq!(summary.last_message_role.as_deref(), Some("tool_result"));

        // Redaction: none of the free-form content may reach the serialized form.
        let serialized = serde_json::to_string(&summary).unwrap();
        assert!(!serialized.contains("secret user request"));
        assert!(!serialized.contains("private scratch"));
        assert!(!serialized.contains("secret result"));
        assert!(!serialized.contains("data:image"));
        assert!(!serialized.contains("system prompt secret"));
        assert!(!serialized.contains("Search the web"));
        assert!(!serialized.contains("screenshot"));
    }
}