1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::sync::{
6 atomic::{AtomicU64, Ordering},
7 Arc,
8};
9
10use axum::http::StatusCode;
11use axum::response::{IntoResponse, Response};
12use axum::Json;
13use serde_json::json;
14use thiserror::Error;
15use tokio::sync::{Mutex, RwLock};
16
17use nyx_agent_core::store::StoreError;
18use nyx_agent_core::{Config, SecretStore, Store};
19use nyx_agent_types::event::{AgentEvent, AiEvent, EventSink, RunEvent, SandboxEvent};
20use nyx_agent_types::product::{
21 ProjectLaunchProfile, ProjectLaunchProfileInput, ProjectSetupError, ProjectSetupJobEvent,
22 ProjectSetupJobRecord, ProjectSetupJobStatus, ProjectSetupPhase, ProjectSetupResponse,
23 ProjectSetupVerificationStatus, SeedSetupPlan, VerifiedVulnerabilityRecord,
24};
25use nyx_agent_types::project::{
26 AuthSetupError, AuthSetupJobEvent, AuthSetupJobRecord, AuthSetupJobStatus, AuthSetupPhase,
27 AuthSetupResponse, AuthSetupVerification, ProjectAuthOwnedObject, ProjectAuthProfile,
28};
29
30pub type ScanFuture<'a> =
33 Pin<Box<dyn Future<Output = Result<String, ScanTriggerError>> + Send + 'a>>;
34
/// Where a scan request came from.
///
/// Each variant has a fixed textual rendering, produced by
/// [`ScanTriggerSource::as_run_record_string`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScanTriggerSource {
    /// Rendered as `"UI"`.
    Manual,
    /// Rendered as `"Cron:<label>"`.
    Scheduler { label: String },
    /// Rendered as `"Webhook"`.
    Webhook,
}

