use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde_json::json;
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};
use nyx_agent_core::store::StoreError;
use nyx_agent_core::{Config, SecretStore, Store};
use nyx_agent_types::event::{AgentEvent, AiEvent, EventSink, RunEvent, SandboxEvent};
use nyx_agent_types::product::{
ProjectLaunchProfile, ProjectLaunchProfileInput, ProjectSetupError, ProjectSetupJobEvent,
ProjectSetupJobRecord, ProjectSetupJobStatus, ProjectSetupPhase, ProjectSetupResponse,
ProjectSetupVerificationStatus, SeedSetupPlan, VerifiedVulnerabilityRecord,
};
use nyx_agent_types::project::{
AuthSetupError, AuthSetupJobEvent, AuthSetupJobRecord, AuthSetupJobStatus, AuthSetupPhase,
AuthSetupResponse, AuthSetupVerification, ProjectAuthOwnedObject, ProjectAuthProfile,
};
/// Boxed, pinned, `Send` future returned by [`ScanTrigger::trigger`].
///
/// Resolves to a `String` on success or a [`ScanTriggerError`] on failure.
// NOTE(review): the `String` is presumably the id of the started run — confirm against implementors.
pub type ScanFuture<'a> =
Pin<Box<dyn Future<Output = Result<String, ScanTriggerError>> + Send + 'a>>;
/// Where a scan request came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScanTriggerSource {
    /// Stamped as `"UI"` on the run record.
    Manual,
    /// Stamped as `"Cron:<label>"` on the run record.
    Scheduler { label: String },
    /// Stamped as `"Webhook"` on the run record.
    Webhook,
}

