Skip to main content

nyx_agent_api/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::{
6    atomic::{AtomicU64, Ordering},
7    Arc,
8};
9
10use axum::http::StatusCode;
11use axum::response::{IntoResponse, Response};
12use axum::Json;
13use serde_json::json;
14use thiserror::Error;
15use tokio::sync::{Mutex, RwLock};
16
17use nyx_agent_core::store::StoreError;
18use nyx_agent_core::{Config, SecretStore, Store};
19use nyx_agent_types::event::{AgentEvent, AiEvent, EventSink, RunEvent, SandboxEvent};
20use nyx_agent_types::product::{
21    ProjectLaunchProfile, ProjectLaunchProfileInput, ProjectSetupError, ProjectSetupJobEvent,
22    ProjectSetupJobRecord, ProjectSetupJobStatus, ProjectSetupPhase, ProjectSetupResponse,
23    ProjectSetupVerificationStatus, SeedSetupPlan, VerifiedVulnerabilityRecord,
24};
25use nyx_agent_types::project::{
26    AuthSetupError, AuthSetupJobEvent, AuthSetupJobRecord, AuthSetupJobStatus, AuthSetupPhase,
27    AuthSetupResponse, AuthSetupVerification, ProjectAuthOwnedObject, ProjectAuthProfile,
28};
29
/// Future returned by [`ScanTrigger::trigger`]. Boxed so the trait can be
/// object-safe.
///
/// Resolves to the run id as a `String` on success, or to a
/// [`ScanTriggerError`] on failure. The future is `Send` and may borrow
/// from the trigger for `'a`.
pub type ScanFuture<'a> =
    Pin<Box<dyn Future<Output = Result<String, ScanTriggerError>> + Send + 'a>>;
34
/// Identifies which surface requested a scan.
///
/// The daemon persists this in the `runs.triggered_by` column (encoded by
/// [`ScanTriggerSource::as_run_record_string`]) so later reads such as
/// `GET /api/v1/runs/:id` and `nyx-agent report` can attribute the run
/// correctly. Previously every API-driven run was recorded as `"UI"` no
/// matter whether the scheduler, the webhook handler, or the SPA started
/// it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScanTriggerSource {
    /// The SPA "Scan now" button, or any other caller of
    /// `POST /api/v1/projects/:id/scan`.
    Manual,
    /// A `[[schedule]]` cron entry. `label` is that entry's
    /// operator-facing name, kept so the stored row shows which schedule
    /// fired.
    Scheduler { label: String },
    /// A `POST /webhook/git` delivery whose HMAC was verified.
    Webhook,
}

impl ScanTriggerSource {
    /// Renders the value stored in the `runs.triggered_by` TEXT column.
    ///
    /// `Manual` becomes `"UI"`, `Webhook` becomes `"Webhook"`, and
    /// `Scheduler` becomes `"Cron:<label>"`. The `Cron` prefix matches the
    /// historical `TriggeredBy::Cron` discriminator in `nyx-agent-core`,
    /// so existing readers that match on the prefix keep working.
    pub fn as_run_record_string(&self) -> String {
        // Only the scheduler variant needs formatting; the other two map
        // to fixed strings.
        let fixed: &str = match self {
            Self::Scheduler { label } => return format!("Cron:{label}"),
            Self::Manual => "UI",
            Self::Webhook => "Webhook",
        };
        fixed.to_owned()
    }
}
67
/// Per-run safety overrides requested by an interactive UI flow. These
/// do not persist to `nyx-agent.toml`; scheduled/webhook scans keep the
/// daemon defaults.
///
/// The derived `Default` yields `false` for the two plain flags and
/// `None` for every optional field.
// NOTE(review): `None` on the `Option` fields presumably means "keep the
// daemon's configured value" for that setting — confirm against the run
// dispatcher, which is not part of this file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanRunOverrides {
    pub exploit_mode_enabled: bool,
    pub allow_state_changing_live_probes: bool,
    pub exploit_dry_run: Option<bool>,
    pub browser_checks_enabled: Option<bool>,
    pub business_logic_templates_enabled: Option<bool>,
    pub research_mode_enabled: Option<bool>,
    pub unsafe_attack_agent_enabled: Option<bool>,
    pub business_logic_template_ids: Option<Vec<String>>,
}
82
/// Plug that lets the API hand off a manual scan request to the daemon
/// that owns the run dispatcher. The daemon wires the production impl;
/// tests substitute a stub.
///
/// Implementations must be `Send + Sync + 'static`; `ServerState` holds
/// one as `Arc<dyn ScanTrigger>`.
pub trait ScanTrigger: Send + Sync + 'static {
    /// Kick off a scan. Returns the freshly minted run id.
    ///
    /// - `source` records the surface that requested the scan; the
    ///   daemon stamps it onto `runs.triggered_by`.
    /// - `project_id`, when set, restricts the run to repos belonging
    ///   to that project; `repo` further narrows to a single repo.
    ///   Passing both unset scans every enabled repo.
    /// - `run_overrides`, when set, carries the [`ScanRunOverrides`]
    ///   requested for this scan.
    ///
    /// # Errors
    ///
    /// The returned future resolves to a [`ScanTriggerError`] when the
    /// scan could not be started.
    fn trigger<'a>(
        &'a self,
        source: ScanTriggerSource,
        project_id: Option<String>,
        repo: Option<String>,
        run_overrides: Option<ScanRunOverrides>,
    ) -> ScanFuture<'a>;
}
102
/// Future returned by [`AuthSetupAgent::explore`]: a boxed, `Send` future
/// that resolves to an [`AuthSetupAgentOutput`] or an
/// [`AuthSetupAgentError`].
pub type AuthSetupAgentFuture<'a> =
    Pin<Box<dyn Future<Output = Result<AuthSetupAgentOutput, AuthSetupAgentError>> + Send + 'a>>;