impl ScanTriggerSource {
    /// Returns the owned string form of this source.
    ///
    /// `Manual` becomes `"UI"`, `Webhook` becomes `"Webhook"`, and
    /// `Scheduler` becomes `"Cron:"` followed by its label verbatim (an empty
    /// label therefore yields `"Cron:"`).
    pub fn as_run_record_string(&self) -> String {
        // The two label-free variants share one conversion at the bottom; the
        // scheduler variant is the only one that needs formatting.
        let fixed: &str = match self {
            Self::Scheduler { label } => return format!("Cron:{label}"),
            Self::Manual => "UI",
            Self::Webhook => "Webhook",
        };
        String::from(fixed)
    }
}
67
/// Per-run switches that can accompany a scan request.
///
/// `Default` yields `false` for the two plain booleans and `None` for every
/// optional field.
// NOTE(review): `None` presumably means "fall back to the configured value" —
// confirm against the code that consumes these overrides.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ScanRunOverrides {
    /// Plain flag; defaults to `false`.
    pub exploit_mode_enabled: bool,
    /// Plain flag; defaults to `false`.
    pub allow_state_changing_live_probes: bool,
    /// Optional override for exploit dry-run.
    pub exploit_dry_run: Option<bool>,
    /// Optional override for browser checks.
    pub browser_checks_enabled: Option<bool>,
    /// Optional override for business-logic templates.
    pub business_logic_templates_enabled: Option<bool>,
    /// Optional override for research mode.
    pub research_mode_enabled: Option<bool>,
    /// Optional override for the unsafe attack agent.
    pub unsafe_attack_agent_enabled: Option<bool>,
    /// Optional explicit list of business-logic template ids.
    pub business_logic_template_ids: Option<Vec<String>>,
}
82
83pub trait ScanTrigger: Send + Sync + 'static {
87 fn trigger<'a>(
95 &'a self,
96 source: ScanTriggerSource,
97 project_id: Option<String>,
98 repo: Option<String>,
99 run_overrides: Option<ScanRunOverrides>,
100 ) -> ScanFuture<'a>;
101}
102
103pub type AuthSetupAgentFuture<'a> =
104 Pin<Box<dyn Future<Output = Result<AuthSetupAgentOutput, AuthSetupAgentError>> + Send + 'a>>;
105
106#[derive(Debug, Clone)]
107pub struct AuthSetupAgentRequest {
108 pub project_id: String,
109 pub project_name: String,
110 pub target_base_url: Option<String>,
111 pub workspace_roots: Vec<PathBuf>,
112 pub requested_roles: Vec<String>,
113 pub seeded_objects: Vec<ProjectAuthOwnedObject>,
114 pub existing_profiles: Vec<ProjectAuthProfile>,
115 pub static_login_paths: Vec<String>,
116 pub static_object_routes: Vec<String>,
117 pub files_inspected: usize,
118}
119
120#[derive(Debug, Clone)]
121pub struct AuthSetupAgentOutput {
122 pub profiles: Vec<ProjectAuthProfile>,
123 pub roles: Vec<String>,
124 pub login_paths: Vec<String>,
125 pub object_routes: Vec<String>,
126 pub files_inspected: usize,
127 pub verification: AuthSetupVerification,
128 pub message: String,
129}
130
131#[derive(Debug, Error)]
132pub enum AuthSetupAgentError {
133 #[error("auth setup agent unavailable: {0}")]
134 Unavailable(String),
135 #[error("auth setup agent failed: {0}")]
136 Failed(String),
137}
138
139pub trait AuthSetupAgent: Send + Sync + 'static {
140 fn explore<'a>(&'a self, req: AuthSetupAgentRequest) -> AuthSetupAgentFuture<'a>;
141}
142
143pub type ProjectSetupAgentFuture<'a> = Pin<
144 Box<dyn Future<Output = Result<ProjectSetupAgentOutput, ProjectSetupAgentError>> + Send + 'a>,
145>;
146
147#[derive(Debug, Clone)]
148pub struct ProjectSetupAgentRequest {
149 pub project_id: String,
150 pub project_name: String,
151 pub target_base_url: Option<String>,
152 pub workspace_roots: Vec<PathBuf>,
153 pub existing_launch_profile: Option<ProjectLaunchProfile>,
154}
155
156#[derive(Debug, Clone)]
157pub struct ProjectSetupAgentOutput {
158 pub profile: ProjectLaunchProfileInput,
159 pub summary: String,
160 pub checks: Vec<String>,
161 pub warnings: Vec<String>,
162 pub verification_status: ProjectSetupVerificationStatus,
163 pub message: String,
164}
165
166#[derive(Debug, Error)]
167pub enum ProjectSetupAgentError {
168 #[error("project setup agent unavailable: {0}")]
169 Unavailable(String),
170 #[error("project setup agent failed: {0}")]
171 Failed(String),
172}
173
174pub trait ProjectSetupAgent: Send + Sync + 'static {
175 fn explore<'a>(&'a self, req: ProjectSetupAgentRequest) -> ProjectSetupAgentFuture<'a>;
176}
177
178pub type SeedSetupAgentFuture<'a> =
179 Pin<Box<dyn Future<Output = Result<SeedSetupAgentOutput, SeedSetupAgentError>> + Send + 'a>>;
180
181#[derive(Debug, Clone)]
182pub struct SeedSetupAgentRequest {
183 pub project_id: String,
184 pub project_name: String,
185 pub target_base_url: Option<String>,
186 pub workspace_roots: Vec<PathBuf>,
187 pub launch_profile: Option<ProjectLaunchProfile>,
188}
189
190#[derive(Debug, Clone)]
191pub struct SeedSetupAgentOutput {
192 pub plan: SeedSetupPlan,
193 pub message: String,
194}
195
196#[derive(Debug, Error)]
197pub enum SeedSetupAgentError {
198 #[error("seed setup agent unavailable: {0}")]
199 Unavailable(String),
200 #[error("seed setup agent failed: {0}")]
201 Failed(String),
202}
203
204pub trait SeedSetupAgent: Send + Sync + 'static {
205 fn explore<'a>(&'a self, req: SeedSetupAgentRequest) -> SeedSetupAgentFuture<'a>;
206}
207
208pub type RemediationAgentFuture<'a> = Pin<
209 Box<dyn Future<Output = Result<RemediationAgentOutput, RemediationAgentError>> + Send + 'a>,
210>;
211
212#[derive(Debug, Clone)]
213pub struct RemediationAgentRequest {
214 pub vulnerability: VerifiedVulnerabilityRecord,
215 pub workspace_roots: Vec<PathBuf>,
216}
217
218#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
219pub struct RemediationChangedFile {
220 pub repo: String,
221 pub path: String,
222 pub status: String,
223 pub additions: Option<i64>,
224 pub deletions: Option<i64>,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
228pub struct RemediationAgentOutput {
229 pub changed_files: Vec<RemediationChangedFile>,
230 pub summary: String,
231 pub final_message: String,
232}
233
234#[derive(Debug, Error)]
235pub enum RemediationAgentError {
236 #[error("remediation agent unavailable: {0}")]
237 Unavailable(String),
238 #[error("remediation agent failed: {0}")]
239 Failed(String),
240}
241
242pub trait RemediationAgent: Send + Sync + 'static {
243 fn fix<'a>(&'a self, req: RemediationAgentRequest) -> RemediationAgentFuture<'a>;
244}
245
246#[derive(Debug, Default)]
247pub struct AuthSetupJobStore {
248 seq: AtomicU64,
249 jobs: Mutex<HashMap<String, AuthSetupJobRecord>>,
250}
251
252#[derive(Debug, Default)]
253pub struct ProjectSetupJobStore {
254 seq: AtomicU64,
255 jobs: Mutex<HashMap<String, ProjectSetupJobRecord>>,
256}
257
258#[derive(Debug, Clone, serde::Serialize)]
259pub struct RemediationJobEvent {
260 pub at: i64,
261 pub phase: String,
262 pub message: String,
263}
264
265#[derive(Debug, Clone, serde::Serialize)]
266pub struct RemediationJobError {
267 pub title: String,
268 pub detail: String,
269}
270
271#[derive(Debug, Clone, serde::Serialize)]
272pub struct RemediationJobRecord {
273 pub id: String,
274 pub vulnerability_id: String,
275 pub project_id: String,
276 pub status: String,
277 pub phase: String,
278 pub message: String,
279 pub started_at: i64,
280 pub finished_at: Option<i64>,
281 pub events: Vec<RemediationJobEvent>,
282 pub result: Option<RemediationAgentOutput>,
283 pub error: Option<RemediationJobError>,
284}
285
286#[derive(Debug, Default)]
287pub struct RemediationJobStore {
288 seq: AtomicU64,
289 jobs: Mutex<HashMap<String, RemediationJobRecord>>,
290}
291
292impl RemediationJobStore {
293 pub fn new() -> Self {
294 Self::default()
295 }
296
297 pub async fn create(
298 &self,
299 vulnerability_id: &str,
300 project_id: &str,
301 now: i64,
302 ) -> RemediationJobRecord {
303 let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
304 let id = format!("fix-{now}-{n}");
305 let event = RemediationJobEvent {
306 at: now,
307 phase: "queued".to_string(),
308 message: "Fix agent queued.".to_string(),
309 };
310 let record = RemediationJobRecord {
311 id: id.clone(),
312 vulnerability_id: vulnerability_id.to_string(),
313 project_id: project_id.to_string(),
314 status: "queued".to_string(),
315 phase: "queued".to_string(),
316 message: event.message.clone(),
317 started_at: now,
318 finished_at: None,
319 events: vec![event],
320 result: None,
321 error: None,
322 };
323 self.jobs.lock().await.insert(id, record.clone());
324 record
325 }
326
327 pub async fn get(&self, id: &str) -> Option<RemediationJobRecord> {
328 self.jobs.lock().await.get(id).cloned()
329 }
330
331 pub async fn push_phase(&self, id: &str, phase: &str, message: impl Into<String>) {
332 let now = nyx_agent_core::now_epoch_ms();
333 let message = message.into();
334 let mut jobs = self.jobs.lock().await;
335 let Some(job) = jobs.get_mut(id) else {
336 return;
337 };
338 job.status = "running".to_string();
339 job.phase = phase.to_string();
340 job.message = message.clone();
341 job.events.push(RemediationJobEvent { at: now, phase: phase.to_string(), message });
342 }
343
344 pub async fn complete(&self, id: &str, result: RemediationAgentOutput) {
345 let now = nyx_agent_core::now_epoch_ms();
346 let mut jobs = self.jobs.lock().await;
347 let Some(job) = jobs.get_mut(id) else {
348 return;
349 };
350 job.status = "succeeded".to_string();
351 job.phase = "complete".to_string();
352 job.message = if result.changed_files.is_empty() {
353 "Fix agent completed without leaving file changes.".to_string()
354 } else {
355 format!("Fix agent updated {} file(s).", result.changed_files.len())
356 };
357 job.finished_at = Some(now);
358 job.result = Some(result);
359 job.error = None;
360 job.events.push(RemediationJobEvent {
361 at: now,
362 phase: "complete".to_string(),
363 message: job.message.clone(),
364 });
365 }
366
367 pub async fn fail(&self, id: &str, error: RemediationJobError) {
368 let now = nyx_agent_core::now_epoch_ms();
369 let mut jobs = self.jobs.lock().await;
370 let Some(job) = jobs.get_mut(id) else {
371 return;
372 };
373 job.status = "failed".to_string();
374 job.phase = "failed".to_string();
375 job.message = error.title.clone();
376 job.finished_at = Some(now);
377 job.result = None;
378 job.error = Some(error.clone());
379 job.events.push(RemediationJobEvent {
380 at: now,
381 phase: "failed".to_string(),
382 message: error.detail,
383 });
384 }
385}
386
387impl ProjectSetupJobStore {
388 pub fn new() -> Self {
389 Self::default()
390 }
391
392 pub async fn create(&self, project_id: &str, now: i64) -> ProjectSetupJobRecord {
393 let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
394 let id = format!("projectsetup-{now}-{n}");
395 let event = ProjectSetupJobEvent {
396 at: now,
397 phase: ProjectSetupPhase::Queued,
398 message: "Project setup queued.".to_string(),
399 };
400 let record = ProjectSetupJobRecord {
401 id: id.clone(),
402 project_id: project_id.to_string(),
403 status: ProjectSetupJobStatus::Queued,
404 phase: ProjectSetupPhase::Queued,
405 message: event.message.clone(),
406 started_at: now,
407 finished_at: None,
408 events: vec![event],
409 result: None,
410 error: None,
411 };
412 self.jobs.lock().await.insert(id, record.clone());
413 record
414 }
415
416 pub async fn get(&self, id: &str) -> Option<ProjectSetupJobRecord> {
417 self.jobs.lock().await.get(id).cloned()
418 }
419
420 pub async fn list_by_project(&self, project_id: &str) -> Vec<ProjectSetupJobRecord> {
421 let mut jobs = self
422 .jobs
423 .lock()
424 .await
425 .values()
426 .filter(|job| job.project_id == project_id)
427 .cloned()
428 .collect::<Vec<_>>();
429 jobs.sort_by(|a, b| b.started_at.cmp(&a.started_at).then_with(|| b.id.cmp(&a.id)));
430 jobs
431 }
432
433 pub async fn push_phase(&self, id: &str, phase: ProjectSetupPhase, message: impl Into<String>) {
434 let now = nyx_agent_core::now_epoch_ms();
435 let message = message.into();
436 let mut jobs = self.jobs.lock().await;
437 let Some(job) = jobs.get_mut(id) else {
438 return;
439 };
440 job.status = ProjectSetupJobStatus::Running;
441 job.phase = phase;
442 job.message = message.clone();
443 job.events.push(ProjectSetupJobEvent { at: now, phase, message });
444 }
445
446 pub async fn complete(&self, id: &str, result: ProjectSetupResponse) {
447 let now = nyx_agent_core::now_epoch_ms();
448 let mut jobs = self.jobs.lock().await;
449 let Some(job) = jobs.get_mut(id) else {
450 return;
451 };
452 job.status = ProjectSetupJobStatus::Succeeded;
453 job.phase = ProjectSetupPhase::Complete;
454 job.message = result.message.clone();
455 job.finished_at = Some(now);
456 job.result = Some(result);
457 job.error = None;
458 job.events.push(ProjectSetupJobEvent {
459 at: now,
460 phase: ProjectSetupPhase::Complete,
461 message: job.message.clone(),
462 });
463 }
464
465 pub async fn fail(&self, id: &str, error: ProjectSetupError) {
466 let now = nyx_agent_core::now_epoch_ms();
467 let mut jobs = self.jobs.lock().await;
468 let Some(job) = jobs.get_mut(id) else {
469 return;
470 };
471 job.status = ProjectSetupJobStatus::Failed;
472 job.phase = ProjectSetupPhase::Failed;
473 job.message = error.title.clone();
474 job.finished_at = Some(now);
475 job.result = None;
476 job.error = Some(error.clone());
477 job.events.push(ProjectSetupJobEvent {
478 at: now,
479 phase: ProjectSetupPhase::Failed,
480 message: error.detail,
481 });
482 }
483}
484
485impl AuthSetupJobStore {
486 pub fn new() -> Self {
487 Self::default()
488 }
489
490 pub async fn create(&self, project_id: &str, now: i64) -> AuthSetupJobRecord {
491 let n = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
492 let id = format!("authsetup-{now}-{n}");
493 let event = AuthSetupJobEvent {
494 at: now,
495 phase: AuthSetupPhase::Queued,
496 message: "Auth setup queued.".to_string(),
497 };
498 let record = AuthSetupJobRecord {
499 id: id.clone(),
500 project_id: project_id.to_string(),
501 status: AuthSetupJobStatus::Queued,
502 phase: AuthSetupPhase::Queued,
503 message: event.message.clone(),
504 started_at: now,
505 finished_at: None,
506 events: vec![event],
507 result: None,
508 error: None,
509 };
510 self.jobs.lock().await.insert(id, record.clone());
511 record
512 }
513
514 pub async fn get(&self, id: &str) -> Option<AuthSetupJobRecord> {
515 self.jobs.lock().await.get(id).cloned()
516 }
517
518 pub async fn push_phase(&self, id: &str, phase: AuthSetupPhase, message: impl Into<String>) {
519 let now = nyx_agent_core::now_epoch_ms();
520 let message = message.into();
521 let mut jobs = self.jobs.lock().await;
522 let Some(job) = jobs.get_mut(id) else {
523 return;
524 };
525 job.status = AuthSetupJobStatus::Running;
526 job.phase = phase;
527 job.message = message.clone();
528 job.events.push(AuthSetupJobEvent { at: now, phase, message });
529 }
530
531 pub async fn complete(&self, id: &str, result: AuthSetupResponse) {
532 let now = nyx_agent_core::now_epoch_ms();
533 let mut jobs = self.jobs.lock().await;
534 let Some(job) = jobs.get_mut(id) else {
535 return;
536 };
537 job.status = AuthSetupJobStatus::Succeeded;
538 job.phase = AuthSetupPhase::Complete;
539 job.message = result.message.clone();
540 job.finished_at = Some(now);
541 job.result = Some(result);
542 job.error = None;
543 job.events.push(AuthSetupJobEvent {
544 at: now,
545 phase: AuthSetupPhase::Complete,
546 message: job.message.clone(),
547 });
548 }
549
550 pub async fn fail(&self, id: &str, error: AuthSetupError) {
551 let now = nyx_agent_core::now_epoch_ms();
552 let mut jobs = self.jobs.lock().await;
553 let Some(job) = jobs.get_mut(id) else {
554 return;
555 };
556 job.status = AuthSetupJobStatus::Failed;
557 job.phase = AuthSetupPhase::Failed;
558 job.message = error.title.clone();
559 job.finished_at = Some(now);
560 job.result = None;
561 job.error = Some(error.clone());
562 job.events.push(AuthSetupJobEvent {
563 at: now,
564 phase: AuthSetupPhase::Failed,
565 message: error.detail,
566 });
567 }
568}
569
570#[derive(Debug, Error)]
571pub enum ScanTriggerError {
572 #[error("scan request was rejected: {0}")]
573 Rejected(String),
574 #[error("daemon is shutting down")]
575 Closed,
576 #[error("scan request queue is full: {0}")]
580 Backpressure(String),
581 #[error("internal error: {0}")]
582 Internal(String),
583}
584
585#[derive(Clone)]
589pub struct SetupContext {
590 pub config_path: PathBuf,
591 pub secrets: SecretStore,
592 pub config: Arc<RwLock<Config>>,
596 pub completed: Arc<std::sync::atomic::AtomicBool>,
600}
601
602impl SetupContext {
603 pub fn new(
604 config_path: PathBuf,
605 config: Config,
606 completed: bool,
607 secrets: SecretStore,
608 ) -> Self {
609 Self {
610 config_path,
611 secrets,
612 config: Arc::new(RwLock::new(config)),
613 completed: Arc::new(std::sync::atomic::AtomicBool::new(completed)),
614 }
615 }
616
617 pub fn is_complete(&self) -> bool {
618 self.completed.load(std::sync::atomic::Ordering::Acquire)
619 }
620
621 pub fn mark_complete(&self) {
622 self.completed.store(true, std::sync::atomic::Ordering::Release);
623 }
624}
625
626#[derive(Debug)]
643pub struct EventReplay {
644 inner: Mutex<ReplayInner>,
645 pub max_per_run: usize,
650 pub max_runs: usize,
654}
655
656#[derive(Debug, Default)]
657struct ReplayInner {
658 by_run: HashMap<String, VecDeque<AgentEvent>>,
659 order: VecDeque<String>,
662}
663
664impl Default for EventReplay {
665 fn default() -> Self {
666 Self::new()
667 }
668}
669
670impl EventReplay {
671 pub fn new() -> Self {
672 Self { inner: Mutex::new(ReplayInner::default()), max_per_run: 128, max_runs: 16 }
673 }
674
675 pub async fn push(&self, event: &AgentEvent) {
678 let Some(run_id) = run_id_for_event(event) else { return };
679 let mut g = self.inner.lock().await;
680
681 if let Some(pos) = g.order.iter().position(|r| r == run_id) {
685 g.order.remove(pos);
686 } else if g.by_run.len() >= self.max_runs {
687 if let Some(victim) = g.order.pop_front() {
688 g.by_run.remove(&victim);
689 }
690 }
691 g.order.push_back(run_id.to_string());
692
693 let buf = g.by_run.entry(run_id.to_string()).or_default();
694 if buf.len() == self.max_per_run {
695 buf.pop_front();
696 }
697 buf.push_back(event.clone());
698 }
699
700 pub async fn snapshot(&self, run_id: &str) -> Vec<AgentEvent> {
702 let g = self.inner.lock().await;
703 g.by_run.get(run_id).map(|q| q.iter().cloned().collect()).unwrap_or_default()
704 }
705
706 pub async fn tracked_runs(&self) -> usize {
708 self.inner.lock().await.by_run.len()
709 }
710}
711
712fn run_id_for_event(ev: &AgentEvent) -> Option<&str> {
713 match ev {
714 AgentEvent::Run { data } => match data {
715 RunEvent::Heartbeat { .. } => None,
716 RunEvent::RunStarted { run_id, .. }
717 | RunEvent::ProjectStarted { run_id, .. }
718 | RunEvent::PhaseStarted { run_id, .. }
719 | RunEvent::PhaseFinished { run_id, .. }
720 | RunEvent::EnvironmentStatus { run_id, .. }
721 | RunEvent::AuthSessionStatus { run_id, .. }
722 | RunEvent::LiveVerificationCapabilities { run_id, .. }
723 | RunEvent::RepoStarted { run_id, .. }
724 | RunEvent::RepoStaticDone { run_id, .. }
725 | RunEvent::RepoDynamicDone { run_id, .. }
726 | RunEvent::RepoFailed { run_id, .. }
727 | RunEvent::RepoIngestFailed { run_id, .. }
728 | RunEvent::RepoFinished { run_id, .. }
729 | RunEvent::ProjectFinished { run_id, .. }
730 | RunEvent::RunFinished { run_id, .. } => Some(run_id.as_str()),
731 },
732 AgentEvent::Ai { data: AiEvent::BudgetTick { run_id, .. } } => Some(run_id.as_str()),
733 AgentEvent::Sandbox { data } => match data {
734 SandboxEvent::VerifierStarted { run_id, .. }
735 | SandboxEvent::VerifierFinished { run_id, .. } => Some(run_id.as_str()),
736 },
737 _ => None,
738 }
739}
740
/// Authentication settings: an optional token.
///
/// `Default` yields no token. The type intentionally exposes only `Clone` and
/// `Default`; it does not derive `Debug`.
#[derive(Clone, Default)]
pub struct AuthConfig {
    /// The configured token, if any.
    pub token: Option<String>,
}