impl ScanTriggerSource {
    /// Renders the source as the owned string stamped on a run record.
    ///
    /// The scheduler label is appended verbatim after the `Cron:` prefix.
    pub fn as_run_record_string(&self) -> String {
        match self {
            Self::Manual => String::from("UI"),
            Self::Webhook => String::from("Webhook"),
            Self::Scheduler { label } => {
                const PREFIX: &str = "Cron:";
                let mut stamped = String::with_capacity(PREFIX.len() + label.len());
                stamped.push_str(PREFIX);
                stamped.push_str(label);
                stamped
            }
        }
    }
}
/// Per-run settings passed alongside a scan trigger request.
///
/// The derived `Default` leaves both plain booleans `false` and every
/// `Option` field `None`.
// NOTE(review): `None` presumably means "keep the configured value" — confirm
// against the code that consumes these overrides.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanRunOverrides {
pub exploit_mode_enabled: bool,
pub allow_state_changing_live_probes: bool,
pub exploit_dry_run: Option<bool>,
pub browser_checks_enabled: Option<bool>,
pub business_logic_templates_enabled: Option<bool>,
pub research_mode_enabled: Option<bool>,
pub unsafe_attack_agent_enabled: Option<bool>,
pub business_logic_template_ids: Option<Vec<String>>,
}
/// Object-safe entry point for requesting a scan.
///
/// Implementors must be `Send + Sync + 'static`; `ServerState` stores one as
/// `Arc<dyn ScanTrigger>`.
pub trait ScanTrigger: Send + Sync + 'static {
/// Requests a scan on behalf of `source`.
///
/// `project_id`, `repo` and `run_overrides` are optional; how an
/// implementor treats `None` is not defined by this trait.
/// The returned future yields a `String` or a [`ScanTriggerError`].
fn trigger<'a>(
&'a self,
source: ScanTriggerSource,
project_id: Option<String>,
repo: Option<String>,
run_overrides: Option<ScanRunOverrides>,
) -> ScanFuture<'a>;
}
/// Boxed, pinned, `Send` future returned by [`AuthSetupAgent::explore`].
///
/// Resolves to an [`AuthSetupAgentOutput`] or an [`AuthSetupAgentError`].
pub type AuthSetupAgentFuture<'a> =
Pin<Box<dyn Future<Output = Result<AuthSetupAgentOutput, AuthSetupAgentError>> + Send + 'a>>;
/// Input handed to [`AuthSetupAgent::explore`].
// NOTE(review): the `static_*` fields and `files_inspected` look like results of
// an earlier static pass that the agent refines — confirm against the caller.
#[derive(Debug, Clone)]
pub struct AuthSetupAgentRequest {
pub project_id: String,
pub project_name: String,
pub target_base_url: Option<String>,
pub workspace_roots: Vec<PathBuf>,
pub requested_roles: Vec<String>,
pub seeded_objects: Vec<ProjectAuthOwnedObject>,
pub existing_profiles: Vec<ProjectAuthProfile>,
pub static_login_paths: Vec<String>,
pub static_object_routes: Vec<String>,
pub files_inspected: usize,
}
/// Successful result of [`AuthSetupAgent::explore`].
#[derive(Debug, Clone)]
pub struct AuthSetupAgentOutput {
pub profiles: Vec<ProjectAuthProfile>,
pub roles: Vec<String>,
pub login_paths: Vec<String>,
pub object_routes: Vec<String>,
pub files_inspected: usize,
pub verification: AuthSetupVerification,
pub message: String,
}
/// Failure modes of the auth setup agent; the payload is a free-form detail
/// string appended to the display message.
#[derive(Debug, Error)]
pub enum AuthSetupAgentError {
/// The agent could not be used at all.
#[error("auth setup agent unavailable: {0}")]
Unavailable(String),
/// The agent ran but did not succeed.
#[error("auth setup agent failed: {0}")]
Failed(String),
}
/// Object-safe interface to the auth setup agent; `ServerState` stores it as
/// an optional `Arc<dyn AuthSetupAgent>`.
pub trait AuthSetupAgent: Send + Sync + 'static {
/// Consumes `req` and returns a boxed future yielding the agent's output or error.
fn explore<'a>(&'a self, req: AuthSetupAgentRequest) -> AuthSetupAgentFuture<'a>;
}
/// Boxed, pinned, `Send` future returned by [`ProjectSetupAgent::explore`].
///
/// Resolves to a [`ProjectSetupAgentOutput`] or a [`ProjectSetupAgentError`].
pub type ProjectSetupAgentFuture<'a> = Pin<
Box<dyn Future<Output = Result<ProjectSetupAgentOutput, ProjectSetupAgentError>> + Send + 'a>,
>;
/// Input handed to [`ProjectSetupAgent::explore`].
#[derive(Debug, Clone)]
pub struct ProjectSetupAgentRequest {
pub project_id: String,
pub project_name: String,
pub target_base_url: Option<String>,
pub workspace_roots: Vec<PathBuf>,
/// Launch profile already known for the project, if any.
pub existing_launch_profile: Option<ProjectLaunchProfile>,
}
/// Successful result of [`ProjectSetupAgent::explore`].
#[derive(Debug, Clone)]
pub struct ProjectSetupAgentOutput {
pub profile: ProjectLaunchProfileInput,
pub summary: String,
pub checks: Vec<String>,
pub warnings: Vec<String>,
pub verification_status: ProjectSetupVerificationStatus,
pub message: String,
}
/// Failure modes of the project setup agent; the payload is a free-form
/// detail string appended to the display message.
#[derive(Debug, Error)]
pub enum ProjectSetupAgentError {
/// The agent could not be used at all.
#[error("project setup agent unavailable: {0}")]
Unavailable(String),
/// The agent ran but did not succeed.
#[error("project setup agent failed: {0}")]
Failed(String),
}
/// Object-safe interface to the project setup agent; `ServerState` stores it
/// as an optional `Arc<dyn ProjectSetupAgent>`.
pub trait ProjectSetupAgent: Send + Sync + 'static {
/// Consumes `req` and returns a boxed future yielding the agent's output or error.
fn explore<'a>(&'a self, req: ProjectSetupAgentRequest) -> ProjectSetupAgentFuture<'a>;
}
/// Boxed, pinned, `Send` future returned by [`SeedSetupAgent::explore`].
///
/// Resolves to a [`SeedSetupAgentOutput`] or a [`SeedSetupAgentError`].
pub type SeedSetupAgentFuture<'a> =
Pin<Box<dyn Future<Output = Result<SeedSetupAgentOutput, SeedSetupAgentError>> + Send + 'a>>;
/// Input handed to [`SeedSetupAgent::explore`].
#[derive(Debug, Clone)]
pub struct SeedSetupAgentRequest {
pub project_id: String,
pub project_name: String,
pub target_base_url: Option<String>,
pub workspace_roots: Vec<PathBuf>,
/// Launch profile for the project, if one exists.
pub launch_profile: Option<ProjectLaunchProfile>,
}
/// Successful result of [`SeedSetupAgent::explore`]: a seed plan plus a
/// human-readable message.
#[derive(Debug, Clone)]
pub struct SeedSetupAgentOutput {
pub plan: SeedSetupPlan,
pub message: String,
}
/// Failure modes of the seed setup agent; the payload is a free-form detail
/// string appended to the display message.
#[derive(Debug, Error)]
pub enum SeedSetupAgentError {
/// The agent could not be used at all.
#[error("seed setup agent unavailable: {0}")]
Unavailable(String),
/// The agent ran but did not succeed.
#[error("seed setup agent failed: {0}")]
Failed(String),
}
/// Object-safe interface to the seed setup agent; `ServerState` stores it as
/// an optional `Arc<dyn SeedSetupAgent>`.
pub trait SeedSetupAgent: Send + Sync + 'static {
/// Consumes `req` and returns a boxed future yielding the agent's output or error.
fn explore<'a>(&'a self, req: SeedSetupAgentRequest) -> SeedSetupAgentFuture<'a>;
}
/// Boxed, pinned, `Send` future returned by [`RemediationAgent::fix`].
///
/// Resolves to a [`RemediationAgentOutput`] or a [`RemediationAgentError`].
pub type RemediationAgentFuture<'a> = Pin<
Box<dyn Future<Output = Result<RemediationAgentOutput, RemediationAgentError>> + Send + 'a>,
>;
/// Input handed to [`RemediationAgent::fix`]: the vulnerability to address
/// and the workspace roots made available to the agent.
#[derive(Debug, Clone)]
pub struct RemediationAgentRequest {
pub vulnerability: VerifiedVulnerabilityRecord,
pub workspace_roots: Vec<PathBuf>,
}
/// One file reported as changed by the remediation agent (serializable).
// NOTE(review): `status` is a free-form string and the line counts are
// optional; the set of status values is not defined here — confirm with the producer.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub struct RemediationChangedFile {
pub repo: String,
pub path: String,
pub status: String,
pub additions: Option<i64>,
pub deletions: Option<i64>,
}
/// Successful result of [`RemediationAgent::fix`] (serializable).
///
/// `changed_files` may be empty; `RemediationJobStore::complete` reports that
/// case with a dedicated message.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub struct RemediationAgentOutput {
pub changed_files: Vec<RemediationChangedFile>,
pub summary: String,
pub final_message: String,
}
/// Failure modes of the remediation agent; the payload is a free-form detail
/// string appended to the display message.
#[derive(Debug, Error)]
pub enum RemediationAgentError {
/// The agent could not be used at all.
#[error("remediation agent unavailable: {0}")]
Unavailable(String),
/// The agent ran but did not succeed.
#[error("remediation agent failed: {0}")]
Failed(String),
}
/// Object-safe interface to the remediation ("fix") agent; `ServerState`
/// stores it as an optional `Arc<dyn RemediationAgent>`.
pub trait RemediationAgent: Send + Sync + 'static {
/// Consumes `req` and returns a boxed future yielding the agent's output or error.
fn fix<'a>(&'a self, req: RemediationAgentRequest) -> RemediationAgentFuture<'a>;
}
/// In-memory registry of auth setup jobs.
#[derive(Debug, Default)]
pub struct AuthSetupJobStore {
// Counter used to build unique job ids in `create`.
seq: AtomicU64,
// Job records keyed by job id, behind an async mutex.
jobs: Mutex<HashMap<String, AuthSetupJobRecord>>,
}
/// In-memory registry of project setup jobs.
#[derive(Debug, Default)]
pub struct ProjectSetupJobStore {
// Counter used to build unique job ids in `create`.
seq: AtomicU64,
// Job records keyed by job id, behind an async mutex.
jobs: Mutex<HashMap<String, ProjectSetupJobRecord>>,
}
/// One timeline entry of a remediation job (serializable).
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobEvent {
/// Timestamp of the entry; the store fills it from the caller-supplied
/// `now` or from `nyx_agent_core::now_epoch_ms()`.
pub at: i64,
/// Phase name; the store itself writes `"queued"`, `"complete"`,
/// `"failed"` or the caller-provided phase.
pub phase: String,
pub message: String,
}
/// Error attached to a failed remediation job (serializable).
///
/// `RemediationJobStore::fail` copies `title` into the job message and
/// `detail` into the final timeline event.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobError {
pub title: String,
pub detail: String,
}
/// Snapshot of a remediation job as kept by `RemediationJobStore` (serializable).
#[derive(Debug, Clone, serde::Serialize)]
pub struct RemediationJobRecord {
/// Store-generated id of the form `fix-<now>-<seq>`.
pub id: String,
pub vulnerability_id: String,
pub project_id: String,
/// The store writes `"queued"`, `"running"`, `"succeeded"` or `"failed"`.
pub status: String,
pub phase: String,
/// Latest human-readable message.
pub message: String,
pub started_at: i64,
/// Set when the job is completed or failed; `None` at creation.
pub finished_at: Option<i64>,
/// Timeline of the job, oldest first (entries are appended).
pub events: Vec<RemediationJobEvent>,
/// Present after `complete`, cleared by `fail`.
pub result: Option<RemediationAgentOutput>,
/// Present after `fail`, cleared by `complete`.
pub error: Option<RemediationJobError>,
}
/// In-memory registry of remediation jobs.
#[derive(Debug, Default)]
pub struct RemediationJobStore {
// Counter used to build unique job ids in `create`.
seq: AtomicU64,
// Job records keyed by job id, behind an async mutex.
jobs: Mutex<HashMap<String, RemediationJobRecord>>,
}
impl RemediationJobStore {
    /// Builds an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new queued job and returns a copy of its record.
    ///
    /// The id is `fix-<now>-<seq>`, where `<seq>` comes from the store's
    /// counter. `now` is also used as the start time and as the timestamp of
    /// the initial `"queued"` event.
    pub async fn create(
        &self,
        vulnerability_id: &str,
        project_id: &str,
        now: i64,
    ) -> RemediationJobRecord {
        // `fetch_add` returns the previous value, so numbering starts at 1.
        let sequence = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
        let queued_message = String::from("Fix agent queued.");
        let record = RemediationJobRecord {
            id: format!("fix-{now}-{sequence}"),
            vulnerability_id: vulnerability_id.to_owned(),
            project_id: project_id.to_owned(),
            status: String::from("queued"),
            phase: String::from("queued"),
            message: queued_message.clone(),
            started_at: now,
            finished_at: None,
            events: vec![RemediationJobEvent {
                at: now,
                phase: String::from("queued"),
                message: queued_message,
            }],
            result: None,
            error: None,
        };
        let mut jobs = self.jobs.lock().await;
        jobs.insert(record.id.clone(), record.clone());
        record
    }

