1use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{compute_centrality, ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::tungstenite::Message;
22use tracing::{debug, error, info, warn};
23
/// Settings that control where the sync server listens and which files it reacts to.
#[derive(Clone, Debug)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Directory that is watched recursively for file changes.
    pub watch_path: PathBuf,
    /// Quiet period, in milliseconds, that must pass after the last event for a path
    /// before the change is forwarded to the indexer.
    pub debounce_ms: u64,
    /// File extensions (without the leading dot) that are eligible for indexing.
    /// Matching against a path's extension is exact and case-sensitive.
    pub extensions: Vec<String>,
}
40
41impl Default for SyncServerConfig {
42 fn default() -> Self {
43 Self {
44 addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
45 watch_path: PathBuf::from("."),
46 debounce_ms: 150,
47 extensions: vec![
48 "ts".into(),
49 "tsx".into(),
50 "mts".into(),
51 "cts".into(),
52 "js".into(),
53 "jsx".into(),
54 "mjs".into(),
55 "cjs".into(),
56 "rs".into(),
57 "py".into(),
58 "pyi".into(),
59 "go".into(),
60 "java".into(),
61 "c".into(),
62 "h".into(),
63 "cpp".into(),
64 "hpp".into(),
65 "cc".into(),
66 "hh".into(),
67 "cxx".into(),
68 "hxx".into(),
69 "cs".into(),
70 "dart".into(),
71 "kt".into(),
72 "kts".into(),
73 "swift".into(),
74 "rb".into(),
75 "php".into(),
76 "phtml".into(),
77 "sh".into(),
78 "bash".into(),
79 "zsh".into(),
80 ],
81 }
82 }
83}
84
85#[derive(Debug, Clone, serde::Serialize)]
87#[serde(tag = "type", content = "payload")]
88pub enum BroadcastMessage {
89 Hello(HelloPayload),
91 GraphBegin(GraphBeginPayload),
93 NodeBatch(NodeBatchPayload),
95 EdgeBatch(EdgeBatchPayload),
97 GraphEnd,
99 GraphUpdate(GraphUpdatePayload),
101 FocusNode(FocusNodePayload),
103 IndexerStatus(IndexerStatusPayload),
105}
106
107#[derive(Debug, Clone, serde::Serialize)]
108pub struct HelloPayload {
109 pub version: String,
110 pub node_count: usize,
111 pub edge_count: usize,
112}
113
114#[derive(Debug, Clone, serde::Serialize)]
115pub struct GraphBeginPayload {
116 pub total_nodes: usize,
117 pub total_edges: usize,
118}
119
120#[derive(Debug, Clone, serde::Serialize)]
121pub struct NodeBatchPayload {
122 pub nodes: Vec<arbor_core::CodeNode>,
123}
124
125#[derive(Debug, Clone, serde::Serialize)]
126pub struct EdgeBatchPayload {
127 pub edges: Vec<arbor_graph::GraphEdge>,
128}
129
130#[derive(Debug, Clone, serde::Serialize)]
131pub struct GraphUpdatePayload {
132 pub is_delta: bool,
134 pub node_count: usize,
136 pub edge_count: usize,
138 pub file_count: usize,
140 pub changed_files: Vec<String>,
142 pub timestamp: u64,
144 pub nodes: Option<Vec<arbor_core::CodeNode>>,
145 pub edges: Option<Vec<arbor_graph::GraphEdge>>,
146}
147
148#[derive(Debug, Clone, serde::Serialize)]
149pub struct FocusNodePayload {
150 pub node_id: String,
152 pub file: String,
154 pub line: u32,
156}
157
158#[derive(Debug, Clone, serde::Serialize)]
159pub struct IndexerStatusPayload {
160 pub phase: String,
162 pub files_processed: usize,
164 pub files_total: usize,
166 pub current_file: Option<String>,
168}
169
/// Debounced file-system event handed from the file watcher to the background indexer.
#[derive(Clone, Debug)]
// `Created` is handled by the indexer but never constructed by the watcher, which only
// distinguishes "still exists" (`Changed`) from "gone" (`Deleted`).
#[allow(dead_code)]
enum WatcherEvent {
    /// The file exists and should be (re-)indexed.
    Changed(PathBuf),
    /// A new file appeared; treated the same as `Changed` by the indexer.
    Created(PathBuf),
    /// The file no longer exists and should be removed from the graph.
    Deleted(PathBuf),
}
178
179pub struct SyncServer {
192 config: SyncServerConfig,
193 graph: SharedGraph,
194 broadcast_tx: broadcast::Sender<BroadcastMessage>,
195}
196
197#[derive(Clone)]
199pub struct SyncServerHandle {
200 broadcast_tx: broadcast::Sender<BroadcastMessage>,
201 graph: SharedGraph,
202}
203
204impl SyncServerHandle {
205 pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
207 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
208 node_id: node_id.to_string(),
209 file: file.to_string(),
210 line,
211 });
212 let _ = self.broadcast_tx.send(msg);
213 }
214
215 pub fn graph(&self) -> SharedGraph {
217 self.graph.clone()
218 }
219}
220
221impl SyncServer {
222 pub fn new(config: SyncServerConfig) -> Self {
224 let (broadcast_tx, _) = broadcast::channel(256);
225
226 Self {
227 config,
228 graph: Arc::new(RwLock::new(ArborGraph::new())),
229 broadcast_tx,
230 }
231 }
232
233 pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
235 let (broadcast_tx, _) = broadcast::channel(256);
236
237 Self {
238 config,
239 graph: Arc::new(RwLock::new(graph)),
240 broadcast_tx,
241 }
242 }
243
244 pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
246 let (broadcast_tx, _) = broadcast::channel(256);
247
248 Self {
249 config,
250 graph,
251 broadcast_tx,
252 }
253 }
254
255 pub fn graph(&self) -> SharedGraph {
257 self.graph.clone()
258 }
259
260 pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
262 self.broadcast_tx.subscribe()
263 }
264
265 pub fn handle(&self) -> SyncServerHandle {
267 SyncServerHandle {
268 broadcast_tx: self.broadcast_tx.clone(),
269 graph: self.graph.clone(),
270 }
271 }
272
273 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
275 info!("╔═══════════════════════════════════════════════════════════╗");
276 info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
277 info!("╚═══════════════════════════════════════════════════════════╝");
278
279 let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
281
282 let watch_path = self.config.watch_path.clone();
284 let extensions = self.config.extensions.clone();
285 let debounce_ms = self.config.debounce_ms;
286
287 tokio::spawn(async move {
288 if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
289 {
290 error!("File watcher error: {}", e);
291 }
292 });
293
294 let graph = self.graph.clone();
296 let broadcast_tx = self.broadcast_tx.clone();
297 let watch_path = self.config.watch_path.clone();
298
299 tokio::spawn(async move {
300 run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
301 });
302
303 self.run_websocket_server().await
305 }
306
307 async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
309 let listener = TcpListener::bind(&self.config.addr).await?;
310 info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
311 info!("👁️ Watching: {}", self.config.watch_path.display());
312 info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
313
314 loop {
315 match listener.accept().await {
316 Ok((stream, addr)) => {
317 info!("🔌 New connection from {}", addr);
318 let graph = self.graph.clone();
319 let broadcast_rx = self.broadcast_tx.subscribe();
320
321 tokio::spawn(async move {
322 if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
323 warn!("Connection error from {}: {}", addr, e);
324 }
325 });
326 }
327 Err(e) => {
328 error!("Accept error: {}", e);
329 }
330 }
331 }
332 }
333
334 pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
336 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
337 node_id: node_id.to_string(),
338 file: file.to_string(),
339 line,
340 });
341
342 let _ = self.broadcast_tx.send(msg);
343 }
344
345 pub fn update_status(
347 &self,
348 phase: &str,
349 processed: usize,
350 total: usize,
351 current: Option<&str>,
352 ) {
353 let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
354 phase: phase.to_string(),
355 files_processed: processed,
356 files_total: total,
357 current_file: current.map(|s| s.to_string()),
358 });
359
360 let _ = self.broadcast_tx.send(msg);
361 }
362}
363
364async fn handle_client(
370 stream: TcpStream,
371 addr: SocketAddr,
372 graph: SharedGraph,
373 mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
374) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
375 use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
376
377 let config = WebSocketConfig {
378 max_message_size: Some(64 * 1024 * 1024), max_frame_size: Some(64 * 1024 * 1024), accept_unmasked_frames: false,
381 ..Default::default()
382 };
383
384 let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
385 let (mut write, mut read) = ws_stream.split();
386
387 info!("✅ WebSocket handshake complete with {}", addr);
388
389 let (node_count, edge_count, nodes, edges) = {
391 let g = graph.read().await;
392 let mut nodes: Vec<_> = g.nodes().cloned().collect();
393 let edges_raw = g.export_edges();
394 nodes.sort_by(|a, b| a.id.cmp(&b.id));
396 let mut edges = edges_raw;
397 edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
398 (g.node_count(), g.edge_count(), nodes, edges)
399 };
400
401 let hello = BroadcastMessage::Hello(HelloPayload {
402 version: env!("CARGO_PKG_VERSION").to_string(),
403 node_count,
404 edge_count,
405 });
406
407 let json = serde_json::to_string(&hello)?;
408 write.send(Message::Text(json)).await?;
409 info!(
410 "👋 Sent Hello ({} nodes, {} edges) to {}",
411 node_count, edge_count, addr
412 );
413
414 info!("⏳ Waiting for client {} to be ready...", addr);
416 let mut ready = false;
417 while let Some(msg) = read.next().await {
418 match msg {
419 Ok(Message::Text(text)) => {
420 if text.contains("ready_for_graph") {
422 ready = true;
423 info!("✅ Client {} is ready for graph", addr);
424 break;
425 }
426 debug!("Running pre-ready protocol with {}: {}", addr, text);
427 }
428 Ok(Message::Ping(data)) => {
429 write.send(Message::Pong(data)).await?;
430 }
431 Ok(Message::Close(_)) => return Ok(()),
432 Err(e) => return Err(e.into()),
433 _ => {}
434 }
435 }
436
437 if !ready {
438 warn!("Client {} disconnected before sending ready signal", addr);
439 return Ok(());
440 }
441
442 let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
444 total_nodes: node_count,
445 total_edges: edge_count,
446 });
447 write
448 .send(Message::Text(serde_json::to_string(&begin)?))
449 .await?;
450
451 for chunk in nodes.chunks(50) {
453 let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
454 nodes: chunk.to_vec(),
455 });
456 write
457 .send(Message::Text(serde_json::to_string(&batch)?))
458 .await?;
459 }
460 info!("📤 Streamed {} nodes to {}", node_count, addr);
461
462 for chunk in edges.chunks(100) {
464 let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
465 edges: chunk.to_vec(),
466 });
467 write
468 .send(Message::Text(serde_json::to_string(&batch)?))
469 .await?;
470 }
471 info!("📤 Streamed {} edges to {}", edge_count, addr);
472
473 write
475 .send(Message::Text(serde_json::to_string(
476 &BroadcastMessage::GraphEnd,
477 )?))
478 .await?;
479 info!("🏁 Graph stream complete for {}", addr);
480
481 loop {
483 tokio::select! {
484 msg = read.next() => {
486 match msg {
487 Some(Ok(Message::Text(text))) => {
488 debug!("📥 Received from {}: {}", addr, text);
489 }
492 Some(Ok(Message::Ping(data))) => {
493 write.send(Message::Pong(data)).await?;
494 }
495 Some(Ok(Message::Close(_))) => {
496 info!("👋 Client {} disconnected gracefully", addr);
497 break;
498 }
499 Some(Err(e)) => {
500 warn!("⚠️ Error from {}: {}", addr, e);
501 break;
502 }
503 None => break,
504 _ => {}
505 }
506 }
507
508 msg = broadcast_rx.recv() => {
510 match msg {
511 Ok(broadcast) => {
512 let json = serde_json::to_string(&broadcast)?;
513 if write.send(Message::Text(json)).await.is_err() {
514 break;
515 }
516 }
517 Err(broadcast::error::RecvError::Lagged(n)) => {
518 warn!("Client {} lagged by {} messages", addr, n);
519 }
520 Err(broadcast::error::RecvError::Closed) => {
521 break;
522 }
523 }
524 }
525 }
526 }
527
528 info!("🔌 Connection closed: {}", addr);
529 Ok(())
530}
531
532async fn run_file_watcher(
538 watch_path: PathBuf,
539 extensions: Vec<String>,
540 debounce_ms: u64,
541 tx: mpsc::Sender<WatcherEvent>,
542) -> notify::Result<()> {
543 let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
544
545 let mut watcher = RecommendedWatcher::new(
547 move |res| {
548 let _ = notify_tx.blocking_send(res);
549 },
550 Config::default(),
551 )?;
552
553 watcher.watch(&watch_path, RecursiveMode::Recursive)?;
554 info!("👁️ File watcher started for {}", watch_path.display());
555
556 let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
558 let debounce_dur = Duration::from_millis(debounce_ms);
559
560 loop {
561 let now = Instant::now();
563 let mut ready: Vec<PathBuf> = Vec::new();
564
565 for (path, time) in pending.iter() {
566 if now.duration_since(*time) >= debounce_dur {
567 ready.push(path.clone());
568 }
569 }
570
571 for path in ready {
572 pending.remove(&path);
573 if should_process_file(&path, &extensions) {
574 let event = if path.exists() {
575 WatcherEvent::Changed(path)
576 } else {
577 WatcherEvent::Deleted(path)
578 };
579 let _ = tx.send(event).await;
580 }
581 }
582
583 match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
585 Ok(Some(Ok(event))) => {
586 for path in event.paths {
587 if should_process_file(&path, &extensions) {
588 pending.insert(path, Instant::now());
589 }
590 }
591 }
592 Ok(Some(Err(e))) => {
593 warn!("Watch error: {}", e);
594 }
595 Ok(None) => break, Err(_) => {} }
598 }
599
600 Ok(())
601}
602
/// Reports whether `path` has an extension listed in `extensions`.
///
/// The comparison is exact and case-sensitive. Paths without an extension, or whose
/// extension is not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    let Some(ext) = path.extension().and_then(|raw| raw.to_str()) else {
        return false;
    };

    extensions.iter().any(|allowed| allowed == ext)
}
610
611async fn run_background_indexer(
617 mut rx: mpsc::Receiver<WatcherEvent>,
618 graph: SharedGraph,
619 broadcast_tx: broadcast::Sender<BroadcastMessage>,
620 _root_path: PathBuf,
621) {
622 let mut parser = match ArborParser::new() {
623 Ok(parser) => Some(parser),
624 Err(error) => {
625 warn!(
626 "Failed to initialize parser for background indexer; will retry lazily per event: {}",
627 error
628 );
629 None
630 }
631 };
632
633 info!("🔧 Background indexer started");
634
635 while let Some(event) = rx.recv().await {
636 let start = Instant::now();
637
638 match event {
639 WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
640 let file_name = path
641 .file_name()
642 .and_then(|n| n.to_str())
643 .unwrap_or("unknown");
644
645 info!("📝 Re-indexing: {}", file_name);
646
647 if parser.is_none() {
648 parser = match ArborParser::new() {
649 Ok(parser) => Some(parser),
650 Err(error) => {
651 warn!(
652 "Skipping '{}' due to parser init failure: {}",
653 file_name, error
654 );
655 None
656 }
657 };
658 }
659
660 let Some(parser) = parser.as_mut() else {
661 continue;
662 };
663
664 match parser.parse_file(&path) {
665 Ok(result) => {
666 let mut g = graph.write().await;
667
668 g.remove_file(&result.file_path);
670
671 let mut node_ids = HashMap::new();
673 for symbol in &result.symbols {
674 let id = g.add_node(symbol.clone());
675 node_ids.insert(symbol.id.clone(), id);
676 }
677
678 for relation in &result.relations {
680 if let Some(&from_id) = node_ids.get(&relation.from_id) {
681 let targets = g.find_by_name(&relation.to_name);
683 if let Some(target) = targets.first() {
684 if let Some(to_id) = g.get_index(&target.id) {
685 let edge_kind = match relation.kind {
686 arbor_core::RelationType::Calls => EdgeKind::Calls,
687 arbor_core::RelationType::Imports => EdgeKind::Imports,
688 arbor_core::RelationType::Extends => EdgeKind::Extends,
689 arbor_core::RelationType::Implements => {
690 EdgeKind::Implements
691 }
692 };
693 g.add_edge(from_id, to_id, Edge::new(edge_kind));
694 }
695 }
696 }
697 }
698
699 let scores = compute_centrality(&g, 20, 0.85);
702 g.set_centrality(scores.into_map());
703
704 let elapsed = start.elapsed();
705 info!(
706 "✅ Indexed {} in {:?} ({} symbols, {} relations)",
707 file_name,
708 elapsed,
709 result.symbols.len(),
710 result.relations.len()
711 );
712
713 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
715 is_delta: true,
716 node_count: g.node_count(),
717 edge_count: g.edge_count(),
718 file_count: g.stats().files,
719 changed_files: vec![result.file_path],
720 timestamp: std::time::SystemTime::now()
721 .duration_since(std::time::UNIX_EPOCH)
722 .map_or(0, |d| d.as_secs()),
723 nodes: Some(g.nodes().cloned().collect()),
724 edges: Some(g.export_edges()),
725 });
726
727 let _ = broadcast_tx.send(update);
728 }
729 Err(e) => {
730 warn!("⚠️ Parse error for {}: {}", file_name, e);
731 }
732 }
733 }
734
735 WatcherEvent::Deleted(path) => {
736 let file_str = path.to_string_lossy().to_string();
737 info!("🗑️ File deleted: {}", path.display());
738
739 let mut g = graph.write().await;
740 g.remove_file(&file_str);
741
742 let scores = compute_centrality(&g, 20, 0.85);
744 g.set_centrality(scores.into_map());
745
746 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
747 is_delta: true,
748 node_count: g.node_count(),
749 edge_count: g.edge_count(),
750 file_count: g.stats().files,
751 changed_files: vec![file_str],
752 timestamp: std::time::SystemTime::now()
753 .duration_since(std::time::UNIX_EPOCH)
754 .map_or(0, |d| d.as_secs()),
755 nodes: Some(g.nodes().cloned().collect()),
756 edges: Some(g.export_edges()),
757 });
758
759 let _ = broadcast_tx.send(update);
760 }
761 }
762 }
763}
764
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Serializes a message to JSON, panicking if serialization fails.
    fn to_json(msg: &BroadcastMessage) -> String {
        serde_json::to_string(msg).unwrap()
    }

    /// Only paths whose extension is in the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions = vec!["ts".to_string(), "rs".to_string()];
        let accepts = |name: &str| should_process_file(Path::new(name), &extensions);

        assert!(accepts("foo.ts"));
        assert!(accepts("bar.rs"));
        assert!(!accepts("baz.py"));
        assert!(!accepts("README.md"));
    }

    /// A `GraphUpdate` serializes with its variant name and field values.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = to_json(&BroadcastMessage::GraphUpdate(payload));
        assert!(json.contains("GraphUpdate"));
        assert!(json.contains("42"));
    }

    /// The default configuration covers every extension the parser crate supports.
    #[test]
    fn test_sync_config_default_has_all_extensions() {
        let configured: HashSet<String> = SyncServerConfig::default()
            .extensions
            .into_iter()
            .collect();

        let required: HashSet<String> = arbor_core::languages::supported_extensions()
            .iter()
            .map(|ext| ext.to_string())
            .collect();

        for ext in &required {
            assert!(
                configured.contains(ext),
                "SyncServerConfig is missing extension: {}",
                ext
            );
        }
    }

    /// A `FocusNode` serializes with its variant name, node id and file.
    #[test]
    fn test_focus_node_serialization() {
        let payload = FocusNodePayload {
            node_id: "abc123".to_string(),
            file: "main.rs".to_string(),
            line: 42,
        };

        let json = to_json(&BroadcastMessage::FocusNode(payload));
        for needle in ["FocusNode", "abc123", "main.rs"] {
            assert!(json.contains(needle));
        }
    }

    /// An `IndexerStatus` serializes with its phase and current file.
    #[test]
    fn test_indexer_status_serialization() {
        let payload = IndexerStatusPayload {
            phase: "scanning".to_string(),
            files_processed: 10,
            files_total: 100,
            current_file: Some("test.rs".to_string()),
        };

        let json = to_json(&BroadcastMessage::IndexerStatus(payload));
        assert!(json.contains("scanning"));
        assert!(json.contains("test.rs"));
    }

    /// A `Hello` serializes with its version and counts.
    #[test]
    fn test_hello_payload_serialization() {
        let payload = HelloPayload {
            version: "2.0.0".to_string(),
            node_count: 100,
            edge_count: 200,
        };

        let json = to_json(&BroadcastMessage::Hello(payload));
        assert!(json.contains("2.0.0"));
        assert!(json.contains("100"));
    }

    /// The payload-less `GraphEnd` still serializes with its variant name.
    #[test]
    fn test_graph_end_serialization() {
        let json = to_json(&BroadcastMessage::GraphEnd);
        assert!(json.contains("GraphEnd"));
    }
}