use chrono::Utc;
use serde::{Deserialize, Serialize};
use starla_common::{MeasurementData, MeasurementResult, MeasurementType};
use std::net::IpAddr;
/// Renders a JSON value as the text placed after `"result": ` in a result line.
///
/// This is a convenience wrapper: it allocates the output buffer, lets
/// `write_result_value` append the rendering of `value` to it, and returns the
/// finished text.
pub fn format_result_value(value: &serde_json::Value) -> String {
    let mut rendered = String::new();
    write_result_value(&mut rendered, value);
    rendered
}
/// Appends `text` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped and characters below U+0020 are written as
/// `\n`, `\r`, `\t` or `\u00XX`, so the emitted token stays valid JSON and can
/// never introduce a raw line break into the output. Text without such
/// characters is copied unchanged between the quotes.
fn write_json_string(buf: &mut String, text: &str) {
    use std::fmt::Write;

    buf.push('"');
    for ch in text.chars() {
        match ch {
            '"' => buf.push_str("\\\""),
            '\\' => buf.push_str("\\\\"),
            '\n' => buf.push_str("\\n"),
            '\r' => buf.push_str("\\r"),
            '\t' => buf.push_str("\\t"),
            c if u32::from(c) < 0x20 => {
                write!(buf, "\\u{:04x}", u32::from(c)).expect("writing to a String cannot fail")
            }
            c => buf.push(c),
        }
    }
    buf.push('"');
}

/// Appends the rendering of `value` to `buf`, recursing into containers.
///
/// Layout rules:
/// * arrays are written as `[ a, b ]`, the empty array as `[ ]`;
/// * objects are written as `{ "k":v, "k2":v2 }` (no space after the colon),
///   the empty object as `{ }`; members appear in the map's iteration order;
/// * strings (values and object keys) are quoted and JSON-escaped;
/// * every other value (numbers, booleans, null) uses `serde_json`'s own
///   `Display` output.
fn write_result_value(buf: &mut String, value: &serde_json::Value) {
    match value {
        serde_json::Value::Array(items) => {
            if items.is_empty() {
                buf.push_str("[ ]");
                return;
            }
            buf.push_str("[ ");
            for (i, item) in items.iter().enumerate() {
                if i > 0 {
                    buf.push_str(", ");
                }
                write_result_value(buf, item);
            }
            buf.push_str(" ]");
        }
        serde_json::Value::Object(map) => {
            if map.is_empty() {
                buf.push_str("{ }");
                return;
            }
            buf.push_str("{ ");
            for (i, (k, v)) in map.iter().enumerate() {
                if i > 0 {
                    buf.push_str(", ");
                }
                write_json_string(buf, k);
                buf.push(':');
                // String members need no special case here: the recursive call
                // reaches the `String` arm below, which quotes and escapes them.
                write_result_value(buf, v);
            }
            buf.push_str(" }");
        }
        serde_json::Value::String(s) => write_json_string(buf, s),
        other => buf.push_str(&other.to_string()),
    }
}
/// One measurement result together with the envelope fields that accompany it
/// in a `RESULT { ... }` line (see `AtlasResult::to_result_line`).
///
/// Values are normally produced by `AtlasResult::from_measurement` and then
/// adjusted with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtlasResult {
    /// Probe identifier, copied from the measurement's `prb_id`.
    ///
    /// It is never serialized and never written to the result line. `default`
    /// is required so that a value serialized by this type can be deserialized
    /// again in self-describing formats; a missing field is read back as `0`.
    #[serde(skip_serializing, default)]
    pub prb_id: u32,
    /// Measurement identifier rendered as a decimal string; serialized under
    /// the key `id`.
    #[serde(rename = "id")]
    pub id: String,
    /// Copied from the measurement's `fw` field.
    // NOTE(review): presumably the probe firmware version — confirm.
    pub fw: u32,
    /// Version string emitted as `mver`. `from_measurement` always sets it to
    /// `"2.6.4"`, and `to_result_line` substitutes the same text when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mver: Option<String>,
    /// Emitted as `lts`. `from_measurement` initialises it to `0`; callers set
    /// the real value through `with_lts`.
    // NOTE(review): presumably seconds since the last time synchronisation — confirm.
    pub lts: i64,
    /// Measurement timestamp, copied from the measurement's `timestamp`.
    pub time: i64,
    /// Target name. `from_measurement` leaves it `None` for DNS measurements
    /// and otherwise falls back to the textual `dst_addr` when the measurement
    /// carries no name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dst_name: Option<String>,
    /// Address family, copied from the measurement.
    // NOTE(review): presumably 4 or 6 — the tests only use 4.
    pub af: u8,
    /// Destination address in textual form. `to_result_line` omits it when empty.
    pub dst_addr: String,
    /// Destination port as text. `from_measurement` sets `"53"` for DNS
    /// measurements and `None` for everything else.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dst_port: Option<String>,
    /// Source address in textual form; empty when neither the measurement nor
    /// the caller supplied one, in which case `to_result_line` omits it.
    pub src_addr: String,
    /// Protocol label such as `"ICMP"`, `"UDP"` or `"TCP"`.
    pub proto: String,
    /// TTL copied from the measurement, if it reported one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl: Option<u8>,
    /// Size copied from the measurement; `from_measurement` always drops it
    /// for DNS measurements.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<u16>,
    /// Optional `endtime` value. `from_measurement` never sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub endtime: Option<i64>,
    /// Optional `paris_id` value. `from_measurement` never sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub paris_id: Option<u8>,
    /// Identifier of the bundle this result belongs to; set by `with_bundle`
    /// or `ResultBundle::add`. It is serialized by serde but not written by
    /// `to_result_line`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub group_id: Option<u64>,
    /// Position of this result inside its bundle; set together with `group_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bundle: Option<u32>,
    /// Measurement payload.
    ///
    /// A non-string JSON value is rendered by `format_result_value`. A string
    /// value is treated by `to_result_line` as already-rendered text and written
    /// verbatim; a string starting with `__FULLLINE__` replaces the whole line
    /// body instead.
    pub result: serde_json::Value,
}
impl AtlasResult {
    /// `mver` stamped on every result built by `from_measurement`, and the
    /// fallback written by `to_result_line` when `mver` is `None`.
    const DEFAULT_MVER: &'static str = "2.6.4";

