// arbor_server/sync_server.rs

//! Real-time sync server for the Arbor Visualizer.
//!
//! This module implements a WebSocket server that acts as the "Source of Truth"
//! for the visualizer. It broadcasts graph updates whenever the filesystem changes,
//! keeping the visualization in sync with the codebase.
//!
//! "Give Arbor a voice so the visualizer can hear the code breathe."

use crate::SharedGraph;
use arbor_core::ArborParser;
use arbor_graph::{ArborGraph, Edge, EdgeKind};
use futures_util::{SinkExt, StreamExt};
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn};

// ─────────────────────────────────────────────────────────────────────────────
// Types
// ─────────────────────────────────────────────────────────────────────────────

/// Configuration for the real-time server.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Address to bind the WebSocket server.
    pub addr: SocketAddr,
    /// Root path to watch for file changes.
    pub watch_path: PathBuf,
    /// Debounce duration for file events, in milliseconds.
    pub debounce_ms: u64,
    /// File extensions to watch, without the leading dot (compared case-sensitively).
    pub extensions: Vec<String>,
}

impl Default for SyncServerConfig {
    /// Binds to `127.0.0.1:8080`, watches the current directory, debounces for
    /// 150 ms and tracks the built-in list of source-file extensions.
    fn default() -> Self {
        // Kept as a table so adding a language is a one-line change.
        const DEFAULT_EXTENSIONS: &[&str] = &[
            "ts", "tsx", "mts", "cts", "js", "jsx", "mjs", "cjs", "rs", "py", "pyi", "go",
            "java", "c", "h", "cpp", "hpp", "cc", "hh", "cxx", "hxx", "cs", "dart", "kt", "kts",
            "swift", "rb", "php", "phtml", "sh", "bash", "zsh",
        ];

        Self {
            addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions: DEFAULT_EXTENSIONS
                .iter()
                .map(|ext| (*ext).to_string())
                .collect(),
        }
    }
}

85/// Server messages broadcast to all connected clients.
86#[derive(Debug, Clone, serde::Serialize)]
87#[serde(tag = "type", content = "payload")]
88pub enum BroadcastMessage {
89    /// Initial handshake with server info
90    Hello(HelloPayload),
91    /// Start of a graph stream
92    GraphBegin(GraphBeginPayload),
93    /// Batch of nodes
94    NodeBatch(NodeBatchPayload),
95    /// Batch of edges
96    EdgeBatch(EdgeBatchPayload),
97    /// End of graph stream
98    GraphEnd,
99    /// Full graph snapshot or delta update (Legacy/Incremental)
100    GraphUpdate(GraphUpdatePayload),
101    /// Tell the visualizer to focus on a specific node.
102    FocusNode(FocusNodePayload),
103    /// Indexer progress status.
104    IndexerStatus(IndexerStatusPayload),
105}
106
107#[derive(Debug, Clone, serde::Serialize)]
108pub struct HelloPayload {
109    pub version: String,
110    pub node_count: usize,
111    pub edge_count: usize,
112}
113
114#[derive(Debug, Clone, serde::Serialize)]
115pub struct GraphBeginPayload {
116    pub total_nodes: usize,
117    pub total_edges: usize,
118}
119
120#[derive(Debug, Clone, serde::Serialize)]
121pub struct NodeBatchPayload {
122    pub nodes: Vec<arbor_core::CodeNode>,
123}
124
125#[derive(Debug, Clone, serde::Serialize)]
126pub struct EdgeBatchPayload {
127    pub edges: Vec<arbor_graph::GraphEdge>,
128}
129
130#[derive(Debug, Clone, serde::Serialize)]
131pub struct GraphUpdatePayload {
132    /// Whether this is a full snapshot or delta.
133    pub is_delta: bool,
134    /// Number of nodes in the graph.
135    pub node_count: usize,
136    /// Number of edges in the graph.
137    pub edge_count: usize,
138    /// Number of files indexed.
139    pub file_count: usize,
140    /// Changed files (for delta updates).
141    pub changed_files: Vec<String>,
142    /// Timestamp of the update.
143    pub timestamp: u64,
144    pub nodes: Option<Vec<arbor_core::CodeNode>>,
145    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
146}
147
148#[derive(Debug, Clone, serde::Serialize)]
149pub struct FocusNodePayload {
150    /// The node ID to focus.
151    pub node_id: String,
152    /// The file path containing the node.
153    pub file: String,
154    /// Line number in the file.
155    pub line: u32,
156}
157
158#[derive(Debug, Clone, serde::Serialize)]
159pub struct IndexerStatusPayload {
160    /// Current indexing phase.
161    pub phase: String,
162    /// Files processed so far.
163    pub files_processed: usize,
164    /// Total files to process.
165    pub files_total: usize,
166    /// Current file being processed.
167    pub current_file: Option<String>,
168}
169
/// Internal event for the file watcher.
///
/// Each variant carries the path the event refers to.
#[derive(Debug, Clone)]
// `Created` is matched by the indexer but never constructed in this file,
// so the lint is silenced rather than dropping the variant.
#[allow(dead_code)]
enum WatcherEvent {
    /// The file exists and its contents may have changed.
    Changed(PathBuf),
    /// The file was newly created.
    Created(PathBuf),
    /// The file no longer exists.
    Deleted(PathBuf),
}

