greppy/web/
events.rs

1//! Server-Sent Events (SSE) endpoint for real-time updates
2//!
3//! Connects to the daemon via Unix socket (or TCP on Windows) to receive
4//! events and forwards them to web clients via SSE.
5
6use axum::{
7    extract::State,
8    response::sse::{Event, KeepAlive, Sse},
9};
10use futures::stream::Stream;
11use serde::Serialize;
12use std::convert::Infallible;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16use tokio::sync::broadcast;
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_stream::StreamExt;
19
20// =============================================================================
21// TYPES
22// =============================================================================
23
24/// State for the SSE endpoint
25#[derive(Clone)]
26pub struct EventsState {
27    /// Broadcast sender for SSE events
28    pub sender: Arc<broadcast::Sender<SseEvent>>,
29    /// Project path being monitored
30    pub project_path: PathBuf,
31    /// Timestamp when index was last updated
32    pub indexed_at: Arc<std::sync::atomic::AtomicU64>,
33    /// Whether daemon is connected
34    pub daemon_connected: Arc<std::sync::atomic::AtomicBool>,
35}
36
37impl EventsState {
38    pub fn new(project_path: PathBuf) -> Self {
39        let (sender, _) = broadcast::channel(256);
40        let now = SystemTime::now()
41            .duration_since(UNIX_EPOCH)
42            .unwrap_or_default()
43            .as_secs();
44
45        Self {
46            sender: Arc::new(sender),
47            project_path,
48            indexed_at: Arc::new(std::sync::atomic::AtomicU64::new(now)),
49            daemon_connected: Arc::new(std::sync::atomic::AtomicBool::new(false)),
50        }
51    }
52
53    /// Broadcast an event to all SSE clients
54    pub fn broadcast(&self, event: SseEvent) -> usize {
55        self.sender.send(event).unwrap_or(0)
56    }
57
58    /// Update the indexed_at timestamp to now
59    pub fn update_indexed_at(&self) {
60        let now = SystemTime::now()
61            .duration_since(UNIX_EPOCH)
62            .unwrap_or_default()
63            .as_secs();
64        self.indexed_at
65            .store(now, std::sync::atomic::Ordering::SeqCst);
66    }
67
68    /// Get the indexed_at timestamp
69    pub fn get_indexed_at(&self) -> u64 {
70        self.indexed_at.load(std::sync::atomic::Ordering::SeqCst)
71    }
72
73    /// Set daemon connection status
74    pub fn set_daemon_connected(&self, connected: bool) {
75        self.daemon_connected
76            .store(connected, std::sync::atomic::Ordering::SeqCst);
77    }
78
79    /// Get daemon connection status
80    pub fn is_daemon_connected(&self) -> bool {
81        self.daemon_connected
82            .load(std::sync::atomic::Ordering::SeqCst)
83    }
84}
85
86/// Events sent via SSE to the browser
87#[derive(Debug, Clone, Serialize)]
88#[serde(tag = "event", content = "data")]
89pub enum SseEvent {
90    /// Initial connection event
91    Connected {
92        daemon: bool,
93        indexed_at: u64,
94        project: String,
95    },
96    /// Reindexing has started
97    ReindexStart { files: usize, reason: String },
98    /// Reindex progress update
99    ReindexProgress { processed: usize, total: usize },
100    /// Reindexing completed
101    ReindexComplete {
102        files: usize,
103        symbols: usize,
104        dead: usize,
105        duration_ms: f64,
106    },
107    /// A single file changed
108    FileChanged { path: String, action: String },
109}
110
111// =============================================================================
112// SSE ENDPOINT
113// =============================================================================
114
115/// SSE endpoint handler - `/api/events`
116pub async fn api_events(
117    State(state): State<EventsState>,
118) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
119    let rx = state.sender.subscribe();
120    let project_path = state.project_path.clone();
121    let indexed_at = state.get_indexed_at();
122    let daemon_connected = state.is_daemon_connected();
123
124    // Convert broadcast receiver to a stream, filtering out errors
125    let event_stream = BroadcastStream::new(rx).filter_map(|result| {
126        match result {
127            Ok(event) => {
128                let event_name = match &event {
129                    SseEvent::Connected { .. } => "connected",
130                    SseEvent::ReindexStart { .. } => "reindex-start",
131                    SseEvent::ReindexProgress { .. } => "reindex-progress",
132                    SseEvent::ReindexComplete { .. } => "reindex-complete",
133                    SseEvent::FileChanged { .. } => "file-changed",
134                };
135
136                let data = serde_json::to_string(&event).unwrap_or_default();
137                Some(Ok::<_, Infallible>(
138                    Event::default().event(event_name).data(data),
139                ))
140            }
141            Err(_) => None, // Skip lagged/closed errors
142        }
143    });
144
145    // Create initial connection event
146    let project_name = project_path
147        .file_name()
148        .map(|s| s.to_string_lossy().to_string())
149        .unwrap_or_else(|| "unknown".to_string());
150
151    let initial_event = SseEvent::Connected {
152        daemon: daemon_connected,
153        indexed_at,
154        project: project_name,
155    };
156
157    let initial_data = serde_json::to_string(&initial_event).unwrap_or_default();
158    let initial =
159        futures::stream::once(
160            async move { Ok(Event::default().event("connected").data(initial_data)) },
161        );
162
163    // Combine initial event with the event stream
164    let combined_stream = initial.chain(event_stream);
165
166    Sse::new(combined_stream).keep_alive(
167        KeepAlive::new()
168            .interval(Duration::from_secs(15))
169            .text("ping"),
170    )
171}
172
173// =============================================================================
174// DAEMON CONNECTION
175// =============================================================================
176
177/// Connect to the daemon and forward events to SSE clients
178pub async fn start_daemon_event_forwarder(state: EventsState) {
179    loop {
180        // Try to connect to daemon
181        let result = connect_to_daemon().await;
182
183        match result {
184            Ok(stream) => {
185                state.set_daemon_connected(true);
186                state.broadcast(SseEvent::Connected {
187                    daemon: true,
188                    indexed_at: state.get_indexed_at(),
189                    project: state
190                        .project_path
191                        .file_name()
192                        .map(|s| s.to_string_lossy().to_string())
193                        .unwrap_or_default(),
194                });
195
196                tracing::info!("Connected to daemon for event streaming");
197
198                // Handle the connection
199                if let Err(e) = handle_daemon_connection(stream, state.clone()).await {
200                    tracing::warn!("Daemon connection error: {}", e);
201                }
202            }
203            Err(e) => {
204                tracing::debug!("Could not connect to daemon: {}", e);
205            }
206        }
207
208        // Mark as disconnected
209        state.set_daemon_connected(false);
210
211        // Wait before retrying
212        tokio::time::sleep(Duration::from_secs(5)).await;
213    }
214}
215
216/// Connect to the daemon socket
217#[cfg(unix)]
218async fn connect_to_daemon() -> std::io::Result<tokio::net::UnixStream> {
219    use crate::core::config::Config;
220
221    let socket_path = Config::socket_path()
222        .map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e.to_string()))?;
223
224    if !socket_path.exists() {
225        return Err(std::io::Error::new(
226            std::io::ErrorKind::NotFound,
227            "Daemon socket not found",
228        ));
229    }
230
231    tokio::net::UnixStream::connect(&socket_path).await
232}
233
234/// Connect to the daemon socket (Windows uses TCP)
235#[cfg(windows)]
236async fn connect_to_daemon() -> std::io::Result<tokio::net::TcpStream> {
237    use crate::core::config::Config;
238
239    let port_path = Config::port_path()
240        .map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, e.to_string()))?;
241
242    if !port_path.exists() {
243        return Err(std::io::Error::new(
244            std::io::ErrorKind::NotFound,
245            "Daemon port file not found",
246        ));
247    }
248
249    let port_str = std::fs::read_to_string(&port_path)?;
250    let port: u16 = port_str
251        .trim()
252        .parse()
253        .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid port number"))?;
254
255    let addr = format!("127.0.0.1:{}", port);
256    tokio::net::TcpStream::connect(&addr).await
257}
258
259/// Handle the daemon connection, forwarding events to SSE
260#[cfg(unix)]
261async fn handle_daemon_connection(
262    stream: tokio::net::UnixStream,
263    state: EventsState,
264) -> std::io::Result<()> {
265    handle_daemon_stream(stream, state).await
266}
267
268#[cfg(windows)]
269async fn handle_daemon_connection(
270    stream: tokio::net::TcpStream,
271    state: EventsState,
272) -> std::io::Result<()> {
273    handle_daemon_stream(stream, state).await
274}
275
276/// Generic stream handler for daemon events
277async fn handle_daemon_stream<S>(stream: S, state: EventsState) -> std::io::Result<()>
278where
279    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
280{
281    use crate::daemon::events::DaemonEvent;
282    use crate::daemon::protocol::{Method, Request, Response, ResponseResult};
283    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
284
285    let (reader, mut writer) = tokio::io::split(stream);
286    let mut reader = BufReader::new(reader);
287
288    // Send Subscribe request
289    let subscribe_request = Request {
290        id: "web-events".to_string(),
291        method: Method::Subscribe,
292    };
293    let json = serde_json::to_string(&subscribe_request)
294        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
295        + "\n";
296    writer.write_all(json.as_bytes()).await?;
297
298    // Read events
299    let mut line = String::new();
300    while reader.read_line(&mut line).await? > 0 {
301        if let Ok(response) = serde_json::from_str::<Response>(&line) {
302            match response.result {
303                ResponseResult::Subscribed => {
304                    tracing::debug!("Subscribed to daemon events");
305                }
306                ResponseResult::Event(daemon_event) => {
307                    // Convert daemon event to SSE event
308                    let sse_event = match daemon_event {
309                        DaemonEvent::FileChanged { path, action, .. } => {
310                            Some(SseEvent::FileChanged {
311                                path,
312                                action: match action {
313                                    crate::daemon::events::FileAction::Created => "created",
314                                    crate::daemon::events::FileAction::Modified => "modified",
315                                    crate::daemon::events::FileAction::Deleted => "deleted",
316                                }
317                                .to_string(),
318                            })
319                        }
320                        DaemonEvent::ReindexStart { files, reason, .. } => {
321                            Some(SseEvent::ReindexStart { files, reason })
322                        }
323                        DaemonEvent::ReindexProgress {
324                            processed, total, ..
325                        } => Some(SseEvent::ReindexProgress { processed, total }),
326                        DaemonEvent::ReindexComplete {
327                            files,
328                            symbols,
329                            dead,
330                            duration_ms,
331                            ..
332                        } => {
333                            // Update indexed_at timestamp
334                            state.update_indexed_at();
335                            Some(SseEvent::ReindexComplete {
336                                files,
337                                symbols,
338                                dead,
339                                duration_ms,
340                            })
341                        }
342                        DaemonEvent::StatusUpdate { .. } => None,
343                    };
344
345                    if let Some(event) = sse_event {
346                        state.broadcast(event);
347                    }
348                }
349                ResponseResult::Error { message } => {
350                    tracing::warn!("Daemon error: {}", message);
351                }
352                _ => {}
353            }
354        }
355        line.clear();
356    }
357
358    Ok(())
359}