    /// In-band marker prepended to `result` when the measurement supplied the
    /// complete line body; `to_result_line` strips it again.
    const FULL_LINE_MARKER: &'static str = "__FULLLINE__";

    /// Builds the envelope for a finished measurement.
    ///
    /// * `proto` defaults per measurement type (ICMP for ping, TCP for
    ///   HTTP/TLS, UDP otherwise) when the measurement does not name one.
    /// * DNS results get `dst_port = "53"`, no `dst_name` and no `size`; all
    ///   other types get a `dst_name`, falling back to the textual destination
    ///   address.
    /// * `src_addr` prefers the measurement's own source address, then
    ///   `source_ip`, and is left empty when both are absent.
    /// * `lts` starts at `0`; `endtime`, `paris_id`, `group_id` and `bundle`
    ///   start as `None`.
    ///
    /// The payload is stored as-is for `Generic` data, as a JSON string for
    /// `PreFormatted` data, and as a JSON string carrying the full-line marker
    /// for `FullLine` data.
    // NOTE(review): a `Generic` payload that is itself a JSON string is
    // indistinguishable from pre-formatted text and will be emitted unquoted.
    pub fn from_measurement(result: MeasurementResult, source_ip: Option<IpAddr>) -> Self {
        let is_dns = matches!(result.measurement_type, MeasurementType::Dns);
        let default_proto = match result.measurement_type {
            MeasurementType::Ping => "ICMP",
            MeasurementType::Traceroute | MeasurementType::Dns | MeasurementType::Ntp => "UDP",
            MeasurementType::Http | MeasurementType::Tls => "TCP",
        };
        // `result` is owned, so its fields can be moved out instead of cloned.
        let proto = result
            .proto
            .unwrap_or_else(|| default_proto.to_string());

        let dst_addr = result.dst_addr.to_string();
        let (dst_name, dst_port) = if is_dns {
            (None, Some("53".to_string()))
        } else {
            let name = result.dst_name.unwrap_or_else(|| dst_addr.clone());
            (Some(name), None)
        };

        let src_addr = result
            .src_addr
            .map(|ip| ip.to_string())
            .or_else(|| source_ip.map(|ip| ip.to_string()))
            .unwrap_or_default();

        let size = if is_dns { None } else { result.size };

        let payload = match result.data {
            MeasurementData::Generic(value) => value,
            MeasurementData::PreFormatted(text) => serde_json::Value::String(text),
            MeasurementData::FullLine(body) => {
                serde_json::Value::String(format!("{}{}", Self::FULL_LINE_MARKER, body))
            }
        };

        Self {
            prb_id: result.prb_id.0,
            id: result.msm_id.0.to_string(),
            fw: result.fw,
            mver: Some(Self::DEFAULT_MVER.to_string()),
            lts: 0,
            time: result.timestamp.0,
            dst_name,
            af: result.af,
            dst_addr,
            dst_port,
            src_addr,
            proto,
            ttl: result.ttl,
            size,
            endtime: None,
            paris_id: None,
            group_id: None,
            bundle: None,
            result: payload,
        }
    }