    /// Returns a copy of the job with the given id, if it exists.
    pub async fn get(&self, id: &str) -> Option<RemediationJobRecord> {
        let jobs = self.jobs.lock().await;
        jobs.get(id).cloned()
    }

    /// Marks the job as `"running"` in `phase` and appends a timeline event.
    ///
    /// Unknown ids are ignored. The timestamp is taken before the lock is
    /// acquired.
    pub async fn push_phase(&self, id: &str, phase: &str, message: impl Into<String>) {
        let at = nyx_agent_core::now_epoch_ms();
        let message: String = message.into();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(RemediationJobEvent {
                at,
                phase: phase.to_owned(),
                message: message.clone(),
            });
            job.status = String::from("running");
            job.phase = phase.to_owned();
            job.message = message;
        }
    }

    /// Marks the job as `"succeeded"`, stores `result`, clears any error and
    /// appends a `"complete"` event.
    ///
    /// The message depends on whether any changed files were reported.
    /// Unknown ids are ignored.
    pub async fn complete(&self, id: &str, result: RemediationAgentOutput) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            let summary = match result.changed_files.len() {
                0 => String::from("Fix agent completed without leaving file changes."),
                count => format!("Fix agent updated {count} file(s)."),
            };
            job.events.push(RemediationJobEvent {
                at,
                phase: String::from("complete"),
                message: summary.clone(),
            });
            job.status = String::from("succeeded");
            job.phase = String::from("complete");
            job.message = summary;
            job.finished_at = Some(at);
            job.result = Some(result);
            job.error = None;
        }
    }

    /// Marks the job as `"failed"`, stores `error`, clears any result and
    /// appends a `"failed"` event.
    ///
    /// The job message takes the error title; the event carries the error
    /// detail. Unknown ids are ignored.
    pub async fn fail(&self, id: &str, error: RemediationJobError) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(RemediationJobEvent {
                at,
                phase: String::from("failed"),
                message: error.detail.clone(),
            });
            job.status = String::from("failed");
            job.phase = String::from("failed");
            job.message = error.title.clone();
            job.finished_at = Some(at);
            job.result = None;
            job.error = Some(error);
        }
    }
}
impl ProjectSetupJobStore {
    /// Builds an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new queued job for `project_id` and returns a copy of its record.
    ///
    /// The id is `projectsetup-<now>-<seq>`; `now` is also the start time and
    /// the timestamp of the initial `Queued` event.
    pub async fn create(&self, project_id: &str, now: i64) -> ProjectSetupJobRecord {
        // `fetch_add` returns the previous value, so numbering starts at 1.
        let sequence = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
        let queued_message = String::from("Project setup queued.");
        let record = ProjectSetupJobRecord {
            id: format!("projectsetup-{now}-{sequence}"),
            project_id: project_id.to_owned(),
            status: ProjectSetupJobStatus::Queued,
            phase: ProjectSetupPhase::Queued,
            message: queued_message.clone(),
            started_at: now,
            finished_at: None,
            events: vec![ProjectSetupJobEvent {
                at: now,
                phase: ProjectSetupPhase::Queued,
                message: queued_message,
            }],
            result: None,
            error: None,
        };
        let mut jobs = self.jobs.lock().await;
        jobs.insert(record.id.clone(), record.clone());
        record
    }