impl AuthConfig {
    /// Wraps the given optional token.
    pub fn new(token: Option<String>) -> Self {
        AuthConfig { token }
    }

    /// Returns `true` whenever a token is present — including an empty one.
    pub fn is_enforced(&self) -> bool {
        matches!(self.token, Some(_))
    }
}
758
759#[derive(Clone)]
763pub struct ServerState {
764 pub store: Store,
765 pub events: EventSink,
766 pub scan: Arc<dyn ScanTrigger>,
767 pub setup: SetupContext,
768 pub auth: AuthConfig,
769 pub auth_setup_agent: Option<Arc<dyn AuthSetupAgent>>,
770 pub auth_setup_jobs: Arc<AuthSetupJobStore>,
771 pub project_setup_agent: Option<Arc<dyn ProjectSetupAgent>>,
772 pub project_setup_jobs: Arc<ProjectSetupJobStore>,
773 pub seed_setup_agent: Option<Arc<dyn SeedSetupAgent>>,
774 pub remediation_agent: Option<Arc<dyn RemediationAgent>>,
775 pub remediation_jobs: Arc<RemediationJobStore>,
776 pub replay: Arc<EventReplay>,
781 pub state_repos_dir: Option<PathBuf>,
786 pub state_bundles_dir: Option<PathBuf>,
791 pub state_logs_dir: Option<PathBuf>,
795 pub webhook: Option<Arc<crate::webhook::WebhookConfig>>,
799}
800
801impl ServerState {
802 pub fn new(
803 store: Store,
804 events: EventSink,
805 scan: Arc<dyn ScanTrigger>,
806 setup: SetupContext,
807 auth: AuthConfig,
808 ) -> Self {
809 Self {
810 store,
811 events,
812 scan,
813 setup,
814 auth,
815 auth_setup_agent: None,
816 auth_setup_jobs: Arc::new(AuthSetupJobStore::new()),
817 project_setup_agent: None,
818 project_setup_jobs: Arc::new(ProjectSetupJobStore::new()),
819 seed_setup_agent: None,
820 remediation_agent: None,
821 remediation_jobs: Arc::new(RemediationJobStore::new()),
822 replay: Arc::new(EventReplay::new()),
823 state_repos_dir: None,
824 state_bundles_dir: None,
825 state_logs_dir: None,
826 webhook: None,
827 }
828 }
829
830 pub fn with_state_repos_dir(mut self, dir: PathBuf) -> Self {
833 self.state_repos_dir = Some(dir);
834 self
835 }
836
837 pub fn with_auth_setup_agent(mut self, agent: Arc<dyn AuthSetupAgent>) -> Self {
838 self.auth_setup_agent = Some(agent);
839 self
840 }
841
842 pub fn with_project_setup_agent(mut self, agent: Arc<dyn ProjectSetupAgent>) -> Self {
843 self.project_setup_agent = Some(agent);
844 self
845 }
846
847 pub fn with_seed_setup_agent(mut self, agent: Arc<dyn SeedSetupAgent>) -> Self {
848 self.seed_setup_agent = Some(agent);
849 self
850 }
851
852 pub fn with_remediation_agent(mut self, agent: Arc<dyn RemediationAgent>) -> Self {
853 self.remediation_agent = Some(agent);
854 self
855 }
856
857 pub fn with_state_bundles_dir(mut self, dir: PathBuf) -> Self {
860 self.state_bundles_dir = Some(dir);
861 self
862 }
863
864 pub fn with_state_logs_dir(mut self, dir: PathBuf) -> Self {
867 self.state_logs_dir = Some(dir);
868 self
869 }
870
871 pub fn with_webhook(mut self, cfg: crate::webhook::WebhookConfig) -> Self {
874 self.webhook = Some(Arc::new(cfg));
875 self
876 }
877}
878
879#[derive(Debug, Error)]
883pub enum ApiError {
884 #[error("not found: {0}")]
885 NotFound(String),
886 #[error("bad request: {0}")]
887 BadRequest(String),
888 #[error("unauthorized")]
889 Unauthorized,
890 #[error("payload too large: {0}")]
891 PayloadTooLarge(String),
892 #[error("too many requests: {0}")]
897 TooManyRequests(String),
898 #[error("store error: {0}")]
899 Store(#[from] StoreError),
900 #[error("scan trigger failed: {0}")]
901 Scan(#[from] ScanTriggerError),
902 #[error("internal: {0}")]
903 Internal(String),
904}
905
906impl IntoResponse for ApiError {
907 fn into_response(self) -> Response {
908 let (status, code) = match &self {
909 ApiError::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
910 ApiError::BadRequest(_) => (StatusCode::BAD_REQUEST, "bad_request"),
911 ApiError::Unauthorized => (StatusCode::UNAUTHORIZED, "unauthorized"),
912 ApiError::PayloadTooLarge(_) => (StatusCode::PAYLOAD_TOO_LARGE, "payload_too_large"),
913 ApiError::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, "too_many_requests"),
914 ApiError::Store(_) => (StatusCode::INTERNAL_SERVER_ERROR, "store_error"),
915 ApiError::Scan(ScanTriggerError::Rejected(_)) => {
916 (StatusCode::BAD_REQUEST, "scan_rejected")
917 }
918 ApiError::Scan(ScanTriggerError::Closed) => {
919 (StatusCode::SERVICE_UNAVAILABLE, "shutting_down")
920 }
921 ApiError::Scan(ScanTriggerError::Backpressure(_)) => {
922 (StatusCode::TOO_MANY_REQUESTS, "scan_backpressure")
923 }
924 ApiError::Scan(ScanTriggerError::Internal(_)) => {
925 (StatusCode::INTERNAL_SERVER_ERROR, "scan_internal")
926 }
927 ApiError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "internal"),
928 };
929 let body = Json(json!({ "error": { "code": code, "message": self.to_string() } }));
930 (status, body).into_response()
931 }
932}
933
#[cfg(test)]
mod tests {
    use super::*;