// ─────────────────────────────────────────────────────────────────────────────
// SyncServer
// ─────────────────────────────────────────────────────────────────────────────

183/// High-performance real-time sync server.
184///
185/// This server:
186/// - Hosts a WebSocket server for client connections
187/// - Watches the filesystem for changes
188/// - Debounces file events to prevent thrashing
189/// - Re-parses changed files and updates the graph
190/// - Broadcasts updates to all connected clients
191pub struct SyncServer {
192    config: SyncServerConfig,
193    graph: SharedGraph,
194    broadcast_tx: broadcast::Sender<BroadcastMessage>,
195}
196
197/// A cloneable handle to trigger spotlight events from external components (like MCP).
198#[derive(Clone)]
199pub struct SyncServerHandle {
200    broadcast_tx: broadcast::Sender<BroadcastMessage>,
201    graph: SharedGraph,
202}
203
204impl SyncServerHandle {
205    /// Triggers a spotlight on a specific node.
206    pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
207        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
208            node_id: node_id.to_string(),
209            file: file.to_string(),
210            line,
211        });
212        let _ = self.broadcast_tx.send(msg);
213    }
214
215    /// Returns the shared graph for context lookups.
216    pub fn graph(&self) -> SharedGraph {
217        self.graph.clone()
218    }
219}
220
221impl SyncServer {
222    /// Creates a new sync server.
223    pub fn new(config: SyncServerConfig) -> Self {
224        let (broadcast_tx, _) = broadcast::channel(256);
225
226        Self {
227            config,
228            graph: Arc::new(RwLock::new(ArborGraph::new())),
229            broadcast_tx,
230        }
231    }
232
233    /// Creates a sync server with an existing graph.
234    pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
235        let (broadcast_tx, _) = broadcast::channel(256);
236
237        Self {
238            config,
239            graph: Arc::new(RwLock::new(graph)),
240            broadcast_tx,
241        }
242    }
243
244    /// Creates a sync server with a shared graph.
245    pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
246        let (broadcast_tx, _) = broadcast::channel(256);
247
248        Self {
249            config,
250            graph,
251            broadcast_tx,
252        }
253    }
254
255    /// Returns a handle to the shared graph.
256    pub fn graph(&self) -> SharedGraph {
257        self.graph.clone()
258    }
259
260    /// Returns a broadcast receiver for server messages.
261    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
262        self.broadcast_tx.subscribe()
263    }
264
265    /// Returns a cloneable handle for triggering spotlight events.
266    pub fn handle(&self) -> SyncServerHandle {
267        SyncServerHandle {
268            broadcast_tx: self.broadcast_tx.clone(),
269            graph: self.graph.clone(),
270        }
271    }
272
273    /// Runs the server with file watching enabled.
274    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
275        info!("╔═══════════════════════════════════════════════════════════╗");
276        info!("║          ARBOR SYNC SERVER - THE PULSE OF CODE            ║");
277        info!("╚═══════════════════════════════════════════════════════════╝");
278
279        // Channel for watcher events
280        let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
281
282        // Start the file watcher
283        let watch_path = self.config.watch_path.clone();
284        let extensions = self.config.extensions.clone();
285        let debounce_ms = self.config.debounce_ms;
286
287        tokio::spawn(async move {
288            if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
289            {
290                error!("File watcher error: {}", e);
291            }
292        });
293
294        // Start the indexer background task
295        let graph = self.graph.clone();
296        let broadcast_tx = self.broadcast_tx.clone();
297        let watch_path = self.config.watch_path.clone();
298
299        tokio::spawn(async move {
300            run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
301        });
302
303        // Start accepting WebSocket connections
304        self.run_websocket_server().await
305    }
306
307    /// Runs just the WebSocket server (no file watching).
308    async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
309        let listener = TcpListener::bind(&self.config.addr).await?;
310        info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
311        info!("👁️  Watching: {}", self.config.watch_path.display());
312        info!("⏱️  Debounce: {}ms", self.config.debounce_ms);
313
314        loop {
315            match listener.accept().await {
316                Ok((stream, addr)) => {
317                    info!("🔌 New connection from {}", addr);
318                    let graph = self.graph.clone();
319                    let broadcast_rx = self.broadcast_tx.subscribe();
320
321                    tokio::spawn(async move {
322                        if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
323                            warn!("Connection error from {}: {}", addr, e);
324                        }
325                    });
326                }
327                Err(e) => {
328                    error!("Accept error: {}", e);
329                }
330            }
331        }
332    }
333
334    /// Broadcasts a focus command to all clients.
335    pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
336        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
337            node_id: node_id.to_string(),
338            file: file.to_string(),
339            line,
340        });
341
342        let _ = self.broadcast_tx.send(msg);
343    }
344
345    /// Broadcasts an indexer status update.
346    pub fn update_status(
347        &self,
348        phase: &str,
349        processed: usize,
350        total: usize,
351        current: Option<&str>,
352    ) {
353        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
354            phase: phase.to_string(),
355            files_processed: processed,
356            files_total: total,
357            current_file: current.map(|s| s.to_string()),
358        });
359
360        let _ = self.broadcast_tx.send(msg);
361    }
362}
363
// ─────────────────────────────────────────────────────────────────────────────
// Client Connection Handler
// ─────────────────────────────────────────────────────────────────────────────

