use ringkernel_derive::RingMessage;
use rkyv::{Archive, Deserialize, Serialize};
use rustkernel_core::messages::MessageId;
/// Ring message (type id 100) requesting PageRank information for one node.
///
/// Serializable through `rkyv` with byte validation of the archived form
/// enabled (`check_bytes`).
///
/// NOTE(review): the field semantics below are inferred from names; the
/// handler that consumes this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 100)]
pub struct PageRankQueryRing {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the node being queried.
pub node_id: u64,
}
/// Ring message (type id 101) answering a PageRank node query.
///
/// Serializable through `rkyv` with byte validation of the archived form
/// enabled (`check_bytes`).
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 101)]
pub struct PageRankQueryResponse {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Identifier of the node the score belongs to.
pub node_id: u64,
/// Node score as a fixed-point integer; presumably in the 10^8 scale used by
/// `to_fixed_point` / `from_fixed_point` — confirm against the producer.
pub score_fp: i64,
/// Iteration counter associated with the reported score.
pub iteration: u32,
/// Convergence flag reported alongside the score.
pub converged: bool,
}
/// Ring message (type id 102) carrying only a message identifier.
///
/// Judging by its name it asks for a single PageRank iteration to be run;
/// the handler is not visible in this file — confirm.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 102)]
pub struct PageRankIterateRing {
/// Identifier of this message.
pub id: MessageId,
}
/// Ring message (type id 103) reporting the outcome of a PageRank iteration.
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 103)]
pub struct PageRankIterateResponse {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Iteration counter being reported.
pub iteration: u32,
/// Maximum delta as a fixed-point integer; presumably in the 10^8 scale used
/// by `to_fixed_point` / `from_fixed_point` — confirm.
pub max_delta_fp: i64,
/// Convergence flag for the reported iteration.
pub converged: bool,
}
/// Ring message (type id 104) carrying convergence parameters for PageRank.
///
/// Judging by its name it asks the receiver to iterate until convergence;
/// the handler is not visible in this file — confirm.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 104)]
pub struct PageRankConvergeRing {
/// Identifier of this message.
pub id: MessageId,
/// Convergence threshold as a fixed-point integer; presumably in the 10^8
/// scale used by `to_fixed_point` / `from_fixed_point` — confirm.
pub threshold_fp: i64,
/// Upper bound on the number of iterations.
pub max_iterations: u32,
}
/// Ring message (type id 105) reporting the outcome of a converge request.
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 105)]
pub struct PageRankConvergeResponse {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Number of iterations reported.
pub iterations: u32,
/// Final delta as a fixed-point integer; presumably in the 10^8 scale used
/// by `to_fixed_point` / `from_fixed_point` — confirm.
pub final_delta_fp: i64,
/// Convergence flag for the run.
pub converged: bool,
}
/// Ring message (type id 130) in which a worker reports per-iteration progress.
///
/// NOTE(review): the coordination protocol this message participates in is
/// not visible in this file; field semantics are inferred from names.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 130)]
pub struct K2KIterationSync {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the reporting worker.
pub worker_id: u64,
/// Iteration number the report refers to.
pub iteration: u64,
/// Worker-local delta as a fixed-point integer; this file's tests populate
/// it with `to_fixed_point` and read it back with `from_fixed_point`.
pub local_delta_fp: i64,
/// Count of nodes processed, as reported by the worker.
pub nodes_processed: u64,
}
/// Ring message (type id 131) answering an iteration-sync report.
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 131)]
pub struct K2KIterationSyncResponse {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Iteration number the response refers to.
pub iteration: u64,
/// Flag that, by its name, indicates every worker has synced — confirm.
pub all_synced: bool,
/// Global delta as a fixed-point integer; presumably in the 10^8 scale used
/// by `to_fixed_point` / `from_fixed_point` — confirm.
pub global_delta_fp: i64,
/// Global convergence flag.
pub global_converged: bool,
}
/// Ring message (type id 132) carrying up to eight node/score pairs between
/// partitions in fixed-size arrays.
///
/// NOTE(review): how the arrays are filled and consumed is not visible in
/// this file; field semantics are inferred from names and types.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 132)]
pub struct K2KBoundaryUpdate {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the partition the update originates from.
pub source_partition: u64,
/// Identifier of the partition the update is addressed to.
pub target_partition: u64,
/// Iteration number the update refers to.
pub iteration: u64,
/// Presumably the number of valid leading entries in the two packed arrays
/// (at most 8) — confirm against the producer.
pub update_count: u32,
/// Fixed-size array of eight node identifiers.
pub node_ids_packed: [u64; 8],
/// Fixed-size array of eight scores; presumably fixed-point values paired
/// index-by-index with `node_ids_packed` — confirm.
pub scores_packed: [i64; 8],
}
/// Ring message (type id 133) acknowledging a boundary update.
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 133)]
pub struct K2KBoundaryUpdateAck {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Iteration number the acknowledgement refers to.
pub iteration: u64,
/// Number of updates reported as applied.
pub updates_applied: u32,
}
/// Ring message (type id 134) describing a worker's participation in a barrier.
///
/// NOTE(review): the barrier protocol is not visible in this file; field
/// semantics are inferred from names.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 134)]
pub struct K2KBarrier {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the barrier.
pub barrier_id: u64,
/// Identifier of the worker sending the message.
pub worker_id: u64,
/// Count of ready workers carried by the message.
pub ready_count: u32,
/// Total number of workers expected at the barrier.
pub total_workers: u32,
}
/// Ring message (type id 135) announcing the release of a barrier.
///
/// Unlike the other K2K messages in this file it carries no `MessageId` or
/// `request_id` field.
///
/// NOTE(review): field semantics are inferred from names.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 135)]
pub struct K2KBarrierRelease {
/// Identifier of the barrier being released.
pub barrier_id: u64,
/// Flag that, by its name, indicates all workers were ready — confirm.
pub all_ready: bool,
/// Iteration number to proceed with after the release.
pub next_iteration: u64,
}
/// Ring message (type id 136) carrying a worker heartbeat.
///
/// NOTE(review): field semantics are inferred from names; the liveness
/// protocol is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 136)]
pub struct K2KHeartbeat {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the worker sending the heartbeat.
pub worker_id: u64,
/// Heartbeat sequence number.
pub sequence: u64,
/// Timestamp; presumably microseconds given the `_us` suffix. The epoch or
/// clock source is not defined in this file — confirm.
pub timestamp_us: u64,
/// Worker state code; the encoding of this byte is not defined in this file.
pub state: u8,
}
/// Ring message (type id 110) carrying only a message identifier.
///
/// Judging by its name it asks for a modularity computation; the handler is
/// not visible in this file — confirm.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 110)]
pub struct ComputeModularityRing {
/// Identifier of this message.
pub id: MessageId,
}
/// Ring message (type id 111) reporting a modularity result.
///
/// NOTE(review): the field semantics below are inferred from names; the
/// producer of this message is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 111)]
pub struct ModularityResponse {
/// Presumably the raw value of the originating request's `MessageId` — confirm.
pub request_id: u64,
/// Modularity as a fixed-point integer; presumably in the 10^8 scale used by
/// `to_fixed_point` / `from_fixed_point` — confirm.
pub modularity_fp: i64,
/// Number of communities reported.
pub num_communities: u32,
}
/// Ring message (type id 112) describing a merge of two communities.
///
/// NOTE(review): field semantics are inferred from names; the merge protocol
/// is not visible in this file.
#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
#[archive(check_bytes)]
#[message(type_id = 112)]
pub struct K2KCommunityMerge {
/// Identifier of this message.
pub id: MessageId,
/// Identifier of the partition the message originates from.
pub source_partition: u64,
/// Identifier of the first community involved in the merge.
pub community_a: u64,
/// Identifier of the second community involved in the merge.
pub community_b: u64,
/// Presumably the modularity change (delta Q) of the merge as a fixed-point
/// integer in the 10^8 scale used by `to_fixed_point` — confirm.
pub delta_q_fp: i64,
}
/// Converts a floating-point value to a fixed-point integer with eight
/// decimal places (scale factor 10^8).
///
/// The scaled value is rounded to the nearest integer (ties away from zero)
/// before the conversion. A plain `as i64` cast truncates toward zero, which
/// loses a whole unit whenever the binary product lands just below an
/// integer (for example `0.29` would become `28_999_999`) and biases every
/// value toward zero.
///
/// Non-finite and out-of-range inputs follow Rust's float-to-int `as` cast
/// rules: `NaN` maps to `0` and values beyond the `i64` range saturate at
/// `i64::MIN` / `i64::MAX`.
#[inline]
pub fn to_fixed_point(value: f64) -> i64 {
    /// Number of fixed-point units per 1.0.
    const SCALE: f64 = 100_000_000.0;
    // Round first so the result is the nearest representable fixed-point
    // value; the cast alone would truncate toward zero.
    (value * SCALE).round() as i64
}
/// Converts a fixed-point integer with eight decimal places (scale factor
/// 10^8) back to a floating-point value.
///
/// The integer is first converted to `f64` and then divided by the scale.
#[inline]
pub fn from_fixed_point(fp: i64) -> f64 {
    /// Number of fixed-point units per 1.0.
    const SCALE: f64 = 100_000_000.0;
    let units = fp as f64;
    units / SCALE
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A value survives a fixed-point round trip to within 1e-8.
    #[test]
    fn test_fixed_point_conversion() {
        let value = 0.85;
        let round_tripped = from_fixed_point(to_fixed_point(value));
        let error = (value - round_tripped).abs();
        assert!(error < 1e-8);
    }

    /// A query message keeps the node id it was built with.
    #[test]
    fn test_pagerank_query_ring() {
        let id = MessageId(1);
        let query = PageRankQueryRing { id, node_id: 42 };
        assert_eq!(query.node_id, 42);
    }

    /// An iteration-sync message keeps its iteration and its fixed-point delta.
    #[test]
    fn test_k2k_iteration_sync() {
        let local_delta_fp = to_fixed_point(0.001);
        let sync = K2KIterationSync {
            id: MessageId(2),
            worker_id: 1,
            iteration: 5,
            local_delta_fp,
            nodes_processed: 1000,
        };
        assert_eq!(sync.iteration, 5);

        let decoded = from_fixed_point(sync.local_delta_fp);
        let error = (decoded - 0.001).abs();
        assert!(error < 1e-8);
    }

    /// A barrier message keeps its barrier id and ready count.
    #[test]
    fn test_k2k_barrier() {
        let barrier = K2KBarrier {
            id: MessageId(3),
            barrier_id: 10,
            worker_id: 2,
            ready_count: 3,
            total_workers: 4,
        };
        assert_eq!(barrier.barrier_id, 10);
        assert_eq!(barrier.ready_count, 3);
    }
}