Skip to main content

arbor_server/
sync_server.rs

1//! Real-time sync server for the Arbor Visualizer.
2//!
3//! This module implements a WebSocket server that acts as the "Source of Truth"
4//! for the visualizer. It broadcasts graph updates whenever the filesystem changes,
5//! keeping the visualization in sync with the codebase.
6//!
7//! "Give Arbor a voice so the visualizer can hear the code breathe."
8
9use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::tungstenite::Message;
22use tracing::{debug, error, info, warn};
23
24// ─────────────────────────────────────────────────────────────────────────────
25// Types
26// ─────────────────────────────────────────────────────────────────────────────
27
28/// Configuration for the real-time server.
29#[derive(Debug, Clone)]
30pub struct SyncServerConfig {
31    /// Address to bind the WebSocket server.
32    pub addr: SocketAddr,
33    /// Root path to watch for file changes.
34    pub watch_path: PathBuf,
35    /// Debounce duration for file events.
36    pub debounce_ms: u64,
37    /// File extensions to watch.
38    pub extensions: Vec<String>,
39}
40
41impl Default for SyncServerConfig {
42    fn default() -> Self {
43        Self {
44            addr: "127.0.0.1:8080".parse().unwrap(),
45            watch_path: PathBuf::from("."),
46            debounce_ms: 150,
47            extensions: vec![
48                "ts".into(),
49                "tsx".into(),
50                "js".into(),
51                "jsx".into(),
52                "rs".into(),
53                "py".into(),
54                "kt".into(),
55                "kts".into(),
56                "swift".into(),
57                "rb".into(),
58                "php".into(),
59                "phtml".into(),
60                "sh".into(),
61                "bash".into(),
62                "zsh".into(),
63            ],
64        }
65    }
66}
67
68/// Server messages broadcast to all connected clients.
69#[derive(Debug, Clone, serde::Serialize)]
70#[serde(tag = "type", content = "payload")]
71pub enum BroadcastMessage {
72    /// Initial handshake with server info
73    Hello(HelloPayload),
74    /// Start of a graph stream
75    GraphBegin(GraphBeginPayload),
76    /// Batch of nodes
77    NodeBatch(NodeBatchPayload),
78    /// Batch of edges
79    EdgeBatch(EdgeBatchPayload),
80    /// End of graph stream
81    GraphEnd,
82    /// Full graph snapshot or delta update (Legacy/Incremental)
83    GraphUpdate(GraphUpdatePayload),
84    /// Tell the visualizer to focus on a specific node.
85    FocusNode(FocusNodePayload),
86    /// Indexer progress status.
87    IndexerStatus(IndexerStatusPayload),
88}
89
90#[derive(Debug, Clone, serde::Serialize)]
91pub struct HelloPayload {
92    pub version: String,
93    pub node_count: usize,
94    pub edge_count: usize,
95}
96
97#[derive(Debug, Clone, serde::Serialize)]
98pub struct GraphBeginPayload {
99    pub total_nodes: usize,
100    pub total_edges: usize,
101}
102
103#[derive(Debug, Clone, serde::Serialize)]
104pub struct NodeBatchPayload {
105    pub nodes: Vec<arbor_core::CodeNode>,
106}
107
108#[derive(Debug, Clone, serde::Serialize)]
109pub struct EdgeBatchPayload {
110    pub edges: Vec<arbor_graph::GraphEdge>,
111}
112
113#[derive(Debug, Clone, serde::Serialize)]
114pub struct GraphUpdatePayload {
115    /// Whether this is a full snapshot or delta.
116    pub is_delta: bool,
117    /// Number of nodes in the graph.
118    pub node_count: usize,
119    /// Number of edges in the graph.
120    pub edge_count: usize,
121    /// Number of files indexed.
122    pub file_count: usize,
123    /// Changed files (for delta updates).
124    pub changed_files: Vec<String>,
125    /// Timestamp of the update.
126    pub timestamp: u64,
127    pub nodes: Option<Vec<arbor_core::CodeNode>>,
128    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
129}
130
131#[derive(Debug, Clone, serde::Serialize)]
132pub struct FocusNodePayload {
133    /// The node ID to focus.
134    pub node_id: String,
135    /// The file path containing the node.
136    pub file: String,
137    /// Line number in the file.
138    pub line: u32,
139}
140
141#[derive(Debug, Clone, serde::Serialize)]
142pub struct IndexerStatusPayload {
143    /// Current indexing phase.
144    pub phase: String,
145    /// Files processed so far.
146    pub files_processed: usize,
147    /// Total files to process.
148    pub files_total: usize,
149    /// Current file being processed.
150    pub current_file: Option<String>,
151}
152
153/// Internal event for the file watcher.
154#[derive(Debug, Clone)]
155#[allow(dead_code)]
156enum WatcherEvent {
157    Changed(PathBuf),
158    Created(PathBuf),
159    Deleted(PathBuf),
160}
161
162// ─────────────────────────────────────────────────────────────────────────────
163// SyncServer
164// ─────────────────────────────────────────────────────────────────────────────
165
166/// High-performance real-time sync server.
167///
168/// This server:
169/// - Hosts a WebSocket server for client connections
170/// - Watches the filesystem for changes
171/// - Debounces file events to prevent thrashing
172/// - Re-parses changed files and updates the graph
173/// - Broadcasts updates to all connected clients
174pub struct SyncServer {
175    config: SyncServerConfig,
176    graph: SharedGraph,
177    broadcast_tx: broadcast::Sender<BroadcastMessage>,
178}
179
180/// A cloneable handle to trigger spotlight events from external components (like MCP).
181#[derive(Clone)]
182pub struct SyncServerHandle {
183    broadcast_tx: broadcast::Sender<BroadcastMessage>,
184    graph: SharedGraph,
185}
186
187impl SyncServerHandle {
188    /// Triggers a spotlight on a specific node.
189    pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
190        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
191            node_id: node_id.to_string(),
192            file: file.to_string(),
193            line,
194        });
195        let _ = self.broadcast_tx.send(msg);
196    }
197
198    /// Returns the shared graph for context lookups.
199    pub fn graph(&self) -> SharedGraph {
200        self.graph.clone()
201    }
202}
203
204impl SyncServer {
205    /// Creates a new sync server.
206    pub fn new(config: SyncServerConfig) -> Self {
207        let (broadcast_tx, _) = broadcast::channel(256);
208
209        Self {
210            config,
211            graph: Arc::new(RwLock::new(ArborGraph::new())),
212            broadcast_tx,
213        }
214    }
215
216    /// Creates a sync server with an existing graph.
217    pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
218        let (broadcast_tx, _) = broadcast::channel(256);
219
220        Self {
221            config,
222            graph: Arc::new(RwLock::new(graph)),
223            broadcast_tx,
224        }
225    }
226
227    /// Creates a sync server with a shared graph.
228    pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
229        let (broadcast_tx, _) = broadcast::channel(256);
230
231        Self {
232            config,
233            graph,
234            broadcast_tx,
235        }
236    }
237
238    /// Returns a handle to the shared graph.
239    pub fn graph(&self) -> SharedGraph {
240        self.graph.clone()
241    }
242
243    /// Returns a broadcast receiver for server messages.
244    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
245        self.broadcast_tx.subscribe()
246    }
247
248    /// Returns a cloneable handle for triggering spotlight events.
249    pub fn handle(&self) -> SyncServerHandle {
250        SyncServerHandle {
251            broadcast_tx: self.broadcast_tx.clone(),
252            graph: self.graph.clone(),
253        }
254    }
255
256    /// Runs the server with file watching enabled.
257    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
258        info!("╔═══════════════════════════════════════════════════════════╗");
259        info!("║          ARBOR SYNC SERVER - THE PULSE OF CODE            ║");
260        info!("╚═══════════════════════════════════════════════════════════╝");
261
262        // Channel for watcher events
263        let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
264
265        // Start the file watcher
266        let watch_path = self.config.watch_path.clone();
267        let extensions = self.config.extensions.clone();
268        let debounce_ms = self.config.debounce_ms;
269
270        tokio::spawn(async move {
271            if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
272            {
273                error!("File watcher error: {}", e);
274            }
275        });
276
277        // Start the indexer background task
278        let graph = self.graph.clone();
279        let broadcast_tx = self.broadcast_tx.clone();
280        let watch_path = self.config.watch_path.clone();
281
282        tokio::spawn(async move {
283            run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
284        });
285
286        // Start accepting WebSocket connections
287        self.run_websocket_server().await
288    }
289
290    /// Runs just the WebSocket server (no file watching).
291    async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
292        let listener = TcpListener::bind(&self.config.addr).await?;
293        info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
294        info!("👁️  Watching: {}", self.config.watch_path.display());
295        info!("⏱️  Debounce: {}ms", self.config.debounce_ms);
296
297        loop {
298            match listener.accept().await {
299                Ok((stream, addr)) => {
300                    info!("🔌 New connection from {}", addr);
301                    let graph = self.graph.clone();
302                    let broadcast_rx = self.broadcast_tx.subscribe();
303
304                    tokio::spawn(async move {
305                        if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
306                            warn!("Connection error from {}: {}", addr, e);
307                        }
308                    });
309                }
310                Err(e) => {
311                    error!("Accept error: {}", e);
312                }
313            }
314        }
315    }
316
317    /// Broadcasts a focus command to all clients.
318    pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
319        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
320            node_id: node_id.to_string(),
321            file: file.to_string(),
322            line,
323        });
324
325        let _ = self.broadcast_tx.send(msg);
326    }
327
328    /// Broadcasts an indexer status update.
329    pub fn update_status(
330        &self,
331        phase: &str,
332        processed: usize,
333        total: usize,
334        current: Option<&str>,
335    ) {
336        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
337            phase: phase.to_string(),
338            files_processed: processed,
339            files_total: total,
340            current_file: current.map(|s| s.to_string()),
341        });
342
343        let _ = self.broadcast_tx.send(msg);
344    }
345}
346
347// ─────────────────────────────────────────────────────────────────────────────
348// Client Connection Handler
349// ─────────────────────────────────────────────────────────────────────────────
350
351/// Handles a single WebSocket client connection.
352async fn handle_client(
353    stream: TcpStream,
354    addr: SocketAddr,
355    graph: SharedGraph,
356    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
357) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
358    use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
359
360    let config = WebSocketConfig {
361        max_message_size: Some(64 * 1024 * 1024), // 64 MB
362        max_frame_size: Some(64 * 1024 * 1024),   // 64 MB
363        accept_unmasked_frames: false,
364        ..Default::default()
365    };
366
367    let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
368    let (mut write, mut read) = ws_stream.split();
369
370    info!("✅ WebSocket handshake complete with {}", addr);
371
372    // 1. Send Hello (Metadata)
373    let (node_count, edge_count, nodes, edges) = {
374        let g = graph.read().await;
375        let mut nodes: Vec<_> = g.nodes().cloned().collect();
376        let edges_raw = g.export_edges();
377        // Sort for deterministic output (run twice = identical)
378        nodes.sort_by(|a, b| a.id.cmp(&b.id));
379        let mut edges = edges_raw;
380        edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
381        (g.node_count(), g.edge_count(), nodes, edges)
382    };
383
384    let hello = BroadcastMessage::Hello(HelloPayload {
385        version: "1.1.1".to_string(),
386        node_count,
387        edge_count,
388    });
389
390    let json = serde_json::to_string(&hello)?;
391    write.send(Message::Text(json)).await?;
392    info!(
393        "👋 Sent Hello ({} nodes, {} edges) to {}",
394        node_count, edge_count, addr
395    );
396
397    // 2. Wait for Client Ready
398    info!("⏳ Waiting for client {} to be ready...", addr);
399    let mut ready = false;
400    while let Some(msg) = read.next().await {
401        match msg {
402            Ok(Message::Text(text)) => {
403                // Simple parsing for "ready_for_graph"
404                if text.contains("ready_for_graph") {
405                    ready = true;
406                    info!("✅ Client {} is ready for graph", addr);
407                    break;
408                }
409                debug!("Running pre-ready protocol with {}: {}", addr, text);
410            }
411            Ok(Message::Ping(data)) => {
412                write.send(Message::Pong(data)).await?;
413            }
414            Ok(Message::Close(_)) => return Ok(()),
415            Err(e) => return Err(e.into()),
416            _ => {}
417        }
418    }
419
420    if !ready {
421        warn!("Client {} disconnected before sending ready signal", addr);
422        return Ok(());
423    }
424
425    // 3. Stream Graph (Chunked)
426    let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
427        total_nodes: node_count,
428        total_edges: edge_count,
429    });
430    write
431        .send(Message::Text(serde_json::to_string(&begin)?))
432        .await?;
433
434    // Stream Nodes
435    for chunk in nodes.chunks(50) {
436        let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
437            nodes: chunk.to_vec(),
438        });
439        write
440            .send(Message::Text(serde_json::to_string(&batch)?))
441            .await?;
442    }
443    info!("📤 Streamed {} nodes to {}", node_count, addr);
444
445    // Stream Edges
446    for chunk in edges.chunks(100) {
447        let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
448            edges: chunk.to_vec(),
449        });
450        write
451            .send(Message::Text(serde_json::to_string(&batch)?))
452            .await?;
453    }
454    info!("📤 Streamed {} edges to {}", edge_count, addr);
455
456    // End Stream
457    write
458        .send(Message::Text(serde_json::to_string(
459            &BroadcastMessage::GraphEnd,
460        )?))
461        .await?;
462    info!("🏁 Graph stream complete for {}", addr);
463
464    // Two-way message handling
465    loop {
466        tokio::select! {
467            // Handle incoming messages from client
468            msg = read.next() => {
469                match msg {
470                    Some(Ok(Message::Text(text))) => {
471                        debug!("📥 Received from {}: {}", addr, text);
472                        // Process client requests here (JSON-RPC)
473                        // For now, just echo
474                    }
475                    Some(Ok(Message::Ping(data))) => {
476                        write.send(Message::Pong(data)).await?;
477                    }
478                    Some(Ok(Message::Close(_))) => {
479                        info!("👋 Client {} disconnected gracefully", addr);
480                        break;
481                    }
482                    Some(Err(e)) => {
483                        warn!("⚠️  Error from {}: {}", addr, e);
484                        break;
485                    }
486                    None => break,
487                    _ => {}
488                }
489            }
490
491            // Forward broadcast messages to client
492            msg = broadcast_rx.recv() => {
493                match msg {
494                    Ok(broadcast) => {
495                        let json = serde_json::to_string(&broadcast)?;
496                        if write.send(Message::Text(json)).await.is_err() {
497                            break;
498                        }
499                    }
500                    Err(broadcast::error::RecvError::Lagged(n)) => {
501                        warn!("Client {} lagged by {} messages", addr, n);
502                    }
503                    Err(broadcast::error::RecvError::Closed) => {
504                        break;
505                    }
506                }
507            }
508        }
509    }
510
511    info!("🔌 Connection closed: {}", addr);
512    Ok(())
513}
514
515// ─────────────────────────────────────────────────────────────────────────────
516// File Watcher with Debouncing
517// ─────────────────────────────────────────────────────────────────────────────
518
519/// Runs the file watcher with debouncing.
520async fn run_file_watcher(
521    watch_path: PathBuf,
522    extensions: Vec<String>,
523    debounce_ms: u64,
524    tx: mpsc::Sender<WatcherEvent>,
525) -> notify::Result<()> {
526    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
527
528    // Create watcher in sync context
529    let mut watcher = RecommendedWatcher::new(
530        move |res| {
531            let _ = notify_tx.blocking_send(res);
532        },
533        Config::default(),
534    )?;
535
536    watcher.watch(&watch_path, RecursiveMode::Recursive)?;
537    info!("👁️  File watcher started for {}", watch_path.display());
538
539    // Debounce state
540    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
541    let debounce_dur = Duration::from_millis(debounce_ms);
542
543    loop {
544        // Process pending debounced events
545        let now = Instant::now();
546        let mut ready: Vec<PathBuf> = Vec::new();
547
548        for (path, time) in pending.iter() {
549            if now.duration_since(*time) >= debounce_dur {
550                ready.push(path.clone());
551            }
552        }
553
554        for path in ready {
555            pending.remove(&path);
556            if should_process_file(&path, &extensions) {
557                let event = if path.exists() {
558                    WatcherEvent::Changed(path)
559                } else {
560                    WatcherEvent::Deleted(path)
561                };
562                let _ = tx.send(event).await;
563            }
564        }
565
566        // Wait for new events with timeout
567        match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
568            Ok(Some(Ok(event))) => {
569                for path in event.paths {
570                    if should_process_file(&path, &extensions) {
571                        pending.insert(path, Instant::now());
572                    }
573                }
574            }
575            Ok(Some(Err(e))) => {
576                warn!("Watch error: {}", e);
577            }
578            Ok(None) => break, // Channel closed
579            Err(_) => {}       // Timeout, continue
580        }
581    }
582
583    Ok(())
584}
585
586/// Checks if a file should be processed based on extension.
587fn should_process_file(path: &Path, extensions: &[String]) -> bool {
588    path.extension()
589        .and_then(|ext| ext.to_str())
590        .map(|ext| extensions.iter().any(|e| e == ext))
591        .unwrap_or(false)
592}
593
594// ─────────────────────────────────────────────────────────────────────────────
595// Background Indexer
596// ─────────────────────────────────────────────────────────────────────────────
597
598/// Runs the background indexer that processes file changes.
599async fn run_background_indexer(
600    mut rx: mpsc::Receiver<WatcherEvent>,
601    graph: SharedGraph,
602    broadcast_tx: broadcast::Sender<BroadcastMessage>,
603    _root_path: PathBuf,
604) {
605    let mut parser = ArborParser::new().expect("Failed to initialize parser");
606
607    info!("🔧 Background indexer started");
608
609    while let Some(event) = rx.recv().await {
610        let start = Instant::now();
611
612        match event {
613            WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
614                let file_name = path
615                    .file_name()
616                    .and_then(|n| n.to_str())
617                    .unwrap_or("unknown");
618
619                info!("📝 Re-indexing: {}", file_name);
620
621                match parser.parse_file(&path) {
622                    Ok(result) => {
623                        let mut g = graph.write().await;
624
625                        // Remove old nodes from this file
626                        g.remove_file(&result.file_path);
627
628                        // Add new nodes
629                        let mut node_ids = HashMap::new();
630                        for symbol in &result.symbols {
631                            let id = g.add_node(symbol.clone());
632                            node_ids.insert(symbol.id.clone(), id);
633                        }
634
635                        // Add edges for relations
636                        for relation in &result.relations {
637                            if let Some(&from_id) = node_ids.get(&relation.from_id) {
638                                // Try to find the target by name
639                                let targets = g.find_by_name(&relation.to_name);
640                                if let Some(target) = targets.first() {
641                                    if let Some(to_id) = g.get_index(&target.id) {
642                                        let edge_kind = match relation.kind {
643                                            arbor_core::RelationType::Calls => EdgeKind::Calls,
644                                            arbor_core::RelationType::Imports => EdgeKind::Imports,
645                                            arbor_core::RelationType::Extends => EdgeKind::Extends,
646                                            arbor_core::RelationType::Implements => {
647                                                EdgeKind::Implements
648                                            }
649                                        };
650                                        g.add_edge(from_id, to_id, Edge::new(edge_kind));
651                                    }
652                                }
653                            }
654                        }
655
656                        let elapsed = start.elapsed();
657                        info!(
658                            "✅ Indexed {} in {:?} ({} symbols, {} relations)",
659                            file_name,
660                            elapsed,
661                            result.symbols.len(),
662                            result.relations.len()
663                        );
664
665                        // Broadcast update
666                        let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
667                            is_delta: true,
668                            node_count: g.node_count(),
669                            edge_count: g.edge_count(),
670                            file_count: g.stats().files,
671                            changed_files: vec![result.file_path],
672                            timestamp: std::time::SystemTime::now()
673                                .duration_since(std::time::UNIX_EPOCH)
674                                .unwrap()
675                                .as_secs(),
676                            nodes: Some(g.nodes().cloned().collect()),
677                            edges: Some(g.export_edges()),
678                        });
679
680                        let _ = broadcast_tx.send(update);
681                    }
682                    Err(e) => {
683                        warn!("⚠️  Parse error for {}: {}", file_name, e);
684                    }
685                }
686            }
687
688            WatcherEvent::Deleted(path) => {
689                let file_str = path.to_string_lossy().to_string();
690                info!("🗑️  File deleted: {}", path.display());
691
692                let mut g = graph.write().await;
693                g.remove_file(&file_str);
694
695                let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
696                    is_delta: true,
697                    node_count: g.node_count(),
698                    edge_count: g.edge_count(),
699                    file_count: g.stats().files,
700                    changed_files: vec![file_str],
701                    timestamp: std::time::SystemTime::now()
702                        .duration_since(std::time::UNIX_EPOCH)
703                        .unwrap()
704                        .as_secs(),
705                    nodes: Some(g.nodes().cloned().collect()),
706                    edges: Some(g.export_edges()),
707                });
708
709                let _ = broadcast_tx.send(update);
710            }
711        }
712    }
713}
714
715// ─────────────────────────────────────────────────────────────────────────────
716// Tests
717// ─────────────────────────────────────────────────────────────────────────────
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722
723    #[test]
724    fn test_should_process_file() {
725        let extensions = vec!["ts".to_string(), "rs".to_string()];
726
727        assert!(should_process_file(Path::new("foo.ts"), &extensions));
728        assert!(should_process_file(Path::new("bar.rs"), &extensions));
729        assert!(!should_process_file(Path::new("baz.py"), &extensions));
730        assert!(!should_process_file(Path::new("README.md"), &extensions));
731    }
732
733    #[test]
734    fn test_broadcast_message_serialization() {
735        let msg = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
736            is_delta: true,
737            node_count: 42,
738            edge_count: 100,
739            file_count: 5,
740            changed_files: vec!["foo.ts".to_string()],
741            timestamp: 1234567890,
742            nodes: None,
743            edges: None,
744        });
745
746        let json = serde_json::to_string(&msg).unwrap();
747        assert!(json.contains("GraphUpdate"));
748        assert!(json.contains("42"));
749    }
750}