105
/// Input handed by value to [`AuthSetupAgent::explore`].
// NOTE(review): how each field is interpreted is decided by the agent
// implementation, which lives outside this file — confirm there before
// relying on any particular meaning.
#[derive(Debug, Clone)]
pub struct AuthSetupAgentRequest {
    pub project_id: String,
    pub project_name: String,
    /// `None` when the caller supplies no base URL.
    pub target_base_url: Option<String>,
    pub workspace_roots: Vec<PathBuf>,
    pub requested_roles: Vec<String>,
    pub seeded_objects: Vec<ProjectAuthOwnedObject>,
    pub existing_profiles: Vec<ProjectAuthProfile>,
    pub static_login_paths: Vec<String>,
    pub static_object_routes: Vec<String>,
    pub files_inspected: usize,
}
119
/// Success value of the future returned by [`AuthSetupAgent::explore`].
// NOTE(review): `files_inspected` also appears on the request; presumably
// the agent reports an updated total here — confirm with the
// implementation.
#[derive(Debug, Clone)]
pub struct AuthSetupAgentOutput {
    pub profiles: Vec<ProjectAuthProfile>,
    pub roles: Vec<String>,
    pub login_paths: Vec<String>,
    pub object_routes: Vec<String>,
    pub files_inspected: usize,
    pub verification: AuthSetupVerification,
    pub message: String,
}
130
/// Failure value of the future returned by [`AuthSetupAgent::explore`].
/// Each variant carries a free-form detail string that is appended to the
/// `Display` message.
#[derive(Debug, Error)]
pub enum AuthSetupAgentError {
    /// Displays as `auth setup agent unavailable: <detail>`.
    #[error("auth setup agent unavailable: {0}")]
    Unavailable(String),
    /// Displays as `auth setup agent failed: <detail>`.
    #[error("auth setup agent failed: {0}")]
    Failed(String),
}
138
/// Pluggable auth-setup agent. `ServerState` stores one as
/// `Option<Arc<dyn AuthSetupAgent>>`, so implementations must be
/// `Send + Sync + 'static`.
pub trait AuthSetupAgent: Send + Sync + 'static {
    /// Runs the agent on `req`. The returned boxed future resolves to an
    /// [`AuthSetupAgentOutput`] or an [`AuthSetupAgentError`].
    fn explore<'a>(&'a self, req: AuthSetupAgentRequest) -> AuthSetupAgentFuture<'a>;
}
142
/// Future returned by [`ProjectSetupAgent::explore`]: a boxed, `Send`
/// future that resolves to a [`ProjectSetupAgentOutput`] or a
/// [`ProjectSetupAgentError`].
pub type ProjectSetupAgentFuture<'a> = Pin<
    Box<dyn Future<Output = Result<ProjectSetupAgentOutput, ProjectSetupAgentError>> + Send + 'a>,
>;
146
/// Input handed by value to [`ProjectSetupAgent::explore`].
// NOTE(review): field semantics are defined by the agent implementation,
// which is not part of this file.
#[derive(Debug, Clone)]
pub struct ProjectSetupAgentRequest {
    pub project_id: String,
    pub project_name: String,
    /// `None` when the caller supplies no base URL.
    pub target_base_url: Option<String>,
    pub workspace_roots: Vec<PathBuf>,
    /// `None` when the caller supplies no existing launch profile.
    pub existing_launch_profile: Option<ProjectLaunchProfile>,
}
155
/// Success value of the future returned by
/// [`ProjectSetupAgent::explore`].
// NOTE(review): the split between `summary` and `message` is not visible
// in this file — confirm with the agent implementation and the handler
// that consumes this value.
#[derive(Debug, Clone)]
pub struct ProjectSetupAgentOutput {
    pub profile: ProjectLaunchProfileInput,
    pub summary: String,
    pub checks: Vec<String>,
    pub warnings: Vec<String>,
    pub verification_status: ProjectSetupVerificationStatus,
    pub message: String,
}
165
/// Failure value of the future returned by
/// [`ProjectSetupAgent::explore`]. Each variant carries a free-form detail
/// string that is appended to the `Display` message.
#[derive(Debug, Error)]
pub enum ProjectSetupAgentError {
    /// Displays as `project setup agent unavailable: <detail>`.
    #[error("project setup agent unavailable: {0}")]
    Unavailable(String),
    /// Displays as `project setup agent failed: <detail>`.
    #[error("project setup agent failed: {0}")]
    Failed(String),
}
173
/// Pluggable project-setup agent. `ServerState` stores one as
/// `Option<Arc<dyn ProjectSetupAgent>>`, so implementations must be
/// `Send + Sync + 'static`.
pub trait ProjectSetupAgent: Send + Sync + 'static {
    /// Runs the agent on `req`. The returned boxed future resolves to a
    /// [`ProjectSetupAgentOutput`] or a [`ProjectSetupAgentError`].
    fn explore<'a>(&'a self, req: ProjectSetupAgentRequest) -> ProjectSetupAgentFuture<'a>;
}
177
/// Future returned by [`SeedSetupAgent::explore`]: a boxed, `Send` future
/// that resolves to a [`SeedSetupAgentOutput`] or a
/// [`SeedSetupAgentError`].
pub type SeedSetupAgentFuture<'a> =
    Pin<Box<dyn Future<Output = Result<SeedSetupAgentOutput, SeedSetupAgentError>> + Send + 'a>>;
180
/// Input handed by value to [`SeedSetupAgent::explore`].
// NOTE(review): field semantics are defined by the agent implementation,
// which is not part of this file.
#[derive(Debug, Clone)]
pub struct SeedSetupAgentRequest {
    pub project_id: String,
    pub project_name: String,
    /// `None` when the caller supplies no base URL.
    pub target_base_url: Option<String>,
    pub workspace_roots: Vec<PathBuf>,
    /// `None` when the caller supplies no launch profile.
    pub launch_profile: Option<ProjectLaunchProfile>,
}
189
/// Success value of the future returned by [`SeedSetupAgent::explore`]:
/// the produced [`SeedSetupPlan`] plus an accompanying message string.
#[derive(Debug, Clone)]
pub struct SeedSetupAgentOutput {
    pub plan: SeedSetupPlan,
    pub message: String,
}
195
/// Failure value of the future returned by [`SeedSetupAgent::explore`].
/// Each variant carries a free-form detail string that is appended to the
/// `Display` message.
#[derive(Debug, Error)]
pub enum SeedSetupAgentError {
    /// Displays as `seed setup agent unavailable: <detail>`.
    #[error("seed setup agent unavailable: {0}")]
    Unavailable(String),
    /// Displays as `seed setup agent failed: <detail>`.
    #[error("seed setup agent failed: {0}")]
    Failed(String),
}
203
/// Pluggable seed-setup agent. `ServerState` stores one as
/// `Option<Arc<dyn SeedSetupAgent>>`, so implementations must be
/// `Send + Sync + 'static`.
pub trait SeedSetupAgent: Send + Sync + 'static {
    /// Runs the agent on `req`. The returned boxed future resolves to a
    /// [`SeedSetupAgentOutput`] or a [`SeedSetupAgentError`].
    fn explore<'a>(&'a self, req: SeedSetupAgentRequest) -> SeedSetupAgentFuture<'a>;
}
207
/// Future returned by [`RemediationAgent::fix`]: a boxed, `Send` future
/// that resolves to a [`RemediationAgentOutput`] or a
/// [`RemediationAgentError`].
pub type RemediationAgentFuture<'a> = Pin<
    Box<dyn Future<Output = Result<RemediationAgentOutput, RemediationAgentError>> + Send + 'a>,
