arbor_server/
sync_server.rs

1//! Real-time sync server for the Arbor Visualizer.
2//!
3//! This module implements a WebSocket server that acts as the "Source of Truth"
4//! for the visualizer. It broadcasts graph updates whenever the filesystem changes,
5//! keeping the visualization in sync with the codebase.
6//!
7//! "Give Arbor a voice so the visualizer can hear the code breathe."
8
9use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::{accept_async, tungstenite::Message};
22use tracing::{debug, error, info, warn};
23
24// ─────────────────────────────────────────────────────────────────────────────
25// Types
26// ─────────────────────────────────────────────────────────────────────────────
27
/// Settings that control where the sync server listens and what it watches.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Directory that is watched, recursively, for file changes.
    pub watch_path: PathBuf,
    /// How long (in milliseconds) a path must stay quiet before its change is forwarded.
    pub debounce_ms: u64,
    /// Extensions, written without the leading dot, of the files worth re-indexing.
    pub extensions: Vec<String>,
}

impl Default for SyncServerConfig {
    /// Listens on `127.0.0.1:8080` and watches the current directory with a
    /// 150 ms debounce, tracking TypeScript, JavaScript, Rust and Python sources.
    fn default() -> Self {
        let extensions = ["ts", "tsx", "js", "jsx", "rs", "py"]
            .iter()
            .map(|ext| ext.to_string())
            .collect();

        Self {
            // Built from its parts, so there is no string to parse and nothing to unwrap.
            addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions,
        }
    }
}
58
59/// Server messages broadcast to all connected clients.
60#[derive(Debug, Clone, serde::Serialize)]
61#[serde(tag = "type", content = "payload")]
62pub enum BroadcastMessage {
63    /// Full graph snapshot or delta update.
64    GraphUpdate(GraphUpdatePayload),
65    /// Tell the visualizer to focus on a specific node.
66    FocusNode(FocusNodePayload),
67    /// Indexer progress status.
68    IndexerStatus(IndexerStatusPayload),
69}
70
71#[derive(Debug, Clone, serde::Serialize)]
72pub struct GraphUpdatePayload {
73    /// Whether this is a full snapshot or delta.
74    pub is_delta: bool,
75    /// Number of nodes in the graph.
76    pub node_count: usize,
77    /// Number of edges in the graph.
78    pub edge_count: usize,
79    /// Number of files indexed.
80    pub file_count: usize,
81    /// Changed files (for delta updates).
82    pub changed_files: Vec<String>,
83    /// Timestamp of the update.
84    pub timestamp: u64,
85    pub nodes: Option<Vec<arbor_core::CodeNode>>,
86    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
87}
88
89#[derive(Debug, Clone, serde::Serialize)]
90pub struct FocusNodePayload {
91    /// The node ID to focus.
92    pub node_id: String,
93    /// The file path containing the node.
94    pub file: String,
95    /// Line number in the file.
96    pub line: u32,
97}
98
99#[derive(Debug, Clone, serde::Serialize)]
100pub struct IndexerStatusPayload {
101    /// Current indexing phase.
102    pub phase: String,
103    /// Files processed so far.
104    pub files_processed: usize,
105    /// Total files to process.
106    pub files_total: usize,
107    /// Current file being processed.
108    pub current_file: Option<String>,
109}
110
/// Filesystem change handed from the file watcher to the background indexer.
#[derive(Debug, Clone)]
// `Created` is matched by the indexer but never constructed by the watcher in
// this file, which only emits `Changed` and `Deleted`.
#[allow(dead_code)]
enum WatcherEvent {
    /// The file exists and its contents should be (re-)indexed.
    Changed(PathBuf),
    /// A new file appeared; the indexer treats it exactly like `Changed`.
    Created(PathBuf),
    /// The file no longer exists and its nodes should be dropped.
    Deleted(PathBuf),
}
119
120// ─────────────────────────────────────────────────────────────────────────────
121// SyncServer
122// ─────────────────────────────────────────────────────────────────────────────
123
124/// High-performance real-time sync server.
125///
126/// This server:
127/// - Hosts a WebSocket server for client connections
128/// - Watches the filesystem for changes
129/// - Debounces file events to prevent thrashing
130/// - Re-parses changed files and updates the graph
131/// - Broadcasts updates to all connected clients
132pub struct SyncServer {
133    config: SyncServerConfig,
134    graph: SharedGraph,
135    broadcast_tx: broadcast::Sender<BroadcastMessage>,
136}
137
138/// A cloneable handle to trigger spotlight events from external components (like MCP).
139#[derive(Clone)]
140pub struct SyncServerHandle {
141    broadcast_tx: broadcast::Sender<BroadcastMessage>,
142    graph: SharedGraph,
143}
144
145impl SyncServerHandle {
146    /// Triggers a spotlight on a specific node.
147    pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
148        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
149            node_id: node_id.to_string(),
150            file: file.to_string(),
151            line,
152        });
153        let _ = self.broadcast_tx.send(msg);
154    }
155
156    /// Returns the shared graph for context lookups.
157    pub fn graph(&self) -> SharedGraph {
158        self.graph.clone()
159    }
160}
161
162impl SyncServer {
163    /// Creates a new sync server.
164    pub fn new(config: SyncServerConfig) -> Self {
165        let (broadcast_tx, _) = broadcast::channel(256);
166
167        Self {
168            config,
169            graph: Arc::new(RwLock::new(ArborGraph::new())),
170            broadcast_tx,
171        }
172    }
173
174    /// Creates a sync server with an existing graph.
175    pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
176        let (broadcast_tx, _) = broadcast::channel(256);
177
178        Self {
179            config,
180            graph: Arc::new(RwLock::new(graph)),
181            broadcast_tx,
182        }
183    }
184
185    /// Creates a sync server with a shared graph.
186    pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
187        let (broadcast_tx, _) = broadcast::channel(256);
188
189        Self {
190            config,
191            graph,
192            broadcast_tx,
193        }
194    }
195
196    /// Returns a handle to the shared graph.
197    pub fn graph(&self) -> SharedGraph {
198        self.graph.clone()
199    }
200
201    /// Returns a broadcast receiver for server messages.
202    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
203        self.broadcast_tx.subscribe()
204    }
205
206    /// Returns a cloneable handle for triggering spotlight events.
207    pub fn handle(&self) -> SyncServerHandle {
208        SyncServerHandle {
209            broadcast_tx: self.broadcast_tx.clone(),
210            graph: self.graph.clone(),
211        }
212    }
213
214    /// Runs the server with file watching enabled.
215    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
216        info!("╔═══════════════════════════════════════════════════════════╗");
217        info!("║          ARBOR SYNC SERVER - THE PULSE OF CODE            ║");
218        info!("╚═══════════════════════════════════════════════════════════╝");
219
220        // Channel for watcher events
221        let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
222
223        // Start the file watcher
224        let watch_path = self.config.watch_path.clone();
225        let extensions = self.config.extensions.clone();
226        let debounce_ms = self.config.debounce_ms;
227
228        tokio::spawn(async move {
229            if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
230            {
231                error!("File watcher error: {}", e);
232            }
233        });
234
235        // Start the indexer background task
236        let graph = self.graph.clone();
237        let broadcast_tx = self.broadcast_tx.clone();
238        let watch_path = self.config.watch_path.clone();
239
240        tokio::spawn(async move {
241            run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
242        });
243
244        // Start accepting WebSocket connections
245        self.run_websocket_server().await
246    }
247
248    /// Runs just the WebSocket server (no file watching).
249    async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
250        let listener = TcpListener::bind(&self.config.addr).await?;
251        info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
252        info!("👁️  Watching: {}", self.config.watch_path.display());
253        info!("⏱️  Debounce: {}ms", self.config.debounce_ms);
254
255        loop {
256            match listener.accept().await {
257                Ok((stream, addr)) => {
258                    info!("🔌 New connection from {}", addr);
259                    let graph = self.graph.clone();
260                    let broadcast_rx = self.broadcast_tx.subscribe();
261
262                    tokio::spawn(async move {
263                        if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
264                            warn!("Connection error from {}: {}", addr, e);
265                        }
266                    });
267                }
268                Err(e) => {
269                    error!("Accept error: {}", e);
270                }
271            }
272        }
273    }
274
275    /// Broadcasts a focus command to all clients.
276    pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
277        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
278            node_id: node_id.to_string(),
279            file: file.to_string(),
280            line,
281        });
282
283        let _ = self.broadcast_tx.send(msg);
284    }
285
286    /// Broadcasts an indexer status update.
287    pub fn update_status(
288        &self,
289        phase: &str,
290        processed: usize,
291        total: usize,
292        current: Option<&str>,
293    ) {
294        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
295            phase: phase.to_string(),
296            files_processed: processed,
297            files_total: total,
298            current_file: current.map(|s| s.to_string()),
299        });
300
301        let _ = self.broadcast_tx.send(msg);
302    }
303}
304
305// ─────────────────────────────────────────────────────────────────────────────
306// Client Connection Handler
307// ─────────────────────────────────────────────────────────────────────────────
308
309/// Handles a single WebSocket client connection.
310async fn handle_client(
311    stream: TcpStream,
312    addr: SocketAddr,
313    graph: SharedGraph,
314    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
315) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
316    let ws_stream = accept_async(stream).await?;
317    let (mut write, mut read) = ws_stream.split();
318
319    info!("✅ WebSocket handshake complete with {}", addr);
320
321    // Send initial graph snapshot
322    {
323        let g = graph.read().await;
324        let snapshot = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
325            is_delta: false,
326            node_count: g.node_count(),
327            edge_count: g.edge_count(),
328            file_count: g.stats().files,
329            changed_files: vec![],
330            timestamp: std::time::SystemTime::now()
331                .duration_since(std::time::UNIX_EPOCH)
332                .unwrap()
333                .as_secs(),
334            nodes: Some(g.nodes().cloned().collect()),
335            edges: Some(g.export_edges()),
336        });
337
338        let json = serde_json::to_string(&snapshot)?;
339        write.send(Message::Text(json)).await?;
340        debug!("📤 Sent initial snapshot to {}", addr);
341    }
342
343    // Two-way message handling
344    loop {
345        tokio::select! {
346            // Handle incoming messages from client
347            msg = read.next() => {
348                match msg {
349                    Some(Ok(Message::Text(text))) => {
350                        debug!("📥 Received from {}: {}", addr, text);
351                        // Process client requests here (JSON-RPC)
352                        // For now, just echo
353                    }
354                    Some(Ok(Message::Ping(data))) => {
355                        write.send(Message::Pong(data)).await?;
356                    }
357                    Some(Ok(Message::Close(_))) => {
358                        info!("👋 Client {} disconnected gracefully", addr);
359                        break;
360                    }
361                    Some(Err(e)) => {
362                        warn!("⚠️  Error from {}: {}", addr, e);
363                        break;
364                    }
365                    None => break,
366                    _ => {}
367                }
368            }
369
370            // Forward broadcast messages to client
371            msg = broadcast_rx.recv() => {
372                match msg {
373                    Ok(broadcast) => {
374                        let json = serde_json::to_string(&broadcast)?;
375                        if write.send(Message::Text(json)).await.is_err() {
376                            break;
377                        }
378                    }
379                    Err(broadcast::error::RecvError::Lagged(n)) => {
380                        warn!("Client {} lagged by {} messages", addr, n);
381                    }
382                    Err(broadcast::error::RecvError::Closed) => {
383                        break;
384                    }
385                }
386            }
387        }
388    }
389
390    info!("🔌 Connection closed: {}", addr);
391    Ok(())
392}
393
394// ─────────────────────────────────────────────────────────────────────────────
395// File Watcher with Debouncing
396// ─────────────────────────────────────────────────────────────────────────────
397
398/// Runs the file watcher with debouncing.
399async fn run_file_watcher(
400    watch_path: PathBuf,
401    extensions: Vec<String>,
402    debounce_ms: u64,
403    tx: mpsc::Sender<WatcherEvent>,
404) -> notify::Result<()> {
405    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
406
407    // Create watcher in sync context
408    let mut watcher = RecommendedWatcher::new(
409        move |res| {
410            let _ = notify_tx.blocking_send(res);
411        },
412        Config::default(),
413    )?;
414
415    watcher.watch(&watch_path, RecursiveMode::Recursive)?;
416    info!("👁️  File watcher started for {}", watch_path.display());
417
418    // Debounce state
419    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
420    let debounce_dur = Duration::from_millis(debounce_ms);
421
422    loop {
423        // Process pending debounced events
424        let now = Instant::now();
425        let mut ready: Vec<PathBuf> = Vec::new();
426
427        for (path, time) in pending.iter() {
428            if now.duration_since(*time) >= debounce_dur {
429                ready.push(path.clone());
430            }
431        }
432
433        for path in ready {
434            pending.remove(&path);
435            if should_process_file(&path, &extensions) {
436                let event = if path.exists() {
437                    WatcherEvent::Changed(path)
438                } else {
439                    WatcherEvent::Deleted(path)
440                };
441                let _ = tx.send(event).await;
442            }
443        }
444
445        // Wait for new events with timeout
446        match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
447            Ok(Some(Ok(event))) => {
448                for path in event.paths {
449                    if should_process_file(&path, &extensions) {
450                        pending.insert(path, Instant::now());
451                    }
452                }
453            }
454            Ok(Some(Err(e))) => {
455                warn!("Watch error: {}", e);
456            }
457            Ok(None) => break, // Channel closed
458            Err(_) => {}       // Timeout, continue
459        }
460    }
461
462    Ok(())
463}
464
/// Reports whether `path` has one of the allowed `extensions`.
///
/// The comparison is exact (case-sensitive) and the entries in `extensions`
/// carry no leading dot. Paths without an extension, or whose extension is
/// not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => extensions.iter().any(|allowed| allowed.as_str() == ext),
        None => false,
    }
}
472
473// ─────────────────────────────────────────────────────────────────────────────
474// Background Indexer
475// ─────────────────────────────────────────────────────────────────────────────
476
477/// Runs the background indexer that processes file changes.
478async fn run_background_indexer(
479    mut rx: mpsc::Receiver<WatcherEvent>,
480    graph: SharedGraph,
481    broadcast_tx: broadcast::Sender<BroadcastMessage>,
482    _root_path: PathBuf,
483) {
484    let mut parser = ArborParser::new().expect("Failed to initialize parser");
485
486    info!("🔧 Background indexer started");
487
488    while let Some(event) = rx.recv().await {
489        let start = Instant::now();
490
491        match event {
492            WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
493                let file_name = path
494                    .file_name()
495                    .and_then(|n| n.to_str())
496                    .unwrap_or("unknown");
497
498                info!("📝 Re-indexing: {}", file_name);
499
500                match parser.parse_file(&path) {
501                    Ok(result) => {
502                        let mut g = graph.write().await;
503
504                        // Remove old nodes from this file
505                        g.remove_file(&result.file_path);
506
507                        // Add new nodes
508                        let mut node_ids = HashMap::new();
509                        for symbol in &result.symbols {
510                            let id = g.add_node(symbol.clone());
511                            node_ids.insert(symbol.id.clone(), id);
512                        }
513
514                        // Add edges for relations
515                        for relation in &result.relations {
516                            if let Some(&from_id) = node_ids.get(&relation.from_id) {
517                                // Try to find the target by name
518                                let targets = g.find_by_name(&relation.to_name);
519                                if let Some(target) = targets.first() {
520                                    if let Some(to_id) = g.get_index(&target.id) {
521                                        let edge_kind = match relation.kind {
522                                            arbor_core::RelationType::Calls => EdgeKind::Calls,
523                                            arbor_core::RelationType::Imports => EdgeKind::Imports,
524                                            arbor_core::RelationType::Extends => EdgeKind::Extends,
525                                            arbor_core::RelationType::Implements => {
526                                                EdgeKind::Implements
527                                            }
528                                        };
529                                        g.add_edge(from_id, to_id, Edge::new(edge_kind));
530                                    }
531                                }
532                            }
533                        }
534
535                        let elapsed = start.elapsed();
536                        info!(
537                            "✅ Indexed {} in {:?} ({} symbols, {} relations)",
538                            file_name,
539                            elapsed,
540                            result.symbols.len(),
541                            result.relations.len()
542                        );
543
544                        // Broadcast update
545                        let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
546                            is_delta: true,
547                            node_count: g.node_count(),
548                            edge_count: g.edge_count(),
549                            file_count: g.stats().files,
550                            changed_files: vec![result.file_path],
551                            timestamp: std::time::SystemTime::now()
552                                .duration_since(std::time::UNIX_EPOCH)
553                                .unwrap()
554                                .as_secs(),
555                            nodes: Some(g.nodes().cloned().collect()),
556                            edges: Some(g.export_edges()),
557                        });
558
559                        let _ = broadcast_tx.send(update);
560                    }
561                    Err(e) => {
562                        warn!("⚠️  Parse error for {}: {}", file_name, e);
563                    }
564                }
565            }
566
567            WatcherEvent::Deleted(path) => {
568                let file_str = path.to_string_lossy().to_string();
569                info!("🗑️  File deleted: {}", path.display());
570
571                let mut g = graph.write().await;
572                g.remove_file(&file_str);
573
574                let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
575                    is_delta: true,
576                    node_count: g.node_count(),
577                    edge_count: g.edge_count(),
578                    file_count: g.stats().files,
579                    changed_files: vec![file_str],
580                    timestamp: std::time::SystemTime::now()
581                        .duration_since(std::time::UNIX_EPOCH)
582                        .unwrap()
583                        .as_secs(),
584                    nodes: Some(g.nodes().cloned().collect()),
585                    edges: Some(g.export_edges()),
586                });
587
588                let _ = broadcast_tx.send(update);
589            }
590        }
591    }
592}
593
594// ─────────────────────────────────────────────────────────────────────────────
595// Tests
596// ─────────────────────────────────────────────────────────────────────────────
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths whose extension is on the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions: Vec<String> = ["ts", "rs"].iter().map(|ext| ext.to_string()).collect();
        let cases = [
            ("foo.ts", true),
            ("bar.rs", true),
            ("baz.py", false),
            ("README.md", false),
        ];

        for &(file, expected) in cases.iter() {
            assert_eq!(
                should_process_file(Path::new(file), &extensions),
                expected,
                "unexpected verdict for {}",
                file
            );
        }
    }

    /// A graph update serializes with its variant name and its field values.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();

        for needle in ["GraphUpdate", "42"].iter() {
            assert!(json.contains(needle));
        }
    }
}