    /// Replaces the protocol label.
    pub fn with_proto(mut self, proto: &str) -> Self {
        self.proto = proto.to_string();
        self
    }

    /// Sets the destination name.
    pub fn with_dst_name(mut self, name: &str) -> Self {
        self.dst_name = Some(name.to_string());
        self
    }

    /// Sets the destination port (kept as text).
    pub fn with_dst_port(mut self, port: &str) -> Self {
        self.dst_port = Some(port.to_string());
        self
    }

    /// Sets the TTL.
    pub fn with_ttl(mut self, ttl: u8) -> Self {
        self.ttl = Some(ttl);
        self
    }

    /// Sets the size.
    pub fn with_size(mut self, size: u16) -> Self {
        self.size = Some(size);
        self
    }

    /// Sets the `lts` value.
    pub fn with_lts(mut self, lts: i64) -> Self {
        self.lts = lts;
        self
    }

    /// Records the bundle this result belongs to and its position inside it.
    pub fn with_bundle(mut self, group_id: u64, bundle: u32) -> Self {
        self.group_id = Some(group_id);
        self.bundle = Some(bundle);
        self
    }

    /// Appends `text` to `out` as a double-quoted JSON string literal.
    ///
    /// `"` and `\` are backslash-escaped and characters below U+0020 become
    /// `\n`, `\r`, `\t` or `\u00XX`; text without such characters is copied
    /// unchanged between the quotes.
    fn push_json_string(out: &mut String, text: &str) {
        use std::fmt::Write;

        out.push('"');
        for ch in text.chars() {
            match ch {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if u32::from(c) < 0x20 => {
                    write!(out, "\\u{:04x}", u32::from(c))
                        .expect("writing to a String cannot fail")
                }
                c => out.push(c),
            }
        }
        out.push('"');
    }

    /// Appends `, "key":"value"` to `out`, escaping `value`.
    fn push_string_field(out: &mut String, key: &str, value: &str) {
        out.push_str(", \"");
        out.push_str(key);
        out.push_str("\":");
        Self::push_json_string(out, value);
    }