>;
211
/// Input handed by value to [`RemediationAgent::fix`]: the vulnerability
/// record to work on and a list of workspace root paths.
#[derive(Debug, Clone)]
pub struct RemediationAgentRequest {
    pub vulnerability: VerifiedVulnerabilityRecord,
    pub workspace_roots: Vec<PathBuf>,
}
217
/// One entry of [`RemediationAgentOutput::changed_files`]. Serializable so
/// it can be embedded in a [`RemediationJobRecord`].
// NOTE(review): the vocabulary of `status` and the unit of
// `additions`/`deletions` (presumably line counts, `None` when unknown)
// are set by the agent implementation — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub struct RemediationChangedFile {
    pub repo: String,
    pub path: String,
    pub status: String,
    pub additions: Option<i64>,
    pub deletions: Option<i64>,
}
226
/// Success value of the future returned by [`RemediationAgent::fix`].
/// `RemediationJobStore::complete` stores it as the job's `result` and
/// uses the length of `changed_files` to word the completion message.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub struct RemediationAgentOutput {
    /// Files the agent reports as changed; may be empty.
    pub changed_files: Vec<RemediationChangedFile>,
    pub summary: String,
    pub final_message: String,
}
233
/// Failure value of the future returned by [`RemediationAgent::fix`].
/// Each variant carries a free-form detail string that is appended to the
/// `Display` message.
#[derive(Debug, Error)]
pub enum RemediationAgentError {
    /// Displays as `remediation agent unavailable: <detail>`.
    #[error("remediation agent unavailable: {0}")]
    Unavailable(String),
    /// Displays as `remediation agent failed: <detail>`.
    #[error("remediation agent failed: {0}")]
    Failed(String),
}
241
/// Pluggable remediation ("fix") agent. `ServerState` stores one as
/// `Option<Arc<dyn RemediationAgent>>`, so implementations must be
/// `Send + Sync + 'static`.
pub trait RemediationAgent: Send + Sync + 'static {
    /// Runs the agent on `req`. The returned boxed future resolves to a
    /// [`RemediationAgentOutput`] or a [`RemediationAgentError`].
    fn fix<'a>(&'a self, req: RemediationAgentRequest) -> RemediationAgentFuture<'a>;
}
245
/// In-memory registry of auth-setup jobs, keyed by job id.
#[derive(Debug, Default)]
pub struct AuthSetupJobStore {
    /// Creation counter; `create` uses it for the numeric suffix of each
    /// generated job id.
    seq: AtomicU64,
    /// Latest record per job id, behind an async (tokio) mutex.
    jobs: Mutex<HashMap<String, AuthSetupJobRecord>>,
}
251
/// In-memory registry of project-setup jobs, keyed by job id.
#[derive(Debug, Default)]
pub struct ProjectSetupJobStore {
    /// Creation counter; `create` uses it for the numeric suffix of each
    /// generated job id.
    seq: AtomicU64,
    /// Latest record per job id, behind an async (tokio) mutex.
    jobs: Mutex<HashMap<String, ProjectSetupJobRecord>>,
}
257
/// One timeline entry of a remediation job, appended by
/// `RemediationJobStore` on every state change.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobEvent {
    /// Timestamp of the entry: the `now` passed to `create`, or the value
    /// of `nyx_agent_core::now_epoch_ms()` for later entries.
    pub at: i64,
    /// Phase label at that moment. The store itself writes `"queued"`,
    /// `"complete"` and `"failed"`; other labels come from `push_phase`
    /// callers.
    pub phase: String,
    pub message: String,
}
264
/// Failure description attached to a remediation job by
/// `RemediationJobStore::fail`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobError {
    /// Becomes the job's `message` when the job fails.
    pub title: String,
    /// Becomes the message of the final `"failed"` timeline event.
    pub detail: String,
}
270
/// Serializable snapshot of one remediation job as tracked by
/// `RemediationJobStore`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobRecord {
    /// Store-generated id of the form `fix-{now}-{n}`.
    pub id: String,
    pub vulnerability_id: String,
    pub project_id: String,
    /// The store writes `"queued"`, `"running"`, `"succeeded"` or
    /// `"failed"`.
    pub status: String,
    /// Label of the most recent phase (see [`RemediationJobEvent::phase`]).
    pub phase: String,
    /// Most recent human-readable status line.
    pub message: String,
    /// The `now` value passed to `create`.
    pub started_at: i64,
    /// Set by `complete` and `fail`; `None` until then.
    pub finished_at: Option<i64>,
    /// Timeline, oldest entry first.
    pub events: Vec<RemediationJobEvent>,
    /// Set by `complete`, cleared by `fail`.
    pub result: Option<RemediationAgentOutput>,
    /// Set by `fail`, cleared by `complete`.
    pub error: Option<RemediationJobError>,
}
285
/// In-memory registry of remediation jobs, keyed by job id.
#[derive(Debug, Default)]
pub struct RemediationJobStore {
    /// Creation counter; `create` uses it for the numeric suffix of each
    /// generated job id.
    seq: AtomicU64,
    /// Latest record per job id, behind an async (tokio) mutex.
    jobs: Mutex<HashMap<String, RemediationJobRecord>>,
}
291
292impl RemediationJobStore {
293    pub fn new() -> Self {
294        Self::default()
295    }
296
297    pub async fn create(
298        &self,
299        vulnerability_id: &str,
300        project_id: &str,
301        now: i64,
302    ) -> RemediationJobRecord {
303        let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
304        let id = format!("fix-{now}-{n}");
305        let event = RemediationJobEvent {
306            at: now,
307            phase: "queued".to_string(),
308            message: "Fix agent queued.".to_string(),
309        };
310        let record = RemediationJobRecord {
311            id: id.clone(),
312            vulnerability_id: vulnerability_id.to_string(),
313            project_id: project_id.to_string(),
314            status: "queued".to_string(),
315            phase: "queued".to_string(),
316            message: event.message.clone(),
317            started_at: now,
318            finished_at: None,
319            events: vec![event],
320            result: None,
321            error: None,
322        };
323        self.jobs.lock().await.insert(id, record.clone());
324        record
325    }
326
327    pub async fn get(&self, id: &str) -> Option<RemediationJobRecord> {
328        self.jobs.lock().await.get(id).cloned()
329    }
330
331    pub async fn push_phase(&self, id: &str, phase: &str, message: impl Into<String>) {
332        let now = nyx_agent_core::now_epoch_ms();
333        let message = message.into();
334        let mut jobs = self.jobs.lock().await;
335        let Some(job) = jobs.get_mut(id) else {
336            return;
337        };
338        job.status = "running".to_string();
339        job.phase = phase.to_string();
340        job.message = message.clone();
341        job.events.push(RemediationJobEvent { at: now, phase: phase.to_string(), message });
342    }
343
344    pub async fn complete(&self, id: &str, result: RemediationAgentOutput) {
345        let now = nyx_agent_core::now_epoch_ms();
346        let mut jobs = self.jobs.lock().await;
347        let Some(job) = jobs.get_mut(id) else {
348            return;
349        };
350        job.status = "succeeded".to_string();
351        job.phase = "complete".to_string();
352        job.message = if result.changed_files.is_empty() {
353            "Fix agent completed without leaving file changes.".to_string()
354        } else {
355            format!("Fix agent updated {} file(s).", result.changed_files.len())
356        };
357        job.finished_at = Some(now);
358        job.result = Some(result);
359        job.error = None;
360        job.events.push(RemediationJobEvent {
361            at: now,
362            phase: "complete".to_string(),
363            message: job.message.clone(),
364        });
365    }
366
367    pub async fn fail(&self, id: &str, error: RemediationJobError) {
368        let now = nyx_agent_core::now_epoch_ms();
369        let mut jobs = self.jobs.lock().await;
370        let Some(job) = jobs.get_mut(id) else {
371            return;
372        };
373        job.status = "failed".to_string();
374        job.phase = "failed".to_string();
375        job.message = error.title.clone();
376        job.finished_at = Some(now);
377        job.result = None;
378        job.error = Some(error.clone());
379        job.events.push(RemediationJobEvent {
380            at: now,
381            phase: "failed".to_string(),
382            message: error.detail,
383        });
384    }
385}
386
387impl ProjectSetupJobStore {
388    pub fn new() -> Self {
389        Self::default()
390    }
391
392    pub async fn create(&self, project_id: &str, now: i64) -> ProjectSetupJobRecord {
393        let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
394        let id = format!("projectsetup-{now}-{n}");
395        let event = ProjectSetupJobEvent {
396            at: now,
397            phase: ProjectSetupPhase::Queued,
398            message: "Project setup queued.".to_string(),
399        };
400        let record = ProjectSetupJobRecord {
401            id: id.clone(),
402            project_id: project_id.to_string(),
403            status: ProjectSetupJobStatus::Queued,
404            phase: ProjectSetupPhase::Queued,
405            message: event.message.clone(),
406            started_at: now,
407            finished_at: None,
408            events: vec![event],
409            result: None,
410            error: None,
411        };
412        self.jobs.lock().await.insert(id, record.clone());
413        record
414    }
415
416    pub async fn get(&self, id: &str) -> Option<ProjectSetupJobRecord> {
417        self.jobs.lock().await.get(id).cloned()
418    }
419
420    pub async fn list_by_project(&self, project_id: &str) -> Vec<ProjectSetupJobRecord> {
421        let mut jobs = self
422            .jobs
423            .lock()
424            .await
425            .values()
426            .filter(|job| job.project_id == project_id)
427            .cloned()
428            .collect::<Vec<_>>();
429        jobs.sort_by(|a, b| b.started_at.cmp(&a.started_at).then_with(|| b.id.cmp(&a.id)));
430        jobs
431    }
432
433    pub async fn push_phase(&self, id: &str, phase: ProjectSetupPhase, message: impl Into<String>) {
434        let now = nyx_agent_core::now_epoch_ms();
435        let message = message.into();
436        let mut jobs = self.jobs.lock().await;
437        let Some(job) = jobs.get_mut(id) else {
438            return;
439        };
440        job.status = ProjectSetupJobStatus::Running;
441        job.phase = phase;
442        job.message = message.clone();
443        job.events.push(ProjectSetupJobEvent { at: now, phase, message });
444    }
445
446    pub async fn complete(&self, id: &str, result: ProjectSetupResponse) {
447        let now = nyx_agent_core::now_epoch_ms();
448        let mut jobs = self.jobs.lock().await;
449        let Some(job) = jobs.get_mut(id) else {
450            return;
451        };
452        job.status = ProjectSetupJobStatus::Succeeded;
453        job.phase = ProjectSetupPhase::Complete;
454        job.message = result.message.clone();
455        job.finished_at = Some(now);
456        job.result = Some(result);
457        job.error = None;
458        job.events.push(ProjectSetupJobEvent {
459            at: now,
460            phase: ProjectSetupPhase::Complete,
461            message: job.message.clone(),
462        });
463    }
464
465    pub async fn fail(&self, id: &str, error: ProjectSetupError) {
466        let now = nyx_agent_core::now_epoch_ms();
467        let mut jobs = self.jobs.lock().await;
468        let Some(job) = jobs.get_mut(id) else {
469            return;
470        };
471        job.status = ProjectSetupJobStatus::Failed;
472        job.phase = ProjectSetupPhase::Failed;
473        job.message = error.title.clone();
474        job.finished_at = Some(now);
475        job.result = None;
476        job.error = Some(error.clone());
477        job.events.push(ProjectSetupJobEvent {
478            at: now,
479            phase: ProjectSetupPhase::Failed,
480            message: error.detail,
481        });
482    }
483}
484
485impl AuthSetupJobStore {
486    pub fn new() -> Self {
487        Self::default()
488    }
489
490    pub async fn create(&self, project_id: &str, now: i64) -> AuthSetupJobRecord {
491        let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
492        let id = format!("authsetup-{now}-{n}");
493        let event = AuthSetupJobEvent {
494            at: now,
495            phase: AuthSetupPhase::Queued,
496            message: "Auth setup queued.".to_string(),
497        };
498        let record = AuthSetupJobRecord {
499            id: id.clone(),
500            project_id: project_id.to_string(),
501            status: AuthSetupJobStatus::Queued,
502            phase: AuthSetupPhase::Queued,
503            message: event.message.clone(),
504            started_at: now,
505            finished_at: None,
506            events: vec![event],
507            result: None,
508            error: None,
509        };
510        self.jobs.lock().await.insert(id, record.clone());
511        record
512    }
513
514    pub async fn get(&self, id: &str) -> Option<AuthSetupJobRecord> {
515        self.jobs.lock().await.get(id).cloned()
516    }
517
518    pub async fn push_phase(&self, id: &str, phase: AuthSetupPhase, message: impl Into<String>) {
519        let now = nyx_agent_core::now_epoch_ms();
520        let message = message.into();
521        let mut jobs = self.jobs.lock().await;
522        let Some(job) = jobs.get_mut(id) else {
523            return;
524        };
525        job.status = AuthSetupJobStatus::Running;
526        job.phase = phase;
527        job.message = message.clone();
528        job.events.push(AuthSetupJobEvent { at: now, phase, message });
529    }
530
531    pub async fn complete(&self, id: &str, result: AuthSetupResponse) {
532        let now = nyx_agent_core::now_epoch_ms();
533        let mut jobs = self.jobs.lock().await;
534        let Some(job) = jobs.get_mut(id) else {
535            return;
536        };
537        job.status = AuthSetupJobStatus::Succeeded;
538        job.phase = AuthSetupPhase::Complete;
539        job.message = result.message.clone();
540        job.finished_at = Some(now);
541        job.result = Some(result);
542        job.error = None;
543        job.events.push(AuthSetupJobEvent {
544            at: now,
545            phase: AuthSetupPhase::Complete,
546            message: job.message.clone(),
547        });
548    }
549
550    pub async fn fail(&self, id: &str, error: AuthSetupError) {
551        let now = nyx_agent_core::now_epoch_ms();
552        let mut jobs = self.jobs.lock().await;
553        let Some(job) = jobs.get_mut(id) else {
554            return;
555        };
556        job.status = AuthSetupJobStatus::Failed;
557        job.phase = AuthSetupPhase::Failed;
558        job.message = error.title.clone();
559        job.finished_at = Some(now);
560        job.result = None;
561        job.error = Some(error.clone());
562        job.events.push(AuthSetupJobEvent {
563            at: now,
564            phase: AuthSetupPhase::Failed,
565            message: error.detail,
566        });
567    }
568}
569
/// Reasons a [`ScanTrigger`] can fail to start a scan. `ApiError` wraps
/// this type through its `Scan` variant.
#[derive(Debug, Error)]
pub enum ScanTriggerError {
    /// The request was refused; the payload is the reason shown after
    /// `scan request was rejected:`.
    #[error("scan request was rejected: {0}")]
    Rejected(String),
    /// The daemon is shutting down and no longer accepts scan requests.
    #[error("daemon is shutting down")]
    Closed,
    /// The scan request queue is full. The API maps this to HTTP 429
    /// so external schedulers, webhooks, and CI loops back off instead
    /// of stalling on `send().await`.
    #[error("scan request queue is full: {0}")]
    Backpressure(String),
    /// Any other failure; the payload is the detail shown after
    /// `internal error:`.
    #[error("internal error: {0}")]
    Internal(String),
}
584
/// First-launch wizard context. Lets the API write `nyx-agent.toml`
/// on behalf of the operator, see whether setup is complete, and
/// stash API keys in the OS keychain.
///
/// Cloning is shallow for `config` and `completed`: both sit behind an
/// `Arc`, so every clone observes the same config and the same flag.
#[derive(Clone)]
pub struct SetupContext {
    // NOTE(review): presumably the on-disk location of `nyx-agent.toml` —
    // confirm against the `/setup` handler.
    pub config_path: PathBuf,
    // NOTE(review): presumably the keychain-backed store mentioned in the
    // type-level comment — confirm against `SecretStore`.
    pub secrets: SecretStore,
    /// Current in-memory config. Wrapped in an `RwLock` so the
    /// `/setup` handler can hand a freshly-written config back to the
    /// rest of the API without restarting the daemon.
    pub config: Arc<RwLock<Config>>,
    /// `true` once `nyx-agent.toml` is materialised on disk. Read by
    /// `GET /api/v1/setup/status` and by the auth middleware to know
    /// whether to exempt `/setup` endpoints.
    pub completed: Arc<std::sync::atomic::AtomicBool>,
}
601
602impl SetupContext {
603    pub fn new(
604        config_path: PathBuf,
605        config: Config,
606        completed: bool,
607        secrets: SecretStore,
608    ) -> Self {
609        Self {
610            config_path,
611            secrets,
612            config: Arc::new(RwLock::new(config)),
613            completed: Arc::new(std::sync::atomic::AtomicBool::new(completed)),
614        }
615    }
616
617    pub fn is_complete(&self) -> bool {
618        self.completed.load(std::sync::atomic::Ordering::Acquire)
619    }
620
621    pub fn mark_complete(&self) {
622        self.completed.store(true, std::sync::atomic::Ordering::Release);
623    }
624}
625
/// Bounded per-run event replay buffer. Closes a broadcast race: a
/// client that calls `POST /api/v1/scan` and *then* opens the
/// WebSocket would miss `RunStarted` (and possibly the first few
/// `RepoStarted`/`RepoFailed`) frames because tokio's
/// `broadcast::Sender` does not replay history.
/// `events_ws` reads back the snapshot here before joining the live
/// stream so the LiveScanView always sees the run's lifecycle from the
/// start.
///
/// Events that lack a `run_id` (e.g. plain heartbeats) are not buffered
/// because there is nothing for a subscriber to scope to.
///
/// Eviction is least-recently-touched: the side `order` deque tracks
/// run ids with the most recently pushed-into run at the back. When a
/// new run needs admission past `max_runs`, the front (oldest activity)
/// is evicted.
#[derive(Debug)]
pub struct EventReplay {
    /// Per-run buffers plus the touch-order deque, guarded by one async
    /// (tokio) mutex.
    inner: Mutex<ReplayInner>,
    /// Hard cap on events stored per run. The LiveScanView acceptance
    /// set is small (one RunStarted + N RepoStarted/RepoFinished
    /// pairs + RunFinished). 128 frames covers ~60 repos before the
    /// head is dropped, which is more than the static-pass budget.
    pub max_per_run: usize,
    /// Cap on tracked runs. Past this we evict the least-recently-
    /// touched tracked run. 16 covers the realistic concurrent-
    /// LiveScanView count.
    pub max_runs: usize,
}
655
/// Mutable state of [`EventReplay`], kept behind its mutex. `by_run` and
/// `order` are updated together by `EventReplay::push`.
#[derive(Debug, Default)]
struct ReplayInner {
    /// Buffered events per run id, oldest event at the front.
    by_run: HashMap<String, VecDeque<AgentEvent>>,
    /// Insertion / touch order. Front is least-recently-pushed,
    /// back is most-recently-pushed.
    order: VecDeque<String>,
}
663
impl Default for EventReplay {
    /// Same as [`EventReplay::new`]. Written by hand because a derived
    /// `Default` would zero both caps instead of using 128 and 16.
    fn default() -> Self {
        Self::new()
    }
}
669
670impl EventReplay {
671    pub fn new() -> Self {
672        Self { inner: Mutex::new(ReplayInner::default()), max_per_run: 128, max_runs: 16 }
673    }
674
675    /// Append an event to the per-run buffer. No-op for events that do
676    /// not carry a `run_id`.
677    pub async fn push(&self, event: &AgentEvent) {
678        let Some(run_id) = run_id_for_event(event) else { return };
679        let mut g = self.inner.lock().await;
680
681        // Touch LRU position: if the run is already tracked, lift it
682        // out of `order` so we can re-append at the back. If the run is
683        // new and we are at capacity, evict the front (oldest).
684        if let Some(pos) = g.order.iter().position(|r| r == run_id) {
685            g.order.remove(pos);
686        } else if g.by_run.len() >= self.max_runs {
687            if let Some(victim) = g.order.pop_front() {
688                g.by_run.remove(&victim);
689            }
690        }
691        g.order.push_back(run_id.to_string());
692
693        let buf = g.by_run.entry(run_id.to_string()).or_default();
694        if buf.len() == self.max_per_run {
695            buf.pop_front();
696        }
697        buf.push_back(event.clone());
698    }
699
700    /// Snapshot every buffered event for `run_id`. Cheap clone.
701    pub async fn snapshot(&self, run_id: &str) -> Vec<AgentEvent> {
702        let g = self.inner.lock().await;
703        g.by_run.get(run_id).map(|q| q.iter().cloned().collect()).unwrap_or_default()
704    }
705
706    /// Number of currently tracked runs. Used in tests; cheap.
707    pub async fn tracked_runs(&self) -> usize {
708        self.inner.lock().await.by_run.len()
709    }
710}
711
/// Extracts the run id an event is scoped to, if it has one.
///
/// Returns `None` for `RunEvent::Heartbeat` and for every event shape not
/// matched below; `EventReplay::push` skips buffering in that case.
fn run_id_for_event(ev: &AgentEvent) -> Option<&str> {
    match ev {
        // No wildcard arm on `RunEvent`: a newly added variant fails to
        // compile here until it is classified.
        AgentEvent::Run { data } => match data {
            RunEvent::Heartbeat { .. } => None,
            RunEvent::RunStarted { run_id, .. }
            | RunEvent::ProjectStarted { run_id, .. }
            | RunEvent::PhaseStarted { run_id, .. }
            | RunEvent::PhaseFinished { run_id, .. }
            | RunEvent::EnvironmentStatus { run_id, .. }
            | RunEvent::AuthSessionStatus { run_id, .. }
            | RunEvent::LiveVerificationCapabilities { run_id, .. }
            | RunEvent::RepoStarted { run_id, .. }
            | RunEvent::RepoStaticDone { run_id, .. }
            | RunEvent::RepoDynamicDone { run_id, .. }
            | RunEvent::RepoFailed { run_id, .. }
            | RunEvent::RepoIngestFailed { run_id, .. }
            | RunEvent::RepoFinished { run_id, .. }
            | RunEvent::ProjectFinished { run_id, .. }
            | RunEvent::RunFinished { run_id, .. } => Some(run_id.as_str()),
        },
        // Only the budget tick is picked out of the AI events; any other
        // `AiEvent` falls through to the wildcard arm.
        AgentEvent::Ai { data: AiEvent::BudgetTick { run_id, .. } } => Some(run_id.as_str()),
        AgentEvent::Sandbox { data } => match data {
            SandboxEvent::VerifierStarted { run_id, .. }
            | SandboxEvent::VerifierFinished { run_id, .. } => Some(run_id.as_str()),
        },
        // Everything else is treated as not scoped to a run.
        _ => None,
    }
}
740
/// Bearer-token setting consulted by the API auth middleware.
///
/// A `None` token skips the check entirely (e.g. when the daemon was
/// launched with `--headless`). The derived `Default` is that unenforced
/// state.
#[derive(Clone, Default)]
pub struct AuthConfig {
    /// The expected bearer token, if any.
    pub token: Option<String>,
}

