Skip to main content

ringkernel_accnet/actors/
coordinator.rs

1//! Analytics coordinator for orchestrating GPU kernel actors.
2//!
3//! The coordinator manages the lifecycle of all analytics kernels
4//! and orchestrates the flow of data through the pipeline.
5
6use std::time::{Duration, Instant};
7
8use ringkernel_core::MessageId;
9
10use super::messages::*;
11
12/// Kernel identifiers for the analytics pipeline.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14pub enum AnalyticsKernelId {
15    /// PageRank computation kernel.
16    PageRank,
17    /// Fraud detection kernel.
18    FraudDetector,
19    /// GAAP validation kernel.
20    GaapValidator,
21    /// Benford analysis kernel.
22    BenfordAnalyzer,
23    /// Suspense detection kernel.
24    SuspenseDetector,
25    /// Results aggregator kernel.
26    ResultsAggregator,
27}
28
29impl AnalyticsKernelId {
30    /// Get the kernel name for launching.
31    pub fn name(&self) -> &'static str {
32        match self {
33            Self::PageRank => "pagerank_actor",
34            Self::FraudDetector => "fraud_detector_actor",
35            Self::GaapValidator => "gaap_validator_actor",
36            Self::BenfordAnalyzer => "benford_analyzer_actor",
37            Self::SuspenseDetector => "suspense_detector_actor",
38            Self::ResultsAggregator => "results_aggregator_actor",
39        }
40    }
41
42    /// All kernel IDs in launch order.
43    pub fn all() -> &'static [Self] {
44        &[
45            Self::PageRank,
46            Self::FraudDetector,
47            Self::GaapValidator,
48            Self::BenfordAnalyzer,
49            Self::SuspenseDetector,
50            Self::ResultsAggregator,
51        ]
52    }
53}
54
55/// Configuration for the analytics coordinator.
56#[derive(Debug, Clone)]
57pub struct CoordinatorConfig {
58    /// PageRank damping factor.
59    pub pagerank_damping: f32,
60    /// PageRank iterations.
61    pub pagerank_iterations: u32,
62    /// Velocity threshold for fraud detection.
63    pub velocity_threshold: f32,
64    /// Round amount threshold.
65    pub round_amount_threshold: f64,
66    /// Queue capacity for each kernel.
67    pub queue_capacity: usize,
68    /// Block size for kernels.
69    pub block_size: u32,
70    /// Enable K2K messaging.
71    pub enable_k2k: bool,
72    /// Enable HLC timestamps.
73    pub enable_hlc: bool,
74}
75
76impl Default for CoordinatorConfig {
77    fn default() -> Self {
78        Self {
79            pagerank_damping: 0.85,
80            pagerank_iterations: 20,
81            velocity_threshold: 10.0,
82            round_amount_threshold: 1000.0,
83            queue_capacity: 256,
84            block_size: 256,
85            enable_k2k: true,
86            enable_hlc: true,
87        }
88    }
89}
90
91/// Pipeline state tracking.
92#[derive(Debug, Default)]
93pub struct PipelineState {
94    /// Current snapshot being processed.
95    pub current_snapshot_id: u64,
96    /// PageRank complete for current snapshot.
97    pub pagerank_complete: bool,
98    /// Fraud detection complete.
99    pub fraud_detection_complete: bool,
100    /// GAAP validation complete.
101    pub gaap_validation_complete: bool,
102    /// Benford analysis complete.
103    pub benford_complete: bool,
104    /// Suspense detection complete.
105    pub suspense_complete: bool,
106    /// Processing start time.
107    pub start_time: Option<Instant>,
108    /// Results collected.
109    pub fraud_pattern_count: u32,
110    /// GAAP violations.
111    pub gaap_violation_count: u32,
112    /// Suspense accounts.
113    pub suspense_account_count: u32,
114    /// Benford anomaly detected.
115    pub benford_anomaly: bool,
116}
117
118impl PipelineState {
119    /// Check if all analytics are complete.
120    pub fn is_complete(&self) -> bool {
121        self.pagerank_complete
122            && self.fraud_detection_complete
123            && self.gaap_validation_complete
124            && self.benford_complete
125            && self.suspense_complete
126    }
127
128    /// Get processing duration.
129    pub fn processing_time(&self) -> Option<Duration> {
130        self.start_time.map(|t| t.elapsed())
131    }
132
133    /// Reset for new snapshot.
134    pub fn reset(&mut self, snapshot_id: u64) {
135        self.current_snapshot_id = snapshot_id;
136        self.pagerank_complete = false;
137        self.fraud_detection_complete = false;
138        self.gaap_validation_complete = false;
139        self.benford_complete = false;
140        self.suspense_complete = false;
141        self.start_time = Some(Instant::now());
142        self.fraud_pattern_count = 0;
143        self.gaap_violation_count = 0;
144        self.suspense_account_count = 0;
145        self.benford_anomaly = false;
146    }
147}
148
149/// Analytics pipeline coordinator.
150///
151/// Manages GPU kernel actors and orchestrates the analytics pipeline.
152pub struct AnalyticsCoordinator {
153    /// Configuration.
154    pub config: CoordinatorConfig,
155    /// Pipeline state.
156    pub state: PipelineState,
157    /// Next snapshot ID.
158    next_snapshot_id: u64,
159    /// Processing statistics.
160    pub stats: CoordinatorStats,
161}
162
163/// Coordinator statistics.
164#[derive(Debug, Default, Clone)]
165pub struct CoordinatorStats {
166    /// Total snapshots processed.
167    pub snapshots_processed: u64,
168    /// Total processing time (microseconds).
169    pub total_processing_time_us: u64,
170    /// Average processing time (microseconds).
171    pub avg_processing_time_us: f64,
172    /// Total fraud patterns detected.
173    pub total_fraud_patterns: u64,
174    /// Total GAAP violations.
175    pub total_gaap_violations: u64,
176    /// Total suspense accounts flagged.
177    pub total_suspense_accounts: u64,
178}
179
180impl AnalyticsCoordinator {
181    /// Create a new coordinator.
182    pub fn new(config: CoordinatorConfig) -> Self {
183        Self {
184            config,
185            state: PipelineState::default(),
186            next_snapshot_id: 1,
187            stats: CoordinatorStats::default(),
188        }
189    }
190
191    /// Start processing a new network snapshot.
192    pub fn begin_snapshot(&mut self) -> u64 {
193        let snapshot_id = self.next_snapshot_id;
194        self.next_snapshot_id += 1;
195        self.state.reset(snapshot_id);
196        snapshot_id
197    }
198
199    /// Create a PageRank request.
200    pub fn create_pagerank_request(
201        &self,
202        account_count: u32,
203        edge_count: u32,
204        graph_offset: u64,
205    ) -> PageRankRequest {
206        PageRankRequest {
207            id: MessageId::generate(),
208            account_count,
209            edge_count,
210            damping: self.config.pagerank_damping,
211            iterations: self.config.pagerank_iterations,
212            graph_offset,
213        }
214    }
215
216    /// Create a fraud detection request.
217    pub fn create_fraud_detection_request(
218        &self,
219        flow_count: u32,
220        flows_offset: u64,
221        accounts_offset: u64,
222        account_count: u32,
223    ) -> FraudDetectionRequest {
224        FraudDetectionRequest {
225            id: MessageId::generate(),
226            priority: ringkernel_core::Priority::High,
227            snapshot_id: self.state.current_snapshot_id,
228            flow_count,
229            flows_offset,
230            accounts_offset,
231            account_count,
232        }
233    }
234
235    /// Create a GAAP validation request.
236    pub fn create_gaap_validation_request(
237        &self,
238        flow_count: u32,
239        flows_offset: u64,
240        account_types_offset: u64,
241    ) -> GaapValidationRequest {
242        GaapValidationRequest {
243            id: MessageId::generate(),
244            flow_count,
245            flows_offset,
246            account_types_offset,
247        }
248    }
249
250    /// Create a Benford analysis request.
251    pub fn create_benford_analysis_request(
252        &self,
253        amount_count: u32,
254        amounts_offset: u64,
255    ) -> BenfordAnalysisRequest {
256        BenfordAnalysisRequest {
257            id: MessageId::generate(),
258            amount_count,
259            amounts_offset,
260        }
261    }
262
263    /// Create a suspense detection request.
264    pub fn create_suspense_detection_request(
265        &self,
266        account_count: u32,
267        balances_offset: u64,
268        risk_scores_offset: u64,
269        flow_counts_offset: u64,
270    ) -> SuspenseDetectionRequest {
271        SuspenseDetectionRequest {
272            id: MessageId::generate(),
273            account_count,
274            balances_offset,
275            risk_scores_offset,
276            flow_counts_offset,
277        }
278    }
279
280    /// Handle PageRank response.
281    pub fn handle_pagerank_response(&mut self, _response: PageRankResponse) {
282        self.state.pagerank_complete = true;
283        // PageRank scores are stored at response.scores_offset
284    }
285
286    /// Handle fraud detection response.
287    pub fn handle_fraud_response(&mut self, response: FraudDetectionResponse) {
288        self.state.fraud_detection_complete = true;
289        self.state.fraud_pattern_count = response.pattern_count;
290    }
291
292    /// Handle GAAP validation response.
293    pub fn handle_gaap_response(&mut self, response: GaapValidationResponse) {
294        self.state.gaap_validation_complete = true;
295        self.state.gaap_violation_count = response.violation_count;
296    }
297
298    /// Handle Benford analysis response.
299    pub fn handle_benford_response(&mut self, response: BenfordAnalysisResponse) {
300        self.state.benford_complete = true;
301        self.state.benford_anomaly = response.is_anomalous;
302    }
303
304    /// Handle suspense detection response.
305    pub fn handle_suspense_response(&mut self, response: SuspenseDetectionResponse) {
306        self.state.suspense_complete = true;
307        self.state.suspense_account_count = response.suspense_count;
308    }
309
310    /// Finalize the current snapshot.
311    pub fn finalize_snapshot(&mut self) -> AnalyticsResult {
312        let processing_time = self
313            .state
314            .processing_time()
315            .map(|d| d.as_micros() as u64)
316            .unwrap_or(0);
317
318        // Update statistics
319        self.stats.snapshots_processed += 1;
320        self.stats.total_processing_time_us += processing_time;
321        self.stats.avg_processing_time_us =
322            self.stats.total_processing_time_us as f64 / self.stats.snapshots_processed as f64;
323        self.stats.total_fraud_patterns += self.state.fraud_pattern_count as u64;
324        self.stats.total_gaap_violations += self.state.gaap_violation_count as u64;
325        self.stats.total_suspense_accounts += self.state.suspense_account_count as u64;
326
327        // Calculate overall risk score
328        let fraud_risk = (self.state.fraud_pattern_count as f32 / 100.0).min(1.0);
329        let gaap_risk = (self.state.gaap_violation_count as f32 / 50.0).min(1.0);
330        let suspense_risk = (self.state.suspense_account_count as f32 / 20.0).min(1.0);
331        let benford_risk = if self.state.benford_anomaly { 0.5 } else { 0.0 };
332
333        let overall_risk =
334            (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15)
335                .min(1.0);
336
337        AnalyticsResult {
338            id: MessageId::generate(),
339            snapshot_id: self.state.current_snapshot_id,
340            pagerank_complete: self.state.pagerank_complete,
341            fraud_detection_complete: self.state.fraud_detection_complete,
342            gaap_validation_complete: self.state.gaap_validation_complete,
343            benford_complete: self.state.benford_complete,
344            fraud_pattern_count: self.state.fraud_pattern_count,
345            gaap_violation_count: self.state.gaap_violation_count,
346            suspense_account_count: self.state.suspense_account_count,
347            overall_risk_score: overall_risk,
348            benford_anomaly: self.state.benford_anomaly,
349            processing_time_us: processing_time,
350        }
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357
358    #[test]
359    fn test_coordinator_creation() {
360        let coord = AnalyticsCoordinator::new(CoordinatorConfig::default());
361        assert_eq!(coord.config.pagerank_damping, 0.85);
362        assert_eq!(coord.config.pagerank_iterations, 20);
363    }
364
365    #[test]
366    fn test_begin_snapshot() {
367        let mut coord = AnalyticsCoordinator::new(CoordinatorConfig::default());
368        let id1 = coord.begin_snapshot();
369        let id2 = coord.begin_snapshot();
370        assert_eq!(id1, 1);
371        assert_eq!(id2, 2);
372    }
373
374    #[test]
375    fn test_pipeline_state() {
376        let mut state = PipelineState::default();
377        state.reset(1);
378
379        assert!(!state.is_complete());
380
381        state.pagerank_complete = true;
382        state.fraud_detection_complete = true;
383        state.gaap_validation_complete = true;
384        state.benford_complete = true;
385        state.suspense_complete = true;
386
387        assert!(state.is_complete());
388    }
389
390    #[test]
391    fn test_create_requests() {
392        let coord = AnalyticsCoordinator::new(CoordinatorConfig::default());
393
394        let pr_req = coord.create_pagerank_request(100, 500, 0);
395        assert_eq!(pr_req.account_count, 100);
396        assert_eq!(pr_req.edge_count, 500);
397        assert_eq!(pr_req.damping, 0.85);
398
399        let fraud_req = coord.create_fraud_detection_request(500, 0, 1000, 100);
400        assert_eq!(fraud_req.flow_count, 500);
401    }
402
403    #[test]
404    fn test_finalize_snapshot() {
405        let mut coord = AnalyticsCoordinator::new(CoordinatorConfig::default());
406        coord.begin_snapshot();
407
408        // Simulate responses
409        coord.state.pagerank_complete = true;
410        coord.state.fraud_detection_complete = true;
411        coord.state.fraud_pattern_count = 5;
412        coord.state.gaap_validation_complete = true;
413        coord.state.gaap_violation_count = 3;
414        coord.state.benford_complete = true;
415        coord.state.benford_anomaly = false;
416        coord.state.suspense_complete = true;
417        coord.state.suspense_account_count = 2;
418
419        let result = coord.finalize_snapshot();
420        assert_eq!(result.snapshot_id, 1);
421        assert_eq!(result.fraud_pattern_count, 5);
422        assert_eq!(result.gaap_violation_count, 3);
423        assert_eq!(result.suspense_account_count, 2);
424        assert!(result.overall_risk_score > 0.0);
425        assert!(result.overall_risk_score <= 1.0);
426    }
427
428    #[test]
429    fn test_kernel_ids() {
430        assert_eq!(AnalyticsKernelId::PageRank.name(), "pagerank_actor");
431        assert_eq!(
432            AnalyticsKernelId::FraudDetector.name(),
433            "fraud_detector_actor"
434        );
435        assert_eq!(AnalyticsKernelId::all().len(), 6);
436    }
437}