use serde_json::Value;
use crate::classes::network_activity::{NetworkActivity, NetworkActivityType};
use crate::objects::metadata::Metadata;
use crate::objects::network_endpoint::{ConnectionInfo, NetworkEndpoint};
use crate::severity::map_severity;
#[must_use]
pub fn hubble_fact_to_network_activity(
fact: &Value,
time_ms: i64,
product_version: &str,
) -> Option<NetworkActivity> {
let verdict = fact.get("verdict").and_then(|v| v.as_str())?;
let activity = match verdict {
"FORWARDED" => NetworkActivityType::Traffic,
"DROPPED" => NetworkActivityType::Refuse,
"ERROR" => NetworkActivityType::Fail,
_ => NetworkActivityType::Other,
};
let severity_str = fact
.get("severity")
.and_then(|s| s.as_str())
.unwrap_or(match verdict {
"DROPPED" => "high",
"ERROR" => "high",
_ => "info",
});
let severity_id = map_severity(severity_str);
let status_id = match verdict {
"FORWARDED" => 1, "DROPPED" | "ERROR" => 2, _ => 0, };
let metadata = Metadata::clawdstrike(product_version);
let src_endpoint = extract_endpoint(fact.get("source"));
let dst_endpoint = extract_endpoint(fact.get("destination"));
let src_ip = fact
.get("ip")
.and_then(|ip| ip.get("source"))
.and_then(|s| s.as_str());
let dst_ip = fact
.get("ip")
.and_then(|ip| ip.get("destination"))
.and_then(|d| d.as_str());
let src_endpoint = merge_ip(src_endpoint, src_ip);
let dst_endpoint = merge_ip(dst_endpoint, dst_ip);
let (connection_info, l4_src_port, l4_dst_port) = extract_connection_info(fact);
let src_endpoint = merge_port(src_endpoint, l4_src_port);
let dst_endpoint = merge_port(dst_endpoint, l4_dst_port);
let direction = fact
.get("traffic_direction")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let summary = fact
.get("summary")
.and_then(|v| v.as_str())
.unwrap_or("network flow");
let action_id = match verdict {
"FORWARDED" => Some(1u8), "DROPPED" => Some(2), _ => None,
};
let disposition_id = match verdict {
"FORWARDED" => Some(1u8), "DROPPED" => Some(2), _ => None,
};
let mut event =
NetworkActivity::new(activity, time_ms, severity_id.as_u8(), status_id, metadata)
.with_message(format!("{} {}", direction.to_lowercase(), summary));
if let Some(src) = src_endpoint {
event = event.with_src_endpoint(src);
}
if let Some(dst) = dst_endpoint {
event = event.with_dst_endpoint(dst);
}
if let Some(ci) = connection_info {
event = event.with_connection_info(ci);
}
if let Some(aid) = action_id {
event = event.with_action_id(aid);
}
if let Some(did) = disposition_id {
event = event.with_disposition_id(did);
}
Some(event)
}
fn extract_endpoint(source: Option<&Value>) -> Option<NetworkEndpoint> {
let source = source?;
Some(NetworkEndpoint {
ip: None,
port: None,
domain: None,
hostname: source
.get("pod_name")
.or_else(|| source.get("hostname"))
.and_then(|v| v.as_str())
.map(String::from),
subnet_uid: source
.get("namespace")
.and_then(|v| v.as_str())
.map(String::from),
})
}
fn merge_ip(endpoint: Option<NetworkEndpoint>, ip: Option<&str>) -> Option<NetworkEndpoint> {
match (endpoint, ip) {
(Some(mut ep), Some(ip_str)) => {
ep.ip = Some(ip_str.to_string());
Some(ep)
}
(None, Some(ip_str)) => Some(NetworkEndpoint {
ip: Some(ip_str.to_string()),
port: None,
domain: None,
hostname: None,
subnet_uid: None,
}),
(ep, None) => ep,
}
}
fn extract_connection_info(fact: &Value) -> (Option<ConnectionInfo>, Option<u16>, Option<u16>) {
let l4 = match fact.get("l4") {
Some(l4) => l4,
None => return (None, None, None),
};
let (protocol_name, protocol_num, src_port, dst_port) = if let Some(tcp) = l4.get("TCP") {
(
"TCP",
6u8,
parse_port(tcp.get("source_port")),
parse_port(tcp.get("destination_port")),
)
} else if let Some(udp) = l4.get("UDP") {
(
"UDP",
17,
parse_port(udp.get("source_port")),
parse_port(udp.get("destination_port")),
)
} else if let Some(protocol) = l4.get("protocol").and_then(|v| v.as_str()) {
let protocol_upper = protocol.to_ascii_uppercase();
let (protocol_name, protocol_num) = match protocol_upper.as_str() {
"TCP" => ("TCP", 6u8),
"UDP" => ("UDP", 17u8),
"SCTP" => ("SCTP", 132u8),
"ICMPV4" => ("ICMPv4", 1u8),
"ICMPV6" => ("ICMPv6", 58u8),
_ => return (None, None, None),
};
(
protocol_name,
protocol_num,
parse_port(l4.get("source_port")),
parse_port(l4.get("destination_port")),
)
} else {
return (None, None, None);
};
let direction = fact.get("traffic_direction").and_then(|v| v.as_str());
let direction_id = match direction {
Some("INGRESS") => Some(1u8),
Some("EGRESS") => Some(2),
_ => Some(0),
};
(
Some(ConnectionInfo {
protocol_name: Some(protocol_name.to_string()),
protocol_num: Some(protocol_num),
direction: direction.map(|d| d.to_string()),
direction_id,
}),
src_port,
dst_port,
)
}
fn parse_port(value: Option<&Value>) -> Option<u16> {
value
.and_then(|v| v.as_u64())
.and_then(|p| u16::try_from(p).ok())
}
fn merge_port(endpoint: Option<NetworkEndpoint>, port: Option<u16>) -> Option<NetworkEndpoint> {
match (endpoint, port) {
(Some(mut ep), Some(p)) => {
ep.port = Some(p);
Some(ep)
}
(Some(ep), None) => Some(ep),
(None, Some(p)) => Some(NetworkEndpoint {
ip: None,
port: Some(p),
domain: None,
hostname: None,
subnet_uid: None,
}),
(None, None) => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::validate::validate_ocsf_json;
use serde_json::json;
#[test]
fn forwarded_flow() {
let fact = json!({
"verdict": "FORWARDED",
"traffic_direction": "EGRESS",
"summary": "TCP 10.0.0.1:8080 -> 93.184.216.34:443",
"source": {
"namespace": "production",
"pod_name": "web-server-xyz"
},
"destination": {
"namespace": "external"
},
"ip": {
"source": "10.0.0.1",
"destination": "93.184.216.34"
},
"l4": {
"TCP": {
"source_port": 8080,
"destination_port": 443
}
}
});
let event = hubble_fact_to_network_activity(&fact, 1_709_366_400_000, "0.1.3").unwrap();
assert_eq!(event.class_uid, 4001);
assert_eq!(event.type_uid, 400106); assert_eq!(event.severity_id, 1); assert_eq!(event.status_id, 1); assert_eq!(event.action_id, Some(1)); assert_eq!(event.disposition_id, Some(1));
let src = event.src_endpoint.as_ref().unwrap();
assert_eq!(src.ip.as_deref(), Some("10.0.0.1"));
assert_eq!(src.port, Some(8080));
assert_eq!(src.hostname.as_deref(), Some("web-server-xyz"));
let dst = event.dst_endpoint.as_ref().unwrap();
assert_eq!(dst.ip.as_deref(), Some("93.184.216.34"));
assert_eq!(dst.port, Some(443));
let ci = event.connection_info.as_ref().unwrap();
assert_eq!(ci.protocol_name.as_deref(), Some("TCP"));
assert_eq!(ci.protocol_num, Some(6));
assert_eq!(ci.direction_id, Some(2));
let json_val = serde_json::to_value(&event).unwrap();
let errors = validate_ocsf_json(&json_val);
assert!(errors.is_empty(), "validation errors: {:?}", errors);
}
#[test]
fn dropped_flow() {
let fact = json!({
"verdict": "DROPPED",
"traffic_direction": "INGRESS",
"summary": "blocked connection"
});
let event = hubble_fact_to_network_activity(&fact, 0, "0.1.3").unwrap();
assert_eq!(event.type_uid, 400105); assert_eq!(event.status_id, 2); assert_eq!(event.action_id, Some(2)); }
#[test]
fn forwarded_flow_with_flat_l4_schema() {
let fact = json!({
"verdict": "FORWARDED",
"traffic_direction": "EGRESS",
"summary": "TCP 10.0.0.1:8080 -> 93.184.216.34:443",
"ip": {
"source": "10.0.0.1",
"destination": "93.184.216.34"
},
"l4": {
"protocol": "TCP",
"source_port": 8080,
"destination_port": 443
}
});
let event = hubble_fact_to_network_activity(&fact, 1_709_366_400_000, "0.1.3").unwrap();
let src = event.src_endpoint.as_ref().unwrap();
let dst = event.dst_endpoint.as_ref().unwrap();
let ci = event.connection_info.as_ref().unwrap();
assert_eq!(src.port, Some(8080));
assert_eq!(dst.port, Some(443));
assert_eq!(ci.protocol_name.as_deref(), Some("TCP"));
assert_eq!(ci.protocol_num, Some(6));
}
#[test]
fn error_flow() {
let fact = json!({
"verdict": "ERROR",
"summary": "connection error"
});
let event = hubble_fact_to_network_activity(&fact, 0, "0.1.3").unwrap();
assert_eq!(event.type_uid, 400104); }
#[test]
fn missing_verdict_returns_none() {
let fact = json!({
"summary": "some flow"
});
assert!(hubble_fact_to_network_activity(&fact, 0, "0.1.3").is_none());
}
}