use peat_schema::capability::v1::Capability;
use peat_schema::common::v1::{Position, Timestamp};
use peat_schema::hierarchy::v1::BoundingBox;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
/// A batch of field-level changes for one squad record.
///
/// `SquadDelta::into_ditto_updates` flattens `updates` into `(path, JSON value)` pairs and
/// appends `timestamp_us` (under `last_update_us`) and `sequence` (under `sequence`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SquadDelta {
/// Squad identifier; `SquadDelta::from_summary` copies it from the summary's `squad_id`.
pub squad_id: String,
/// Timestamp in microseconds; `SquadDelta::from_summary` fills it from `current_timestamp_us()`.
pub timestamp_us: u64,
/// Caller-supplied sequence number (presumably used to order deltas — confirm with consumers).
pub sequence: u64,
/// The individual field changes, emitted by `into_ditto_updates` in this order.
pub updates: Vec<SquadFieldUpdate>,
}
/// One change to a single field of a squad record.
///
/// The path noted on each variant is the key `SquadDelta::into_ditto_updates` emits for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SquadFieldUpdate {
/// New leader id; emitted under `leader_id`.
SetLeaderId(String),
/// New member count; emitted under `member_count`.
SetMemberCount(u32),
/// New operational count; emitted under `operational_count`.
SetOperationalCount(u32),
/// New average fuel value; emitted under `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// New worst-health value; emitted under `worst_health`.
SetWorstHealth(i32),
/// New readiness score; emitted under `readiness_score`.
SetReadinessScore(f32),
/// Replacement centroid position; emitted under `position_centroid`.
UpdatePositionCentroid(Position),
/// Member id to add; emitted under `member_ids.$add`.
AddMemberId(String),
/// Member id to remove; emitted under `member_ids.$remove`.
RemoveMemberId(String),
/// Capability to add; emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Capability identifier to remove; emitted under `aggregated_capabilities.$remove`.
RemoveCapability(String),
/// Replacement bounding box; emitted under `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Replacement aggregation timestamp; emitted under `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl SquadDelta {
pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
let mut updates = Vec::new();
for update in self.updates {
match update {
SquadFieldUpdate::SetLeaderId(id) => {
updates.push(("leader_id".to_string(), json!(id)));
}
SquadFieldUpdate::SetMemberCount(count) => {
updates.push(("member_count".to_string(), json!(count)));
}
SquadFieldUpdate::SetOperationalCount(count) => {
updates.push(("operational_count".to_string(), json!(count)));
}
SquadFieldUpdate::SetAvgFuelMinutes(fuel) => {
updates.push(("avg_fuel_minutes".to_string(), json!(fuel)));
}
SquadFieldUpdate::SetWorstHealth(health) => {
updates.push(("worst_health".to_string(), json!(health)));
}
SquadFieldUpdate::SetReadinessScore(score) => {
updates.push(("readiness_score".to_string(), json!(score)));
}
SquadFieldUpdate::UpdatePositionCentroid(pos) => {
updates.push((
"position_centroid".to_string(),
serde_json::to_value(pos).unwrap_or(json!(null)),
));
}
SquadFieldUpdate::AddMemberId(id) => {
updates.push(("member_ids.$add".to_string(), json!(id)));
}
SquadFieldUpdate::RemoveMemberId(id) => {
updates.push(("member_ids.$remove".to_string(), json!(id)));
}
SquadFieldUpdate::AddCapability(cap) => {
updates.push((
"aggregated_capabilities.$add".to_string(),
serde_json::to_value(cap).unwrap_or(json!(null)),
));
}
SquadFieldUpdate::RemoveCapability(cap_id) => {
updates.push(("aggregated_capabilities.$remove".to_string(), json!(cap_id)));
}
SquadFieldUpdate::UpdateBoundingBox(bbox) => {
updates.push((
"bounding_box".to_string(),
serde_json::to_value(bbox).unwrap_or(json!(null)),
));
}
SquadFieldUpdate::UpdateAggregatedAt(ts) => {
updates.push((
"aggregated_at".to_string(),
serde_json::to_value(ts).unwrap_or(json!(null)),
));
}
}
}
updates.push(("last_update_us".to_string(), json!(self.timestamp_us)));
updates.push(("sequence".to_string(), json!(self.sequence)));
updates
}
pub fn size_bytes(&self) -> usize {
let base_overhead = 64; let per_update_overhead = 16;
let updates_size: usize = self
.updates
.iter()
.map(|u| match u {
SquadFieldUpdate::SetLeaderId(s) => s.len() + per_update_overhead,
SquadFieldUpdate::SetMemberCount(_) => 4 + per_update_overhead,
SquadFieldUpdate::SetOperationalCount(_) => 4 + per_update_overhead,
SquadFieldUpdate::SetAvgFuelMinutes(_) => 4 + per_update_overhead,
SquadFieldUpdate::SetWorstHealth(_) => 4 + per_update_overhead,
SquadFieldUpdate::SetReadinessScore(_) => 4 + per_update_overhead,
SquadFieldUpdate::UpdatePositionCentroid(_) => 24 + per_update_overhead, SquadFieldUpdate::AddMemberId(s) => s.len() + per_update_overhead,
SquadFieldUpdate::RemoveMemberId(s) => s.len() + per_update_overhead,
SquadFieldUpdate::AddCapability(_) => 128 + per_update_overhead, SquadFieldUpdate::RemoveCapability(s) => s.len() + per_update_overhead,
SquadFieldUpdate::UpdateBoundingBox(_) => 64 + per_update_overhead,
SquadFieldUpdate::UpdateAggregatedAt(_) => 16 + per_update_overhead,
})
.sum();
base_overhead + updates_size
}
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}
}
/// A batch of field-level changes for one platoon record.
///
/// `PlatoonDelta::into_ditto_updates` flattens `updates` into `(path, JSON value)` pairs and
/// appends `timestamp_us` (under `last_update_us`) and `sequence` (under `sequence`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlatoonDelta {
/// Platoon identifier; `PlatoonDelta::from_summary` copies it from the summary's `platoon_id`.
pub platoon_id: String,
/// Timestamp in microseconds; `PlatoonDelta::from_summary` fills it from `current_timestamp_us()`.
pub timestamp_us: u64,
/// Caller-supplied sequence number (presumably used to order deltas — confirm with consumers).
pub sequence: u64,
/// The individual field changes, emitted by `into_ditto_updates` in this order.
pub updates: Vec<PlatoonFieldUpdate>,
}
/// One change to a single field of a platoon record.
///
/// The path noted on each variant is the key `PlatoonDelta::into_ditto_updates` emits for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PlatoonFieldUpdate {
/// New leader id; emitted under `leader_id`.
SetLeaderId(String),
/// New squad count; emitted under `squad_count`.
SetSquadCount(u32),
/// New total member count; emitted under `total_member_count`.
SetTotalMemberCount(u32),
/// New operational count; emitted under `operational_count`.
SetOperationalCount(u32),
/// New average fuel value; emitted under `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// New worst-health value; emitted under `worst_health`.
SetWorstHealth(i32),
/// New readiness score; emitted under `readiness_score`.
SetReadinessScore(f32),
/// Replacement centroid position; emitted under `position_centroid`.
UpdatePositionCentroid(Position),
/// Squad id to add; emitted under `squad_ids.$add`.
AddSquadId(String),
/// Squad id to remove; emitted under `squad_ids.$remove`.
RemoveSquadId(String),
/// Capability to add; emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Capability identifier to remove; emitted under `aggregated_capabilities.$remove`.
RemoveCapability(String),
/// Replacement bounding box; emitted under `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Replacement aggregation timestamp; emitted under `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl PlatoonDelta {
    /// Consumes the delta and flattens it into `(field path, JSON value)` pairs.
    ///
    /// One pair is produced per entry of `updates`, in order, followed by a
    /// `last_update_us` pair carrying `timestamp_us` and a `sequence` pair.
    /// Set-style changes use a `.$add` / `.$remove` suffix on the collection
    /// path. A payload that fails to serialize is emitted as JSON `null`.
    pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
        let Self {
            timestamp_us,
            sequence,
            updates: field_updates,
            ..
        } = self;

        let mut pairs: Vec<(String, serde_json::Value)> = field_updates
            .into_iter()
            .map(|change| {
                let (path, value) = match change {
                    PlatoonFieldUpdate::SetLeaderId(id) => ("leader_id", json!(id)),
                    PlatoonFieldUpdate::SetSquadCount(count) => ("squad_count", json!(count)),
                    PlatoonFieldUpdate::SetTotalMemberCount(count) => {
                        ("total_member_count", json!(count))
                    }
                    PlatoonFieldUpdate::SetOperationalCount(count) => {
                        ("operational_count", json!(count))
                    }
                    PlatoonFieldUpdate::SetAvgFuelMinutes(fuel) => {
                        ("avg_fuel_minutes", json!(fuel))
                    }
                    PlatoonFieldUpdate::SetWorstHealth(health) => ("worst_health", json!(health)),
                    PlatoonFieldUpdate::SetReadinessScore(score) => {
                        ("readiness_score", json!(score))
                    }
                    PlatoonFieldUpdate::UpdatePositionCentroid(pos) => (
                        "position_centroid",
                        serde_json::to_value(pos).unwrap_or(serde_json::Value::Null),
                    ),
                    PlatoonFieldUpdate::AddSquadId(id) => ("squad_ids.$add", json!(id)),
                    PlatoonFieldUpdate::RemoveSquadId(id) => ("squad_ids.$remove", json!(id)),
                    PlatoonFieldUpdate::AddCapability(cap) => (
                        "aggregated_capabilities.$add",
                        serde_json::to_value(cap).unwrap_or(serde_json::Value::Null),
                    ),
                    PlatoonFieldUpdate::RemoveCapability(cap_id) => {
                        ("aggregated_capabilities.$remove", json!(cap_id))
                    }
                    PlatoonFieldUpdate::UpdateBoundingBox(bbox) => (
                        "bounding_box",
                        serde_json::to_value(bbox).unwrap_or(serde_json::Value::Null),
                    ),
                    PlatoonFieldUpdate::UpdateAggregatedAt(ts) => (
                        "aggregated_at",
                        serde_json::to_value(ts).unwrap_or(serde_json::Value::Null),
                    ),
                };
                (path.to_string(), value)
            })
            .collect();

        // Metadata always trails the field changes.
        pairs.push(("last_update_us".to_string(), json!(timestamp_us)));
        pairs.push(("sequence".to_string(), json!(sequence)));
        pairs
    }

    /// Returns an estimated size of this delta in bytes.
    ///
    /// The estimate is a fixed base plus, for every update, a fixed
    /// per-update overhead and a payload figure: the string length for
    /// string payloads and a constant for everything else.
    pub fn size_bytes(&self) -> usize {
        const BASE_OVERHEAD: usize = 64;
        const PER_UPDATE_OVERHEAD: usize = 16;

        let mut total = BASE_OVERHEAD;
        for change in &self.updates {
            let payload = match change {
                PlatoonFieldUpdate::SetLeaderId(s)
                | PlatoonFieldUpdate::AddSquadId(s)
                | PlatoonFieldUpdate::RemoveSquadId(s)
                | PlatoonFieldUpdate::RemoveCapability(s) => s.len(),
                PlatoonFieldUpdate::SetSquadCount(_)
                | PlatoonFieldUpdate::SetTotalMemberCount(_)
                | PlatoonFieldUpdate::SetOperationalCount(_)
                | PlatoonFieldUpdate::SetAvgFuelMinutes(_)
                | PlatoonFieldUpdate::SetWorstHealth(_)
                | PlatoonFieldUpdate::SetReadinessScore(_) => 4,
                PlatoonFieldUpdate::UpdatePositionCentroid(_) => 24,
                PlatoonFieldUpdate::AddCapability(_) => 128,
                PlatoonFieldUpdate::UpdateBoundingBox(_) => 64,
                PlatoonFieldUpdate::UpdateAggregatedAt(_) => 16,
            };
            total += payload + PER_UPDATE_OVERHEAD;
        }
        total
    }

    /// Returns `true` when the delta carries no field updates.
    pub fn is_empty(&self) -> bool {
        self.updates.is_empty()
    }
}
/// A batch of field-level changes for one company record.
///
/// `CompanyDelta::into_ditto_updates` flattens `updates` into `(path, JSON value)` pairs and
/// appends `timestamp_us` (under `last_update_us`) and `sequence` (under `sequence`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyDelta {
/// Company identifier; `CompanyDelta::from_summary` copies it from the summary's `company_id`.
pub company_id: String,
/// Timestamp in microseconds; `CompanyDelta::from_summary` fills it from `current_timestamp_us()`.
pub timestamp_us: u64,
/// Caller-supplied sequence number (presumably used to order deltas — confirm with consumers).
pub sequence: u64,
/// The individual field changes, emitted by `into_ditto_updates` in this order.
pub updates: Vec<CompanyFieldUpdate>,
}
/// One change to a single field of a company record.
///
/// The path noted on each variant is the key `CompanyDelta::into_ditto_updates` emits for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompanyFieldUpdate {
/// New leader id; emitted under `leader_id`.
SetLeaderId(String),
/// New platoon count; emitted under `platoon_count`.
SetPlatoonCount(u32),
/// New total member count; emitted under `total_member_count`.
SetTotalMemberCount(u32),
/// New operational count; emitted under `operational_count`.
SetOperationalCount(u32),
/// New average fuel value; emitted under `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// New worst-health value; emitted under `worst_health`.
SetWorstHealth(i32),
/// New readiness score; emitted under `readiness_score`.
SetReadinessScore(f32),
/// Replacement centroid position; emitted under `position_centroid`.
UpdatePositionCentroid(Position),
/// Platoon id to add; emitted under `platoon_ids.$add`.
AddPlatoonId(String),
/// Platoon id to remove; emitted under `platoon_ids.$remove`.
RemovePlatoonId(String),
/// Capability to add; emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Capability identifier to remove; emitted under `aggregated_capabilities.$remove`.
RemoveCapability(String),
/// Replacement bounding box; emitted under `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Replacement aggregation timestamp; emitted under `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl CompanyDelta {
    /// Consumes the delta and flattens it into `(field path, JSON value)` pairs.
    ///
    /// One pair is produced per entry of `updates`, in order, followed by a
    /// `last_update_us` pair carrying `timestamp_us` and a `sequence` pair.
    /// Set-style changes use a `.$add` / `.$remove` suffix on the collection
    /// path. A payload that fails to serialize is emitted as JSON `null`.
    pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
        let Self {
            timestamp_us,
            sequence,
            updates: field_updates,
            ..
        } = self;

        let mut pairs: Vec<(String, serde_json::Value)> = field_updates
            .into_iter()
            .map(|change| {
                let (path, value) = match change {
                    CompanyFieldUpdate::SetLeaderId(id) => ("leader_id", json!(id)),
                    CompanyFieldUpdate::SetPlatoonCount(count) => ("platoon_count", json!(count)),
                    CompanyFieldUpdate::SetTotalMemberCount(count) => {
                        ("total_member_count", json!(count))
                    }
                    CompanyFieldUpdate::SetOperationalCount(count) => {
                        ("operational_count", json!(count))
                    }
                    CompanyFieldUpdate::SetAvgFuelMinutes(fuel) => {
                        ("avg_fuel_minutes", json!(fuel))
                    }
                    CompanyFieldUpdate::SetWorstHealth(health) => ("worst_health", json!(health)),
                    CompanyFieldUpdate::SetReadinessScore(score) => {
                        ("readiness_score", json!(score))
                    }
                    CompanyFieldUpdate::UpdatePositionCentroid(pos) => (
                        "position_centroid",
                        serde_json::to_value(pos).unwrap_or(serde_json::Value::Null),
                    ),
                    CompanyFieldUpdate::AddPlatoonId(id) => ("platoon_ids.$add", json!(id)),
                    CompanyFieldUpdate::RemovePlatoonId(id) => ("platoon_ids.$remove", json!(id)),
                    CompanyFieldUpdate::AddCapability(cap) => (
                        "aggregated_capabilities.$add",
                        serde_json::to_value(cap).unwrap_or(serde_json::Value::Null),
                    ),
                    CompanyFieldUpdate::RemoveCapability(cap_id) => {
                        ("aggregated_capabilities.$remove", json!(cap_id))
                    }
                    CompanyFieldUpdate::UpdateBoundingBox(bbox) => (
                        "bounding_box",
                        serde_json::to_value(bbox).unwrap_or(serde_json::Value::Null),
                    ),
                    CompanyFieldUpdate::UpdateAggregatedAt(ts) => (
                        "aggregated_at",
                        serde_json::to_value(ts).unwrap_or(serde_json::Value::Null),
                    ),
                };
                (path.to_string(), value)
            })
            .collect();

        // Metadata always trails the field changes.
        pairs.push(("last_update_us".to_string(), json!(timestamp_us)));
        pairs.push(("sequence".to_string(), json!(sequence)));
        pairs
    }

    /// Returns an estimated size of this delta in bytes.
    ///
    /// The estimate is a fixed base plus, for every update, a fixed
    /// per-update overhead and a payload figure: the string length for
    /// string payloads and a constant for everything else.
    pub fn size_bytes(&self) -> usize {
        const BASE_OVERHEAD: usize = 64;
        const PER_UPDATE_OVERHEAD: usize = 16;

        let mut total = BASE_OVERHEAD;
        for change in &self.updates {
            let payload = match change {
                CompanyFieldUpdate::SetLeaderId(s)
                | CompanyFieldUpdate::AddPlatoonId(s)
                | CompanyFieldUpdate::RemovePlatoonId(s)
                | CompanyFieldUpdate::RemoveCapability(s) => s.len(),
                CompanyFieldUpdate::SetPlatoonCount(_)
                | CompanyFieldUpdate::SetTotalMemberCount(_)
                | CompanyFieldUpdate::SetOperationalCount(_)
                | CompanyFieldUpdate::SetAvgFuelMinutes(_)
                | CompanyFieldUpdate::SetWorstHealth(_)
                | CompanyFieldUpdate::SetReadinessScore(_) => 4,
                CompanyFieldUpdate::UpdatePositionCentroid(_) => 24,
                CompanyFieldUpdate::AddCapability(_) => 128,
                CompanyFieldUpdate::UpdateBoundingBox(_) => 64,
                CompanyFieldUpdate::UpdateAggregatedAt(_) => 16,
            };
            total += payload + PER_UPDATE_OVERHEAD;
        }
        total
    }

    /// Returns `true` when the delta carries no field updates.
    pub fn is_empty(&self) -> bool {
        self.updates.is_empty()
    }
}
/// Returns the current wall-clock time as microseconds since the UNIX epoch.
///
/// The value comes from [`SystemTime::now`], so it follows the system clock
/// and is not guaranteed to be monotonic.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the UNIX epoch.
pub fn current_timestamp_us() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch");

    // `as_micros` yields a `u128`; clamp before narrowing so an out-of-range
    // value saturates at `u64::MAX` instead of being silently truncated.
    since_epoch.as_micros().min(u128::from(u64::MAX)) as u64
}
use peat_schema::hierarchy::v1::{CompanySummary, PlatoonSummary, SquadSummary};
impl SquadDelta {
    /// Builds a delta that carries every field of `summary`.
    ///
    /// The scalar fields are always emitted. The centroid, bounding box and
    /// aggregation timestamp are emitted only when present, and each member
    /// id and capability becomes its own add-update. The delta is stamped
    /// with `current_timestamp_us()` and the given `sequence`.
    // The cloned payload types may be `Copy`, depending on how the schema is generated.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &SquadSummary, sequence: u64) -> Self {
        let mut changes = vec![
            SquadFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            SquadFieldUpdate::SetMemberCount(summary.member_count),
            SquadFieldUpdate::SetOperationalCount(summary.operational_count),
            SquadFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            SquadFieldUpdate::SetWorstHealth(summary.worst_health),
            SquadFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];

