use crate::commands::scan::{self, Diag};
use crate::database::index::{Indexer, ScanRecord};
use crate::server::app::ServerEvent;
use crate::server::progress::{ScanMetrics, ScanProgress, TimingBreakdown};
use crate::server::scan_log::ScanLogCollector;
use crate::utils::config::Config;
use crate::utils::project::get_project_info;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::broadcast;
use uuid::Uuid;
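/// Builds the dedicated rayon pool used for all scan work. Two logical cores
/// are left free (presumably for the tokio runtime and event broadcasting),
/// with a floor of one scan thread on small machines.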
fn build_scan_pool(stack_size: usize) -> rayon::ThreadPool {
let total = num_cpus::get();
let scan_threads = total.saturating_sub(2).max(1);
rayon::ThreadPoolBuilder::new()
.num_threads(scan_threads)
.stack_size(stack_size)
.thread_name(|i| format!("nyx-scan-{i}"))
.build()
.expect("failed to build scan thread pool")
}
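/// Lifecycle of a scan job; serialized in lowercase for API responses.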
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
Queued,
Running,
Completed,
Failed,
}
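/// A scan job and its artifacts. Heavy payloads (findings, progress, metrics,
/// logs) sit behind `Arc` so cloning a job for an API response stays cheap.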
#[derive(Debug, Clone)]
pub struct ScanJob {
pub id: String,
pub status: JobStatus,
pub scan_root: PathBuf,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
pub duration_secs: Option<f64>,
pub findings: Option<Arc<Vec<Diag>>>,
pub error: Option<String>,
pub progress: Option<Arc<ScanProgress>>,
pub metrics: Option<Arc<ScanMetrics>>,
pub log_collector: Option<Arc<ScanLogCollector>>,
pub engine_version: Option<String>,
pub languages: Option<Vec<String>>,
pub files_scanned: Option<u64>,
pub timing: Option<TimingBreakdown>,
}
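/// Coordinates scan jobs for the server: at most one scan runs at a time,
/// finished jobs are kept in a bounded, insertion-ordered history, and all
/// parallel scan work executes on a dedicated rayon pool.
///
/// A minimal usage sketch (assumes `root`, `config`, and `db_dir` come from
/// server setup elsewhere; passing `None` skips scan-record persistence):
///
/// ```ignore
/// let manager = Arc::new(JobManager::new(10, 8 * 1024 * 1024));
/// let (tx, _rx) = tokio::sync::broadcast::channel(16);
/// let job_id = manager
///     .start_scan(root, config, tx, None, db_dir)
///     .expect("no scan is currently running");
/// ```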
pub struct JobManager {
jobs: Mutex<HashMap<String, ScanJob>>,
job_order: Mutex<Vec<String>>,
active_job_id: Mutex<Option<String>>,
max_jobs: usize,
scan_pool: rayon::ThreadPool,
}
impl std::fmt::Debug for JobManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Manual impl: the mutex-guarded maps and the rayon pool have no
        // useful Debug output, so only the configuration is shown.
        f.debug_struct("JobManager")
            .field("max_jobs", &self.max_jobs)
            .finish_non_exhaustive()
    }
}
impl JobManager {
pub fn new(max_jobs: usize, rayon_stack_size: usize) -> Self {
Self {
jobs: Mutex::new(HashMap::new()),
job_order: Mutex::new(Vec::new()),
active_job_id: Mutex::new(None),
max_jobs,
scan_pool: build_scan_pool(rayon_stack_size),
}
}
pub fn start_scan(
self: &Arc<Self>,
scan_root: PathBuf,
mut config: Config,
event_tx: broadcast::Sender<ServerEvent>,
db_pool: Option<Arc<Pool<SqliteConnectionManager>>>,
database_dir: PathBuf,
) -> Result<String, &'static str> {
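        // Enforce the single-active-scan policy. The lock is held until the
        // new job is registered and marked active below, so concurrent
        // callers cannot race past this check.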
let mut active = self.active_job_id.lock().unwrap();
if active.is_some() {
return Err("A scan is already running");
}
let job_id = Uuid::new_v4().to_string();
let progress = Arc::new(ScanProgress::new());
let metrics = Arc::new(ScanMetrics::new());
let log_collector = Arc::new(ScanLogCollector::default());
let engine_version = env!("CARGO_PKG_VERSION").to_string();
let job = ScanJob {
id: job_id.clone(),
status: JobStatus::Running,
scan_root: scan_root.clone(),
started_at: Some(chrono::Utc::now()),
finished_at: None,
duration_secs: None,
findings: None,
error: None,
progress: Some(Arc::clone(&progress)),
metrics: Some(Arc::clone(&metrics)),
log_collector: Some(Arc::clone(&log_collector)),
engine_version: Some(engine_version.clone()),
languages: None,
files_scanned: None,
timing: None,
};
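        // Register the job and trim history to `max_jobs`, never evicting an
        // entry that is still marked active.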
{
let mut jobs = self.jobs.lock().unwrap();
let mut order = self.job_order.lock().unwrap();
            while order.len() >= self.max_jobs {
                // `let ... else` breaks out if the history is empty (possible
                // when `max_jobs` is 0); the original `if let` would spin.
                let Some(oldest_id) = order.first().cloned() else {
                    break;
                };
                if Some(&oldest_id) == active.as_ref() {
                    break;
                }
                jobs.remove(&oldest_id);
                order.remove(0);
            }
jobs.insert(job_id.clone(), job);
order.push(job_id.clone());
}
        *active = Some(job_id.clone());
        // The job is registered and marked active; release the lock before the
        // potentially slow framework detection and thread spawns below.
        drop(active);
if config.framework_ctx.is_none() {
config.framework_ctx = Some(crate::utils::detect_frameworks(&scan_root));
}
let _ = event_tx.send(ServerEvent::ScanStarted {
job_id: job_id.clone(),
});
if let Some(ref pool) = db_pool
&& let Ok(idx) = Indexer::from_pool("_scans", pool)
{
let _ = idx.insert_scan(&ScanRecord {
id: job_id.clone(),
status: "running".to_string(),
scan_root: scan_root.display().to_string(),
started_at: Some(chrono::Utc::now().to_rfc3339()),
finished_at: None,
duration_secs: None,
engine_version: Some(engine_version.clone()),
languages: None,
files_scanned: None,
files_skipped: None,
finding_count: None,
findings_json: None,
timing_json: None,
error: None,
});
}
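        // Forward progress snapshots to SSE subscribers every 500 ms. This
        // assumes the scan pipeline sets the stage to "complete" on both
        // success and failure; the forwarder exits once it observes it.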
let progress_for_sse = Arc::clone(&progress);
let event_tx_sse = event_tx.clone();
let jid_sse = job_id.clone();
std::thread::spawn(move || {
loop {
std::thread::sleep(std::time::Duration::from_millis(500));
let snap = progress_for_sse.snapshot();
let is_complete = snap.stage == "complete";
let _ = event_tx_sse.send(ServerEvent::ScanProgress {
job_id: jid_sse.clone(),
stage: snap.stage,
files_discovered: snap.files_discovered,
files_parsed: snap.files_parsed,
files_analyzed: snap.files_analyzed,
files_skipped: snap.files_skipped,
batches_total: snap.batches_total,
batches_completed: snap.batches_completed,
current_file: snap.current_file,
elapsed_ms: snap.elapsed_ms,
timing: snap.timing,
});
if is_complete {
break;
}
}
});
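        // Run the scan on a plain OS thread; its parallel work happens inside
        // the dedicated rayon pool, keeping the tokio runtime unblocked.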
let manager = Arc::clone(self);
let jid = job_id.clone();
std::thread::spawn(move || {
crate::utils::analysis_options::reinstall(config.analysis.engine);
let start = Instant::now();
log_collector.info("Indexed scan started (rebuild enabled)", None);
let result = manager
.scan_pool
.install(|| -> crate::errors::NyxResult<Vec<Diag>> {
let (project_name, db_path) = get_project_info(&scan_root, &database_dir)?;
crate::commands::index::build_index_with_observer(
&project_name,
&scan_root,
&db_path,
&config,
false,
Some(&progress),
Some(&metrics),
Some(&log_collector),
)?;
let pool = Indexer::init(&db_path)?;
scan::scan_with_index_parallel_observer(
&project_name,
pool,
&config,
false,
&scan_root,
Some(&progress),
Some(&metrics),
Some(&log_collector),
None,
)
});
let elapsed = start.elapsed().as_secs_f64();
let progress_snap = progress.snapshot();
let metrics_snap = metrics.snapshot();
let logs = log_collector.drain();
            let mut languages: Vec<String> = progress_snap.languages.keys().cloned().collect();
            languages.sort(); // HashMap iteration order is arbitrary; sort for stable output
let files_scanned = progress_snap.files_discovered;
let files_skipped = progress_snap.files_skipped;
let timing = progress_snap.timing.clone();
let finished_at = chrono::Utc::now();
let (status, diags, error_str) = match result {
Ok(diags) => {
log_collector.info(format!("Scan completed: {} findings", diags.len()), None);
(JobStatus::Completed, Some(Arc::new(diags)), None)
}
Err(e) => {
let err_str = e.to_string();
log_collector.error(&err_str, None, None);
(JobStatus::Failed, None, Some(err_str))
}
};
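            // Serialize findings, timing, and languages once for the
            // persisted scan record.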
let finding_count = diags.as_ref().map(|d| d.len());
let findings_json = diags
.as_ref()
.and_then(|f| serde_json::to_string(f.as_slice()).ok());
let timing_json = serde_json::to_string(&timing).ok();
let langs_json = serde_json::to_string(&languages).ok();
{
let mut jobs = manager.jobs.lock().unwrap();
if let Some(job) = jobs.get_mut(&jid) {
job.finished_at = Some(finished_at);
job.duration_secs = Some(elapsed);
job.languages = Some(languages.clone());
job.files_scanned = Some(files_scanned);
job.timing = Some(timing.clone());
job.status = status.clone();
job.findings = diags;
job.error = error_str.clone();
}
}
{
let mut active = manager.active_job_id.lock().unwrap();
if active.as_deref() == Some(&jid) {
*active = None;
}
}
match status {
JobStatus::Completed => {
let _ = event_tx.send(ServerEvent::ScanCompleted {
job_id: jid.clone(),
});
}
JobStatus::Failed => {
let _ = event_tx.send(ServerEvent::ScanFailed {
job_id: jid.clone(),
error: error_str.clone().unwrap_or_default(),
});
}
_ => {}
}
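            // Persist the final scan record and metrics; logs drained earlier
            // are combined with any emitted after the status was decided.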
if let Some(ref pool) = db_pool
&& let Ok(idx) = Indexer::from_pool("_scans", pool)
{
let finished_str = finished_at.to_rfc3339();
let _ = idx.update_scan(
&jid,
                match status {
                    JobStatus::Completed => "completed",
                    _ => "failed",
                },
Some(&finished_str),
Some(elapsed),
finding_count.map(|c| c as i64),
findings_json.as_deref(),
timing_json.as_deref(),
error_str.as_deref(),
Some(files_scanned as i64),
Some(files_skipped as i64),
langs_json.as_deref(),
);
let _ = idx.insert_scan_metrics(&jid, &metrics_snap);
let final_logs = log_collector.drain();
let all_logs: Vec<_> = logs.into_iter().chain(final_logs).collect();
if !all_logs.is_empty() {
let _ = idx.insert_scan_logs(&jid, &all_logs);
}
}
});
Ok(job_id)
}
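    /// Returns a clone of the job with the given id, if known.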
pub fn get_job(&self, id: &str) -> Option<ScanJob> {
self.jobs.lock().unwrap().get(id).cloned()
}
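    /// Lists all known jobs, newest first.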
pub fn list_jobs(&self) -> Vec<ScanJob> {
let jobs = self.jobs.lock().unwrap();
let order = self.job_order.lock().unwrap();
order
.iter()
.rev()
.filter_map(|id| jobs.get(id).cloned())
.collect()
}
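    /// Returns the currently running job, if any.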
pub fn active_job(&self) -> Option<ScanJob> {
let active = self.active_job_id.lock().unwrap();
active
.as_ref()
.and_then(|id| self.jobs.lock().unwrap().get(id).cloned())
}
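    /// Returns the most recent job that finished as `Completed`.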
pub fn get_latest_completed(&self) -> Option<ScanJob> {
let jobs = self.jobs.lock().unwrap();
let order = self.job_order.lock().unwrap();
order
.iter()
.rev()
.filter_map(|id| jobs.get(id))
.find(|j| j.status == JobStatus::Completed)
.cloned()
}
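    /// Removes a finished job from history; the running job cannot be removed.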
pub fn remove_job(&self, id: &str) -> Result<(), &'static str> {
let active = self.active_job_id.lock().unwrap();
if active.as_deref() == Some(id) {
return Err("Cannot delete a running scan");
}
drop(active);
let mut jobs = self.jobs.lock().unwrap();
if jobs.remove(id).is_none() {
return Err("Scan not found");
}
let mut order = self.job_order.lock().unwrap();
order.retain(|x| x != id);
Ok(())
}
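    /// Clones the findings of the latest completed scan out of their `Arc`,
    /// returning an empty list when no scan has completed yet.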
pub fn latest_findings(&self) -> Vec<Diag> {
self.get_latest_completed()
.and_then(|j| j.findings)
.map(|arc| arc.as_ref().clone())
.unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
fn test_config() -> Config {
Config::default()
}
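    // Polls until the job leaves `Running`, panicking after roughly ten seconds.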
fn wait_for_job(manager: &Arc<JobManager>, job_id: &str) -> ScanJob {
for _ in 0..200 {
std::thread::sleep(std::time::Duration::from_millis(50));
if let Some(job) = manager.get_job(job_id)
&& job.status != JobStatus::Running
{
return job;
}
}
panic!("job {job_id} did not finish in time");
}
fn wait_for_scan_metrics(
idx: &Indexer,
job_id: &str,
) -> crate::server::progress::ScanMetricsSnapshot {
for _ in 0..100 {
if let Some(metrics) = idx.get_scan_metrics(job_id).unwrap() {
return metrics;
}
std::thread::sleep(std::time::Duration::from_millis(25));
}
panic!("scan metrics for {job_id} were not persisted in time");
}
fn wait_for_scan_record(idx: &Indexer, job_id: &str) -> ScanRecord {
for _ in 0..100 {
if let Some(record) = idx.get_scan(job_id).unwrap() {
return record;
}
std::thread::sleep(std::time::Duration::from_millis(25));
}
panic!("scan record for {job_id} was not persisted in time");
}
#[test]
fn single_scan_policy() {
let manager = Arc::new(JobManager::new(10, 8 * 1024 * 1024));
let (tx, _rx) = broadcast::channel(16);
let dir = tempfile::tempdir().unwrap();
let id = manager
.start_scan(
dir.path().to_path_buf(),
test_config(),
tx.clone(),
None,
dir.path().to_path_buf(),
)
.unwrap();
assert!(!id.is_empty());
let result = manager.start_scan(
dir.path().to_path_buf(),
test_config(),
tx,
None,
dir.path().to_path_buf(),
);
assert!(result.is_err());
}
#[test]
fn bounded_history() {
let manager = Arc::new(JobManager::new(2, 8 * 1024 * 1024));
let (tx, _rx) = broadcast::channel(16);
let dir = tempfile::tempdir().unwrap();
let id1 = manager
.start_scan(
dir.path().to_path_buf(),
test_config(),
tx.clone(),
None,
dir.path().to_path_buf(),
)
.unwrap();
        wait_for_job(&manager, &id1);
let id2 = manager
.start_scan(
dir.path().to_path_buf(),
test_config(),
tx.clone(),
None,
dir.path().to_path_buf(),
)
.unwrap();
        wait_for_job(&manager, &id2);
        let id3 = manager
.start_scan(
dir.path().to_path_buf(),
test_config(),
tx,
None,
dir.path().to_path_buf(),
)
.unwrap();
        wait_for_job(&manager, &id3);
assert!(manager.get_job(&id1).is_none());
}
#[test]
fn start_scan_uses_indexed_rebuild_and_persists_scan_artifacts() {
let manager = Arc::new(JobManager::new(4, 8 * 1024 * 1024));
let (tx, _rx) = broadcast::channel(16);
let dir = tempfile::tempdir().unwrap();
let project_dir = dir.path().join("proj");
fs::create_dir(&project_dir).unwrap();
fs::write(
project_dir.join("app.js"),
r#"function cleanHtml(input) {
return DOMPurify.sanitize(input);
}
function handleRequest(req, res) {
const safe = cleanHtml(req.query.name);
res.send(safe);
}
handleRequest({ query: { name: '<b>x</b>' } }, { send() {} });
"#,
)
.unwrap();
let (_, db_path) =
crate::utils::project::get_project_info(&project_dir, dir.path()).unwrap();
let pool = Indexer::init(&db_path).unwrap();
let id = manager
.start_scan(
project_dir.clone(),
test_config(),
tx,
Some(Arc::clone(&pool)),
dir.path().to_path_buf(),
)
.unwrap();
let job = wait_for_job(&manager, &id);
assert_eq!(job.status, JobStatus::Completed);
let idx = Indexer::from_pool("proj", &pool).unwrap();
assert!(
!idx.load_all_summaries().unwrap().is_empty(),
"server scan should persist coarse summaries"
);
assert!(
!idx.load_all_ssa_summaries().unwrap().is_empty(),
"server scan should persist SSA summaries"
);
let scans_idx = Indexer::from_pool("_scans", &pool).unwrap();
let metrics = wait_for_scan_metrics(&scans_idx, &id);
assert!(
metrics.summaries_reused >= 1,
"rebuild-index server scan should reuse persisted summaries in indexed pass 1"
);
let mut logs = Vec::new();
for _ in 0..100 {
logs = scans_idx.get_scan_logs(&id, None).unwrap();
if !logs.is_empty() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(25));
}
assert!(
logs.iter()
.any(|entry| entry.message.contains("Indexed scan started")),
"server scan should persist indexed-path logs"
);
let record = wait_for_scan_record(&scans_idx, &id);
assert_eq!(record.files_scanned, Some(1));
assert!(
record.files_skipped.unwrap_or_default() >= 1,
"scan record should capture indexed summary reuse"
);
}
}