impl AuthConfig {
    /// Wraps `token` as given; no validation or normalisation is applied.
    pub fn new(token: Option<String>) -> Self {
        AuthConfig { token }
    }

    /// Returns `true` exactly when a token is configured. Only presence is
    /// checked, so an empty string still counts as enforced.
    pub fn is_enforced(&self) -> bool {
        let configured = &self.token;
        configured.is_some()
    }
}
758
/// Shared state injected into every Axum handler. Cloned per request;
/// the underlying [`Store`] and broadcast sender are already cheap to
/// clone because they wrap `Arc`s internally.
#[derive(Clone)]
pub struct ServerState {
    /// Handle to the agent's [`Store`].
    pub store: Store,
    /// Event sink; presumably the broadcast sender referred to in the
    /// type-level comment.
    pub events: EventSink,
    /// Hands scan requests over to the daemon (see [`ScanTrigger`]).
    pub scan: Arc<dyn ScanTrigger>,
    /// First-launch wizard context.
    pub setup: SetupContext,
    /// Bearer-token setting for the auth middleware.
    pub auth: AuthConfig,
    /// `None` until `with_auth_setup_agent` is called.
    pub auth_setup_agent: Option<Arc<dyn AuthSetupAgent>>,
    /// In-memory registry of auth-setup jobs.
    pub auth_setup_jobs: Arc<AuthSetupJobStore>,
    /// `None` until `with_project_setup_agent` is called.
    pub project_setup_agent: Option<Arc<dyn ProjectSetupAgent>>,
    /// In-memory registry of project-setup jobs.
    pub project_setup_jobs: Arc<ProjectSetupJobStore>,
    /// `None` until `with_seed_setup_agent` is called.
    pub seed_setup_agent: Option<Arc<dyn SeedSetupAgent>>,
    /// `None` until `with_remediation_agent` is called.
    pub remediation_agent: Option<Arc<dyn RemediationAgent>>,
    /// In-memory registry of remediation jobs.
    pub remediation_jobs: Arc<RemediationJobStore>,
    /// Per-run event replay buffer. Populated by a tap task the daemon
    /// runs alongside the broadcast channel and read by `events_ws` on
    /// upgrade so newly-attached LiveScanView clients catch the
    /// run's lifecycle from the start.
    pub replay: Arc<EventReplay>,
    /// Path that holds per-repo workspace dirs (the moral equivalent of
    /// `<state>/repos`). The repo-delete handler removes the per-repo
    /// subdir under this path so a re-add starts from a clean slate.
    /// `None` in tests that do not exercise workspace cleanup.
    pub state_repos_dir: Option<PathBuf>,
    /// Per-finding repro bundle output directory (`<state>/bundles`).
    /// The bundle handler writes one tarball per finding here and
    /// stamps a `repro_bundles` row pointing at the resulting path.
    /// `None` in tests that do not exercise bundle creation.
    pub state_bundles_dir: Option<PathBuf>,
    /// Per-run live-stream event logs (`<state>/logs/runs/*.events.jsonl`).
    /// The daemon's event-log tap writes these; the API serves them as
    /// authenticated post-run artifacts.
    pub state_logs_dir: Option<PathBuf>,
    /// `POST /webhook/git` config. `None` disables the route (the
    /// daemon hands a populated struct only when the operator has
    /// configured `triggers.webhook_secret_ref`).
    pub webhook: Option<Arc<crate::webhook::WebhookConfig>>,
}
800
801impl ServerState {
802    pub fn new(
803        store: Store,
804        events: EventSink,
805        scan: Arc<dyn ScanTrigger>,
806        setup: SetupContext,
807        auth: AuthConfig,
808    ) -> Self {
809        Self {
810            store,
811            events,
812            scan,
813            setup,
814            auth,
815            auth_setup_agent: None,
816            auth_setup_jobs: Arc::new(AuthSetupJobStore::new()),
817            project_setup_agent: None,
818            project_setup_jobs: Arc::new(ProjectSetupJobStore::new()),
819            seed_setup_agent: None,
820            remediation_agent: None,
821            remediation_jobs: Arc::new(RemediationJobStore::new()),
822            replay: Arc::new(EventReplay::new()),
823            state_repos_dir: None,
824            state_bundles_dir: None,
825            state_logs_dir: None,
826            webhook: None,
827        }
828    }
829
830    /// Attach the on-disk repo workspace root so the delete handler can
831    /// remove `<state_repos_dir>/<name>/` when a repo is removed.
832    pub fn with_state_repos_dir(mut self, dir: PathBuf) -> Self {
833        self.state_repos_dir = Some(dir);
834        self
835    }
836
837    pub fn with_auth_setup_agent(mut self, agent: Arc<dyn AuthSetupAgent>) -> Self {
838        self.auth_setup_agent = Some(agent);
839        self
840    }
841
842    pub fn with_project_setup_agent(mut self, agent: Arc<dyn ProjectSetupAgent>) -> Self {
843        self.project_setup_agent = Some(agent);
844        self
845    }
846
847    pub fn with_seed_setup_agent(mut self, agent: Arc<dyn SeedSetupAgent>) -> Self {
848        self.seed_setup_agent = Some(agent);
849        self
850    }
851
852    pub fn with_remediation_agent(mut self, agent: Arc<dyn RemediationAgent>) -> Self {
853        self.remediation_agent = Some(agent);
854        self
855    }
856
857    /// Attach the on-disk repro bundle output root so the bundle
858    /// handler can write `<state_bundles_dir>/<finding-id>.tar`.
859    pub fn with_state_bundles_dir(mut self, dir: PathBuf) -> Self {
860        self.state_bundles_dir = Some(dir);
861        self
862    }
863
864    /// Attach the logs root so the run event-log handler can serve
865    /// `<state_logs_dir>/runs/<run>.events.jsonl`.
866    pub fn with_state_logs_dir(mut self, dir: PathBuf) -> Self {
867        self.state_logs_dir = Some(dir);
868        self
869    }
870
871    /// Enable `POST /webhook/git`. The handler returns the standard
872    /// error envelope (HTTP 500) when this is not called.
873    pub fn with_webhook(mut self, cfg: crate::webhook::WebhookConfig) -> Self {
874        self.webhook = Some(Arc::new(cfg));
875        self
876    }
877}
878
879/// Uniform error envelope. Every handler returns
880/// `Result<T, ApiError>` so HTTP status codes and JSON bodies stay
881/// consistent across endpoints.
882#[derive(Debug, Error)]
883pub enum ApiError {
884    #[error("not found: {0}")]
885    NotFound(String),
886    #[error("bad request: {0}")]
887    BadRequest(String),
888    #[error("unauthorized")]
889    Unauthorized,
890    #[error("payload too large: {0}")]
891    PayloadTooLarge(String),
892    /// Refused at the rate-limit or concurrency gate (e.g. the
893    /// per-IP webhook token bucket or the webhook concurrency
894    /// semaphore). HTTP 429 so the upstream backs off instead of
895    /// retrying at full rate.
896    #[error("too many requests: {0}")]
897    TooManyRequests(String),
898    #[error("store error: {0}")]
899    Store(#[from] StoreError),
900    #[error("scan trigger failed: {0}")]
901    Scan(#[from] ScanTriggerError),
902    #[error("internal: {0}")]
903    Internal(String),
904}
905
906impl IntoResponse for ApiError {
907    fn into_response(self) -> Response {
908        let (status, code) = match &self {
909            ApiError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
910            ApiError::BadRequest(_) => (StatusCode::BAD_REQUEST, "bad_request"),
911            ApiError::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized"),
912            ApiError::PayloadTooLarge(_) => (StatusCode::PAYLOAD_TOO_LARGE, "payload_too_large"),
913            ApiError::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, "too_many_requests"),
914            ApiError::Store(_) => (StatusCode::INTERNAL_SERVER_ERROR, "store_error"),
915            ApiError::Scan(ScanTriggerError::Rejected(_)) => {
916                (StatusCode::BAD_REQUEST, "scan_rejected")
917            }
918            ApiError::Scan(ScanTriggerError::Closed) => {
919                (StatusCode::SERVICE_UNAVAILABLE, "shutting_down")
920            }
921            ApiError::Scan(ScanTriggerError::Backpressure(_)) => {
922                (StatusCode::TOO_MANY_REQUESTS, "scan_backpressure")
923            }
924            ApiError::Scan(ScanTriggerError::Internal(_)) => {
925                (StatusCode::INTERNAL_SERVER_ERROR, "scan_internal")
926            }
927            ApiError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "internal"),
928        };
929        let body = Json(json!({ "error": { "code": code, "message": self.to_string() } }));
930        (status, body).into_response()
931    }
932}
933
#[cfg(test)]
mod tests {
    use super::*;