    /// Renders this result as one newline-terminated `RESULT { ... }` line.
    ///
    /// When the payload is a string carrying the full-line marker, the text
    /// after the marker becomes the entire body and no envelope field is
    /// written. Otherwise the fields are emitted in a fixed order: `id`, `fw`,
    /// `mver`, `lts`, `time`, `bundle`, `dst_name`, `af`, `dst_addr`,
    /// `dst_port`, `src_addr`, `proto`, `ttl`, `size`, `endtime`, `paris_id`,
    /// `result`. Optional fields that are `None` and address fields that are
    /// empty are skipped. String fields are JSON-escaped. A string payload is
    /// written verbatim (it is assumed to be pre-rendered); any other payload
    /// goes through `format_result_value`.
    // NOTE(review): `prb_id` and `group_id` are not part of the line.
    pub fn to_result_line(&self) -> String {
        use std::fmt::Write;
        const INFALLIBLE: &str = "writing to a String cannot fail";

        if let serde_json::Value::String(raw) = &self.result {
            if let Some(body) = raw.strip_prefix(Self::FULL_LINE_MARKER) {
                return format!("RESULT {{ {} }}\n", body);
            }
        }

        let mut line = String::with_capacity(512);
        line.push_str("RESULT { \"id\":");
        Self::push_json_string(&mut line, &self.id);
        // The space after `"mver":` is part of the expected layout.
        write!(line, ", \"fw\":{}, \"mver\": ", self.fw).expect(INFALLIBLE);
        Self::push_json_string(
            &mut line,
            self.mver.as_deref().unwrap_or(Self::DEFAULT_MVER),
        );
        write!(line, ", \"lts\":{}, \"time\":{}", self.lts, self.time).expect(INFALLIBLE);

        if let Some(bundle) = self.bundle {
            write!(line, ", \"bundle\":{}", bundle).expect(INFALLIBLE);
        }
        if let Some(name) = &self.dst_name {
            Self::push_string_field(&mut line, "dst_name", name);
        }
        write!(line, ", \"af\":{}", self.af).expect(INFALLIBLE);
        if !self.dst_addr.is_empty() {
            Self::push_string_field(&mut line, "dst_addr", &self.dst_addr);
        }
        if let Some(port) = &self.dst_port {
            Self::push_string_field(&mut line, "dst_port", port);
        }
        if !self.src_addr.is_empty() {
            Self::push_string_field(&mut line, "src_addr", &self.src_addr);
        }
        Self::push_string_field(&mut line, "proto", &self.proto);
        if let Some(ttl) = self.ttl {
            write!(line, ", \"ttl\":{}", ttl).expect(INFALLIBLE);
        }
        if let Some(size) = self.size {
            write!(line, ", \"size\":{}", size).expect(INFALLIBLE);
        }
        if let Some(endtime) = self.endtime {
            write!(line, ", \"endtime\":{}", endtime).expect(INFALLIBLE);
        }
        if let Some(paris_id) = self.paris_id {
            write!(line, ", \"paris_id\":{}", paris_id).expect(INFALLIBLE);
        }

        line.push_str(", \"result\": ");
        match &self.result {
            // Pre-rendered payload: emitted exactly as supplied.
            serde_json::Value::String(pre) => line.push_str(pre),
            other => line.push_str(&format_result_value(other)),
        }
        line.push_str(" }\n");
        line
    }
}
/// A group of results collected under one bundle identifier.
///
/// Results are appended with `ResultBundle::add`, which stamps each one with
/// this bundle's id and its position in `results`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultBundle {
/// Identifier copied into the `group_id` of every result added to the bundle.
pub bundle_id: u64,
/// The bundled results, in insertion order.
pub results: Vec<AtlasResult>,
/// Creation time, taken from `Utc::now().timestamp()` when the bundle is built by `new`.
pub created_at: i64,
}
impl ResultBundle {
pub fn new(bundle_id: u64) -> Self {
Self {
bundle_id,
results: Vec::new(),
created_at: Utc::now().timestamp(),
}
}
pub fn add(&mut self, mut result: AtlasResult) {
let bundle_index = self.results.len() as u32;
result.group_id = Some(self.bundle_id);
result.bundle = Some(bundle_index);
self.results.push(result);
}
pub fn len(&self) -> usize {
self.results.len()
}
pub fn is_empty(&self) -> bool {
self.results.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use starla_common::{MeasurementId, ProbeId, Timestamp};

    /// Wraps `measurement` in an envelope (no fallback source address,
    /// `lts` = 10) and renders it, which is the pipeline every line test uses.
    fn render_line(measurement: MeasurementResult) -> String {
        AtlasResult::from_measurement(measurement, None)
            .with_lts(10)
            .to_result_line()
    }

    /// Baseline ping measurement with three RTT samples and all envelope
    /// fields populated.
    fn ping_fixture() -> MeasurementResult {
        MeasurementResult {
            fw: 5080,
            measurement_type: MeasurementType::Ping,
            prb_id: ProbeId(12345),
            msm_id: MeasurementId(1001),
            timestamp: Timestamp(999999999),
            af: 4,
            dst_addr: "193.0.14.129".parse().unwrap(),
            dst_name: Some("193.0.14.129".to_string()),
            src_addr: Some("10.0.0.1".parse().unwrap()),
            proto: Some("ICMP".to_string()),
            ttl: Some(56),
            size: Some(32),
            data: MeasurementData::Generic(serde_json::json!([
                { "rtt": 10.500000 },
                { "rtt": 11.200000 },
                { "rtt": 10.800000 }
            ])),
        }
    }

    /// Baseline DNS measurement: no name, TTL or size, object-shaped payload.
    fn dns_fixture() -> MeasurementResult {
        MeasurementResult {
            fw: 5080,
            measurement_type: MeasurementType::Dns,
            prb_id: ProbeId(12345),
            msm_id: MeasurementId(8310237),
            timestamp: Timestamp(999999999),
            af: 4,
            dst_addr: "8.8.8.8".parse().unwrap(),
            dst_name: None,
            src_addr: Some("10.0.0.1".parse().unwrap()),
            proto: Some("UDP".to_string()),
            ttl: None,
            size: None,
            data: MeasurementData::Generic(serde_json::json!({
                "rt": 35.265,
                "size": 62,
                "ID": 12345,
                "ANCOUNT": 1,
                "QDCOUNT": 1,
                "NSCOUNT": 0,
                "ARCOUNT": 0
            })),
        }
    }

    /// The ping line has the expected framing, key order and spacing.
    #[test]
    fn test_ping_result_line_format() {
        let rendered = render_line(ping_fixture());

        assert!(rendered.starts_with("RESULT { "));
        assert!(rendered.ends_with(" }\n"));

        // Every key must be present, and in exactly this order.
        let ordered_keys = [
            "\"id\":",
            "\"fw\":",
            "\"mver\":",
            "\"lts\":",
            "\"time\":",
            "\"dst_name\":",
            "\"af\":",
            "\"result\":",
        ];
        let offsets: Vec<usize> = ordered_keys
            .iter()
            .map(|key| rendered.find(*key).unwrap())
            .collect();
        assert!(offsets.windows(2).all(|pair| pair[0] < pair[1]));

        assert!(rendered.contains("\"mver\": \""));
        assert!(!rendered.contains("prb_id"));
        assert!(!rendered.contains(", ,"));
        assert!(rendered.contains("\"result\": [ { \"rtt\":"));
    }

    /// Skipped optional fields must not leave an empty slot in a DNS line.
    #[test]
    fn test_dns_result_line_no_double_comma() {
        let rendered = render_line(dns_fixture());

        assert!(
            !rendered.contains(", ,"),
            "Double comma found in DNS result line: {}",
            rendered
        );
        assert!(rendered.contains("\"result\": { "));
        assert!(rendered.contains("\"dst_port\":\"53\""));
        assert!(!rendered.contains("\"dst_name\":"));
    }

    /// A ping without a TTL omits the `ttl` key and keeps the timeout markers.
    #[test]
    fn test_ping_result_line_no_ttl_on_timeout() {
        let timed_out = MeasurementResult {
            ttl: None,
            data: MeasurementData::Generic(serde_json::json!([
                { "x": "*" },
                { "x": "*" }
            ])),
            ..ping_fixture()
        };
        let rendered = render_line(timed_out);

        assert!(!rendered.contains("\"ttl\":"));
        assert!(rendered.contains("\"x\":\"*\""));
    }

    /// Spacing rules for arrays, objects and the empty array.
    #[test]
    fn test_format_result_value_spacing() {
        let samples = serde_json::json!([{"rtt": 10.5}, {"rtt": 11.2}]);
        assert_eq!(
            format_result_value(&samples),
            "[ { \"rtt\":10.5 }, { \"rtt\":11.2 } ]"
        );

        let object = serde_json::json!({"rt": 35.2, "size": 62});
        let object_text = format_result_value(&object);
        assert!(object_text.starts_with("{ "));
        assert!(object_text.ends_with(" }"));

        let empty = serde_json::json!([]);
        assert_eq!(format_result_value(&empty), "[ ]");
    }

    /// Pre-formatted payload text reaches the line untouched.
    #[test]
    fn test_preformatted_result_used_verbatim() {
        let measurement = MeasurementResult {
            data: MeasurementData::PreFormatted(
                "[ { \"rtt\":10.500000 }, { \"rtt\":11.200000 } ]".to_string(),
            ),
            ..ping_fixture()
        };
        let rendered = render_line(measurement);

        assert!(rendered.contains("\"result\": [ { \"rtt\":10.500000 }, { \"rtt\":11.200000 } ]"));
        assert!(!rendered.contains("\"result\": [ { \"rtt\":10.5 }"));
    }

    /// Full-line data replaces the whole body, so no envelope key appears.
    #[test]
    fn test_fullline_bypasses_envelope() {
        let measurement = MeasurementResult {
            data: MeasurementData::FullLine(
                "\"id\":\"42\", \"fw\":5080, \"custom_field\":\"test\", \"cert\":[ \"PEM\" ]"
                    .to_string(),
            ),
            ..ping_fixture()
        };
        let rendered = render_line(measurement);

        assert!(rendered.starts_with("RESULT { \"id\":\"42\""));
        assert!(rendered.contains("\"custom_field\":\"test\""));
        assert!(rendered.contains("\"cert\":[ \"PEM\" ]"));
        assert!(rendered.ends_with(" }\n"));
        assert!(!rendered.contains("\"mver\":"));
        assert!(!rendered.contains("\"dst_name\":"));
    }

    /// An HTTP result with most optional inputs absent still carries its payload.
    #[test]
    fn test_http_result_minimal_envelope() {
        let measurement = MeasurementResult {
            measurement_type: MeasurementType::Http,
            dst_name: None,
            proto: None,
            ttl: None,
            size: None,
            data: MeasurementData::PreFormatted(
                "[ { \"method\":\"GET\", \"af\": 4, \"dst_addr\":\"1.2.3.4\", \
                 \"src_addr\":\"5.6.7.8\", \"rt\":123.456000, \"res\":200, \"ver\":\"1.1\", \
                 \"hsize\":100, \"bsize\":500 } ]"
                    .to_string(),
            ),
            ..ping_fixture()
        };
        let rendered = render_line(measurement);

        assert!(rendered.contains("\"method\":\"GET\""));
        assert!(rendered.contains("\"res\":200"));
    }

    /// Pre-formatted traceroute hops, including a timeout hop, are kept intact.
    #[test]
    fn test_traceroute_preformatted_hops() {
        let measurement = MeasurementResult {
            measurement_type: MeasurementType::Traceroute,
            data: MeasurementData::PreFormatted(
                "[ { \"hop\":1, \"result\": [ { \"from\":\"10.0.0.1\", \"ttl\":64, \"size\":28, \
                 \"rtt\":1.234 } ] }, { \"hop\":2, \"result\": [ { \"x\":\"*\" } ] } ]"
                    .to_string(),
            ),
            ..ping_fixture()
        };
        let rendered = render_line(measurement);

        assert!(rendered.contains("\"hop\":1"));
        assert!(rendered.contains("\"rtt\":1.234"));
        assert!(rendered.contains("\"x\":\"*\""));
    }
}