use peat_schema::capability::v1::Capability;
use peat_schema::common::v1::{Position, Timestamp};
use peat_schema::hierarchy::v1::BoundingBox;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
/// A batch of field-level changes addressed to a single cell.
///
/// The batch is turned into flat path/value pairs by `into_ditto_updates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellDelta {
/// Identifier of the cell this delta applies to.
pub cell_id: String,
/// Timestamp in microseconds; `from_summary` fills it from
/// `current_timestamp_us()` and `into_ditto_updates` emits it as
/// `last_update_us`.
pub timestamp_us: u64,
/// Caller-supplied sequence number, emitted as `sequence` by
/// `into_ditto_updates`.
pub sequence: u64,
/// The individual field changes, applied in the order listed.
pub updates: Vec<CellFieldUpdate>,
}
/// One field-level change inside a [`CellDelta`].
///
/// Each variant is translated to a document path by
/// `CellDelta::into_ditto_updates`; the path is noted on the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CellFieldUpdate {
/// Written to `leader_id`.
SetLeaderId(String),
/// Written to `member_count`.
SetMemberCount(u32),
/// Written to `operational_count`.
SetOperationalCount(u32),
/// Written to `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// Written to `worst_health`.
SetWorstHealth(i32),
/// Written to `readiness_score`.
SetReadinessScore(f32),
/// Written to `position_centroid`.
UpdatePositionCentroid(Position),
/// Emitted under `member_ids.$add`.
AddMemberId(String),
/// Emitted under `member_ids.$remove`.
RemoveMemberId(String),
/// Emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Emitted under `aggregated_capabilities.$remove`; the payload is a
/// string (presumably the capability id — confirm against callers).
RemoveCapability(String),
/// Written to `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Written to `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl CellDelta {
pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
let mut updates = Vec::new();
for update in self.updates {
match update {
CellFieldUpdate::SetLeaderId(id) => {
updates.push(("leader_id".to_string(), json!(id)));
}
CellFieldUpdate::SetMemberCount(count) => {
updates.push(("member_count".to_string(), json!(count)));
}
CellFieldUpdate::SetOperationalCount(count) => {
updates.push(("operational_count".to_string(), json!(count)));
}
CellFieldUpdate::SetAvgFuelMinutes(fuel) => {
updates.push(("avg_fuel_minutes".to_string(), json!(fuel)));
}
CellFieldUpdate::SetWorstHealth(health) => {
updates.push(("worst_health".to_string(), json!(health)));
}
CellFieldUpdate::SetReadinessScore(score) => {
updates.push(("readiness_score".to_string(), json!(score)));
}
CellFieldUpdate::UpdatePositionCentroid(pos) => {
updates.push((
"position_centroid".to_string(),
serde_json::to_value(pos).unwrap_or(json!(null)),
));
}
CellFieldUpdate::AddMemberId(id) => {
updates.push(("member_ids.$add".to_string(), json!(id)));
}
CellFieldUpdate::RemoveMemberId(id) => {
updates.push(("member_ids.$remove".to_string(), json!(id)));
}
CellFieldUpdate::AddCapability(cap) => {
updates.push((
"aggregated_capabilities.$add".to_string(),
serde_json::to_value(cap).unwrap_or(json!(null)),
));
}
CellFieldUpdate::RemoveCapability(cap_id) => {
updates.push(("aggregated_capabilities.$remove".to_string(), json!(cap_id)));
}
CellFieldUpdate::UpdateBoundingBox(bbox) => {
updates.push((
"bounding_box".to_string(),
serde_json::to_value(bbox).unwrap_or(json!(null)),
));
}
CellFieldUpdate::UpdateAggregatedAt(ts) => {
updates.push((
"aggregated_at".to_string(),
serde_json::to_value(ts).unwrap_or(json!(null)),
));
}
}
}
updates.push(("last_update_us".to_string(), json!(self.timestamp_us)));
updates.push(("sequence".to_string(), json!(self.sequence)));
updates
}
pub fn size_bytes(&self) -> usize {
let base_overhead = 64; let per_update_overhead = 16;
let updates_size: usize = self
.updates
.iter()
.map(|u| match u {
CellFieldUpdate::SetLeaderId(s) => s.len() + per_update_overhead,
CellFieldUpdate::SetMemberCount(_) => 4 + per_update_overhead,
CellFieldUpdate::SetOperationalCount(_) => 4 + per_update_overhead,
CellFieldUpdate::SetAvgFuelMinutes(_) => 4 + per_update_overhead,
CellFieldUpdate::SetWorstHealth(_) => 4 + per_update_overhead,
CellFieldUpdate::SetReadinessScore(_) => 4 + per_update_overhead,
CellFieldUpdate::UpdatePositionCentroid(_) => 24 + per_update_overhead, CellFieldUpdate::AddMemberId(s) => s.len() + per_update_overhead,
CellFieldUpdate::RemoveMemberId(s) => s.len() + per_update_overhead,
CellFieldUpdate::AddCapability(_) => 128 + per_update_overhead, CellFieldUpdate::RemoveCapability(s) => s.len() + per_update_overhead,
CellFieldUpdate::UpdateBoundingBox(_) => 64 + per_update_overhead,
CellFieldUpdate::UpdateAggregatedAt(_) => 16 + per_update_overhead,
})
.sum();
base_overhead + updates_size
}
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}
}
/// A batch of field-level changes addressed to a single cohort.
///
/// The batch is turned into flat path/value pairs by `into_ditto_updates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CohortDelta {
/// Identifier of the cohort this delta applies to.
pub cohort_id: String,
/// Timestamp in microseconds; `from_summary` fills it from
/// `current_timestamp_us()` and `into_ditto_updates` emits it as
/// `last_update_us`.
pub timestamp_us: u64,
/// Caller-supplied sequence number, emitted as `sequence` by
/// `into_ditto_updates`.
pub sequence: u64,
/// The individual field changes, applied in the order listed.
pub updates: Vec<CohortFieldUpdate>,
}
/// One field-level change inside a [`CohortDelta`].
///
/// Each variant is translated to a document path by
/// `CohortDelta::into_ditto_updates`; the path is noted on the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CohortFieldUpdate {
/// Written to `leader_id`.
SetLeaderId(String),
/// Written to `cell_count`.
SetCellCount(u32),
/// Written to `total_member_count`.
SetTotalMemberCount(u32),
/// Written to `operational_count`.
SetOperationalCount(u32),
/// Written to `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// Written to `worst_health`.
SetWorstHealth(i32),
/// Written to `readiness_score`.
SetReadinessScore(f32),
/// Written to `position_centroid`.
UpdatePositionCentroid(Position),
/// Emitted under `cell_ids.$add`.
AddCellId(String),
/// Emitted under `cell_ids.$remove`.
RemoveCellId(String),
/// Emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Emitted under `aggregated_capabilities.$remove`; the payload is a
/// string (presumably the capability id — confirm against callers).
RemoveCapability(String),
/// Written to `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Written to `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl CohortDelta {
pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
let mut updates = Vec::new();
for update in self.updates {
match update {
CohortFieldUpdate::SetLeaderId(id) => {
updates.push(("leader_id".to_string(), json!(id)));
}
CohortFieldUpdate::SetCellCount(count) => {
updates.push(("cell_count".to_string(), json!(count)));
}
CohortFieldUpdate::SetTotalMemberCount(count) => {
updates.push(("total_member_count".to_string(), json!(count)));
}
CohortFieldUpdate::SetOperationalCount(count) => {
updates.push(("operational_count".to_string(), json!(count)));
}
CohortFieldUpdate::SetAvgFuelMinutes(fuel) => {
updates.push(("avg_fuel_minutes".to_string(), json!(fuel)));
}
CohortFieldUpdate::SetWorstHealth(health) => {
updates.push(("worst_health".to_string(), json!(health)));
}
CohortFieldUpdate::SetReadinessScore(score) => {
updates.push(("readiness_score".to_string(), json!(score)));
}
CohortFieldUpdate::UpdatePositionCentroid(pos) => {
updates.push((
"position_centroid".to_string(),
serde_json::to_value(pos).unwrap_or(json!(null)),
));
}
CohortFieldUpdate::AddCellId(id) => {
updates.push(("cell_ids.$add".to_string(), json!(id)));
}
CohortFieldUpdate::RemoveCellId(id) => {
updates.push(("cell_ids.$remove".to_string(), json!(id)));
}
CohortFieldUpdate::AddCapability(cap) => {
updates.push((
"aggregated_capabilities.$add".to_string(),
serde_json::to_value(cap).unwrap_or(json!(null)),
));
}
CohortFieldUpdate::RemoveCapability(cap_id) => {
updates.push(("aggregated_capabilities.$remove".to_string(), json!(cap_id)));
}
CohortFieldUpdate::UpdateBoundingBox(bbox) => {
updates.push((
"bounding_box".to_string(),
serde_json::to_value(bbox).unwrap_or(json!(null)),
));
}
CohortFieldUpdate::UpdateAggregatedAt(ts) => {
updates.push((
"aggregated_at".to_string(),
serde_json::to_value(ts).unwrap_or(json!(null)),
));
}
}
}
updates.push(("last_update_us".to_string(), json!(self.timestamp_us)));
updates.push(("sequence".to_string(), json!(self.sequence)));
updates
}
pub fn size_bytes(&self) -> usize {
let base_overhead = 64;
let per_update_overhead = 16;
let updates_size: usize = self
.updates
.iter()
.map(|u| match u {
CohortFieldUpdate::SetLeaderId(s) => s.len() + per_update_overhead,
CohortFieldUpdate::SetCellCount(_) => 4 + per_update_overhead,
CohortFieldUpdate::SetTotalMemberCount(_) => 4 + per_update_overhead,
CohortFieldUpdate::SetOperationalCount(_) => 4 + per_update_overhead,
CohortFieldUpdate::SetAvgFuelMinutes(_) => 4 + per_update_overhead,
CohortFieldUpdate::SetWorstHealth(_) => 4 + per_update_overhead,
CohortFieldUpdate::SetReadinessScore(_) => 4 + per_update_overhead,
CohortFieldUpdate::UpdatePositionCentroid(_) => 24 + per_update_overhead,
CohortFieldUpdate::AddCellId(s) => s.len() + per_update_overhead,
CohortFieldUpdate::RemoveCellId(s) => s.len() + per_update_overhead,
CohortFieldUpdate::AddCapability(_) => 128 + per_update_overhead,
CohortFieldUpdate::RemoveCapability(s) => s.len() + per_update_overhead,
CohortFieldUpdate::UpdateBoundingBox(_) => 64 + per_update_overhead,
CohortFieldUpdate::UpdateAggregatedAt(_) => 16 + per_update_overhead,
})
.sum();
base_overhead + updates_size
}
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}
}
/// A batch of field-level changes addressed to a single federation.
///
/// The batch is turned into flat path/value pairs by `into_ditto_updates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationDelta {
/// Identifier of the federation this delta applies to.
pub federation_id: String,
/// Timestamp in microseconds; `from_summary` fills it from
/// `current_timestamp_us()` and `into_ditto_updates` emits it as
/// `last_update_us`.
pub timestamp_us: u64,
/// Caller-supplied sequence number, emitted as `sequence` by
/// `into_ditto_updates`.
pub sequence: u64,
/// The individual field changes, applied in the order listed.
pub updates: Vec<FederationFieldUpdate>,
}
/// One field-level change inside a [`FederationDelta`].
///
/// Each variant is translated to a document path by
/// `FederationDelta::into_ditto_updates`; the path is noted on the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FederationFieldUpdate {
/// Written to `leader_id`.
SetLeaderId(String),
/// Written to `cohort_count`.
SetCohortCount(u32),
/// Written to `total_member_count`.
SetTotalMemberCount(u32),
/// Written to `operational_count`.
SetOperationalCount(u32),
/// Written to `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// Written to `worst_health`.
SetWorstHealth(i32),
/// Written to `readiness_score`.
SetReadinessScore(f32),
/// Written to `position_centroid`.
UpdatePositionCentroid(Position),
/// Emitted under `cohort_ids.$add`.
AddCohortId(String),
/// Emitted under `cohort_ids.$remove`.
RemoveCohortId(String),
/// Emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Emitted under `aggregated_capabilities.$remove`; the payload is a
/// string (presumably the capability id — confirm against callers).
RemoveCapability(String),
/// Written to `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Written to `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl FederationDelta {
pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
let mut updates = Vec::new();
for update in self.updates {
match update {
FederationFieldUpdate::SetLeaderId(id) => {
updates.push(("leader_id".to_string(), json!(id)));
}
FederationFieldUpdate::SetCohortCount(count) => {
updates.push(("cohort_count".to_string(), json!(count)));
}
FederationFieldUpdate::SetTotalMemberCount(count) => {
updates.push(("total_member_count".to_string(), json!(count)));
}
FederationFieldUpdate::SetOperationalCount(count) => {
updates.push(("operational_count".to_string(), json!(count)));
}
FederationFieldUpdate::SetAvgFuelMinutes(fuel) => {
updates.push(("avg_fuel_minutes".to_string(), json!(fuel)));
}
FederationFieldUpdate::SetWorstHealth(health) => {
updates.push(("worst_health".to_string(), json!(health)));
}
FederationFieldUpdate::SetReadinessScore(score) => {
updates.push(("readiness_score".to_string(), json!(score)));
}
FederationFieldUpdate::UpdatePositionCentroid(pos) => {
updates.push((
"position_centroid".to_string(),
serde_json::to_value(pos).unwrap_or(json!(null)),
));
}
FederationFieldUpdate::AddCohortId(id) => {
updates.push(("cohort_ids.$add".to_string(), json!(id)));
}
FederationFieldUpdate::RemoveCohortId(id) => {
updates.push(("cohort_ids.$remove".to_string(), json!(id)));
}
FederationFieldUpdate::AddCapability(cap) => {
updates.push((
"aggregated_capabilities.$add".to_string(),
serde_json::to_value(cap).unwrap_or(json!(null)),
));
}
FederationFieldUpdate::RemoveCapability(cap_id) => {
updates.push(("aggregated_capabilities.$remove".to_string(), json!(cap_id)));
}
FederationFieldUpdate::UpdateBoundingBox(bbox) => {
updates.push((
"bounding_box".to_string(),
serde_json::to_value(bbox).unwrap_or(json!(null)),
));
}
FederationFieldUpdate::UpdateAggregatedAt(ts) => {
updates.push((
"aggregated_at".to_string(),
serde_json::to_value(ts).unwrap_or(json!(null)),
));
}
}
}
updates.push(("last_update_us".to_string(), json!(self.timestamp_us)));
updates.push(("sequence".to_string(), json!(self.sequence)));
updates
}
pub fn size_bytes(&self) -> usize {
let base_overhead = 64;
let per_update_overhead = 16;
let updates_size: usize = self
.updates
.iter()
.map(|u| match u {
FederationFieldUpdate::SetLeaderId(s) => s.len() + per_update_overhead,
FederationFieldUpdate::SetCohortCount(_) => 4 + per_update_overhead,
FederationFieldUpdate::SetTotalMemberCount(_) => 4 + per_update_overhead,
FederationFieldUpdate::SetOperationalCount(_) => 4 + per_update_overhead,
FederationFieldUpdate::SetAvgFuelMinutes(_) => 4 + per_update_overhead,
FederationFieldUpdate::SetWorstHealth(_) => 4 + per_update_overhead,
FederationFieldUpdate::SetReadinessScore(_) => 4 + per_update_overhead,
FederationFieldUpdate::UpdatePositionCentroid(_) => 24 + per_update_overhead,
FederationFieldUpdate::AddCohortId(s) => s.len() + per_update_overhead,
FederationFieldUpdate::RemoveCohortId(s) => s.len() + per_update_overhead,
FederationFieldUpdate::AddCapability(_) => 128 + per_update_overhead,
FederationFieldUpdate::RemoveCapability(s) => s.len() + per_update_overhead,
FederationFieldUpdate::UpdateBoundingBox(_) => 64 + per_update_overhead,
FederationFieldUpdate::UpdateAggregatedAt(_) => 16 + per_update_overhead,
})
.sum();
base_overhead + updates_size
}
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}
}
/// A batch of field-level changes addressed to a single coalition.
///
/// The batch is turned into flat path/value pairs by `into_ditto_updates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoalitionDelta {
/// Identifier of the coalition this delta applies to.
pub coalition_id: String,
/// Timestamp in microseconds; `from_summary` fills it from
/// `current_timestamp_us()` and `into_ditto_updates` emits it as
/// `last_update_us`.
pub timestamp_us: u64,
/// Caller-supplied sequence number, emitted as `sequence` by
/// `into_ditto_updates`.
pub sequence: u64,
/// The individual field changes, applied in the order listed.
pub updates: Vec<CoalitionFieldUpdate>,
}
/// One field-level change inside a [`CoalitionDelta`].
///
/// Each variant is translated to a document path by
/// `CoalitionDelta::into_ditto_updates`; the path is noted on the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CoalitionFieldUpdate {
/// Written to `leader_id`.
SetLeaderId(String),
/// Written to `federation_count`.
SetFederationCount(u32),
/// Written to `total_member_count`.
SetTotalMemberCount(u32),
/// Written to `operational_count`.
SetOperationalCount(u32),
/// Written to `avg_fuel_minutes`.
SetAvgFuelMinutes(f32),
/// Written to `worst_health`.
SetWorstHealth(i32),
/// Written to `readiness_score`.
SetReadinessScore(f32),
/// Written to `position_centroid`.
UpdatePositionCentroid(Position),
/// Emitted under `federation_ids.$add`.
AddFederationId(String),
/// Emitted under `federation_ids.$remove`.
RemoveFederationId(String),
/// Emitted under `aggregated_capabilities.$add`.
AddCapability(Capability),
/// Emitted under `aggregated_capabilities.$remove`; the payload is a
/// string (presumably the capability id — confirm against callers).
RemoveCapability(String),
/// Written to `bounding_box`.
UpdateBoundingBox(BoundingBox),
/// Written to `aggregated_at`.
UpdateAggregatedAt(Timestamp),
}
impl CoalitionDelta {
pub fn into_ditto_updates(self) -> Vec<(String, serde_json::Value)> {
let mut updates = Vec::new();
for update in self.updates {
match update {
CoalitionFieldUpdate::SetLeaderId(id) => {
updates.push(("leader_id".to_string(), json!(id)));
}
CoalitionFieldUpdate::SetFederationCount(count) => {
updates.push(("federation_count".to_string(), json!(count)));
}
CoalitionFieldUpdate::SetTotalMemberCount(count) => {
updates.push(("total_member_count".to_string(), json!(count)));
}
CoalitionFieldUpdate::SetOperationalCount(count) => {
updates.push(("operational_count".to_string(), json!(count)));
}
CoalitionFieldUpdate::SetAvgFuelMinutes(fuel) => {
updates.push(("avg_fuel_minutes".to_string(), json!(fuel)));
}
CoalitionFieldUpdate::SetWorstHealth(health) => {
updates.push(("worst_health".to_string(), json!(health)));
}
CoalitionFieldUpdate::SetReadinessScore(score) => {
updates.push(("readiness_score".to_string(), json!(score)));
}
CoalitionFieldUpdate::UpdatePositionCentroid(pos) => {
updates.push((
"position_centroid".to_string(),
serde_json::to_value(pos).unwrap_or(json!(null)),
));
}
CoalitionFieldUpdate::AddFederationId(id) => {
updates.push(("federation_ids.$add".to_string(), json!(id)));
}
CoalitionFieldUpdate::RemoveFederationId(id) => {
updates.push(("federation_ids.$remove".to_string(), json!(id)));
}
CoalitionFieldUpdate::AddCapability(cap) => {
updates.push((
"aggregated_capabilities.$add".to_string(),
serde_json::to_value(cap).unwrap_or(json!(null)),
));
}
CoalitionFieldUpdate::RemoveCapability(cap_id) => {
updates.push(("aggregated_capabilities.$remove".to_string(), json!(cap_id)));
}
CoalitionFieldUpdate::UpdateBoundingBox(bbox) => {
updates.push((
"bounding_box".to_string(),
serde_json::to_value(bbox).unwrap_or(json!(null)),
));
}
CoalitionFieldUpdate::UpdateAggregatedAt(ts) => {
updates.push((
"aggregated_at".to_string(),
serde_json::to_value(ts).unwrap_or(json!(null)),
));
}
}
}
updates.push(("last_update_us".to_string(), json!(self.timestamp_us)));
updates.push(("sequence".to_string(), json!(self.sequence)));
updates
}
pub fn size_bytes(&self) -> usize {
let base_overhead = 64;
let per_update_overhead = 16;
let updates_size: usize = self
.updates
.iter()
.map(|u| match u {
CoalitionFieldUpdate::SetLeaderId(s) => s.len() + per_update_overhead,
CoalitionFieldUpdate::SetFederationCount(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::SetTotalMemberCount(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::SetOperationalCount(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::SetAvgFuelMinutes(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::SetWorstHealth(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::SetReadinessScore(_) => 4 + per_update_overhead,
CoalitionFieldUpdate::UpdatePositionCentroid(_) => 24 + per_update_overhead,
CoalitionFieldUpdate::AddFederationId(s) => s.len() + per_update_overhead,
CoalitionFieldUpdate::RemoveFederationId(s) => s.len() + per_update_overhead,
CoalitionFieldUpdate::AddCapability(_) => 128 + per_update_overhead,
CoalitionFieldUpdate::RemoveCapability(s) => s.len() + per_update_overhead,
CoalitionFieldUpdate::UpdateBoundingBox(_) => 64 + per_update_overhead,
CoalitionFieldUpdate::UpdateAggregatedAt(_) => 16 + per_update_overhead,
})
.sum();
base_overhead + updates_size
}
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}
}
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// This function never panics:
/// - if the system clock reports a time before the Unix epoch, `0` is
///   returned instead of unwrapping the error;
/// - if the microsecond count does not fit in a `u64`, the result saturates
///   at `u64::MAX` instead of being silently truncated.
pub fn current_timestamp_us() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_micros` yields a `u128`; clamp before narrowing so the cast is lossless.
        Ok(elapsed) => elapsed.as_micros().min(u128::from(u64::MAX)) as u64,
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
use peat_schema::hierarchy::v1::{CellSummary, CoalitionSummary, CohortSummary, FederationSummary};
impl CellDelta {
    /// Builds a delta that carries every populated field of `summary`.
    ///
    /// The scalar fields are always emitted. `position_centroid`,
    /// `bounding_box` and `aggregated_at` are emitted only when present. Each
    /// member id and each aggregated capability becomes its own `Add*` update.
    /// `timestamp_us` is taken from `current_timestamp_us()` and `sequence`
    /// is stored as given.
    // The optional schema payloads are cloned explicitly whether or not they
    // happen to be `Copy`, hence the lint allowance.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &CellSummary, sequence: u64) -> Self {
        // Scalars first, in the same order the fields are always reported.
        let mut updates = vec![
            CellFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            CellFieldUpdate::SetMemberCount(summary.member_count),
            CellFieldUpdate::SetOperationalCount(summary.operational_count),
            CellFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            CellFieldUpdate::SetWorstHealth(summary.worst_health),
            CellFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];
        // An `Option` extends the list by zero or one element.
        updates.extend(
            summary
                .position_centroid
                .clone()
                .map(CellFieldUpdate::UpdatePositionCentroid),
        );
        updates.extend(
            summary
                .member_ids
                .iter()
                .cloned()
                .map(CellFieldUpdate::AddMemberId),
        );
        updates.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(CellFieldUpdate::AddCapability),
        );
        updates.extend(
            summary
                .bounding_box
                .clone()
                .map(CellFieldUpdate::UpdateBoundingBox),
        );
        updates.extend(
            summary
                .aggregated_at
                .clone()
                .map(CellFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            cell_id: summary.cell_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates,
        }
    }
}
impl CohortDelta {
    /// Builds a delta that carries every populated field of `summary`.
    ///
    /// The scalar fields are always emitted. `position_centroid`,
    /// `bounding_box` and `aggregated_at` are emitted only when present. Each
    /// cell id and each aggregated capability becomes its own `Add*` update.
    /// `timestamp_us` is taken from `current_timestamp_us()` and `sequence`
    /// is stored as given.
    // The optional schema payloads are cloned explicitly whether or not they
    // happen to be `Copy`, hence the lint allowance.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &CohortSummary, sequence: u64) -> Self {
        // Scalars first, in the same order the fields are always reported.
        let mut updates = vec![
            CohortFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            CohortFieldUpdate::SetCellCount(summary.cell_count),
            CohortFieldUpdate::SetTotalMemberCount(summary.total_member_count),
            CohortFieldUpdate::SetOperationalCount(summary.operational_count),
            CohortFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            CohortFieldUpdate::SetWorstHealth(summary.worst_health),
            CohortFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];
        // An `Option` extends the list by zero or one element.
        updates.extend(
            summary
                .position_centroid
                .clone()
                .map(CohortFieldUpdate::UpdatePositionCentroid),
        );
        updates.extend(
            summary
                .cell_ids
                .iter()
                .cloned()
                .map(CohortFieldUpdate::AddCellId),
        );
        updates.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(CohortFieldUpdate::AddCapability),
        );
        updates.extend(
            summary
                .bounding_box
                .clone()
                .map(CohortFieldUpdate::UpdateBoundingBox),
        );
        updates.extend(
            summary
                .aggregated_at
                .clone()
                .map(CohortFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            cohort_id: summary.cohort_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates,
        }
    }
}
impl FederationDelta {
    /// Builds a delta that carries every populated field of `summary`.
    ///
    /// The scalar fields are always emitted. `position_centroid`,
    /// `bounding_box` and `aggregated_at` are emitted only when present. Each
    /// cohort id and each aggregated capability becomes its own `Add*` update.
    /// `timestamp_us` is taken from `current_timestamp_us()` and `sequence`
    /// is stored as given.
    // The optional schema payloads are cloned explicitly whether or not they
    // happen to be `Copy`, hence the lint allowance.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &FederationSummary, sequence: u64) -> Self {
        // Scalars first, in the same order the fields are always reported.
        let mut updates = vec![
            FederationFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            FederationFieldUpdate::SetCohortCount(summary.cohort_count),
            FederationFieldUpdate::SetTotalMemberCount(summary.total_member_count),
            FederationFieldUpdate::SetOperationalCount(summary.operational_count),
            FederationFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            FederationFieldUpdate::SetWorstHealth(summary.worst_health),
            FederationFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];
        // An `Option` extends the list by zero or one element.
        updates.extend(
            summary
                .position_centroid
                .clone()
                .map(FederationFieldUpdate::UpdatePositionCentroid),
        );
        updates.extend(
            summary
                .cohort_ids
                .iter()
                .cloned()
                .map(FederationFieldUpdate::AddCohortId),
        );
        updates.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(FederationFieldUpdate::AddCapability),
        );
        updates.extend(
            summary
                .bounding_box
                .clone()
                .map(FederationFieldUpdate::UpdateBoundingBox),
        );
        updates.extend(
            summary
                .aggregated_at
                .clone()
                .map(FederationFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            federation_id: summary.federation_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates,
        }
    }
}
impl CoalitionDelta {
    /// Builds a delta that carries every populated field of `summary`.
    ///
    /// The scalar fields are always emitted. `position_centroid`,
    /// `bounding_box` and `aggregated_at` are emitted only when present. Each
    /// federation id and each aggregated capability becomes its own `Add*`
    /// update. `timestamp_us` is taken from `current_timestamp_us()` and
    /// `sequence` is stored as given.
    // The optional schema payloads are cloned explicitly whether or not they
    // happen to be `Copy`, hence the lint allowance.
    #[allow(clippy::clone_on_copy)]
    pub fn from_summary(summary: &CoalitionSummary, sequence: u64) -> Self {
        // Scalars first, in the same order the fields are always reported.
        let mut updates = vec![
            CoalitionFieldUpdate::SetLeaderId(summary.leader_id.clone()),
            CoalitionFieldUpdate::SetFederationCount(summary.federation_count),
            CoalitionFieldUpdate::SetTotalMemberCount(summary.total_member_count),
            CoalitionFieldUpdate::SetOperationalCount(summary.operational_count),
            CoalitionFieldUpdate::SetAvgFuelMinutes(summary.avg_fuel_minutes),
            CoalitionFieldUpdate::SetWorstHealth(summary.worst_health),
            CoalitionFieldUpdate::SetReadinessScore(summary.readiness_score),
        ];
        // An `Option` extends the list by zero or one element.
        updates.extend(
            summary
                .position_centroid
                .clone()
                .map(CoalitionFieldUpdate::UpdatePositionCentroid),
        );
        updates.extend(
            summary
                .federation_ids
                .iter()
                .cloned()
                .map(CoalitionFieldUpdate::AddFederationId),
        );
        updates.extend(
            summary
                .aggregated_capabilities
                .iter()
                .cloned()
                .map(CoalitionFieldUpdate::AddCapability),
        );
        updates.extend(
            summary
                .bounding_box
                .clone()
                .map(CoalitionFieldUpdate::UpdateBoundingBox),
        );
        updates.extend(
            summary
                .aggregated_at
                .clone()
                .map(CoalitionFieldUpdate::UpdateAggregatedAt),
        );

        Self {
            coalition_id: summary.coalition_id.clone(),
            timestamp_us: current_timestamp_us(),
            sequence,
            updates,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp shared by every hand-built delta in this module.
    const FIXED_TIMESTAMP_US: u64 = 1234567890;

    /// Builds a `CellDelta` for `cell-1A` with the fixed test timestamp.
    fn cell_delta(sequence: u64, updates: Vec<CellFieldUpdate>) -> CellDelta {
        CellDelta {
            cell_id: "cell-1A".to_string(),
            timestamp_us: FIXED_TIMESTAMP_US,
            sequence,
            updates,
        }
    }

    /// Returns the value written to `path`, panicking when no such pair exists.
    fn value_at<'a>(
        writes: &'a [(String, serde_json::Value)],
        path: &str,
    ) -> &'a serde_json::Value {
        let (_, value) = writes
            .iter()
            .find(|(candidate, _)| candidate.as_str() == path)
            .unwrap();
        value
    }

    /// A delta survives a JSON round trip with its id and updates intact.
    #[test]
    fn test_cell_delta_serialization() {
        let original = cell_delta(
            42,
            vec![
                CellFieldUpdate::SetMemberCount(7),
                CellFieldUpdate::SetOperationalCount(6),
                CellFieldUpdate::AddMemberId("node-8".to_string()),
            ],
        );

        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains("cell-1A"));
        assert!(encoded.contains("SetMemberCount"));

        let decoded: CellDelta = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.cell_id, "cell-1A");
        assert_eq!(decoded.updates.len(), 3);
    }

    /// Each cell update lands on its expected path with its expected value.
    #[test]
    fn test_cell_delta_into_ditto_updates() {
        let writes = cell_delta(
            42,
            vec![
                CellFieldUpdate::SetLeaderId("leader-1".to_string()),
                CellFieldUpdate::SetMemberCount(8),
                CellFieldUpdate::AddMemberId("node-9".to_string()),
            ],
        )
        .into_ditto_updates();

        assert!(writes.len() >= 3);
        assert_eq!(*value_at(&writes, "leader_id"), json!("leader-1"));
        assert_eq!(*value_at(&writes, "member_count"), json!(8));
        assert_eq!(*value_at(&writes, "member_ids.$add"), json!("node-9"));
    }

    /// The size estimate grows with the number of updates and stays bounded.
    #[test]
    fn test_delta_size_estimation() {
        let small = cell_delta(1, vec![CellFieldUpdate::SetMemberCount(7)]);
        let large = cell_delta(
            1,
            vec![
                CellFieldUpdate::SetMemberCount(7),
                CellFieldUpdate::SetOperationalCount(6),
                CellFieldUpdate::AddMemberId("node-123456789".to_string()),
                CellFieldUpdate::UpdatePositionCentroid(Position {
                    latitude: 37.7749,
                    longitude: -122.4194,
                    altitude: 100.0,
                }),
            ],
        );

        let small_size = small.size_bytes();
        let large_size = large.size_bytes();
        assert!(small_size < 150);
        assert!(large_size < 500);
        assert!(large_size > small_size);
    }

    /// A delta without updates reports itself as empty.
    #[test]
    fn test_empty_delta() {
        let delta = cell_delta(1, vec![]);

        assert!(delta.is_empty());
        assert_eq!(delta.updates.len(), 0);
    }

    /// Cohort updates are translated to cohort-specific paths.
    #[test]
    fn test_cohort_delta_basic() {
        let writes = CohortDelta {
            cohort_id: "cohort-1".to_string(),
            timestamp_us: FIXED_TIMESTAMP_US,
            sequence: 10,
            updates: vec![
                CohortFieldUpdate::SetCellCount(3),
                CohortFieldUpdate::AddCellId("cell-1A".to_string()),
            ],
        }
        .into_ditto_updates();

        assert!(writes.len() >= 2);
        assert_eq!(*value_at(&writes, "cell_count"), json!(3));
    }

    /// A federation delta keeps the updates it was built with.
    #[test]
    fn test_federation_delta_basic() {
        let delta = FederationDelta {
            federation_id: "federation-alpha".to_string(),
            timestamp_us: FIXED_TIMESTAMP_US,
            sequence: 5,
            updates: vec![
                FederationFieldUpdate::SetCohortCount(4),
                FederationFieldUpdate::SetTotalMemberCount(96),
            ],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.updates.len(), 2);
    }

    /// Coalition updates are retained and translated to coalition paths.
    #[test]
    fn test_coalition_delta_basic() {
        let delta = CoalitionDelta {
            coalition_id: "coalition-1".to_string(),
            timestamp_us: FIXED_TIMESTAMP_US,
            sequence: 7,
            updates: vec![
                CoalitionFieldUpdate::SetFederationCount(3),
                CoalitionFieldUpdate::AddFederationId("federation-alpha".to_string()),
                CoalitionFieldUpdate::SetTotalMemberCount(300),
            ],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.updates.len(), 3);

        let writes = delta.into_ditto_updates();
        assert_eq!(*value_at(&writes, "federation_count"), json!(3));
    }

    /// The clock helper advances over time and returns a post-2020 value.
    #[test]
    fn test_current_timestamp_us() {
        let earlier = current_timestamp_us();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let later = current_timestamp_us();

        assert!(later > earlier);
        assert!(earlier > 1_600_000_000_000_000);
    }
}