rustkernel_graph/
ring_messages.rs

//! Ring message types for Graph Analytics kernels.
//!
//! This module defines zero-copy Ring messages for GPU-native persistent actors.
//! Type IDs 200-299 are reserved for the Graph Analytics domain.
//!
//! Floating-point quantities (scores, deltas, modularity) are carried as
//! fixed-point `i64` values scaled by 100_000_000; see [`to_fixed_point`] and
//! [`from_fixed_point`].
//!
//! ## Type ID Allocation
//!
//! - 200-209: PageRank messages
//! - 210-219: Community detection messages
//! - 220-229: Centrality messages
//! - 230-239: K2K coordination messages
12
13use ringkernel_derive::RingMessage;
14use rkyv::{Archive, Deserialize, Serialize};
15use rustkernel_core::messages::MessageId;
16
// ============================================================================
// PageRank Ring Messages (200-209)
// ============================================================================
20
21/// PageRank query request - get score for a specific node.
22#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
23#[archive(check_bytes)]
24#[message(type_id = 200)]
25pub struct PageRankQueryRing {
26    /// Message ID.
27    pub id: MessageId,
28    /// Node ID to query.
29    pub node_id: u64,
30}
31
32/// PageRank query response.
33#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
34#[archive(check_bytes)]
35#[message(type_id = 201)]
36pub struct PageRankQueryResponse {
37    /// Original message ID.
38    pub request_id: u64,
39    /// Node ID queried.
40    pub node_id: u64,
41    /// PageRank score (fixed-point: value * 100_000_000).
42    pub score_fp: i64,
43    /// Current iteration count.
44    pub iteration: u32,
45    /// Whether algorithm has converged.
46    pub converged: bool,
47}
48
49/// PageRank iterate request - perform one power iteration step.
50#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
51#[archive(check_bytes)]
52#[message(type_id = 202)]
53pub struct PageRankIterateRing {
54    /// Message ID.
55    pub id: MessageId,
56}
57
58/// PageRank iterate response.
59#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
60#[archive(check_bytes)]
61#[message(type_id = 203)]
62pub struct PageRankIterateResponse {
63    /// Original message ID.
64    pub request_id: u64,
65    /// Iteration number.
66    pub iteration: u32,
67    /// Maximum delta in this iteration (fixed-point: value * 100_000_000).
68    pub max_delta_fp: i64,
69    /// Whether algorithm has converged.
70    pub converged: bool,
71}
72
73/// PageRank converge request - iterate until threshold or max iterations.
74#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
75#[archive(check_bytes)]
76#[message(type_id = 204)]
77pub struct PageRankConvergeRing {
78    /// Message ID.
79    pub id: MessageId,
80    /// Convergence threshold (fixed-point: value * 100_000_000).
81    pub threshold_fp: i64,
82    /// Maximum iterations.
83    pub max_iterations: u32,
84}
85
86/// PageRank convergence response.
87#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
88#[archive(check_bytes)]
89#[message(type_id = 205)]
90pub struct PageRankConvergeResponse {
91    /// Original message ID.
92    pub request_id: u64,
93    /// Total iterations executed.
94    pub iterations: u32,
95    /// Final maximum delta (fixed-point: value * 100_000_000).
96    pub final_delta_fp: i64,
97    /// Whether algorithm converged (vs hit max iterations).
98    pub converged: bool,
99}
100
// ============================================================================
// K2K Coordination Messages (230-239)
// ============================================================================
104
105/// K2K iteration synchronization request.
106///
107/// Used for coordinating distributed PageRank across graph partitions.
108#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
109#[archive(check_bytes)]
110#[message(type_id = 230)]
111pub struct K2KIterationSync {
112    /// Message ID.
113    pub id: MessageId,
114    /// Worker/partition ID (hashed KernelId).
115    pub worker_id: u64,
116    /// Current iteration number.
117    pub iteration: u64,
118    /// Local delta from this partition (fixed-point: value * 100_000_000).
119    pub local_delta_fp: i64,
120    /// Number of nodes processed.
121    pub nodes_processed: u64,
122}
123
124/// K2K iteration sync response.
125#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
126#[archive(check_bytes)]
127#[message(type_id = 231)]
128pub struct K2KIterationSyncResponse {
129    /// Original message ID.
130    pub request_id: u64,
131    /// Acknowledged iteration.
132    pub iteration: u64,
133    /// Whether all workers have synced.
134    pub all_synced: bool,
135    /// Global delta (max across all partitions, fixed-point).
136    pub global_delta_fp: i64,
137    /// Whether global convergence achieved.
138    pub global_converged: bool,
139}
140
141/// K2K boundary node update.
142///
143/// When graph is partitioned, boundary nodes need score updates from other partitions.
144#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
145#[archive(check_bytes)]
146#[message(type_id = 232)]
147pub struct K2KBoundaryUpdate {
148    /// Message ID.
149    pub id: MessageId,
150    /// Source partition ID.
151    pub source_partition: u64,
152    /// Target partition ID.
153    pub target_partition: u64,
154    /// Iteration number.
155    pub iteration: u64,
156    /// Number of boundary node updates.
157    pub update_count: u32,
158    /// Boundary node IDs (serialized array).
159    pub node_ids_packed: [u64; 8],
160    /// Boundary node scores (fixed-point, serialized array).
161    pub scores_packed: [i64; 8],
162}
163
164/// K2K boundary update acknowledgment.
165#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
166#[archive(check_bytes)]
167#[message(type_id = 233)]
168pub struct K2KBoundaryUpdateAck {
169    /// Original message ID.
170    pub request_id: u64,
171    /// Iteration number.
172    pub iteration: u64,
173    /// Updates applied.
174    pub updates_applied: u32,
175}
176
177/// K2K barrier synchronization.
178///
179/// Used to synchronize all workers before proceeding to next iteration.
180#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
181#[archive(check_bytes)]
182#[message(type_id = 234)]
183pub struct K2KBarrier {
184    /// Message ID.
185    pub id: MessageId,
186    /// Barrier ID (iteration-based).
187    pub barrier_id: u64,
188    /// Worker ID.
189    pub worker_id: u64,
190    /// Workers ready count (from worker's perspective).
191    pub ready_count: u32,
192    /// Total workers expected.
193    pub total_workers: u32,
194}
195
196/// K2K barrier release.
197#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
198#[archive(check_bytes)]
199#[message(type_id = 235)]
200pub struct K2KBarrierRelease {
201    /// Original barrier ID.
202    pub barrier_id: u64,
203    /// All workers synchronized.
204    pub all_ready: bool,
205    /// Next iteration number.
206    pub next_iteration: u64,
207}
208
209/// K2K worker heartbeat.
210#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
211#[archive(check_bytes)]
212#[message(type_id = 236)]
213pub struct K2KHeartbeat {
214    /// Message ID.
215    pub id: MessageId,
216    /// Worker ID.
217    pub worker_id: u64,
218    /// Sequence number.
219    pub sequence: u64,
220    /// Timestamp in microseconds.
221    pub timestamp_us: u64,
222    /// Current state: 0=idle, 1=computing, 2=syncing, 3=converged.
223    pub state: u8,
224}
225
// ============================================================================
// Community Detection Ring Messages (210-219)
// ============================================================================
229
230/// Request to compute modularity for current community assignment.
231#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
232#[archive(check_bytes)]
233#[message(type_id = 210)]
234pub struct ComputeModularityRing {
235    /// Message ID.
236    pub id: MessageId,
237}
238
239/// Modularity computation response.
240#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
241#[archive(check_bytes)]
242#[message(type_id = 211)]
243pub struct ModularityResponse {
244    /// Original message ID.
245    pub request_id: u64,
246    /// Modularity score (fixed-point: value * 100_000_000).
247    pub modularity_fp: i64,
248    /// Number of communities.
249    pub num_communities: u32,
250}
251
252/// K2K community merge proposal.
253///
254/// Used in distributed Louvain for proposing community merges across partitions.
255#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
256#[archive(check_bytes)]
257#[message(type_id = 212)]
258pub struct K2KCommunityMerge {
259    /// Message ID.
260    pub id: MessageId,
261    /// Source partition.
262    pub source_partition: u64,
263    /// Community A ID.
264    pub community_a: u64,
265    /// Community B ID.
266    pub community_b: u64,
267    /// Delta modularity from merge (fixed-point: value * 100_000_000).
268    pub delta_q_fp: i64,
269}
270
// ============================================================================
// Helper Functions
// ============================================================================
274
/// Convert an `f64` to fixed-point `i64` with 8 decimal places.
///
/// The value is scaled by 100_000_000 and rounded to the nearest integer
/// (ties away from zero). Rounding rather than truncating matters because most
/// decimal fractions are not exactly representable in binary: `0.29` scales to
/// `28_999_999.999999996`, which a plain cast would truncate to `28_999_999`
/// instead of the intended `29_000_000`.
///
/// The final cast saturates: results outside the `i64` range clamp to
/// `i64::MIN` / `i64::MAX`, and `NaN` maps to `0`.
#[inline]
#[must_use]
pub fn to_fixed_point(value: f64) -> i64 {
    // Scale factor for 8 decimal places.
    const SCALE: f64 = 100_000_000.0;
    (value * SCALE).round() as i64
}
280
/// Convert a fixed-point `i64` (8 decimal places) back to `f64`.
///
/// The integer is divided by 100_000_000, undoing the scaling applied by
/// [`to_fixed_point`]. Inputs whose magnitude exceeds 2^53 cannot be held
/// exactly in an `f64`, so the `as f64` conversion rounds them before the
/// division.
#[inline]
#[must_use]
pub fn from_fixed_point(fp: i64) -> f64 {
    // Scale factor for 8 decimal places.
    const SCALE: f64 = 100_000_000.0;
    fp as f64 / SCALE
}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// A value survives the f64 -> fixed-point -> f64 round trip to within one
    /// fixed-point step (1e-8).
    #[test]
    fn test_fixed_point_conversion() {
        let value = 0.85;
        let fp = to_fixed_point(value);
        let back = from_fixed_point(fp);
        assert!((value - back).abs() < 1e-8);
    }

    /// A PageRank query can be built and keeps the node ID it was given.
    #[test]
    fn test_pagerank_query_ring() {
        let msg = PageRankQueryRing {
            id: MessageId(1),
            node_id: 42,
        };
        assert_eq!(msg.node_id, 42);
    }

    /// An iteration-sync message keeps its iteration number, and its
    /// fixed-point delta decodes back to the original value.
    #[test]
    fn test_k2k_iteration_sync() {
        let msg = K2KIterationSync {
            id: MessageId(2),
            worker_id: 1,
            iteration: 5,
            local_delta_fp: to_fixed_point(0.001),
            nodes_processed: 1000,
        };
        assert_eq!(msg.iteration, 5);
        let delta = from_fixed_point(msg.local_delta_fp);
        assert!((delta - 0.001).abs() < 1e-8);
    }

    /// A barrier message keeps the barrier ID and ready count it was given.
    #[test]
    fn test_k2k_barrier() {
        let msg = K2KBarrier {
            id: MessageId(3),
            barrier_id: 10,
            worker_id: 2,
            ready_count: 3,
            total_workers: 4,
        };
        assert_eq!(msg.barrier_id, 10);
        assert_eq!(msg.ready_count, 3);
    }
}
334}