Skip to main content

nodedb_cluster/distributed_graph/
types.rs

//! Shared bulk-synchronous-parallel (BSP) message types for distributed graph algorithms.
2
3use serde::{Deserialize, Serialize};
4
5/// Superstep barrier message: coordinator → all shards.
6#[derive(
7    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
8)]
9pub struct SuperstepBarrier {
10    pub algorithm: String,
11    pub iteration: u32,
12    pub max_iterations: u32,
13    pub params: String,
14}
15
16/// Boundary vertex contributions: shard → target shard (scatter phase).
17#[derive(
18    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
19)]
20pub struct BoundaryContributions {
21    pub iteration: u32,
22    pub source_shard: u16,
23    pub contributions: Vec<(String, f64)>,
24}
25
26/// Superstep acknowledgement: shard → coordinator (gather phase).
27#[derive(
28    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
29)]
30pub struct SuperstepAck {
31    pub shard_id: u16,
32    pub iteration: u32,
33    pub local_delta: f64,
34    pub vertex_count: usize,
35    pub contributions_sent: usize,
36}
37
38/// Algorithm completion signal: coordinator → all shards.
39#[derive(
40    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
41)]
42pub struct AlgoComplete {
43    pub iterations: u32,
44    pub converged: bool,
45    pub final_delta: f64,
46}
47
#[cfg(test)]
mod tests {
    //! MessagePack round-trip tests for every message type.
    //!
    //! Each test encodes a value, decodes it again, and checks *every* field
    //! against the original, so a codec that drops, reorders or corrupts a
    //! field is caught rather than only the single field checked previously.

    use super::*;

    /// Absolute tolerance used when comparing `f64` fields after a round trip.
    const EPS: f64 = 1e-10;

    /// Returns `true` when `a` and `b` differ by less than [`EPS`].
    fn close(a: f64, b: f64) -> bool {
        (a - b).abs() < EPS
    }

    #[test]
    fn superstep_barrier_serde() {
        let barrier = SuperstepBarrier {
            algorithm: "pagerank".into(),
            iteration: 3,
            max_iterations: 20,
            params: r#"{"damping":0.85}"#.into(),
        };
        let bytes = zerompk::to_msgpack_vec(&barrier).expect("encode SuperstepBarrier");
        let decoded: SuperstepBarrier =
            zerompk::from_msgpack(&bytes).expect("decode SuperstepBarrier");

        assert_eq!(decoded.algorithm, barrier.algorithm);
        assert_eq!(decoded.iteration, barrier.iteration);
        assert_eq!(decoded.max_iterations, barrier.max_iterations);
        assert_eq!(decoded.params, barrier.params);
    }

    #[test]
    fn boundary_contributions_serde() {
        let contrib = BoundaryContributions {
            iteration: 1,
            source_shard: 5,
            contributions: vec![("alice".into(), 0.042), ("bob".into(), 0.031)],
        };
        let bytes = zerompk::to_msgpack_vec(&contrib).expect("encode BoundaryContributions");
        let decoded: BoundaryContributions =
            zerompk::from_msgpack(&bytes).expect("decode BoundaryContributions");

        assert_eq!(decoded.iteration, contrib.iteration);
        assert_eq!(decoded.source_shard, contrib.source_shard);
        assert_eq!(decoded.contributions.len(), contrib.contributions.len());
        // Order matters: compare element by element, keys exactly and values
        // within tolerance.
        for (got, want) in decoded.contributions.iter().zip(&contrib.contributions) {
            assert_eq!(got.0, want.0);
            assert!(close(got.1, want.1), "value mismatch for key {}", want.0);
        }
    }

    #[test]
    fn superstep_ack_serde() {
        let ack = SuperstepAck {
            shard_id: 3,
            iteration: 2,
            local_delta: 0.001,
            vertex_count: 1000,
            contributions_sent: 50,
        };
        let bytes = zerompk::to_msgpack_vec(&ack).expect("encode SuperstepAck");
        let decoded: SuperstepAck = zerompk::from_msgpack(&bytes).expect("decode SuperstepAck");

        assert_eq!(decoded.shard_id, ack.shard_id);
        assert_eq!(decoded.iteration, ack.iteration);
        assert!(close(decoded.local_delta, ack.local_delta));
        assert_eq!(decoded.vertex_count, ack.vertex_count);
        assert_eq!(decoded.contributions_sent, ack.contributions_sent);
    }

    #[test]
    fn algo_complete_serde() {
        let msg = AlgoComplete {
            iterations: 15,
            converged: true,
            final_delta: 1e-8,
        };
        let bytes = zerompk::to_msgpack_vec(&msg).expect("encode AlgoComplete");
        let decoded: AlgoComplete = zerompk::from_msgpack(&bytes).expect("decode AlgoComplete");

        assert_eq!(decoded.iterations, msg.iterations);
        assert_eq!(decoded.converged, msg.converged);
        assert!(close(decoded.final_delta, msg.final_delta));
    }
}