Skip to main content

nyx_agent_api/
integrations.rs

1//! Outbound project integration delivery.
2
3use hmac::{Hmac, KeyInit, Mac};
4use lettre::message::Mailbox;
5use lettre::transport::smtp::authentication::Credentials;
6use lettre::{AsyncSmtpTransport, AsyncTransport, Message as EmailMessage, Tokio1Executor};
7use serde::Serialize;
8use sha2::Sha256;
9use tokio::sync::broadcast::error::RecvError;
10
11use nyx_agent_core::store::{
12    FindingRecord, ProjectIntegrationRecord, ProjectIntegrationStoredRecord,
13};
14use nyx_agent_core::{now_epoch_ms, Store};
15use nyx_agent_types::event::{AgentEvent, RunEvent, SandboxEvent};
16use nyx_agent_types::integration::{
17    ProjectIntegrationConfigInput, ProjectIntegrationEvent, ProjectIntegrationKind, SmtpSecurity,
18};
19
20type HmacSha256 = Hmac<Sha256>;
21
/// A validated integration configuration in storable form, as returned by
/// [`prepare_config`].
#[derive(Debug, Clone)]
pub struct PreparedIntegrationConfig {
    /// Integration variant derived from the configuration input.
    pub kind: ProjectIntegrationKind,
    /// The configuration input serialised to a JSON string.
    pub config_json: String,
    /// Short target description produced by [`target_summary`].
    pub target: String,
}
28
/// Notification content handed to an integration.
///
/// Webhook deliveries send this struct serialised as the JSON request body;
/// Slack and SMTP deliveries render it to plain text instead.
#[derive(Debug, Clone, Serialize)]
pub struct IntegrationDeliveryPayload {
    /// Event name: `"test"` for test deliveries, otherwise the string form of a
    /// `ProjectIntegrationEvent`.
    pub event: String,
    /// Identifier of the project the event belongs to.
    pub project_id: String,
    /// Display name of that project.
    pub project_name: String,
    /// Run the event refers to, when there is one.
    pub run_id: Option<String>,
    /// Finding the event refers to, when there is one.
    pub finding_id: Option<String>,
    /// Headline; also used as the e-mail subject.
    pub title: String,
    /// One-line description of what happened.
    pub summary: String,
    /// Severity label compared against an integration's minimum severity.
    pub severity: Option<String>,
    /// Status label such as `"Test"`, `"Finished"`, `"Failed"` or `"Confirmed"`.
    pub status: Option<String>,
    /// Optional link; every payload built in this module currently sets `None`.
    pub url: Option<String>,
    /// Vulnerability entries listed in the notification.
    pub vulnerabilities: Vec<IntegrationVulnerabilitySummary>,
    /// Run totals; left out of the serialised JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub counts: Option<IntegrationRunCounts>,
    /// Timestamp taken from `now_epoch_ms()` when the payload was built.
    pub sent_at_ms: i64,
}
46
/// One vulnerability entry inside an [`IntegrationDeliveryPayload`].
#[derive(Debug, Clone, Serialize)]
pub struct IntegrationVulnerabilitySummary {
    /// Identifier of the vulnerability or finding record.
    pub id: String,
    /// Human-readable title of the entry.
    pub title: String,
    /// Severity label of the entry.
    pub severity: String,
    /// Status label of the entry (for example `"Confirmed"`).
    pub status: String,
    /// Vulnerability class of the entry.
    pub vuln_class: String,
}
55
/// Totals attached to a run-finished notification.
#[derive(Debug, Clone, Serialize)]
pub struct IntegrationRunCounts {
    /// `succeeded` count copied from the run-finished event.
    pub succeeded: u32,
    /// `inconclusive` count copied from the run-finished event.
    pub inconclusive: u32,
    /// `failed` count copied from the run-finished event.
    pub failed: u32,
    /// Number of verified vulnerabilities listed for the run.
    pub verified_vulnerabilities: usize,
}
63
64pub fn prepare_config(
65    config: &ProjectIntegrationConfigInput,
66) -> Result<PreparedIntegrationConfig, String> {
67    let kind = integration_kind(config);
68    validate_config(config)?;
69    let target = target_summary(config);
70    let config_json = serde_json::to_string(config)
71        .map_err(|err| format!("serialise integration config: {err}"))?;
72    Ok(PreparedIntegrationConfig { kind, config_json, target })
73}
74
75pub fn integration_kind(config: &ProjectIntegrationConfigInput) -> ProjectIntegrationKind {
76    match config {
77        ProjectIntegrationConfigInput::Webhook { .. } => ProjectIntegrationKind::Webhook,
78        ProjectIntegrationConfigInput::Slack { .. } => ProjectIntegrationKind::Slack,
79        ProjectIntegrationConfigInput::Smtp { .. } => ProjectIntegrationKind::Smtp,
80    }
81}
82
83pub fn validate_config(config: &ProjectIntegrationConfigInput) -> Result<(), String> {
84    match config {
85        ProjectIntegrationConfigInput::Webhook { url, .. } => validate_http_url(url, "webhook URL"),
86        ProjectIntegrationConfigInput::Slack { webhook_url } => {
87            validate_http_url(webhook_url, "Slack webhook URL")?;
88            if !webhook_url.starts_with("https://") {
89                return Err("Slack webhook URL must use https".to_string());
90            }
91            Ok(())
92        }
93        ProjectIntegrationConfigInput::Smtp {
94            host,
95            port,
96            username,
97            password,
98            from,
99            recipients,
100            ..
101        } => {
102            if host.trim().is_empty() {
103                return Err("SMTP host is required".to_string());
104            }
105            if *port == 0 {
106                return Err("SMTP port must be greater than 0".to_string());
107            }
108            if username.as_deref().unwrap_or("").trim().is_empty() && password.is_some() {
109                return Err("SMTP password requires a username".to_string());
110            }
111            parse_mailbox(from, "from address")?;
112            if recipients.is_empty() {
113                return Err("at least one recipient is required".to_string());
114            }
115            for recipient in recipients {
116                parse_mailbox(recipient, "recipient")?;
117            }
118            Ok(())
119        }
120    }
121}
122
123pub fn validate_min_severity(value: Option<&str>) -> Result<(), String> {
124    if let Some(value) = value {
125        if severity_rank(value).is_none() {
126            return Err("minimum severity must be Low, Medium, High, or Critical".to_string());
127        }
128    }
129    Ok(())
130}
131
132pub fn target_summary(config: &ProjectIntegrationConfigInput) -> String {
133    match config {
134        ProjectIntegrationConfigInput::Webhook { url, .. } => url_host_summary(url),
135        ProjectIntegrationConfigInput::Slack { webhook_url } => url_host_summary(webhook_url),
136        ProjectIntegrationConfigInput::Smtp { host, port, recipients, .. } => {
137            format!("{host}:{port} -> {}", recipients.join(", "))
138        }
139    }
140}
141
142pub fn spawn_integration_delivery_task(
143    store: Store,
144    events: nyx_agent_types::event::EventSink,
145) -> tokio::task::JoinHandle<()> {
146    tokio::spawn(async move {
147        let dispatcher = IntegrationDispatcher::new();
148        let mut rx = events.subscribe();
149        loop {
150            let ev = match rx.recv().await {
151                Ok(ev) => ev,
152                Err(RecvError::Lagged(skipped)) => {
153                    tracing::warn!(skipped, "integration delivery task lagged");
154                    continue;
155                }
156                Err(RecvError::Closed) => break,
157            };
158            if let Err(err) = dispatcher.handle_event(&store, ev).await {
159                tracing::warn!(error = %err, "integration delivery failed");
160            }
161        }
162    })
163}
164
/// Delivers [`IntegrationDeliveryPayload`]s to webhook, Slack and SMTP
/// integrations.
#[derive(Clone)]
pub struct IntegrationDispatcher {
    /// HTTP client used for webhook and Slack deliveries.
    http: reqwest::Client,
}
169
170impl IntegrationDispatcher {
171    pub fn new() -> Self {
172        Self { http: reqwest::Client::new() }
173    }
174
175    pub async fn send_test(
176        &self,
177        store: &Store,
178        integration: &ProjectIntegrationStoredRecord,
179    ) -> Result<(), String> {
180        let project = store
181            .projects()
182            .get(&integration.public.project_id)
183            .await
184            .map_err(|err| err.to_string())?
185            .ok_or_else(|| format!("project `{}` not found", integration.public.project_id))?;
186        let payload = IntegrationDeliveryPayload {
187            event: "test".to_string(),
188            project_id: project.id,
189            project_name: project.name,
190            run_id: None,
191            finding_id: None,
192            title: "Nyx Agent test notification".to_string(),
193            summary: "This is a test delivery from the project integrations page.".to_string(),
194            severity: None,
195            status: Some("Test".to_string()),
196            url: None,
197            vulnerabilities: Vec::new(),
198            counts: None,
199            sent_at_ms: now_epoch_ms(),
200        };
201        self.deliver(integration, &payload).await.map_err(|err| err.to_string())
202    }
203
204    async fn handle_event(&self, store: &Store, ev: AgentEvent) -> Result<(), String> {
205        match ev {
206            AgentEvent::Run {
207                data:
208                    RunEvent::RunFinished {
209                        run_id, project_id, succeeded, inconclusive, failed, ..
210                    },
211            } => {
212                let project = store
213                    .projects()
214                    .get(&project_id)
215                    .await
216                    .map_err(|err| err.to_string())?
217                    .ok_or_else(|| format!("project `{project_id}` not found"))?;
218                let vulnerabilities = store
219                    .verified_vulnerabilities()
220                    .list_by_run(&run_id)
221                    .await
222                    .map_err(|err| err.to_string())?;
223                let top = vulnerabilities
224                    .iter()
225                    .take(5)
226                    .map(|v| IntegrationVulnerabilitySummary {
227                        id: v.id.clone(),
228                        title: v.title.clone(),
229                        severity: v.severity.clone(),
230                        status: v.status.clone(),
231                        vuln_class: v.vuln_class.clone(),
232                    })
233                    .collect::<Vec<_>>();
234                let severity = vulnerabilities
235                    .iter()
236                    .map(|v| v.severity.as_str())
237                    .max_by_key(|severity| severity_rank(severity).unwrap_or(0));
238                let title = if vulnerabilities.is_empty() {
239                    format!("Nyx Agent run {run_id} finished")
240                } else {
241                    format!(
242                        "Nyx Agent run {run_id} found {} verified issue(s)",
243                        vulnerabilities.len()
244                    )
245                };
246                let payload = IntegrationDeliveryPayload {
247                    event: ProjectIntegrationEvent::RunFinished.as_str().to_string(),
248                    project_id: project.id,
249                    project_name: project.name,
250                    run_id: Some(run_id),
251                    finding_id: None,
252                    title,
253                    summary: format!(
254                        "Run finished with {succeeded} succeeded, {inconclusive} inconclusive, {failed} failed repo(s)."
255                    ),
256                    severity: severity.map(str::to_string),
257                    status: Some(if failed > 0 { "Failed" } else { "Finished" }.to_string()),
258                    url: None,
259                    vulnerabilities: top,
260                    counts: Some(IntegrationRunCounts {
261                        succeeded,
262                        inconclusive,
263                        failed,
264                        verified_vulnerabilities: vulnerabilities.len(),
265                    }),
266                    sent_at_ms: now_epoch_ms(),
267                };
268                self.deliver_project_event(
269                    store,
270                    &project_id,
271                    ProjectIntegrationEvent::RunFinished,
272                    payload,
273                )
274                .await
275            }
276            AgentEvent::Sandbox {
277                data: SandboxEvent::VerifierFinished { run_id, finding_id, verdict, .. },
278            } if verdict == "Confirmed" => {
279                let Some(run) = store.runs().get(&run_id).await.map_err(|err| err.to_string())?
280                else {
281                    return Ok(());
282                };
283                let project_id = run.project_id.unwrap_or_else(|| "default-project".to_string());
284                let project = store
285                    .projects()
286                    .get(&project_id)
287                    .await
288                    .map_err(|err| err.to_string())?
289                    .ok_or_else(|| format!("project `{project_id}` not found"))?;
290                let Some(finding) =
291                    store.findings().get(&finding_id).await.map_err(|err| err.to_string())?
292                else {
293                    return Ok(());
294                };
295                let payload = finding_payload(&project.id, &project.name, &finding);
296                self.deliver_project_event(
297                    store,
298                    &project.id,
299                    ProjectIntegrationEvent::FindingVerified,
300                    payload,
301                )
302                .await
303            }
304            _ => Ok(()),
305        }
306    }
307
308    async fn deliver_project_event(
309        &self,
310        store: &Store,
311        project_id: &str,
312        event: ProjectIntegrationEvent,
313        payload: IntegrationDeliveryPayload,
314    ) -> Result<(), String> {
315        let rows = store
316            .integrations()
317            .list_enabled_by_project(project_id)
318            .await
319            .map_err(|err| err.to_string())?;
320        for row in rows {
321            if !row.public.events.contains(&event) || !passes_severity(&row.public, &payload) {
322                continue;
323            }
324            let delivered_at = now_epoch_ms();
325            match self.deliver(&row, &payload).await {
326                Ok(()) => {
327                    if let Err(err) = store
328                        .integrations()
329                        .record_delivery(&row.public.id, delivered_at, "ok", None)
330                        .await
331                    {
332                        tracing::warn!(integration_id = %row.public.id, error = %err, "failed to record integration delivery status");
333                    }
334                }
335                Err(err) => {
336                    let err_s = err.to_string();
337                    if let Err(store_err) = store
338                        .integrations()
339                        .record_delivery(&row.public.id, delivered_at, "error", Some(&err_s))
340                        .await
341                    {
342                        tracing::warn!(integration_id = %row.public.id, error = %store_err, "failed to record integration delivery error");
343                    }
344                    tracing::warn!(integration_id = %row.public.id, error = %err_s, "integration delivery failed");
345                }
346            }
347        }
348        Ok(())
349    }
350
351    async fn deliver(
352        &self,
353        row: &ProjectIntegrationStoredRecord,
354        payload: &IntegrationDeliveryPayload,
355    ) -> anyhow::Result<()> {
356        let cfg: ProjectIntegrationConfigInput = serde_json::from_str(&row.config_json)?;
357        match cfg {
358            ProjectIntegrationConfigInput::Webhook { url, signing_secret } => {
359                let body = serde_json::to_vec(payload)?;
360                let mut req = self
361                    .http
362                    .post(url)
363                    .header("content-type", "application/json")
364                    .body(body.clone());
365                if let Some(secret) = signing_secret.filter(|s| !s.is_empty()) {
366                    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
367                    mac.update(&body);
368                    let sig = format!("sha256={}", hex::encode(mac.finalize().into_bytes()));
369                    req = req.header("X-Nyx-Agent-Signature-256", sig);
370                }
371                let res = req.send().await?;
372                if !res.status().is_success() {
373                    anyhow::bail!("webhook returned {}", res.status());
374                }
375                Ok(())
376            }
377            ProjectIntegrationConfigInput::Slack { webhook_url } => {
378                let body = serde_json::json!({ "text": slack_text(payload) });
379                let res = self.http.post(webhook_url).json(&body).send().await?;
380                if !res.status().is_success() {
381                    anyhow::bail!("Slack webhook returned {}", res.status());
382                }
383                Ok(())
384            }
385            ProjectIntegrationConfigInput::Smtp {
386                host,
387                port,
388                security,
389                username,
390                password,
391                from,
392                recipients,
393            } => {
394                let mut builder = match security {
395                    SmtpSecurity::StartTls => {
396                        AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&host)?
397                    }
398                    SmtpSecurity::None => {
399                        AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(&host)
400                    }
401                }
402                .port(port);
403                if let Some(username) = username.filter(|s| !s.trim().is_empty()) {
404                    builder = builder
405                        .credentials(Credentials::new(username, password.unwrap_or_default()));
406                }
407                let mut email = EmailMessage::builder()
408                    .from(parse_mailbox(&from, "from address").map_err(|err| anyhow::anyhow!(err))?)
409                    .subject(payload.title.clone());
410                for recipient in recipients {
411                    email = email.to(parse_mailbox(&recipient, "recipient")
412                        .map_err(|err| anyhow::anyhow!(err))?);
413                }
414                let email = email.body(email_text(payload))?;
415                builder.build().send(email).await?;
416                Ok(())
417            }
418        }
419    }
420}
421
422impl Default for IntegrationDispatcher {
423    fn default() -> Self {
424        Self::new()
425    }
426}
427
428fn finding_payload(
429    project_id: &str,
430    project_name: &str,
431    finding: &FindingRecord,
432) -> IntegrationDeliveryPayload {
433    let title = format!("Confirmed {} in {}", finding.cap, finding.path);
434    IntegrationDeliveryPayload {
435        event: ProjectIntegrationEvent::FindingVerified.as_str().to_string(),
436        project_id: project_id.to_string(),
437        project_name: project_name.to_string(),
438        run_id: Some(finding.run_id.clone()),
439        finding_id: Some(finding.id.clone()),
440        title: title.clone(),
441        summary: format!(
442            "{}:{} matched {} ({})",
443            finding.path,
444            finding.line.map(|n| n.to_string()).unwrap_or_else(|| "?".to_string()),
445            finding.rule,
446            finding.severity
447        ),
448        severity: Some(finding.severity.clone()),
449        status: Some("Confirmed".to_string()),
450        url: None,
451        vulnerabilities: vec![IntegrationVulnerabilitySummary {
452            id: finding.id.clone(),
453            title,
454            severity: finding.severity.clone(),
455            status: "Confirmed".to_string(),
456            vuln_class: finding.cap.clone(),
457        }],
458        counts: None,
459        sent_at_ms: now_epoch_ms(),
460    }
461}
462
463fn passes_severity(
464    integration: &ProjectIntegrationRecord,
465    payload: &IntegrationDeliveryPayload,
466) -> bool {
467    let Some(min) = integration.min_severity.as_deref() else {
468        return true;
469    };
470    let Some(severity) = payload.severity.as_deref() else {
471        return false;
472    };
473    severity_rank(severity).unwrap_or(0) >= severity_rank(min).unwrap_or(0)
474}
475
/// Returns the ordinal of a severity label, ignoring ASCII case.
///
/// `low` is 1, `medium` 2, `high` 3 and `critical` 4; any other input,
/// including labels with surrounding whitespace, yields `None`.
fn severity_rank(severity: &str) -> Option<u8> {
    const LEVELS: [(&str, u8); 4] = [("low", 1), ("medium", 2), ("high", 3), ("critical", 4)];

    LEVELS
        .iter()
        .find(|(label, _)| severity.eq_ignore_ascii_case(label))
        .map(|&(_, rank)| rank)
}
485
486fn slack_text(payload: &IntegrationDeliveryPayload) -> String {
487    let mut text =
488        format!("*{}*\nProject: {}\n{}", payload.title, payload.project_name, payload.summary);
489    if let Some(severity) = &payload.severity {
490        text.push_str(&format!("\nSeverity: {severity}"));
491    }
492    for vuln in &payload.vulnerabilities {
493        text.push_str(&format!("\n- [{}] {} ({})", vuln.severity, vuln.title, vuln.status));
494    }
495    text
496}
497
498fn email_text(payload: &IntegrationDeliveryPayload) -> String {
499    let mut text = format!(
500        "{}\n\nProject: {}\nEvent: {}\n{}\n",
501        payload.title, payload.project_name, payload.event, payload.summary
502    );
503    if let Some(severity) = &payload.severity {
504        text.push_str(&format!("Severity: {severity}\n"));
505    }
506    if let Some(run_id) = &payload.run_id {
507        text.push_str(&format!("Run: {run_id}\n"));
508    }
509    if let Some(finding_id) = &payload.finding_id {
510        text.push_str(&format!("Finding: {finding_id}\n"));
511    }
512    if !payload.vulnerabilities.is_empty() {
513        text.push_str("\nFindings:\n");
514        for vuln in &payload.vulnerabilities {
515            text.push_str(&format!(
516                "- [{}] {} ({}, {})\n",
517                vuln.severity, vuln.title, vuln.vuln_class, vuln.status
518            ));
519        }
520    }
521    text
522}
523
524fn validate_http_url(raw: &str, label: &str) -> Result<(), String> {
525    let url = reqwest::Url::parse(raw).map_err(|err| format!("invalid {label}: {err}"))?;
526    if !matches!(url.scheme(), "http" | "https") {
527        return Err(format!("{label} must use http or https"));
528    }
529    if url.host_str().is_none() {
530        return Err(format!("{label} must include a host"));
531    }
532    Ok(())
533}
534
535fn url_host_summary(raw: &str) -> String {
536    reqwest::Url::parse(raw)
537        .ok()
538        .and_then(|url| url.host_str().map(str::to_string))
539        .unwrap_or_else(|| "configured URL".to_string())
540}
541
542fn parse_mailbox(raw: &str, label: &str) -> Result<Mailbox, String> {
543    raw.parse::<Mailbox>().map_err(|err| format!("invalid {label}: {err}"))
544}