        // An `Option` extends the list by zero or one element.
        changes.extend(
            summary
                .position_centroid
                .clone()
                .map(SquadFieldUpdate::UpdatePositionCentroid),
        );
        changes.extend(
            summary
                .member_ids
                .iter()
                .cloned()
                .map(SquadFieldUpdate::AddMemberId),
        );
        changes.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(SquadFieldUpdate::AddCapability),
        );
        changes.extend(
            summary
                .bounding_box
                .clone()
                .map(SquadFieldUpdate::UpdateBoundingBox),
        );
        changes.extend(
            summary
                .aggregated_at
                .clone()
                .map(SquadFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            squad_id: summary.squad_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates: changes,
        }
    }
}
impl PlatoonDelta {
    /// Builds a delta that carries every field of `summary`.
    ///
    /// The scalar fields are always emitted. The centroid, bounding box and
    /// aggregation timestamp are emitted only when present, and each squad
    /// id and capability becomes its own add-update. The delta is stamped
    /// with `current_timestamp_us()` and the given `sequence`.
    // The cloned payload types may be `Copy`, depending on how the schema is generated.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &PlatoonSummary, sequence: u64) -> Self {
        let mut changes = vec![
            PlatoonFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            PlatoonFieldUpdate::SetSquadCount(summary.squad_count),
            PlatoonFieldUpdate::SetTotalMemberCount(summary.total_member_count),
            PlatoonFieldUpdate::SetOperationalCount(summary.operational_count),
            PlatoonFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            PlatoonFieldUpdate::SetWorstHealth(summary.worst_health),
            PlatoonFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];

