Skip to main content

cinch_web/
broadcast.rs

1//! [`EventHandler`] that converts harness events into WebSocket messages.
2//!
3//! [`WebBroadcastHandler`] intercepts [`HarnessEvent`] variants and serializes
4//! them into [`WsMessage`] values, broadcasting to all connected WebSocket
5//! clients via a `tokio::sync::broadcast` channel.
6
7use std::sync::{Arc, Mutex};
8
9use cinch_rs::agent::events::{EventHandler, EventResponse, HarnessEvent};
10use cinch_rs::ui::UiState;
11use serde::Serialize;
12use tokio::sync::broadcast;
13
14use crate::ext::WebExtensionRenderer;
15
/// Upper bound, in bytes, on the tool-result text carried by a single
/// WebSocket message (8 KiB).
///
/// Longer results are cut down before broadcasting; the untruncated text
/// stays in `UiState` and remains reachable through `/api/state`.
const MAX_WS_TOOL_RESULT_BYTES: usize = 8_192;
19
20/// A message sent from the server to WebSocket clients.
21///
22/// Discriminated on the `type` field when serialized to JSON.
23#[derive(Clone, Debug, Serialize)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum WsMessage {
26    /// Full state snapshot (sent on initial connect and after reconnect).
27    Snapshot { data: serde_json::Value },
28    /// Complete LLM text block.
29    Text { text: String },
30    /// Streaming token delta.
31    TextDelta { delta: String },
32    /// A tool is about to execute.
33    ToolExecuting { name: String, arguments: String },
34    /// A tool finished executing.
35    ToolResult {
36        name: String,
37        result: String,
38        is_error: bool,
39    },
40    /// LLM reasoning / extended thinking content.
41    Reasoning { text: String },
42    /// Streaming reasoning delta.
43    ReasoningDelta { delta: String },
44    /// Round progress update.
45    Round {
46        round: u32,
47        max_rounds: u32,
48        context_pct: f64,
49    },
50    /// Phase change.
51    Phase { phase: String },
52    /// A question has been presented to the user.
53    Question {
54        question: cinch_rs::ui::UserQuestion,
55    },
56    /// The active question has been resolved.
57    QuestionDismissed,
58    /// The agent finished (no more tool calls).
59    Finished,
60    /// A log line captured from tracing.
61    Log { line: cinch_rs::ui::LogLine },
62    /// Domain-specific extension state update.
63    Extension { data: serde_json::Value },
64    /// A user message sent from the chat UI.
65    UserMessage { message: String },
66    /// Token usage for the current round.
67    TokenUsage {
68        prompt_tokens: u32,
69        completion_tokens: u32,
70    },
71    /// The agent received tool calls this round.
72    ToolCallsReceived { round: u32, count: usize },
73    /// A tool result was served from cache.
74    ToolCacheHit { name: String, arguments: String },
75    /// Context eviction freed memory.
76    Eviction {
77        freed_chars: usize,
78        evicted_count: usize,
79    },
80    /// Context compaction completed.
81    Compaction { compaction_number: usize },
82    /// Model routing selected a different model.
83    ModelRouted { model: String, round: u32 },
84    /// Checkpoint saved after a round.
85    CheckpointSaved { round: u32, path: String },
86    /// Resumed from a checkpoint.
87    CheckpointResumed { round: u32 },
88    /// The API returned an empty response and is retrying.
89    EmptyResponse {
90        round: u32,
91        attempt: u32,
92        max_retries: u32,
93    },
94    /// A tool execution requires human approval.
95    ApprovalRequired { name: String, arguments: String },
96    /// Consolidated todo list update (replaces previous state in the client).
97    TodoUpdate { content: String },
98}
99
100/// Event handler that broadcasts harness events to WebSocket clients.
101///
102/// Compose alongside [`UiEventHandler`](cinch_rs::ui::event_handler::UiEventHandler)
103/// in a [`CompositeEventHandler`](cinch_rs::agent::CompositeEventHandler):
104///
105/// ```ignore
106/// let handler = CompositeEventHandler::new()
107///     .with(UiEventHandler::new(ui_state.clone()))
108///     .with(WebBroadcastHandler::new(ws_sender, ext_renderer, ui_state.clone()));
109/// ```
110pub struct WebBroadcastHandler {
111    sender: broadcast::Sender<WsMessage>,
112    extension_renderer: Arc<dyn WebExtensionRenderer>,
113    ui_state: Arc<Mutex<UiState>>,
114}
115
116impl WebBroadcastHandler {
117    /// Create a new broadcast handler.
118    pub fn new(
119        sender: broadcast::Sender<WsMessage>,
120        extension_renderer: Arc<dyn WebExtensionRenderer>,
121        ui_state: Arc<Mutex<UiState>>,
122    ) -> Self {
123        Self {
124            sender,
125            extension_renderer,
126            ui_state,
127        }
128    }
129
130    /// Broadcast a message to all connected clients.
131    ///
132    /// Silently ignores send errors (no subscribers is fine).
133    fn broadcast(&self, msg: WsMessage) {
134        let _ = self.sender.send(msg);
135    }
136
137    /// Broadcast the extension state if the renderer provides it.
138    fn broadcast_extension(&self) {
139        if let Ok(s) = self.ui_state.lock()
140            && let Some(data) = self.extension_renderer.to_ws_json(&*s.extensions)
141        {
142            self.broadcast(WsMessage::Extension { data });
143        }
144    }
145}
146
147impl EventHandler for WebBroadcastHandler {
148    fn on_event(&self, event: &HarnessEvent<'_>) -> Option<EventResponse> {
149        match event {
150            HarnessEvent::RoundStart {
151                round,
152                max_rounds,
153                context_usage,
154                ..
155            } => {
156                self.broadcast(WsMessage::Round {
157                    round: *round,
158                    max_rounds: *max_rounds,
159                    context_pct: context_usage.usage_pct,
160                });
161            }
162            HarnessEvent::Text(text) => {
163                self.broadcast(WsMessage::Text {
164                    text: text.to_string(),
165                });
166            }
167            HarnessEvent::TextDelta(delta) => {
168                self.broadcast(WsMessage::TextDelta {
169                    delta: delta.to_string(),
170                });
171            }
172            HarnessEvent::ToolCallsReceived { round, count } => {
173                self.broadcast(WsMessage::ToolCallsReceived {
174                    round: *round,
175                    count: *count,
176                });
177            }
178            HarnessEvent::ToolExecuting {
179                name, arguments, ..
180            } => {
181                // The todo tool updates in-place; skip ToolExecuting so the
182                // client shows only the consolidated checklist.
183                if *name != "todo" {
184                    self.broadcast(WsMessage::Phase {
185                        phase: format!("Tool: {name}"),
186                    });
187                    self.broadcast(WsMessage::ToolExecuting {
188                        name: name.to_string(),
189                        arguments: arguments.to_string(),
190                    });
191                }
192            }
193            HarnessEvent::ToolResult { name, result, .. } => {
194                if *name == "todo" {
195                    self.broadcast(WsMessage::TodoUpdate {
196                        content: result.to_string(),
197                    });
198                } else {
199                    let is_error = result.starts_with("Error") || result.starts_with("error:");
200                    // Truncate large results for WebSocket transport.
201                    #[allow(clippy::string_slice)] // end from floor_char_boundary
202                    let truncated = if result.len() > MAX_WS_TOOL_RESULT_BYTES {
203                        let end = result.floor_char_boundary(MAX_WS_TOOL_RESULT_BYTES);
204                        let cut = &result[..end];
205                        format!(
206                            "{cut}\n... (truncated, {total} bytes total)",
207                            total = result.len()
208                        )
209                    } else {
210                        result.to_string()
211                    };
212                    self.broadcast(WsMessage::ToolResult {
213                        name: name.to_string(),
214                        result: truncated,
215                        is_error,
216                    });
217                }
218                // Tool results may change domain state (e.g. tweet drafted count).
219                self.broadcast_extension();
220            }
221            HarnessEvent::TokenUsage {
222                prompt_tokens,
223                completion_tokens,
224            } => {
225                self.broadcast(WsMessage::TokenUsage {
226                    prompt_tokens: *prompt_tokens,
227                    completion_tokens: *completion_tokens,
228                });
229            }
230            HarnessEvent::Reasoning(text) => {
231                self.broadcast(WsMessage::Reasoning {
232                    text: text.to_string(),
233                });
234            }
235            HarnessEvent::ReasoningDelta(delta) => {
236                self.broadcast(WsMessage::ReasoningDelta {
237                    delta: delta.to_string(),
238                });
239            }
240            HarnessEvent::Finished => {
241                self.broadcast(WsMessage::Finished);
242            }
243            HarnessEvent::EmptyResponse {
244                round,
245                attempt,
246                max_retries,
247            } => {
248                self.broadcast(WsMessage::EmptyResponse {
249                    round: *round,
250                    attempt: *attempt,
251                    max_retries: *max_retries,
252                });
253            }
254            HarnessEvent::RoundLimitReached { .. } => {
255                self.broadcast(WsMessage::Phase {
256                    phase: "Round limit reached".to_string(),
257                });
258                self.broadcast(WsMessage::Finished);
259            }
260            HarnessEvent::Eviction {
261                freed_chars,
262                evicted_count,
263            } => {
264                self.broadcast(WsMessage::Eviction {
265                    freed_chars: *freed_chars,
266                    evicted_count: *evicted_count,
267                });
268            }
269            HarnessEvent::Compaction { compaction_number } => {
270                self.broadcast(WsMessage::Compaction {
271                    compaction_number: *compaction_number,
272                });
273            }
274            HarnessEvent::PreCompaction => {
275                // No WebSocket message needed for pre-compaction events.
276            }
277            HarnessEvent::ModelRouted { model, round } => {
278                self.broadcast(WsMessage::ModelRouted {
279                    model: model.to_string(),
280                    round: *round,
281                });
282            }
283            HarnessEvent::CheckpointSaved { round, path } => {
284                self.broadcast(WsMessage::CheckpointSaved {
285                    round: *round,
286                    path: path.to_string(),
287                });
288            }
289            HarnessEvent::CheckpointResumed { round } => {
290                self.broadcast(WsMessage::CheckpointResumed { round: *round });
291            }
292            HarnessEvent::ToolCacheHit { name, arguments } => {
293                self.broadcast(WsMessage::ToolCacheHit {
294                    name: name.to_string(),
295                    arguments: arguments.to_string(),
296                });
297            }
298            HarnessEvent::ApprovalRequired { name, arguments } => {
299                self.broadcast(WsMessage::ApprovalRequired {
300                    name: name.to_string(),
301                    arguments: arguments.to_string(),
302                });
303            }
304            HarnessEvent::PhaseTransition { from, to } => {
305                self.broadcast(WsMessage::Phase {
306                    phase: format!("{from:?} → {to:?}"),
307                });
308            }
309            HarnessEvent::PlanSubmitted { summary } => {
310                self.broadcast(WsMessage::Text {
311                    text: format!("[plan] {summary}"),
312                });
313            }
314            HarnessEvent::MemoryConsolidated {
315                lines_before,
316                lines_after,
317            } => {
318                self.broadcast(WsMessage::Phase {
319                    phase: format!("Memory consolidated: {lines_before} → {lines_after} lines"),
320                });
321            }
322            HarnessEvent::ToolDefinitionsBudgeted {
323                original_tokens,
324                trimmed_tokens,
325                truncated_count,
326            } => {
327                self.broadcast(WsMessage::Phase {
328                    phase: format!(
329                        "Tool definitions budgeted: {original_tokens} → {trimmed_tokens} tokens \
330                         ({truncated_count} truncated)"
331                    ),
332                });
333            }
334            HarnessEvent::SessionStarting { .. } | HarnessEvent::SessionFinishing { .. } => {
335                // Session lifecycle events are handled by hooks, not WebSocket.
336            }
337        }
338        None // Never controls flow.
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `msg` into a JSON value, panicking if serialization fails.
    fn to_json(msg: &WsMessage) -> serde_json::Value {
        serde_json::to_value(msg).unwrap()
    }

    /// The variant name is emitted as a snake_case `type` tag next to the fields.
    #[test]
    fn ws_message_serializes_with_type_tag() {
        let json = to_json(&WsMessage::TextDelta {
            delta: String::from("hello"),
        });

        assert_eq!(json["type"], "text_delta");
        assert_eq!(json["delta"], "hello");
    }

    /// `Round` exposes its counters under their field names.
    #[test]
    fn ws_message_round_serializes() {
        let json = to_json(&WsMessage::Round {
            round: 3,
            max_rounds: 30,
            context_pct: 0.42,
        });

        assert_eq!(json["type"], "round");
        assert_eq!(json["round"], 3);
        assert_eq!(json["max_rounds"], 30);
    }

    /// `ToolResult` carries the error flag as a JSON boolean.
    #[test]
    fn ws_message_tool_result_serializes() {
        let json = to_json(&WsMessage::ToolResult {
            name: String::from("read_file"),
            result: String::from("contents"),
            is_error: false,
        });

        assert_eq!(json["type"], "tool_result");
        assert_eq!(json["is_error"], false);
    }

    /// `UserMessage` round-trips its text under `message`.
    #[test]
    fn ws_message_user_message_serializes() {
        let json = to_json(&WsMessage::UserMessage {
            message: String::from("hello agent"),
        });

        assert_eq!(json["type"], "user_message");
        assert_eq!(json["message"], "hello agent");
    }

    /// `TokenUsage` exposes both token counters.
    #[test]
    fn ws_message_token_usage_serializes() {
        let json = to_json(&WsMessage::TokenUsage {
            prompt_tokens: 100,
            completion_tokens: 50,
        });

        assert_eq!(json["type"], "token_usage");
        assert_eq!(json["prompt_tokens"], 100);
        assert_eq!(json["completion_tokens"], 50);
    }

    /// `ReasoningDelta` is tagged `reasoning_delta` and keeps its text.
    #[test]
    fn ws_message_reasoning_delta_serializes() {
        let json = to_json(&WsMessage::ReasoningDelta {
            delta: String::from("thinking..."),
        });

        assert_eq!(json["type"], "reasoning_delta");
        assert_eq!(json["delta"], "thinking...");
    }

    /// A handler can be built and used through the `EventHandler` trait.
    #[test]
    fn broadcast_handler_creation() {
        // The receiving half is dropped right away: no subscriber is needed.
        let (tx, _) = broadcast::channel(16);
        let renderer: Arc<dyn WebExtensionRenderer> = Arc::new(crate::ext::NoWebExtension);
        let ui_state = Arc::new(Mutex::new(UiState::default()));
        let handler = WebBroadcastHandler::new(tx, renderer, ui_state);

        // Calling on_event proves the trait is implemented; it must not
        // return a flow-control response.
        assert!(handler.on_event(&HarnessEvent::Finished).is_none());
    }
}