Skip to main content

ringkernel_accnet/actors/
messages.rs

1//! Message types for GPU actor communication.
2//!
3//! All messages are GPU-serializable using rkyv for zero-copy transfer.
4
5use ringkernel_core::message::{CorrelationId, MessageId, Priority};
6use ringkernel_derive::RingMessage;
7
8/// Flow generation request - triggers flow generation from journal entries.
9#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
10#[message(type_id = 100)]
11pub struct FlowGenerationRequest {
12    /// Message ID.
13    #[message(id)]
14    pub id: MessageId,
15    /// Batch ID for tracking.
16    pub batch_id: u64,
17    /// Number of journal entries to process.
18    pub entry_count: u32,
19    /// Journal entry data offset in shared buffer.
20    pub data_offset: u64,
21}
22
23/// Flow generation response - contains generated flows.
24#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
25#[message(type_id = 101)]
26pub struct FlowGenerationResponse {
27    /// Message ID.
28    #[message(id)]
29    pub id: MessageId,
30    /// Correlation to request.
31    #[message(correlation)]
32    pub correlation_id: CorrelationId,
33    /// Batch ID.
34    pub batch_id: u64,
35    /// Number of flows generated.
36    pub flow_count: u32,
37    /// Flows data offset in shared buffer.
38    pub data_offset: u64,
39}
40
41/// PageRank computation request.
42#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
43#[message(type_id = 200)]
44pub struct PageRankRequest {
45    /// Message ID.
46    #[message(id)]
47    pub id: MessageId,
48    /// Number of accounts in the network.
49    pub account_count: u32,
50    /// Number of edges (flows).
51    pub edge_count: u32,
52    /// Damping factor (typically 0.85).
53    pub damping: f32,
54    /// Number of iterations.
55    pub iterations: u32,
56    /// Graph data offset in shared buffer.
57    pub graph_offset: u64,
58}
59
60/// PageRank computation response.
61#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
62#[message(type_id = 201)]
63pub struct PageRankResponse {
64    /// Message ID.
65    #[message(id)]
66    pub id: MessageId,
67    /// Correlation to request.
68    #[message(correlation)]
69    pub correlation_id: CorrelationId,
70    /// PageRank scores offset in shared buffer.
71    pub scores_offset: u64,
72    /// Convergence achieved.
73    pub converged: bool,
74    /// Final iteration count.
75    pub iterations_run: u32,
76}
77
78/// Fraud detection request.
79#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
80#[message(type_id = 300)]
81pub struct FraudDetectionRequest {
82    /// Message ID.
83    #[message(id)]
84    pub id: MessageId,
85    /// Priority level.
86    #[message(priority)]
87    pub priority: Priority,
88    /// Network snapshot ID.
89    pub snapshot_id: u64,
90    /// Number of flows to analyze.
91    pub flow_count: u32,
92    /// Flows data offset.
93    pub flows_offset: u64,
94    /// Account data offset.
95    pub accounts_offset: u64,
96    /// Account count.
97    pub account_count: u32,
98}
99
100/// Fraud detection response.
101#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
102#[message(type_id = 301)]
103pub struct FraudDetectionResponse {
104    /// Message ID.
105    #[message(id)]
106    pub id: MessageId,
107    /// Correlation to request.
108    #[message(correlation)]
109    pub correlation_id: CorrelationId,
110    /// Number of fraud patterns detected.
111    pub pattern_count: u32,
112    /// Patterns data offset.
113    pub patterns_offset: u64,
114    /// Overall risk score (0.0 - 1.0).
115    pub risk_score: f32,
116}
117
118/// GAAP validation request.
119#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
120#[message(type_id = 400)]
121pub struct GaapValidationRequest {
122    /// Message ID.
123    #[message(id)]
124    pub id: MessageId,
125    /// Number of flows to validate.
126    pub flow_count: u32,
127    /// Flows data offset.
128    pub flows_offset: u64,
129    /// Account types offset.
130    pub account_types_offset: u64,
131}
132
133/// GAAP validation response.
134#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
135#[message(type_id = 401)]
136pub struct GaapValidationResponse {
137    /// Message ID.
138    #[message(id)]
139    pub id: MessageId,
140    /// Correlation to request.
141    #[message(correlation)]
142    pub correlation_id: CorrelationId,
143    /// Number of violations found.
144    pub violation_count: u32,
145    /// Violations data offset.
146    pub violations_offset: u64,
147}
148
149/// Benford analysis request.
150#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
151#[message(type_id = 500)]
152pub struct BenfordAnalysisRequest {
153    /// Message ID.
154    #[message(id)]
155    pub id: MessageId,
156    /// Number of amounts to analyze.
157    pub amount_count: u32,
158    /// Amounts data offset.
159    pub amounts_offset: u64,
160}
161
162/// Benford analysis response.
163#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
164#[message(type_id = 501)]
165pub struct BenfordAnalysisResponse {
166    /// Message ID.
167    #[message(id)]
168    pub id: MessageId,
169    /// Correlation to request.
170    #[message(correlation)]
171    pub correlation_id: CorrelationId,
172    /// Digit distribution (counts for digits 1-9).
173    pub digit_counts: [u32; 9],
174    /// Chi-squared statistic.
175    pub chi_squared: f32,
176    /// Is anomalous (exceeds threshold).
177    pub is_anomalous: bool,
178}
179
180/// Suspense detection request.
181#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
182#[message(type_id = 600)]
183pub struct SuspenseDetectionRequest {
184    /// Message ID.
185    #[message(id)]
186    pub id: MessageId,
187    /// Number of accounts.
188    pub account_count: u32,
189    /// Account balances offset.
190    pub balances_offset: u64,
191    /// Account risk scores offset.
192    pub risk_scores_offset: u64,
193    /// Flow counts offset.
194    pub flow_counts_offset: u64,
195}
196
197/// Suspense detection response.
198#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
199#[message(type_id = 601)]
200pub struct SuspenseDetectionResponse {
201    /// Message ID.
202    #[message(id)]
203    pub id: MessageId,
204    /// Correlation to request.
205    #[message(correlation)]
206    pub correlation_id: CorrelationId,
207    /// Number of suspense accounts detected.
208    pub suspense_count: u32,
209    /// Suspense scores offset.
210    pub scores_offset: u64,
211}
212
213/// Aggregated analytics result sent to host.
214#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
215#[message(type_id = 900)]
216pub struct AnalyticsResult {
217    /// Message ID.
218    #[message(id)]
219    pub id: MessageId,
220    /// Snapshot ID this result corresponds to.
221    pub snapshot_id: u64,
222    /// PageRank computed.
223    pub pagerank_complete: bool,
224    /// Fraud detection complete.
225    pub fraud_detection_complete: bool,
226    /// GAAP validation complete.
227    pub gaap_validation_complete: bool,
228    /// Benford analysis complete.
229    pub benford_complete: bool,
230    /// Total fraud patterns detected.
231    pub fraud_pattern_count: u32,
232    /// Total GAAP violations.
233    pub gaap_violation_count: u32,
234    /// Total suspense accounts.
235    pub suspense_account_count: u32,
236    /// Overall risk score.
237    pub overall_risk_score: f32,
238    /// Benford anomaly detected.
239    pub benford_anomaly: bool,
240    /// Processing time in microseconds.
241    pub processing_time_us: u64,
242}
243
244/// Command to shutdown a kernel gracefully.
245#[derive(Debug, Clone, RingMessage, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
246#[message(type_id = 999)]
247pub struct ShutdownCommand {
248    /// Message ID.
249    #[message(id)]
250    pub id: MessageId,
251    /// Reason for shutdown.
252    pub reason: u32, // 0 = normal, 1 = error, 2 = restart
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `FlowGenerationRequest` and checks that every payload field
    /// holds the value it was constructed with, both directly and after
    /// going through the derived `Clone`.
    ///
    /// NOTE(review): despite its name, this test does not round-trip the
    /// message through rkyv; it only covers construction and cloning.
    #[test]
    fn test_message_serialization() {
        let request = FlowGenerationRequest {
            id: MessageId::generate(),
            batch_id: 42,
            entry_count: 100,
            data_offset: 0x1000,
        };

        assert_eq!(request.batch_id, 42);
        assert_eq!(request.entry_count, 100);
        assert_eq!(request.data_offset, 0x1000);

        // The derived `Clone` must carry every payload field over unchanged.
        let copy = request.clone();
        assert_eq!(copy.batch_id, request.batch_id);
        assert_eq!(copy.entry_count, request.entry_count);
        assert_eq!(copy.data_offset, request.data_offset);
    }

    /// Builds a `PageRankRequest` and checks that every payload field holds
    /// the value it was constructed with.
    #[test]
    fn test_pagerank_request() {
        let request = PageRankRequest {
            id: MessageId::generate(),
            account_count: 50,
            edge_count: 200,
            damping: 0.85,
            iterations: 20,
            graph_offset: 0,
        };

        assert_eq!(request.account_count, 50);
        assert_eq!(request.edge_count, 200);
        // Exact float comparison is intentional: the field was initialized
        // from the very same `f32` literal, so the bit patterns match.
        assert_eq!(request.damping, 0.85);
        assert_eq!(request.iterations, 20);
        assert_eq!(request.graph_offset, 0);
    }
}