        // An `Option` extends the list by zero or one element.
        changes.extend(
            summary
                .position_centroid
                .clone()
                .map(PlatoonFieldUpdate::UpdatePositionCentroid),
        );
        changes.extend(
            summary
                .squad_ids
                .iter()
                .cloned()
                .map(PlatoonFieldUpdate::AddSquadId),
        );
        changes.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(PlatoonFieldUpdate::AddCapability),
        );
        changes.extend(
            summary
                .bounding_box
                .clone()
                .map(PlatoonFieldUpdate::UpdateBoundingBox),
        );
        changes.extend(
            summary
                .aggregated_at
                .clone()
                .map(PlatoonFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            platoon_id: summary.platoon_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates: changes,
        }
    }
}
impl CompanyDelta {
    /// Builds a delta that carries every field of `summary`.
    ///
    /// The scalar fields are always emitted. The centroid, bounding box and
    /// aggregation timestamp are emitted only when present, and each platoon
    /// id and capability becomes its own add-update. The delta is stamped
    /// with `current_timestamp_us()` and the given `sequence`.
    // The cloned payload types may be `Copy`, depending on how the schema is generated.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &CompanySummary, sequence: u64) -> Self {
        let mut changes = vec![
            CompanyFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            CompanyFieldUpdate::SetPlatoonCount(summary.platoon_count),
            CompanyFieldUpdate::SetTotalMemberCount(summary.total_member_count),
            CompanyFieldUpdate::SetOperationalCount(summary.operational_count),
            CompanyFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            CompanyFieldUpdate::SetWorstHealth(summary.worst_health),
            CompanyFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];