    // ---- ScanTriggerSource::as_run_record_string ----

    #[test]
    fn scan_trigger_source_manual_stamps_ui() {
        let source = ScanTriggerSource::Manual;
        assert_eq!(source.as_run_record_string(), "UI");
    }

    #[test]
    fn scan_trigger_source_webhook_stamps_webhook() {
        let source = ScanTriggerSource::Webhook;
        assert_eq!(source.as_run_record_string(), "Webhook");
    }

    #[test]
    fn scan_trigger_source_scheduler_stamps_label_under_cron_prefix() {
        let source = ScanTriggerSource::Scheduler { label: String::from("weekly") };
        assert_eq!(source.as_run_record_string(), "Cron:weekly");
    }

    // ---- EventReplay fixtures ----

    /// Wrap a `RunEvent` in the `AgentEvent::Run` envelope.
    fn run_event(data: RunEvent) -> AgentEvent {
        AgentEvent::Run { data }
    }

    /// `RunStarted` for `run_id`, with fixed project, repo list and
    /// timestamp.
    fn run_started(run_id: &str) -> AgentEvent {
        run_event(RunEvent::RunStarted {
            run_id: run_id.to_owned(),
            project_id: "test-project".to_owned(),
            repos: vec!["alpha".to_owned()],
            started_at_ms: 0,
        })
    }