368/// Handles a single WebSocket client connection.
369async fn handle_client(
370    stream: TcpStream,
371    addr: SocketAddr,
372    graph: SharedGraph,
373    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
374) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
375    use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
376
377    let config = WebSocketConfig {
378        max_message_size: Some(64 * 1024 * 1024), // 64 MB
379        max_frame_size: Some(64 * 1024 * 1024),   // 64 MB
380        accept_unmasked_frames: false,
381        ..Default::default()
382    };
383
384    let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
385    let (mut write, mut read) = ws_stream.split();
386
387    info!("✅ WebSocket handshake complete with {}", addr);
388
389    // 1. Send Hello (Metadata)
390    let (node_count, edge_count, nodes, edges) = {
391        let g = graph.read().await;
392        let mut nodes: Vec<_> = g.nodes().cloned().collect();
393        let edges_raw = g.export_edges();
394        // Sort for deterministic output (run twice = identical)
395        nodes.sort_by(|a, b| a.id.cmp(&b.id));
396        let mut edges = edges_raw;
397        edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
398        (g.node_count(), g.edge_count(), nodes, edges)
399    };
400
401    let hello = BroadcastMessage::Hello(HelloPayload {
402        version: env!("CARGO_PKG_VERSION").to_string(),
403        node_count,
404        edge_count,
405    });
406
407    let json = serde_json::to_string(&hello)?;
408    write.send(Message::Text(json)).await?;
409    info!(
410        "👋 Sent Hello ({} nodes, {} edges) to {}",
411        node_count, edge_count, addr
412    );
413
414    // 2. Wait for Client Ready
415    info!("⏳ Waiting for client {} to be ready...", addr);
416    let mut ready = false;
417    while let Some(msg) = read.next().await {
418        match msg {
419            Ok(Message::Text(text)) => {
420                // Simple parsing for "ready_for_graph"
421                if text.contains("ready_for_graph") {
422                    ready = true;
423                    info!("✅ Client {} is ready for graph", addr);
424                    break;
425                }
426                debug!("Running pre-ready protocol with {}: {}", addr, text);
427            }
428            Ok(Message::Ping(data)) => {
429                write.send(Message::Pong(data)).await?;
430            }
431            Ok(Message::Close(_)) => return Ok(()),
432            Err(e) => return Err(e.into()),
433            _ => {}
434        }
435    }
436
437    if !ready {
438        warn!("Client {} disconnected before sending ready signal", addr);
439        return Ok(());
440    }
441
442    // 3. Stream Graph (Chunked)
443    let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
444        total_nodes: node_count,
445        total_edges: edge_count,
446    });
447    write
448        .send(Message::Text(serde_json::to_string(&begin)?))
449        .await?;
450
451    // Stream Nodes
452    for chunk in nodes.chunks(50) {
453        let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
454            nodes: chunk.to_vec(),
455        });
456        write
457            .send(Message::Text(serde_json::to_string(&batch)?))
458            .await?;
459    }
460    info!("📤 Streamed {} nodes to {}", node_count, addr);
461
462    // Stream Edges
463    for chunk in edges.chunks(100) {
464        let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
465            edges: chunk.to_vec(),
466        });
467        write
468            .send(Message::Text(serde_json::to_string(&batch)?))
469            .await?;
470    }
471    info!("📤 Streamed {} edges to {}", edge_count, addr);
472
473    // End Stream
474    write
475        .send(Message::Text(serde_json::to_string(
476            &BroadcastMessage::GraphEnd,
477        )?))
478        .await?;
479    info!("🏁 Graph stream complete for {}", addr);
480
481    // Two-way message handling
482    loop {
483        tokio::select! {
484            // Handle incoming messages from client
485            msg = read.next() => {
486                match msg {
487                    Some(Ok(Message::Text(text))) => {
488                        debug!("📥 Received from {}: {}", addr, text);
489                        // Process client requests here (JSON-RPC)
490                        // For now, just echo
491                    }
492                    Some(Ok(Message::Ping(data))) => {
493                        write.send(Message::Pong(data)).await?;
494                    }
495                    Some(Ok(Message::Close(_))) => {
496                        info!("👋 Client {} disconnected gracefully", addr);
497                        break;
498                    }
499                    Some(Err(e)) => {
500                        warn!("⚠️  Error from {}: {}", addr, e);
501                        break;
502                    }
503                    None => break,
504                    _ => {}
505                }
506            }
507
508            // Forward broadcast messages to client
509            msg = broadcast_rx.recv() => {
510                match msg {
511                    Ok(broadcast) => {
512                        let json = serde_json::to_string(&broadcast)?;
513                        if write.send(Message::Text(json)).await.is_err() {
514                            break;
515                        }
516                    }
517                    Err(broadcast::error::RecvError::Lagged(n)) => {
518                        warn!("Client {} lagged by {} messages", addr, n);
519                    }
520                    Err(broadcast::error::RecvError::Closed) => {
521                        break;
522                    }
523                }
524            }
525        }
526    }
527
528    info!("🔌 Connection closed: {}", addr);
529    Ok(())
530}
531
// ─────────────────────────────────────────────────────────────────────────────
// File Watcher with Debouncing
// ─────────────────────────────────────────────────────────────────────────────

