1use super::AppState;
6use axum::{
7 extract::State,
8 http::{HeaderMap, StatusCode, header},
9 response::{
10 IntoResponse,
11 sse::{Event, KeepAlive, Sse},
12 },
13};
14use std::convert::Infallible;
15use std::time::Duration;
16use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
17use tokio_stream::StreamExt;
18use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
19
20pub async fn handle_sse_events(
22 State(state): State<AppState>,
23 headers: HeaderMap,
24) -> impl IntoResponse {
25 if state.pairing.require_pairing() {
27 let token = headers
28 .get(header::AUTHORIZATION)
29 .and_then(|v| v.to_str().ok())
30 .and_then(|auth| auth.strip_prefix("Bearer "))
31 .unwrap_or("");
32
33 if !state.pairing.is_authenticated(token) {
34 return (
35 StatusCode::UNAUTHORIZED,
36 "Unauthorized — provide Authorization: Bearer <token>",
37 )
38 .into_response();
39 }
40 }
41
42 let rx = state.event_tx.subscribe();
43 let stream = BroadcastStream::new(rx).filter_map(
44 |result: Result<
45 serde_json::Value,
46 tokio_stream::wrappers::errors::BroadcastStreamRecvError,
47 >| {
48 match result {
49 Ok(value) => Some(Ok::<_, Infallible>(
50 Event::default().data(value.to_string()),
51 )),
52 Err(_) => None, }
54 },
55 );
56
57 Sse::new(stream)
58 .keep_alive(KeepAlive::default())
59 .into_response()
60}
61
62pub async fn handle_api_daemon_logs(
67 State(state): State<AppState>,
68 headers: HeaderMap,
69) -> impl IntoResponse {
70 if state.pairing.require_pairing() {
71 let token = headers
72 .get(header::AUTHORIZATION)
73 .and_then(|v| v.to_str().ok())
74 .and_then(|auth| auth.strip_prefix("Bearer "))
75 .unwrap_or("");
76
77 if !state.pairing.is_authenticated(token) {
78 return (
79 StatusCode::UNAUTHORIZED,
80 "Unauthorized — provide Authorization: Bearer <token>",
81 )
82 .into_response();
83 }
84 }
85
86 let log_path = {
87 let cfg = state.config.lock();
88 cfg.config_path
89 .parent()
90 .map(|dir| dir.join("logs").join("daemon.stderr.log"))
91 };
92
93 let Some(log_path) = log_path else {
94 return (
95 StatusCode::INTERNAL_SERVER_ERROR,
96 "Unable to resolve daemon log path",
97 )
98 .into_response();
99 };
100
101 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(256);
102
103 tokio::spawn(async move {
104 const TAIL_BYTES: u64 = 64 * 1024; let mut start_pos: u64 = match tokio::fs::File::open(&log_path).await {
108 Ok(mut file) => {
109 let size = file.metadata().await.map(|m| m.len()).unwrap_or(0);
110 let seek_to = size.saturating_sub(TAIL_BYTES);
111 if file.seek(SeekFrom::Start(seek_to)).await.is_ok() {
112 let mut reader = BufReader::new(file);
113 if seek_to > 0 {
115 let mut discard = String::new();
116 let _ = reader.read_line(&mut discard).await;
117 }
118 let mut line = String::new();
119 loop {
120 line.clear();
121 match reader.read_line(&mut line).await {
122 Ok(0) => break,
123 Ok(_) => {
124 let trimmed = line.trim_end_matches(['\n', '\r']).to_string();
125 if trimmed.is_empty() {
126 continue;
127 }
128 let payload = serde_json::json!({
129 "type": "log",
130 "line": trimmed,
131 "timestamp": chrono::Utc::now().to_rfc3339(),
132 });
133 let event = Event::default().data(payload.to_string());
134 if tx.send(Ok(event)).await.is_err() {
135 return;
136 }
137 }
138 Err(_) => break,
139 }
140 }
141 }
142 size
143 }
144 Err(_) => {
145 let payload = serde_json::json!({
146 "type": "log_unavailable",
147 "line": format!("daemon log not readable at {}", log_path.display()),
148 "timestamp": chrono::Utc::now().to_rfc3339(),
149 });
150 let _ = tx
151 .send(Ok(Event::default().data(payload.to_string())))
152 .await;
153 0
154 }
155 };
156
157 let mut interval = tokio::time::interval(Duration::from_millis(500));
159 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
160 loop {
161 interval.tick().await;
162
163 let meta = match tokio::fs::metadata(&log_path).await {
164 Ok(m) => m,
165 Err(_) => continue, };
167 let size = meta.len();
168
169 if size < start_pos {
170 start_pos = 0;
172 }
173 if size == start_pos {
174 continue;
175 }
176
177 let mut file = match tokio::fs::File::open(&log_path).await {
178 Ok(f) => f,
179 Err(_) => continue,
180 };
181 if file.seek(SeekFrom::Start(start_pos)).await.is_err() {
182 continue;
183 }
184 let mut reader = BufReader::new(file);
185 let mut line = String::new();
186 loop {
187 line.clear();
188 match reader.read_line(&mut line).await {
189 Ok(0) => break,
190 Ok(n) => {
191 start_pos = start_pos.saturating_add(n as u64);
192 let trimmed = line.trim_end_matches(['\n', '\r']).to_string();
193 if trimmed.is_empty() {
194 continue;
195 }
196 let payload = serde_json::json!({
197 "type": "log",
198 "line": trimmed,
199 "timestamp": chrono::Utc::now().to_rfc3339(),
200 });
201 let event = Event::default().data(payload.to_string());
202 if tx.send(Ok(event)).await.is_err() {
203 return;
204 }
205 }
206 Err(_) => break,
207 }
208 }
209 }
210 });
211
212 Sse::new(ReceiverStream::new(rx))
213 .keep_alive(KeepAlive::default())
214 .into_response()
215}
216
217pub struct BroadcastObserver {
219 inner: Box<dyn crate::observability::Observer>,
220 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
221}
222
223impl BroadcastObserver {
224 pub fn new(
225 inner: Box<dyn crate::observability::Observer>,
226 tx: tokio::sync::broadcast::Sender<serde_json::Value>,
227 ) -> Self {
228 Self { inner, tx }
229 }
230
231 pub fn inner(&self) -> &dyn crate::observability::Observer {
232 self.inner.as_ref()
233 }
234}
235
236impl crate::observability::Observer for BroadcastObserver {
237 fn record_event(&self, event: &crate::observability::ObserverEvent) {
238 self.inner.record_event(event);
240
241 let json = match event {
243 crate::observability::ObserverEvent::LlmRequest {
244 provider, model, ..
245 } => serde_json::json!({
246 "type": "llm_request",
247 "provider": provider,
248 "model": model,
249 "timestamp": chrono::Utc::now().to_rfc3339(),
250 }),
251 crate::observability::ObserverEvent::ToolCall {
252 tool,
253 duration,
254 success,
255 } => serde_json::json!({
256 "type": "tool_call",
257 "tool": tool,
258 "duration_ms": duration.as_millis(),
259 "success": success,
260 "timestamp": chrono::Utc::now().to_rfc3339(),
261 }),
262 crate::observability::ObserverEvent::ToolCallStart { tool, .. } => serde_json::json!({
263 "type": "tool_call_start",
264 "tool": tool,
265 "timestamp": chrono::Utc::now().to_rfc3339(),
266 }),
267 crate::observability::ObserverEvent::Error { component, message } => {
268 serde_json::json!({
269 "type": "error",
270 "component": component,
271 "message": message,
272 "timestamp": chrono::Utc::now().to_rfc3339(),
273 })
274 }
275 crate::observability::ObserverEvent::AgentStart { provider, model } => {
276 serde_json::json!({
277 "type": "agent_start",
278 "provider": provider,
279 "model": model,
280 "timestamp": chrono::Utc::now().to_rfc3339(),
281 })
282 }
283 crate::observability::ObserverEvent::AgentEnd {
284 provider,
285 model,
286 duration,
287 tokens_used,
288 cost_usd,
289 } => serde_json::json!({
290 "type": "agent_end",
291 "provider": provider,
292 "model": model,
293 "duration_ms": duration.as_millis(),
294 "tokens_used": tokens_used,
295 "cost_usd": cost_usd,
296 "timestamp": chrono::Utc::now().to_rfc3339(),
297 }),
298 _ => return, };
300
301 let _ = self.tx.send(json);
302 }
303
304 fn record_metric(&self, metric: &crate::observability::traits::ObserverMetric) {
305 self.inner.record_metric(metric);
306 }
307
308 fn flush(&self) {
309 self.inner.flush();
310 }
311
312 fn name(&self) -> &str {
313 "broadcast"
314 }
315
316 fn as_any(&self) -> &dyn std::any::Any {
317 self
318 }
319}