use chrono::{DateTime, Utc};
use clap::Args;
use rusqlite::params;
use tracing::{info, warn};
use tga::core::config::Config;
use tga::core::db::Database;
// CLI arguments for the incident collection command.
//
// NOTE(review): plain `//` comments are used here on purpose. With clap's
// derive, `///` doc comments are turned into help text, which would change
// the command's `--help` output.
#[derive(Args, Debug)]
pub struct IncidentsCollectArgs {
// Optional source filter (`--source <SOURCE>`). `run` ingests Jira when this
// is unset or "jira", and Datadog when it is unset or "datadog".
#[arg(long, value_name = "SOURCE")]
pub source: Option<String>,
}
/// Counters gathered by `run` and echoed in its summary output.
///
/// All counters start at zero (via `Default`) and stay zero for a source that
/// was not ingested.
#[derive(Debug, Default, Clone)]
struct CollectStats {
/// Number of `work_items` rows visited by the Jira ingest.
jira_scanned: usize,
/// Number of rows the Jira ingest wrote to `fact_incidents`.
jira_inserted: usize,
/// Number of `.json` files the Datadog ingest encountered (including ones it
/// later skipped as unreadable or unparseable).
datadog_files: usize,
/// Number of rows the Datadog ingest wrote to `fact_incidents`.
datadog_inserted: usize,
}
pub fn run(config: Config, db: &mut Database, args: IncidentsCollectArgs) -> anyhow::Result<()> {
let mut stats = CollectStats::default();
let restrict = args.source.as_deref();
if matches!(restrict, None | Some("jira")) {
let (scanned, inserted) = ingest_jira_sre(db)?;
stats.jira_scanned = scanned;
stats.jira_inserted = inserted;
}
if matches!(restrict, None | Some("datadog")) {
let (files, inserted) = ingest_datadog(db, &config)?;
stats.datadog_files = files;
stats.datadog_inserted = inserted;
}
println!(
"JIRA SRE: scanned {} work items, inserted {} incidents.",
stats.jira_scanned, stats.jira_inserted,
);
println!(
"Datadog: processed {} files, inserted {} incidents.",
stats.datadog_files, stats.datadog_inserted,
);
Ok(())
}
fn ingest_jira_sre(db: &mut Database) -> anyhow::Result<(usize, usize)> {
let conn = db.connection_mut();
let tx = conn.transaction()?;
let mut scanned = 0usize;
let mut inserted = 0usize;
{
let mut q = tx.prepare(
"SELECT id, status, raw_json FROM work_items \
WHERE source = 'jira' \
AND project = 'SRE' \
AND (item_type = 'Bug' OR item_type = 'Incident')",
)?;
let mut rows = q.query([])?;
let mut insert = tx.prepare(
"INSERT OR REPLACE INTO fact_incidents \
(incident_id, source, detected_at, resolved_at, mttr_hours, severity, \
triggering_deploy, repo, jira_ticket) \
VALUES (?1, 'jira_sre', ?2, ?3, ?4, ?5, NULL, NULL, ?1)",
)?;
while let Some(r) = rows.next()? {
scanned += 1;
let id: String = r.get(0)?;
let _status: Option<String> = r.get(1)?;
let raw_json: Option<String> = r.get(2)?;
let (detected, resolved, severity) = extract_jira_fields(raw_json.as_deref());
let mttr_hours = match (&detected, &resolved) {
(Some(d), Some(r)) => {
Some((r.signed_duration_since(*d).num_seconds() as f64) / 3600.0)
}
_ => None,
};
insert.execute(params![
id,
detected.map(|d| d.to_rfc3339()),
resolved.map(|r| r.to_rfc3339()),
mttr_hours,
severity,
])?;
inserted += 1;
}
}
tx.commit()?;
info!(
scanned,
inserted, "JIRA SRE incident ingestion complete (mttr quick-win path)"
);
Ok((scanned, inserted))
}
/// Ingests Datadog incident exports from `config.dora.datadog_dir` into
/// `fact_incidents`.
///
/// Every directory entry whose extension is exactly `json` is read, parsed
/// with `parse_datadog_value`, and each resulting row is upserted
/// (`INSERT OR REPLACE`) with source `datadog`. Files that cannot be read, are
/// not valid JSON, or match no known shape are logged and skipped, but still
/// count towards the returned file total. `mttr_hours` is set only when both
/// timestamps are present.
///
/// Files are processed in sorted path order. `read_dir` yields entries in a
/// platform-dependent order, and because the upsert is last-write-wins, an
/// incident id that appears in more than one file would otherwise end up with
/// an arbitrary winner.
///
/// All writes happen inside one transaction that is committed at the end.
///
/// Returns `(files, inserted)`. Returns `(0, 0)` without touching the database
/// when the directory is not configured or does not exist.
///
/// # Errors
///
/// Propagates directory-listing errors and any database error.
fn ingest_datadog(db: &mut Database, config: &Config) -> anyhow::Result<(usize, usize)> {
    let Some(dir) = config.dora.as_ref().and_then(|d| d.datadog_dir.as_ref()) else {
        return Ok((0, 0));
    };
    if !dir.exists() {
        warn!(
            path = %dir.display(),
            "dora.datadog_dir does not exist; skipping Datadog ingest"
        );
        return Ok((0, 0));
    }

    // Gather candidate files up front so they can be processed in a
    // deterministic order.
    let mut json_paths = Vec::new();
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().and_then(|e| e.to_str()) == Some("json") {
            json_paths.push(path);
        }
    }
    json_paths.sort();

    let files = json_paths.len();
    let mut inserted = 0usize;
    let conn = db.connection_mut();
    let tx = conn.transaction()?;

    // The prepared statement borrows the transaction, so it lives in an inner
    // scope that ends before `commit` consumes it.
    {
        let mut insert = tx.prepare(
            "INSERT OR REPLACE INTO fact_incidents \
             (incident_id, source, detected_at, resolved_at, mttr_hours, severity, \
             triggering_deploy, repo, jira_ticket) \
             VALUES (?1, 'datadog', ?2, ?3, ?4, ?5, NULL, NULL, NULL)",
        )?;

        for path in &json_paths {
            let body = match std::fs::read_to_string(path) {
                Ok(s) => s,
                Err(e) => {
                    warn!(path = %path.display(), error = %e, "datadog file unreadable; skipping");
                    continue;
                }
            };
            let value = match serde_json::from_str::<serde_json::Value>(&body) {
                Ok(v) => v,
                Err(e) => {
                    warn!(path = %path.display(), error = %e, "datadog file is not valid JSON; skipping");
                    continue;
                }
            };
            let rows = parse_datadog_value(&value);
            if rows.is_empty() {
                warn!(path = %path.display(), "datadog file did not match any known shape; skipping");
                continue;
            }

            for row in rows {
                let mttr_hours = match (&row.detected_at, &row.resolved_at) {
                    (Some(d), Some(r)) => {
                        Some((r.signed_duration_since(*d).num_seconds() as f64) / 3600.0)
                    }
                    _ => None,
                };
                insert.execute(params![
                    row.incident_id,
                    row.detected_at.map(|t| t.to_rfc3339()),
                    row.resolved_at.map(|t| t.to_rfc3339()),
                    mttr_hours,
                    row.severity,
                ])?;
                inserted += 1;
            }
        }
    }

    tx.commit()?;
    info!(files, inserted, "Datadog incident ingestion complete");
    Ok((files, inserted))
}
/// One incident extracted from a Datadog JSON payload, ready to be written to
/// `fact_incidents`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DatadogRow {
/// Identifier stored as `incident_id`; the parsers in this file build it as
/// `datadog:<id>`.
incident_id: String,
/// When the incident started, if the payload carried a parseable timestamp.
detected_at: Option<DateTime<Utc>>,
/// When the incident ended, if the payload carried a parseable timestamp.
resolved_at: Option<DateTime<Utc>>,
/// Severity label, if one could be derived from the payload.
severity: Option<String>,
}
fn parse_datadog_value(value: &serde_json::Value) -> Vec<DatadogRow> {
if let Some(arr) = value.get("data").and_then(|v| v.as_array()) {
let mut out = Vec::with_capacity(arr.len());
for entry in arr {
if let Some(row) = parse_datadog_incident_object(entry) {
out.push(row);
}
}
if !out.is_empty() {
return out;
}
}
if let Some(data) = value.get("data") {
if data.is_object() {
if let Some(row) = parse_datadog_incident_object(data) {
return vec![row];
}
}
}
if let Some(row) = parse_datadog_monitor_payload(value) {
return vec![row];
}
Vec::new()
}
fn parse_datadog_incident_object(obj: &serde_json::Value) -> Option<DatadogRow> {
let raw_id = obj.get("id")?;
let id = json_value_as_id(raw_id)?;
let attrs = obj.get("attributes");
let detected_at = attrs
.and_then(|a| a.get("created"))
.and_then(parse_unix_or_iso);
let resolved_at = attrs
.and_then(|a| a.get("resolved"))
.and_then(parse_unix_or_iso);
let severity = attrs
.and_then(|a| a.get("severity"))
.and_then(|v| v.as_str())
.map(str::to_string);
Some(DatadogRow {
incident_id: format!("datadog:{id}"),
detected_at,
resolved_at,
severity,
})
}
fn parse_datadog_monitor_payload(value: &serde_json::Value) -> Option<DatadogRow> {
let id = value.get("id").and_then(json_value_as_id)?;
let downtime = value.get("downtime")?;
let detected_at = downtime.get("start").and_then(parse_unix_or_iso);
let resolved_at = downtime.get("end").and_then(parse_unix_or_iso);
let severity = value
.get("monitor")
.and_then(|m| m.get("priority"))
.and_then(|p| p.as_u64())
.map(|prio| {
format!("P{}", prio.saturating_sub(1))
});
Some(DatadogRow {
incident_id: format!("datadog:{id}"),
detected_at,
resolved_at,
severity,
})
}
fn json_value_as_id(v: &serde_json::Value) -> Option<String> {
if let Some(s) = v.as_str() {
if s.is_empty() {
return None;
}
return Some(s.to_string());
}
if let Some(n) = v.as_i64() {
return Some(n.to_string());
}
v.as_u64().map(|n| n.to_string())
}
fn parse_unix_or_iso(v: &serde_json::Value) -> Option<DateTime<Utc>> {
if let Some(n) = v.as_i64() {
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
if let Some(n) = v.as_u64() {
let n = i64::try_from(n).ok()?;
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
if let Some(s) = v.as_str() {
if let Ok(n) = s.parse::<i64>() {
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
return parse_jira_datetime(s).map(|d| d.with_timezone(&Utc));
}
None
}
/// Parses a timestamp string, keeping its original UTC offset.
///
/// RFC 3339 is tried first. If that fails, the string is parsed with the
/// format `%Y-%m-%dT%H:%M:%S%.3f%z`, which covers values such as
/// `2025-01-01T00:00:00.000+0000`. Returns `None` when neither form matches.
fn parse_jira_datetime(s: &str) -> Option<chrono::DateTime<chrono::FixedOffset>> {
    chrono::DateTime::parse_from_rfc3339(s)
        .or_else(|_| chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.3f%z"))
        .ok()
}
fn extract_jira_fields(
raw_json: Option<&str>,
) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>, Option<String>) {
let Some(text) = raw_json else {
return (None, None, None);
};
let Ok(v) = serde_json::from_str::<serde_json::Value>(text) else {
return (None, None, None);
};
let fields = v.get("fields");
let parse = |k: &str| -> Option<DateTime<Utc>> {
fields
.and_then(|f| f.get(k))
.and_then(|v| v.as_str())
.and_then(parse_jira_datetime)
.map(|d| d.with_timezone(&Utc))
};
let severity = fields
.and_then(|f| f.get("priority"))
.and_then(|p| p.get("name"))
.and_then(|v| v.as_str())
.map(str::to_string);
(parse("created"), parse("resolutiondate"), severity)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tga::core::config::DoraConfig;

    /// Scratch directory that is removed when dropped, so fixtures are cleaned
    /// up even when an assertion panics part-way through a test.
    struct TempDir(std::path::PathBuf);

    impl TempDir {
        /// Borrows the directory's path.
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a fresh directory under the system temp dir. The name combines
    /// `label`, the process id and the current time in nanoseconds.
    fn unique_tmp_dir(label: &str) -> TempDir {
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let dir = std::env::temp_dir().join(format!("tga-datadog-{label}-{pid}-{nanos}"));
        std::fs::create_dir_all(&dir).expect("create temp dir");
        TempDir(dir)
    }

    /// Builds a default `Config` whose `dora.datadog_dir` points at `dir`.
    fn datadog_config(dir: &std::path::Path) -> Config {
        Config {
            dora: Some(DoraConfig {
                datadog_dir: Some(dir.to_path_buf()),
                ..DoraConfig::default()
            }),
            ..Config::default()
        }
    }

    // --- Jira field extraction -------------------------------------------

    #[test]
    fn extract_jira_fields_handles_full_payload() {
        let json = r#"{
"fields": {
"created": "2025-01-01T00:00:00.000+0000",
"resolutiondate": "2025-01-01T02:00:00.000+0000",
"priority": { "name": "High" }
}
}"#;
        let (d, r, sev) = extract_jira_fields(Some(json));
        assert!(d.is_some());
        assert!(r.is_some());
        assert_eq!(sev.as_deref(), Some("High"));
        let mttr = (r.unwrap().signed_duration_since(d.unwrap()).num_seconds() as f64) / 3600.0;
        assert!((mttr - 2.0).abs() < 1e-6);
    }

    #[test]
    fn extract_jira_fields_handles_empty_payload() {
        let (d, r, sev) = extract_jira_fields(None);
        assert!(d.is_none() && r.is_none() && sev.is_none());
        let (d, r, sev) = extract_jira_fields(Some("{}"));
        assert!(d.is_none() && r.is_none() && sev.is_none());
    }

    #[test]
    fn ingest_jira_sre_with_empty_db_inserts_nothing() {
        let mut db = Database::open_in_memory().expect("db");
        let (scanned, inserted) = ingest_jira_sre(&mut db).expect("ingest");
        assert_eq!(scanned, 0);
        assert_eq!(inserted, 0);
    }

    // --- Timestamp parsing -----------------------------------------------

    #[test]
    fn parse_unix_or_iso_handles_epoch_integer() {
        let v = serde_json::json!(1_700_000_000_i64);
        let dt = parse_unix_or_iso(&v).expect("parses epoch");
        assert_eq!(dt.timestamp(), 1_700_000_000);
    }

    #[test]
    fn parse_unix_or_iso_handles_iso_string() {
        let v = serde_json::json!("2025-01-01T02:00:00Z");
        let dt = parse_unix_or_iso(&v).expect("parses iso");
        assert_eq!(dt.to_rfc3339(), "2025-01-01T02:00:00+00:00");
    }

    #[test]
    fn parse_unix_or_iso_handles_stringified_epoch() {
        let v = serde_json::json!("1700000000");
        let dt = parse_unix_or_iso(&v).expect("parses stringified epoch");
        assert_eq!(dt.timestamp(), 1_700_000_000);
    }

    #[test]
    fn parse_unix_or_iso_returns_none_for_unparseable_inputs() {
        assert!(parse_unix_or_iso(&serde_json::json!(null)).is_none());
        assert!(parse_unix_or_iso(&serde_json::json!("not-a-date")).is_none());
        assert!(parse_unix_or_iso(&serde_json::json!(true)).is_none());
    }

    // --- Datadog ingest ----------------------------------------------------

    #[test]
    fn ingest_datadog_skips_missing_dir() {
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(std::path::Path::new(
            "/definitely/does/not/exist/dd-xyz-zzz",
        ));
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        assert_eq!(files, 0);
        assert_eq!(inserted, 0);
    }

    #[test]
    fn ingest_datadog_skips_unset_dir() {
        let mut db = Database::open_in_memory().expect("db");
        let config = Config::default();
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        assert_eq!(files, 0);
        assert_eq!(inserted, 0);
    }

    #[test]
    fn ingest_datadog_parses_incident_api_shape() {
        let dir = unique_tmp_dir("incident-shape");
        let file = dir.path().join("incident-001.json");
        std::fs::write(
            &file,
            r#"{
"data": {
"id": "abc-123",
"attributes": {
"created": "2025-01-01T00:00:00Z",
"resolved": "2025-01-01T02:00:00Z",
"severity": "SEV-1"
}
}
}"#,
        )
        .expect("write file");
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(dir.path());
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        assert_eq!(files, 1);
        assert_eq!(inserted, 1);
        let conn = db.connection();
        let (id, severity, mttr): (String, Option<String>, Option<f64>) = conn
            .query_row(
                "SELECT incident_id, severity, mttr_hours FROM fact_incidents",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .expect("row");
        assert_eq!(id, "datadog:abc-123");
        assert_eq!(severity.as_deref(), Some("SEV-1"));
        assert!((mttr.expect("mttr") - 2.0).abs() < 1e-6);
    }

    #[test]
    fn ingest_datadog_parses_list_shape() {
        let dir = unique_tmp_dir("list-shape");
        let file = dir.path().join("incidents-bulk.json");
        std::fs::write(
            &file,
            r#"{
"data": [
{
"id": "i-001",
"attributes": {
"created": "2025-01-01T00:00:00Z",
"resolved": "2025-01-01T01:00:00Z",
"severity": "SEV-2"
}
},
{
"id": "i-002",
"attributes": {
"created": "2025-01-02T00:00:00Z",
"severity": "SEV-3"
}
}
]
}"#,
        )
        .expect("write file");
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(dir.path());
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        assert_eq!(files, 1);
        assert_eq!(inserted, 2);
    }

    #[test]
    fn ingest_datadog_parses_monitor_shape() {
        let dir = unique_tmp_dir("monitor-shape");
        let file = dir.path().join("monitor-trip.json");
        std::fs::write(
            &file,
            r#"{
"id": 42,
"downtime": { "start": 1700000000, "end": 1700003600 },
"monitor": { "name": "API error rate", "priority": 3 }
}"#,
        )
        .expect("write file");
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(dir.path());
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        assert_eq!(files, 1);
        assert_eq!(inserted, 1);
        let conn = db.connection();
        let (id, severity, mttr): (String, Option<String>, Option<f64>) = conn
            .query_row(
                "SELECT incident_id, severity, mttr_hours FROM fact_incidents",
                [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .expect("row");
        assert_eq!(id, "datadog:42");
        assert_eq!(severity.as_deref(), Some("P2"));
        assert!((mttr.expect("mttr") - 1.0).abs() < 1e-6);
    }

    #[test]
    fn ingest_datadog_skips_unparseable_files() {
        let dir = unique_tmp_dir("bad-files");
        std::fs::write(dir.path().join("garbage.json"), "this is not json at all")
            .expect("write garbage");
        std::fs::write(
            dir.path().join("good.json"),
            r#"{
"data": {
"id": "ok-1",
"attributes": {
"created": "2025-03-01T00:00:00Z",
"resolved": "2025-03-01T00:30:00Z",
"severity": "SEV-2"
}
}
}"#,
        )
        .expect("write good");
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(dir.path());
        let (files, inserted) = ingest_datadog(&mut db, &config).expect("ingest");
        // The unparseable file still counts as processed, but inserts nothing.
        assert_eq!(files, 2);
        assert_eq!(inserted, 1);
    }

    #[test]
    fn ingest_datadog_replaces_on_reingest() {
        let dir = unique_tmp_dir("idempotent");
        let path = dir.path().join("incident.json");
        let payload = r#"{
"data": {
"id": "dup-1",
"attributes": {
"created": "2025-01-01T00:00:00Z",
"resolved": "2025-01-01T01:00:00Z",
"severity": "SEV-3"
}
}
}"#;
        std::fs::write(&path, payload).expect("write");
        let mut db = Database::open_in_memory().expect("db");
        let config = datadog_config(dir.path());
        let _ = ingest_datadog(&mut db, &config).expect("ingest 1");
        let _ = ingest_datadog(&mut db, &config).expect("ingest 2");
        let n: i64 = db
            .connection()
            .query_row("SELECT COUNT(*) FROM fact_incidents", [], |r| r.get(0))
            .expect("count");
        assert_eq!(n, 1, "INSERT OR REPLACE must dedupe on incident_id PK");
    }
}