// distri_types/events.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4use crate::core::{MessageRole, ToolCall, ToolResponse};
5use crate::execution::ContextBudget;
6use crate::hooks::InlineHookRequest;
7
8/// Token usage information for a run
9#[derive(Debug, Serialize, Deserialize, Clone, Default)]
10pub struct RunUsage {
11    /// Actual tokens used (from LLM response)
12    pub total_tokens: u32,
13    pub input_tokens: u32,
14    pub output_tokens: u32,
15    /// Tokens read from provider cache (e.g., Anthropic prompt caching)
16    #[serde(default)]
17    pub cached_tokens: u32,
18    /// Estimated tokens (pre-call estimate)
19    pub estimated_tokens: u32,
20    /// Model used for this run (e.g., "gpt-5.1", "claude-sonnet-4")
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    pub model: Option<String>,
23    /// Estimated cost in USD (based on model pricing)
24    #[serde(default, skip_serializing_if = "Option::is_none")]
25    pub cost_usd: Option<f64>,
26}
27
28#[derive(Debug, Serialize, Deserialize, Clone)]
29#[serde(rename_all = "snake_case")]
30pub struct AgentEvent {
31    pub timestamp: chrono::DateTime<chrono::Utc>,
32    pub thread_id: String,
33    pub run_id: String,
34    pub event: AgentEventType,
35    pub task_id: String,
36    /// Ancestor task that dispatched this run. `None` for root tasks.
37    /// Lets consumers reconstruct the task tree from the event stream
38    /// (and route sub-agent events to the right node in the FE store).
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub parent_task_id: Option<String>,
41    pub agent_id: String,
42    /// User ID for usage tracking
43    #[serde(default)]
44    pub user_id: Option<String>,
45    /// Identifier ID for tenant/project-level usage tracking
46    #[serde(default)]
47    pub identifier_id: Option<String>,
48    /// Workspace ID for workspace-scoped usage tracking
49    #[serde(default)]
50    pub workspace_id: Option<String>,
51    /// Channel ID for channel-scoped usage tracking
52    #[serde(default)]
53    pub channel_id: Option<String>,
54}
55
56impl AgentEvent {
57    /// Reconstruct an AgentEvent from a stored TaskEvent (e.g. for history replay).
58    pub fn from_task_event(task_event: &crate::TaskEvent, thread_id: &str) -> Self {
59        Self {
60            event: task_event.event.clone(),
61            agent_id: String::new(),
62            timestamp: chrono::DateTime::from_timestamp_millis(task_event.created_at)
63                .unwrap_or_default(),
64            thread_id: thread_id.to_string(),
65            run_id: String::new(),
66            task_id: String::new(),
67            parent_task_id: None,
68            user_id: None,
69            identifier_id: None,
70            workspace_id: None,
71            channel_id: None,
72        }
73    }
74}
75
76/// Typed payload that goes into the A2A `TaskStatusUpdateEvent.metadata` field
77/// for every event the server emits. Carries the routing fields the wire
78/// envelope (A2A) doesn't model — `parent_task_id` (for the FE/CLI task tree)
79/// and `agent_id` (for tool-registry lookups on sub-agent events).
80///
81/// Use `from_event` / `to_agent_event` to round-trip without loose-JSON
82/// extraction. The A2A `TaskStatusUpdateEvent` itself is not extended —
83/// everything Distri-specific lives inside this typed body.
84#[derive(Debug, Serialize, Deserialize, Clone)]
85pub struct AgentEventEnvelope {
86    /// The event variant inline (`type = "..."`, plus variant fields).
87    /// `serde(flatten)` keeps the wire shape readable.
88    #[serde(flatten)]
89    pub event: AgentEventType,
90    /// Definition name of the agent that emitted this event. For sub-agent
91    /// events relayed through a parent's stream, this is the sub-agent's
92    /// name — the stream's URL agent_id is the parent's, so consumers need
93    /// this to look up tool registries / display names per-event.
94    pub agent_id: String,
95    /// Dispatching task for sub-agent events. Absent for root-task events.
96    /// Lets the consumer route per-task without modifying the A2A spec.
97    #[serde(default, skip_serializing_if = "Option::is_none")]
98    pub parent_task_id: Option<String>,
99}
100
101impl AgentEventEnvelope {
102    /// Build an envelope from a full `AgentEvent` for serialization.
103    pub fn from_event(event: &AgentEvent) -> Self {
104        Self {
105            event: event.event.clone(),
106            agent_id: event.agent_id.clone(),
107            parent_task_id: event.parent_task_id.clone(),
108        }
109    }
110}
111
112#[derive(Debug, Serialize, Deserialize, Clone)]
113#[serde(rename_all = "snake_case", tag = "type")]
114#[allow(clippy::large_enum_variant)]
115pub enum AgentEventType {
116    /// Verbose diagnostic message streamed from server to client (only emitted when verbose=true).
117    DiagnosticLog {
118        message: String,
119    },
120
121    // Main run events
122    RunStarted {},
123    RunFinished {
124        success: bool,
125        total_steps: usize,
126        failed_steps: usize,
127        /// Token usage for this run
128        usage: Option<RunUsage>,
129        #[serde(default, skip_serializing_if = "Option::is_none")]
130        context_budget: Option<ContextBudget>,
131    },
132    RunError {
133        message: String,
134        code: Option<String>,
135        /// Cumulative token usage at the point of failure
136        #[serde(default, skip_serializing_if = "Option::is_none")]
137        usage: Option<RunUsage>,
138    },
139    PlanStarted {
140        initial_plan: bool,
141    },
142    PlanFinished {
143        total_steps: usize,
144    },
145    PlanPruned {
146        removed_steps: Vec<String>,
147    },
148    // Step execution events
149    StepStarted {
150        step_id: String,
151        step_index: usize,
152    },
153    StepCompleted {
154        step_id: String,
155        success: bool,
156        #[serde(default, skip_serializing_if = "Option::is_none")]
157        context_budget: Option<ContextBudget>,
158        /// Cumulative token usage for this run up to this step
159        #[serde(default, skip_serializing_if = "Option::is_none")]
160        usage: Option<RunUsage>,
161    },
162
163    // Reflection events (emitted when is_reflection_enabled() and reflection runs)
164    ReflectStarted {},
165    ReflectFinished {
166        should_retry: bool,
167        #[serde(default, skip_serializing_if = "Option::is_none")]
168        reason: Option<String>,
169    },
170
171    // Tool execution events
172    ToolExecutionStart {
173        step_id: String,
174        tool_call_id: String,
175        tool_call_name: String,
176        input: Value,
177    },
178    ToolExecutionEnd {
179        step_id: String,
180        tool_call_id: String,
181        tool_call_name: String,
182        success: bool,
183    },
184
185    // Message events for streaming
186    TextMessageStart {
187        message_id: String,
188        step_id: String,
189        role: MessageRole,
190        is_final: Option<bool>,
191    },
192    TextMessageContent {
193        message_id: String,
194        step_id: String,
195        delta: String,
196        stripped_content: Option<Vec<(usize, String)>>,
197    },
198    TextMessageEnd {
199        message_id: String,
200        step_id: String,
201    },
202
203    // Tool call events with parent/child relationships
204    ToolCalls {
205        step_id: String,
206        parent_message_id: Option<String>,
207        tool_calls: Vec<ToolCall>,
208    },
209    ToolResults {
210        step_id: String,
211        parent_message_id: Option<String>,
212        results: Vec<ToolResponse>,
213    },
214
215    // Agent transfer events
216    AgentHandover {
217        from_agent: String,
218        to_agent: String,
219        reason: Option<String>,
220    },
221
222    /// A live, embeddable view produced by a tool (e.g. browsr viewer, Grafana
223    /// dashboard, map widget). The channel renders it inline as an iframe
224    /// (web) or as a clickable link (Telegram, WhatsApp, CLI).
225    LiveView {
226        /// Unique ID for this view — used for updates and teardown
227        view_id: String,
228        /// URL to embed or link (must be https:// for iframe security)
229        url: String,
230        /// Human-readable title for the view
231        #[serde(default, skip_serializing_if = "Option::is_none")]
232        title: Option<String>,
233        /// Display mode hint: "inline", "fullscreen", or "pip"
234        #[serde(default, skip_serializing_if = "Option::is_none")]
235        display_mode: Option<String>,
236        /// Width hint in pixels
237        #[serde(default, skip_serializing_if = "Option::is_none")]
238        width: Option<u32>,
239        /// Height hint in pixels
240        #[serde(default, skip_serializing_if = "Option::is_none")]
241        height: Option<u32>,
242    },
243
244    BrowserSessionStarted {
245        session_id: String,
246        viewer_url: Option<String>,
247        stream_url: Option<String>,
248    },
249
250    InlineHookRequested {
251        request: InlineHookRequest,
252    },
253
254    // TODO events
255    TodosUpdated {
256        formatted_todos: String,
257        action: String,
258        todo_count: usize,
259        /// Per-call diff: which items got added, had their status
260        /// changed, or were removed. Empty when the renderer can't
261        /// or didn't compute it (e.g. the very first `write_todos`
262        /// call has no prior list to diff against — every item is
263        /// `Added`). Renderers should prefer rendering this list
264        /// when non-empty and fall back to `formatted_todos`.
265        #[serde(default, skip_serializing_if = "Vec::is_empty")]
266        changes: Vec<crate::todos::TodoChange>,
267    },
268
269    // Context management events
270    ContextCompaction {
271        tier: CompactionTier,
272        tokens_before: usize,
273        tokens_after: usize,
274        entries_affected: usize,
275        context_limit: usize,
276        usage_ratio: f64,
277        summary: Option<String>,
278        /// Skill IDs re-injected after compaction
279        #[serde(default, skip_serializing_if = "Vec::is_empty")]
280        reinjected_skills: Vec<String>,
281        #[serde(default, skip_serializing_if = "Option::is_none")]
282        context_budget: Option<ContextBudget>,
283    },
284
285    /// Emitted each turn with the current context budget breakdown.
286    ContextBudgetUpdate {
287        budget: ContextBudget,
288        is_warning: bool,
289        is_critical: bool,
290    },
291
292    /// A structured channel reply emitted by a workflow `StepKind::Reply`
293    /// step. The gateway renders it per channel; non-channel consumers
294    /// (CLI, web) render `reply.text` and ignore buttons they can't show.
295    ChannelReply {
296        reply: crate::channel_commands::ChannelReply,
297    },
298}
299
300/// Tier of context compaction applied
301#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(rename_all = "snake_case")]
303pub enum CompactionTier {
304    /// Mechanical: drop old entries, truncate payloads
305    Trim,
306    /// Semantic: LLM-powered summarization of history
307    Summarize,
308    /// Emergency: preserve only essentials
309    Reset,
310}
311
#[cfg(test)]
mod channel_reply_event_tests {
    use super::*;
    use crate::channel_commands::{ChannelButton, ChannelReply};