    /// `RepoStarted` for `repo` within `run_id`, with fixed project
    /// and timestamp.
    fn repo_started(run_id: &str, repo: &str) -> AgentEvent {
        run_event(RunEvent::RepoStarted {
            run_id: run_id.to_owned(),
            project_id: "test-project".to_owned(),
            repo: repo.to_owned(),
            started_at_ms: 0,
        })
    }

    /// A `Heartbeat` event with a zero timestamp.
    fn heartbeat() -> AgentEvent {
        run_event(RunEvent::Heartbeat { ts: 0 })
    }

    // ---- EventReplay behavior ----

    #[tokio::test]
    async fn heartbeat_is_not_buffered() {
        let buffer = EventReplay::new();
        let beat = heartbeat();
        buffer.push(&beat).await;

        let tracked = buffer.tracked_runs().await;
        assert_eq!(tracked, 0);
        let frames = buffer.snapshot("anything").await;
        assert!(frames.is_empty());
    }

    #[tokio::test]
    async fn snapshot_returns_events_in_push_order() {
        let buffer = EventReplay::new();
        for event in [run_started("r1"), repo_started("r1", "alpha")] {
            buffer.push(&event).await;
        }

        let frames = buffer.snapshot("r1").await;
        assert_eq!(frames.len(), 2);

        let first_is_run_started =
            matches!(frames[0], AgentEvent::Run { data: RunEvent::RunStarted { .. } });
        assert!(first_is_run_started);
        let second_is_repo_started =
            matches!(frames[1], AgentEvent::Run { data: RunEvent::RepoStarted { .. } });
        assert!(second_is_repo_started);
    }

