1use chrono::{DateTime, Utc};
4use serde::Serialize;
5use serde_json::Value;
6
7use crate::query::EventSource;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
11#[serde(rename_all = "snake_case")]
12pub enum TimelineEventKind {
13 ProcessExec,
14 ProcessExit,
15 ProcessKprobe,
16 NetworkFlow,
17 GuardDecision,
18 ScanResult,
19}
20
21impl std::fmt::Display for TimelineEventKind {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 match self {
24 Self::ProcessExec => write!(f, "process_exec"),
25 Self::ProcessExit => write!(f, "process_exit"),
26 Self::ProcessKprobe => write!(f, "process_kprobe"),
27 Self::NetworkFlow => write!(f, "network_flow"),
28 Self::GuardDecision => write!(f, "guard_decision"),
29 Self::ScanResult => write!(f, "scan_result"),
30 }
31 }
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
36#[serde(rename_all = "snake_case")]
37pub enum NormalizedVerdict {
38 Allow,
39 Deny,
40 Warn,
41 None,
42 Forwarded,
43 Dropped,
44}
45
46impl std::fmt::Display for NormalizedVerdict {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 Self::Allow => write!(f, "allow"),
50 Self::Deny => write!(f, "deny"),
51 Self::Warn => write!(f, "warn"),
52 Self::None => write!(f, "none"),
53 Self::Forwarded => write!(f, "forwarded"),
54 Self::Dropped => write!(f, "dropped"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize)]
61pub struct TimelineEvent {
62 pub timestamp: DateTime<Utc>,
63 pub source: EventSource,
64 pub kind: TimelineEventKind,
65 pub verdict: NormalizedVerdict,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub severity: Option<String>,
68 pub summary: String,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub process: Option<String>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub namespace: Option<String>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub pod: Option<String>,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 pub action_type: Option<String>,
77 #[serde(skip_serializing_if = "Option::is_none")]
78 pub signature_valid: Option<bool>,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub raw: Option<Value>,
81}
82
83pub fn parse_envelope(envelope: &Value, verify: bool) -> Option<TimelineEvent> {
88 let fact = envelope.get("fact")?;
89 let schema = fact.get("schema").and_then(|s| s.as_str())?;
90
91 let issued_at = envelope.get("issued_at").and_then(|v| v.as_str())?;
93 let timestamp = DateTime::parse_from_rfc3339(issued_at)
94 .ok()?
95 .with_timezone(&Utc);
96
97 let signature_valid = if verify {
99 Some(spine::verify_envelope(envelope).unwrap_or(false))
100 } else {
101 None
102 };
103
104 match schema {
105 "clawdstrike.sdr.fact.tetragon_event.v1" => {
106 parse_tetragon(fact, timestamp, signature_valid, envelope.clone())
107 }
108 "clawdstrike.sdr.fact.hubble_flow.v1" => {
109 parse_hubble(fact, timestamp, signature_valid, envelope.clone())
110 }
111 s if s.starts_with("clawdstrike.sdr.fact.receipt") => {
112 parse_receipt(fact, timestamp, signature_valid, envelope.clone())
113 }
114 s if s.starts_with("clawdstrike.sdr.fact.scan") => {
115 parse_scan(fact, timestamp, signature_valid, envelope.clone())
116 }
117 _ => None,
118 }
119}
120
121fn parse_tetragon(
123 fact: &Value,
124 timestamp: DateTime<Utc>,
125 sig: Option<bool>,
126 raw: Value,
127) -> Option<TimelineEvent> {
128 let event_type = fact
129 .get("event_type")
130 .and_then(|v| v.as_str())
131 .unwrap_or("unknown");
132 let binary = fact
133 .get("process")
134 .and_then(|p| p.get("binary"))
135 .and_then(|b| b.as_str());
136 let severity = fact
137 .get("severity")
138 .and_then(|s| s.as_str())
139 .map(String::from);
140 let ns = fact
141 .get("process")
142 .and_then(|p| p.get("pod"))
143 .and_then(|p| p.get("namespace"))
144 .and_then(|n| n.as_str())
145 .map(String::from);
146 let pod_name = fact
147 .get("process")
148 .and_then(|p| p.get("pod"))
149 .and_then(|p| p.get("name"))
150 .and_then(|n| n.as_str())
151 .map(String::from);
152
153 let kind = match event_type {
154 "PROCESS_EXEC" => TimelineEventKind::ProcessExec,
155 "PROCESS_EXIT" => TimelineEventKind::ProcessExit,
156 "PROCESS_KPROBE" => TimelineEventKind::ProcessKprobe,
157 _ => TimelineEventKind::ProcessExec,
158 };
159
160 let summary = format!("{} {}", event_type.to_lowercase(), binary.unwrap_or("?"));
161
162 Some(TimelineEvent {
163 timestamp,
164 source: EventSource::Tetragon,
165 kind,
166 verdict: NormalizedVerdict::None,
167 severity,
168 summary,
169 process: binary.map(String::from),
170 namespace: ns,
171 pod: pod_name,
172 action_type: Some("process".to_string()),
173 signature_valid: sig,
174 raw: Some(raw),
175 })
176}
177
178fn parse_hubble(
180 fact: &Value,
181 timestamp: DateTime<Utc>,
182 sig: Option<bool>,
183 raw: Value,
184) -> Option<TimelineEvent> {
185 let verdict_str = fact
186 .get("verdict")
187 .and_then(|v| v.as_str())
188 .unwrap_or("UNKNOWN");
189 let direction = fact
190 .get("traffic_direction")
191 .and_then(|v| v.as_str())
192 .unwrap_or("unknown");
193 let flow_summary = fact
194 .get("summary")
195 .and_then(|v| v.as_str())
196 .unwrap_or("network flow");
197
198 let verdict = match verdict_str {
199 "FORWARDED" => NormalizedVerdict::Forwarded,
200 "DROPPED" => NormalizedVerdict::Dropped,
201 _ => NormalizedVerdict::None,
202 };
203
204 let ns = fact
205 .get("source")
206 .and_then(|s| s.get("namespace"))
207 .and_then(|n| n.as_str())
208 .map(String::from);
209 let pod_name = fact
210 .get("source")
211 .and_then(|s| s.get("pod_name"))
212 .and_then(|n| n.as_str())
213 .map(String::from);
214
215 let summary = format!("{} {}", direction.to_lowercase(), flow_summary);
216
217 Some(TimelineEvent {
218 timestamp,
219 source: EventSource::Hubble,
220 kind: TimelineEventKind::NetworkFlow,
221 verdict,
222 severity: None,
223 summary,
224 process: None,
225 namespace: ns,
226 pod: pod_name,
227 action_type: Some(
228 match direction {
229 "EGRESS" => "egress",
230 "INGRESS" => "ingress",
231 _ => "network",
232 }
233 .to_string(),
234 ),
235 signature_valid: sig,
236 raw: Some(raw),
237 })
238}
239
240fn parse_receipt(
242 fact: &Value,
243 timestamp: DateTime<Utc>,
244 sig: Option<bool>,
245 raw: Value,
246) -> Option<TimelineEvent> {
247 let decision = fact
248 .get("decision")
249 .and_then(|v| v.as_str())
250 .unwrap_or("unknown");
251 let guard_name = fact
252 .get("guard")
253 .and_then(|v| v.as_str())
254 .unwrap_or("unknown");
255 let action = fact
256 .get("action_type")
257 .and_then(|v| v.as_str())
258 .map(String::from);
259 let severity = fact
260 .get("severity")
261 .and_then(|s| s.as_str())
262 .map(String::from);
263 let ns = fact
264 .get("source")
265 .and_then(|s| s.get("namespace"))
266 .and_then(|n| n.as_str())
267 .map(String::from);
268 let pod_name = fact
269 .get("source")
270 .and_then(|s| s.get("pod_name").or_else(|| s.get("pod")))
271 .and_then(|n| n.as_str())
272 .map(String::from);
273
274 let verdict = match decision.to_lowercase().as_str() {
275 "allow" | "allowed" | "pass" | "passed" => NormalizedVerdict::Allow,
276 "deny" | "denied" | "block" | "blocked" => NormalizedVerdict::Deny,
277 "warn" | "warned" | "warning" => NormalizedVerdict::Warn,
278 _ => NormalizedVerdict::None,
279 };
280
281 let summary = format!("{} decision={}", guard_name, decision);
282
283 Some(TimelineEvent {
284 timestamp,
285 source: EventSource::Receipt,
286 kind: TimelineEventKind::GuardDecision,
287 verdict,
288 severity,
289 summary,
290 process: None,
291 namespace: ns,
292 pod: pod_name,
293 action_type: action,
294 signature_valid: sig,
295 raw: Some(raw),
296 })
297}
298
299fn parse_scan(
301 fact: &Value,
302 timestamp: DateTime<Utc>,
303 sig: Option<bool>,
304 raw: Value,
305) -> Option<TimelineEvent> {
306 let scan_type = fact
307 .get("scan_type")
308 .and_then(|v| v.as_str())
309 .unwrap_or("unknown");
310 let status = fact
311 .get("status")
312 .and_then(|v| v.as_str())
313 .unwrap_or("unknown");
314 let severity = fact
315 .get("severity")
316 .and_then(|s| s.as_str())
317 .map(String::from);
318
319 let verdict = match status.to_lowercase().as_str() {
320 "pass" | "passed" | "clean" => NormalizedVerdict::Allow,
321 "fail" | "failed" | "dirty" => NormalizedVerdict::Deny,
322 "warn" | "warning" => NormalizedVerdict::Warn,
323 _ => NormalizedVerdict::None,
324 };
325
326 let summary = format!("scan {} status={}", scan_type, status);
327
328 Some(TimelineEvent {
329 timestamp,
330 source: EventSource::Scan,
331 kind: TimelineEventKind::ScanResult,
332 verdict,
333 severity,
334 summary,
335 process: None,
336 namespace: None,
337 pod: None,
338 action_type: Some("scan".to_string()),
339 signature_valid: sig,
340 raw: Some(raw),
341 })
342}
343
344pub fn merge_timeline(mut events: Vec<TimelineEvent>) -> Vec<TimelineEvent> {
346 events.sort_by_key(|e| e.timestamp);
347 events
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;

    /// Parses `envelope` with verification disabled, panicking if rejected.
    fn parse_ok(envelope: &Value) -> TimelineEvent {
        parse_envelope(envelope, false).unwrap()
    }

    /// Hubble flow envelope without a `source` object.
    fn hubble_flow(issued_at: &str, verdict: &str, direction: &str, summary: &str) -> Value {
        json!({
            "issued_at": issued_at,
            "fact": {
                "schema": "clawdstrike.sdr.fact.hubble_flow.v1",
                "verdict": verdict,
                "traffic_direction": direction,
                "summary": summary
            }
        })
    }

    /// Event on 2025-06-15 at `hour`:00:00 UTC with every optional field unset.
    fn bare_event(
        hour: u32,
        source: EventSource,
        kind: TimelineEventKind,
        verdict: NormalizedVerdict,
        summary: &str,
    ) -> TimelineEvent {
        TimelineEvent {
            timestamp: Utc.with_ymd_and_hms(2025, 6, 15, hour, 0, 0).unwrap(),
            source,
            kind,
            verdict,
            severity: None,
            summary: summary.to_string(),
            process: None,
            namespace: None,
            pod: None,
            action_type: None,
            signature_valid: None,
            raw: None,
        }
    }

    #[test]
    fn timeline_event_kind_display() {
        let cases = [
            (TimelineEventKind::ProcessExec, "process_exec"),
            (TimelineEventKind::ProcessExit, "process_exit"),
            (TimelineEventKind::ProcessKprobe, "process_kprobe"),
            (TimelineEventKind::NetworkFlow, "network_flow"),
            (TimelineEventKind::GuardDecision, "guard_decision"),
            (TimelineEventKind::ScanResult, "scan_result"),
        ];
        for &(kind, label) in &cases {
            assert_eq!(kind.to_string(), label);
        }
    }

    #[test]
    fn normalized_verdict_display() {
        let cases = [
            (NormalizedVerdict::Allow, "allow"),
            (NormalizedVerdict::Deny, "deny"),
            (NormalizedVerdict::Warn, "warn"),
            (NormalizedVerdict::None, "none"),
            (NormalizedVerdict::Forwarded, "forwarded"),
            (NormalizedVerdict::Dropped, "dropped"),
        ];
        for &(verdict, label) in &cases {
            assert_eq!(verdict.to_string(), label);
        }
    }

    #[test]
    fn parse_tetragon_envelope() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:00:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.tetragon_event.v1",
                "event_type": "PROCESS_EXEC",
                "process": {
                    "binary": "/usr/bin/curl",
                    "pod": {
                        "namespace": "default",
                        "name": "agent-pod-abc123"
                    }
                },
                "severity": "info"
            }
        }));

        assert_eq!(event.source, EventSource::Tetragon);
        assert_eq!(event.kind, TimelineEventKind::ProcessExec);
        assert_eq!(event.verdict, NormalizedVerdict::None);
        assert_eq!(event.process.as_deref(), Some("/usr/bin/curl"));
        assert_eq!(event.namespace.as_deref(), Some("default"));
        assert_eq!(event.pod.as_deref(), Some("agent-pod-abc123"));
        assert_eq!(event.severity.as_deref(), Some("info"));
        assert_eq!(event.summary, "process_exec /usr/bin/curl");
        assert!(event.signature_valid.is_none());

        // The raw payload must still be the full envelope.
        let raw_schema = event
            .raw
            .as_ref()
            .and_then(|envelope| envelope.get("fact"))
            .and_then(|fact| fact.get("schema"))
            .and_then(Value::as_str);
        assert_eq!(raw_schema, Some("clawdstrike.sdr.fact.tetragon_event.v1"));
    }

    #[test]
    fn parse_tetragon_process_exit() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:01:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.tetragon_event.v1",
                "event_type": "PROCESS_EXIT",
                "process": {
                    "binary": "/usr/bin/ls"
                }
            }
        }));

        assert_eq!(event.kind, TimelineEventKind::ProcessExit);
        assert_eq!(event.summary, "process_exit /usr/bin/ls");
    }

    #[test]
    fn parse_hubble_envelope() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:05:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.hubble_flow.v1",
                "verdict": "FORWARDED",
                "traffic_direction": "EGRESS",
                "summary": "TCP 10.0.0.1:8080 -> 10.0.0.2:443",
                "source": {
                    "namespace": "production",
                    "pod_name": "web-server-xyz"
                }
            }
        }));

        assert_eq!(event.source, EventSource::Hubble);
        assert_eq!(event.kind, TimelineEventKind::NetworkFlow);
        assert_eq!(event.verdict, NormalizedVerdict::Forwarded);
        assert_eq!(event.namespace.as_deref(), Some("production"));
        assert_eq!(event.pod.as_deref(), Some("web-server-xyz"));
        assert!(event.summary.contains("egress"));
    }

    #[test]
    fn parse_hubble_egress_action_type() {
        let event = parse_ok(&hubble_flow(
            "2025-06-15T12:05:00Z",
            "FORWARDED",
            "EGRESS",
            "TCP 10.0.0.1:8080 -> 93.184.216.34:443",
        ));

        assert_eq!(
            event.action_type.as_deref(),
            Some("egress"),
            "EGRESS traffic_direction should map to action_type 'egress'"
        );
    }

    #[test]
    fn parse_hubble_ingress_action_type() {
        let event = parse_ok(&hubble_flow(
            "2025-06-15T12:05:00Z",
            "FORWARDED",
            "INGRESS",
            "TCP 93.184.216.34:443 -> 10.0.0.1:8080",
        ));

        assert_eq!(
            event.action_type.as_deref(),
            Some("ingress"),
            "INGRESS traffic_direction should map to action_type 'ingress'"
        );
    }

    #[test]
    fn parse_hubble_unknown_direction_falls_back_to_network() {
        let event = parse_ok(&hubble_flow(
            "2025-06-15T12:05:00Z",
            "FORWARDED",
            "UNKNOWN",
            "flow",
        ));

        assert_eq!(
            event.action_type.as_deref(),
            Some("network"),
            "unknown traffic_direction should fall back to 'network'"
        );
    }

    #[test]
    fn parse_hubble_dropped() {
        let event = parse_ok(&hubble_flow(
            "2025-06-15T12:06:00Z",
            "DROPPED",
            "INGRESS",
            "blocked connection",
        ));

        assert_eq!(event.verdict, NormalizedVerdict::Dropped);
    }

    #[test]
    fn parse_receipt_envelope() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:10:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.receipt.v1",
                "decision": "deny",
                "guard": "ForbiddenPathGuard",
                "action_type": "file",
                "severity": "critical"
            }
        }));

        assert_eq!(event.source, EventSource::Receipt);
        assert_eq!(event.kind, TimelineEventKind::GuardDecision);
        assert_eq!(event.verdict, NormalizedVerdict::Deny);
        assert_eq!(event.action_type.as_deref(), Some("file"));
        assert_eq!(event.severity.as_deref(), Some("critical"));
        assert!(event.summary.contains("ForbiddenPathGuard"));
    }

    #[test]
    fn parse_receipt_envelope_preserves_source_metadata() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:10:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.receipt.v1",
                "decision": "deny",
                "guard": "ForbiddenPathGuard",
                "action_type": "file",
                "source": {
                    "namespace": "prod",
                    "pod_name": "agent-worker-1"
                }
            }
        }));

        assert_eq!(event.source, EventSource::Receipt);
        assert_eq!(event.namespace.as_deref(), Some("prod"));
        assert_eq!(event.pod.as_deref(), Some("agent-worker-1"));
    }

    #[test]
    fn parse_scan_envelope() {
        let event = parse_ok(&json!({
            "issued_at": "2025-06-15T12:15:00Z",
            "fact": {
                "schema": "clawdstrike.sdr.fact.scan.v1",
                "scan_type": "vulnerability",
                "status": "fail",
                "severity": "high"
            }
        }));

        assert_eq!(event.source, EventSource::Scan);
        assert_eq!(event.kind, TimelineEventKind::ScanResult);
        assert_eq!(event.verdict, NormalizedVerdict::Deny);
        assert_eq!(event.severity.as_deref(), Some("high"));
        assert!(event.summary.contains("vulnerability"));
    }

    #[test]
    fn parse_unknown_schema_returns_none() {
        let envelope = json!({
            "issued_at": "2025-06-15T12:00:00Z",
            "fact": {
                "schema": "unknown.schema.v1"
            }
        });

        assert!(parse_envelope(&envelope, false).is_none());
    }

    #[test]
    fn parse_missing_fact_returns_none() {
        let envelope = json!({
            "issued_at": "2025-06-15T12:00:00Z"
        });

        assert!(parse_envelope(&envelope, false).is_none());
    }

    #[test]
    fn parse_missing_timestamp_returns_none() {
        let envelope = json!({
            "fact": {
                "schema": "clawdstrike.sdr.fact.tetragon_event.v1",
                "event_type": "PROCESS_EXEC",
                "process": { "binary": "/bin/sh" }
            }
        });

        assert!(parse_envelope(&envelope, false).is_none());
    }

    #[test]
    fn merge_timeline_sorts_by_timestamp() {
        // Deliberately out of chronological order: 14:00, 12:00, 16:00.
        let events = vec![
            bare_event(
                14,
                EventSource::Tetragon,
                TimelineEventKind::ProcessExec,
                NormalizedVerdict::None,
                "second",
            ),
            bare_event(
                12,
                EventSource::Hubble,
                TimelineEventKind::NetworkFlow,
                NormalizedVerdict::Forwarded,
                "first",
            ),
            bare_event(
                16,
                EventSource::Receipt,
                TimelineEventKind::GuardDecision,
                NormalizedVerdict::Deny,
                "third",
            ),
        ];

        let merged = merge_timeline(events);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].summary, "first");
        assert_eq!(merged[1].summary, "second");
        assert_eq!(merged[2].summary, "third");
    }

    #[test]
    fn merge_timeline_empty() {
        let events = merge_timeline(vec![]);
        assert!(events.is_empty());
    }

    #[test]
    fn timeline_event_serialization_skips_none_fields() {
        let event = bare_event(
            12,
            EventSource::Tetragon,
            TimelineEventKind::ProcessExec,
            NormalizedVerdict::None,
            "test",
        );

        let serialized = serde_json::to_value(&event).unwrap();
        let object = serialized.as_object().unwrap();
        let optional_keys = [
            "severity",
            "process",
            "namespace",
            "pod",
            "action_type",
            "signature_valid",
            "raw",
        ];
        for key in &optional_keys {
            assert!(!object.contains_key(*key));
        }
    }
}