Skip to main content

arbor_server/
sync_server.rs

1//! Real-time sync server for the Arbor Visualizer.
2//!
3//! This module implements a WebSocket server that acts as the "Source of Truth"
4//! for the visualizer. It broadcasts graph updates whenever the filesystem changes,
5//! keeping the visualization in sync with the codebase.
6//!
7//! "Give Arbor a voice so the visualizer can hear the code breathe."
8
9use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::tungstenite::Message;
22use tracing::{debug, error, info, warn};
23
24// ─────────────────────────────────────────────────────────────────────────────
25// Types
26// ─────────────────────────────────────────────────────────────────────────────
27
/// Configuration for the real-time server.
///
/// [`Default`] yields a local development setup (`127.0.0.1:8080`, watching the
/// current directory); override individual fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Address to bind the WebSocket server.
    pub addr: SocketAddr,
    /// Root path to watch for file changes.
    pub watch_path: PathBuf,
    /// Debounce duration for file events, in milliseconds.
    pub debounce_ms: u64,
    /// File extensions to watch, written without the leading dot (e.g. `"rs"`).
    ///
    /// Extensions are compared as exact strings, so matching is case-sensitive.
    pub extensions: Vec<String>,
}

impl Default for SyncServerConfig {
    /// Returns a configuration bound to `127.0.0.1:8080` that watches `.` with a
    /// 150 ms debounce and the built-in list of source-file extensions.
    fn default() -> Self {
        // Extensions watched out of the box, grouped roughly by language family.
        const DEFAULT_EXTENSIONS: &[&str] = &[
            "ts", "tsx", "js", "jsx", // TypeScript / JavaScript
            "rs", "py", "go", "java", // Rust, Python, Go, Java
            "c", "h", "cpp", "hpp", "cc", "hh", "cxx", "hxx", // C / C++
            "cs", "dart", "kt", "kts", "swift", // C#, Dart, Kotlin, Swift
            "rb", "php", "phtml", // Ruby, PHP
            "sh", "bash", "zsh", // shell
        ];

        Self {
            // Built from components so no string parsing (and no `unwrap`) is
            // needed at runtime.
            addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions: DEFAULT_EXTENSIONS
                .iter()
                .map(|ext| (*ext).to_string())
                .collect(),
        }
    }
}
79
/// Server messages sent to connected clients.
///
/// Each message is serialized with serde's adjacent tagging: the variant name
/// goes in a `type` field and the variant's data in a `payload` field.
///
/// `Hello`, `GraphBegin`, `NodeBatch`, `EdgeBatch` and `GraphEnd` are written
/// directly to a single connection by `handle_client` during its initial graph
/// stream; `GraphUpdate`, `FocusNode` and `IndexerStatus` are published on the
/// shared broadcast channel and forwarded to every connected client.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastMessage {
    /// Initial handshake with server info
    Hello(HelloPayload),
    /// Start of a graph stream
    GraphBegin(GraphBeginPayload),
    /// Batch of nodes
    NodeBatch(NodeBatchPayload),
    /// Batch of edges
    EdgeBatch(EdgeBatchPayload),
    /// End of graph stream
    GraphEnd,
    /// Full graph snapshot or delta update (Legacy/Incremental)
    GraphUpdate(GraphUpdatePayload),
    /// Tell the visualizer to focus on a specific node.
    FocusNode(FocusNodePayload),
    /// Indexer progress status.
    IndexerStatus(IndexerStatusPayload),
}
101
/// Payload of [`BroadcastMessage::Hello`], the first message a client receives
/// once the WebSocket handshake completes.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HelloPayload {
    /// Server crate version (taken from `CARGO_PKG_VERSION` at compile time).
    pub version: String,
    /// Number of nodes in the graph when the connection snapshot was taken.
    pub node_count: usize,
    /// Number of edges in the graph when the connection snapshot was taken.
    pub edge_count: usize,
}
108
/// Payload of [`BroadcastMessage::GraphBegin`]: announces the size of the graph
/// stream that follows, so the client knows how much data to expect.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphBeginPayload {
    /// Total number of nodes that will be streamed in `NodeBatch` messages.
    pub total_nodes: usize,
    /// Total number of edges that will be streamed in `EdgeBatch` messages.
    pub total_edges: usize,
}
114
/// Payload of [`BroadcastMessage::NodeBatch`]: one chunk of the node stream.
///
/// `handle_client` sends chunks of at most 50 nodes.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeBatchPayload {
    /// Nodes contained in this chunk.
    pub nodes: Vec<arbor_core::CodeNode>,
}
119
/// Payload of [`BroadcastMessage::EdgeBatch`]: one chunk of the edge stream.
///
/// `handle_client` sends chunks of at most 100 edges.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EdgeBatchPayload {
    /// Edges contained in this chunk.
    pub edges: Vec<arbor_graph::GraphEdge>,
}
124
/// Payload of [`BroadcastMessage::GraphUpdate`], published after a watched file
/// is re-indexed or deleted.
///
/// NOTE(review): the background indexer in this file always sets
/// `is_delta = true` yet fills `nodes` / `edges` with the complete graph, not
/// just the changed part — confirm how clients are expected to interpret that.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphUpdatePayload {
    /// Whether this is a full snapshot or delta.
    pub is_delta: bool,
    /// Number of nodes in the graph.
    pub node_count: usize,
    /// Number of edges in the graph.
    pub edge_count: usize,
    /// Number of files indexed.
    pub file_count: usize,
    /// Changed files (for delta updates).
    pub changed_files: Vec<String>,
    /// Timestamp of the update, in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Node data attached to the update, if any (`None` when only counts are sent).
    pub nodes: Option<Vec<arbor_core::CodeNode>>,
    /// Edge data attached to the update, if any (`None` when only counts are sent).
    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
}
142
/// Payload of [`BroadcastMessage::FocusNode`]: identifies the node the
/// visualizer should focus on and where it lives in the source tree.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FocusNodePayload {
    /// The node ID to focus.
    pub node_id: String,
    /// The file path containing the node.
    pub file: String,
    /// Line number in the file.
    pub line: u32,
}
152
/// Payload of [`BroadcastMessage::IndexerStatus`]: a progress report built by
/// `SyncServer::update_status`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexerStatusPayload {
    /// Current indexing phase.
    pub phase: String,
    /// Files processed so far.
    pub files_processed: usize,
    /// Total files to process.
    pub files_total: usize,
    /// Current file being processed.
    pub current_file: Option<String>,
}
164
/// Internal event for the file watcher.
///
/// Sent from `run_file_watcher` to `run_background_indexer` after debouncing.
/// The watcher in this file only ever constructs `Changed` (path exists) and
/// `Deleted` (path no longer exists); `Created` is handled by the indexer but
/// never produced here, hence the `dead_code` allowance.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum WatcherEvent {
    /// The file exists and should be (re-)indexed.
    Changed(PathBuf),
    /// A new file appeared; the indexer treats this exactly like `Changed`.
    Created(PathBuf),
    /// The file no longer exists and its nodes should be removed.
    Deleted(PathBuf),
}
173
174// ─────────────────────────────────────────────────────────────────────────────
175// SyncServer
176// ─────────────────────────────────────────────────────────────────────────────
177
/// High-performance real-time sync server.
///
/// This server:
/// - Hosts a WebSocket server for client connections
/// - Watches the filesystem for changes
/// - Debounces file events to prevent thrashing
/// - Re-parses changed files and updates the graph
/// - Broadcasts updates to all connected clients
pub struct SyncServer {
    /// Bind address, watch root, debounce interval and extension filter.
    config: SyncServerConfig,
    /// Graph shared between the indexer task, client handlers and handles.
    graph: SharedGraph,
    /// Sending half of the broadcast channel; each client connection subscribes
    /// its own receiver from it.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}
191
/// A cloneable handle to trigger spotlight events from external components (like MCP).
///
/// Created by `SyncServer::handle`; it shares the server's broadcast channel
/// and graph rather than owning copies of them.
#[derive(Clone)]
pub struct SyncServerHandle {
    /// Clone of the server's broadcast sender.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    /// Clone of the server's shared graph handle.
    graph: SharedGraph,
}
198
199impl SyncServerHandle {
200    /// Triggers a spotlight on a specific node.
201    pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
202        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
203            node_id: node_id.to_string(),
204            file: file.to_string(),
205            line,
206        });
207        let _ = self.broadcast_tx.send(msg);
208    }
209
210    /// Returns the shared graph for context lookups.
211    pub fn graph(&self) -> SharedGraph {
212        self.graph.clone()
213    }
214}
215
216impl SyncServer {
217    /// Creates a new sync server.
218    pub fn new(config: SyncServerConfig) -> Self {
219        let (broadcast_tx, _) = broadcast::channel(256);
220
221        Self {
222            config,
223            graph: Arc::new(RwLock::new(ArborGraph::new())),
224            broadcast_tx,
225        }
226    }
227
228    /// Creates a sync server with an existing graph.
229    pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
230        let (broadcast_tx, _) = broadcast::channel(256);
231
232        Self {
233            config,
234            graph: Arc::new(RwLock::new(graph)),
235            broadcast_tx,
236        }
237    }
238
239    /// Creates a sync server with a shared graph.
240    pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
241        let (broadcast_tx, _) = broadcast::channel(256);
242
243        Self {
244            config,
245            graph,
246            broadcast_tx,
247        }
248    }
249
250    /// Returns a handle to the shared graph.
251    pub fn graph(&self) -> SharedGraph {
252        self.graph.clone()
253    }
254
255    /// Returns a broadcast receiver for server messages.
256    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
257        self.broadcast_tx.subscribe()
258    }
259
260    /// Returns a cloneable handle for triggering spotlight events.
261    pub fn handle(&self) -> SyncServerHandle {
262        SyncServerHandle {
263            broadcast_tx: self.broadcast_tx.clone(),
264            graph: self.graph.clone(),
265        }
266    }
267
268    /// Runs the server with file watching enabled.
269    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
270        info!("╔═══════════════════════════════════════════════════════════╗");
271        info!("║          ARBOR SYNC SERVER - THE PULSE OF CODE            ║");
272        info!("╚═══════════════════════════════════════════════════════════╝");
273
274        // Channel for watcher events
275        let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
276
277        // Start the file watcher
278        let watch_path = self.config.watch_path.clone();
279        let extensions = self.config.extensions.clone();
280        let debounce_ms = self.config.debounce_ms;
281
282        tokio::spawn(async move {
283            if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
284            {
285                error!("File watcher error: {}", e);
286            }
287        });
288
289        // Start the indexer background task
290        let graph = self.graph.clone();
291        let broadcast_tx = self.broadcast_tx.clone();
292        let watch_path = self.config.watch_path.clone();
293
294        tokio::spawn(async move {
295            run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
296        });
297
298        // Start accepting WebSocket connections
299        self.run_websocket_server().await
300    }
301
302    /// Runs just the WebSocket server (no file watching).
303    async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
304        let listener = TcpListener::bind(&self.config.addr).await?;
305        info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
306        info!("👁️  Watching: {}", self.config.watch_path.display());
307        info!("⏱️  Debounce: {}ms", self.config.debounce_ms);
308
309        loop {
310            match listener.accept().await {
311                Ok((stream, addr)) => {
312                    info!("🔌 New connection from {}", addr);
313                    let graph = self.graph.clone();
314                    let broadcast_rx = self.broadcast_tx.subscribe();
315
316                    tokio::spawn(async move {
317                        if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
318                            warn!("Connection error from {}: {}", addr, e);
319                        }
320                    });
321                }
322                Err(e) => {
323                    error!("Accept error: {}", e);
324                }
325            }
326        }
327    }
328
329    /// Broadcasts a focus command to all clients.
330    pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
331        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
332            node_id: node_id.to_string(),
333            file: file.to_string(),
334            line,
335        });
336
337        let _ = self.broadcast_tx.send(msg);
338    }
339
340    /// Broadcasts an indexer status update.
341    pub fn update_status(
342        &self,
343        phase: &str,
344        processed: usize,
345        total: usize,
346        current: Option<&str>,
347    ) {
348        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
349            phase: phase.to_string(),
350            files_processed: processed,
351            files_total: total,
352            current_file: current.map(|s| s.to_string()),
353        });
354
355        let _ = self.broadcast_tx.send(msg);
356    }
357}
358
359// ─────────────────────────────────────────────────────────────────────────────
360// Client Connection Handler
361// ─────────────────────────────────────────────────────────────────────────────
362
/// Handles a single WebSocket client connection.
///
/// Protocol, in order:
/// 1. Complete the WebSocket handshake (message and frame size capped at 64 MB).
/// 2. Snapshot the graph (nodes sorted by id, edges sorted by
///    `(source, target)`) and send a `Hello` message with the counts.
/// 3. Wait for a text frame containing `ready_for_graph`.
/// 4. Stream the snapshot: `GraphBegin`, `NodeBatch` chunks of up to 50 nodes,
///    `EdgeBatch` chunks of up to 100 edges, then `GraphEnd`.
/// 5. Forward messages from `broadcast_rx` to the client and answer pings
///    until either side closes.
///
/// The data streamed in step 4 is the snapshot taken in step 2, i.e. before
/// the client signalled readiness.
///
/// # Errors
///
/// Returns an error if the handshake fails, a message cannot be serialized,
/// the client stream yields an error before the ready signal, or a send fails
/// during steps 2–4 or while answering a ping. A failed send while forwarding
/// a broadcast simply ends the connection with `Ok(())`.
async fn handle_client(
    stream: TcpStream,
    addr: SocketAddr,
    graph: SharedGraph,
    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;

    let config = WebSocketConfig {
        max_message_size: Some(64 * 1024 * 1024), // 64 MB
        max_frame_size: Some(64 * 1024 * 1024),   // 64 MB
        accept_unmasked_frames: false,
        ..Default::default()
    };

    let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
    let (mut write, mut read) = ws_stream.split();

    info!("✅ WebSocket handshake complete with {}", addr);

    // 1. Send Hello (Metadata)
    // The read lock is held only inside this block; the cloned snapshot is what
    // gets streamed later, so no lock is held across network I/O.
    let (node_count, edge_count, nodes, edges) = {
        let g = graph.read().await;
        let mut nodes: Vec<_> = g.nodes().cloned().collect();
        let edges_raw = g.export_edges();
        // Sort for deterministic output (run twice = identical)
        nodes.sort_by(|a, b| a.id.cmp(&b.id));
        let mut edges = edges_raw;
        edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
        (g.node_count(), g.edge_count(), nodes, edges)
    };

    let hello = BroadcastMessage::Hello(HelloPayload {
        version: env!("CARGO_PKG_VERSION").to_string(),
        node_count,
        edge_count,
    });

    let json = serde_json::to_string(&hello)?;
    write.send(Message::Text(json)).await?;
    info!(
        "👋 Sent Hello ({} nodes, {} edges) to {}",
        node_count, edge_count, addr
    );

    // 2. Wait for Client Ready
    info!("⏳ Waiting for client {} to be ready...", addr);
    let mut ready = false;
    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                // Simple parsing for "ready_for_graph": a substring match, so any
                // text frame containing that token counts as the ready signal.
                if text.contains("ready_for_graph") {
                    ready = true;
                    info!("✅ Client {} is ready for graph", addr);
                    break;
                }
                debug!("Running pre-ready protocol with {}: {}", addr, text);
            }
            Ok(Message::Ping(data)) => {
                write.send(Message::Pong(data)).await?;
            }
            Ok(Message::Close(_)) => return Ok(()),
            Err(e) => return Err(e.into()),
            // Other frame types are ignored while waiting.
            _ => {}
        }
    }

    // The stream ended (returned `None`) without a ready signal.
    if !ready {
        warn!("Client {} disconnected before sending ready signal", addr);
        return Ok(());
    }

    // 3. Stream Graph (Chunked)
    let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
        total_nodes: node_count,
        total_edges: edge_count,
    });
    write
        .send(Message::Text(serde_json::to_string(&begin)?))
        .await?;

    // Stream Nodes
    for chunk in nodes.chunks(50) {
        let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
            nodes: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} nodes to {}", node_count, addr);

    // Stream Edges
    for chunk in edges.chunks(100) {
        let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
            edges: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} edges to {}", edge_count, addr);

    // End Stream
    write
        .send(Message::Text(serde_json::to_string(
            &BroadcastMessage::GraphEnd,
        )?))
        .await?;
    info!("🏁 Graph stream complete for {}", addr);

    // Two-way message handling
    loop {
        tokio::select! {
            // Handle incoming messages from client
            msg = read.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        debug!("📥 Received from {}: {}", addr, text);
                        // Process client requests here (JSON-RPC)
                        // For now the message is only logged; nothing is sent back.
                    }
                    Some(Ok(Message::Ping(data))) => {
                        write.send(Message::Pong(data)).await?;
                    }
                    Some(Ok(Message::Close(_))) => {
                        info!("👋 Client {} disconnected gracefully", addr);
                        break;
                    }
                    Some(Err(e)) => {
                        warn!("⚠️  Error from {}: {}", addr, e);
                        break;
                    }
                    None => break,
                    _ => {}
                }
            }

            // Forward broadcast messages to client
            msg = broadcast_rx.recv() => {
                match msg {
                    Ok(broadcast) => {
                        let json = serde_json::to_string(&broadcast)?;
                        if write.send(Message::Text(json)).await.is_err() {
                            break;
                        }
                    }
                    // This receiver fell behind and `n` messages were dropped for
                    // it; keep the connection and continue with newer messages.
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        warn!("Client {} lagged by {} messages", addr, n);
                    }
                    // Every sender is gone, so nothing more will ever arrive.
                    Err(broadcast::error::RecvError::Closed) => {
                        break;
                    }
                }
            }
        }
    }

    info!("🔌 Connection closed: {}", addr);
    Ok(())
}
526
527// ─────────────────────────────────────────────────────────────────────────────
528// File Watcher with Debouncing
529// ─────────────────────────────────────────────────────────────────────────────
530
531/// Runs the file watcher with debouncing.
532async fn run_file_watcher(
533    watch_path: PathBuf,
534    extensions: Vec<String>,
535    debounce_ms: u64,
536    tx: mpsc::Sender<WatcherEvent>,
537) -> notify::Result<()> {
538    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
539
540    // Create watcher in sync context
541    let mut watcher = RecommendedWatcher::new(
542        move |res| {
543            let _ = notify_tx.blocking_send(res);
544        },
545        Config::default(),
546    )?;
547
548    watcher.watch(&watch_path, RecursiveMode::Recursive)?;
549    info!("👁️  File watcher started for {}", watch_path.display());
550
551    // Debounce state
552    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
553    let debounce_dur = Duration::from_millis(debounce_ms);
554
555    loop {
556        // Process pending debounced events
557        let now = Instant::now();
558        let mut ready: Vec<PathBuf> = Vec::new();
559
560        for (path, time) in pending.iter() {
561            if now.duration_since(*time) >= debounce_dur {
562                ready.push(path.clone());
563            }
564        }
565
566        for path in ready {
567            pending.remove(&path);
568            if should_process_file(&path, &extensions) {
569                let event = if path.exists() {
570                    WatcherEvent::Changed(path)
571                } else {
572                    WatcherEvent::Deleted(path)
573                };
574                let _ = tx.send(event).await;
575            }
576        }
577
578        // Wait for new events with timeout
579        match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
580            Ok(Some(Ok(event))) => {
581                for path in event.paths {
582                    if should_process_file(&path, &extensions) {
583                        pending.insert(path, Instant::now());
584                    }
585                }
586            }
587            Ok(Some(Err(e))) => {
588                warn!("Watch error: {}", e);
589            }
590            Ok(None) => break, // Channel closed
591            Err(_) => {}       // Timeout, continue
592        }
593    }
594
595    Ok(())
596}
597
/// Reports whether `path` carries one of the allowed `extensions`.
///
/// The comparison is an exact string match against the path's extension
/// (without the dot). Paths with no extension, or with an extension that is
/// not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(found) => extensions.iter().any(|allowed| allowed.as_str() == found),
        None => false,
    }
}
605
606// ─────────────────────────────────────────────────────────────────────────────
607// Background Indexer
608// ─────────────────────────────────────────────────────────────────────────────
609
610/// Runs the background indexer that processes file changes.
611async fn run_background_indexer(
612    mut rx: mpsc::Receiver<WatcherEvent>,
613    graph: SharedGraph,
614    broadcast_tx: broadcast::Sender<BroadcastMessage>,
615    _root_path: PathBuf,
616) {
617    let mut parser = ArborParser::new().expect("Failed to initialize parser");
618
619    info!("🔧 Background indexer started");
620
621    while let Some(event) = rx.recv().await {
622        let start = Instant::now();
623
624        match event {
625            WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
626                let file_name = path
627                    .file_name()
628                    .and_then(|n| n.to_str())
629                    .unwrap_or("unknown");
630
631                info!("📝 Re-indexing: {}", file_name);
632
633                match parser.parse_file(&path) {
634                    Ok(result) => {
635                        let mut g = graph.write().await;
636
637                        // Remove old nodes from this file
638                        g.remove_file(&result.file_path);
639
640                        // Add new nodes
641                        let mut node_ids = HashMap::new();
642                        for symbol in &result.symbols {
643                            let id = g.add_node(symbol.clone());
644                            node_ids.insert(symbol.id.clone(), id);
645                        }
646
647                        // Add edges for relations
648                        for relation in &result.relations {
649                            if let Some(&from_id) = node_ids.get(&relation.from_id) {
650                                // Try to find the target by name
651                                let targets = g.find_by_name(&relation.to_name);
652                                if let Some(target) = targets.first() {
653                                    if let Some(to_id) = g.get_index(&target.id) {
654                                        let edge_kind = match relation.kind {
655                                            arbor_core::RelationType::Calls => EdgeKind::Calls,
656                                            arbor_core::RelationType::Imports => EdgeKind::Imports,
657                                            arbor_core::RelationType::Extends => EdgeKind::Extends,
658                                            arbor_core::RelationType::Implements => {
659                                                EdgeKind::Implements
660                                            }
661                                        };
662                                        g.add_edge(from_id, to_id, Edge::new(edge_kind));
663                                    }
664                                }
665                            }
666                        }
667
668                        let elapsed = start.elapsed();
669                        info!(
670                            "✅ Indexed {} in {:?} ({} symbols, {} relations)",
671                            file_name,
672                            elapsed,
673                            result.symbols.len(),
674                            result.relations.len()
675                        );
676
677                        // Broadcast update
678                        let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
679                            is_delta: true,
680                            node_count: g.node_count(),
681                            edge_count: g.edge_count(),
682                            file_count: g.stats().files,
683                            changed_files: vec![result.file_path],
684                            timestamp: std::time::SystemTime::now()
685                                .duration_since(std::time::UNIX_EPOCH)
686                                .unwrap()
687                                .as_secs(),
688                            nodes: Some(g.nodes().cloned().collect()),
689                            edges: Some(g.export_edges()),
690                        });
691
692                        let _ = broadcast_tx.send(update);
693                    }
694                    Err(e) => {
695                        warn!("⚠️  Parse error for {}: {}", file_name, e);
696                    }
697                }
698            }
699
700            WatcherEvent::Deleted(path) => {
701                let file_str = path.to_string_lossy().to_string();
702                info!("🗑️  File deleted: {}", path.display());
703
704                let mut g = graph.write().await;
705                g.remove_file(&file_str);
706
707                let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
708                    is_delta: true,
709                    node_count: g.node_count(),
710                    edge_count: g.edge_count(),
711                    file_count: g.stats().files,
712                    changed_files: vec![file_str],
713                    timestamp: std::time::SystemTime::now()
714                        .duration_since(std::time::UNIX_EPOCH)
715                        .unwrap()
716                        .as_secs(),
717                    nodes: Some(g.nodes().cloned().collect()),
718                    edges: Some(g.export_edges()),
719                });
720
721                let _ = broadcast_tx.send(update);
722            }
723        }
724    }
725}
726
727// ─────────────────────────────────────────────────────────────────────────────
728// Tests
729// ─────────────────────────────────────────────────────────────────────────────
730
#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths whose extension appears in the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions: Vec<String> = ["ts", "rs"].iter().map(|ext| ext.to_string()).collect();

        let accepted = ["foo.ts", "bar.rs"];
        let rejected = ["baz.py", "README.md"];

        for name in accepted.iter() {
            assert!(should_process_file(Path::new(name), &extensions));
        }
        for name in rejected.iter() {
            assert!(!should_process_file(Path::new(name), &extensions));
        }
    }

    /// A `GraphUpdate` serializes with its variant name and payload values.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();

        for needle in ["GraphUpdate", "42"].iter() {
            assert!(json.contains(needle));
        }
    }
}