1use crate::SharedGraph;
10use arbor_core::ArborParser;
11use arbor_graph::{ArborGraph, Edge, EdgeKind};
12use futures_util::{SinkExt, StreamExt};
13use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::net::{TcpListener, TcpStream};
20use tokio::sync::{broadcast, mpsc, RwLock};
21use tokio_tungstenite::{accept_async, tungstenite::Message};
22use tracing::{debug, error, info, warn};
23
/// Settings that control where the sync server listens and which files it tracks.
#[derive(Clone, Debug)]
pub struct SyncServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub addr: SocketAddr,
    /// Directory that is watched recursively for file changes.
    pub watch_path: PathBuf,
    /// Quiet period, in milliseconds, a path must stay unchanged before it is re-indexed.
    pub debounce_ms: u64,
    /// File extensions (without the leading dot) that trigger re-indexing; matched exactly.
    pub extensions: Vec<String>,
}
40
41impl Default for SyncServerConfig {
42 fn default() -> Self {
43 Self {
44 addr: "127.0.0.1:8080".parse().unwrap(),
45 watch_path: PathBuf::from("."),
46 debounce_ms: 150,
47 extensions: vec![
48 "ts".into(),
49 "tsx".into(),
50 "js".into(),
51 "jsx".into(),
52 "rs".into(),
53 "py".into(),
54 ],
55 }
56 }
57}
58
59#[derive(Debug, Clone, serde::Serialize)]
61#[serde(tag = "type", content = "payload")]
62pub enum BroadcastMessage {
63 GraphUpdate(GraphUpdatePayload),
65 FocusNode(FocusNodePayload),
67 IndexerStatus(IndexerStatusPayload),
69}
70
71#[derive(Debug, Clone, serde::Serialize)]
72pub struct GraphUpdatePayload {
73 pub is_delta: bool,
75 pub node_count: usize,
77 pub edge_count: usize,
79 pub file_count: usize,
81 pub changed_files: Vec<String>,
83 pub timestamp: u64,
85 pub nodes: Option<Vec<arbor_core::CodeNode>>,
86 pub edges: Option<Vec<arbor_graph::GraphEdge>>,
87}
88
89#[derive(Debug, Clone, serde::Serialize)]
90pub struct FocusNodePayload {
91 pub node_id: String,
93 pub file: String,
95 pub line: u32,
97}
98
99#[derive(Debug, Clone, serde::Serialize)]
100pub struct IndexerStatusPayload {
101 pub phase: String,
103 pub files_processed: usize,
105 pub files_total: usize,
107 pub current_file: Option<String>,
109}
110
/// File-system change handed from the watcher task to the background indexer.
// `Created` is never constructed in this file (the watcher reports existing
// paths as `Changed`), hence the dead-code allowance.
#[allow(dead_code)]
#[derive(Clone, Debug)]
enum WatcherEvent {
    /// The file exists and its contents should be (re-)indexed.
    Changed(PathBuf),
    /// A new file appeared; the indexer treats it exactly like `Changed`.
    Created(PathBuf),
    /// The file no longer exists and should be removed from the graph.
    Deleted(PathBuf),
}
119
120pub struct SyncServer {
133 config: SyncServerConfig,
134 graph: SharedGraph,
135 broadcast_tx: broadcast::Sender<BroadcastMessage>,
136}
137
138#[derive(Clone)]
140pub struct SyncServerHandle {
141 broadcast_tx: broadcast::Sender<BroadcastMessage>,
142 graph: SharedGraph,
143}
144
145impl SyncServerHandle {
146 pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
148 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
149 node_id: node_id.to_string(),
150 file: file.to_string(),
151 line,
152 });
153 let _ = self.broadcast_tx.send(msg);
154 }
155
156 pub fn graph(&self) -> SharedGraph {
158 self.graph.clone()
159 }
160}
161
162impl SyncServer {
163 pub fn new(config: SyncServerConfig) -> Self {
165 let (broadcast_tx, _) = broadcast::channel(256);
166
167 Self {
168 config,
169 graph: Arc::new(RwLock::new(ArborGraph::new())),
170 broadcast_tx,
171 }
172 }
173
174 pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
176 let (broadcast_tx, _) = broadcast::channel(256);
177
178 Self {
179 config,
180 graph: Arc::new(RwLock::new(graph)),
181 broadcast_tx,
182 }
183 }
184
185 pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
187 let (broadcast_tx, _) = broadcast::channel(256);
188
189 Self {
190 config,
191 graph,
192 broadcast_tx,
193 }
194 }
195
196 pub fn graph(&self) -> SharedGraph {
198 self.graph.clone()
199 }
200
201 pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
203 self.broadcast_tx.subscribe()
204 }
205
206 pub fn handle(&self) -> SyncServerHandle {
208 SyncServerHandle {
209 broadcast_tx: self.broadcast_tx.clone(),
210 graph: self.graph.clone(),
211 }
212 }
213
214 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
216 info!("╔═══════════════════════════════════════════════════════════╗");
217 info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
218 info!("╚═══════════════════════════════════════════════════════════╝");
219
220 let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
222
223 let watch_path = self.config.watch_path.clone();
225 let extensions = self.config.extensions.clone();
226 let debounce_ms = self.config.debounce_ms;
227
228 tokio::spawn(async move {
229 if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
230 {
231 error!("File watcher error: {}", e);
232 }
233 });
234
235 let graph = self.graph.clone();
237 let broadcast_tx = self.broadcast_tx.clone();
238 let watch_path = self.config.watch_path.clone();
239
240 tokio::spawn(async move {
241 run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
242 });
243
244 self.run_websocket_server().await
246 }
247
248 async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
250 let listener = TcpListener::bind(&self.config.addr).await?;
251 info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
252 info!("👁️ Watching: {}", self.config.watch_path.display());
253 info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
254
255 loop {
256 match listener.accept().await {
257 Ok((stream, addr)) => {
258 info!("🔌 New connection from {}", addr);
259 let graph = self.graph.clone();
260 let broadcast_rx = self.broadcast_tx.subscribe();
261
262 tokio::spawn(async move {
263 if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
264 warn!("Connection error from {}: {}", addr, e);
265 }
266 });
267 }
268 Err(e) => {
269 error!("Accept error: {}", e);
270 }
271 }
272 }
273 }
274
275 pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
277 let msg = BroadcastMessage::FocusNode(FocusNodePayload {
278 node_id: node_id.to_string(),
279 file: file.to_string(),
280 line,
281 });
282
283 let _ = self.broadcast_tx.send(msg);
284 }
285
286 pub fn update_status(
288 &self,
289 phase: &str,
290 processed: usize,
291 total: usize,
292 current: Option<&str>,
293 ) {
294 let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
295 phase: phase.to_string(),
296 files_processed: processed,
297 files_total: total,
298 current_file: current.map(|s| s.to_string()),
299 });
300
301 let _ = self.broadcast_tx.send(msg);
302 }
303}
304
305async fn handle_client(
311 stream: TcpStream,
312 addr: SocketAddr,
313 graph: SharedGraph,
314 mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
315) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
316 let ws_stream = accept_async(stream).await?;
317 let (mut write, mut read) = ws_stream.split();
318
319 info!("✅ WebSocket handshake complete with {}", addr);
320
321 {
323 let g = graph.read().await;
324 let snapshot = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
325 is_delta: false,
326 node_count: g.node_count(),
327 edge_count: g.edge_count(),
328 file_count: g.stats().files,
329 changed_files: vec![],
330 timestamp: std::time::SystemTime::now()
331 .duration_since(std::time::UNIX_EPOCH)
332 .unwrap()
333 .as_secs(),
334 nodes: Some(g.nodes().cloned().collect()),
335 edges: Some(g.export_edges()),
336 });
337
338 let json = serde_json::to_string(&snapshot)?;
339 write.send(Message::Text(json)).await?;
340 debug!("📤 Sent initial snapshot to {}", addr);
341 }
342
343 loop {
345 tokio::select! {
346 msg = read.next() => {
348 match msg {
349 Some(Ok(Message::Text(text))) => {
350 debug!("📥 Received from {}: {}", addr, text);
351 }
354 Some(Ok(Message::Ping(data))) => {
355 write.send(Message::Pong(data)).await?;
356 }
357 Some(Ok(Message::Close(_))) => {
358 info!("👋 Client {} disconnected gracefully", addr);
359 break;
360 }
361 Some(Err(e)) => {
362 warn!("⚠️ Error from {}: {}", addr, e);
363 break;
364 }
365 None => break,
366 _ => {}
367 }
368 }
369
370 msg = broadcast_rx.recv() => {
372 match msg {
373 Ok(broadcast) => {
374 let json = serde_json::to_string(&broadcast)?;
375 if write.send(Message::Text(json)).await.is_err() {
376 break;
377 }
378 }
379 Err(broadcast::error::RecvError::Lagged(n)) => {
380 warn!("Client {} lagged by {} messages", addr, n);
381 }
382 Err(broadcast::error::RecvError::Closed) => {
383 break;
384 }
385 }
386 }
387 }
388 }
389
390 info!("🔌 Connection closed: {}", addr);
391 Ok(())
392}
393
394async fn run_file_watcher(
400 watch_path: PathBuf,
401 extensions: Vec<String>,
402 debounce_ms: u64,
403 tx: mpsc::Sender<WatcherEvent>,
404) -> notify::Result<()> {
405 let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
406
407 let mut watcher = RecommendedWatcher::new(
409 move |res| {
410 let _ = notify_tx.blocking_send(res);
411 },
412 Config::default(),
413 )?;
414
415 watcher.watch(&watch_path, RecursiveMode::Recursive)?;
416 info!("👁️ File watcher started for {}", watch_path.display());
417
418 let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
420 let debounce_dur = Duration::from_millis(debounce_ms);
421
422 loop {
423 let now = Instant::now();
425 let mut ready: Vec<PathBuf> = Vec::new();
426
427 for (path, time) in pending.iter() {
428 if now.duration_since(*time) >= debounce_dur {
429 ready.push(path.clone());
430 }
431 }
432
433 for path in ready {
434 pending.remove(&path);
435 if should_process_file(&path, &extensions) {
436 let event = if path.exists() {
437 WatcherEvent::Changed(path)
438 } else {
439 WatcherEvent::Deleted(path)
440 };
441 let _ = tx.send(event).await;
442 }
443 }
444
445 match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
447 Ok(Some(Ok(event))) => {
448 for path in event.paths {
449 if should_process_file(&path, &extensions) {
450 pending.insert(path, Instant::now());
451 }
452 }
453 }
454 Ok(Some(Err(e))) => {
455 warn!("Watch error: {}", e);
456 }
457 Ok(None) => break, Err(_) => {} }
460 }
461
462 Ok(())
463}
464
/// Reports whether `path` has one of the accepted `extensions`.
///
/// The comparison is exact (case-sensitive, no leading dot). Paths without an
/// extension, or whose extension is not valid UTF-8, are rejected.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => extensions.iter().any(|allowed| allowed.as_str() == ext),
        None => false,
    }
}
472
473async fn run_background_indexer(
479 mut rx: mpsc::Receiver<WatcherEvent>,
480 graph: SharedGraph,
481 broadcast_tx: broadcast::Sender<BroadcastMessage>,
482 _root_path: PathBuf,
483) {
484 let mut parser = ArborParser::new().expect("Failed to initialize parser");
485
486 info!("🔧 Background indexer started");
487
488 while let Some(event) = rx.recv().await {
489 let start = Instant::now();
490
491 match event {
492 WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
493 let file_name = path
494 .file_name()
495 .and_then(|n| n.to_str())
496 .unwrap_or("unknown");
497
498 info!("📝 Re-indexing: {}", file_name);
499
500 match parser.parse_file(&path) {
501 Ok(result) => {
502 let mut g = graph.write().await;
503
504 g.remove_file(&result.file_path);
506
507 let mut node_ids = HashMap::new();
509 for symbol in &result.symbols {
510 let id = g.add_node(symbol.clone());
511 node_ids.insert(symbol.id.clone(), id);
512 }
513
514 for relation in &result.relations {
516 if let Some(&from_id) = node_ids.get(&relation.from_id) {
517 let targets = g.find_by_name(&relation.to_name);
519 if let Some(target) = targets.first() {
520 if let Some(to_id) = g.get_index(&target.id) {
521 let edge_kind = match relation.kind {
522 arbor_core::RelationType::Calls => EdgeKind::Calls,
523 arbor_core::RelationType::Imports => EdgeKind::Imports,
524 arbor_core::RelationType::Extends => EdgeKind::Extends,
525 arbor_core::RelationType::Implements => {
526 EdgeKind::Implements
527 }
528 };
529 g.add_edge(from_id, to_id, Edge::new(edge_kind));
530 }
531 }
532 }
533 }
534
535 let elapsed = start.elapsed();
536 info!(
537 "✅ Indexed {} in {:?} ({} symbols, {} relations)",
538 file_name,
539 elapsed,
540 result.symbols.len(),
541 result.relations.len()
542 );
543
544 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
546 is_delta: true,
547 node_count: g.node_count(),
548 edge_count: g.edge_count(),
549 file_count: g.stats().files,
550 changed_files: vec![result.file_path],
551 timestamp: std::time::SystemTime::now()
552 .duration_since(std::time::UNIX_EPOCH)
553 .unwrap()
554 .as_secs(),
555 nodes: Some(g.nodes().cloned().collect()),
556 edges: Some(g.export_edges()),
557 });
558
559 let _ = broadcast_tx.send(update);
560 }
561 Err(e) => {
562 warn!("⚠️ Parse error for {}: {}", file_name, e);
563 }
564 }
565 }
566
567 WatcherEvent::Deleted(path) => {
568 let file_str = path.to_string_lossy().to_string();
569 info!("🗑️ File deleted: {}", path.display());
570
571 let mut g = graph.write().await;
572 g.remove_file(&file_str);
573
574 let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
575 is_delta: true,
576 node_count: g.node_count(),
577 edge_count: g.edge_count(),
578 file_count: g.stats().files,
579 changed_files: vec![file_str],
580 timestamp: std::time::SystemTime::now()
581 .duration_since(std::time::UNIX_EPOCH)
582 .unwrap()
583 .as_secs(),
584 nodes: Some(g.nodes().cloned().collect()),
585 edges: Some(g.export_edges()),
586 });
587
588 let _ = broadcast_tx.send(update);
589 }
590 }
591 }
592}
593
#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths whose extension is on the allow-list are accepted.
    #[test]
    fn test_should_process_file() {
        let allowed: Vec<String> = vec![String::from("ts"), String::from("rs")];

        let cases = [
            ("foo.ts", true),
            ("bar.rs", true),
            ("baz.py", false),
            ("README.md", false),
        ];

        for (name, expected) in cases.iter() {
            assert_eq!(
                should_process_file(Path::new(*name), &allowed),
                *expected,
                "unexpected verdict for {}",
                name
            );
        }
    }

    /// A graph update serializes with its variant name and its field values.
    #[test]
    fn test_broadcast_message_serialization() {
        let payload = GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec![String::from("foo.ts")],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        };

        let encoded = serde_json::to_string(&BroadcastMessage::GraphUpdate(payload)).unwrap();

        assert!(encoded.contains("GraphUpdate"));
        assert!(encoded.contains("42"));
    }
}