    /// Returns a copy of the job with the given id, if it exists.
    pub async fn get(&self, id: &str) -> Option<ProjectSetupJobRecord> {
        let jobs = self.jobs.lock().await;
        jobs.get(id).cloned()
    }

    /// Returns copies of all jobs of `project_id`, newest start time first;
    /// ties are broken by descending id.
    pub async fn list_by_project(&self, project_id: &str) -> Vec<ProjectSetupJobRecord> {
        // Copy the matches out under the lock, then sort after releasing it.
        let mut matching: Vec<ProjectSetupJobRecord> = {
            let jobs = self.jobs.lock().await;
            jobs.values().filter(|job| job.project_id == project_id).cloned().collect()
        };
        matching.sort_by(|a, b| (b.started_at, &b.id).cmp(&(a.started_at, &a.id)));
        matching
    }

    /// Marks the job as running in `phase` and appends a timeline event.
    ///
    /// Unknown ids are ignored. The timestamp is taken before the lock is
    /// acquired.
    pub async fn push_phase(&self, id: &str, phase: ProjectSetupPhase, message: impl Into<String>) {
        let at = nyx_agent_core::now_epoch_ms();
        let message: String = message.into();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(ProjectSetupJobEvent { at, phase, message: message.clone() });
            job.status = ProjectSetupJobStatus::Running;
            job.phase = phase;
            job.message = message;
        }
    }

    /// Marks the job as succeeded, stores `result`, clears any error and
    /// appends a `Complete` event carrying the result's message.
    ///
    /// Unknown ids are ignored.
    pub async fn complete(&self, id: &str, result: ProjectSetupResponse) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(ProjectSetupJobEvent {
                at,
                phase: ProjectSetupPhase::Complete,
                message: result.message.clone(),
            });
            job.status = ProjectSetupJobStatus::Succeeded;
            job.phase = ProjectSetupPhase::Complete;
            job.message = result.message.clone();
            job.finished_at = Some(at);
            job.result = Some(result);
            job.error = None;
        }
    }

    /// Marks the job as failed, stores `error`, clears any result and appends
    /// a `Failed` event.
    ///
    /// The job message takes the error title; the event carries the error
    /// detail. Unknown ids are ignored.
    pub async fn fail(&self, id: &str, error: ProjectSetupError) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(ProjectSetupJobEvent {
                at,
                phase: ProjectSetupPhase::Failed,
                message: error.detail.clone(),
            });
            job.status = ProjectSetupJobStatus::Failed;
            job.phase = ProjectSetupPhase::Failed;
            job.message = error.title.clone();
            job.finished_at = Some(at);
            job.result = None;
            job.error = Some(error);
        }
    }
}
impl AuthSetupJobStore {
    /// Builds an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new queued job for `project_id` and returns a copy of its record.
    ///
    /// The id is `authsetup-<now>-<seq>`; `now` is also the start time and the
    /// timestamp of the initial `Queued` event.
    pub async fn create(&self, project_id: &str, now: i64) -> AuthSetupJobRecord {
        // `fetch_add` returns the previous value, so numbering starts at 1.
        let sequence = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
        let queued_message = String::from("Auth setup queued.");
        let record = AuthSetupJobRecord {
            id: format!("authsetup-{now}-{sequence}"),
            project_id: project_id.to_owned(),
            status: AuthSetupJobStatus::Queued,
            phase: AuthSetupPhase::Queued,
            message: queued_message.clone(),
            started_at: now,
            finished_at: None,
            events: vec![AuthSetupJobEvent {
                at: now,
                phase: AuthSetupPhase::Queued,
                message: queued_message,
            }],
            result: None,
            error: None,
        };
        let mut jobs = self.jobs.lock().await;
        jobs.insert(record.id.clone(), record.clone());
        record
    }

