rustkernel_core/
messages.rs

1//! Message types and traits for Ring kernel communication.
2//!
3//! This module provides the base message infrastructure for GPU-native
4//! persistent actor communication using RingKernel's K2K messaging.
5
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicU64, Ordering};
8
9// Re-export core message types from ringkernel
10pub use ringkernel_core::{MessageHeader, MessageId, RingMessage};
11
12/// Global message ID counter for correlation tracking.
13static MESSAGE_COUNTER: AtomicU64 = AtomicU64::new(1);
14
15/// Generate a new unique message ID.
16#[must_use]
17pub fn next_message_id() -> MessageId {
18    MessageId(MESSAGE_COUNTER.fetch_add(1, Ordering::SeqCst))
19}
20
21/// Correlation ID for request-response pairing.
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct CorrelationId(pub u64);
24
25impl CorrelationId {
26    /// Create a new correlation ID.
27    #[must_use]
28    pub fn new() -> Self {
29        Self(MESSAGE_COUNTER.fetch_add(1, Ordering::SeqCst))
30    }
31
32    /// Create from a raw value.
33    #[must_use]
34    pub const fn from_raw(value: u64) -> Self {
35        Self(value)
36    }
37}
38
39impl Default for CorrelationId {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45/// Trait for batch kernel messages (CPU-orchestrated execution).
46///
47/// This trait provides serialization and type information for messages
48/// used with `BatchKernel` implementations. Unlike `RingMessage` which
49/// uses rkyv for GPU-native serialization, `BatchMessage` uses serde
50/// for JSON serialization suitable for CPU-side orchestration.
51///
52/// Typically derived using `#[derive(KernelMessage)]`:
53///
54/// ```ignore
55/// #[derive(Debug, Clone, Serialize, Deserialize, KernelMessage)]
56/// #[message(type_id = 100, domain = "GraphAnalytics")]
57/// pub struct PageRankInput {
58///     pub graph: CsrGraph,
59///     pub damping: f64,
60/// }
61/// ```
62pub trait BatchMessage:
63    serde::Serialize + for<'de> serde::Deserialize<'de> + Send + Sync + 'static
64{
65    /// Get the message type ID.
66    fn message_type_id() -> u64;
67
68    /// Serialize to JSON bytes.
69    fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
70        serde_json::to_vec(self)
71    }
72
73    /// Deserialize from JSON bytes.
74    fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error>
75    where
76        Self: Sized,
77    {
78        serde_json::from_slice(bytes)
79    }
80
81    /// Get the size hint for serialized data.
82    fn size_hint(&self) -> usize {
83        std::mem::size_of::<Self>()
84    }
85}
86
87/// Base trait for kernel request messages.
88pub trait KernelRequest: RingMessage + Send + Sync {
89    /// Get the correlation ID for this request.
90    fn correlation_id(&self) -> CorrelationId;
91
92    /// Set the correlation ID.
93    fn set_correlation_id(&mut self, id: CorrelationId);
94}
95
96/// Base trait for kernel response messages.
97pub trait KernelResponse: RingMessage + Send + Sync {
98    /// Get the correlation ID that this response corresponds to.
99    fn correlation_id(&self) -> CorrelationId;
100
101    /// Check if the response indicates success.
102    fn is_success(&self) -> bool;
103
104    /// Get any error message if the response indicates failure.
105    fn error_message(&self) -> Option<&str>;
106}
107
108/// Generic result wrapper for kernel responses.
109#[derive(Clone, Debug, Serialize, Deserialize)]
110pub struct KernelResult<T> {
111    /// Correlation ID linking to the original request.
112    pub correlation_id: CorrelationId,
113    /// The result data, if successful.
114    pub data: Option<T>,
115    /// Error message, if failed.
116    pub error: Option<String>,
117}
118
119impl<T> KernelResult<T> {
120    /// Create a successful result.
121    pub fn success(correlation_id: CorrelationId, data: T) -> Self {
122        Self {
123            correlation_id,
124            data: Some(data),
125            error: None,
126        }
127    }
128
129    /// Create a failed result.
130    pub fn failure(correlation_id: CorrelationId, error: impl Into<String>) -> Self {
131        Self {
132            correlation_id,
133            data: None,
134            error: Some(error.into()),
135        }
136    }
137
138    /// Check if this is a successful result.
139    #[must_use]
140    pub fn is_success(&self) -> bool {
141        self.data.is_some() && self.error.is_none()
142    }
143
144    /// Convert to a standard Result type.
145    pub fn into_result(self) -> Result<T, String> {
146        match (self.data, self.error) {
147            (Some(data), None) => Ok(data),
148            (_, Some(err)) => Err(err),
149            (None, None) => Err("No data or error provided".to_string()),
150        }
151    }
152}
153
154/// Message type IDs for each domain.
155///
156/// These are used for routing and serialization in K2K messaging.
157/// Each domain has a reserved range of IDs.
158#[allow(missing_docs)]
159pub mod type_ids {
160    // Graph Analytics domain (100-199)
161    pub const PAGERANK_REQUEST: u32 = 100;
162    pub const PAGERANK_RESPONSE: u32 = 101;
163    pub const DEGREE_CENTRALITY_REQUEST: u32 = 102;
164    pub const DEGREE_CENTRALITY_RESPONSE: u32 = 103;
165    pub const BETWEENNESS_REQUEST: u32 = 104;
166    pub const BETWEENNESS_RESPONSE: u32 = 105;
167    pub const CLOSENESS_REQUEST: u32 = 106;
168    pub const CLOSENESS_RESPONSE: u32 = 107;
169    pub const EIGENVECTOR_REQUEST: u32 = 108;
170    pub const EIGENVECTOR_RESPONSE: u32 = 109;
171    pub const KATZ_REQUEST: u32 = 110;
172    pub const KATZ_RESPONSE: u32 = 111;
173    pub const COMMUNITY_REQUEST: u32 = 120;
174    pub const COMMUNITY_RESPONSE: u32 = 121;
175    pub const MOTIF_REQUEST: u32 = 130;
176    pub const MOTIF_RESPONSE: u32 = 131;
177    pub const SIMILARITY_REQUEST: u32 = 140;
178    pub const SIMILARITY_RESPONSE: u32 = 141;
179    pub const METRICS_REQUEST: u32 = 150;
180    pub const METRICS_RESPONSE: u32 = 151;
181
182    // ML domain (200-299)
183    pub const KMEANS_REQUEST: u32 = 200;
184    pub const KMEANS_RESPONSE: u32 = 201;
185    pub const DBSCAN_REQUEST: u32 = 202;
186    pub const DBSCAN_RESPONSE: u32 = 203;
187    pub const HIERARCHICAL_REQUEST: u32 = 204;
188    pub const HIERARCHICAL_RESPONSE: u32 = 205;
189    pub const ISOLATION_FOREST_REQUEST: u32 = 210;
190    pub const ISOLATION_FOREST_RESPONSE: u32 = 211;
191    pub const LOF_REQUEST: u32 = 212;
192    pub const LOF_RESPONSE: u32 = 213;
193    pub const REGRESSION_REQUEST: u32 = 220;
194    pub const REGRESSION_RESPONSE: u32 = 221;
195
196    // Compliance domain (300-399)
197    pub const CIRCULAR_FLOW_REQUEST: u32 = 300;
198    pub const CIRCULAR_FLOW_RESPONSE: u32 = 301;
199    pub const RECIPROCITY_REQUEST: u32 = 302;
200    pub const RECIPROCITY_RESPONSE: u32 = 303;
201    pub const RAPID_MOVEMENT_REQUEST: u32 = 304;
202    pub const RAPID_MOVEMENT_RESPONSE: u32 = 305;
203    pub const AML_PATTERN_REQUEST: u32 = 306;
204    pub const AML_PATTERN_RESPONSE: u32 = 307;
205    pub const SANCTIONS_REQUEST: u32 = 310;
206    pub const SANCTIONS_RESPONSE: u32 = 311;
207    pub const KYC_REQUEST: u32 = 320;
208    pub const KYC_RESPONSE: u32 = 321;
209    pub const TRANSACTION_MONITOR_REQUEST: u32 = 330;
210    pub const TRANSACTION_MONITOR_RESPONSE: u32 = 331;
211
212    // Risk domain (400-499)
213    pub const CREDIT_RISK_REQUEST: u32 = 400;
214    pub const CREDIT_RISK_RESPONSE: u32 = 401;
215    pub const MONTE_CARLO_VAR_REQUEST: u32 = 410;
216    pub const MONTE_CARLO_VAR_RESPONSE: u32 = 411;
217    pub const PORTFOLIO_RISK_REQUEST: u32 = 412;
218    pub const PORTFOLIO_RISK_RESPONSE: u32 = 413;
219    pub const STRESS_TEST_REQUEST: u32 = 420;
220    pub const STRESS_TEST_RESPONSE: u32 = 421;
221
222    // Temporal domain (500-599)
223    pub const ARIMA_REQUEST: u32 = 500;
224    pub const ARIMA_RESPONSE: u32 = 501;
225    pub const PROPHET_REQUEST: u32 = 502;
226    pub const PROPHET_RESPONSE: u32 = 503;
227    pub const CHANGE_POINT_REQUEST: u32 = 510;
228    pub const CHANGE_POINT_RESPONSE: u32 = 511;
229    pub const VOLATILITY_REQUEST: u32 = 520;
230    pub const VOLATILITY_RESPONSE: u32 = 521;
231
232    // OrderBook domain (600-699)
233    pub const ORDER_SUBMIT_REQUEST: u32 = 600;
234    pub const ORDER_SUBMIT_RESPONSE: u32 = 601;
235    pub const ORDER_CANCEL_REQUEST: u32 = 602;
236    pub const ORDER_CANCEL_RESPONSE: u32 = 603;
237    pub const ORDER_MODIFY_REQUEST: u32 = 604;
238    pub const ORDER_MODIFY_RESPONSE: u32 = 605;
239    pub const BOOK_QUERY_REQUEST: u32 = 610;
240    pub const BOOK_QUERY_RESPONSE: u32 = 611;
241
242    // Clearing domain (700-799)
243    pub const CLEARING_VALIDATION_REQUEST: u32 = 700;
244    pub const CLEARING_VALIDATION_RESPONSE: u32 = 701;
245    pub const DVP_MATCHING_REQUEST: u32 = 710;
246    pub const DVP_MATCHING_RESPONSE: u32 = 711;
247    pub const NETTING_REQUEST: u32 = 720;
248    pub const NETTING_RESPONSE: u32 = 721;
249    pub const SETTLEMENT_REQUEST: u32 = 730;
250    pub const SETTLEMENT_RESPONSE: u32 = 731;
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256
257    #[test]
258    fn test_correlation_id() {
259        let id1 = CorrelationId::new();
260        let id2 = CorrelationId::new();
261        assert_ne!(id1, id2);
262    }
263
264    #[test]
265    fn test_message_id_generation() {
266        let id1 = next_message_id();
267        let id2 = next_message_id();
268        assert!(id2.0 > id1.0);
269    }
270
271    #[test]
272    fn test_kernel_result_success() {
273        let result = KernelResult::success(CorrelationId::new(), 42);
274        assert!(result.is_success());
275        assert_eq!(result.into_result(), Ok(42));
276    }
277
278    #[test]
279    fn test_kernel_result_failure() {
280        let result: KernelResult<i32> = KernelResult::failure(CorrelationId::new(), "error");
281        assert!(!result.is_success());
282        assert_eq!(result.into_result(), Err("error".to_string()));
283    }
284
285    #[test]
286    fn test_batch_message_trait() {
287        // Test struct implementing BatchMessage via KernelMessage derive
288        #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
289        struct TestMessage {
290            value: i32,
291        }
292
293        // Manual impl to test the trait
294        impl BatchMessage for TestMessage {
295            fn message_type_id() -> u64 {
296                42
297            }
298        }
299
300        let msg = TestMessage { value: 123 };
301
302        // Test message_type_id
303        assert_eq!(TestMessage::message_type_id(), 42);
304
305        // Test JSON serialization
306        let json = msg.to_json().expect("serialization failed");
307        assert!(!json.is_empty());
308
309        // Test JSON deserialization
310        let decoded: TestMessage = TestMessage::from_json(&json).expect("deserialization failed");
311        assert_eq!(decoded.value, 123);
312    }
313}