Skip to main content

oris_intake/
source.rs

1//! Intake source definitions and implementations
2
3use crate::{IntakeError, IntakeResult, IntakeSourceConfig};
4use regex_lite::Regex;
5use serde::{Deserialize, Serialize};
6
7/// Supported intake source types
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
9#[serde(rename_all = "lowercase")]
10pub enum IntakeSourceType {
11    /// GitHub Actions webhooks
12    Github,
13    /// GitLab CI webhooks
14    Gitlab,
15    /// Prometheus/AlertManager alerts
16    Prometheus,
17    /// Sentry error tracking
18    Sentry,
19    /// Generic HTTP webhook
20    Http,
21    /// Log file monitoring
22    LogFile,
23}
24
25impl Default for IntakeSourceType {
26    fn default() -> Self {
27        Self::Http
28    }
29}
30
31impl std::fmt::Display for IntakeSourceType {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            IntakeSourceType::Github => write!(f, "github"),
35            IntakeSourceType::Gitlab => write!(f, "gitlab"),
36            IntakeSourceType::Prometheus => write!(f, "prometheus"),
37            IntakeSourceType::Sentry => write!(f, "sentry"),
38            IntakeSourceType::Http => write!(f, "http"),
39            IntakeSourceType::LogFile => write!(f, "logfile"),
40        }
41    }
42}
43
44/// Trait for implementing intake sources
45pub trait IntakeSource: Send + Sync {
46    /// Get the source type
47    fn source_type(&self) -> IntakeSourceType;
48
49    /// Process incoming data and extract potential issues
50    fn process(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>>;
51
52    /// Validate the incoming data format
53    fn validate(&self, payload: &[u8]) -> IntakeResult<()>;
54}
55
56/// An intake event represents a detected issue from an external source
57#[derive(Clone, Debug, Serialize, Deserialize)]
58pub struct IntakeEvent {
59    /// Unique identifier for this event
60    pub event_id: String,
61
62    /// Source type that generated this event
63    pub source_type: IntakeSourceType,
64
65    /// Original source event ID (if available)
66    pub source_event_id: Option<String>,
67
68    /// Title/description of the detected issue
69    pub title: String,
70
71    /// Detailed description
72    pub description: String,
73
74    /// Severity level
75    pub severity: IssueSeverity,
76
77    /// Extracted signals from this event
78    pub signals: Vec<String>,
79
80    /// Raw payload (for debugging)
81    pub raw_payload: Option<String>,
82
83    /// Timestamp when the event was generated
84    pub timestamp_ms: i64,
85}
86
87/// Issue severity levels
88#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
89#[serde(rename_all = "lowercase")]
90pub enum IssueSeverity {
91    Critical,
92    High,
93    Medium,
94    Low,
95    Info,
96}
97
98impl Default for IssueSeverity {
99    fn default() -> Self {
100        Self::Medium
101    }
102}
103
104impl std::fmt::Display for IssueSeverity {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            IssueSeverity::Critical => write!(f, "critical"),
108            IssueSeverity::High => write!(f, "high"),
109            IssueSeverity::Medium => write!(f, "medium"),
110            IssueSeverity::Low => write!(f, "low"),
111            IssueSeverity::Info => write!(f, "info"),
112        }
113    }
114}
115
116/// GitHub Actions webhook payload
117#[derive(Clone, Debug, Deserialize)]
118pub struct GithubWorkflowEvent {
119    pub action: Option<String>,
120    pub workflow: Option<String>,
121    pub run_id: Option<i64>,
122    pub repository: Option<GithubRepository>,
123    pub workflow_run: Option<GithubWorkflowRun>,
124    pub conclusion: Option<String>,
125}
126
127#[derive(Clone, Debug, Deserialize)]
128pub struct GithubRepository {
129    pub full_name: String,
130    pub html_url: String,
131}
132
133#[derive(Clone, Debug, Deserialize)]
134pub struct GithubWorkflowRun {
135    pub head_branch: String,
136    pub head_sha: String,
137    pub html_url: String,
138    pub logs_url: String,
139    pub artifacts_url: String,
140}
141
142/// GitLab CI webhook payload
143#[derive(Clone, Debug, Deserialize)]
144pub struct GitlabPipelineEvent {
145    pub object_kind: Option<String>,
146    pub object_attributes: Option<GitlabPipelineAttributes>,
147    pub project: Option<GitlabProject>,
148    pub builds: Option<Vec<GitlabBuild>>,
149}
150
151#[derive(Clone, Debug, Deserialize)]
152pub struct GitlabPipelineAttributes {
153    pub id: i64,
154    #[serde(rename = "ref")]
155    pub ref_: String,
156    pub sha: String,
157    pub status: String,
158    pub finished_at: Option<String>,
159}
160
161#[derive(Clone, Debug, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub struct GitlabProject {
164    pub id: i64,
165    pub name: String,
166    pub path_with_namespace: String,
167    pub web_url: String,
168}
169
170#[derive(Clone, Debug, Deserialize)]
171pub struct GitlabBuild {
172    pub id: i64,
173    pub name: String,
174    pub stage: String,
175    pub status: String,
176}
177
178/// Generic HTTP webhook event
179#[derive(Clone, Debug, Deserialize)]
180pub struct HttpWebhookEvent {
181    /// Event type
182    pub event_type: String,
183    /// Event ID
184    pub event_id: String,
185    /// Event timestamp
186    pub timestamp: Option<i64>,
187    /// Payload
188    pub payload: serde_json::Value,
189}
190
191/// Build an intake event from GitHub Actions webhook
192pub fn from_github_workflow(event: GithubWorkflowEvent) -> IntakeResult<IntakeEvent> {
193    let workflow_name = event.workflow.clone().unwrap_or_default();
194    let title = format!(
195        "GitHub Workflow {} - {}",
196        event.action.unwrap_or_default(),
197        workflow_name
198    );
199
200    let conclusion = event.conclusion.as_deref().unwrap_or("unknown");
201    let severity = match conclusion {
202        "failure" => IssueSeverity::High,
203        "cancelled" => IssueSeverity::Medium,
204        "success" => IssueSeverity::Low,
205        _ => IssueSeverity::Info,
206    };
207
208    let mut signals = vec![];
209
210    // Extract signals based on workflow conclusion
211    if let Some(conc) = event.conclusion.as_ref() {
212        signals.push(format!("workflow_conclusion:{}", conc));
213    }
214
215    if let Some(run_id) = event.run_id {
216        signals.push(format!("run_id:{}", run_id));
217    }
218
219    let description = format!(
220        "Workflow '{}' concluded with '{}' for repository '{}'",
221        workflow_name,
222        conclusion,
223        event
224            .repository
225            .as_ref()
226            .map(|r| r.full_name.clone())
227            .unwrap_or_default()
228    );
229
230    Ok(IntakeEvent {
231        event_id: uuid::Uuid::new_v4().to_string(),
232        source_type: IntakeSourceType::Github,
233        source_event_id: event.run_id.map(|id| id.to_string()),
234        title,
235        description,
236        severity,
237        signals,
238        raw_payload: None,
239        timestamp_ms: chrono::Utc::now().timestamp_millis(),
240    })
241}
242
243/// Build an intake event from GitLab CI webhook
244pub fn from_gitlab_pipeline(event: GitlabPipelineEvent) -> IntakeResult<IntakeEvent> {
245    let pipeline = event.object_attributes.as_ref();
246    let project = event.project.as_ref();
247
248    let title = format!(
249        "GitLab Pipeline {} - {}",
250        pipeline.map(|p| p.status.clone()).unwrap_or_default(),
251        pipeline.map(|p| p.ref_.clone()).unwrap_or_default()
252    );
253
254    let status = pipeline.map(|p| p.status.as_str()).unwrap_or("unknown");
255    let severity = match status {
256        "failed" => IssueSeverity::High,
257        "canceled" => IssueSeverity::Medium,
258        "success" => IssueSeverity::Low,
259        _ => IssueSeverity::Info,
260    };
261
262    let mut signals = vec![];
263    signals.push(format!("pipeline_status:{}", status));
264
265    if let Some(p) = pipeline {
266        signals.push(format!("pipeline_id:{}", p.id));
267        signals.push(format!("commit_sha:{}", p.sha));
268    }
269
270    let description = format!(
271        "Pipeline '{}' on branch '{}' for project '{}'",
272        pipeline.map(|p| p.id.to_string()).unwrap_or_default(),
273        pipeline.map(|p| p.ref_.clone()).unwrap_or_default(),
274        project
275            .map(|p| p.path_with_namespace.clone())
276            .unwrap_or_default()
277    );
278
279    Ok(IntakeEvent {
280        event_id: uuid::Uuid::new_v4().to_string(),
281        source_type: IntakeSourceType::Gitlab,
282        source_event_id: pipeline.map(|p| p.id.to_string()),
283        title,
284        description,
285        severity,
286        signals,
287        raw_payload: None,
288        timestamp_ms: chrono::Utc::now().timestamp_millis(),
289    })
290}
291
292/// GitHub check_run event payload (check_run failed / completed)
293#[derive(Clone, Debug, Deserialize)]
294pub struct GithubCheckRunEvent {
295    pub action: Option<String>,
296    pub check_run: Option<GithubCheckRun>,
297    pub repository: Option<GithubRepository>,
298}
299
300/// Details of a single check run
301#[derive(Clone, Debug, Deserialize)]
302pub struct GithubCheckRun {
303    pub id: i64,
304    pub name: String,
305    pub head_sha: String,
306    pub status: String,
307    pub conclusion: Option<String>,
308    pub html_url: Option<String>,
309    pub output: Option<GithubCheckRunOutput>,
310}
311
312/// Check run log output summary
313#[derive(Clone, Debug, Deserialize)]
314pub struct GithubCheckRunOutput {
315    pub title: Option<String>,
316    pub summary: Option<String>,
317}
318
319/// Convert a GitHub check_run event into an intake event.
320pub fn from_github_check_run(event: GithubCheckRunEvent) -> IntakeResult<IntakeEvent> {
321    let check = event.check_run.as_ref();
322    let check_name = check.map(|c| c.name.as_str()).unwrap_or("unknown");
323    let conclusion = check
324        .and_then(|c| c.conclusion.as_deref())
325        .unwrap_or("unknown");
326
327    let severity = match conclusion {
328        "failure" | "timed_out" => IssueSeverity::High,
329        "cancelled" | "action_required" => IssueSeverity::Medium,
330        "success" | "neutral" | "skipped" => IssueSeverity::Low,
331        _ => IssueSeverity::Info,
332    };
333
334    let output_title = check
335        .and_then(|c| c.output.as_ref())
336        .and_then(|o| o.title.as_deref())
337        .unwrap_or("");
338    let output_summary = check
339        .and_then(|c| c.output.as_ref())
340        .and_then(|o| o.summary.as_deref())
341        .unwrap_or("");
342
343    let title = format!("GitHub check_run '{}' {}", check_name, conclusion);
344    let description = format!(
345        "Check '{}' concluded '{}' on commit '{}' for repository '{}'. {}: {}",
346        check_name,
347        conclusion,
348        check.map(|c| c.head_sha.as_str()).unwrap_or(""),
349        event
350            .repository
351            .as_ref()
352            .map(|r| r.full_name.as_str())
353            .unwrap_or(""),
354        output_title,
355        output_summary
356    );
357
358    let mut signals = vec![format!("check_run_conclusion:{}", conclusion)];
359    if let Some(c) = check {
360        signals.push(format!("check_run_name:{}", c.name));
361        signals.push(format!("commit_sha:{}", c.head_sha));
362    }
363    if !output_title.is_empty() {
364        signals.push(format!("output_title:{}", output_title));
365    }
366    if !output_summary.is_empty() {
367        signals.push(format!("output_summary:{}", output_summary));
368    }
369
370    Ok(IntakeEvent {
371        event_id: uuid::Uuid::new_v4().to_string(),
372        source_type: IntakeSourceType::Github,
373        source_event_id: check.map(|c| c.id.to_string()),
374        title,
375        description,
376        severity,
377        signals,
378        raw_payload: None,
379        timestamp_ms: chrono::Utc::now().timestamp_millis(),
380    })
381}
382
383/// GitHub intake source — handles `workflow_run` and `check_run` webhook payloads.
384///
385/// Dispatches on the `X-GitHub-Event` header. Because this source operates over
386/// raw bytes, it accepts an optional `event_type` hint at construction time that
387/// mirrors the `X-GitHub-Event` header value (`"workflow_run"` or `"check_run"`).
388#[derive(Clone, Debug)]
389pub struct GithubIntakeSource {
390    /// The expected GitHub event type (mirrors X-GitHub-Event header).
391    /// Defaults to dispatching by presence of known top-level keys when `None`.
392    pub event_type: Option<String>,
393}
394
395impl GithubIntakeSource {
396    /// Create a new source that processes the given GitHub event type.
397    pub fn new(event_type: impl Into<String>) -> Self {
398        Self {
399            event_type: Some(event_type.into()),
400        }
401    }
402
403    /// Create a source that auto-detects the event type from payload shape.
404    pub fn auto() -> Self {
405        Self { event_type: None }
406    }
407
408    fn dispatch(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>> {
409        let value: serde_json::Value =
410            serde_json::from_slice(payload).map_err(|e| IntakeError::ParseError(e.to_string()))?;
411
412        let hint = self.event_type.as_deref().unwrap_or_else(|| {
413            // Auto-detect: check_run events have a `check_run` top-level key
414            if value.get("check_run").is_some() {
415                "check_run"
416            } else {
417                "workflow_run"
418            }
419        });
420
421        match hint {
422            "check_run" => {
423                let ev: GithubCheckRunEvent = serde_json::from_value(value)
424                    .map_err(|e| IntakeError::ParseError(e.to_string()))?;
425                from_github_check_run(ev).map(|e| vec![e])
426            }
427            _ => {
428                let ev: GithubWorkflowEvent = serde_json::from_value(value)
429                    .map_err(|e| IntakeError::ParseError(e.to_string()))?;
430                from_github_workflow(ev).map(|e| vec![e])
431            }
432        }
433    }
434}
435
436impl IntakeSource for GithubIntakeSource {
437    fn source_type(&self) -> IntakeSourceType {
438        IntakeSourceType::Github
439    }
440
441    fn process(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>> {
442        self.dispatch(payload)
443    }
444
445    fn validate(&self, payload: &[u8]) -> IntakeResult<()> {
446        // Require valid UTF-8 JSON
447        let _: serde_json::Value = serde_json::from_slice(payload)
448            .map_err(|e| IntakeError::ParseError(format!("invalid JSON: {}", e)))?;
449        Ok(())
450    }
451}
452
453/// Log-file intake source for structured application or CI logs.
454///
455/// The current implementation scans UTF-8 log lines and emits one `IntakeEvent`
456/// for each line matching common failure patterns such as `error`, `panic`,
457/// `fatal`, or `exception`.
458#[derive(Clone, Debug)]
459pub struct LogFileIntakeSource {
460    patterns: Vec<Regex>,
461}
462
463impl LogFileIntakeSource {
464    /// Create a log intake source with default error-oriented patterns.
465    pub fn new() -> Self {
466        Self {
467            patterns: vec![
468                Regex::new(r"(?i)\berror\b").unwrap(),
469                Regex::new(r"(?i)\bpanic\b").unwrap(),
470                Regex::new(r"(?i)\bfatal\b").unwrap(),
471                Regex::new(r"(?i)\bexception\b").unwrap(),
472                Regex::new(r"(?i)test\s+failed").unwrap(),
473            ],
474        }
475    }
476
477    fn severity_for_line(&self, line: &str) -> IssueSeverity {
478        let lower = line.to_ascii_lowercase();
479        if lower.contains("panic") || lower.contains("fatal") {
480            IssueSeverity::High
481        } else if lower.contains("error") || lower.contains("exception") {
482            IssueSeverity::Medium
483        } else {
484            IssueSeverity::Low
485        }
486    }
487
488    fn matches(&self, line: &str) -> bool {
489        self.patterns.iter().any(|pattern| pattern.is_match(line))
490    }
491}
492
493impl Default for LogFileIntakeSource {
494    fn default() -> Self {
495        Self::new()
496    }
497}
498
499impl IntakeSource for LogFileIntakeSource {
500    fn source_type(&self) -> IntakeSourceType {
501        IntakeSourceType::LogFile
502    }
503
504    fn process(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>> {
505        let contents = std::str::from_utf8(payload)
506            .map_err(|e| IntakeError::ParseError(format!("invalid UTF-8 log payload: {}", e)))?;
507
508        let events = contents
509            .lines()
510            .enumerate()
511            .filter_map(|(index, line)| {
512                let trimmed = line.trim();
513                if trimmed.is_empty() || !self.matches(trimmed) {
514                    return None;
515                }
516
517                Some(IntakeEvent {
518                    event_id: uuid::Uuid::new_v4().to_string(),
519                    source_type: IntakeSourceType::LogFile,
520                    source_event_id: Some(format!("line:{}", index + 1)),
521                    title: format!("LogFile error at line {}", index + 1),
522                    description: trimmed.to_string(),
523                    severity: self.severity_for_line(trimmed),
524                    signals: vec![format!("log_match:{}", trimmed)],
525                    raw_payload: Some(trimmed.to_string()),
526                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
527                })
528            })
529            .collect();
530
531        Ok(events)
532    }
533
534    fn validate(&self, payload: &[u8]) -> IntakeResult<()> {
535        std::str::from_utf8(payload)
536            .map_err(|e| IntakeError::ParseError(format!("invalid UTF-8 log payload: {}", e)))?;
537        Ok(())
538    }
539}
540
541/// Prometheus/Alertmanager v4 webhook payload.
542///
543/// See https://prometheus.io/docs/alerting/latest/configuration/#webhook_config
544#[derive(Clone, Debug, Deserialize)]
545pub struct AlertmanagerPayload {
546    pub version: Option<String>,
547    pub status: Option<String>,
548    #[serde(rename = "groupLabels")]
549    pub group_labels: Option<std::collections::HashMap<String, String>>,
550    #[serde(rename = "commonLabels")]
551    pub common_labels: Option<std::collections::HashMap<String, String>>,
552    #[serde(rename = "commonAnnotations")]
553    pub common_annotations: Option<std::collections::HashMap<String, String>>,
554    pub alerts: Option<Vec<AlertmanagerAlert>>,
555    #[serde(rename = "externalURL")]
556    pub external_url: Option<String>,
557}
558
559/// Single alert within an Alertmanager payload.
560#[derive(Clone, Debug, Deserialize)]
561pub struct AlertmanagerAlert {
562    pub status: Option<String>,
563    pub labels: Option<std::collections::HashMap<String, String>>,
564    pub annotations: Option<std::collections::HashMap<String, String>>,
565    pub fingerprint: Option<String>,
566}
567
568/// Prometheus/Alertmanager intake source.
569///
570/// Processes Alertmanager webhook v4 payloads and converts firing alerts to
571/// `IntakeEvent` instances. One event is emitted per alert in the payload.
572#[derive(Clone, Debug, Default)]
573pub struct PrometheusIntakeSource;
574
575impl PrometheusIntakeSource {
576    fn alert_severity(labels: &std::collections::HashMap<String, String>) -> IssueSeverity {
577        match labels.get("severity").map(|s| s.as_str()).unwrap_or("") {
578            "critical" | "page" => IssueSeverity::Critical,
579            "warning" | "high" => IssueSeverity::High,
580            "info" | "low" => IssueSeverity::Low,
581            _ => IssueSeverity::Medium,
582        }
583    }
584}
585
586impl IntakeSource for PrometheusIntakeSource {
587    fn source_type(&self) -> IntakeSourceType {
588        IntakeSourceType::Prometheus
589    }
590
591    fn process(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>> {
592        let alert_payload: AlertmanagerPayload = serde_json::from_slice(payload)
593            .map_err(|e| IntakeError::ParseError(format!("invalid Alertmanager JSON: {}", e)))?;
594
595        let group_status = alert_payload.status.as_deref().unwrap_or("unknown");
596        let alerts = alert_payload.alerts.unwrap_or_default();
597        let empty_map = std::collections::HashMap::new();
598
599        let events = alerts
600            .into_iter()
601            .filter(|a| a.status.as_deref().unwrap_or("") == "firing")
602            .map(|alert| {
603                let labels = alert.labels.as_ref().unwrap_or(&empty_map);
604                let annotations = alert.annotations.as_ref().unwrap_or(&empty_map);
605                let alert_name = labels
606                    .get("alertname")
607                    .map(|s| s.as_str())
608                    .unwrap_or("unknown");
609                let summary = annotations
610                    .get("summary")
611                    .or_else(|| annotations.get("message"))
612                    .map(|s| s.as_str())
613                    .unwrap_or("Prometheus alert fired");
614                let description = annotations
615                    .get("description")
616                    .or_else(|| annotations.get("runbook_url"))
617                    .map(|s| s.as_str())
618                    .unwrap_or(summary);
619
620                let severity = Self::alert_severity(labels);
621
622                let signals: Vec<String> = labels
623                    .iter()
624                    .map(|(k, v)| format!("label:{}:{}", k, v))
625                    .chain(
626                        alert
627                            .fingerprint
628                            .as_ref()
629                            .map(|fp| format!("fingerprint:{}", fp))
630                            .into_iter(),
631                    )
632                    .collect();
633
634                IntakeEvent {
635                    event_id: uuid::Uuid::new_v4().to_string(),
636                    source_type: IntakeSourceType::Prometheus,
637                    source_event_id: alert.fingerprint.clone(),
638                    title: format!("Alert: {} [{}]", alert_name, group_status),
639                    description: description.to_string(),
640                    severity,
641                    signals,
642                    raw_payload: None,
643                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
644                }
645            })
646            .collect();
647
648        Ok(events)
649    }
650
651    fn validate(&self, payload: &[u8]) -> IntakeResult<()> {
652        let _: serde_json::Value = serde_json::from_slice(payload)
653            .map_err(|e| IntakeError::ParseError(format!("invalid JSON: {}", e)))?;
654        Ok(())
655    }
656}
657
658/// Minimal Sentry issue alert webhook payload.
659#[derive(Clone, Debug, Deserialize)]
660pub struct SentryAlertPayload {
661    pub action: Option<String>,
662    pub actor: Option<SentryActor>,
663    pub data: Option<SentryAlertData>,
664}
665
666#[derive(Clone, Debug, Deserialize)]
667pub struct SentryActor {
668    pub name: Option<String>,
669}
670
671#[derive(Clone, Debug, Deserialize)]
672pub struct SentryAlertData {
673    pub issue: Option<SentryIssue>,
674    pub error: Option<SentryError>,
675}
676
677#[derive(Clone, Debug, Deserialize)]
678pub struct SentryIssue {
679    pub id: Option<String>,
680    pub title: Option<String>,
681    pub level: Option<String>,
682    pub project: Option<SentryProject>,
683    pub permalink: Option<String>,
684}
685
686#[derive(Clone, Debug, Deserialize)]
687pub struct SentryProject {
688    pub slug: Option<String>,
689    pub name: Option<String>,
690}
691
692#[derive(Clone, Debug, Deserialize)]
693pub struct SentryError {
694    pub message: Option<String>,
695    pub level: Option<String>,
696}
697
698/// Sentry issue alert intake source.
699///
700/// Handles Sentry issue-alert webhook payloads (action: created/resolved).
701/// Emits one `IntakeEvent` per webhook delivery.
702#[derive(Clone, Debug, Default)]
703pub struct SentryIntakeSource;
704
705impl SentryIntakeSource {
706    fn level_to_severity(level: &str) -> IssueSeverity {
707        match level {
708            "fatal" | "critical" => IssueSeverity::Critical,
709            "error" => IssueSeverity::High,
710            "warning" => IssueSeverity::Medium,
711            "info" | "debug" => IssueSeverity::Low,
712            _ => IssueSeverity::Medium,
713        }
714    }
715}
716
717impl IntakeSource for SentryIntakeSource {
718    fn source_type(&self) -> IntakeSourceType {
719        IntakeSourceType::Sentry
720    }
721
722    fn process(&self, payload: &[u8]) -> IntakeResult<Vec<IntakeEvent>> {
723        let alert: SentryAlertPayload = serde_json::from_slice(payload)
724            .map_err(|e| IntakeError::ParseError(format!("invalid Sentry JSON: {}", e)))?;
725
726        let action = alert.action.as_deref().unwrap_or("unknown");
727        // Only process "created" (new error) and "triggered" (alert rule) actions.
728        if !matches!(action, "created" | "triggered") {
729            return Ok(vec![]);
730        }
731
732        let data = alert.data.as_ref();
733        let issue = data.and_then(|d| d.issue.as_ref());
734        let error_data = data.and_then(|d| d.error.as_ref());
735
736        let (title, level, event_id, description) = if let Some(issue) = issue {
737            let lvl = issue.level.as_deref().unwrap_or("error");
738            let project = issue
739                .project
740                .as_ref()
741                .and_then(|p| p.slug.as_deref())
742                .unwrap_or("unknown");
743            let title = issue.title.as_deref().unwrap_or("Sentry issue").to_string();
744            let id = issue.id.clone();
745            let desc = issue
746                .permalink
747                .as_deref()
748                .map(|url| format!("View issue: {}", url))
749                .unwrap_or_else(|| format!("Sentry [{}] {} in project {}", lvl, title, project));
750            (title, lvl.to_string(), id, desc)
751        } else if let Some(err) = error_data {
752            let lvl = err.level.as_deref().unwrap_or("error");
753            let msg = err.message.as_deref().unwrap_or("Sentry error");
754            (msg.to_string(), lvl.to_string(), None, msg.to_string())
755        } else {
756            return Ok(vec![]);
757        };
758
759        let severity = Self::level_to_severity(&level);
760        let signals = vec![
761            format!("sentry_action:{}", action),
762            format!("sentry_level:{}", level),
763        ];
764
765        Ok(vec![IntakeEvent {
766            event_id: uuid::Uuid::new_v4().to_string(),
767            source_type: IntakeSourceType::Sentry,
768            source_event_id: event_id,
769            title: format!("Sentry: {}", title),
770            description,
771            severity,
772            signals,
773            raw_payload: None,
774            timestamp_ms: chrono::Utc::now().timestamp_millis(),
775        }])
776    }
777
778    fn validate(&self, payload: &[u8]) -> IntakeResult<()> {
779        let _: serde_json::Value = serde_json::from_slice(payload)
780            .map_err(|e| IntakeError::ParseError(format!("invalid JSON: {}", e)))?;
781        Ok(())
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use super::*;
788
789    #[test]
790    fn test_issue_severity_display() {
791        assert_eq!(IssueSeverity::Critical.to_string(), "critical");
792        assert_eq!(IssueSeverity::High.to_string(), "high");
793    }
794
795    #[test]
796    fn test_github_event_conversion() {
797        let event = GithubWorkflowEvent {
798            action: Some("completed".to_string()),
799            workflow: Some("ci.yml".to_string()),
800            run_id: Some(12345),
801            repository: Some(GithubRepository {
802                full_name: "owner/repo".to_string(),
803                html_url: "https://github.com/owner/repo".to_string(),
804            }),
805            workflow_run: Some(GithubWorkflowRun {
806                head_branch: "main".to_string(),
807                head_sha: "abc123".to_string(),
808                html_url: "https://github.com/owner/repo/actions/runs/12345".to_string(),
809                logs_url: "https://api.github.com/owner/repo/actions/runs/12345/logs".to_string(),
810                artifacts_url: "https://api.github.com/owner/repo/actions/runs/12345/artifacts"
811                    .to_string(),
812            }),
813            conclusion: Some("failure".to_string()),
814        };
815
816        let intake = from_github_workflow(event).unwrap();
817        assert_eq!(intake.severity, IssueSeverity::High);
818        assert!(intake.signals.iter().any(|s| s.contains("failure")));
819    }
820
821    #[test]
822    fn test_log_file_intake_source_extracts_matching_lines() {
823        let source = LogFileIntakeSource::new();
824        let payload = b"info startup complete\nERROR database unavailable\npanic: worker crashed\n";
825
826        let events = source.process(payload).expect("log file should parse");
827        assert_eq!(events.len(), 2);
828        assert_eq!(events[0].source_type, IntakeSourceType::LogFile);
829        assert!(events[0].description.contains("ERROR database unavailable"));
830        assert_eq!(events[1].severity, IssueSeverity::High);
831    }
832
833    #[test]
834    fn test_prometheus_and_sentry_stubs_validate_json() {
835        let prometheus = PrometheusIntakeSource;
836        let sentry = SentryIntakeSource;
837
838        assert!(prometheus.validate(br#"{"alerts":[]}"#).is_ok());
839        assert!(sentry.validate(br#"{"event_id":"abc"}"#).is_ok());
840    }
841
842    #[test]
843    fn test_prometheus_intake_source_parses_firing_alert() {
844        let source = PrometheusIntakeSource;
845        let payload = br#"{
846            "version": "4",
847            "status": "firing",
848            "commonLabels": {"severity": "critical"},
849            "commonAnnotations": {"summary": "DB down"},
850            "alerts": [
851                {
852                    "status": "firing",
853                    "labels": {"alertname": "DBDown", "severity": "critical"},
854                    "annotations": {"summary": "Database is unreachable"},
855                    "fingerprint": "abc123"
856                },
857                {
858                    "status": "resolved",
859                    "labels": {"alertname": "DBDown", "severity": "critical"},
860                    "annotations": {"summary": "Database is unreachable"},
861                    "fingerprint": "abc124"
862                }
863            ]
864        }"#;
865        let events = source
866            .process(payload)
867            .expect("should parse alertmanager payload");
868        // Only firing alerts should produce events
869        assert_eq!(events.len(), 1);
870        assert_eq!(events[0].severity, IssueSeverity::Critical);
871        assert!(events[0].title.contains("DBDown"));
872        assert_eq!(events[0].source_type, IntakeSourceType::Prometheus);
873        assert_eq!(events[0].source_event_id, Some("abc123".to_string()));
874    }
875
876    #[test]
877    fn test_prometheus_intake_source_empty_alerts() {
878        let source = PrometheusIntakeSource;
879        let payload = br#"{"version":"4","status":"resolved","alerts":[]}"#;
880        let events = source.process(payload).expect("empty alerts ok");
881        assert!(events.is_empty());
882    }
883
884    #[test]
885    fn test_sentry_intake_source_parses_issue_created() {
886        let source = SentryIntakeSource;
887        let payload = br#"{
888            "action": "created",
889            "data": {
890                "issue": {
891                    "id": "sentry-issue-1",
892                    "title": "ZeroDivisionError",
893                    "level": "error",
894                    "project": {"slug": "my-app", "name": "My App"},
895                    "permalink": "https://sentry.io/org/my-app/issues/1"
896                }
897            }
898        }"#;
899        let events = source.process(payload).expect("sentry parse ok");
900        assert_eq!(events.len(), 1);
901        assert_eq!(events[0].severity, IssueSeverity::High);
902        assert!(events[0].title.contains("ZeroDivisionError"));
903        assert_eq!(events[0].source_type, IntakeSourceType::Sentry);
904    }
905
906    #[test]
907    fn test_sentry_intake_source_resolved_action_skipped() {
908        let source = SentryIntakeSource;
909        let payload = br#"{"action": "resolved", "data": {"issue": {"id": "1", "title": "Err", "level": "error"}}}"#;
910        let events = source.process(payload).expect("resolved parse ok");
911        assert!(events.is_empty());
912    }
913}