    /// A bare `ChannelReply` event survives a trip through `serde_json::Value`.
    #[test]
    fn channel_reply_event_round_trips() {
        let web_app_button = ChannelButton::WebApp {
            label: "Continue".into(),
            url: "https://a.app/lesson/1".into(),
        };
        let original = AgentEventType::ChannelReply {
            reply: ChannelReply {
                text: "Tap to continue:".into(),
                buttons: vec![vec![web_app_button]],
            },
        };

        let json = serde_json::to_value(&original).unwrap();
        let decoded: AgentEventType = serde_json::from_value(json).unwrap();

        assert!(matches!(decoded, AgentEventType::ChannelReply { .. }));
    }

    /// The same event, wrapped in an `AgentEventEnvelope` (flattened payload),
    /// also deserializes back to the `ChannelReply` variant.
    #[test]
    fn channel_reply_envelope_round_trips() {
        let callback_button = ChannelButton::Callback {
            label: "Continue".into(),
            callback_data: "wf:open:x".into(),
        };
        let payload = AgentEventType::ChannelReply {
            reply: ChannelReply {
                text: "Tap to continue:".into(),
                buttons: vec![vec![callback_button]],
            },
        };

        let wrapped = AgentEventEnvelope::from_event(&AgentEvent::new(payload));
        let json = serde_json::to_value(&wrapped).unwrap();
        let decoded: AgentEventEnvelope =
            serde_json::from_value(json).expect("envelope deserialize");

        assert!(matches!(decoded.event, AgentEventType::ChannelReply { .. }));
    }
}
351
352impl AgentEvent {
353    pub fn new(event: AgentEventType) -> Self {
354        Self {
355            timestamp: chrono::Utc::now(),
356            thread_id: uuid::Uuid::new_v4().to_string(),
357            run_id: uuid::Uuid::new_v4().to_string(),
358            event,
359            task_id: uuid::Uuid::new_v4().to_string(),
360            parent_task_id: None,
361            agent_id: "default".to_string(),
362            user_id: None,
363            identifier_id: None,
364            workspace_id: None,
365            channel_id: None,
366        }
367    }
368
369    pub fn with_context(
370        event: AgentEventType,
371        thread_id: String,
372        run_id: String,
373        task_id: String,
374        agent_id: String,
375    ) -> Self {
376        Self {
377            timestamp: chrono::Utc::now(),
378            thread_id,
379            run_id,
380            task_id,
381            parent_task_id: None,
382            event,
383            agent_id,
384            user_id: None,
385            identifier_id: None,
386            workspace_id: None,
387            channel_id: None,
388        }
389    }
390}