    /// Returns a copy of the job with the given id, if it exists.
    pub async fn get(&self, id: &str) -> Option<AuthSetupJobRecord> {
        let jobs = self.jobs.lock().await;
        jobs.get(id).cloned()
    }

    /// Marks the job as running in `phase` and appends a timeline event.
    ///
    /// Unknown ids are ignored. The timestamp is taken before the lock is
    /// acquired.
    pub async fn push_phase(&self, id: &str, phase: AuthSetupPhase, message: impl Into<String>) {
        let at = nyx_agent_core::now_epoch_ms();
        let message: String = message.into();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(AuthSetupJobEvent { at, phase, message: message.clone() });
            job.status = AuthSetupJobStatus::Running;
            job.phase = phase;
            job.message = message;
        }
    }

    /// Marks the job as succeeded, stores `result`, clears any error and
    /// appends a `Complete` event carrying the result's message.
    ///
    /// Unknown ids are ignored.
    pub async fn complete(&self, id: &str, result: AuthSetupResponse) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(AuthSetupJobEvent {
                at,
                phase: AuthSetupPhase::Complete,
                message: result.message.clone(),
            });
            job.status = AuthSetupJobStatus::Succeeded;
            job.phase = AuthSetupPhase::Complete;
            job.message = result.message.clone();
            job.finished_at = Some(at);
            job.result = Some(result);
            job.error = None;
        }
    }

    /// Marks the job as failed, stores `error`, clears any result and appends
    /// a `Failed` event.
    ///
    /// The job message takes the error title; the event carries the error
    /// detail. Unknown ids are ignored.
    pub async fn fail(&self, id: &str, error: AuthSetupError) {
        let at = nyx_agent_core::now_epoch_ms();
        if let Some(job) = self.jobs.lock().await.get_mut(id) {
            job.events.push(AuthSetupJobEvent {
                at,
                phase: AuthSetupPhase::Failed,
                message: error.detail.clone(),
            });
            job.status = AuthSetupJobStatus::Failed;
            job.phase = AuthSetupPhase::Failed;
            job.message = error.title.clone();
            job.finished_at = Some(at);
            job.result = None;
            job.error = Some(error);
        }
    }
}
/// Reasons a scan trigger request can fail.
///
/// When wrapped in `ApiError::Scan`, each variant maps to its own HTTP
/// status and error code (see `ApiError`'s `IntoResponse` impl).
#[derive(Debug, Error)]
pub enum ScanTriggerError {
/// The request was refused; surfaced as 400 `scan_rejected`.
#[error("scan request was rejected: {0}")]
Rejected(String),
/// The daemon is shutting down; surfaced as 503 `shutting_down`.
#[error("daemon is shutting down")]
Closed,
/// The request queue is full; surfaced as 429 `scan_backpressure`.
#[error("scan request queue is full: {0}")]
Backpressure(String),
/// Unexpected failure; surfaced as 500 `scan_internal`.
#[error("internal error: {0}")]
Internal(String),
}
/// Shared state for the setup flow.
///
/// Cloning is cheap for `config` and `completed`: both sit behind `Arc`, so
/// clones observe the same configuration and completion flag.
#[derive(Clone)]
pub struct SetupContext {
pub config_path: PathBuf,
pub secrets: SecretStore,
/// Configuration shared behind an async read/write lock.
pub config: Arc<RwLock<Config>>,
/// Completion flag; read via `is_complete`, set via `mark_complete`.
pub completed: Arc<std::sync::atomic::AtomicBool>,
}
impl SetupContext {
pub fn new(
config_path: PathBuf,
config: Config,
completed: bool,
secrets: SecretStore,
) -> Self {
Self {
config_path,
secrets,
config: Arc::new(RwLock::new(config)),
completed: Arc::new(std::sync::atomic::AtomicBool::new(completed)),
}
}
pub fn is_complete(&self) -> bool {
self.completed.load(std::sync::atomic::Ordering::Acquire)
}
pub fn mark_complete(&self) {
self.completed.store(true, std::sync::atomic::Ordering::Release);
}
}
/// Bounded in-memory buffer of recent events, grouped by run id, so that the
/// events of a run can be replayed via `snapshot`.
#[derive(Debug)]
pub struct EventReplay {
inner: Mutex<ReplayInner>,
/// Cap on the number of events kept per run (`new` sets 128).
pub max_per_run: usize,
/// Cap on the number of runs tracked at once (`new` sets 16).
pub max_runs: usize,
}
/// Lock-protected state of `EventReplay`.
#[derive(Debug, Default)]
struct ReplayInner {
// Buffered events per run id, oldest event at the front.
by_run: HashMap<String, VecDeque<AgentEvent>>,
// Run ids from least to most recently pushed-to; the front is evicted first.
order: VecDeque<String>,
}
// Hand-written rather than derived so that `default()` carries the same
// non-zero limits as `EventReplay::new()`.
impl Default for EventReplay {
/// Equivalent to [`EventReplay::new`].
fn default() -> Self {
Self::new()
}
}
impl EventReplay {
    /// Creates an empty buffer that keeps up to 128 events for each of up to
    /// 16 runs.
    pub fn new() -> Self {
        Self { inner: Mutex::new(ReplayInner::default()), max_per_run: 128, max_runs: 16 }
    }

