ringkernel_accnet/actors/
coordinator.rs1use std::time::{Duration, Instant};
7
8use ringkernel_core::MessageId;
9
10use super::messages::*;
11
/// Identifies one of the analytics kernels referenced by the coordinator.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum AnalyticsKernelId {
    /// Kernel named `pagerank_actor`.
    PageRank,
    /// Kernel named `fraud_detector_actor`.
    FraudDetector,
    /// Kernel named `gaap_validator_actor`.
    GaapValidator,
    /// Kernel named `benford_analyzer_actor`.
    BenfordAnalyzer,
    /// Kernel named `suspense_detector_actor`.
    SuspenseDetector,
    /// Kernel named `results_aggregator_actor`.
    ResultsAggregator,
}

impl AnalyticsKernelId {
    /// Every variant in declaration order; this is the storage behind [`Self::all`].
    const ALL: [Self; 6] = [
        Self::PageRank,
        Self::FraudDetector,
        Self::GaapValidator,
        Self::BenfordAnalyzer,
        Self::SuspenseDetector,
        Self::ResultsAggregator,
    ];

    /// Returns the fixed string name associated with this kernel identifier.
    pub fn name(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name to be chosen.
        match *self {
            Self::PageRank => "pagerank_actor",
            Self::FraudDetector => "fraud_detector_actor",
            Self::GaapValidator => "gaap_validator_actor",
            Self::BenfordAnalyzer => "benford_analyzer_actor",
            Self::SuspenseDetector => "suspense_detector_actor",
            Self::ResultsAggregator => "results_aggregator_actor",
        }
    }

    /// Returns all kernel identifiers, in declaration order.
    pub fn all() -> &'static [Self] {
        &Self::ALL
    }
}
54
/// Tunable settings held by the analytics coordinator.
#[derive(Clone, Debug)]
pub struct CoordinatorConfig {
    /// Damping value copied into every PageRank request.
    pub pagerank_damping: f32,
    /// Iteration count copied into every PageRank request.
    pub pagerank_iterations: u32,
    /// Velocity threshold. Not read in this file; presumably consumed by
    /// fraud detection elsewhere (TODO confirm).
    pub velocity_threshold: f32,
    /// Round-amount threshold. Not read in this file; presumably consumed by
    /// fraud detection elsewhere (TODO confirm).
    pub round_amount_threshold: f64,
    /// Queue capacity. Not read in this file; presumably applies to kernel
    /// message queues (TODO confirm).
    pub queue_capacity: usize,
    /// Block size. Not read in this file; presumably a kernel launch
    /// parameter (TODO confirm).
    pub block_size: u32,
    /// K2K toggle. Not read in this file (TODO confirm meaning at the consumer).
    pub enable_k2k: bool,
    /// HLC toggle. Not read in this file (TODO confirm meaning at the consumer).
    pub enable_hlc: bool,
}

impl Default for CoordinatorConfig {
    /// Returns the stock configuration: damping 0.85, 20 iterations,
    /// thresholds 10.0 / 1000.0, sizes 256 / 256, both toggles on.
    fn default() -> Self {
        // PageRank parameters.
        let (pagerank_damping, pagerank_iterations) = (0.85, 20);
        // Thresholds.
        let (velocity_threshold, round_amount_threshold) = (10.0, 1000.0);
        // Sizing.
        let (queue_capacity, block_size) = (256, 256);
        // Feature toggles.
        let (enable_k2k, enable_hlc) = (true, true);

        Self {
            pagerank_damping,
            pagerank_iterations,
            velocity_threshold,
            round_amount_threshold,
            queue_capacity,
            block_size,
            enable_k2k,
            enable_hlc,
        }
    }
}
90
/// Progress flags and result tallies for the snapshot currently in flight.
#[derive(Default, Debug)]
pub struct PipelineState {
    /// Identifier of the snapshot being processed (0 until the first reset).
    pub current_snapshot_id: u64,
    /// Set once the PageRank stage has reported back.
    pub pagerank_complete: bool,
    /// Set once the fraud-detection stage has reported back.
    pub fraud_detection_complete: bool,
    /// Set once the GAAP-validation stage has reported back.
    pub gaap_validation_complete: bool,
    /// Set once the Benford-analysis stage has reported back.
    pub benford_complete: bool,
    /// Set once the suspense-detection stage has reported back.
    pub suspense_complete: bool,
    /// Moment the current snapshot was started; `None` before the first reset.
    pub start_time: Option<Instant>,
    /// Fraud pattern count recorded for this snapshot.
    pub fraud_pattern_count: u32,
    /// GAAP violation count recorded for this snapshot.
    pub gaap_violation_count: u32,
    /// Suspense account count recorded for this snapshot.
    pub suspense_account_count: u32,
    /// Whether the Benford stage flagged an anomaly for this snapshot.
    pub benford_anomaly: bool,
}

impl PipelineState {
    /// Returns `true` only when all five stage-completion flags are set.
    pub fn is_complete(&self) -> bool {
        [
            self.pagerank_complete,
            self.fraud_detection_complete,
            self.gaap_validation_complete,
            self.benford_complete,
            self.suspense_complete,
        ]
        .iter()
        .all(|&done| done)
    }