536/// Runs the file watcher with debouncing.
537async fn run_file_watcher(
538    watch_path: PathBuf,
539    extensions: Vec<String>,
540    debounce_ms: u64,
541    tx: mpsc::Sender<WatcherEvent>,
542) -> notify::Result<()> {
543    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
544
545    // Create watcher in sync context
546    let mut watcher = RecommendedWatcher::new(
547        move |res| {
548            let _ = notify_tx.blocking_send(res);
549        },
550        Config::default(),
551    )?;
552
553    watcher.watch(&watch_path, RecursiveMode::Recursive)?;
554    info!("👁️  File watcher started for {}", watch_path.display());
555
556    // Debounce state
557    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
558    let debounce_dur = Duration::from_millis(debounce_ms);
559
560    loop {
561        // Process pending debounced events
562        let now = Instant::now();
563        let mut ready: Vec<PathBuf> = Vec::new();
564
565        for (path, time) in pending.iter() {
566            if now.duration_since(*time) >= debounce_dur {
567                ready.push(path.clone());
568            }
569        }
570
571        for path in ready {
572            pending.remove(&path);
573            if should_process_file(&path, &extensions) {
574                let event = if path.exists() {
575                    WatcherEvent::Changed(path)
576                } else {
577                    WatcherEvent::Deleted(path)
578                };
579                let _ = tx.send(event).await;
580            }
581        }
582
583        // Wait for new events with timeout
584        match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
585            Ok(Some(Ok(event))) => {
586                for path in event.paths {
587                    if should_process_file(&path, &extensions) {
588                        pending.insert(path, Instant::now());
589                    }
590                }
591            }
592            Ok(Some(Err(e))) => {
593                warn!("Watch error: {}", e);
594            }
595            Ok(None) => break, // Channel closed
596            Err(_) => {}       // Timeout, continue
597        }
598    }
599
600    Ok(())
601}
602
/// Checks if a file should be processed based on extension.
///
/// Returns `true` only when `path` has an extension that is valid UTF-8 and
/// exactly equal (case-sensitively) to one of `extensions`. Entries in
/// `extensions` are written without the leading dot.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => extensions.iter().any(|candidate| candidate == ext),
        None => false,
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Background Indexer
// ─────────────────────────────────────────────────────────────────────────────

