Skip to main content

construct/gateway/
sse.rs

1//! Server-Sent Events (SSE) stream for real-time event delivery.
2//!
3//! Wraps the broadcast channel in AppState to deliver events to web dashboard clients.
4
5use super::AppState;
6use axum::{
7    extract::State,
8    http::{HeaderMap, StatusCode, header},
9    response::{
10        IntoResponse,
11        sse::{Event, KeepAlive, Sse},
12    },
13};
14use std::convert::Infallible;
15use std::time::Duration;
16use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
17use tokio_stream::StreamExt;
18use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
19
20/// GET /api/events — SSE event stream
21pub async fn handle_sse_events(
22    State(state): State<AppState>,
23    headers: HeaderMap,
24) -> impl IntoResponse {
25    // Auth check
26    if state.pairing.require_pairing() {
27        let token = headers
28            .get(header::AUTHORIZATION)
29            .and_then(|v| v.to_str().ok())
30            .and_then(|auth| auth.strip_prefix("Bearer "))
31            .unwrap_or("");
32
33        if !state.pairing.is_authenticated(token) {
34            return (
35                StatusCode::UNAUTHORIZED,
36                "Unauthorized — provide Authorization: Bearer <token>",
37            )
38                .into_response();
39        }
40    }
41
42    let rx = state.event_tx.subscribe();
43    let stream = BroadcastStream::new(rx).filter_map(
44        |result: Result<
45            serde_json::Value,
46            tokio_stream::wrappers::errors::BroadcastStreamRecvError,
47        >| {
48            match result {
49                Ok(value) => Some(Ok::<_, Infallible>(
50                    Event::default().data(value.to_string()),
51                )),
52                Err(_) => None, // Skip lagged messages
53            }
54        },
55    );
56
57    Sse::new(stream)
58        .keep_alive(KeepAlive::default())
59        .into_response()
60}
61
62/// GET /api/daemon/logs — tails the daemon stderr log file and streams lines as SSE events.
63///
64/// Emits an initial burst of the last ~200 lines, then polls every 500ms for appended bytes.
65/// Handles log rotation/truncation by resetting to file start when the file shrinks.
66pub async fn handle_api_daemon_logs(
67    State(state): State<AppState>,
68    headers: HeaderMap,
69) -> impl IntoResponse {
70    if state.pairing.require_pairing() {
71        let token = headers
72            .get(header::AUTHORIZATION)
73            .and_then(|v| v.to_str().ok())
74            .and_then(|auth| auth.strip_prefix("Bearer "))
75            .unwrap_or("");
76
77        if !state.pairing.is_authenticated(token) {
78            return (
79                StatusCode::UNAUTHORIZED,
80                "Unauthorized — provide Authorization: Bearer <token>",
81            )
82                .into_response();
83        }
84    }
85
86    let log_path = {
87        let cfg = state.config.lock();
88        cfg.config_path
89            .parent()
90            .map(|dir| dir.join("logs").join("daemon.stderr.log"))
91    };
92
93    let Some(log_path) = log_path else {
94        return (
95            StatusCode::INTERNAL_SERVER_ERROR,
96            "Unable to resolve daemon log path",
97        )
98            .into_response();
99    };
100
101    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(256);
102
103    tokio::spawn(async move {
104        const TAIL_BYTES: u64 = 64 * 1024; // initial burst — last ~64KB
105
106        // Initial tail burst.
107        let mut start_pos: u64 = match tokio::fs::File::open(&log_path).await {
108            Ok(mut file) => {
109                let size = file.metadata().await.map(|m| m.len()).unwrap_or(0);
110                let seek_to = size.saturating_sub(TAIL_BYTES);
111                if file.seek(SeekFrom::Start(seek_to)).await.is_ok() {
112                    let mut reader = BufReader::new(file);
113                    // If we seeked into the middle of a line, discard the partial prefix.
114                    if seek_to > 0 {
115                        let mut discard = String::new();
116                        let _ = reader.read_line(&mut discard).await;
117                    }
118                    let mut line = String::new();
119                    loop {
120                        line.clear();
121                        match reader.read_line(&mut line).await {
122                            Ok(0) => break,
123                            Ok(_) => {
124                                let trimmed = line.trim_end_matches(['\n', '\r']).to_string();
125                                if trimmed.is_empty() {
126                                    continue;
127                                }
128                                let payload = serde_json::json!({
129                                    "type": "log",
130                                    "line": trimmed,
131                                    "timestamp": chrono::Utc::now().to_rfc3339(),
132                                });
133                                let event = Event::default().data(payload.to_string());
134                                if tx.send(Ok(event)).await.is_err() {
135                                    return;
136                                }
137                            }
138                            Err(_) => break,
139                        }
140                    }
141                }
142                size
143            }
144            Err(_) => {
145                let payload = serde_json::json!({
146                    "type": "log_unavailable",
147                    "line": format!("daemon log not readable at {}", log_path.display()),
148                    "timestamp": chrono::Utc::now().to_rfc3339(),
149                });
150                let _ = tx
151                    .send(Ok(Event::default().data(payload.to_string())))
152                    .await;
153                0
154            }
155        };
156
157        // Poll loop for appended bytes.
158        let mut interval = tokio::time::interval(Duration::from_millis(500));
159        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
160        loop {
161            interval.tick().await;
162
163            let meta = match tokio::fs::metadata(&log_path).await {
164                Ok(m) => m,
165                Err(_) => continue, // log may not exist yet; keep polling
166            };
167            let size = meta.len();
168
169            if size < start_pos {
170                // Log was rotated or truncated — restart from beginning.
171                start_pos = 0;
172            }
173            if size == start_pos {
174                continue;
175            }
176
177            let mut file = match tokio::fs::File::open(&log_path).await {
178                Ok(f) => f,
179                Err(_) => continue,
180            };
181            if file.seek(SeekFrom::Start(start_pos)).await.is_err() {
182                continue;
183            }
184            let mut reader = BufReader::new(file);
185            let mut line = String::new();
186            loop {
187                line.clear();
188                match reader.read_line(&mut line).await {
189                    Ok(0) => break,
190                    Ok(n) => {
191                        start_pos = start_pos.saturating_add(n as u64);
192                        let trimmed = line.trim_end_matches(['\n', '\r']).to_string();
193                        if trimmed.is_empty() {
194                            continue;
195                        }
196                        let payload = serde_json::json!({
197                            "type": "log",
198                            "line": trimmed,
199                            "timestamp": chrono::Utc::now().to_rfc3339(),
200                        });
201                        let event = Event::default().data(payload.to_string());
202                        if tx.send(Ok(event)).await.is_err() {
203                            return;
204                        }
205                    }
206                    Err(_) => break,
207                }
208            }
209        }
210    });
211
212    Sse::new(ReceiverStream::new(rx))
213        .keep_alive(KeepAlive::default())
214        .into_response()
215}
216
217/// Broadcast observer that forwards events to the SSE broadcast channel.
218pub struct BroadcastObserver {
219    inner: Box<dyn crate::observability::Observer>,
220    tx: tokio::sync::broadcast::Sender<serde_json::Value>,
221}
222
223impl BroadcastObserver {
224    pub fn new(
225        inner: Box<dyn crate::observability::Observer>,
226        tx: tokio::sync::broadcast::Sender<serde_json::Value>,
227    ) -> Self {
228        Self { inner, tx }
229    }
230
231    pub fn inner(&self) -> &dyn crate::observability::Observer {
232        self.inner.as_ref()
233    }
234}
235
236impl crate::observability::Observer for BroadcastObserver {
237    fn record_event(&self, event: &crate::observability::ObserverEvent) {
238        // Forward to inner observer
239        self.inner.record_event(event);
240
241        // Broadcast to SSE subscribers
242        let json = match event {
243            crate::observability::ObserverEvent::LlmRequest {
244                provider, model, ..
245            } => serde_json::json!({
246                "type": "llm_request",
247                "provider": provider,
248                "model": model,
249                "timestamp": chrono::Utc::now().to_rfc3339(),
250            }),
251            crate::observability::ObserverEvent::ToolCall {
252                tool,
253                duration,
254                success,
255            } => serde_json::json!({
256                "type": "tool_call",
257                "tool": tool,
258                "duration_ms": duration.as_millis(),
259                "success": success,
260                "timestamp": chrono::Utc::now().to_rfc3339(),
261            }),
262            crate::observability::ObserverEvent::ToolCallStart { tool, .. } => serde_json::json!({
263                "type": "tool_call_start",
264                "tool": tool,
265                "timestamp": chrono::Utc::now().to_rfc3339(),
266            }),
267            crate::observability::ObserverEvent::Error { component, message } => {
268                serde_json::json!({
269                    "type": "error",
270                    "component": component,
271                    "message": message,
272                    "timestamp": chrono::Utc::now().to_rfc3339(),
273                })
274            }
275            crate::observability::ObserverEvent::AgentStart { provider, model } => {
276                serde_json::json!({
277                    "type": "agent_start",
278                    "provider": provider,
279                    "model": model,
280                    "timestamp": chrono::Utc::now().to_rfc3339(),
281                })
282            }
283            crate::observability::ObserverEvent::AgentEnd {
284                provider,
285                model,
286                duration,
287                tokens_used,
288                cost_usd,
289            } => serde_json::json!({
290                "type": "agent_end",
291                "provider": provider,
292                "model": model,
293                "duration_ms": duration.as_millis(),
294                "tokens_used": tokens_used,
295                "cost_usd": cost_usd,
296                "timestamp": chrono::Utc::now().to_rfc3339(),
297            }),
298            _ => return, // Skip events we don't broadcast
299        };
300
301        let _ = self.tx.send(json);
302    }
303
304    fn record_metric(&self, metric: &crate::observability::traits::ObserverMetric) {
305        self.inner.record_metric(metric);
306    }
307
308    fn flush(&self) {
309        self.inner.flush();
310    }
311
312    fn name(&self) -> &str {
313        "broadcast"
314    }
315
316    fn as_any(&self) -> &dyn std::any::Any {
317        self
318    }
319}