    /// Returns the time elapsed since `start_time`, or `None` if no snapshot
    /// has been started yet.
    pub fn processing_time(&self) -> Option<Duration> {
        let started = self.start_time?;
        Some(started.elapsed())
    }

    /// Clears all flags and tallies, records `snapshot_id` as the current
    /// snapshot, and restarts the timer at the present instant.
    pub fn reset(&mut self, snapshot_id: u64) {
        // Everything not listed here falls back to its `Default` (false / 0).
        *self = Self {
            current_snapshot_id: snapshot_id,
            start_time: Some(Instant::now()),
            ..Self::default()
        };
    }
}
148
149pub struct AnalyticsCoordinator {
153 pub config: CoordinatorConfig,
155 pub state: PipelineState,
157 next_snapshot_id: u64,
159 pub stats: CoordinatorStats,
161}
162
/// Running totals maintained across finalized snapshots.
#[derive(Clone, Debug, Default)]
pub struct CoordinatorStats {
    /// Number of snapshots finalized so far.
    pub snapshots_processed: u64,
    /// Sum of per-snapshot processing times, in microseconds.
    pub total_processing_time_us: u64,
    /// Mean per-snapshot processing time, in microseconds.
    pub avg_processing_time_us: f64,
    /// Sum of fraud pattern counts over all finalized snapshots.
    pub total_fraud_patterns: u64,
    /// Sum of GAAP violation counts over all finalized snapshots.
    pub total_gaap_violations: u64,
    /// Sum of suspense account counts over all finalized snapshots.
    pub total_suspense_accounts: u64,
}
179
180impl AnalyticsCoordinator {
181 pub fn new(config: CoordinatorConfig) -> Self {
183 Self {
184 config,
185 state: PipelineState::default(),
186 next_snapshot_id: 1,
187 stats: CoordinatorStats::default(),
188 }
189 }
190
191 pub fn begin_snapshot(&mut self) -> u64 {
193 let snapshot_id = self.next_snapshot_id;
194 self.next_snapshot_id += 1;
195 self.state.reset(snapshot_id);
196 snapshot_id
197 }
198
199 pub fn create_pagerank_request(
201 &self,
202 account_count: u32,
203 edge_count: u32,
204 graph_offset: u64,
205 ) -> PageRankRequest {
206 PageRankRequest {
207 id: MessageId::generate(),
208 account_count,
209 edge_count,
210 damping: self.config.pagerank_damping,
211 iterations: self.config.pagerank_iterations,
212 graph_offset,
213 }
214 }
215
216 pub fn create_fraud_detection_request(
218 &self,
219 flow_count: u32,
220 flows_offset: u64,
221 accounts_offset: u64,
222 account_count: u32,
223 ) -> FraudDetectionRequest {
224 FraudDetectionRequest {
225 id: MessageId::generate(),
226 priority: ringkernel_core::Priority::High,
227 snapshot_id: self.state.current_snapshot_id,
228 flow_count,
229 flows_offset,
230 accounts_offset,
231 account_count,
232 }
233 }
234
235 pub fn create_gaap_validation_request(
237 &self,
238 flow_count: u32,
239 flows_offset: u64,
240 account_types_offset: u64,
241 ) -> GaapValidationRequest {
242 GaapValidationRequest {
243 id: MessageId::generate(),
244 flow_count,
245 flows_offset,
246 account_types_offset,
247 }
248 }
249
250 pub fn create_benford_analysis_request(
252 &self,
253 amount_count: u32,
254 amounts_offset: u64,
255 ) -> BenfordAnalysisRequest {
256 BenfordAnalysisRequest {
257 id: MessageId::generate(),
258 amount_count,
259 amounts_offset,
260 }
261 }
262
263 pub fn create_suspense_detection_request(
265 &self,
266 account_count: u32,
267 balances_offset: u64,
268 risk_scores_offset: u64,
269 flow_counts_offset: u64,
270 ) -> SuspenseDetectionRequest {
271 SuspenseDetectionRequest {
272 id: MessageId::generate(),
273 account_count,
274 balances_offset,
275 risk_scores_offset,
276 flow_counts_offset,
277 }
278 }
279
280 pub fn handle_pagerank_response(&mut self, _response: PageRankResponse) {
282 self.state.pagerank_complete = true;
283 }
285
286 pub fn handle_fraud_response(&mut self, response: FraudDetectionResponse) {
288 self.state.fraud_detection_complete = true;
289 self.state.fraud_pattern_count = response.pattern_count;
290 }
291
292 pub fn handle_gaap_response(&mut self, response: GaapValidationResponse) {
294 self.state.gaap_validation_complete = true;
295 self.state.gaap_violation_count = response.violation_count;
296 }
297
298 pub fn handle_benford_response(&mut self, response: BenfordAnalysisResponse) {
300 self.state.benford_complete = true;
301 self.state.benford_anomaly = response.is_anomalous;
302 }
303
304 pub fn handle_suspense_response(&mut self, response: SuspenseDetectionResponse) {
306 self.state.suspense_complete = true;
307 self.state.suspense_account_count = response.suspense_count;
308 }
309
310 pub fn finalize_snapshot(&mut self) -> AnalyticsResult {
312 let processing_time = self
313 .state
314 .processing_time()
315 .map(|d| d.as_micros() as u64)
316 .unwrap_or(0);
317
318 self.stats.snapshots_processed += 1;
320 self.stats.total_processing_time_us += processing_time;
321 self.stats.avg_processing_time_us =
322 self.stats.total_processing_time_us as f64 / self.stats.snapshots_processed as f64;
323 self.stats.total_fraud_patterns += self.state.fraud_pattern_count as u64;
324 self.stats.total_gaap_violations += self.state.gaap_violation_count as u64;
325 self.stats.total_suspense_accounts += self.state.suspense_account_count as u64;
326
327 let fraud_risk = (self.state.fraud_pattern_count as f32 / 100.0).min(1.0);
329 let gaap_risk = (self.state.gaap_violation_count as f32 / 50.0).min(1.0);
330 let suspense_risk = (self.state.suspense_account_count as f32 / 20.0).min(1.0);
331 let benford_risk = if self.state.benford_anomaly { 0.5 } else { 0.0 };
332
333 let overall_risk =
334 (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15)
335 .min(1.0);
336
337 AnalyticsResult {
338 id: MessageId::generate(),
339 snapshot_id: self.state.current_snapshot_id,
340 pagerank_complete: self.state.pagerank_complete,
341 fraud_detection_complete: self.state.fraud_detection_complete,
342 gaap_validation_complete: self.state.gaap_validation_complete,
343 benford_complete: self.state.benford_complete,
344 fraud_pattern_count: self.state.fraud_pattern_count,
345 gaap_violation_count: self.state.gaap_violation_count,
346 suspense_account_count: self.state.suspense_account_count,
347 overall_risk_score: overall_risk,
348 benford_anomaly: self.state.benford_anomaly,
349 processing_time_us: processing_time,
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a coordinator with the default configuration.
    fn default_coordinator() -> AnalyticsCoordinator {
        AnalyticsCoordinator::new(CoordinatorConfig::default())
    }

    /// A new coordinator exposes the configuration it was given.
    #[test]
    fn test_coordinator_creation() {
        let coordinator = default_coordinator();
        let config = &coordinator.config;
        assert_eq!(config.pagerank_damping, 0.85);
        assert_eq!(config.pagerank_iterations, 20);
    }

    /// Snapshot identifiers start at 1 and increase by one per call.
    #[test]
    fn test_begin_snapshot() {
        let mut coordinator = default_coordinator();
        let first = coordinator.begin_snapshot();
        let second = coordinator.begin_snapshot();
        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }

    /// A reset state is incomplete; it is complete once every flag is set.
    #[test]
    fn test_pipeline_state() {
        let mut pipeline = PipelineState::default();
        pipeline.reset(1);
        assert!(!pipeline.is_complete());

        pipeline.pagerank_complete = true;
        pipeline.fraud_detection_complete = true;
        pipeline.gaap_validation_complete = true;
        pipeline.benford_complete = true;
        pipeline.suspense_complete = true;
        assert!(pipeline.is_complete());
    }

    /// Request builders pass their arguments and config values through.
    #[test]
    fn test_create_requests() {
        let coordinator = default_coordinator();

        let pagerank = coordinator.create_pagerank_request(100, 500, 0);
        assert_eq!(pagerank.account_count, 100);
        assert_eq!(pagerank.edge_count, 500);
        assert_eq!(pagerank.damping, 0.85);

        let fraud = coordinator.create_fraud_detection_request(500, 0, 1000, 100);
        assert_eq!(fraud.flow_count, 500);
    }

    /// Finalizing copies the recorded tallies and yields a score in (0, 1].
    #[test]
    fn test_finalize_snapshot() {
        let mut coordinator = default_coordinator();
        coordinator.begin_snapshot();

        // Populate the state directly, as if every stage had responded.
        {
            let state = &mut coordinator.state;
            state.pagerank_complete = true;
            state.fraud_detection_complete = true;
            state.fraud_pattern_count = 5;
            state.gaap_validation_complete = true;
            state.gaap_violation_count = 3;
            state.benford_complete = true;
            state.benford_anomaly = false;
            state.suspense_complete = true;
            state.suspense_account_count = 2;
        }

        let summary = coordinator.finalize_snapshot();
        assert_eq!(summary.snapshot_id, 1);
        assert_eq!(summary.fraud_pattern_count, 5);
        assert_eq!(summary.gaap_violation_count, 3);
        assert_eq!(summary.suspense_account_count, 2);
        assert!(summary.overall_risk_score > 0.0);
        assert!(summary.overall_risk_score <= 1.0);
    }

    /// Kernel identifiers map to their names and `all` lists six of them.
    #[test]
    fn test_kernel_ids() {
        assert_eq!(AnalyticsKernelId::PageRank.name(), "pagerank_actor");
        assert_eq!(
            AnalyticsKernelId::FraudDetector.name(),
            "fraud_detector_actor"
        );
        assert_eq!(AnalyticsKernelId::all().len(), 6);
    }
}