    /// Records `event` under its run id.
    ///
    /// Events without a run id (as decided by `run_id_for_event`) are
    /// ignored. Pushing to a run makes it the most recently used one. When a
    /// new run would exceed `max_runs`, least recently used runs are evicted;
    /// when a run's buffer would exceed `max_per_run`, its oldest events are
    /// dropped.
    ///
    /// A limit of zero means "retain nothing": the event is discarded.
    pub async fn push(&self, event: &AgentEvent) {
        let Some(run_id) = run_id_for_event(event) else { return };
        // Both limits are public and may be set to zero. With an equality
        // check a zero per-run cap would let the buffer grow without bound,
        // so treat zero as "buffering disabled".
        if self.max_per_run == 0 || self.max_runs == 0 {
            return;
        }

        let mut guard = self.inner.lock().await;
        let inner = &mut *guard;

        if let Some(pos) = inner.order.iter().position(|r| r == run_id) {
            // Known run: unlink it so it can be re-queued as most recent.
            inner.order.remove(pos);
        } else {
            // New run: evict until there is room. A loop (not a single
            // eviction) restores the bound if `max_runs` was lowered.
            while inner.by_run.len() >= self.max_runs {
                let Some(victim) = inner.order.pop_front() else { break };
                inner.by_run.remove(&victim);
            }
        }
        inner.order.push_back(run_id.to_string());

        let buf = inner.by_run.entry(run_id.to_string()).or_default();
        // `>=` in a loop, so a buffer already over the cap is trimmed back.
        // It terminates because `max_per_run >= 1` here.
        while buf.len() >= self.max_per_run {
            buf.pop_front();
        }
        buf.push_back(event.clone());
    }

    /// Returns copies of the buffered events of `run_id`, oldest first, or an
    /// empty vector when the run is not tracked.
    pub async fn snapshot(&self, run_id: &str) -> Vec<AgentEvent> {
        let g = self.inner.lock().await;
        g.by_run.get(run_id).map(|q| q.iter().cloned().collect()).unwrap_or_default()
    }

    /// Number of runs that currently have buffered events.
    pub async fn tracked_runs(&self) -> usize {
        self.inner.lock().await.by_run.len()
    }
}
/// Extracts the run id an event belongs to, borrowed from the event.
///
/// Returns `None` for heartbeats and for any event kind not listed below,
/// which therefore never reach the replay buffer.
fn run_id_for_event(ev: &AgentEvent) -> Option<&str> {
match ev {
// No wildcard in this inner match: it lists every `RunEvent` variant
// explicitly.
AgentEvent::Run { data } => match data {
RunEvent::Heartbeat { .. } => None,
RunEvent::RunStarted { run_id, .. }
| RunEvent::ProjectStarted { run_id, .. }
| RunEvent::PhaseStarted { run_id, .. }
| RunEvent::PhaseFinished { run_id, .. }
| RunEvent::EnvironmentStatus { run_id, .. }
| RunEvent::AuthSessionStatus { run_id, .. }
| RunEvent::LiveVerificationCapabilities { run_id, .. }
| RunEvent::RepoStarted { run_id, .. }
| RunEvent::RepoStaticDone { run_id, .. }
| RunEvent::RepoDynamicDone { run_id, .. }
| RunEvent::RepoFailed { run_id, .. }
| RunEvent::RepoIngestFailed { run_id, .. }
| RunEvent::RepoFinished { run_id, .. }
| RunEvent::ProjectFinished { run_id, .. }
| RunEvent::RunFinished { run_id, .. } => Some(run_id.as_str()),
},
// Of the AI events only budget ticks are matched; others fall through to `_`.
AgentEvent::Ai { data: AiEvent::BudgetTick { run_id, .. } } => Some(run_id.as_str()),
AgentEvent::Sandbox { data } => match data {
SandboxEvent::VerifierStarted { run_id, .. }
| SandboxEvent::VerifierFinished { run_id, .. } => Some(run_id.as_str()),
},
// Every other event is treated as not belonging to a run.
_ => None,
}
}
/// Authentication settings for the server.
///
/// The derived `Default` has no token, i.e. `is_enforced()` is `false`.
#[derive(Clone, Default)]
pub struct AuthConfig {
/// Token whose presence turns enforcement on; `None` means no enforcement.
// NOTE(review): how the token is compared against requests is not visible here.
pub token: Option<String>,
}
impl AuthConfig {
/// Builds a config from an optional token.
pub fn new(token: Option<String>) -> Self {
Self { token }
}
/// Returns `true` when a token is configured.
///
/// Only presence is checked: an empty string still counts as configured.
pub fn is_enforced(&self) -> bool {
self.token.is_some()
}
}
/// Application state assembled for the HTTP server.
///
/// `ServerState::new` fills the required parts and leaves every optional
/// field `None`; the `with_*` builders fill those in.
#[derive(Clone)]
pub struct ServerState {
pub store: Store,
pub events: EventSink,
pub scan: Arc<dyn ScanTrigger>,
pub setup: SetupContext,
pub auth: AuthConfig,
/// Optional agent; `None` until `with_auth_setup_agent` is called.
pub auth_setup_agent: Option<Arc<dyn AuthSetupAgent>>,
pub auth_setup_jobs: Arc<AuthSetupJobStore>,
/// Optional agent; `None` until `with_project_setup_agent` is called.
pub project_setup_agent: Option<Arc<dyn ProjectSetupAgent>>,
pub project_setup_jobs: Arc<ProjectSetupJobStore>,
/// Optional agent; `None` until `with_seed_setup_agent` is called.
pub seed_setup_agent: Option<Arc<dyn SeedSetupAgent>>,
/// Optional agent; `None` until `with_remediation_agent` is called.
pub remediation_agent: Option<Arc<dyn RemediationAgent>>,
pub remediation_jobs: Arc<RemediationJobStore>,
pub replay: Arc<EventReplay>,
pub state_repos_dir: Option<PathBuf>,
pub state_bundles_dir: Option<PathBuf>,
pub state_logs_dir: Option<PathBuf>,
pub webhook: Option<Arc<crate::webhook::WebhookConfig>>,
}
impl ServerState {
    /// Assembles the state from its required parts.
    ///
    /// Job stores and the replay buffer start empty; agents, state
    /// directories and the webhook config start unset.
    pub fn new(
        store: Store,
        events: EventSink,
        scan: Arc<dyn ScanTrigger>,
        setup: SetupContext,
        auth: AuthConfig,
    ) -> Self {
        let auth_setup_jobs = Arc::new(AuthSetupJobStore::new());
        let project_setup_jobs = Arc::new(ProjectSetupJobStore::new());
        let remediation_jobs = Arc::new(RemediationJobStore::new());
        let replay = Arc::new(EventReplay::new());
        Self {
            store,
            events,
            scan,
            setup,
            auth,
            auth_setup_agent: None,
            auth_setup_jobs,
            project_setup_agent: None,
            project_setup_jobs,
            seed_setup_agent: None,
            remediation_agent: None,
            remediation_jobs,
            replay,
            state_repos_dir: None,
            state_bundles_dir: None,
            state_logs_dir: None,
            webhook: None,
        }
    }