        // An `Option` extends the list by zero or one element.
        changes.extend(
            summary
                .position_centroid
                .clone()
                .map(CompanyFieldUpdate::UpdatePositionCentroid),
        );
        changes.extend(
            summary
                .platoon_ids
                .iter()
                .cloned()
                .map(CompanyFieldUpdate::AddPlatoonId),
        );
        changes.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(CompanyFieldUpdate::AddCapability),
        );
        changes.extend(
            summary
                .bounding_box
                .clone()
                .map(CompanyFieldUpdate::UpdateBoundingBox),
        );
        changes.extend(
            summary
                .aggregated_at
                .clone()
                .map(CompanyFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            company_id: summary.company_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates: changes,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the value emitted under `path`, panicking when no entry has that path.
    fn value_at<'a>(
        pairs: &'a [(String, serde_json::Value)],
        path: &str,
    ) -> &'a serde_json::Value {
        pairs
            .iter()
            .find(|(candidate, _)| candidate == path)
            .map(|(_, value)| value)
            .unwrap()
    }

    /// Builds a squad delta for `squad-1A` with a fixed timestamp.
    fn squad_delta(sequence: u64, updates: Vec<SquadFieldUpdate>) -> SquadDelta {
        SquadDelta {
            squad_id: "squad-1A".to_string(),
            timestamp_us: 1234567890,
            sequence,
            updates,
        }
    }

    // A delta survives a JSON round trip with its id and update count intact.
    #[test]
    fn test_squad_delta_serialization() {
        let original = squad_delta(
            42,
            vec![
                SquadFieldUpdate::SetMemberCount(7),
                SquadFieldUpdate::SetOperationalCount(6),
                SquadFieldUpdate::AddMemberId("node-8".to_string()),
            ],
        );

        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains("squad-1A"));
        assert!(encoded.contains("SetMemberCount"));

        let decoded: SquadDelta = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.squad_id, "squad-1A");
        assert_eq!(decoded.updates.len(), 3);
    }

    // Each squad field update is emitted under its expected path and value.
    #[test]
    fn test_squad_delta_into_ditto_updates() {
        let pairs = squad_delta(
            42,
            vec![
                SquadFieldUpdate::SetLeaderId("leader-1".to_string()),
                SquadFieldUpdate::SetMemberCount(8),
                SquadFieldUpdate::AddMemberId("node-9".to_string()),
            ],
        )
        .into_ditto_updates();

        assert!(pairs.len() >= 3);
        assert_eq!(*value_at(&pairs, "leader_id"), json!("leader-1"));
        assert_eq!(*value_at(&pairs, "member_count"), json!(8));
        assert_eq!(*value_at(&pairs, "member_ids.$add"), json!("node-9"));
    }

    // The size estimate stays small and grows with the number of updates.
    #[test]
    fn test_delta_size_estimation() {
        let small = squad_delta(1, vec![SquadFieldUpdate::SetMemberCount(7)]);
        let large = squad_delta(
            1,
            vec![
                SquadFieldUpdate::SetMemberCount(7),
                SquadFieldUpdate::SetOperationalCount(6),
                SquadFieldUpdate::AddMemberId("node-123456789".to_string()),
                SquadFieldUpdate::UpdatePositionCentroid(Position {
                    latitude: 37.7749,
                    longitude: -122.4194,
                    altitude: 100.0,
                }),
            ],
        );

        let small_size = small.size_bytes();
        let large_size = large.size_bytes();
        assert!(small_size < 150);
        assert!(large_size < 500);
        assert!(large_size > small_size);
    }

    // A delta without updates reports itself as empty.
    #[test]
    fn test_empty_delta() {
        let empty = squad_delta(1, vec![]);
        assert!(empty.is_empty());
        assert_eq!(empty.updates.len(), 0);
    }

    // Platoon deltas emit their squad count under `squad_count`.
    #[test]
    fn test_platoon_delta_basic() {
        let pairs = PlatoonDelta {
            platoon_id: "platoon-1".to_string(),
            timestamp_us: 1234567890,
            sequence: 10,
            updates: vec![
                PlatoonFieldUpdate::SetSquadCount(3),
                PlatoonFieldUpdate::AddSquadId("squad-1A".to_string()),
            ],
        }
        .into_ditto_updates();

        assert!(pairs.len() >= 2);
        assert_eq!(*value_at(&pairs, "squad_count"), json!(3));
    }

    // A company delta with updates is not empty.
    #[test]
    fn test_company_delta_basic() {
        let company = CompanyDelta {
            company_id: "company-alpha".to_string(),
            timestamp_us: 1234567890,
            sequence: 5,
            updates: vec![
                CompanyFieldUpdate::SetPlatoonCount(4),
                CompanyFieldUpdate::SetTotalMemberCount(96),
            ],
        };

        assert!(!company.is_empty());
        assert_eq!(company.updates.len(), 2);
    }

    // Timestamps advance across a short sleep and lie after September 2020.
    #[test]
    fn test_current_timestamp_us() {
        let earlier = current_timestamp_us();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let later = current_timestamp_us();

        assert!(later > earlier);
        assert!(earlier > 1_600_000_000_000_000);
    }
}