1use ringkernel_derive::RingMessage;
14use rkyv::{Archive, Deserialize, Serialize};
15use rustkernel_core::messages::MessageId;
16
17#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
23#[archive(check_bytes)]
24#[message(type_id = 200)]
25pub struct PageRankQueryRing {
26 pub id: MessageId,
28 pub node_id: u64,
30}
31
32#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
34#[archive(check_bytes)]
35#[message(type_id = 201)]
36pub struct PageRankQueryResponse {
37 pub request_id: u64,
39 pub node_id: u64,
41 pub score_fp: i64,
43 pub iteration: u32,
45 pub converged: bool,
47}
48
49#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
51#[archive(check_bytes)]
52#[message(type_id = 202)]
53pub struct PageRankIterateRing {
54 pub id: MessageId,
56}
57
58#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
60#[archive(check_bytes)]
61#[message(type_id = 203)]
62pub struct PageRankIterateResponse {
63 pub request_id: u64,
65 pub iteration: u32,
67 pub max_delta_fp: i64,
69 pub converged: bool,
71}
72
73#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
75#[archive(check_bytes)]
76#[message(type_id = 204)]
77pub struct PageRankConvergeRing {
78 pub id: MessageId,
80 pub threshold_fp: i64,
82 pub max_iterations: u32,
84}
85
86#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
88#[archive(check_bytes)]
89#[message(type_id = 205)]
90pub struct PageRankConvergeResponse {
91 pub request_id: u64,
93 pub iterations: u32,
95 pub final_delta_fp: i64,
97 pub converged: bool,
99}
100
101#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
109#[archive(check_bytes)]
110#[message(type_id = 230)]
111pub struct K2KIterationSync {
112 pub id: MessageId,
114 pub worker_id: u64,
116 pub iteration: u64,
118 pub local_delta_fp: i64,
120 pub nodes_processed: u64,
122}
123
124#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
126#[archive(check_bytes)]
127#[message(type_id = 231)]
128pub struct K2KIterationSyncResponse {
129 pub request_id: u64,
131 pub iteration: u64,
133 pub all_synced: bool,
135 pub global_delta_fp: i64,
137 pub global_converged: bool,
139}
140
141#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
145#[archive(check_bytes)]
146#[message(type_id = 232)]
147pub struct K2KBoundaryUpdate {
148 pub id: MessageId,
150 pub source_partition: u64,
152 pub target_partition: u64,
154 pub iteration: u64,
156 pub update_count: u32,
158 pub node_ids_packed: [u64; 8],
160 pub scores_packed: [i64; 8],
162}
163
164#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
166#[archive(check_bytes)]
167#[message(type_id = 233)]
168pub struct K2KBoundaryUpdateAck {
169 pub request_id: u64,
171 pub iteration: u64,
173 pub updates_applied: u32,
175}
176
177#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
181#[archive(check_bytes)]
182#[message(type_id = 234)]
183pub struct K2KBarrier {
184 pub id: MessageId,
186 pub barrier_id: u64,
188 pub worker_id: u64,
190 pub ready_count: u32,
192 pub total_workers: u32,
194}
195
196#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
198#[archive(check_bytes)]
199#[message(type_id = 235)]
200pub struct K2KBarrierRelease {
201 pub barrier_id: u64,
203 pub all_ready: bool,
205 pub next_iteration: u64,
207}
208
209#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
211#[archive(check_bytes)]
212#[message(type_id = 236)]
213pub struct K2KHeartbeat {
214 pub id: MessageId,
216 pub worker_id: u64,
218 pub sequence: u64,
220 pub timestamp_us: u64,
222 pub state: u8,
224}
225
226#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
232#[archive(check_bytes)]
233#[message(type_id = 210)]
234pub struct ComputeModularityRing {
235 pub id: MessageId,
237}
238
239#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
241#[archive(check_bytes)]
242#[message(type_id = 211)]
243pub struct ModularityResponse {
244 pub request_id: u64,
246 pub modularity_fp: i64,
248 pub num_communities: u32,
250}
251
252#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
256#[archive(check_bytes)]
257#[message(type_id = 212)]
258pub struct K2KCommunityMerge {
259 pub id: MessageId,
261 pub source_partition: u64,
263 pub community_a: u64,
265 pub community_b: u64,
267 pub delta_q_fp: i64,
269}
270
271#[inline]
277pub fn to_fixed_point(value: f64) -> i64 {
278 (value * 100_000_000.0) as i64
279}
280
281#[inline]
283pub fn from_fixed_point(fp: i64) -> f64 {
284 fp as f64 / 100_000_000.0
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290
291 #[test]
292 fn test_fixed_point_conversion() {
293 let value = 0.85;
294 let fp = to_fixed_point(value);
295 let back = from_fixed_point(fp);
296 assert!((value - back).abs() < 1e-8);
297 }
298
299 #[test]
300 fn test_pagerank_query_ring() {
301 let msg = PageRankQueryRing {
302 id: MessageId(1),
303 node_id: 42,
304 };
305 assert_eq!(msg.node_id, 42);
306 }
307
308 #[test]
309 fn test_k2k_iteration_sync() {
310 let msg = K2KIterationSync {
311 id: MessageId(2),
312 worker_id: 1,
313 iteration: 5,
314 local_delta_fp: to_fixed_point(0.001),
315 nodes_processed: 1000,
316 };
317 assert_eq!(msg.iteration, 5);
318 let delta = from_fixed_point(msg.local_delta_fp);
319 assert!((delta - 0.001).abs() < 1e-8);
320 }
321
322 #[test]
323 fn test_k2k_barrier() {
324 let msg = K2KBarrier {
325 id: MessageId(3),
326 barrier_id: 10,
327 worker_id: 2,
328 ready_count: 3,
329 total_workers: 4,
330 };
331 assert_eq!(msg.barrier_id, 10);
332 assert_eq!(msg.ready_count, 3);
333 }
334}