    /// Sets the repositories state directory.
    pub fn with_state_repos_dir(self, dir: PathBuf) -> Self {
        Self { state_repos_dir: Some(dir), ..self }
    }

    /// Installs the auth setup agent.
    pub fn with_auth_setup_agent(self, agent: Arc<dyn AuthSetupAgent>) -> Self {
        Self { auth_setup_agent: Some(agent), ..self }
    }

    /// Installs the project setup agent.
    pub fn with_project_setup_agent(self, agent: Arc<dyn ProjectSetupAgent>) -> Self {
        Self { project_setup_agent: Some(agent), ..self }
    }

    /// Installs the seed setup agent.
    pub fn with_seed_setup_agent(self, agent: Arc<dyn SeedSetupAgent>) -> Self {
        Self { seed_setup_agent: Some(agent), ..self }
    }

    /// Installs the remediation agent.
    pub fn with_remediation_agent(self, agent: Arc<dyn RemediationAgent>) -> Self {
        Self { remediation_agent: Some(agent), ..self }
    }

    /// Sets the bundles state directory.
    pub fn with_state_bundles_dir(self, dir: PathBuf) -> Self {
        Self { state_bundles_dir: Some(dir), ..self }
    }

    /// Sets the logs state directory.
    pub fn with_state_logs_dir(self, dir: PathBuf) -> Self {
        Self { state_logs_dir: Some(dir), ..self }
    }