    // --- ScanTriggerSource rendering -------------------------------------

    #[test]
    fn scan_trigger_source_manual_stamps_ui() {
        let rendered = ScanTriggerSource::Manual.as_run_record_string();
        assert_eq!(rendered, "UI");
    }

    #[test]
    fn scan_trigger_source_webhook_stamps_webhook() {
        let rendered = ScanTriggerSource::Webhook.as_run_record_string();
        assert_eq!(rendered, "Webhook");
    }

    #[test]
    fn scan_trigger_source_scheduler_stamps_label_under_cron_prefix() {
        let source = ScanTriggerSource::Scheduler { label: "weekly".to_string() };
        assert_eq!(source.as_run_record_string(), "Cron:weekly");
    }

    // --- Event fixtures ---------------------------------------------------

    /// A `RunStarted` event for `run_id` with fixed filler values.
    fn run_started(run_id: &str) -> AgentEvent {
        let data = RunEvent::RunStarted {
            run_id: run_id.to_string(),
            project_id: "test-project".to_string(),
            repos: vec!["alpha".to_string()],
            started_at_ms: 0,
        };
        AgentEvent::Run { data }
    }

    /// A `RepoStarted` event for `run_id` / `repo` with fixed filler values.
    fn repo_started(run_id: &str, repo: &str) -> AgentEvent {
        let data = RunEvent::RepoStarted {
            run_id: run_id.to_string(),
            project_id: "test-project".to_string(),
            repo: repo.to_string(),
            started_at_ms: 0,
        };
        AgentEvent::Run { data }
    }

