1use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::tungstenite::Message;
22use tracing::{debug, error, info, warn};
23
/// Settings for the sync server: where it listens, what it watches, and how
/// file-system events are filtered and debounced.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Directory that is watched recursively for file changes.
    pub watch_path: PathBuf,
    /// Quiet period, in milliseconds, a path must go without new events
    /// before it is forwarded to the indexer.
    pub debounce_ms: u64,
    /// File extensions (compared against `Path::extension`, so without the
    /// leading dot) that are eligible for indexing.
    pub extensions: Vec<String>,
}

impl Default for SyncServerConfig {
    /// Listens on `127.0.0.1:8080`, watches the current directory, debounces
    /// for 150 ms and accepts the built-in list of source-file extensions.
    fn default() -> Self {
        // Extensions accepted out of the box, in their original order.
        const DEFAULT_EXTENSIONS: [&str; 15] = [
            "ts", "tsx", "js", "jsx", "rs", "py", "kt", "kts", "swift", "rb", "php", "phtml",
            "sh", "bash", "zsh",
        ];

        Self {
            addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions: DEFAULT_EXTENSIONS
                .iter()
                .map(|ext| ext.to_string())
                .collect(),
        }
    }
}
67
/// Messages the server serializes to JSON and pushes to WebSocket clients.
///
/// With `tag = "type", content = "payload"` serde emits each variant as an
/// object whose `type` field holds the variant name and whose `payload` field
/// holds the variant's data.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastMessage {
    /// First message sent to a client after the WebSocket handshake.
    Hello(HelloPayload),
    /// Announces the start of a full graph stream and its totals.
    GraphBegin(GraphBeginPayload),
    /// One chunk of nodes within a graph stream.
    NodeBatch(NodeBatchPayload),
    /// One chunk of edges within a graph stream.
    EdgeBatch(EdgeBatchPayload),
    /// Marks the end of a graph stream.
    GraphEnd,
    /// Emitted by the background indexer after a file was re-indexed or removed.
    GraphUpdate(GraphUpdatePayload),
    /// Asks clients to focus a specific node.
    FocusNode(FocusNodePayload),
    /// Progress report built by `SyncServer::update_status`.
    IndexerStatus(IndexerStatusPayload),
}
89
/// Payload of the greeting sent right after the WebSocket handshake.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HelloPayload {
    /// Version string reported to the client.
    pub version: String,
    /// Number of nodes in the graph snapshot taken for this client.
    pub node_count: usize,
    /// Number of edges in the graph snapshot taken for this client.
    pub edge_count: usize,
}
96
/// Payload announcing a full graph stream; sent before any node or edge batch.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphBeginPayload {
    /// Total number of nodes that the following batches will carry.
    pub total_nodes: usize,
    /// Total number of edges that the following batches will carry.
    pub total_edges: usize,
}
102
/// One chunk of graph nodes sent during a graph stream.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeBatchPayload {
    /// Nodes contained in this chunk.
    pub nodes: Vec<arbor_core::CodeNode>,
}
107
/// One chunk of graph edges sent during a graph stream.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EdgeBatchPayload {
    /// Edges contained in this chunk.
    pub edges: Vec<arbor_graph::GraphEdge>,
}
112
/// Payload describing the graph after the indexer processed a file change.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphUpdatePayload {
    /// Whether the update is a delta. The background indexer in this file
    /// always sets it to `true`.
    // NOTE(review): the indexer also fills `nodes`/`edges` with the whole
    // graph while flagging the update as a delta — confirm what clients expect.
    pub is_delta: bool,
    /// Node count of the graph after the change was applied.
    pub node_count: usize,
    /// Edge count of the graph after the change was applied.
    pub edge_count: usize,
    /// File count as reported by the graph's stats.
    pub file_count: usize,
    /// Paths of the files that triggered this update.
    pub changed_files: Vec<String>,
    /// Time the update was created, in seconds since the Unix epoch.
    pub timestamp: u64,
    /// Nodes accompanying the update, if any.
    pub nodes: Option<Vec<arbor_core::CodeNode>>,
    /// Edges accompanying the update, if any.
    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
}
130
/// Payload asking clients to focus a particular node.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FocusNodePayload {
    /// Identifier of the node to focus.
    pub node_id: String,
    /// File associated with the node, as supplied by the caller.
    pub file: String,
    /// Line number associated with the node, as supplied by the caller.
    pub line: u32,
}
140
/// Payload reporting indexing progress to clients.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexerStatusPayload {
    /// Free-form name of the current phase, as supplied by the caller.
    pub phase: String,
    /// Number of files processed so far.
    pub files_processed: usize,
    /// Total number of files to process.
    pub files_total: usize,
    /// File currently being processed, when known.
    pub current_file: Option<String>,
}
152
/// Debounced file-system event handed from the file watcher to the indexer.
#[derive(Debug, Clone)]
// `Created` is handled by the indexer but never constructed by the watcher in
// this file, hence the lint suppression.
#[allow(dead_code)]
enum WatcherEvent {
    /// The path still exists after the debounce window elapsed.
    Changed(PathBuf),
    /// A new path appeared; the indexer treats it exactly like `Changed`.
    Created(PathBuf),
    /// The path no longer exists after the debounce window elapsed.
    Deleted(PathBuf),
}
161
/// WebSocket server that watches a directory, re-indexes changed files and
/// broadcasts the resulting graph updates to connected clients.
pub struct SyncServer {
    /// Listener address, watch path, debounce window and extension filter.
    config: SyncServerConfig,
    /// Graph shared between the indexer task and every client task.
    graph: SharedGraph,
    /// Sending half of the broadcast channel; each client subscribes to it.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}
179
/// Cloneable handle giving access to a server's broadcast channel and shared
/// graph without holding the `SyncServer` itself.
#[derive(Clone)]
pub struct SyncServerHandle {
    /// Clone of the server's broadcast sender.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    /// Clone of the server's shared graph.
    graph: SharedGraph,
}
186
187impl SyncServerHandle {
188 pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
190 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
191 node_id: node_id.to_string(),
192 file: file.to_string(),
193 line,
194 });
195 let _ = self.broadcast_tx.send(msg);
196 }
197
198 pub fn graph(&self) -> SharedGraph {
200 self.graph.clone()
201 }
202}
203
204impl SyncServer {
205 pub fn new(config: SyncServerConfig) -> Self {
207 let (broadcast_tx, _) = broadcast::channel(256);
208
209 Self {
210 config,
211 graph: Arc::new(RwLock::new(ArborGraph::new())),
212 broadcast_tx,
213 }
214 }
215
216 pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
218 let (broadcast_tx, _) = broadcast::channel(256);
219
220 Self {
221 config,
222 graph: Arc::new(RwLock::new(graph)),
223 broadcast_tx,
224 }
225 }
226
227 pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
229 let (broadcast_tx, _) = broadcast::channel(256);
230
231 Self {
232 config,
233 graph,
234 broadcast_tx,
235 }
236 }
237
238 pub fn graph(&self) -> SharedGraph {
240 self.graph.clone()
241 }
242
243 pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
245 self.broadcast_tx.subscribe()
246 }
247
248 pub fn handle(&self) -> SyncServerHandle {
250 SyncServerHandle {
251 broadcast_tx: self.broadcast_tx.clone(),
252 graph: self.graph.clone(),
253 }
254 }
255
256 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
258 info!("╔═══════════════════════════════════════════════════════════╗");
259 info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
260 info!("╚═══════════════════════════════════════════════════════════╝");
261
262 let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
264
265 let watch_path = self.config.watch_path.clone();
267 let extensions = self.config.extensions.clone();
268 let debounce_ms = self.config.debounce_ms;
269
270 tokio::spawn(async move {
271 if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
272 {
273 error!("File watcher error: {}", e);
274 }
275 });
276
277 let graph = self.graph.clone();
279 let broadcast_tx = self.broadcast_tx.clone();
280 let watch_path = self.config.watch_path.clone();
281
282 tokio::spawn(async move {
283 run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
284 });
285
286 self.run_websocket_server().await
288 }
289
290 async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
292 let listener = TcpListener::bind(&self.config.addr).await?;
293 info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
294 info!("👁️ Watching: {}", self.config.watch_path.display());
295 info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
296
297 loop {
298 match listener.accept().await {
299 Ok((stream, addr)) => {
300 info!("🔌 New connection from {}", addr);
301 let graph = self.graph.clone();
302 let broadcast_rx = self.broadcast_tx.subscribe();
303
304 tokio::spawn(async move {
305 if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
306 warn!("Connection error from {}: {}", addr, e);
307 }
308 });
309 }
310 Err(e) => {
311 error!("Accept error: {}", e);
312 }
313 }
314 }
315 }
316
317 pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
319 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
320 node_id: node_id.to_string(),
321 file: file.to_string(),
322 line,
323 });
324
325 let _ = self.broadcast_tx.send(msg);
326 }
327
328 pub fn update_status(
330 &self,
331 phase: &str,
332 processed: usize,
333 total: usize,
334 current: Option<&str>,
335 ) {
336 let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
337 phase: phase.to_string(),
338 files_processed: processed,
339 files_total: total,
340 current_file: current.map(|s| s.to_string()),
341 });
342
343 let _ = self.broadcast_tx.send(msg);
344 }
345}
346
347async fn handle_client(
353 stream: TcpStream,
354 addr: SocketAddr,
355 graph: SharedGraph,
356 mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
357) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
358 use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
359
360 let config = WebSocketConfig {
361 max_message_size: Some(64 * 1024 * 1024), max_frame_size: Some(64 * 1024 * 1024), accept_unmasked_frames: false,
364 ..Default::default()
365 };
366
367 let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
368 let (mut write, mut read) = ws_stream.split();
369
370 info!("✅ WebSocket handshake complete with {}", addr);
371
372 let (node_count, edge_count, nodes, edges) = {
374 let g = graph.read().await;
375 let mut nodes: Vec<_> = g.nodes().cloned().collect();
376 let edges_raw = g.export_edges();
377 nodes.sort_by(|a, b| a.id.cmp(&b.id));
379 let mut edges = edges_raw;
380 edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
381 (g.node_count(), g.edge_count(), nodes, edges)
382 };
383
384 let hello = BroadcastMessage::Hello(HelloPayload {
385 version: "1.1.1".to_string(),
386 node_count,
387 edge_count,
388 });
389
390 let json = serde_json::to_string(&hello)?;
391 write.send(Message::Text(json)).await?;
392 info!(
393 "👋 Sent Hello ({} nodes, {} edges) to {}",
394 node_count, edge_count, addr
395 );
396
397 info!("⏳ Waiting for client {} to be ready...", addr);
399 let mut ready = false;
400 while let Some(msg) = read.next().await {
401 match msg {
402 Ok(Message::Text(text)) => {
403 if text.contains("ready_for_graph") {
405 ready = true;
406 info!("✅ Client {} is ready for graph", addr);
407 break;
408 }
409 debug!("Running pre-ready protocol with {}: {}", addr, text);
410 }
411 Ok(Message::Ping(data)) => {
412 write.send(Message::Pong(data)).await?;
413 }
414 Ok(Message::Close(_)) => return Ok(()),
415 Err(e) => return Err(e.into()),
416 _ => {}
417 }
418 }
419
420 if !ready {
421 warn!("Client {} disconnected before sending ready signal", addr);
422 return Ok(());
423 }
424
425 let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
427 total_nodes: node_count,
428 total_edges: edge_count,
429 });
430 write
431 .send(Message::Text(serde_json::to_string(&begin)?))
432 .await?;
433
434 for chunk in nodes.chunks(50) {
436 let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
437 nodes: chunk.to_vec(),
438 });
439 write
440 .send(Message::Text(serde_json::to_string(&batch)?))
441 .await?;
442 }
443 info!("📤 Streamed {} nodes to {}", node_count, addr);
444
445 for chunk in edges.chunks(100) {
447 let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
448 edges: chunk.to_vec(),
449 });
450 write
451 .send(Message::Text(serde_json::to_string(&batch)?))
452 .await?;
453 }
454 info!("📤 Streamed {} edges to {}", edge_count, addr);
455
456 write
458 .send(Message::Text(serde_json::to_string(
459 &BroadcastMessage::GraphEnd,
460 )?))
461 .await?;
462 info!("🏁 Graph stream complete for {}", addr);
463
464 loop {
466 tokio::select! {
467 msg = read.next() => {
469 match msg {
470 Some(Ok(Message::Text(text))) => {
471 debug!("📥 Received from {}: {}", addr, text);
472 }
475 Some(Ok(Message::Ping(data))) => {
476 write.send(Message::Pong(data)).await?;
477 }
478 Some(Ok(Message::Close(_))) => {
479 info!("👋 Client {} disconnected gracefully", addr);
480 break;
481 }
482 Some(Err(e)) => {
483 warn!("⚠️ Error from {}: {}", addr, e);
484 break;
485 }
486 None => break,
487 _ => {}
488 }
489 }
490
491 msg = broadcast_rx.recv() => {
493 match msg {
494 Ok(broadcast) => {
495 let json = serde_json::to_string(&broadcast)?;
496 if write.send(Message::Text(json)).await.is_err() {
497 break;
498 }
499 }
500 Err(broadcast::error::RecvError::Lagged(n)) => {
501 warn!("Client {} lagged by {} messages", addr, n);
502 }
503 Err(broadcast::error::RecvError::Closed) => {
504 break;
505 }
506 }
507 }
508 }
509 }
510
511 info!("🔌 Connection closed: {}", addr);
512 Ok(())
513}
514
515async fn run_file_watcher(
521 watch_path: PathBuf,
522 extensions: Vec<String>,
523 debounce_ms: u64,
524 tx: mpsc::Sender<WatcherEvent>,
525) -> notify::Result<()> {
526 let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
527
528 let mut watcher = RecommendedWatcher::new(
530 move |res| {
531 let _ = notify_tx.blocking_send(res);
532 },
533 Config::default(),
534 )?;
535
536 watcher.watch(&watch_path, RecursiveMode::Recursive)?;
537 info!("👁️ File watcher started for {}", watch_path.display());
538
539 let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
541 let debounce_dur = Duration::from_millis(debounce_ms);
542
543 loop {
544 let now = Instant::now();
546 let mut ready: Vec<PathBuf> = Vec::new();
547
548 for (path, time) in pending.iter() {
549 if now.duration_since(*time) >= debounce_dur {
550 ready.push(path.clone());
551 }
552 }
553
554 for path in ready {
555 pending.remove(&path);
556 if should_process_file(&path, &extensions) {
557 let event = if path.exists() {
558 WatcherEvent::Changed(path)
559 } else {
560 WatcherEvent::Deleted(path)
561 };
562 let _ = tx.send(event).await;
563 }
564 }
565
566 match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
568 Ok(Some(Ok(event))) => {
569 for path in event.paths {
570 if should_process_file(&path, &extensions) {
571 pending.insert(path, Instant::now());
572 }
573 }
574 }
575 Ok(Some(Err(e))) => {
576 warn!("Watch error: {}", e);
577 }
578 Ok(None) => break, Err(_) => {} }
581 }
582
583 Ok(())
584}
585
/// Returns `true` when `path` has a UTF-8 extension that exactly matches one
/// of `extensions` (case-sensitive, without the leading dot).
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => extensions.iter().any(|allowed| allowed.as_str() == ext),
        // No extension, or one that is not valid UTF-8.
        None => false,
    }
}
593
594async fn run_background_indexer(
600 mut rx: mpsc::Receiver<WatcherEvent>,
601 graph: SharedGraph,
602 broadcast_tx: broadcast::Sender<BroadcastMessage>,
603 _root_path: PathBuf,
604) {
605 let mut parser = ArborParser::new().expect("Failed to initialize parser");
606
607 info!("🔧 Background indexer started");
608
609 while let Some(event) = rx.recv().await {
610 let start = Instant::now();
611
612 match event {
613 WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
614 let file_name = path
615 .file_name()
616 .and_then(|n| n.to_str())
617 .unwrap_or("unknown");
618
619 info!("📝 Re-indexing: {}", file_name);
620
621 match parser.parse_file(&path) {
622 Ok(result) => {
623 let mut g = graph.write().await;
624
625 g.remove_file(&result.file_path);
627
628 let mut node_ids = HashMap::new();
630 for symbol in &result.symbols {
631 let id = g.add_node(symbol.clone());
632 node_ids.insert(symbol.id.clone(), id);
633 }
634
635 for relation in &result.relations {
637 if let Some(&from_id) = node_ids.get(&relation.from_id) {
638 let targets = g.find_by_name(&relation.to_name);
640 if let Some(target) = targets.first() {
641 if let Some(to_id) = g.get_index(&target.id) {
642 let edge_kind = match relation.kind {
643 arbor_core::RelationType::Calls => EdgeKind::Calls,
644 arbor_core::RelationType::Imports => EdgeKind::Imports,
645 arbor_core::RelationType::Extends => EdgeKind::Extends,
646 arbor_core::RelationType::Implements => {
647 EdgeKind::Implements
648 }
649 };
650 g.add_edge(from_id, to_id, Edge::new(edge_kind));
651 }
652 }
653 }
654 }
655
656 let elapsed = start.elapsed();
657 info!(
658 "✅ Indexed {} in {:?} ({} symbols, {} relations)",
659 file_name,
660 elapsed,
661 result.symbols.len(),
662 result.relations.len()
663 );
664
665 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
667 is_delta: true,
668 node_count: g.node_count(),
669 edge_count: g.edge_count(),
670 file_count: g.stats().files,
671 changed_files: vec![result.file_path],
672 timestamp: std::time::SystemTime::now()
673 .duration_since(std::time::UNIX_EPOCH)
674 .unwrap()
675 .as_secs(),
676 nodes: Some(g.nodes().cloned().collect()),
677 edges: Some(g.export_edges()),
678 });
679
680 let _ = broadcast_tx.send(update);
681 }
682 Err(e) => {
683 warn!("⚠️ Parse error for {}: {}", file_name, e);
684 }
685 }
686 }
687
688 WatcherEvent::Deleted(path) => {
689 let file_str = path.to_string_lossy().to_string();
690 info!("🗑️ File deleted: {}", path.display());
691
692 let mut g = graph.write().await;
693 g.remove_file(&file_str);
694
695 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
696 is_delta: true,
697 node_count: g.node_count(),
698 edge_count: g.edge_count(),
699 file_count: g.stats().files,
700 changed_files: vec![file_str],
701 timestamp: std::time::SystemTime::now()
702 .duration_since(std::time::UNIX_EPOCH)
703 .unwrap()
704 .as_secs(),
705 nodes: Some(g.nodes().cloned().collect()),
706 edges: Some(g.export_edges()),
707 });
708
709 let _ = broadcast_tx.send(update);
710 }
711 }
712 }
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths whose extension is on the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let extensions = vec![String::from("ts"), String::from("rs")];

        for accepted in &["foo.ts", "bar.rs"] {
            assert!(should_process_file(Path::new(accepted), &extensions));
        }
        for rejected in &["baz.py", "README.md"] {
            assert!(!should_process_file(Path::new(rejected), &extensions));
        }
    }

    /// A serialized `GraphUpdate` carries its variant name and its counts.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let json = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();
        assert!(json.contains("GraphUpdate"));
        assert!(json.contains("42"));
    }
}