    /// Stores the webhook config, wrapping it in an `Arc`.
    pub fn with_webhook(self, cfg: crate::webhook::WebhookConfig) -> Self {
        Self { webhook: Some(Arc::new(cfg)), ..self }
    }
}
/// Error type returned by HTTP handlers.
///
/// Its `IntoResponse` impl turns each variant into a status code plus a JSON
/// body of the form `{"error": {"code": ..., "message": ...}}`, where the
/// message is this type's `Display` output.
#[derive(Debug, Error)]
pub enum ApiError {
/// 404 `not_found`.
#[error("not found: {0}")]
NotFound(String),
/// 400 `bad_request`.
#[error("bad request: {0}")]
BadRequest(String),
/// 401 `unauthorized`.
#[error("unauthorized")]
Unauthorized,
/// 413 `payload_too_large`.
#[error("payload too large: {0}")]
PayloadTooLarge(String),
/// 429 `too_many_requests`.
#[error("too many requests: {0}")]
TooManyRequests(String),
/// 500 `store_error`; converts from `StoreError` via `?`.
#[error("store error: {0}")]
Store(#[from] StoreError),
/// Status depends on the wrapped `ScanTriggerError`; converts via `?`.
#[error("scan trigger failed: {0}")]
Scan(#[from] ScanTriggerError),
/// 500 `internal`.
#[error("internal: {0}")]
Internal(String),
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let (status, code) = match &self {
ApiError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
ApiError::BadRequest(_) => (StatusCode::BAD_REQUEST, "bad_request"),
ApiError::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized"),
ApiError::PayloadTooLarge(_) => (StatusCode::PAYLOAD_TOO_LARGE, "payload_too_large"),
ApiError::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, "too_many_requests"),
ApiError::Store(_) => (StatusCode::INTERNAL_SERVER_ERROR, "store_error"),
ApiError::Scan(ScanTriggerError::Rejected(_)) => {
(StatusCode::BAD_REQUEST, "scan_rejected")
}
ApiError::Scan(ScanTriggerError::Closed) => {
(StatusCode::SERVICE_UNAVAILABLE, "shutting_down")
}
ApiError::Scan(ScanTriggerError::Backpressure(_)) => {
(StatusCode::TOO_MANY_REQUESTS, "scan_backpressure")
}
ApiError::Scan(ScanTriggerError::Internal(_)) => {
(StatusCode::INTERNAL_SERVER_ERROR, "scan_internal")
}
ApiError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "internal"),
};
let body = Json(json!({ "error": { "code": code, "message": self.to_string() } }));
(status, body).into_response()
}
}
// Unit tests for `ScanTriggerSource` stamping and `EventReplay` buffering.
#[cfg(test)]
mod tests {
use super::*;
// Manual triggers are stamped "UI".
#[test]
fn scan_trigger_source_manual_stamps_ui() {
assert_eq!(ScanTriggerSource::Manual.as_run_record_string(), "UI");
}
// Webhook triggers are stamped "Webhook".
#[test]
fn scan_trigger_source_webhook_stamps_webhook() {
assert_eq!(ScanTriggerSource::Webhook.as_run_record_string(), "Webhook");
}
// Scheduler triggers are stamped "Cron:" followed by the label.
#[test]
fn scan_trigger_source_scheduler_stamps_label_under_cron_prefix() {
let s = ScanTriggerSource::Scheduler { label: "weekly".to_string() };
assert_eq!(s.as_run_record_string(), "Cron:weekly");
}
// Fixture: a `RunStarted` event for `run_id`.
fn run_started(run_id: &str) -> AgentEvent {
AgentEvent::Run {
data: RunEvent::RunStarted {
run_id: run_id.to_string(),
project_id: "test-project".to_string(),
repos: vec!["alpha".to_string()],
started_at_ms: 0,
},
}
}
// Fixture: a `RepoStarted` event for `run_id` and `repo`.
fn repo_started(run_id: &str, repo: &str) -> AgentEvent {
AgentEvent::Run {
data: RunEvent::RepoStarted {
run_id: run_id.to_string(),
project_id: "test-project".to_string(),
repo: repo.to_string(),
started_at_ms: 0,
},
}
}
// Fixture: a heartbeat, which carries no run id.
fn heartbeat() -> AgentEvent {
AgentEvent::Run { data: RunEvent::Heartbeat { ts: 0 } }
}
// Events without a run id must not create any buffer.
#[tokio::test]
async fn heartbeat_is_not_buffered() {
let replay = EventReplay::new();
replay.push(&heartbeat()).await;
assert_eq!(replay.tracked_runs().await, 0);
assert!(replay.snapshot("anything").await.is_empty());
}
// A snapshot lists a run's events oldest first.
#[tokio::test]
async fn snapshot_returns_events_in_push_order() {
let replay = EventReplay::new();
replay.push(&run_started("r1")).await;
replay.push(&repo_started("r1", "alpha")).await;
let frames = replay.snapshot("r1").await;
assert_eq!(frames.len(), 2);
assert!(matches!(frames[0], AgentEvent::Run { data: RunEvent::RunStarted { .. } }));
assert!(matches!(frames[1], AgentEvent::Run { data: RunEvent::RepoStarted { .. } }));
}
// With a per-run cap of 2, the third push drops the first event.
#[tokio::test]
async fn max_per_run_drops_oldest_frame() {
let mut replay = EventReplay::new();
replay.max_per_run = 2;
replay.push(&run_started("r1")).await;
replay.push(&repo_started("r1", "alpha")).await;
replay.push(&repo_started("r1", "beta")).await;
let frames = replay.snapshot("r1").await;
assert_eq!(frames.len(), 2);
let repos: Vec<String> = frames
.iter()
.filter_map(|ev| match ev {
AgentEvent::Run { data: RunEvent::RepoStarted { repo, .. } } => Some(repo.clone()),
_ => None,
})
.collect();
assert_eq!(repos, vec!["alpha".to_string(), "beta".to_string()]);
}
// With a run cap of 2, pushing to `a` again protects it, so `b` is evicted
// when `c` arrives.
#[tokio::test]
async fn max_runs_evicts_least_recently_touched_run() {
let mut replay = EventReplay::new();
replay.max_runs = 2;
replay.push(&run_started("a")).await;
replay.push(&run_started("b")).await;
replay.push(&repo_started("a", "alpha")).await;
replay.push(&run_started("c")).await;
assert_eq!(replay.tracked_runs().await, 2);
assert!(!replay.snapshot("a").await.is_empty(), "`a` was touched, must survive");
assert!(replay.snapshot("b").await.is_empty(), "`b` was LRU, must be evicted");
assert!(!replay.snapshot("c").await.is_empty(), "`c` is newest");
}
// Asking for a run that was never pushed yields an empty snapshot.
#[tokio::test]
async fn unknown_run_id_yields_empty_snapshot() {
let replay = EventReplay::new();
replay.push(&run_started("real")).await;
assert!(replay.snapshot("ghost").await.is_empty());
}
}