Skip to main content

nodedb_cluster/distributed_graph/
types.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Shared BSP message types for distributed graph algorithms.
4
5use serde::{Deserialize, Serialize};
6
7/// Superstep barrier message: coordinator → all shards.
8#[derive(
9    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
10)]
11pub struct SuperstepBarrier {
12    pub algorithm: String,
13    pub iteration: u32,
14    pub max_iterations: u32,
15    pub params: String,
16    /// Bitemporal system-time ordinal for the algorithm run. `None` means
17    /// "current state"; shards building their local `CsrSnapshot` from an
18    /// `EdgeStore` thread this through to `scan_all_edges_decoded` so every
19    /// shard sees the same historical topology.
20    #[serde(default)]
21    pub system_as_of: Option<i64>,
22}
23
24/// Boundary vertex contributions: shard → target shard (scatter phase).
25#[derive(
26    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
27)]
28pub struct BoundaryContributions {
29    pub iteration: u32,
30    pub source_shard: u32,
31    pub contributions: Vec<(String, f64)>,
32}
33
34/// Superstep acknowledgement: shard → coordinator (gather phase).
35#[derive(
36    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
37)]
38pub struct SuperstepAck {
39    pub shard_id: u32,
40    pub iteration: u32,
41    pub local_delta: f64,
42    pub vertex_count: usize,
43    pub contributions_sent: usize,
44}
45
46/// Algorithm completion signal: coordinator → all shards.
47#[derive(
48    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
49)]
50pub struct AlgoComplete {
51    pub iterations: u32,
52    pub converged: bool,
53    pub final_delta: f64,
54}
55
#[cfg(test)]
mod tests {
    //! MessagePack round-trip tests for the BSP message types.
    //!
    //! Every test encodes a value with `zerompk::to_msgpack_vec`, decodes it
    //! with `zerompk::from_msgpack`, and checks *all* fields, so a codec that
    //! swaps, drops, or defaults a field is caught rather than only the one
    //! field that happened to be asserted.

    use super::*;

    /// Absolute tolerance used when comparing `f64` fields after a round trip.
    const F64_TOLERANCE: f64 = 1e-10;

    /// Returns `true` when `actual` is within [`F64_TOLERANCE`] of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < F64_TOLERANCE
    }

    #[test]
    fn superstep_barrier_serde() {
        let barrier = SuperstepBarrier {
            algorithm: "pagerank".into(),
            iteration: 3,
            max_iterations: 20,
            params: r#"{"damping":0.85}"#.into(),
            system_as_of: None,
        };
        let bytes = zerompk::to_msgpack_vec(&barrier).unwrap();
        let decoded: SuperstepBarrier = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.algorithm, "pagerank");
        assert_eq!(decoded.iteration, 3);
        assert_eq!(decoded.max_iterations, 20);
        assert_eq!(decoded.params, r#"{"damping":0.85}"#);
        assert!(decoded.system_as_of.is_none());
    }

    #[test]
    fn superstep_barrier_carries_system_as_of() {
        let barrier = SuperstepBarrier {
            algorithm: "pagerank".into(),
            iteration: 1,
            max_iterations: 10,
            params: String::new(),
            system_as_of: Some(1_700_000_000_000_000_000),
        };
        let bytes = zerompk::to_msgpack_vec(&barrier).unwrap();
        let decoded: SuperstepBarrier = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.algorithm, "pagerank");
        assert_eq!(decoded.iteration, 1);
        assert_eq!(decoded.max_iterations, 10);
        assert!(decoded.params.is_empty());
        assert_eq!(decoded.system_as_of, Some(1_700_000_000_000_000_000));
    }

    #[test]
    fn boundary_contributions_serde() {
        let contrib = BoundaryContributions {
            iteration: 1,
            source_shard: 5,
            contributions: vec![("alice".into(), 0.042), ("bob".into(), 0.031)],
        };
        let bytes = zerompk::to_msgpack_vec(&contrib).unwrap();
        let decoded: BoundaryContributions = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.iteration, 1);
        assert_eq!(decoded.source_shard, 5);

        // Check order, keys, and values — not just the element count.
        let expected = [("alice", 0.042), ("bob", 0.031)];
        assert_eq!(decoded.contributions.len(), expected.len());
        for ((name, value), &(want_name, want_value)) in
            decoded.contributions.iter().zip(expected.iter())
        {
            assert_eq!(name.as_str(), want_name);
            assert!(approx_eq(*value, want_value));
        }
    }

    #[test]
    fn superstep_ack_serde() {
        let ack = SuperstepAck {
            shard_id: 3,
            iteration: 2,
            local_delta: 0.001,
            vertex_count: 1000,
            contributions_sent: 50,
        };
        let bytes = zerompk::to_msgpack_vec(&ack).unwrap();
        let decoded: SuperstepAck = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.shard_id, 3);
        assert_eq!(decoded.iteration, 2);
        assert!(approx_eq(decoded.local_delta, 0.001));
        assert_eq!(decoded.vertex_count, 1000);
        assert_eq!(decoded.contributions_sent, 50);
    }

    #[test]
    fn algo_complete_serde() {
        let msg = AlgoComplete {
            iterations: 15,
            converged: true,
            final_delta: 1e-8,
        };
        let bytes = zerompk::to_msgpack_vec(&msg).unwrap();
        let decoded: AlgoComplete = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.iterations, 15);
        assert!(decoded.converged);
        assert!(approx_eq(decoded.final_delta, 1e-8));
    }
}