pub mod async_io;
/// Assumed L3 cache capacity (16 MiB) used to size read batches.
const DEFAULT_L3_CACHE_SIZE: usize = 16 * 1024 * 1024;
/// Fraction of the L3 cache a single batch is allowed to occupy.
const TARGET_CACHE_USAGE: f64 = 0.50;
/// Rough average source-file size (50 KiB) kept for sizing heuristics.
const AVG_SOURCE_FILE_SIZE: usize = 50 * 1024;
/// Aggregate counts produced by processing one batch of files.
#[derive(Debug)]
pub struct FileBatchResult {
    /// Count of symbols produced for the batch.
    pub symbols: usize,
    /// Count of references produced for the batch.
    pub references: usize,
    /// Count of call edges produced for the batch.
    pub calls: usize,
    /// Total number of source bytes handled for the batch.
    pub bytes_processed: usize,
}
/// Group `(path, size)` pairs into batches whose summed size stays within
/// `target_cache_bytes`.
///
/// Input order is preserved. A file larger than the budget still gets placed
/// (alone) in its own batch, so no file is ever dropped.
pub fn compute_l3_cache_batches(
    paths_with_sizes: &[(PathBuf, usize)],
    target_cache_bytes: usize,
) -> Vec<Vec<(PathBuf, usize)>> {
    let mut batches: Vec<Vec<(PathBuf, usize)>> = Vec::new();
    if paths_with_sizes.is_empty() {
        return batches;
    }
    let mut current: Vec<(PathBuf, usize)> = Vec::new();
    let mut used: usize = 0;
    for (path, size) in paths_with_sizes {
        let fits = used + size <= target_cache_bytes;
        if !current.is_empty() && !fits {
            // Budget exceeded: seal the current batch and start a fresh one.
            batches.push(std::mem::take(&mut current));
            used = 0;
        }
        // The first file of a batch is always accepted, even when oversized.
        current.push((path.clone(), *size));
        used += size;
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
/// Read the bytes of each path from disk, silently skipping files that
/// cannot be read.
///
/// Returns one `(path, contents, contents.len())` triple per readable file,
/// preserving input order.
pub fn read_batch_sources(paths: &[PathBuf]) -> Vec<(PathBuf, Vec<u8>, usize)> {
    let mut out = Vec::with_capacity(paths.len());
    for path in paths {
        if let Ok(bytes) = std::fs::read(path) {
            let byte_count = bytes.len();
            out.push((path.clone(), bytes, byte_count));
        }
    }
    out
}
// Debug logging macro: with the "debug-prints" feature enabled it forwards
// its arguments to `eprintln!`.
#[cfg(feature = "debug-prints")]
macro_rules! debug_print {
    ($($arg:tt)*) => {
        { eprintln!($($arg)*); }
    };
}
// Feature-disabled variant: expands to a no-op unit expression so call sites
// compile away. `#[allow(unused_macros)]` silences the warning in builds
// where no call site survives cfg pruning.
#[cfg(not(feature = "debug-prints"))]
#[allow(unused_macros)]
macro_rules! debug_print {
    ($($arg:tt)*) => {
        {
            ()
        }
    };
}
use anyhow::{Context, Result};
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::{CodeGraph, FileEvent, FileSystemWatcher, WatcherConfig};
/// Drop graph entries for files under `root_path` that no longer exist on disk.
///
/// Deletion failures for individual files are intentionally ignored
/// (best-effort cleanup).
fn reconcile_deleted_files(graph: &mut CodeGraph, root_path: &std::path::Path) -> Result<()> {
    for (path, _file_node) in graph.all_file_nodes()? {
        let on_disk = std::path::Path::new(&path);
        // Only prune entries inside the watched root that have vanished.
        if on_disk.starts_with(root_path) && !on_disk.exists() {
            let _ = graph.delete_file(&path);
        }
    }
    Ok(())
}
/// Reconcile a single watcher event against the graph.
///
/// The event path is normalized to a stable string key; if normalization
/// fails we fall back to the lossy display form so the event is never dropped.
fn handle_event(graph: &mut CodeGraph, event: FileEvent) -> Result<()> {
    let path_key = match crate::validation::normalize_path(&event.path) {
        Ok(key) => key,
        Err(_) => event.path.to_string_lossy().to_string(),
    };
    let _outcome = graph.reconcile_file_path(&event.path, &path_key)?;
    Ok(())
}
/// Watch `root_path` and index change events into `db_path` until the idle
/// timeout elapses.
///
/// Convenience wrapper over [`run_indexer_n`] with no cap on event count.
pub fn run_indexer(root_path: PathBuf, db_path: PathBuf) -> Result<()> {
    run_indexer_n(root_path, db_path, usize::MAX).map(|_| ())
}
/// Drain up to `max_events` watcher events into the graph and return the
/// number actually processed.
///
/// Stale entries for files deleted while the indexer was offline are pruned
/// first. The loop exits when `max_events` is reached or when no event has
/// arrived for the idle timeout (`MAGELLAN_WATCH_TIMEOUT_MS`, default 5000 ms).
pub fn run_indexer_n(root_path: PathBuf, db_path: PathBuf, max_events: usize) -> Result<usize> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let watcher = FileSystemWatcher::new(
        root_path.clone(),
        WatcherConfig::default(),
        shutdown.clone(),
    )?;
    let mut graph = CodeGraph::open(&db_path)?;
    reconcile_deleted_files(&mut graph, &root_path)?;
    let mut processed = 0;
    let mut idle_for = std::time::Duration::from_secs(0);
    let idle_step = std::time::Duration::from_millis(10);
    // Env-tunable idle timeout; a missing or malformed value falls back to 5s.
    let idle_timeout_ms = std::env::var("MAGELLAN_WATCH_TIMEOUT_MS")
        .ok()
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(5000);
    let idle_timeout = std::time::Duration::from_millis(idle_timeout_ms);
    while processed < max_events {
        match watcher.try_recv_event() {
            Ok(Some(event)) => {
                handle_event(&mut graph, event)?;
                processed += 1;
                // Activity resets the idle clock; poll again without sleeping.
                idle_for = std::time::Duration::from_secs(0);
                continue;
            }
            Ok(None) => {}
            Err(e) => {
                return Err(e.context("watcher mutex poisoned during event recv"));
            }
        }
        if idle_for >= idle_timeout {
            break;
        }
        std::thread::sleep(idle_step);
        idle_for += idle_step;
    }
    // Tell the watcher to stop before returning.
    shutdown.store(true, Ordering::SeqCst);
    Ok(processed)
}
/// Settings bundle for [`run_watch_pipeline`].
#[derive(Debug, Clone)]
pub struct WatchPipelineConfig {
    /// Directory tree to watch and index.
    pub root_path: PathBuf,
    /// Location of the graph database.
    pub db_path: PathBuf,
    /// Tuning options passed through to the filesystem watcher.
    pub watcher_config: WatcherConfig,
    /// When true, index the whole tree once before entering watch mode.
    pub scan_initial: bool,
}
impl WatchPipelineConfig {
pub fn new(
root_path: PathBuf,
db_path: PathBuf,
watcher_config: WatcherConfig,
scan_initial: bool,
) -> Self {
Self {
root_path,
db_path,
watcher_config,
scan_initial,
}
}
}
/// State shared between the watcher thread (producer) and the main pipeline
/// loop (consumer).
#[derive(Clone)]
struct PipelineSharedState {
    /// Paths reported dirty by the watcher; a `BTreeSet` deduplicates and
    /// keeps them sorted.
    dirty_paths: Arc<std::sync::Mutex<BTreeSet<PathBuf>>>,
    /// Capacity-1 channel used only to nudge the consumer awake.
    wakeup_tx: std::sync::mpsc::SyncSender<()>,
}
impl PipelineSharedState {
    /// Create the shared state together with the receiving end of the
    /// wakeup channel.
    fn new() -> (Self, std::sync::mpsc::Receiver<()>) {
        let (wakeup_tx, wakeup_rx) = std::sync::mpsc::sync_channel(1);
        (
            Self {
                dirty_paths: Arc::new(std::sync::Mutex::new(BTreeSet::new())),
                wakeup_tx,
            },
            wakeup_rx,
        )
    }
    /// Record `paths` as dirty and (best-effort) wake the consumer.
    ///
    /// # Errors
    /// Fails only if the mutex is poisoned.
    fn insert_dirty_paths(&self, paths: &[PathBuf]) -> Result<()> {
        let mut dirty_paths = self
            .dirty_paths
            .lock()
            .map_err(|e| anyhow::anyhow!("dirty_paths mutex poisoned: {}", e))?;
        dirty_paths.extend(paths.iter().cloned());
        // A full channel means a wakeup is already pending — that's fine.
        let _ = self.wakeup_tx.try_send(());
        Ok(())
    }
    /// Take all pending dirty paths (in sorted order), leaving the set empty.
    ///
    /// # Errors
    /// Fails only if the mutex is poisoned.
    fn drain_dirty_paths(&self) -> Result<Vec<PathBuf>> {
        let mut paths = self
            .dirty_paths
            .lock()
            .map_err(|e| anyhow::anyhow!("dirty_paths mutex poisoned: {}", e))?;
        // `mem::take` moves the whole set out in O(1) instead of cloning
        // every PathBuf and then clearing, as the previous snapshot did.
        Ok(std::mem::take(&mut *paths).into_iter().collect())
    }
}
/// Run the watch pipeline: spawn a watcher thread, optionally perform an
/// initial full scan, then process dirty-path batches until `shutdown` is set.
///
/// Returns the total number of files processed (scan flush + watch batches).
pub fn run_watch_pipeline(config: WatchPipelineConfig, shutdown: Arc<AtomicBool>) -> Result<usize> {
    let mut graph = CodeGraph::open(&config.db_path)?;
    let (shared_state, wakeup_rx) = PipelineSharedState::new();
    let main_state = shared_state.clone();
    let watcher_thread = {
        let root_path = config.root_path.clone();
        let watcher_config = config.watcher_config.clone();
        let shared_state = Arc::new(shared_state);
        let shutdown_watch = shutdown.clone();
        thread::spawn(move || {
            let result = watcher_loop(root_path, watcher_config, shared_state, shutdown_watch);
            // Parser cleanup runs on the thread that owns the parsers.
            crate::ingest::pool::cleanup_parsers();
            if let Err(e) = result {
                eprintln!("Watcher thread error: {:?}", e);
            }
        })
    };
    if config.scan_initial {
        use indicatif::HumanCount;
        graph.scan_directory(
            &config.root_path,
            Some(&|current, total, file_path| {
                // Bar is created lazily on the first callback; the first
                // callback's `total` fixes the bar length for the whole scan.
                static PB: std::sync::OnceLock<ProgressBar> = std::sync::OnceLock::new();
                let pb = PB.get_or_init(|| {
                    let pb = ProgressBar::new(total as u64);
                    pb.set_style(
                        ProgressStyle::default_bar()
                            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) ETA: {eta}\n{msg}")
                            .unwrap()
                            .progress_chars("=>-"),
                    );
                    pb
                });
                pb.set_position(current as u64);
                pb.set_message(format!("Scanning: {}", file_path));
                if current >= total {
                    pb.finish_with_message(format!("Scanned {} files", HumanCount(total as u64)));
                }
            }),
        )?;
    }
    let mut total_processed = 0;
    // Events that arrived while the scan ran were buffered by the watcher
    // thread; flush them before entering the steady-state loop.
    let paths_during_scan = main_state.drain_dirty_paths()?;
    if !paths_during_scan.is_empty() {
        println!(
            "Flushing {} buffered path(s) from scan...",
            paths_during_scan.len()
        );
        total_processed += process_dirty_paths(&mut graph, &paths_during_scan)?;
        // Checkpoint failure is non-fatal; data remains in the WAL.
        if let Err(e) = graph.checkpoint_wal() {
            eprintln!("Warning: WAL checkpoint failed after scan flush: {}", e);
        }
    }
    println!("Magellan watching: {}", config.root_path.display());
    println!("Database: {}", config.db_path.display());
    while !shutdown.load(Ordering::SeqCst) {
        // Short timeout so the shutdown flag is re-checked ~10x per second.
        match wakeup_rx.recv_timeout(Duration::from_millis(100)) {
            Ok(()) => {
                let dirty_paths = main_state.drain_dirty_paths()?;
                if !dirty_paths.is_empty() {
                    total_processed += process_dirty_paths(&mut graph, &dirty_paths)?;
                    if let Err(e) = graph.checkpoint_wal() {
                        eprintln!("Warning: WAL checkpoint failed after watch batch: {}", e);
                    }
                }
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                // Watcher dropped its sender; no more wakeups will arrive.
                break;
            }
        }
    }
    // Bounded wait for the watcher thread so shutdown can never hang forever.
    let timeout = Duration::from_secs(25);
    let start = Instant::now();
    let mut finished = false;
    while !watcher_thread.is_finished() {
        if start.elapsed() >= timeout {
            eprintln!(
                "Warning: Watcher thread did not finish within {:?}, forcing shutdown",
                timeout
            );
            eprintln!(
                "Note: Data may not be flushed. Use Ctrl+C (not timeout) for clean shutdown."
            );
            break;
        }
        thread::sleep(Duration::from_millis(100));
    }
    if watcher_thread.is_finished() {
        finished = true;
    }
    if finished {
        // Join only when the thread actually ended (avoids blocking forever);
        // surface panic payloads instead of swallowing them.
        match watcher_thread.join() {
            Ok(_) => {
            }
            Err(panic_payload) => {
                if let Some(msg) = panic_payload.downcast_ref::<&str>() {
                    eprintln!("Watcher thread panicked: {}", msg);
                } else if let Some(msg) = panic_payload.downcast_ref::<String>() {
                    eprintln!("Watcher thread panicked: {}", msg);
                } else {
                    eprintln!("Watcher thread panicked with unknown payload");
                }
            }
        }
    }
    crate::ingest::pool::cleanup_parsers();
    Ok(total_processed)
}
/// Watcher-thread body: forward batches of changed paths into shared state
/// until `shutdown` is set or the watcher channel closes.
fn watcher_loop(
    root_path: PathBuf,
    config: WatcherConfig,
    shared_state: Arc<PipelineSharedState>,
    shutdown: Arc<AtomicBool>,
) -> Result<()> {
    let watcher = FileSystemWatcher::new(root_path, config, shutdown.clone())?;
    while !shutdown.load(Ordering::SeqCst) {
        match watcher.recv_batch_timeout(Duration::from_millis(100)) {
            Ok(Some(batch)) => {
                shared_state.insert_dirty_paths(&batch.paths)?;
            }
            Ok(None) => {
                // Channel closed: no further events will arrive.
                break;
            }
            Err(_) => {
                // Treated as a timeout so the shutdown flag is re-checked.
                // NOTE(review): confirm recv_batch_timeout's Err semantics —
                // a persistent error would spin this loop at ~10 Hz.
                continue;
            }
        }
    }
    Ok(())
}
/// Flush a set of changed paths into the graph.
///
/// Currently delegates to the L3-cache-aware batched implementation.
fn process_dirty_paths(graph: &mut CodeGraph, dirty_paths: &[PathBuf]) -> Result<usize> {
    process_dirty_paths_batched(graph, dirty_paths)
}
/// Process dirty paths in L3-cache-sized batches: stat each file, group into
/// batches, bulk-read each batch, reconcile, then sweep for deletions.
///
/// Returns the number of files that actually changed the graph (reindexed or
/// deleted). Prints per-file MODIFY/DELETE/ERROR lines plus a timing summary.
fn process_dirty_paths_batched(graph: &mut CodeGraph, dirty_paths: &[PathBuf]) -> Result<usize> {
    if dirty_paths.is_empty() {
        return Ok(0);
    }
    let batch_start = Instant::now();
    let _total_files = dirty_paths.len();
    let size_start = Instant::now();
    // Files whose metadata cannot be read (e.g. already deleted) are skipped
    // here; the deletion sweep at the bottom picks them up.
    let paths_with_sizes: Vec<(PathBuf, usize)> = dirty_paths
        .iter()
        .filter_map(|path| {
            std::fs::metadata(path)
                .ok()
                .map(|meta| (path.clone(), meta.len() as usize))
        })
        .collect();
    let size_time = size_start.elapsed();
    // Budget each batch to half the assumed L3 cache.
    let target_cache_bytes = (DEFAULT_L3_CACHE_SIZE as f64 * TARGET_CACHE_USAGE) as usize;
    let batch_start_compute = Instant::now();
    let batches = compute_l3_cache_batches(&paths_with_sizes, target_cache_bytes);
    let batch_count = batches.len();
    let batch_compute_time = batch_start_compute.elapsed();
    let mut total_processed = 0;
    let mut total_read_time = std::time::Duration::ZERO;
    let mut total_reconcile_time = std::time::Duration::ZERO;
    for batch in batches {
        let batch_paths: Vec<PathBuf> = batch.iter().map(|(p, _)| p.clone()).collect();
        let read_start = Instant::now();
        let sources = read_batch_sources(&batch_paths);
        total_read_time += read_start.elapsed();
        let source_map: std::collections::HashMap<PathBuf, Vec<u8>> = sources
            .into_iter()
            .map(|(path, source, _)| (path, source))
            .collect();
        for path in &batch_paths {
            let path_key = crate::validation::normalize_path(path)
                .unwrap_or_else(|_| path.to_string_lossy().to_string());
            let reconcile_start = Instant::now();
            // Prefer the pre-read bytes; fall back to letting the graph read
            // the file itself when the bulk read failed for this path.
            let outcome = if let Some(source) = source_map.get(path) {
                graph.reconcile_file_path_with_source(path, &path_key, source)
            } else {
                graph.reconcile_file_path(path, &path_key)
            };
            match outcome {
                Ok(outcome) => {
                    total_reconcile_time += reconcile_start.elapsed();
                    let path_str = path.to_string_lossy();
                    let was_modified = match outcome {
                        crate::ReconcileOutcome::Deleted => {
                            println!("DELETE {}", path_str);
                            true
                        }
                        crate::ReconcileOutcome::Unchanged => {
                            false
                        }
                        crate::ReconcileOutcome::Reindexed {
                            symbols,
                            references,
                            calls,
                        } => {
                            println!(
                                "MODIFY {} symbols={} refs={} calls={}",
                                path_str, symbols, references, calls
                            );
                            true
                        }
                    };
                    if was_modified {
                        total_processed += 1;
                    }
                }
                Err(e) => {
                    // Errors are reported but do not abort the batch.
                    total_reconcile_time += reconcile_start.elapsed();
                    let path_str = path.to_string_lossy();
                    println!("ERROR {} {}", path_str, e);
                }
            }
        }
    }
    // Deletion sweep: dirty paths that vanished before the stat above never
    // entered a batch; purge their facts here.
    // NOTE(review): a file deleted between the stat and the reconcile could be
    // reported and counted twice (Deleted outcome above + this sweep) —
    // confirm whether that double count is acceptable.
    for path in dirty_paths {
        if !path.exists() {
            let path_key = crate::validation::normalize_path(path)
                .unwrap_or_else(|_| path.to_string_lossy().to_string());
            let _ = graph.delete_file_facts(&path_key);
            println!("DELETE {}", path.to_string_lossy());
            total_processed += 1;
        }
    }
    let elapsed = batch_start.elapsed();
    if total_processed > 0 {
        eprintln!(
            "L3 Batch: {} files processed, {} batches, {}ms total (size:{}ms batch:{}ms read:{}ms reconcile:{}ms)",
            total_processed,
            batch_count,
            elapsed.as_millis(),
            size_time.as_millis(),
            batch_compute_time.as_millis(),
            total_read_time.as_millis(),
            total_reconcile_time.as_millis()
        );
        // Rebuild the full-text index only when something actually changed.
        if let Err(e) = crate::graph::CodeGraph::rebuild_fts5_index(graph.db_path()) {
            eprintln!("Warning: FTS5 rebuild failed: {}", e);
        }
    }
    Ok(total_processed)
}
/// Geometric-backend variant of the watch pipeline: optional initial scan,
/// then incremental re-indexing of dirty paths, with periodic disk saves.
#[cfg(feature = "geometric-backend")]
pub fn run_watch_pipeline_geometric(
    config: crate::WatchPipelineConfig,
    shutdown: Arc<AtomicBool>,
) -> Result<usize> {
    use crate::graph::geo_index;
    use crate::graph::geometric_backend::GeometricBackend;
    use indicatif::{HumanCount, ProgressBar, ProgressStyle};
    let mut backend = if config.db_path.exists() {
        GeometricBackend::open(&config.db_path)?
    } else {
        GeometricBackend::create(&config.db_path)?
    };
    let (shared_state, wakeup_rx) = PipelineSharedState::new();
    let main_state = shared_state.clone();
    // Watcher-thread errors are intentionally ignored here (best-effort).
    let _watcher_thread = {
        let root_path = config.root_path.clone();
        let watcher_config = config.watcher_config.clone();
        let shared_state = Arc::new(shared_state);
        let shutdown_watch = shutdown.clone();
        thread::spawn(move || {
            let _ = watcher_loop(root_path, watcher_config, shared_state, shutdown_watch);
            crate::ingest::pool::cleanup_parsers();
        })
    };
    let mut total_processed = 0;
    if config.scan_initial {
        println!("Starting initial scan of: {}", config.root_path.display());
        use indicatif::HumanCount;
        match geo_index::scan_directory_with_progress(
            &mut backend,
            &config.root_path,
            Some(&|current, total| {
                // One-shot progress bar; the first callback's `total` fixes
                // the bar length.
                static PB: std::sync::OnceLock<ProgressBar> = std::sync::OnceLock::new();
                let pb = PB.get_or_init(|| {
                    let pb = ProgressBar::new(total as u64);
                    pb.set_style(
                        ProgressStyle::default_bar()
                            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) ETA: {eta}\n{msg}")
                            .unwrap()
                            .progress_chars("=>-"),
                    );
                    pb
                });
                pb.set_position(current as u64);
                pb.set_message(format!(
                    "Indexing: {}/{} files",
                    HumanCount(current as u64),
                    HumanCount(total as u64)
                ));
                if current >= total {
                    pb.finish_with_message(format!("Scanned {} files", HumanCount(total as u64)));
                }
            }),
            geo_index::IndexingMode::CfgFirst,
        ) {
            Ok(count) => {
                println!("Initial scan complete: indexed {} files", count);
                total_processed += count;
                debug_print!("[WATCH_DEBUG] Saving data after initial scan...");
                // Persist eagerly so a crash while watching keeps scan results.
                match backend.save_to_disk() {
                    Ok(_) => debug_print!("[WATCH_DEBUG] Data saved successfully"),
                    Err(_e) => debug_print!("[WATCH_DEBUG] ERROR saving data: {}", _e),
                }
                debug_print!("[WATCH_DEBUG] Save operation completed.");
            }
            Err(e) => {
                eprintln!("Initial scan error: {}", e);
            }
        }
    }
    println!(
        "Magellan watching (Geometric): {}",
        config.root_path.display()
    );
    println!("Database: {}", config.db_path.display());
    // Save roughly every 100 processed files to bound data loss on crash.
    let mut save_counter = 0;
    while !shutdown.load(Ordering::SeqCst) {
        match wakeup_rx.recv_timeout(Duration::from_millis(100)) {
            Ok(()) => {
                let dirty_paths = main_state.drain_dirty_paths()?;
                if !dirty_paths.is_empty() {
                    let batch_count = process_dirty_paths_geometric(&backend, &dirty_paths)?;
                    total_processed += batch_count;
                    save_counter += batch_count;
                    if save_counter >= 100 {
                        backend.save_to_disk()?;
                        save_counter = 0;
                    }
                }
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                break;
            }
        }
    }
    // Final flush so everything processed in this session hits disk.
    println!("Flushing data to disk...");
    backend.save_to_disk()?;
    println!("Data flushed.");
    Ok(total_processed)
}
/// Re-index each still-existing dirty path into the geometric backend.
///
/// Missing files, undetected languages, and unreadable contents are skipped
/// silently; extraction failures print an ERROR line. Returns the number of
/// files successfully re-indexed.
#[cfg(feature = "geometric-backend")]
fn process_dirty_paths_geometric(
    backend: &crate::graph::geometric_backend::GeometricBackend,
    dirty_paths: &[PathBuf],
) -> Result<usize> {
    use crate::graph::geometric_backend::extract_symbols_cfg_and_calls_from_file;
    use crate::ingest::detect_language;
    use std::fs;
    let mut count = 0;
    for path in dirty_paths {
        if !path.exists() {
            continue;
        }
        let language = match detect_language(path) {
            Some(l) => l,
            None => continue,
        };
        let content = match fs::read_to_string(path) {
            Ok(c) => c,
            Err(_) => continue,
        };
        match extract_symbols_cfg_and_calls_from_file(path, &content, language) {
            Ok((symbols, cfg_blocks, cfg_edges, _call_edges)) => {
                let sym_count: usize = symbols.len();
                let symbol_ids = backend.insert_symbols(symbols)?;
                let cfg_blocks_nonempty: bool = !cfg_blocks.is_empty();
                if cfg_blocks_nonempty {
                    // Maps a block's position to its logical id.
                    let mut block_id_map: std::collections::HashMap<usize, u64> =
                        std::collections::HashMap::new();
                    for (idx, mut block) in cfg_blocks.into_iter().enumerate() {
                        // Rewrite the block's local function index to the
                        // backend-assigned symbol id, when in range.
                        let local_sym_idx = block.function_id as usize;
                        if local_sym_idx < symbol_ids.len() {
                            block.function_id = symbol_ids[local_sym_idx] as i64;
                        }
                        let logical_id = block.id;
                        block_id_map.insert(idx, logical_id);
                        // Per-block insert failures are ignored (best-effort).
                        let _ = backend.insert_cfg_block(block);
                    }
                    // NOTE(review): `block_id_map` is populated but never read;
                    // the edges below use `edge.src_id`/`edge.dst_id` directly —
                    // confirm whether edge ids were meant to be remapped here.
                    for edge in cfg_edges {
                        let edge_type_str = match edge.edge_type {
                            1 => "branch_true",
                            2 => "branch_false",
                            _ => "normal",
                        };
                        let _ = backend.insert_edge(edge.src_id, edge.dst_id, edge_type_str);
                    }
                }
                println!("MODIFY {} symbols={}", path.display(), sym_count);
                count += 1;
            }
            Err(e) => println!("ERROR {} {}", path.display(), e),
        }
    }
    Ok(count)
}
/// Stub used when the geometric backend is compiled out; always fails.
#[cfg(not(feature = "geometric-backend"))]
pub fn run_watch_pipeline_geometric(
    _config: crate::WatchPipelineConfig,
    _shutdown: Arc<AtomicBool>,
) -> Result<usize> {
    anyhow::bail!("Geometric backend not enabled")
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_compute_l3_cache_batches_empty() {
        let inputs: Vec<(PathBuf, usize)> = Vec::new();
        assert!(compute_l3_cache_batches(&inputs, 1024).is_empty());
    }

    #[test]
    fn test_compute_l3_cache_batches_single_file() {
        let inputs = vec![(PathBuf::from("test.rs"), 100)];
        let batches = compute_l3_cache_batches(&inputs, 1024);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 1);
        assert_eq!(batches[0][0].0, PathBuf::from("test.rs"));
        assert_eq!(batches[0][0].1, 100);
    }

    #[test]
    fn test_compute_l3_cache_batches_fits_in_one_batch() {
        let inputs = vec![
            (PathBuf::from("a.rs"), 100),
            (PathBuf::from("b.rs"), 200),
            (PathBuf::from("c.rs"), 300),
        ];
        let batches = compute_l3_cache_batches(&inputs, 1000);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 3);
    }

    #[test]
    fn test_compute_l3_cache_batches_splits_on_limit() {
        // 500 + 600 exceeds the 1000-byte budget, so b.rs opens batch two.
        let inputs = vec![
            (PathBuf::from("a.rs"), 500),
            (PathBuf::from("b.rs"), 600),
            (PathBuf::from("c.rs"), 300),
        ];
        let batches = compute_l3_cache_batches(&inputs, 1000);
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 1);
        assert_eq!(batches[1].len(), 2);
    }

    #[test]
    fn test_compute_l3_cache_batches_large_file() {
        // An oversized file always occupies a batch of its own.
        let inputs = vec![
            (PathBuf::from("small.rs"), 100),
            (PathBuf::from("huge.rs"), 2000),
            (PathBuf::from("tiny.rs"), 50),
        ];
        let batches = compute_l3_cache_batches(&inputs, 1000);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 1);
        assert_eq!(batches[1].len(), 1);
        assert_eq!(batches[2].len(), 1);
    }

    #[test]
    fn test_read_batch_sources_missing_files() {
        let missing = vec![PathBuf::from("/nonexistent/path/file.rs")];
        assert!(read_batch_sources(&missing).is_empty());
    }

    #[test]
    fn test_read_batch_sources_existing_files() {
        use std::fs;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let file1 = temp_dir.path().join("test1.rs");
        let file2 = temp_dir.path().join("test2.rs");
        fs::write(&file1, "fn main() {}").unwrap();
        fs::write(&file2, "fn other() {}").unwrap();
        let sources = read_batch_sources(&[file1.clone(), file2.clone()]);
        assert_eq!(sources.len(), 2);
        let found: std::collections::HashSet<_> =
            sources.iter().map(|(p, _, _)| p.clone()).collect();
        assert!(found.contains(&file1));
        assert!(found.contains(&file2));
    }

    #[test]
    fn test_l3_cache_size_constants() {
        assert_eq!(DEFAULT_L3_CACHE_SIZE, 16 * 1024 * 1024);
        assert!((TARGET_CACHE_USAGE - 0.50).abs() < 0.001);
        assert_eq!(AVG_SOURCE_FILE_SIZE, 50 * 1024);
    }

    #[test]
    fn test_reconcile_with_source_uses_provided_bytes() {
        use tempfile::tempdir;
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let mut graph = crate::CodeGraph::open(&db_path).unwrap();
        let source = b"fn main() { let x = 1; }";
        let path = dir.path().join("test.rs");
        std::fs::write(&path, source).unwrap();
        let outcome = graph
            .reconcile_file_path_with_source(&path, "test.rs", source)
            .unwrap();
        match outcome {
            crate::ReconcileOutcome::Reindexed { symbols, .. } => {
                assert!(symbols > 0, "Should index symbols from provided source");
            }
            other => panic!("Expected Reindexed, got {:?}", other),
        }
    }
}