use chrono::{DateTime, Utc};
use clap::Args;
use rusqlite::params;
use tracing::{info, warn};
use tga::core::config::Config;
use tga::core::db::Database;
// CLI arguments for `tga incidents collect`.
//
// Plain `//` comments are used here on purpose: clap's derive macro turns
// `///` doc comments into help text, and all help text for this command is
// spelled out explicitly in the attributes below.
#[derive(Args, Debug)]
#[command(
    about = "Ingest production incidents into fact_incidents.",
    long_about = "Walk configured incident sources and persist incident records into\n\
        `fact_incidents`. Supported sources:\n\n\
        jira -- query work_items for SRE bugs and incidents (default; no credentials)\n\
        datadog -- walk dora.datadog_dir for .json incident files\n\n\
        Both sources can run in the same invocation when --source is omitted.\n\
        Rows already present are overwritten (idempotent INSERT OR REPLACE semantics).",
    after_help = "EXAMPLES:\n\
        # Ingest from all configured sources\n\
        tga incidents collect\n\n\
        # Ingest from Datadog JSON exports only\n\
        tga incidents collect --source datadog\n\n\
        TIPS:\n\
        - Set `dora.datadog_dir` in config.yaml to point at your exported JSON files.\n\
        - After ingestion, run `tga dora` to compute MTTR and Change Failure Rate."
)]
pub struct IncidentsCollectArgs {
    // Optional restriction to a single source. `None` means "run every
    // source"; the recognised values are `jira` and `datadog`.
    #[arg(
        long,
        value_name = "SOURCE",
        help = "Restrict ingestion to one source: `jira` or `datadog` (default: all)"
    )]
    pub source: Option<String>,
}
/// Per-source counters gathered by `run` and printed in its summary lines.
///
/// All counters start at zero (via `Default`) and stay zero for a source
/// that was not run.
#[derive(Debug, Default, Clone)]
struct CollectStats {
/// Number of `work_items` rows visited by the JIRA SRE ingester.
jira_scanned: usize,
/// Number of rows the JIRA SRE ingester wrote to `fact_incidents`.
jira_inserted: usize,
/// Number of `.json` files the Datadog ingester picked up.
datadog_files: usize,
/// Number of rows the Datadog ingester wrote to `fact_incidents`.
datadog_inserted: usize,
}
/// Entry point for `tga incidents collect`.
///
/// Runs the JIRA SRE ingester and/or the Datadog ingester depending on
/// `args.source` (`None` runs both, `Some("jira")` / `Some("datadog")` runs
/// only that one), then prints one summary line per source to stdout.
/// A source that was not run is reported with zero counts.
///
/// # Errors
///
/// Returns an error when `args.source` names an unknown source, or when one
/// of the ingesters fails.
pub fn run(config: Config, db: &mut Database, args: IncidentsCollectArgs) -> anyhow::Result<()> {
    let restrict = args.source.as_deref();

    // Reject unknown sources up front. Without this check a typo such as
    // `--source datadgo` matches neither branch below, so the command would
    // do nothing and still report success with all-zero counts.
    if let Some(unknown) = restrict.filter(|s| !matches!(*s, "jira" | "datadog")) {
        anyhow::bail!("unknown incident source `{unknown}`; expected `jira` or `datadog`");
    }

    let mut stats = CollectStats::default();
    if matches!(restrict, None | Some("jira")) {
        let (scanned, inserted) = ingest_jira_sre(db)?;
        stats.jira_scanned = scanned;
        stats.jira_inserted = inserted;
    }
    if matches!(restrict, None | Some("datadog")) {
        let (files, inserted) = ingest_datadog(db, &config)?;
        stats.datadog_files = files;
        stats.datadog_inserted = inserted;
    }

    println!(
        "JIRA SRE: scanned {} work items, inserted {} incidents.",
        stats.jira_scanned, stats.jira_inserted,
    );
    println!(
        "Datadog: processed {} files, inserted {} incidents.",
        stats.datadog_files, stats.datadog_inserted,
    );
    Ok(())
}
fn ingest_jira_sre(db: &mut Database) -> anyhow::Result<(usize, usize)> {
let conn = db.connection_mut();
let tx = conn.transaction()?;
let mut scanned = 0usize;
let mut inserted = 0usize;
{
let mut q = tx.prepare(
"SELECT id, status, raw_json FROM work_items \
WHERE source = 'jira' \
AND project = 'SRE' \
AND (item_type = 'Bug' OR item_type = 'Incident')",
)?;
let mut rows = q.query([])?;
let mut insert = tx.prepare(
"INSERT OR REPLACE INTO fact_incidents \
(incident_id, source, detected_at, resolved_at, mttr_hours, severity, \
triggering_deploy, repo, jira_ticket) \
VALUES (?1, 'jira_sre', ?2, ?3, ?4, ?5, NULL, NULL, ?1)",
)?;
while let Some(r) = rows.next()? {
scanned += 1;
let id: String = r.get(0)?;
let _status: Option<String> = r.get(1)?;
let raw_json: Option<String> = r.get(2)?;
let (detected, resolved, severity) = extract_jira_fields(raw_json.as_deref());
let mttr_hours = match (&detected, &resolved) {
(Some(d), Some(r)) => {
Some((r.signed_duration_since(*d).num_seconds() as f64) / 3600.0)
}
_ => None,
};
insert.execute(params![
id,
detected.map(|d| d.to_rfc3339()),
resolved.map(|r| r.to_rfc3339()),
mttr_hours,
severity,
])?;
inserted += 1;
}
}
tx.commit()?;
info!(
scanned,
inserted, "JIRA SRE incident ingestion complete (mttr quick-win path)"
);
Ok((scanned, inserted))
}
/// Ingests Datadog incident exports from the directory configured as
/// `dora.datadog_dir` into `fact_incidents`, tagged with source `datadog`.
///
/// Only direct children of the directory whose extension is exactly `json`
/// are considered. Files that cannot be read, are not valid JSON, or match
/// no known payload shape are logged and skipped; they still count towards
/// the returned file total. All writes share one transaction.
///
/// Files are processed in sorted path order. Directory listing order is
/// platform dependent, and because rows are written with `INSERT OR REPLACE`
/// the surviving row for an incident id that appears in several files would
/// otherwise vary from run to run.
///
/// Returns `(files, inserted)`, or `(0, 0)` when no directory is configured
/// or the configured directory does not exist.
///
/// # Errors
///
/// Fails if the directory cannot be listed or a database operation fails.
fn ingest_datadog(db: &mut Database, config: &Config) -> anyhow::Result<(usize, usize)> {
    let Some(dir) = config.dora.as_ref().and_then(|d| d.datadog_dir.as_ref()) else {
        return Ok((0, 0));
    };
    if !dir.exists() {
        warn!(
            path = %dir.display(),
            "dora.datadog_dir does not exist; skipping Datadog ingest"
        );
        return Ok((0, 0));
    }

    // Gather candidate files first so they can be visited in a stable order.
    let mut paths = Vec::new();
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().and_then(|e| e.to_str()) == Some("json") {
            paths.push(path);
        }
    }
    paths.sort();

    let files = paths.len();
    let mut inserted = 0usize;
    let conn = db.connection_mut();
    let tx = conn.transaction()?;
    {
        // The prepared statement borrows the transaction, so it is scoped to
        // end before `commit` consumes it.
        let mut insert = tx.prepare(
            "INSERT OR REPLACE INTO fact_incidents \
             (incident_id, source, detected_at, resolved_at, mttr_hours, severity, \
             triggering_deploy, repo, jira_ticket) \
             VALUES (?1, 'datadog', ?2, ?3, ?4, ?5, NULL, NULL, NULL)",
        )?;
        for path in &paths {
            let body = match std::fs::read_to_string(path) {
                Ok(s) => s,
                Err(e) => {
                    warn!(path = %path.display(), error = %e, "datadog file unreadable; skipping");
                    continue;
                }
            };
            let value = match serde_json::from_str::<serde_json::Value>(&body) {
                Ok(v) => v,
                Err(e) => {
                    warn!(path = %path.display(), error = %e, "datadog file is not valid JSON; skipping");
                    continue;
                }
            };
            let rows = parse_datadog_value(&value);
            if rows.is_empty() {
                warn!(path = %path.display(), "datadog file did not match any known shape; skipping");
                continue;
            }
            for row in rows {
                // MTTR is only defined when both ends of the incident are known.
                let mttr_hours = row.detected_at.zip(row.resolved_at).map(|(start, end)| {
                    (end.signed_duration_since(start).num_seconds() as f64) / 3600.0
                });
                insert.execute(params![
                    row.incident_id,
                    row.detected_at.map(|t| t.to_rfc3339()),
                    row.resolved_at.map(|t| t.to_rfc3339()),
                    mttr_hours,
                    row.severity,
                ])?;
                inserted += 1;
            }
        }
    }
    tx.commit()?;
    info!(files, inserted, "Datadog incident ingestion complete");
    Ok((files, inserted))
}
/// One incident extracted from a Datadog JSON payload, ready to be written
/// to `fact_incidents`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DatadogRow {
/// Incident identifier; the parsers in this file build it as `datadog:<id>`.
incident_id: String,
/// When the incident started, if the payload carried a parseable timestamp.
detected_at: Option<DateTime<Utc>>,
/// When the incident ended, if the payload carried a parseable timestamp.
resolved_at: Option<DateTime<Utc>>,
/// Severity label taken from the payload, if any.
severity: Option<String>,
}
fn parse_datadog_value(value: &serde_json::Value) -> Vec<DatadogRow> {
if let Some(arr) = value.get("data").and_then(|v| v.as_array()) {
let mut out = Vec::with_capacity(arr.len());
for entry in arr {
if let Some(row) = parse_datadog_incident_object(entry) {
out.push(row);
}
}
if !out.is_empty() {
return out;
}
}
if let Some(data) = value.get("data") {
if data.is_object() {
if let Some(row) = parse_datadog_incident_object(data) {
return vec![row];
}
}
}
if let Some(row) = parse_datadog_monitor_payload(value) {
return vec![row];
}
Vec::new()
}
fn parse_datadog_incident_object(obj: &serde_json::Value) -> Option<DatadogRow> {
let raw_id = obj.get("id")?;
let id = json_value_as_id(raw_id)?;
let attrs = obj.get("attributes");
let detected_at = attrs
.and_then(|a| a.get("created"))
.and_then(parse_unix_or_iso);
let resolved_at = attrs
.and_then(|a| a.get("resolved"))
.and_then(parse_unix_or_iso);
let severity = attrs
.and_then(|a| a.get("severity"))
.and_then(|v| v.as_str())
.map(str::to_string);
Some(DatadogRow {
incident_id: format!("datadog:{id}"),
detected_at,
resolved_at,
severity,
})
}
fn parse_datadog_monitor_payload(value: &serde_json::Value) -> Option<DatadogRow> {
let id = value.get("id").and_then(json_value_as_id)?;
let downtime = value.get("downtime")?;
let detected_at = downtime.get("start").and_then(parse_unix_or_iso);
let resolved_at = downtime.get("end").and_then(parse_unix_or_iso);
let severity = value
.get("monitor")
.and_then(|m| m.get("priority"))
.and_then(|p| p.as_u64())
.map(|prio| {
format!("P{}", prio.saturating_sub(1))
});
Some(DatadogRow {
incident_id: format!("datadog:{id}"),
detected_at,
resolved_at,
severity,
})
}
/// Renders a JSON value as an identifier string.
///
/// Non-empty strings are returned as they are, and integers (signed or
/// unsigned) are rendered in decimal. Empty strings and every other JSON
/// type yield `None`.
fn json_value_as_id(v: &serde_json::Value) -> Option<String> {
    match v.as_str() {
        Some("") => None,
        Some(text) => Some(text.to_owned()),
        // Not a string: accept integers, trying the signed view first and
        // the unsigned view for values above `i64::MAX`.
        None => v
            .as_i64()
            .map(|n| n.to_string())
            .or_else(|| v.as_u64().map(|n| n.to_string())),
    }
}
fn parse_unix_or_iso(v: &serde_json::Value) -> Option<DateTime<Utc>> {
if let Some(n) = v.as_i64() {
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
if let Some(n) = v.as_u64() {
let n = i64::try_from(n).ok()?;
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
if let Some(s) = v.as_str() {
if let Ok(n) = s.parse::<i64>() {
return chrono::DateTime::<Utc>::from_timestamp(n, 0);
}
return parse_jira_datetime(s).map(|d| d.with_timezone(&Utc));
}
None
}
/// Parses a datetime string that carries a UTC offset.
///
/// RFC 3339 is tried first. On failure the string is retried against
/// `%Y-%m-%dT%H:%M:%S%.3f%z` (fractional seconds followed by a numeric
/// offset; presumably the layout Jira emits, to be confirmed against real
/// payloads). Returns `None` when neither layout matches.
fn parse_jira_datetime(s: &str) -> Option<chrono::DateTime<chrono::FixedOffset>> {
    chrono::DateTime::parse_from_rfc3339(s)
        .or_else(|_| chrono::DateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.3f%z"))
        .ok()
}
fn extract_jira_fields(
raw_json: Option<&str>,
) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>, Option<String>) {
let Some(text) = raw_json else {
return (None, None, None);
};
let Ok(v) = serde_json::from_str::<serde_json::Value>(text) else {
return (None, None, None);
};
let fields = v.get("fields");
let parse = |k: &str| -> Option<DateTime<Utc>> {
fields
.and_then(|f| f.get(k))
.and_then(|v| v.as_str())
.and_then(parse_jira_datetime)
.map(|d| d.with_timezone(&Utc))
};
let severity = fields
.and_then(|f| f.get("priority"))
.and_then(|p| p.get("name"))
.and_then(|v| v.as_str())
.map(str::to_string);
(parse("created"), parse("resolutiondate"), severity)
}
// Unit tests are kept in the sibling file `incidents_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "incidents_tests.rs"]
mod tests;