nodedb_cluster/distributed_graph/
types.rs1use serde::{Deserialize, Serialize};
6
7#[derive(
9 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
10)]
11pub struct SuperstepBarrier {
12 pub algorithm: String,
13 pub iteration: u32,
14 pub max_iterations: u32,
15 pub params: String,
16 #[serde(default)]
21 pub system_as_of: Option<i64>,
22}
23
24#[derive(
26 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
27)]
28pub struct BoundaryContributions {
29 pub iteration: u32,
30 pub source_shard: u32,
31 pub contributions: Vec<(String, f64)>,
32}
33
34#[derive(
36 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
37)]
38pub struct SuperstepAck {
39 pub shard_id: u32,
40 pub iteration: u32,
41 pub local_delta: f64,
42 pub vertex_count: usize,
43 pub contributions_sent: usize,
44}
45
46#[derive(
48 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
49)]
50pub struct AlgoComplete {
51 pub iterations: u32,
52 pub converged: bool,
53 pub final_delta: f64,
54}
55
56#[cfg(test)]
57mod tests {
58 use super::*;
59
60 #[test]
61 fn superstep_barrier_serde() {
62 let barrier = SuperstepBarrier {
63 algorithm: "pagerank".into(),
64 iteration: 3,
65 max_iterations: 20,
66 params: r#"{"damping":0.85}"#.into(),
67 system_as_of: None,
68 };
69 let bytes = zerompk::to_msgpack_vec(&barrier).unwrap();
70 let decoded: SuperstepBarrier = zerompk::from_msgpack(&bytes).unwrap();
71 assert_eq!(decoded.iteration, 3);
72 assert!(decoded.system_as_of.is_none());
73 }
74
75 #[test]
76 fn superstep_barrier_carries_system_as_of() {
77 let barrier = SuperstepBarrier {
78 algorithm: "pagerank".into(),
79 iteration: 1,
80 max_iterations: 10,
81 params: String::new(),
82 system_as_of: Some(1_700_000_000_000_000_000),
83 };
84 let bytes = zerompk::to_msgpack_vec(&barrier).unwrap();
85 let decoded: SuperstepBarrier = zerompk::from_msgpack(&bytes).unwrap();
86 assert_eq!(decoded.system_as_of, Some(1_700_000_000_000_000_000));
87 }
88
89 #[test]
90 fn boundary_contributions_serde() {
91 let contrib = BoundaryContributions {
92 iteration: 1,
93 source_shard: 5,
94 contributions: vec![("alice".into(), 0.042), ("bob".into(), 0.031)],
95 };
96 let bytes = zerompk::to_msgpack_vec(&contrib).unwrap();
97 let decoded: BoundaryContributions = zerompk::from_msgpack(&bytes).unwrap();
98 assert_eq!(decoded.contributions.len(), 2);
99 }
100
101 #[test]
102 fn superstep_ack_serde() {
103 let ack = SuperstepAck {
104 shard_id: 3,
105 iteration: 2,
106 local_delta: 0.001,
107 vertex_count: 1000,
108 contributions_sent: 50,
109 };
110 let bytes = zerompk::to_msgpack_vec(&ack).unwrap();
111 let decoded: SuperstepAck = zerompk::from_msgpack(&bytes).unwrap();
112 assert!((decoded.local_delta - 0.001).abs() < 1e-10);
113 }
114
115 #[test]
116 fn algo_complete_serde() {
117 let msg = AlgoComplete {
118 iterations: 15,
119 converged: true,
120 final_delta: 1e-8,
121 };
122 let bytes = zerompk::to_msgpack_vec(&msg).unwrap();
123 let decoded: AlgoComplete = zerompk::from_msgpack(&bytes).unwrap();
124 assert!(decoded.converged);
125 }
126}