    #[tokio::test]
    async fn max_per_run_drops_oldest_frame() {
        let mut buffer = EventReplay::new();
        buffer.max_per_run = 2;
        let pushed = [
            run_started("r1"),
            repo_started("r1", "alpha"),
            repo_started("r1", "beta"),
        ];
        for event in pushed {
            buffer.push(&event).await;
        }

        let frames = buffer.snapshot("r1").await;
        assert_eq!(frames.len(), 2);

        // With a cap of two, the leading RunStarted frame is gone and
        // only the two RepoStarted frames remain, in arrival order.
        let mut surviving_repos = Vec::new();
        for frame in frames.iter() {
            if let AgentEvent::Run { data: RunEvent::RepoStarted { repo, .. } } = frame {
                surviving_repos.push(repo.as_str());
            }
        }
        assert_eq!(surviving_repos, ["alpha", "beta"]);
    }

    #[tokio::test]
    async fn max_runs_evicts_least_recently_touched_run() {
        let mut buffer = EventReplay::new();
        buffer.max_runs = 2;
        let pushed = [
            run_started("a"),
            run_started("b"),
            // Touching `a` again makes it most-recent, leaving `b` as LRU.
            repo_started("a", "alpha"),
            // Admitting `c` must push out `b`, not `a`.
            run_started("c"),
        ];
        for event in pushed {
            buffer.push(&event).await;
        }

        assert_eq!(buffer.tracked_runs().await, 2);

        let kept_a = buffer.snapshot("a").await;
        assert!(!kept_a.is_empty(), "`a` was touched, must survive");
        let evicted_b = buffer.snapshot("b").await;
        assert!(evicted_b.is_empty(), "`b` was LRU, must be evicted");
        let kept_c = buffer.snapshot("c").await;
        assert!(!kept_c.is_empty(), "`c` is newest");
    }

    #[tokio::test]
    async fn unknown_run_id_yields_empty_snapshot() {
        let buffer = EventReplay::new();
        let known = run_started("real");
        buffer.push(&known).await;

        let frames = buffer.snapshot("ghost").await;
        assert!(frames.is_empty());
    }
}