1use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::{accept_async, tungstenite::Message};
22use tracing::{debug, error, info, warn};
23
/// Settings for the sync server: where it listens and what it watches.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Directory that is watched recursively for file changes.
    pub watch_path: PathBuf,
    /// Quiet period in milliseconds: a path is re-indexed once this long has
    /// passed since its most recent filesystem event.
    pub debounce_ms: u64,
    /// File extensions, without the leading dot, that trigger re-indexing.
    /// They are compared to a file's extension exactly (case-sensitive).
    pub extensions: Vec<String>,
}

impl Default for SyncServerConfig {
    /// Listens on `127.0.0.1:8080`, watches the current directory with a
    /// 150 ms debounce, and tracks TypeScript, JavaScript, Rust and Python
    /// source extensions.
    fn default() -> Self {
        // Built from parts so no fallible string parsing is involved.
        let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

        let extensions = ["ts", "tsx", "js", "jsx", "rs", "py"]
            .iter()
            .map(|ext| (*ext).to_owned())
            .collect();

        Self {
            addr,
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions,
        }
    }
}
58
/// Messages the server pushes to connected WebSocket clients.
///
/// The serde attributes select the adjacently tagged representation: the
/// variant name is written to a `type` field and the variant's data to a
/// `payload` field.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastMessage {
    /// Greeting sent to a client right after the WebSocket handshake.
    Hello(HelloPayload),
    /// Opens a full graph stream and announces the totals that follow.
    GraphBegin(GraphBeginPayload),
    /// One chunk of nodes within a graph stream.
    NodeBatch(NodeBatchPayload),
    /// One chunk of edges within a graph stream.
    EdgeBatch(EdgeBatchPayload),
    /// Marks the end of a graph stream.
    GraphEnd,
    /// Sent by the background indexer after a file was re-indexed or removed.
    GraphUpdate(GraphUpdatePayload),
    /// Asks clients to focus a specific node.
    FocusNode(FocusNodePayload),
    /// Progress report from an indexing run.
    IndexerStatus(IndexerStatusPayload),
}
80
/// Payload of `BroadcastMessage::Hello`, the server's greeting to a new client.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HelloPayload {
    /// Version string announced to the client.
    pub version: String,
    /// Number of nodes in the graph when the greeting was built.
    pub node_count: usize,
    /// Number of edges in the graph when the greeting was built.
    pub edge_count: usize,
}
87
/// Payload of `BroadcastMessage::GraphBegin`, sent before the node and edge batches.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphBeginPayload {
    /// Node count of the graph snapshot being streamed.
    pub total_nodes: usize,
    /// Edge count of the graph snapshot being streamed.
    pub total_edges: usize,
}
93
/// Payload of `BroadcastMessage::NodeBatch`: one chunk of a streamed graph's nodes.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeBatchPayload {
    /// Nodes contained in this chunk.
    pub nodes: Vec<arbor_core::CodeNode>,
}
98
/// Payload of `BroadcastMessage::EdgeBatch`: one chunk of a streamed graph's edges.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EdgeBatchPayload {
    /// Edges contained in this chunk.
    pub edges: Vec<arbor_graph::GraphEdge>,
}
103
/// Payload of `BroadcastMessage::GraphUpdate`, describing the graph after a change.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphUpdatePayload {
    /// Whether this update is a delta.
    // NOTE(review): the background indexer always sets this to `true` while
    // also attaching the complete node and edge lists — confirm what clients
    // are expected to do with the flag.
    pub is_delta: bool,
    /// Total number of nodes in the graph after the update.
    pub node_count: usize,
    /// Total number of edges in the graph after the update.
    pub edge_count: usize,
    /// Number of files tracked by the graph after the update.
    pub file_count: usize,
    /// Files whose change or removal produced this update.
    pub changed_files: Vec<String>,
    /// Time the update was created, in seconds since the Unix epoch.
    pub timestamp: u64,
    /// Node list attached to the update, if any.
    pub nodes: Option<Vec<arbor_core::CodeNode>>,
    /// Edge list attached to the update, if any.
    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
}
121
/// Payload of `BroadcastMessage::FocusNode`: the node clients should focus.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FocusNodePayload {
    /// Identifier of the node to focus.
    pub node_id: String,
    /// File the node belongs to.
    pub file: String,
    /// Line of the node within `file`.
    pub line: u32,
}
131
/// Payload of `BroadcastMessage::IndexerStatus`: progress of an indexing run.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexerStatusPayload {
    /// Name of the phase the indexer is currently in.
    pub phase: String,
    /// Number of files processed so far.
    pub files_processed: usize,
    /// Total number of files in this run.
    pub files_total: usize,
    /// File currently being processed, if any.
    pub current_file: Option<String>,
}
143
/// Debounced filesystem event handed from the file watcher to the background indexer.
#[derive(Debug, Clone)]
// `Created` is never constructed in this file (the watcher reports any path
// that still exists as `Changed`), which is presumably why the lint is silenced.
#[allow(dead_code)]
enum WatcherEvent {
    /// The file at this path exists and should be (re-)indexed.
    Changed(PathBuf),
    /// A new file appeared; the indexer treats it exactly like `Changed`.
    Created(PathBuf),
    /// The file at this path no longer exists and should leave the graph.
    Deleted(PathBuf),
}
152
/// WebSocket server that keeps clients in sync with the code graph while a
/// file watcher and a background indexer keep the graph up to date.
pub struct SyncServer {
    /// Listener address, watched directory, debounce and extension filter.
    config: SyncServerConfig,
    /// Graph shared between the indexer and every client connection.
    graph: SharedGraph,
    /// Sending side of the channel that fans messages out to all clients.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}
170
/// Cloneable handle to a `SyncServer`: it can broadcast to the server's
/// clients and reach the shared graph without borrowing the server itself.
#[derive(Clone)]
pub struct SyncServerHandle {
    /// Clone of the server's broadcast sender.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    /// Clone of the server's shared graph pointer.
    graph: SharedGraph,
}
177
178impl SyncServerHandle {
179 pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
181 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
182 node_id: node_id.to_string(),
183 file: file.to_string(),
184 line,
185 });
186 let _ = self.broadcast_tx.send(msg);
187 }
188
189 pub fn graph(&self) -> SharedGraph {
191 self.graph.clone()
192 }
193}
194
195impl SyncServer {
196 pub fn new(config: SyncServerConfig) -> Self {
198 let (broadcast_tx, _) = broadcast::channel(256);
199
200 Self {
201 config,
202 graph: Arc::new(RwLock::new(ArborGraph::new())),
203 broadcast_tx,
204 }
205 }
206
207 pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
209 let (broadcast_tx, _) = broadcast::channel(256);
210
211 Self {
212 config,
213 graph: Arc::new(RwLock::new(graph)),
214 broadcast_tx,
215 }
216 }
217
218 pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
220 let (broadcast_tx, _) = broadcast::channel(256);
221
222 Self {
223 config,
224 graph,
225 broadcast_tx,
226 }
227 }
228
229 pub fn graph(&self) -> SharedGraph {
231 self.graph.clone()
232 }
233
234 pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
236 self.broadcast_tx.subscribe()
237 }
238
239 pub fn handle(&self) -> SyncServerHandle {
241 SyncServerHandle {
242 broadcast_tx: self.broadcast_tx.clone(),
243 graph: self.graph.clone(),
244 }
245 }
246
247 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
249 info!("╔═══════════════════════════════════════════════════════════╗");
250 info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
251 info!("╚═══════════════════════════════════════════════════════════╝");
252
253 let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
255
256 let watch_path = self.config.watch_path.clone();
258 let extensions = self.config.extensions.clone();
259 let debounce_ms = self.config.debounce_ms;
260
261 tokio::spawn(async move {
262 if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
263 {
264 error!("File watcher error: {}", e);
265 }
266 });
267
268 let graph = self.graph.clone();
270 let broadcast_tx = self.broadcast_tx.clone();
271 let watch_path = self.config.watch_path.clone();
272
273 tokio::spawn(async move {
274 run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
275 });
276
277 self.run_websocket_server().await
279 }
280
281 async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
283 let listener = TcpListener::bind(&self.config.addr).await?;
284 info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
285 info!("👁️ Watching: {}", self.config.watch_path.display());
286 info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
287
288 loop {
289 match listener.accept().await {
290 Ok((stream, addr)) => {
291 info!("🔌 New connection from {}", addr);
292 let graph = self.graph.clone();
293 let broadcast_rx = self.broadcast_tx.subscribe();
294
295 tokio::spawn(async move {
296 if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
297 warn!("Connection error from {}: {}", addr, e);
298 }
299 });
300 }
301 Err(e) => {
302 error!("Accept error: {}", e);
303 }
304 }
305 }
306 }
307
308 pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
310 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
311 node_id: node_id.to_string(),
312 file: file.to_string(),
313 line,
314 });
315
316 let _ = self.broadcast_tx.send(msg);
317 }
318
319 pub fn update_status(
321 &self,
322 phase: &str,
323 processed: usize,
324 total: usize,
325 current: Option<&str>,
326 ) {
327 let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
328 phase: phase.to_string(),
329 files_processed: processed,
330 files_total: total,
331 current_file: current.map(|s| s.to_string()),
332 });
333
334 let _ = self.broadcast_tx.send(msg);
335 }
336}
337
338async fn handle_client(
344 stream: TcpStream,
345 addr: SocketAddr,
346 graph: SharedGraph,
347 mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
348) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
349 use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
350
351 let config = WebSocketConfig {
352 max_send_queue: None,
353 max_message_size: Some(64 * 1024 * 1024), max_frame_size: Some(64 * 1024 * 1024), accept_unmasked_frames: false,
356 ..Default::default()
357 };
358
359 let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
360 let (mut write, mut read) = ws_stream.split();
361
362 info!("✅ WebSocket handshake complete with {}", addr);
363
364 let (node_count, edge_count, nodes, edges) = {
366 let g = graph.read().await;
367 let mut nodes: Vec<_> = g.nodes().cloned().collect();
368 let edges_raw = g.export_edges();
369 nodes.sort_by(|a, b| a.id.cmp(&b.id));
371 let mut edges = edges_raw;
372 edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
373 (g.node_count(), g.edge_count(), nodes, edges)
374 };
375
376 let hello = BroadcastMessage::Hello(HelloPayload {
377 version: "1.1.1".to_string(),
378 node_count,
379 edge_count,
380 });
381
382 let json = serde_json::to_string(&hello)?;
383 write.send(Message::Text(json)).await?;
384 info!(
385 "👋 Sent Hello ({} nodes, {} edges) to {}",
386 node_count, edge_count, addr
387 );
388
389 info!("⏳ Waiting for client {} to be ready...", addr);
391 let mut ready = false;
392 while let Some(msg) = read.next().await {
393 match msg {
394 Ok(Message::Text(text)) => {
395 if text.contains("ready_for_graph") {
397 ready = true;
398 info!("✅ Client {} is ready for graph", addr);
399 break;
400 }
401 debug!("Running pre-ready protocol with {}: {}", addr, text);
402 }
403 Ok(Message::Ping(data)) => {
404 write.send(Message::Pong(data)).await?;
405 }
406 Ok(Message::Close(_)) => return Ok(()),
407 Err(e) => return Err(e.into()),
408 _ => {}
409 }
410 }
411
412 if !ready {
413 warn!("Client {} disconnected before sending ready signal", addr);
414 return Ok(());
415 }
416
417 let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
419 total_nodes: node_count,
420 total_edges: edge_count,
421 });
422 write
423 .send(Message::Text(serde_json::to_string(&begin)?))
424 .await?;
425
426 for chunk in nodes.chunks(50) {
428 let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
429 nodes: chunk.to_vec(),
430 });
431 write
432 .send(Message::Text(serde_json::to_string(&batch)?))
433 .await?;
434 }
435 info!("📤 Streamed {} nodes to {}", node_count, addr);
436
437 for chunk in edges.chunks(100) {
439 let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
440 edges: chunk.to_vec(),
441 });
442 write
443 .send(Message::Text(serde_json::to_string(&batch)?))
444 .await?;
445 }
446 info!("📤 Streamed {} edges to {}", edge_count, addr);
447
448 write
450 .send(Message::Text(serde_json::to_string(
451 &BroadcastMessage::GraphEnd,
452 )?))
453 .await?;
454 info!("🏁 Graph stream complete for {}", addr);
455
456 loop {
458 tokio::select! {
459 msg = read.next() => {
461 match msg {
462 Some(Ok(Message::Text(text))) => {
463 debug!("📥 Received from {}: {}", addr, text);
464 }
467 Some(Ok(Message::Ping(data))) => {
468 write.send(Message::Pong(data)).await?;
469 }
470 Some(Ok(Message::Close(_))) => {
471 info!("👋 Client {} disconnected gracefully", addr);
472 break;
473 }
474 Some(Err(e)) => {
475 warn!("⚠️ Error from {}: {}", addr, e);
476 break;
477 }
478 None => break,
479 _ => {}
480 }
481 }
482
483 msg = broadcast_rx.recv() => {
485 match msg {
486 Ok(broadcast) => {
487 let json = serde_json::to_string(&broadcast)?;
488 if write.send(Message::Text(json)).await.is_err() {
489 break;
490 }
491 }
492 Err(broadcast::error::RecvError::Lagged(n)) => {
493 warn!("Client {} lagged by {} messages", addr, n);
494 }
495 Err(broadcast::error::RecvError::Closed) => {
496 break;
497 }
498 }
499 }
500 }
501 }
502
503 info!("🔌 Connection closed: {}", addr);
504 Ok(())
505}
506
507async fn run_file_watcher(
513 watch_path: PathBuf,
514 extensions: Vec<String>,
515 debounce_ms: u64,
516 tx: mpsc::Sender<WatcherEvent>,
517) -> notify::Result<()> {
518 let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
519
520 let mut watcher = RecommendedWatcher::new(
522 move |res| {
523 let _ = notify_tx.blocking_send(res);
524 },
525 Config::default(),
526 )?;
527
528 watcher.watch(&watch_path, RecursiveMode::Recursive)?;
529 info!("👁️ File watcher started for {}", watch_path.display());
530
531 let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
533 let debounce_dur = Duration::from_millis(debounce_ms);
534
535 loop {
536 let now = Instant::now();
538 let mut ready: Vec<PathBuf> = Vec::new();
539
540 for (path, time) in pending.iter() {
541 if now.duration_since(*time) >= debounce_dur {
542 ready.push(path.clone());
543 }
544 }
545
546 for path in ready {
547 pending.remove(&path);
548 if should_process_file(&path, &extensions) {
549 let event = if path.exists() {
550 WatcherEvent::Changed(path)
551 } else {
552 WatcherEvent::Deleted(path)
553 };
554 let _ = tx.send(event).await;
555 }
556 }
557
558 match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
560 Ok(Some(Ok(event))) => {
561 for path in event.paths {
562 if should_process_file(&path, &extensions) {
563 pending.insert(path, Instant::now());
564 }
565 }
566 }
567 Ok(Some(Err(e))) => {
568 warn!("Watch error: {}", e);
569 }
570 Ok(None) => break, Err(_) => {} }
573 }
574
575 Ok(())
576}
577
/// Reports whether `path` has an extension listed in `extensions`.
///
/// Entries are compared exactly (case-sensitive, no leading dot). Paths with
/// no extension, or with one that is not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => extensions.iter().any(|allowed| allowed.as_str() == ext),
        None => false,
    }
}
585
586async fn run_background_indexer(
592 mut rx: mpsc::Receiver<WatcherEvent>,
593 graph: SharedGraph,
594 broadcast_tx: broadcast::Sender<BroadcastMessage>,
595 _root_path: PathBuf,
596) {
597 let mut parser = ArborParser::new().expect("Failed to initialize parser");
598
599 info!("🔧 Background indexer started");
600
601 while let Some(event) = rx.recv().await {
602 let start = Instant::now();
603
604 match event {
605 WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
606 let file_name = path
607 .file_name()
608 .and_then(|n| n.to_str())
609 .unwrap_or("unknown");
610
611 info!("📝 Re-indexing: {}", file_name);
612
613 match parser.parse_file(&path) {
614 Ok(result) => {
615 let mut g = graph.write().await;
616
617 g.remove_file(&result.file_path);
619
620 let mut node_ids = HashMap::new();
622 for symbol in &result.symbols {
623 let id = g.add_node(symbol.clone());
624 node_ids.insert(symbol.id.clone(), id);
625 }
626
627 for relation in &result.relations {
629 if let Some(&from_id) = node_ids.get(&relation.from_id) {
630 let targets = g.find_by_name(&relation.to_name);
632 if let Some(target) = targets.first() {
633 if let Some(to_id) = g.get_index(&target.id) {
634 let edge_kind = match relation.kind {
635 arbor_core::RelationType::Calls => EdgeKind::Calls,
636 arbor_core::RelationType::Imports => EdgeKind::Imports,
637 arbor_core::RelationType::Extends => EdgeKind::Extends,
638 arbor_core::RelationType::Implements => {
639 EdgeKind::Implements
640 }
641 };
642 g.add_edge(from_id, to_id, Edge::new(edge_kind));
643 }
644 }
645 }
646 }
647
648 let elapsed = start.elapsed();
649 info!(
650 "✅ Indexed {} in {:?} ({} symbols, {} relations)",
651 file_name,
652 elapsed,
653 result.symbols.len(),
654 result.relations.len()
655 );
656
657 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
659 is_delta: true,
660 node_count: g.node_count(),
661 edge_count: g.edge_count(),
662 file_count: g.stats().files,
663 changed_files: vec![result.file_path],
664 timestamp: std::time::SystemTime::now()
665 .duration_since(std::time::UNIX_EPOCH)
666 .unwrap()
667 .as_secs(),
668 nodes: Some(g.nodes().cloned().collect()),
669 edges: Some(g.export_edges()),
670 });
671
672 let _ = broadcast_tx.send(update);
673 }
674 Err(e) => {
675 warn!("⚠️ Parse error for {}: {}", file_name, e);
676 }
677 }
678 }
679
680 WatcherEvent::Deleted(path) => {
681 let file_str = path.to_string_lossy().to_string();
682 info!("🗑️ File deleted: {}", path.display());
683
684 let mut g = graph.write().await;
685 g.remove_file(&file_str);
686
687 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
688 is_delta: true,
689 node_count: g.node_count(),
690 edge_count: g.edge_count(),
691 file_count: g.stats().files,
692 changed_files: vec![file_str],
693 timestamp: std::time::SystemTime::now()
694 .duration_since(std::time::UNIX_EPOCH)
695 .unwrap()
696 .as_secs(),
697 nodes: Some(g.nodes().cloned().collect()),
698 edges: Some(g.export_edges()),
699 });
700
701 let _ = broadcast_tx.send(update);
702 }
703 }
704 }
705}
706
#[cfg(test)]
mod tests {
    use super::*;

    /// Only files whose extension is on the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions = vec!["ts".to_string(), "rs".to_string()];

        let cases = [
            ("foo.ts", true),
            ("bar.rs", true),
            ("baz.py", false),
            ("README.md", false),
        ];

        for &(file, expected) in &cases {
            assert_eq!(
                should_process_file(Path::new(file), &extensions),
                expected,
                "unexpected verdict for {}",
                file
            );
        }
    }

    /// A `GraphUpdate` serializes with its variant name and its counts.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();

        for needle in ["GraphUpdate", "42"].iter() {
            assert!(json.contains(needle));
        }
    }
}