use crate::SharedGraph;
use arbor_core::ArborParser;
use arbor_graph::{ArborGraph, Edge, EdgeKind};
use futures_util::{SinkExt, StreamExt};
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn};
/// Configuration for [`SyncServer`]: where to listen, what to watch, and how
/// aggressively to debounce filesystem events.
#[derive(Debug, Clone)]
pub struct SyncServerConfig {
    /// Address the WebSocket server binds to.
    pub addr: SocketAddr,
    /// Root directory watched (recursively) for source-file changes.
    pub watch_path: PathBuf,
    /// Quiet period in milliseconds before a changed file is re-indexed.
    pub debounce_ms: u64,
    /// File extensions (without the leading dot) that trigger re-indexing.
    pub extensions: Vec<String>,
}
impl Default for SyncServerConfig {
    /// Localhost:8080, watching the current directory with a 150 ms debounce,
    /// over every source-file extension the indexer understands.
    fn default() -> Self {
        // Kept in the indexer's canonical order; grouped by ecosystem for
        // readability only.
        const DEFAULT_EXTENSIONS: &[&str] = &[
            // TypeScript / JavaScript
            "ts", "tsx", "mts", "cts", "js", "jsx", "mjs", "cjs",
            // Rust / Python / Go / Java
            "rs", "py", "pyi", "go", "java",
            // C / C++
            "c", "h", "cpp", "hpp", "cc", "hh", "cxx", "hxx",
            // C# / Dart / Kotlin / Swift
            "cs", "dart", "kt", "kts", "swift",
            // Ruby / PHP / shell
            "rb", "php", "phtml", "sh", "bash", "zsh",
        ];
        Self {
            addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
            watch_path: PathBuf::from("."),
            debounce_ms: 150,
            extensions: DEFAULT_EXTENSIONS.iter().map(|ext| ext.to_string()).collect(),
        }
    }
}
/// Messages broadcast from the server to connected WebSocket clients.
///
/// Adjacently tagged on the wire: `{"type": "<Variant>", "payload": ...}`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastMessage {
    /// First message after the handshake: version and current graph size.
    Hello(HelloPayload),
    /// Start of the initial graph stream (sent once the client is ready).
    GraphBegin(GraphBeginPayload),
    /// One chunk of nodes from the initial graph stream.
    NodeBatch(NodeBatchPayload),
    /// One chunk of edges from the initial graph stream.
    EdgeBatch(EdgeBatchPayload),
    /// End marker for the initial graph stream.
    GraphEnd,
    /// Graph refresh broadcast after a file is re-indexed or deleted.
    GraphUpdate(GraphUpdatePayload),
    /// Asks clients to spotlight a particular node.
    FocusNode(FocusNodePayload),
    /// Progress report from the background indexer.
    IndexerStatus(IndexerStatusPayload),
}
/// Payload of [`BroadcastMessage::Hello`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct HelloPayload {
    /// Server crate version (from `CARGO_PKG_VERSION`).
    pub version: String,
    /// Node count at connection time.
    pub node_count: usize,
    /// Edge count at connection time.
    pub edge_count: usize,
}
/// Payload of [`BroadcastMessage::GraphBegin`]: totals the client can use to
/// size progress indicators before the batches arrive.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphBeginPayload {
    pub total_nodes: usize,
    pub total_edges: usize,
}
/// Payload of [`BroadcastMessage::NodeBatch`]: one chunk of the node stream.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NodeBatchPayload {
    pub nodes: Vec<arbor_core::CodeNode>,
}
/// Payload of [`BroadcastMessage::EdgeBatch`]: one chunk of the edge stream.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EdgeBatchPayload {
    pub edges: Vec<arbor_graph::GraphEdge>,
}
/// Payload of [`BroadcastMessage::GraphUpdate`], broadcast after a file is
/// (re-)indexed or deleted.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphUpdatePayload {
    /// True when this update was triggered by a single-file change.
    /// NOTE(review): the indexer currently ships the *entire* node/edge sets
    /// in `nodes`/`edges` even when this is true — confirm clients expect a
    /// full refresh rather than a true delta.
    pub is_delta: bool,
    /// Graph-wide node count after the update.
    pub node_count: usize,
    /// Graph-wide edge count after the update.
    pub edge_count: usize,
    /// Number of files currently represented in the graph.
    pub file_count: usize,
    /// Paths of files whose symbols changed in this update.
    pub changed_files: Vec<String>,
    /// Seconds since the Unix epoch (0 if the system clock predates it).
    pub timestamp: u64,
    /// Full node set after the update, when included.
    pub nodes: Option<Vec<arbor_core::CodeNode>>,
    /// Full edge set after the update, when included.
    pub edges: Option<Vec<arbor_graph::GraphEdge>>,
}
/// Payload of [`BroadcastMessage::FocusNode`]: which node to spotlight and
/// where it lives in source.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FocusNodePayload {
    pub node_id: String,
    pub file: String,
    pub line: u32,
}
/// Payload of [`BroadcastMessage::IndexerStatus`]: indexer progress for UI.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IndexerStatusPayload {
    /// Free-form phase label, e.g. "scanning".
    pub phase: String,
    pub files_processed: usize,
    pub files_total: usize,
    /// File currently being processed, if any.
    pub current_file: Option<String>,
}
/// Debounced per-file event forwarded from the watcher to the indexer.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum WatcherEvent {
    /// An existing watched file was modified.
    Changed(PathBuf),
    /// A new watched file appeared. NOTE(review): `run_file_watcher` only
    /// ever emits `Changed`/`Deleted` (it classifies by `path.exists()`), so
    /// this variant is currently unused — hence the `dead_code` allow.
    Created(PathBuf),
    /// A watched file was removed.
    Deleted(PathBuf),
}
/// WebSocket sync server: watches a directory, re-indexes changed files into
/// the shared graph, and broadcasts updates to connected clients.
pub struct SyncServer {
    config: SyncServerConfig,
    /// Graph shared with client handlers and the background indexer.
    graph: SharedGraph,
    /// Fan-out channel; each client connection holds a subscription.
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}
/// Cheap, cloneable handle to a running [`SyncServer`], letting other
/// subsystems broadcast focus events and read the shared graph.
#[derive(Clone)]
pub struct SyncServerHandle {
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    graph: SharedGraph,
}
impl SyncServerHandle {
pub fn spotlight_node(&self, node_id: &str, file: &str, line: u32) {
let msg = BroadcastMessage::FocusNode(FocusNodePayload {
node_id: node_id.to_string(),
file: file.to_string(),
line,
});
let _ = self.broadcast_tx.send(msg);
}
pub fn graph(&self) -> SharedGraph {
self.graph.clone()
}
}
impl SyncServer {
    /// Creates a server with an empty graph.
    pub fn new(config: SyncServerConfig) -> Self {
        // Capacity 256: slow clients that fall further behind get
        // `RecvError::Lagged` rather than blocking the indexer.
        let (broadcast_tx, _) = broadcast::channel(256);
        Self {
            config,
            graph: Arc::new(RwLock::new(ArborGraph::new())),
            broadcast_tx,
        }
    }
    /// Creates a server seeded with an already-built graph.
    pub fn with_graph(config: SyncServerConfig, graph: ArborGraph) -> Self {
        let (broadcast_tx, _) = broadcast::channel(256);
        Self {
            config,
            graph: Arc::new(RwLock::new(graph)),
            broadcast_tx,
        }
    }
    /// Creates a server sharing a graph owned elsewhere.
    pub fn new_with_shared(config: SyncServerConfig, graph: SharedGraph) -> Self {
        let (broadcast_tx, _) = broadcast::channel(256);
        Self {
            config,
            graph,
            broadcast_tx,
        }
    }
    /// Returns a clone of the shared graph handle.
    pub fn graph(&self) -> SharedGraph {
        self.graph.clone()
    }
    /// Subscribes to the server's broadcast stream.
    pub fn subscribe(&self) -> broadcast::Receiver<BroadcastMessage> {
        self.broadcast_tx.subscribe()
    }
    /// Returns a lightweight handle for other subsystems.
    pub fn handle(&self) -> SyncServerHandle {
        SyncServerHandle {
            broadcast_tx: self.broadcast_tx.clone(),
            graph: self.graph.clone(),
        }
    }
    /// Starts the server: spawns the file watcher and background indexer,
    /// then blocks on the WebSocket accept loop (does not return under
    /// normal operation).
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        info!("╔═══════════════════════════════════════════════════════════╗");
        info!("║ ARBOR SYNC SERVER - THE PULSE OF CODE ║");
        info!("╚═══════════════════════════════════════════════════════════╝");
        // Watcher → indexer pipeline: debounced file events flow over this
        // channel.
        let (watcher_tx, watcher_rx) = mpsc::channel::<WatcherEvent>(256);
        let watch_path = self.config.watch_path.clone();
        let extensions = self.config.extensions.clone();
        let debounce_ms = self.config.debounce_ms;
        tokio::spawn(async move {
            if let Err(e) = run_file_watcher(watch_path, extensions, debounce_ms, watcher_tx).await
            {
                error!("File watcher error: {}", e);
            }
        });
        let graph = self.graph.clone();
        let broadcast_tx = self.broadcast_tx.clone();
        let watch_path = self.config.watch_path.clone();
        tokio::spawn(async move {
            run_background_indexer(watcher_rx, graph, broadcast_tx, watch_path).await;
        });
        self.run_websocket_server().await
    }
    /// Accept loop: each connection gets its own task with its own broadcast
    /// subscription. Accept errors are logged and the loop continues.
    async fn run_websocket_server(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let listener = TcpListener::bind(&self.config.addr).await?;
        info!("🌐 WebSocket server listening on ws://{}", self.config.addr);
        info!("👁️ Watching: {}", self.config.watch_path.display());
        info!("⏱️ Debounce: {}ms", self.config.debounce_ms);
        loop {
            match listener.accept().await {
                Ok((stream, addr)) => {
                    info!("🔌 New connection from {}", addr);
                    let graph = self.graph.clone();
                    let broadcast_rx = self.broadcast_tx.subscribe();
                    tokio::spawn(async move {
                        if let Err(e) = handle_client(stream, addr, graph, broadcast_rx).await {
                            warn!("Connection error from {}: {}", addr, e);
                        }
                    });
                }
                Err(e) => {
                    error!("Accept error: {}", e);
                }
            }
        }
    }
    /// Broadcasts a focus request to all clients.
    /// Mirrors [`SyncServerHandle::spotlight_node`]; send errors (no
    /// subscribers) are ignored.
    pub fn focus_node(&self, node_id: &str, file: &str, line: u32) {
        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
            node_id: node_id.to_string(),
            file: file.to_string(),
            line,
        });
        let _ = self.broadcast_tx.send(msg);
    }
    /// Broadcasts an indexer progress report to all clients.
    /// Send errors (no subscribers) are ignored.
    pub fn update_status(
        &self,
        phase: &str,
        processed: usize,
        total: usize,
        current: Option<&str>,
    ) {
        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
            phase: phase.to_string(),
            files_processed: processed,
            files_total: total,
            current_file: current.map(|s| s.to_string()),
        });
        let _ = self.broadcast_tx.send(msg);
    }
}
/// Serves one WebSocket client: handshake, initial graph snapshot stream,
/// then a live relay of broadcast messages until the client disconnects.
///
/// Protocol, server → client: `Hello`; then — only after the client sends a
/// text frame containing `"ready_for_graph"` — `GraphBegin`, `NodeBatch`
/// chunks of 50, `EdgeBatch` chunks of 100, `GraphEnd`; then every
/// [`BroadcastMessage`] received on `broadcast_rx`.
async fn handle_client(
    stream: TcpStream,
    addr: SocketAddr,
    graph: SharedGraph,
    mut broadcast_rx: broadcast::Receiver<BroadcastMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
    // Raise message/frame caps to 64 MiB: a full-graph snapshot of a large
    // repo can exceed tungstenite's defaults.
    let config = WebSocketConfig {
        max_message_size: Some(64 * 1024 * 1024),
        max_frame_size: Some(64 * 1024 * 1024),
        accept_unmasked_frames: false,
        ..Default::default()
    };
    let ws_stream = tokio_tungstenite::accept_async_with_config(stream, Some(config)).await?;
    let (mut write, mut read) = ws_stream.split();
    info!("✅ WebSocket handshake complete with {}", addr);
    // Snapshot the graph under the read lock — sorted by id / (source,
    // target) so the stream is deterministic — and release the lock before
    // doing any (potentially slow) network writes.
    let (node_count, edge_count, nodes, edges) = {
        let g = graph.read().await;
        let mut nodes: Vec<_> = g.nodes().cloned().collect();
        let edges_raw = g.export_edges();
        nodes.sort_by(|a, b| a.id.cmp(&b.id));
        let mut edges = edges_raw;
        edges.sort_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
        (g.node_count(), g.edge_count(), nodes, edges)
    };
    // Phase 1: Hello with totals so the client can size its progress UI.
    let hello = BroadcastMessage::Hello(HelloPayload {
        version: env!("CARGO_PKG_VERSION").to_string(),
        node_count,
        edge_count,
    });
    let json = serde_json::to_string(&hello)?;
    write.send(Message::Text(json)).await?;
    info!(
        "👋 Sent Hello ({} nodes, {} edges) to {}",
        node_count, edge_count, addr
    );
    // Phase 2: wait for the client to opt in before streaming the (possibly
    // large) snapshot. Any text frame containing "ready_for_graph" counts;
    // other pre-ready text frames are only logged.
    info!("⏳ Waiting for client {} to be ready...", addr);
    let mut ready = false;
    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                if text.contains("ready_for_graph") {
                    ready = true;
                    info!("✅ Client {} is ready for graph", addr);
                    break;
                }
                debug!("Running pre-ready protocol with {}: {}", addr, text);
            }
            Ok(Message::Ping(data)) => {
                write.send(Message::Pong(data)).await?;
            }
            // A clean close before readiness is not an error.
            Ok(Message::Close(_)) => return Ok(()),
            Err(e) => return Err(e.into()),
            _ => {}
        }
    }
    if !ready {
        warn!("Client {} disconnected before sending ready signal", addr);
        return Ok(());
    }
    // Phase 3: stream the snapshot in chunks to keep individual frames small.
    let begin = BroadcastMessage::GraphBegin(GraphBeginPayload {
        total_nodes: node_count,
        total_edges: edge_count,
    });
    write
        .send(Message::Text(serde_json::to_string(&begin)?))
        .await?;
    for chunk in nodes.chunks(50) {
        let batch = BroadcastMessage::NodeBatch(NodeBatchPayload {
            nodes: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} nodes to {}", node_count, addr);
    for chunk in edges.chunks(100) {
        let batch = BroadcastMessage::EdgeBatch(EdgeBatchPayload {
            edges: chunk.to_vec(),
        });
        write
            .send(Message::Text(serde_json::to_string(&batch)?))
            .await?;
    }
    info!("📤 Streamed {} edges to {}", edge_count, addr);
    write
        .send(Message::Text(serde_json::to_string(
            &BroadcastMessage::GraphEnd,
        )?))
        .await?;
    info!("🏁 Graph stream complete for {}", addr);
    // Phase 4: pump loop — relay broadcasts to the client while also
    // servicing inbound frames (pings, close) until either side ends.
    loop {
        tokio::select! {
            msg = read.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        // Inbound client messages are currently only logged.
                        debug!("📥 Received from {}: {}", addr, text);
                    }
                    Some(Ok(Message::Ping(data))) => {
                        write.send(Message::Pong(data)).await?;
                    }
                    Some(Ok(Message::Close(_))) => {
                        info!("👋 Client {} disconnected gracefully", addr);
                        break;
                    }
                    Some(Err(e)) => {
                        warn!("⚠️ Error from {}: {}", addr, e);
                        break;
                    }
                    None => break,
                    _ => {}
                }
            }
            msg = broadcast_rx.recv() => {
                match msg {
                    Ok(broadcast) => {
                        let json = serde_json::to_string(&broadcast)?;
                        if write.send(Message::Text(json)).await.is_err() {
                            break;
                        }
                    }
                    // A lagged client misses messages but stays connected.
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        warn!("Client {} lagged by {} messages", addr, n);
                    }
                    // Sender dropped: server is shutting down this stream.
                    Err(broadcast::error::RecvError::Closed) => {
                        break;
                    }
                }
            }
        }
    }
    info!("🔌 Connection closed: {}", addr);
    Ok(())
}
/// Watches `watch_path` recursively and forwards debounced per-file events
/// to `tx`.
///
/// Raw `notify` events are accumulated in `pending` (path → last-seen time);
/// a path is only forwarded once it has been quiet for `debounce_ms`, which
/// collapses editor save bursts into a single event. The notify channel is
/// polled with a 50 ms timeout so pending entries are swept even when no new
/// events arrive.
async fn run_file_watcher(
    watch_path: PathBuf,
    extensions: Vec<String>,
    debounce_ms: u64,
    tx: mpsc::Sender<WatcherEvent>,
) -> notify::Result<()> {
    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<Event>>(256);
    // notify invokes this callback on its own watcher thread (not a tokio
    // worker), so the blocking send is acceptable here.
    let mut watcher = RecommendedWatcher::new(
        move |res| {
            let _ = notify_tx.blocking_send(res);
        },
        Config::default(),
    )?;
    watcher.watch(&watch_path, RecursiveMode::Recursive)?;
    info!("👁️ File watcher started for {}", watch_path.display());
    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
    let debounce_dur = Duration::from_millis(debounce_ms);
    loop {
        // Sweep: flush every pending path whose debounce window has elapsed.
        let now = Instant::now();
        let mut ready: Vec<PathBuf> = Vec::new();
        for (path, time) in pending.iter() {
            if now.duration_since(*time) >= debounce_dur {
                ready.push(path.clone());
            }
        }
        for path in ready {
            pending.remove(&path);
            if should_process_file(&path, &extensions) {
                // Classify by current existence rather than notify's event
                // kind: a path that vanished is a deletion.
                let event = if path.exists() {
                    WatcherEvent::Changed(path)
                } else {
                    WatcherEvent::Deleted(path)
                };
                let _ = tx.send(event).await;
            }
        }
        // Wait briefly for the next raw event; a timeout simply re-runs the
        // sweep above.
        match tokio::time::timeout(Duration::from_millis(50), notify_rx.recv()).await {
            Ok(Some(Ok(event))) => {
                for path in event.paths {
                    if should_process_file(&path, &extensions) {
                        // (Re)arm the debounce timer for this path.
                        pending.insert(path, Instant::now());
                    }
                }
            }
            Ok(Some(Err(e))) => {
                warn!("Watch error: {}", e);
            }
            // Channel closed — the watcher callback was dropped; shut down.
            Ok(None) => break,
            // Timeout — nothing new; loop to re-check pending entries.
            Err(_) => {}
        }
    }
    Ok(())
}
/// Returns `true` when `path` has an extension listed in `extensions`.
///
/// The comparison is ASCII case-insensitive, so files like `Main.TS` or
/// `LEGACY.C` (common on case-insensitive filesystems) are still picked up.
/// Paths without an extension (directories, `Makefile`, dotfiles) return
/// `false`.
fn should_process_file(path: &Path, extensions: &[String]) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| extensions.iter().any(|e| e.eq_ignore_ascii_case(ext)))
        .unwrap_or(false)
}
/// Consumes debounced watcher events, re-parses changed files into the
/// shared graph, and broadcasts a `GraphUpdate` after each change.
///
/// Parser construction is lazy: if `ArborParser::new()` fails at startup it
/// is retried on each subsequent event instead of killing the task.
async fn run_background_indexer(
    mut rx: mpsc::Receiver<WatcherEvent>,
    graph: SharedGraph,
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
    _root_path: PathBuf,
) {
    let mut parser = match ArborParser::new() {
        Ok(parser) => Some(parser),
        Err(error) => {
            warn!(
                "Failed to initialize parser for background indexer; will retry lazily per event: {}",
                error
            );
            None
        }
    };
    info!("🔧 Background indexer started");
    while let Some(event) = rx.recv().await {
        let start = Instant::now();
        match event {
            WatcherEvent::Changed(path) | WatcherEvent::Created(path) => {
                let file_name = path
                    .file_name()
                    .and_then(|n| n.to_str())
                    .unwrap_or("unknown");
                info!("📝 Re-indexing: {}", file_name);
                // Lazy retry of parser construction (see function docs).
                if parser.is_none() {
                    parser = match ArborParser::new() {
                        Ok(parser) => Some(parser),
                        Err(error) => {
                            warn!(
                                "Skipping '{}' due to parser init failure: {}",
                                file_name,
                                error
                            );
                            None
                        }
                    };
                }
                let Some(parser) = parser.as_mut() else {
                    continue;
                };
                match parser.parse_file(&path) {
                    Ok(result) => {
                        let mut g = graph.write().await;
                        // Drop the file's stale symbols wholesale, then
                        // insert the fresh parse results.
                        g.remove_file(&result.file_path);
                        let mut node_ids = HashMap::new();
                        for symbol in &result.symbols {
                            let id = g.add_node(symbol.clone());
                            node_ids.insert(symbol.id.clone(), id);
                        }
                        // Resolve relations by target name. NOTE(review):
                        // this takes the *first* node matching `to_name`
                        // anywhere in the graph, which may pick a same-named
                        // symbol from a different file — confirm intended.
                        for relation in &result.relations {
                            if let Some(&from_id) = node_ids.get(&relation.from_id) {
                                let targets = g.find_by_name(&relation.to_name);
                                if let Some(target) = targets.first() {
                                    if let Some(to_id) = g.get_index(&target.id) {
                                        let edge_kind = match relation.kind {
                                            arbor_core::RelationType::Calls => EdgeKind::Calls,
                                            arbor_core::RelationType::Imports => EdgeKind::Imports,
                                            arbor_core::RelationType::Extends => EdgeKind::Extends,
                                            arbor_core::RelationType::Implements => {
                                                EdgeKind::Implements
                                            }
                                        };
                                        g.add_edge(from_id, to_id, Edge::new(edge_kind));
                                    }
                                }
                            }
                        }
                        let elapsed = start.elapsed();
                        info!(
                            "✅ Indexed {} in {:?} ({} symbols, {} relations)",
                            file_name,
                            elapsed,
                            result.symbols.len(),
                            result.relations.len()
                        );
                        // Broadcast the refreshed graph. Note: the full
                        // node/edge sets are included even though `is_delta`
                        // is true.
                        let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
                            is_delta: true,
                            node_count: g.node_count(),
                            edge_count: g.edge_count(),
                            file_count: g.stats().files,
                            changed_files: vec![result.file_path],
                            timestamp: std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .map_or(0, |d| d.as_secs()),
                            nodes: Some(g.nodes().cloned().collect()),
                            edges: Some(g.export_edges()),
                        });
                        let _ = broadcast_tx.send(update);
                    }
                    Err(e) => {
                        // Parse failures are non-fatal; the stale graph
                        // entries for this file remain until a clean parse.
                        warn!("⚠️ Parse error for {}: {}", file_name, e);
                    }
                }
            }
            WatcherEvent::Deleted(path) => {
                let file_str = path.to_string_lossy().to_string();
                info!("🗑️ File deleted: {}", path.display());
                let mut g = graph.write().await;
                g.remove_file(&file_str);
                let update = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
                    is_delta: true,
                    node_count: g.node_count(),
                    edge_count: g.edge_count(),
                    file_count: g.stats().files,
                    changed_files: vec![file_str],
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .map_or(0, |d| d.as_secs()),
                    nodes: Some(g.nodes().cloned().collect()),
                    edges: Some(g.export_edges()),
                });
                let _ = broadcast_tx.send(update);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests: watcher extension filtering and broadcast-message
    //! serialization shapes.
    use super::*;
    // Extension filter accepts listed extensions and rejects everything else.
    #[test]
    fn test_should_process_file() {
        let extensions = vec!["ts".to_string(), "rs".to_string()];
        assert!(should_process_file(Path::new("foo.ts"), &extensions));
        assert!(should_process_file(Path::new("bar.rs"), &extensions));
        assert!(!should_process_file(Path::new("baz.py"), &extensions));
        assert!(!should_process_file(Path::new("README.md"), &extensions));
    }
    // GraphUpdate serializes with its tag and payload fields visible.
    #[test]
    fn test_broadcast_message_serialization() {
        let msg = BroadcastMessage::GraphUpdate(GraphUpdatePayload {
            is_delta: true,
            node_count: 42,
            edge_count: 100,
            file_count: 5,
            changed_files: vec!["foo.ts".to_string()],
            timestamp: 1234567890,
            nodes: None,
            edges: None,
        });
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("GraphUpdate"));
        assert!(json.contains("42"));
    }
    // The default config must cover every extension the core parser supports,
    // otherwise the watcher would silently skip supported files.
    #[test]
    fn test_sync_config_default_has_all_extensions() {
        let config = SyncServerConfig::default();
        let exts = &config.extensions;
        let required: std::collections::HashSet<String> =
            arbor_core::languages::supported_extensions()
                .iter()
                .map(|ext| ext.to_string())
                .collect();
        let actual: std::collections::HashSet<String> = exts.iter().cloned().collect();
        for ext in &required {
            assert!(
                actual.contains(ext),
                "SyncServerConfig is missing extension: {}",
                ext
            );
        }
    }
    // FocusNode round-trips its id, file, and tag through JSON.
    #[test]
    fn test_focus_node_serialization() {
        let msg = BroadcastMessage::FocusNode(FocusNodePayload {
            node_id: "abc123".to_string(),
            file: "main.rs".to_string(),
            line: 42,
        });
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("FocusNode"));
        assert!(json.contains("abc123"));
        assert!(json.contains("main.rs"));
    }
    // IndexerStatus carries the phase label and current file through JSON.
    #[test]
    fn test_indexer_status_serialization() {
        let msg = BroadcastMessage::IndexerStatus(IndexerStatusPayload {
            phase: "scanning".to_string(),
            files_processed: 10,
            files_total: 100,
            current_file: Some("test.rs".to_string()),
        });
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("scanning"));
        assert!(json.contains("test.rs"));
    }
    // Hello carries the version string and counts through JSON.
    #[test]
    fn test_hello_payload_serialization() {
        let msg = BroadcastMessage::Hello(HelloPayload {
            version: "2.0.0".to_string(),
            node_count: 100,
            edge_count: 200,
        });
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("2.0.0"));
        assert!(json.contains("100"));
    }
    // The unit variant still serializes with its tag.
    #[test]
    fn test_graph_end_serialization() {
        let msg = BroadcastMessage::GraphEnd;
        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("GraphEnd"));
    }
}