1use hmac::{Hmac, KeyInit, Mac};
4use lettre::message::Mailbox;
5use lettre::transport::smtp::authentication::Credentials;
6use lettre::{AsyncSmtpTransport, AsyncTransport, Message as EmailMessage, Tokio1Executor};
7use serde::Serialize;
8use sha2::Sha256;
9use tokio::sync::broadcast::error::RecvError;
10
11use nyx_agent_core::store::{
12 FindingRecord, ProjectIntegrationRecord, ProjectIntegrationStoredRecord,
13};
14use nyx_agent_core::{now_epoch_ms, Store};
15use nyx_agent_types::event::{AgentEvent, RunEvent, SandboxEvent};
16use nyx_agent_types::integration::{
17 ProjectIntegrationConfigInput, ProjectIntegrationEvent, ProjectIntegrationKind, SmtpSecurity,
18};
19
20type HmacSha256 = Hmac<Sha256>;
21
22#[derive(Debug, Clone)]
23pub struct PreparedIntegrationConfig {
24 pub kind: ProjectIntegrationKind,
25 pub config_json: String,
26 pub target: String,
27}
28
29#[derive(Debug, Clone, Serialize)]
30pub struct IntegrationDeliveryPayload {
31 pub event: String,
32 pub project_id: String,
33 pub project_name: String,
34 pub run_id: Option<String>,
35 pub finding_id: Option<String>,
36 pub title: String,
37 pub summary: String,
38 pub severity: Option<String>,
39 pub status: Option<String>,
40 pub url: Option<String>,
41 pub vulnerabilities: Vec<IntegrationVulnerabilitySummary>,
42 #[serde(skip_serializing_if = "Option::is_none")]
43 pub counts: Option<IntegrationRunCounts>,
44 pub sent_at_ms: i64,
45}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct IntegrationVulnerabilitySummary {
49 pub id: String,
50 pub title: String,
51 pub severity: String,
52 pub status: String,
53 pub vuln_class: String,
54}
55
56#[derive(Debug, Clone, Serialize)]
57pub struct IntegrationRunCounts {
58 pub succeeded: u32,
59 pub inconclusive: u32,
60 pub failed: u32,
61 pub verified_vulnerabilities: usize,
62}
63
64pub fn prepare_config(
65 config: &ProjectIntegrationConfigInput,
66) -> Result<PreparedIntegrationConfig, String> {
67 let kind = integration_kind(config);
68 validate_config(config)?;
69 let target = target_summary(config);
70 let config_json = serde_json::to_string(config)
71 .map_err(|err| format!("serialise integration config: {err}"))?;
72 Ok(PreparedIntegrationConfig { kind, config_json, target })
73}
74
75pub fn integration_kind(config: &ProjectIntegrationConfigInput) -> ProjectIntegrationKind {
76 match config {
77 ProjectIntegrationConfigInput::Webhook { .. } => ProjectIntegrationKind::Webhook,
78 ProjectIntegrationConfigInput::Slack { .. } => ProjectIntegrationKind::Slack,
79 ProjectIntegrationConfigInput::Smtp { .. } => ProjectIntegrationKind::Smtp,
80 }
81}
82
83pub fn validate_config(config: &ProjectIntegrationConfigInput) -> Result<(), String> {
84 match config {
85 ProjectIntegrationConfigInput::Webhook { url, .. } => validate_http_url(url, "webhook URL"),
86 ProjectIntegrationConfigInput::Slack { webhook_url } => {
87 validate_http_url(webhook_url, "Slack webhook URL")?;
88 if !webhook_url.starts_with("https://") {
89 return Err("Slack webhook URL must use https".to_string());
90 }
91 Ok(())
92 }
93 ProjectIntegrationConfigInput::Smtp {
94 host,
95 port,
96 username,
97 password,
98 from,
99 recipients,
100 ..
101 } => {
102 if host.trim().is_empty() {
103 return Err("SMTP host is required".to_string());
104 }
105 if *port == 0 {
106 return Err("SMTP port must be greater than 0".to_string());
107 }
108 if username.as_deref().unwrap_or("").trim().is_empty() && password.is_some() {
109 return Err("SMTP password requires a username".to_string());
110 }
111 parse_mailbox(from, "from address")?;
112 if recipients.is_empty() {
113 return Err("at least one recipient is required".to_string());
114 }
115 for recipient in recipients {
116 parse_mailbox(recipient, "recipient")?;
117 }
118 Ok(())
119 }
120 }
121}
122
123pub fn validate_min_severity(value: Option<&str>) -> Result<(), String> {
124 if let Some(value) = value {
125 if severity_rank(value).is_none() {
126 return Err("minimum severity must be Low, Medium, High, or Critical".to_string());
127 }
128 }
129 Ok(())
130}
131
132pub fn target_summary(config: &ProjectIntegrationConfigInput) -> String {
133 match config {
134 ProjectIntegrationConfigInput::Webhook { url, .. } => url_host_summary(url),
135 ProjectIntegrationConfigInput::Slack { webhook_url } => url_host_summary(webhook_url),
136 ProjectIntegrationConfigInput::Smtp { host, port, recipients, .. } => {
137 format!("{host}:{port} -> {}", recipients.join(", "))
138 }
139 }
140}
141
142pub fn spawn_integration_delivery_task(
143 store: Store,
144 events: nyx_agent_types::event::EventSink,
145) -> tokio::task::JoinHandle<()> {
146 tokio::spawn(async move {
147 let dispatcher = IntegrationDispatcher::new();
148 let mut rx = events.subscribe();
149 loop {
150 let ev = match rx.recv().await {
151 Ok(ev) => ev,
152 Err(RecvError::Lagged(skipped)) => {
153 tracing::warn!(skipped, "integration delivery task lagged");
154 continue;
155 }
156 Err(RecvError::Closed) => break,
157 };
158 if let Err(err) = dispatcher.handle_event(&store, ev).await {
159 tracing::warn!(error = %err, "integration delivery failed");
160 }
161 }
162 })
163}
164
165#[derive(Clone)]
166pub struct IntegrationDispatcher {
167 http: reqwest::Client,
168}
169
170impl IntegrationDispatcher {
171 pub fn new() -> Self {
172 Self { http: reqwest::Client::new() }
173 }
174
175 pub async fn send_test(
176 &self,
177 store: &Store,
178 integration: &ProjectIntegrationStoredRecord,
179 ) -> Result<(), String> {
180 let project = store
181 .projects()
182 .get(&integration.public.project_id)
183 .await
184 .map_err(|err| err.to_string())?
185 .ok_or_else(|| format!("project `{}` not found", integration.public.project_id))?;
186 let payload = IntegrationDeliveryPayload {
187 event: "test".to_string(),
188 project_id: project.id,
189 project_name: project.name,
190 run_id: None,
191 finding_id: None,
192 title: "Nyx Agent test notification".to_string(),
193 summary: "This is a test delivery from the project integrations page.".to_string(),
194 severity: None,
195 status: Some("Test".to_string()),
196 url: None,
197 vulnerabilities: Vec::new(),
198 counts: None,
199 sent_at_ms: now_epoch_ms(),
200 };
201 self.deliver(integration, &payload).await.map_err(|err| err.to_string())
202 }
203
204 async fn handle_event(&self, store: &Store, ev: AgentEvent) -> Result<(), String> {
205 match ev {
206 AgentEvent::Run {
207 data:
208 RunEvent::RunFinished {
209 run_id, project_id, succeeded, inconclusive, failed, ..
210 },
211 } => {
212 let project = store
213 .projects()
214 .get(&project_id)
215 .await
216 .map_err(|err| err.to_string())?
217 .ok_or_else(|| format!("project `{project_id}` not found"))?;
218 let vulnerabilities = store
219 .verified_vulnerabilities()
220 .list_by_run(&run_id)
221 .await
222 .map_err(|err| err.to_string())?;
223 let top = vulnerabilities
224 .iter()
225 .take(5)
226 .map(|v| IntegrationVulnerabilitySummary {
227 id: v.id.clone(),
228 title: v.title.clone(),
229 severity: v.severity.clone(),
230 status: v.status.clone(),
231 vuln_class: v.vuln_class.clone(),
232 })
233 .collect::<Vec<_>>();
234 let severity = vulnerabilities
235 .iter()
236 .map(|v| v.severity.as_str())
237 .max_by_key(|severity| severity_rank(severity).unwrap_or(0));
238 let title = if vulnerabilities.is_empty() {
239 format!("Nyx Agent run {run_id} finished")
240 } else {
241 format!(
242 "Nyx Agent run {run_id} found {} verified issue(s)",
243 vulnerabilities.len()
244 )
245 };
246 let payload = IntegrationDeliveryPayload {
247 event: ProjectIntegrationEvent::RunFinished.as_str().to_string(),
248 project_id: project.id,
249 project_name: project.name,
250 run_id: Some(run_id),
251 finding_id: None,
252 title,
253 summary: format!(
254 "Run finished with {succeeded} succeeded, {inconclusive} inconclusive, {failed} failed repo(s)."
255 ),
256 severity: severity.map(str::to_string),
257 status: Some(if failed > 0 { "Failed" } else { "Finished" }.to_string()),
258 url: None,
259 vulnerabilities: top,
260 counts: Some(IntegrationRunCounts {
261 succeeded,
262 inconclusive,
263 failed,
264 verified_vulnerabilities: vulnerabilities.len(),
265 }),
266 sent_at_ms: now_epoch_ms(),
267 };
268 self.deliver_project_event(
269 store,
270 &project_id,
271 ProjectIntegrationEvent::RunFinished,
272 payload,
273 )
274 .await
275 }
276 AgentEvent::Sandbox {
277 data: SandboxEvent::VerifierFinished { run_id, finding_id, verdict, .. },
278 } if verdict == "Confirmed" => {
279 let Some(run) = store.runs().get(&run_id).await.map_err(|err| err.to_string())?
280 else {
281 return Ok(());
282 };
283 let project_id = run.project_id.unwrap_or_else(|| "default-project".to_string());
284 let project = store
285 .projects()
286 .get(&project_id)
287 .await
288 .map_err(|err| err.to_string())?
289 .ok_or_else(|| format!("project `{project_id}` not found"))?;
290 let Some(finding) =
291 store.findings().get(&finding_id).await.map_err(|err| err.to_string())?
292 else {
293 return Ok(());
294 };
295 let payload = finding_payload(&project.id, &project.name, &finding);
296 self.deliver_project_event(
297 store,
298 &project.id,
299 ProjectIntegrationEvent::FindingVerified,
300 payload,
301 )
302 .await
303 }
304 _ => Ok(()),
305 }
306 }
307
308 async fn deliver_project_event(
309 &self,
310 store: &Store,
311 project_id: &str,
312 event: ProjectIntegrationEvent,
313 payload: IntegrationDeliveryPayload,
314 ) -> Result<(), String> {
315 let rows = store
316 .integrations()
317 .list_enabled_by_project(project_id)
318 .await
319 .map_err(|err| err.to_string())?;
320 for row in rows {
321 if !row.public.events.contains(&event) || !passes_severity(&row.public, &payload) {
322 continue;
323 }
324 let delivered_at = now_epoch_ms();
325 match self.deliver(&row, &payload).await {
326 Ok(()) => {
327 if let Err(err) = store
328 .integrations()
329 .record_delivery(&row.public.id, delivered_at, "ok", None)
330 .await
331 {
332 tracing::warn!(integration_id = %row.public.id, error = %err, "failed to record integration delivery status");
333 }
334 }
335 Err(err) => {
336 let err_s = err.to_string();
337 if let Err(store_err) = store
338 .integrations()
339 .record_delivery(&row.public.id, delivered_at, "error", Some(&err_s))
340 .await
341 {
342 tracing::warn!(integration_id = %row.public.id, error = %store_err, "failed to record integration delivery error");
343 }
344 tracing::warn!(integration_id = %row.public.id, error = %err_s, "integration delivery failed");
345 }
346 }
347 }
348 Ok(())
349 }
350
351 async fn deliver(
352 &self,
353 row: &ProjectIntegrationStoredRecord,
354 payload: &IntegrationDeliveryPayload,
355 ) -> anyhow::Result<()> {
356 let cfg: ProjectIntegrationConfigInput = serde_json::from_str(&row.config_json)?;
357 match cfg {
358 ProjectIntegrationConfigInput::Webhook { url, signing_secret } => {
359 let body = serde_json::to_vec(payload)?;
360 let mut req = self
361 .http
362 .post(url)
363 .header("content-type", "application/json")
364 .body(body.clone());
365 if let Some(secret) = signing_secret.filter(|s| !s.is_empty()) {
366 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
367 mac.update(&body);
368 let sig = format!("sha256={}", hex::encode(mac.finalize().into_bytes()));
369 req = req.header("X-Nyx-Agent-Signature-256", sig);
370 }
371 let res = req.send().await?;
372 if !res.status().is_success() {
373 anyhow::bail!("webhook returned {}", res.status());
374 }
375 Ok(())
376 }
377 ProjectIntegrationConfigInput::Slack { webhook_url } => {
378 let body = serde_json::json!({ "text": slack_text(payload) });
379 let res = self.http.post(webhook_url).json(&body).send().await?;
380 if !res.status().is_success() {
381 anyhow::bail!("Slack webhook returned {}", res.status());
382 }
383 Ok(())
384 }
385 ProjectIntegrationConfigInput::Smtp {
386 host,
387 port,
388 security,
389 username,
390 password,
391 from,
392 recipients,
393 } => {
394 let mut builder = match security {
395 SmtpSecurity::StartTls => {
396 AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&host)?
397 }
398 SmtpSecurity::None => {
399 AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(&host)
400 }
401 }
402 .port(port);
403 if let Some(username) = username.filter(|s| !s.trim().is_empty()) {
404 builder = builder
405 .credentials(Credentials::new(username, password.unwrap_or_default()));
406 }
407 let mut email = EmailMessage::builder()
408 .from(parse_mailbox(&from, "from address").map_err(|err| anyhow::anyhow!(err))?)
409 .subject(payload.title.clone());
410 for recipient in recipients {
411 email = email.to(parse_mailbox(&recipient, "recipient")
412 .map_err(|err| anyhow::anyhow!(err))?);
413 }
414 let email = email.body(email_text(payload))?;
415 builder.build().send(email).await?;
416 Ok(())
417 }
418 }
419 }
420}
421
422impl Default for IntegrationDispatcher {
423 fn default() -> Self {
424 Self::new()
425 }
426}
427
428fn finding_payload(
429 project_id: &str,
430 project_name: &str,
431 finding: &FindingRecord,
432) -> IntegrationDeliveryPayload {
433 let title = format!("Confirmed {} in {}", finding.cap, finding.path);
434 IntegrationDeliveryPayload {
435 event: ProjectIntegrationEvent::FindingVerified.as_str().to_string(),
436 project_id: project_id.to_string(),
437 project_name: project_name.to_string(),
438 run_id: Some(finding.run_id.clone()),
439 finding_id: Some(finding.id.clone()),
440 title: title.clone(),
441 summary: format!(
442 "{}:{} matched {} ({})",
443 finding.path,
444 finding.line.map(|n| n.to_string()).unwrap_or_else(|| "?".to_string()),
445 finding.rule,
446 finding.severity
447 ),
448 severity: Some(finding.severity.clone()),
449 status: Some("Confirmed".to_string()),
450 url: None,
451 vulnerabilities: vec![IntegrationVulnerabilitySummary {
452 id: finding.id.clone(),
453 title,
454 severity: finding.severity.clone(),
455 status: "Confirmed".to_string(),
456 vuln_class: finding.cap.clone(),
457 }],
458 counts: None,
459 sent_at_ms: now_epoch_ms(),
460 }
461}
462
463fn passes_severity(
464 integration: &ProjectIntegrationRecord,
465 payload: &IntegrationDeliveryPayload,
466) -> bool {
467 let Some(min) = integration.min_severity.as_deref() else {
468 return true;
469 };
470 let Some(severity) = payload.severity.as_deref() else {
471 return false;
472 };
473 severity_rank(severity).unwrap_or(0) >= severity_rank(min).unwrap_or(0)
474}
475
476fn severity_rank(severity: &str) -> Option<u8> {
477 match severity.to_ascii_lowercase().as_str() {
478 "low" => Some(1),
479 "medium" => Some(2),
480 "high" => Some(3),
481 "critical" => Some(4),
482 _ => None,
483 }
484}
485
486fn slack_text(payload: &IntegrationDeliveryPayload) -> String {
487 let mut text =
488 format!("*{}*\nProject: {}\n{}", payload.title, payload.project_name, payload.summary);
489 if let Some(severity) = &payload.severity {
490 text.push_str(&format!("\nSeverity: {severity}"));
491 }
492 for vuln in &payload.vulnerabilities {
493 text.push_str(&format!("\n- [{}] {} ({})", vuln.severity, vuln.title, vuln.status));
494 }
495 text
496}
497
498fn email_text(payload: &IntegrationDeliveryPayload) -> String {
499 let mut text = format!(
500 "{}\n\nProject: {}\nEvent: {}\n{}\n",
501 payload.title, payload.project_name, payload.event, payload.summary
502 );
503 if let Some(severity) = &payload.severity {
504 text.push_str(&format!("Severity: {severity}\n"));
505 }
506 if let Some(run_id) = &payload.run_id {
507 text.push_str(&format!("Run: {run_id}\n"));
508 }
509 if let Some(finding_id) = &payload.finding_id {
510 text.push_str(&format!("Finding: {finding_id}\n"));
511 }
512 if !payload.vulnerabilities.is_empty() {
513 text.push_str("\nFindings:\n");
514 for vuln in &payload.vulnerabilities {
515 text.push_str(&format!(
516 "- [{}] {} ({}, {})\n",
517 vuln.severity, vuln.title, vuln.vuln_class, vuln.status
518 ));
519 }
520 }
521 text
522}
523
524fn validate_http_url(raw: &str, label: &str) -> Result<(), String> {
525 let url = reqwest::Url::parse(raw).map_err(|err| format!("invalid {label}: {err}"))?;
526 if !matches!(url.scheme(), "http" | "https") {
527 return Err(format!("{label} must use http or https"));
528 }
529 if url.host_str().is_none() {
530 return Err(format!("{label} must include a host"));
531 }
532 Ok(())
533}
534
535fn url_host_summary(raw: &str) -> String {
536 reqwest::Url::parse(raw)
537 .ok()
538 .and_then(|url| url.host_str().map(str::to_string))
539 .unwrap_or_else(|| "configured URL".to_string())
540}
541
542fn parse_mailbox(raw: &str, label: &str) -> Result<Mailbox, String> {
543 raw.parse::<Mailbox>().map_err(|err| format!("invalid {label}: {err}"))
544}