    /// A heartbeat, which carries no run id.
    fn heartbeat() -> AgentEvent {
        AgentEvent::Run { data: RunEvent::Heartbeat { ts: 0 } }
    }

    // --- EventReplay behaviour --------------------------------------------

    #[tokio::test]
    async fn heartbeat_is_not_buffered() {
        let buffer = EventReplay::new();
        buffer.push(&heartbeat()).await;
        assert_eq!(buffer.tracked_runs().await, 0);
        assert!(buffer.snapshot("anything").await.is_empty());
    }

    #[tokio::test]
    async fn snapshot_returns_events_in_push_order() {
        let buffer = EventReplay::new();
        for event in [run_started("r1"), repo_started("r1", "alpha")] {
            buffer.push(&event).await;
        }
        let frames = buffer.snapshot("r1").await;
        assert_eq!(frames.len(), 2);
        assert!(matches!(frames[0], AgentEvent::Run { data: RunEvent::RunStarted { .. } }));
        assert!(matches!(frames[1], AgentEvent::Run { data: RunEvent::RepoStarted { .. } }));
    }

    #[tokio::test]
    async fn max_per_run_drops_oldest_frame() {
        let mut buffer = EventReplay::new();
        buffer.max_per_run = 2;
        for event in [run_started("r1"), repo_started("r1", "alpha"), repo_started("r1", "beta")] {
            buffer.push(&event).await;
        }
        let frames = buffer.snapshot("r1").await;
        assert_eq!(frames.len(), 2);
        // The `RunStarted` frame was the oldest, so only the two repo frames
        // should remain, in push order.
        let mut surviving_repos: Vec<String> = Vec::new();
        for frame in &frames {
            if let AgentEvent::Run { data: RunEvent::RepoStarted { repo, .. } } = frame {
                surviving_repos.push(repo.clone());
            }
        }
        assert_eq!(surviving_repos, vec!["alpha".to_string(), "beta".to_string()]);
    }

    #[tokio::test]
    async fn max_runs_evicts_least_recently_touched_run() {
        let mut buffer = EventReplay::new();
        buffer.max_runs = 2;
        buffer.push(&run_started("a")).await;
        buffer.push(&run_started("b")).await;
        // Touch `a` again so that `b` becomes the least recently used run.
        buffer.push(&repo_started("a", "alpha")).await;
        // A third run must push `b` out.
        buffer.push(&run_started("c")).await;

        assert_eq!(buffer.tracked_runs().await, 2);
        assert!(!buffer.snapshot("a").await.is_empty(), "`a` was touched, must survive");
        assert!(buffer.snapshot("b").await.is_empty(), "`b` was LRU, must be evicted");
        assert!(!buffer.snapshot("c").await.is_empty(), "`c` is newest");
    }

    #[tokio::test]
    async fn unknown_run_id_yields_empty_snapshot() {
        let buffer = EventReplay::new();
        buffer.push(&run_started("real")).await;
        assert!(buffer.snapshot("ghost").await.is_empty());
    }
}