1use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::tungstenite::Message;
22use tracing::{debug, error, info, warn};
23
/// Configuration for a [`SyncServer`].
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Root path handed to the file watcher; it is watched recursively.
    pub watch_path: PathBuf,
    /// Quiet period, in milliseconds, a path must go without new watcher
    /// events before it is forwarded to the indexer.
    pub debounce_ms: u64,
    /// File extensions (without the leading dot) that are eligible for
    /// indexing. Compared for exact equality against the path's extension.
    pub extensions: Vec<String>,
}
40
41impl Default for SyncServerConfig {
42 fn default() -> Self {
43 Self {
44 addr: "127.0.0.1:8080".parse().unwrap(),
45 watch_path: PathBuf::from("."),
46 debounce_ms: 150,
47 extensions: vec![
48 "ts".into(),
49 "tsx".into(),
50 "js".into(),
51 "jsx".into(),
52 "rs".into(),
53 "py".into(),
54 "go".into(),
55 "java".into(),
56 "c".into(),
57 "h".into(),
58 "cpp".into(),
59 "hpp".into(),
60 "cc".into(),
61 "hh".into(),
62 "cxx".into(),
63 "hxx".into(),
64 "cs".into(),
65 "dart".into(),
66 "kt".into(),
67 "kts".into(),
68 "swift".into(),
69 "rb".into(),
70 "php".into(),
71 "phtml".into(),
72 "sh".into(),
73 "bash".into(),
74 "zsh".into(),
75 ],
76 }
77 }
78}
79
/// Messages the server sends to WebSocket clients.
///
/// Serialized with serde's adjacently tagged representation: the variant name
/// goes under the `"type"` key and its data under `"payload"`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastMessage {
    /// First message sent to a client once the WebSocket handshake completes.
    Hello(HelloPayload),
    /// Opens the initial graph stream and announces the totals to expect.
    GraphBegin(GraphBeginPayload),
    /// One chunk of nodes within the initial graph stream.
    NodeBatch(NodeBatchPayload),
    /// One chunk of edges within the initial graph stream.
    EdgeBatch(EdgeBatchPayload),
    /// Closes the initial graph stream.
    GraphEnd,
    /// Broadcast by the background indexer after a file was re-indexed or removed.
    GraphUpdate(GraphUpdatePayload),
    /// Points clients at a specific node (see `focus_node` / `spotlight_node`).
    FocusNode(FocusNodePayload),
    /// Indexing progress report (see `SyncServer::update_status`).
    IndexerStatus(IndexerStatusPayload),
}
101
/// Payload of [`BroadcastMessage::Hello`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct HelloPayload {
    /// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
    pub version: String,
    /// Number of nodes in the graph when the client's snapshot was taken.
    pub node_count: usize,
    /// Number of edges in the graph when the client's snapshot was taken.
    pub edge_count: usize,
}
108
/// Payload of [`BroadcastMessage::GraphBegin`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphBeginPayload {
    /// Node count of the graph about to be streamed.
    pub total_nodes: usize,
    /// Edge count of the graph about to be streamed.
    pub total_edges: usize,
}
114
/// Payload of [`BroadcastMessage::NodeBatch`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeBatchPayload {
    /// Nodes carried by this batch (`handle_client` sends at most 50 per batch).
    pub nodes: Vec<arbor_core::CodeNode>,
}
119
/// Payload of [`BroadcastMessage::EdgeBatch`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct EdgeBatchPayload {
    /// Edges carried by this batch (`handle_client` sends at most 100 per batch).
    pub edges: Vec<arbor_graph::GraphEdge>,
}
124
/// Payload of [`BroadcastMessage::GraphUpdate`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphUpdatePayload {
    /// Delta flag. The background indexer in this file always sets it to
    /// `true`, even though it fills `nodes`/`edges` with the entire graph.
    // NOTE(review): confirm how clients interpret this flag.
    pub is_delta: bool,
    /// Total node count of the graph after the update.
    pub node_count: usize,
    /// Total edge count of the graph after the update.
    pub edge_count: usize,
    /// Number of files tracked by the graph after the update.
    pub file_count: usize,
    /// Files whose change or deletion triggered this update.
    pub changed_files: Vec<String>,
    /// Time of the update in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Nodes accompanying the update, if any.
    pub nodes: Option<Vec<arbor_core::CodeNode>>,
    /// Edges accompanying the update, if any.
    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
}
142
/// Payload of [`BroadcastMessage::FocusNode`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct FocusNodePayload {
    /// Identifier of the node to focus.
    pub node_id: String,
    /// File associated with the node, as supplied by the caller.
    pub file: String,
    /// Line within `file`, as supplied by the caller.
    pub line: u32,
}
152
/// Payload of [`BroadcastMessage::IndexerStatus`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexerStatusPayload {
    /// Free-form name of the current indexing phase.
    pub phase: String,
    /// Number of files processed so far.
    pub files_processed: usize,
    /// Total number of files to process.
    pub files_total: usize,
    /// File currently being processed, when known.
    pub current_file: Option<String>,
}
164
/// Debounced file-system event passed from the file watcher to the
/// background indexer.
#[derive(Debug, Clone)]
// `Created` is never constructed in this file: the watcher reports every
// path that still exists as `Changed`. The indexer handles both identically.
#[allow(dead_code)]
enum WatcherEvent {
    /// The path exists and was reported by the watcher.
    Changed(PathBuf),
    /// The path was newly created (treated like `Changed` by the indexer).
    Created(PathBuf),
    /// The path no longer exists.
    Deleted(PathBuf),
}
173
/// WebSocket server that streams the code graph to clients and pushes
/// updates whenever watched files change.
pub struct SyncServer {
    /// Listener address, watch root, debounce and extension settings.
    config: SyncServerConfig,
    /// Graph shared between the indexer task and client connections.
    graph: SharedGraph,
    /// Sending half of the broadcast channel every client subscribes to.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}
191
/// Cloneable handle to a [`SyncServer`], obtained via `SyncServer::handle`.
///
/// Holds clones of the server's broadcast sender and shared graph.
#[derive(Clone)]
pub struct SyncServerHandle {
    /// Sender used to push messages to all connected clients.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    /// The same shared graph the server operates on.
    graph: SharedGraph,
}
198
199impl SyncServerHandle {
200 pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
202 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
203 node_id: node_id.to_string(),
204 file: file.to_string(),
205 line,
206 });
207 let _ = self.broadcast_tx.send(msg);
208 }
209
210 pub fn graph(&self) -> SharedGraph {
212 self.graph.clone()
213 }
214}
215
216impl SyncServer {
217 pub fn new(config: SyncServerConfig) -> Self {
219 let (broadcast_tx, _) = broadcast::channel(256);
220
221 Self {
222 config,
223 graph: Arc::new(RwLock::new(ArborGraph::new())),
224 broadcast_tx,
225 }
226 }
227
228 pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
230 let (broadcast_tx, _) = broadcast::channel(256);
231
232 Self {
233 config,
234 graph: Arc::new(RwLock::new(graph)),
235 broadcast_tx,
236 }
237 }
238
239 pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
241 let (broadcast_tx, _) = broadcast::channel(256);
242
243 Self {
244 config,
245 graph,
246 broadcast_tx,
247 }
248 }
249
250 pub fn graph(&self) -> SharedGraph {
252 self.graph.clone()
253 }
254
255 pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
257 self.broadcast_tx.subscribe()
258 }
259
260 pub fn handle(&self) -> SyncServerHandle {
262 SyncServerHandle {
263 broadcast_tx: self.broadcast_tx.clone(),
264 graph: self.graph.clone(),
265 }
266 }
267
268 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
270 info!("╔═══════════════════════════════════════════════════════════╗");
271 info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
272 info!("╚═══════════════════════════════════════════════════════════╝");
273
274 let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
276
277 let watch_path = self.config.watch_path.clone();
279 let extensions = self.config.extensions.clone();
280 let debounce_ms = self.config.debounce_ms;
281
282 tokio::spawn(async move {
283 if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
284 {
285 error!("File watcher error: {}", e);
286 }
287 });
288
289 let graph = self.graph.clone();
291 let broadcast_tx = self.broadcast_tx.clone();
292 let watch_path = self.config.watch_path.clone();
293
294 tokio::spawn(async move {
295 run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
296 });
297
298 self.run_websocket_server().await
300 }
301
302 async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
304 let listener = TcpListener::bind(&self.config.addr).await?;
305 info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
306 info!("👁️ Watching: {}", self.config.watch_path.display());
307 info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
308
309 loop {
310 match listener.accept().await {
311 Ok((stream, addr)) => {
312 info!("🔌 New connection from {}", addr);
313 let graph = self.graph.clone();
314 let broadcast_rx = self.broadcast_tx.subscribe();
315
316 tokio::spawn(async move {
317 if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
318 warn!("Connection error from {}: {}", addr, e);
319 }
320 });
321 }
322 Err(e) => {
323 error!("Accept error: {}", e);
324 }
325 }
326 }
327 }
328
329 pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
331 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
332 node_id: node_id.to_string(),
333 file: file.to_string(),
334 line,
335 });
336
337 let _ = self.broadcast_tx.send(msg);
338 }
339
340 pub fn update_status(
342 &self,
343 phase: &str,
344 processed: usize,
345 total: usize,
346 current: Option<&str>,
347 ) {
348 let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
349 phase: phase.to_string(),
350 files_processed: processed,
351 files_total: total,
352 current_file: current.map(|s| s.to_string()),
353 });
354
355 let _ = self.broadcast_tx.send(msg);
356 }
357}
358
/// Serves a single WebSocket client from handshake to disconnect.
///
/// Protocol, in order:
/// 1. Accept the WebSocket handshake.
/// 2. Snapshot the graph and send `Hello` with its node/edge counts.
/// 3. Wait for a text message containing `ready_for_graph`.
/// 4. Stream the snapshot: `GraphBegin`, `NodeBatch`es, `EdgeBatch`es, `GraphEnd`.
/// 5. Forward every broadcast message to the client until either side closes.
///
/// # Errors
///
/// Returns an error if the handshake fails, a message cannot be serialized,
/// a send fails outside the broadcast-forwarding path, or the socket reports
/// an error while waiting for the ready signal.
async fn handle_client(
    stream: TcpStream,
    addr: SocketAddr,
    graph: SharedGraph,
    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;

    // Allow messages and frames of up to 64 MiB.
    let config = WebSocketConfig {
        max_message_size: Some(64 * 1024 * 1024),
        max_frame_size: Some(64 * 1024 * 1024),
        accept_unmasked_frames: false,
        ..Default::default()
    };

    let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
    let (mut write, mut read) = ws_stream.split();

    info!("✅ WebSocket handshake complete with {}", addr);

    // Snapshot the graph inside a scope so the read lock is released before
    // any network I/O. Sorting gives the stream a stable order.
    // Note: the snapshot is taken before the client signals readiness, so it
    // reflects the graph at connect time.
    let (node_count, edge_count, nodes, edges) = {
        let g = graph.read().await;
        let mut nodes: Vec<_> = g.nodes().cloned().collect();
        let edges_raw = g.export_edges();
        nodes.sort_by(|a, b| a.id.cmp(&b.id));
        let mut edges = edges_raw;
        edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
        (g.node_count(), g.edge_count(), nodes, edges)
    };

    let hello = BroadcastMessage::Hello(HelloPayload {
        version: env!("CARGO_PKG_VERSION").to_string(),
        node_count,
        edge_count,
    });

    let json = serde_json::to_string(&hello)?;
    write.send(Message::Text(json)).await?;
    info!(
        "👋 Sent Hello ({} nodes, {} edges) to {}",
        node_count, edge_count, addr
    );

    info!("⏳ Waiting for client {} to be ready...", addr);
    let mut ready = false;
    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                // Substring match: any text frame mentioning the token counts
                // as the ready signal.
                if text.contains("ready_for_graph") {
                    ready = true;
                    info!("✅ Client {} is ready for graph", addr);
                    break;
                }
                debug!("Running pre-ready protocol with {}: {}", addr, text);
            }
            Ok(Message::Ping(data)) => {
                write.send(Message::Pong(data)).await?;
            }
            Ok(Message::Close(_)) => return Ok(()),
            Err(e) => return Err(e.into()),
            // Other frame types are ignored while waiting.
            _ => {}
        }
    }

    // The stream ended without a ready signal: nothing more to do.
    if !ready {
        warn!("Client {} disconnected before sending ready signal", addr);
        return Ok(());
    }

    let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
        total_nodes: node_count,
        total_edges: edge_count,
    });
    write
        .send(Message::Text(serde_json::to_string(&begin)?))
        .await?;

    // Nodes go out in batches of 50.
    for chunk in nodes.chunks(50) {
        let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
            nodes: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} nodes to {}", node_count, addr);

    // Edges go out in batches of 100.
    for chunk in edges.chunks(100) {
        let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
            edges: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} edges to {}", edge_count, addr);

    write
        .send(Message::Text(serde_json::to_string(
            &BroadcastMessage::GraphEnd,
        )?))
        .await?;
    info!("🏁 Graph stream complete for {}", addr);

    // Steady state: service client frames and forward broadcasts, whichever
    // becomes ready first.
    loop {
        tokio::select! {
            msg = read.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        // Client text is only logged; it is not acted upon.
                        debug!("📥 Received from {}: {}", addr, text);
                    }
                    Some(Ok(Message::Ping(data))) => {
                        write.send(Message::Pong(data)).await?;
                    }
                    Some(Ok(Message::Close(_))) => {
                        info!("👋 Client {} disconnected gracefully", addr);
                        break;
                    }
                    Some(Err(e)) => {
                        warn!("⚠️ Error from {}: {}", addr, e);
                        break;
                    }
                    None => break,
                    _ => {}
                }
            }

            msg = broadcast_rx.recv() => {
                match msg {
                    Ok(broadcast) => {
                        let json = serde_json::to_string(&broadcast)?;
                        // A failed send ends the session without an error.
                        if write.send(Message::Text(json)).await.is_err() {
                            break;
                        }
                    }
                    // The receiver fell behind and `n` messages were dropped;
                    // keep going with whatever comes next.
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        warn!("Client {} lagged by {} messages", addr, n);
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        break;
                    }
                }
            }
        }
    }

    info!("🔌 Connection closed: {}", addr);
    Ok(())
}
526
527async fn run_file_watcher(
533 watch_path: PathBuf,
534 extensions: Vec<String>,
535 debounce_ms: u64,
536 tx: mpsc::Sender<WatcherEvent>,
537) -> notify::Result<()> {
538 let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
539
540 let mut watcher = RecommendedWatcher::new(
542 move |res| {
543 let _ = notify_tx.blocking_send(res);
544 },
545 Config::default(),
546 )?;
547
548 watcher.watch(&watch_path, RecursiveMode::Recursive)?;
549 info!("👁️ File watcher started for {}", watch_path.display());
550
551 let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
553 let debounce_dur = Duration::from_millis(debounce_ms);
554
555 loop {
556 let now = Instant::now();
558 let mut ready: Vec<PathBuf> = Vec::new();
559
560 for (path, time) in pending.iter() {
561 if now.duration_since(*time) >= debounce_dur {
562 ready.push(path.clone());
563 }
564 }
565
566 for path in ready {
567 pending.remove(&path);
568 if should_process_file(&path, &extensions) {
569 let event = if path.exists() {
570 WatcherEvent::Changed(path)
571 } else {
572 WatcherEvent::Deleted(path)
573 };
574 let _ = tx.send(event).await;
575 }
576 }
577
578 match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
580 Ok(Some(Ok(event))) => {
581 for path in event.paths {
582 if should_process_file(&path, &extensions) {
583 pending.insert(path, Instant::now());
584 }
585 }
586 }
587 Ok(Some(Err(e))) => {
588 warn!("Watch error: {}", e);
589 }
590 Ok(None) => break, Err(_) => {} }
593 }
594
595 Ok(())
596}
597
/// Reports whether `path` has an extension listed in `extensions`.
///
/// The comparison is exact (case-sensitive). Paths without an extension, or
/// whose extension is not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => extensions.iter().any(|allowed| allowed.as_str() == ext),
        None => false,
    }
}
605
606async fn run_background_indexer(
612 mut rx: mpsc::Receiver<WatcherEvent>,
613 graph: SharedGraph,
614 broadcast_tx: broadcast::Sender<BroadcastMessage>,
615 _root_path: PathBuf,
616) {
617 let mut parser = ArborParser::new().expect("Failed to initialize parser");
618
619 info!("🔧 Background indexer started");
620
621 while let Some(event) = rx.recv().await {
622 let start = Instant::now();
623
624 match event {
625 WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
626 let file_name = path
627 .file_name()
628 .and_then(|n| n.to_str())
629 .unwrap_or("unknown");
630
631 info!("📝 Re-indexing: {}", file_name);
632
633 match parser.parse_file(&path) {
634 Ok(result) => {
635 let mut g = graph.write().await;
636
637 g.remove_file(&result.file_path);
639
640 let mut node_ids = HashMap::new();
642 for symbol in &result.symbols {
643 let id = g.add_node(symbol.clone());
644 node_ids.insert(symbol.id.clone(), id);
645 }
646
647 for relation in &result.relations {
649 if let Some(&from_id) = node_ids.get(&relation.from_id) {
650 let targets = g.find_by_name(&relation.to_name);
652 if let Some(target) = targets.first() {
653 if let Some(to_id) = g.get_index(&target.id) {
654 let edge_kind = match relation.kind {
655 arbor_core::RelationType::Calls => EdgeKind::Calls,
656 arbor_core::RelationType::Imports => EdgeKind::Imports,
657 arbor_core::RelationType::Extends => EdgeKind::Extends,
658 arbor_core::RelationType::Implements => {
659 EdgeKind::Implements
660 }
661 };
662 g.add_edge(from_id, to_id, Edge::new(edge_kind));
663 }
664 }
665 }
666 }
667
668 let elapsed = start.elapsed();
669 info!(
670 "✅ Indexed {} in {:?} ({} symbols, {} relations)",
671 file_name,
672 elapsed,
673 result.symbols.len(),
674 result.relations.len()
675 );
676
677 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
679 is_delta: true,
680 node_count: g.node_count(),
681 edge_count: g.edge_count(),
682 file_count: g.stats().files,
683 changed_files: vec![result.file_path],
684 timestamp: std::time::SystemTime::now()
685 .duration_since(std::time::UNIX_EPOCH)
686 .unwrap()
687 .as_secs(),
688 nodes: Some(g.nodes().cloned().collect()),
689 edges: Some(g.export_edges()),
690 });
691
692 let _ = broadcast_tx.send(update);
693 }
694 Err(e) => {
695 warn!("⚠️ Parse error for {}: {}", file_name, e);
696 }
697 }
698 }
699
700 WatcherEvent::Deleted(path) => {
701 let file_str = path.to_string_lossy().to_string();
702 info!("🗑️ File deleted: {}", path.display());
703
704 let mut g = graph.write().await;
705 g.remove_file(&file_str);
706
707 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
708 is_delta: true,
709 node_count: g.node_count(),
710 edge_count: g.edge_count(),
711 file_count: g.stats().files,
712 changed_files: vec![file_str],
713 timestamp: std::time::SystemTime::now()
714 .duration_since(std::time::UNIX_EPOCH)
715 .unwrap()
716 .as_secs(),
717 nodes: Some(g.nodes().cloned().collect()),
718 edges: Some(g.export_edges()),
719 });
720
721 let _ = broadcast_tx.send(update);
722 }
723 }
724 }
725}
726
#[cfg(test)]
mod tests {
    use super::*;

    /// Extension filtering accepts listed extensions and rejects all others.
    #[test]
    fn test_should_process_file() {
        let extensions: Vec<String> = ["ts", "rs"].iter().map(|ext| ext.to_string()).collect();

        for accepted in ["foo.ts", "bar.rs"] {
            assert!(should_process_file(Path::new(accepted), &extensions));
        }
        for rejected in ["baz.py", "README.md"] {
            assert!(!should_process_file(Path::new(rejected), &extensions));
        }
    }

    /// A `GraphUpdate` serializes with its variant name and field values.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();
        for needle in ["GraphUpdate", "42"] {
            assert!(json.contains(needle));
        }
    }
}