615/// Runs the background indexer that processes file changes.
616async fn run_background_indexer(
617    mut rx: mpsc::Receiver<WatcherEvent>,
618    graph: SharedGraph,
619    broadcast_tx: broadcast::Sender<BroadcastMessage>,
620    _root_path: PathBuf,
621) {
622    let mut parser = match ArborParser::new() {
623        Ok(parser) => Some(parser),
624        Err(error) => {
625            warn!(
626                "Failed to initialize parser for background indexer; will retry lazily per event: {}",
627                error
628            );
629            None
630        }
631    };
632
633    info!("🔧 Background indexer started");
634
635    while let Some(event) = rx.recv().await {
636        let start = Instant::now();
637
638        match event {
639            WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
640                let file_name = path
641                    .file_name()
642                    .and_then(|n| n.to_str())
643                    .unwrap_or("unknown");
644
645                info!("📝 Re-indexing: {}", file_name);
646
647                if parser.is_none() {
648                    parser = match ArborParser::new() {
649                        Ok(parser) => Some(parser),
650                        Err(error) => {
651                            warn!(
652                                "Skipping '{}' due to parser init failure: {}",
653                                file_name,
654                                error
655                            );
656                            None
657                        }
658                    };
659                }
660
661                let Some(parser) = parser.as_mut() else {
662                    continue;
663                };
664
665                match parser.parse_file(&path) {
666                    Ok(result) => {
667                        let mut g = graph.write().await;
668
669                        // Remove old nodes from this file
670                        g.remove_file(&result.file_path);
671
672                        // Add new nodes
673                        let mut node_ids = HashMap::new();
674                        for symbol in &result.symbols {
675                            let id = g.add_node(symbol.clone());
676                            node_ids.insert(symbol.id.clone(), id);
677                        }
678
679                        // Add edges for relations
680                        for relation in &result.relations {
681                            if let Some(&from_id) = node_ids.get(&relation.from_id) {
682                                // Try to find the target by name
683                                let targets = g.find_by_name(&relation.to_name);
684                                if let Some(target) = targets.first() {
685                                    if let Some(to_id) = g.get_index(&target.id) {
686                                        let edge_kind = match relation.kind {
687                                            arbor_core::RelationType::Calls => EdgeKind::Calls,
688                                            arbor_core::RelationType::Imports => EdgeKind::Imports,
689                                            arbor_core::RelationType::Extends => EdgeKind::Extends,
690                                            arbor_core::RelationType::Implements => {
691                                                EdgeKind::Implements
692                                            }
693                                        };
694                                        g.add_edge(from_id, to_id, Edge::new(edge_kind));
695                                    }
696                                }
697                            }
698                        }
699
700                        let elapsed = start.elapsed();
701                        info!(
702                            "✅ Indexed {} in {:?} ({} symbols, {} relations)",
703                            file_name,
704                            elapsed,
705                            result.symbols.len(),
706                            result.relations.len()
707                        );
708
709                        // Broadcast update
710                        let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
711                            is_delta: true,
712                            node_count: g.node_count(),
713                            edge_count: g.edge_count(),
714                            file_count: g.stats().files,
715                            changed_files: vec![result.file_path],
716                            timestamp: std::time::SystemTime::now()
717                                .duration_since(std::time::UNIX_EPOCH)
718                                .map_or(0, |d| d.as_secs()),
719                            nodes: Some(g.nodes().cloned().collect()),
720                            edges: Some(g.export_edges()),
721                        });
722
723                        let _ = broadcast_tx.send(update);
724                    }
725                    Err(e) => {
726                        warn!("⚠️  Parse error for {}: {}", file_name, e);
727                    }
728                }
729            }
730
731            WatcherEvent::Deleted(path) => {
732                let file_str = path.to_string_lossy().to_string();
733                info!("🗑️  File deleted: {}", path.display());
734
735                let mut g = graph.write().await;
736                g.remove_file(&file_str);
737
738                let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
739                    is_delta: true,
740                    node_count: g.node_count(),
741                    edge_count: g.edge_count(),
742                    file_count: g.stats().files,
743                    changed_files: vec![file_str],
744                    timestamp: std::time::SystemTime::now()
745                        .duration_since(std::time::UNIX_EPOCH)
746                        .map_or(0, |d| d.as_secs()),
747                    nodes: Some(g.nodes().cloned().collect()),
748                    edges: Some(g.export_edges()),
749                });
750
751                let _ = broadcast_tx.send(update);
752            }
753        }
754    }
755}
756
// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths whose extension is in the list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions = vec!["ts".to_string(), "rs".to_string()];

        assert!(should_process_file(Path::new("foo.ts"), &extensions));
        assert!(should_process_file(Path::new("bar.rs"), &extensions));
        assert!(!should_process_file(Path::new("baz.py"), &extensions));
        assert!(!should_process_file(Path::new("README.md"), &extensions));
        // No extension at all.
        assert!(!should_process_file(Path::new("Makefile"), &extensions));
    }

    /// `GraphUpdate` serializes with its variant tag and payload fields.
    #[test]
    fn test_broadcast_message_serialization() {
        let msg = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        });

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"type\":\"GraphUpdate\""));
        assert!(json.contains("\"node_count\":42"));
    }

    /// The default config covers every extension `arbor_core` supports.
    #[test]
    fn test_sync_config_default_has_all_extensions() {
        let config = SyncServerConfig::default();
        let exts = &config.extensions;

        // Verify all supported extensions from arbor_core::languages are present.
        let required: std::collections::HashSet<String> =
            arbor_core::languages::supported_extensions()
                .iter()
                .map(|ext| ext.to_string())
                .collect();

        let actual: std::collections::HashSet<String> = exts.iter().cloned().collect();

        for ext in &required {
            assert!(
                actual.contains(ext),
                "SyncServerConfig is missing extension: {}",
                ext
            );
        }
    }

    /// `FocusNode` carries the node id and file through serialization.
    #[test]
    fn test_focus_node_serialization() {
        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
            node_id: "abc123".to_string(),
            file: "main.rs".to_string(),
            line: 42,
        });

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("FocusNode"));
        assert!(json.contains("abc123"));
        assert!(json.contains("main.rs"));
    }

    /// `IndexerStatus` carries the phase and current file through serialization.
    #[test]
    fn test_indexer_status_serialization() {
        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
            phase: "scanning".to_string(),
            files_processed: 10,
            files_total: 100,
            current_file: Some("test.rs".to_string()),
        });

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("scanning"));
        assert!(json.contains("test.rs"));
    }

    /// `Hello` carries the version and counts through serialization.
    #[test]
    fn test_hello_payload_serialization() {
        let msg = BroadcastMessage::Hello(HelloPayload {
            version: "2.0.0".to_string(),
            node_count: 100,
            edge_count: 200,
        });

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("2.0.0"));
        assert!(json.contains("\"node_count\":100"));
    }

    /// A unit variant serializes as the tag alone, with no `payload` field.
    #[test]
    fn test_graph_end_serialization() {
        let msg = BroadcastMessage::GraphEnd;
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("GraphEnd"));
        assert_eq!(json, r#"{"type":"GraphEnd"}"#);
    }
}
863}