1use axum::{
7 extract::State,
8 response::sse::{Event, KeepAlive, Sse},
9};
10use futures::stream::Stream;
11use serde::Serialize;
12use std::convert::Infallible;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16use tokio::sync::broadcast;
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_stream::StreamExt;
19
20#[derive(Clone)]
26pub struct EventsState {
27 pub sender: Arc<broadcast::Sender<SseEvent>>,
29 pub project_path: PathBuf,
31 pub indexed_at: Arc<std::sync::atomic::AtomicU64>,
33 pub daemon_connected: Arc<std::sync::atomic::AtomicBool>,
35}
36
37impl EventsState {
38 pub fn new(project_path: PathBuf) -> Self {
39 let (sender, _) = broadcast::channel(256);
40 let now = SystemTime::now()
41 .duration_since(UNIX_EPOCH)
42 .unwrap_or_default()
43 .as_secs();
44
45 Self {
46 sender: Arc::new(sender),
47 project_path,
48 indexed_at: Arc::new(std::sync::atomic::AtomicU64::new(now)),
49 daemon_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
50 }
51 }
52
53 pub fn broadcast(&self, event: SseEvent) -> usize {
55 self.sender.send(event).unwrap_or(0)
56 }
57
58 pub fn update_indexed_at(&self) {
60 let now = SystemTime::now()
61 .duration_since(UNIX_EPOCH)
62 .unwrap_or_default()
63 .as_secs();
64 self.indexed_at
65 .store(now, std::sync::atomic::Ordering::SeqCst);
66 }
67
68 pub fn get_indexed_at(&self) -> u64 {
70 self.indexed_at.load(std::sync::atomic::Ordering::SeqCst)
71 }
72
73 pub fn set_daemon_connected(&self, connected: bool) {
75 self.daemon_connected
76 .store(connected, std::sync::atomic::Ordering::SeqCst);
77 }
78
79 pub fn is_daemon_connected(&self) -> bool {
81 self.daemon_connected
82 .load(std::sync::atomic::Ordering::SeqCst)
83 }
84}
85
86#[derive(Debug, Clone, Serialize)]
88#[serde(tag = "event", content = "data")]
89pub enum SseEvent {
90 Connected {
92 daemon: bool,
93 indexed_at: u64,
94 project: String,
95 },
96 ReindexStart { files: usize, reason: String },
98 ReindexProgress { processed: usize, total: usize },
100 ReindexComplete {
102 files: usize,
103 symbols: usize,
104 dead: usize,
105 duration_ms: f64,
106 },
107 FileChanged { path: String, action: String },
109}
110
111pub async fn api_events(
117 State(state): State<EventsState>,
118) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
119 let rx = state.sender.subscribe();
120 let project_path = state.project_path.clone();
121 let indexed_at = state.get_indexed_at();
122 let daemon_connected = state.is_daemon_connected();
123
124 let event_stream = BroadcastStream::new(rx).filter_map(|result| {
126 match result {
127 Ok(event) => {
128 let event_name = match &event {
129 SseEvent::Connected { .. } => "connected",
130 SseEvent::ReindexStart { .. } => "reindex-start",
131 SseEvent::ReindexProgress { .. } => "reindex-progress",
132 SseEvent::ReindexComplete { .. } => "reindex-complete",
133 SseEvent::FileChanged { .. } => "file-changed",
134 };
135
136 let data = serde_json::to_string(&event).unwrap_or_default();
137 Some(Ok::<_, Infallible>(
138 Event::default().event(event_name).data(data),
139 ))
140 }
141 Err(_) => None, }
143 });
144
145 let project_name = project_path
147 .file_name()
148 .map(|s| s.to_string_lossy().to_string())
149 .unwrap_or_else(|| "unknown".to_string());
150
151 let initial_event = SseEvent::Connected {
152 daemon: daemon_connected,
153 indexed_at,
154 project: project_name,
155 };
156
157 let initial_data = serde_json::to_string(&initial_event).unwrap_or_default();
158 let initial =
159 futures::stream::once(
160 async move { Ok(Event::default().event("connected").data(initial_data)) },
161 );
162
163 let combined_stream = initial.chain(event_stream);
165
166 Sse::new(combined_stream).keep_alive(
167 KeepAlive::new()
168 .interval(Duration::from_secs(15))
169 .text("ping"),
170 )
171}
172
173pub async fn start_daemon_event_forwarder(state: EventsState) {
179 loop {
180 let result = connect_to_daemon().await;
182
183 match result {
184 Ok(stream) => {
185 state.set_daemon_connected(true);
186 state.broadcast(SseEvent::Connected {
187 daemon: true,
188 indexed_at: state.get_indexed_at(),
189 project: state
190 .project_path
191 .file_name()
192 .map(|s| s.to_string_lossy().to_string())
193 .unwrap_or_default(),
194 });
195
196 tracing::info!("Connected to daemon for event streaming");
197
198 if let Err(e) = handle_daemon_connection(stream, state.clone()).await {
200 tracing::warn!("Daemon connection error: {}", e);
201 }
202 }
203 Err(e) => {
204 tracing::debug!("Could not connect to daemon: {}", e);
205 }
206 }
207
208 state.set_daemon_connected(false);
210
211 tokio::time::sleep(Duration::from_secs(5)).await;
213 }
214}
215
216#[cfg(unix)]
218async fn connect_to_daemon() -> std::io::Result<tokio::net::UnixStream> {
219 use crate::core::config::Config;
220
221 let socket_path = Config::socket_path()
222 .map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e.to_string()))?;
223
224 if !socket_path.exists() {
225 return Err(std::io::Error::new(
226 std::io::ErrorKind::NotFound,
227 "Daemon socket not found",
228 ));
229 }
230
231 tokio::net::UnixStream::connect(&socket_path).await
232}
233
/// Opens a loopback TCP connection to the daemon, using the port number the
/// daemon's port file contains.
///
/// # Errors
///
/// Returns `NotFound` when the port-file path cannot be determined or the file
/// does not exist, `InvalidData` when its contents are not a valid `u16`, and
/// otherwise whatever error reading the file or connecting produces.
#[cfg(windows)]
async fn connect_to_daemon() -> std::io::Result<tokio::net::TcpStream> {
    use crate::core::config::Config;
    use std::io::{Error, ErrorKind};

    let port_path = match Config::port_path() {
        Ok(path) => path,
        Err(e) => return Err(Error::new(ErrorKind::NotFound, e.to_string())),
    };

    if !port_path.exists() {
        return Err(Error::new(
            ErrorKind::NotFound,
            "Daemon port file not found",
        ));
    }

    let contents = std::fs::read_to_string(&port_path)?;
    let port = match contents.trim().parse::<u16>() {
        Ok(port) => port,
        Err(_) => return Err(Error::new(ErrorKind::InvalidData, "Invalid port number")),
    };

    let addr = format!("127.0.0.1:{}", port);
    tokio::net::TcpStream::connect(&addr).await
}
258
259#[cfg(unix)]
261async fn handle_daemon_connection(
262 stream: tokio::net::UnixStream,
263 state: EventsState,
264) -> std::io::Result<()> {
265 handle_daemon_stream(stream, state).await
266}
267
/// Windows variant: runs the daemon event protocol over a TCP stream.
///
/// Exists only to give the platform-specific stream type a common function
/// name; all work is done by `handle_daemon_stream`.
#[cfg(windows)]
async fn handle_daemon_connection(
    stream: tokio::net::TcpStream,
    state: EventsState,
) -> std::io::Result<()> {
    handle_daemon_stream::<tokio::net::TcpStream>(stream, state).await
}
275
276async fn handle_daemon_stream<S>(stream: S, state: EventsState) -> std::io::Result<()>
278where
279 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
280{
281 use crate::daemon::events::DaemonEvent;
282 use crate::daemon::protocol::{Method, Request, Response, ResponseResult};
283 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
284
285 let (reader, mut writer) = tokio::io::split(stream);
286 let mut reader = BufReader::new(reader);
287
288 let subscribe_request = Request {
290 id: "web-events".to_string(),
291 method: Method::Subscribe,
292 };
293 let json = serde_json::to_string(&subscribe_request)
294 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
295 + "\n";
296 writer.write_all(json.as_bytes()).await?;
297
298 let mut line = String::new();
300 while reader.read_line(&mut line).await? > 0 {
301 if let Ok(response) = serde_json::from_str::<Response>(&line) {
302 match response.result {
303 ResponseResult::Subscribed => {
304 tracing::debug!("Subscribed to daemon events");
305 }
306 ResponseResult::Event(daemon_event) => {
307 let sse_event = match daemon_event {
309 DaemonEvent::FileChanged { path, action, .. } => {
310 Some(SseEvent::FileChanged {
311 path,
312 action: match action {
313 crate::daemon::events::FileAction::Created => "created",
314 crate::daemon::events::FileAction::Modified => "modified",
315 crate::daemon::events::FileAction::Deleted => "deleted",
316 }
317 .to_string(),
318 })
319 }
320 DaemonEvent::ReindexStart { files, reason, .. } => {
321 Some(SseEvent::ReindexStart { files, reason })
322 }
323 DaemonEvent::ReindexProgress {
324 processed, total, ..
325 } => Some(SseEvent::ReindexProgress { processed, total }),
326 DaemonEvent::ReindexComplete {
327 files,
328 symbols,
329 dead,
330 duration_ms,
331 ..
332 } => {
333 state.update_indexed_at();
335 Some(SseEvent::ReindexComplete {
336 files,
337 symbols,
338 dead,
339 duration_ms,
340 })
341 }
342 DaemonEvent::StatusUpdate { .. } => None,
343 };
344
345 if let Some(event) = sse_event {
346 state.broadcast(event);
347 }
348 }
349 ResponseResult::Error { message } => {
350 tracing::warn!("Daemon error: {}", message);
351 }
352 _ => {}
353 }
354 }
355 line.clear();
356 }
357
358 Ok(())
359}