Skip to main content

ringkernel_accnet/actors/
runtime.rs

1//! GPU-native actor runtime for accounting analytics.
2//!
3//! This module provides the runtime that manages GPU kernel actors
4//! using the RingKernel system.
5
6use std::time::Instant;
7
8#[cfg(feature = "cuda")]
9use std::collections::HashMap;
10#[cfg(feature = "cuda")]
11use std::sync::Arc;
12
13#[cfg(feature = "cuda")]
14use crate::error::AccNetError;
15use crate::models::AccountingNetwork;
16
17use super::coordinator::{AnalyticsCoordinator, CoordinatorConfig, CoordinatorStats};
18#[cfg(feature = "cuda")]
19use super::kernels::AnalyticsKernels;
20#[allow(unused_imports)]
21use super::messages::*;
22
/// Descriptor of a GPU memory region used to exchange data between the host
/// and kernels.
///
/// Only the size and an opaque address are recorded here.
#[derive(Debug)]
pub struct GpuBuffer {
    /// Length of the buffer, in bytes.
    pub size: usize,
    /// Device pointer, kept as an opaque handle.
    pub device_ptr: u64,
}
31
/// Analytics result produced by one analysis pass over an accounting network.
///
/// `Default` yields an all-empty result: zero counters, empty vectors, a
/// zeroed Benford histogram and `false` for the anomaly flag. It is derived
/// because every field's own default is exactly that value.
#[derive(Debug, Clone, Default)]
pub struct GpuAnalyticsResult {
    /// Snapshot ID this result belongs to.
    pub snapshot_id: u64,
    /// PageRank scores (index -> score).
    pub pagerank_scores: Vec<f64>,
    /// Number of flows with at least one fraud pattern flag set.
    pub fraud_pattern_count: u32,
    /// Fraud pattern flags, one entry per flow (0 means nothing was flagged).
    pub fraud_flags: Vec<u32>,
    /// Number of flows flagged as GAAP violations.
    pub gaap_violation_count: u32,
    /// GAAP violation flags, one entry per flow (0 means no violation).
    pub gaap_flags: Vec<u32>,
    /// Number of accounts whose suspense score exceeds the detection cut-off.
    pub suspense_account_count: u32,
    /// Suspense scores, one entry per account.
    pub suspense_scores: Vec<f32>,
    /// Benford leading-digit histogram; slot `i` counts leading digit `i + 1`.
    pub benford_distribution: [u32; 9],
    /// Benford chi-squared statistic.
    pub benford_chi_squared: f32,
    /// Whether the Benford test flagged the digit distribution as anomalous.
    pub benford_anomaly: bool,
    /// Overall risk score.
    pub overall_risk_score: f32,
    /// Processing time in microseconds.
    pub processing_time_us: u64,
}
82
83/// GPU actor runtime status.
84#[derive(Debug, Clone)]
85pub struct RuntimeStatus {
86    /// Whether CUDA is available and active.
87    pub cuda_active: bool,
88    /// GPU device name.
89    pub device_name: Option<String>,
90    /// Compute capability.
91    pub compute_capability: Option<(u32, u32)>,
92    /// Number of kernels launched.
93    pub kernels_launched: usize,
94    /// Total messages processed.
95    pub messages_processed: u64,
96    /// Coordinator statistics.
97    pub coordinator_stats: CoordinatorStats,
98}
99
100/// GPU-native actor runtime for accounting analytics.
101///
102/// This runtime manages the lifecycle of GPU kernel actors and
103/// provides a high-level API for processing accounting networks.
104pub struct GpuActorRuntime {
105    /// Coordinator for orchestrating the pipeline.
106    coordinator: AnalyticsCoordinator,
107    /// Generated kernel code.
108    #[cfg(feature = "cuda")]
109    kernels: Option<AnalyticsKernels>,
110    /// Whether GPU is active.
111    gpu_active: bool,
112    /// Device name.
113    device_name: Option<String>,
114    /// Compute capability.
115    compute_capability: Option<(u32, u32)>,
116    /// Messages processed counter.
117    messages_processed: u64,
118    /// CUDA device handle (when feature enabled).
119    #[cfg(feature = "cuda")]
120    cuda_device: Option<Arc<cudarc::driver::CudaContext>>,
121    /// Compiled PTX modules (reserved for future multi-kernel support).
122    #[cfg(feature = "cuda")]
123    #[allow(dead_code)]
124    compiled_modules: HashMap<String, bool>,
125}
126
127impl GpuActorRuntime {
128    /// Create a new GPU actor runtime.
129    pub fn new(config: CoordinatorConfig) -> Self {
130        let coordinator = AnalyticsCoordinator::new(config);
131
132        // Try to initialize GPU
133        let (gpu_active, device_name, compute_capability) = Self::init_gpu();
134
135        // Generate kernel code if GPU is available
136        #[cfg(feature = "cuda")]
137        let kernels = if gpu_active {
138            match AnalyticsKernels::generate() {
139                Ok(k) => {
140                    log::info!("Generated {} analytics kernels", 6);
141                    Some(k)
142                }
143                Err(e) => {
144                    log::warn!("Failed to generate kernels: {}", e);
145                    None
146                }
147            }
148        } else {
149            None
150        };
151
152        Self {
153            coordinator,
154            #[cfg(feature = "cuda")]
155            kernels,
156            gpu_active,
157            device_name,
158            compute_capability,
159            messages_processed: 0,
160            #[cfg(feature = "cuda")]
161            cuda_device: None,
162            #[cfg(feature = "cuda")]
163            compiled_modules: HashMap::new(),
164        }
165    }
166
167    /// Initialize GPU and return status.
168    fn init_gpu() -> (bool, Option<String>, Option<(u32, u32)>) {
169        #[cfg(feature = "cuda")]
170        {
171            match cudarc::driver::CudaContext::new(0) {
172                Ok(context) => {
173                    let name = context.name().unwrap_or_else(|_| "Unknown GPU".to_string());
174                    // Get compute capability
175                    let cc = context
176                        .compute_capability()
177                        .ok()
178                        .map(|(major, minor)| (major as u32, minor as u32));
179                    log::info!("GPU initialized: {} (CC {:?})", name, cc);
180                    (true, Some(name), cc)
181                }
182                Err(e) => {
183                    log::warn!("Failed to initialize GPU: {}", e);
184                    (false, None, None)
185                }
186            }
187        }
188
189        #[cfg(not(feature = "cuda"))]
190        {
191            log::info!("CUDA feature not enabled, using CPU fallback");
192            (false, None, None)
193        }
194    }
195
196    /// Check if GPU is active.
197    pub fn is_gpu_active(&self) -> bool {
198        self.gpu_active
199    }
200
201    /// Get runtime status.
202    pub fn status(&self) -> RuntimeStatus {
203        #[cfg(feature = "cuda")]
204        let kernels_launched = if self.kernels.is_some() { 6 } else { 0 };
205        #[cfg(not(feature = "cuda"))]
206        let kernels_launched = 0;
207
208        RuntimeStatus {
209            cuda_active: self.gpu_active,
210            device_name: self.device_name.clone(),
211            compute_capability: self.compute_capability,
212            kernels_launched,
213            messages_processed: self.messages_processed,
214            coordinator_stats: self.coordinator.stats.clone(),
215        }
216    }
217
218    /// Analyze a network using GPU actors.
219    ///
220    /// This is the main entry point for GPU-accelerated analytics.
221    /// Returns analytics results computed by the GPU kernel actors.
222    pub fn analyze(&mut self, network: &AccountingNetwork) -> GpuAnalyticsResult {
223        let start = Instant::now();
224        let snapshot_id = self.coordinator.begin_snapshot();
225
226        let mut result = GpuAnalyticsResult {
227            snapshot_id,
228            ..Default::default()
229        };
230
231        let n_accounts = network.accounts.len();
232        let n_flows = network.flows.len();
233
234        if n_accounts == 0 || n_flows == 0 {
235            result.processing_time_us = start.elapsed().as_micros() as u64;
236            return result;
237        }
238
239        // GPU path
240        #[cfg(feature = "cuda")]
241        {
242            let has_kernels = self.kernels.is_some();
243            if self.gpu_active && has_kernels {
244                match self.analyze_gpu(network, &mut result) {
245                    Ok(_) => {
246                        result.processing_time_us = start.elapsed().as_micros() as u64;
247                        self.messages_processed += 6; // One message per kernel
248                        return result;
249                    }
250                    Err(e) => {
251                        log::warn!("GPU analysis failed, falling back to CPU: {}", e);
252                    }
253                }
254            }
255        }
256
257        // CPU fallback
258        self.analyze_cpu(network, &mut result);
259        result.processing_time_us = start.elapsed().as_micros() as u64;
260        result
261    }
262
/// Run the full analysis pipeline on the CUDA path.
///
/// The CUDA context for device 0 is created on first use and cached in
/// `cuda_device`. Each stage then fills its part of `result`, the overall
/// risk score is derived, and the coordinator state is marked complete.
///
/// # Errors
///
/// Returns `AccNetError::DeviceCreation` when the context cannot be created,
/// or whatever error a stage reports.
#[cfg(feature = "cuda")]
fn analyze_gpu(
    &mut self,
    network: &AccountingNetwork,
    result: &mut GpuAnalyticsResult,
) -> crate::Result<()> {
    // Reuse the cached context, creating and caching it on the first call.
    let device = match self.cuda_device.as_ref() {
        Some(existing) => Arc::clone(existing),
        None => {
            let created = cudarc::driver::CudaContext::new(0)
                .map_err(|e| AccNetError::DeviceCreation(e.to_string()))?;
            self.cuda_device = Some(Arc::clone(&created));
            created
        }
    };

    // PageRank
    let pagerank = self.compute_pagerank_gpu(&device, network)?;
    result.pagerank_scores = pagerank;

    // Fraud detection
    let (fraud_total, fraud_flags) = self.detect_fraud_gpu(&device, network)?;
    result.fraud_pattern_count = fraud_total;
    result.fraud_flags = fraud_flags;

    // GAAP validation
    let (gaap_total, gaap_flags) = self.validate_gaap_gpu(&device, network)?;
    result.gaap_violation_count = gaap_total;
    result.gaap_flags = gaap_flags;

    // Benford analysis
    let (digit_counts, chi_squared, anomalous) = self.analyze_benford_gpu(&device, network)?;
    result.benford_distribution = digit_counts;
    result.benford_chi_squared = chi_squared;
    result.benford_anomaly = anomalous;

    // Suspense detection
    let (suspense_total, suspense_scores) = self.detect_suspense_gpu(&device, network)?;
    result.suspense_account_count = suspense_total;
    result.suspense_scores = suspense_scores;

    // Overall risk is derived from the counters filled in above.
    result.overall_risk_score = self.calculate_risk_score(result);

    // Mirror the outcome into the coordinator state.
    let state = &mut self.coordinator.state;
    state.pagerank_complete = true;
    state.fraud_detection_complete = true;
    state.fraud_pattern_count = fraud_total;
    state.gaap_validation_complete = true;
    state.gaap_violation_count = gaap_total;
    state.benford_complete = true;
    state.benford_anomaly = anomalous;
    state.suspense_complete = true;
    state.suspense_account_count = suspense_total;

    Ok(())
}
320
/// PageRank stage of the GPU pipeline.
///
/// Currently delegates to the network's own PageRank implementation with the
/// coordinator's iteration count and damping factor; `_device` is unused
/// until the ring kernel infrastructure is ready.
#[cfg(feature = "cuda")]
fn compute_pagerank_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaContext>,
    network: &AccountingNetwork,
) -> crate::Result<Vec<f64>> {
    let config = &self.coordinator.config;
    let iterations = config.pagerank_iterations as usize;
    let damping = config.pagerank_damping as f64;

    let scores = network.compute_pagerank(iterations, damping);
    Ok(scores)
}
334
/// Fraud-detection stage of the GPU pipeline.
///
/// Runs on the host until ring kernels are fully integrated; `_device` is
/// unused. Returns the number of flagged flows together with one bit mask
/// per flow:
///
/// * `0x01` — absolute amount is at least 1000 and less than 1 above a
///   multiple of 1000;
/// * `0x02` — source and target account are the same;
/// * `0x04` — absolute amount lies within 500 of 10000.
#[cfg(feature = "cuda")]
fn detect_fraud_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaContext>,
    network: &AccountingNetwork,
) -> crate::Result<(u32, Vec<u32>)> {
    const ROUND_AMOUNT: u32 = 0x01;
    const SELF_LOOP: u32 = 0x02;
    const NEAR_THRESHOLD: u32 = 0x04;

    let flags: Vec<u32> = network
        .flows
        .iter()
        .map(|flow| {
            let magnitude = flow.amount.to_f64().abs();
            let mut bits = 0u32;

            if magnitude >= 1000.0 && (magnitude % 1000.0).abs() < 1.0 {
                bits |= ROUND_AMOUNT;
            }
            if flow.source_account_index == flow.target_account_index {
                bits |= SELF_LOOP;
            }
            if (magnitude - 10000.0).abs() < 500.0 {
                bits |= NEAR_THRESHOLD;
            }

            bits
        })
        .collect();

    // A flow counts once no matter how many patterns it matched.
    let flagged = flags.iter().filter(|&&bits| bits != 0).count() as u32;

    Ok((flagged, flags))
}
373
/// GAAP-validation stage of the GPU pipeline.
///
/// Runs on the host; `_device` is unused. Account types are compared by
/// their numeric code, and an account index that is out of range is treated
/// as code 0. Returns the violation count and one flag per flow:
///
/// * `1` — Revenue (3) -> Asset (0) direct;
/// * `2` — Revenue (3) -> Expense (4) direct;
/// * `0` — no violation.
#[cfg(feature = "cuda")]
fn validate_gaap_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaContext>,
    network: &AccountingNetwork,
) -> crate::Result<(u32, Vec<u32>)> {
    let type_code = |index: usize| -> u8 {
        network
            .accounts
            .get(index)
            .map_or(0, |account| account.account_type as u8)
    };

    let mut violations = 0u32;
    let flags: Vec<u32> = network
        .flows
        .iter()
        .map(|flow| {
            let source = type_code(flow.source_account_index as usize);
            let target = type_code(flow.target_account_index as usize);

            let code = match (source, target) {
                (3, 0) => 1,
                (3, 4) => 2,
                _ => 0,
            };
            if code != 0 {
                violations += 1;
            }
            code
        })
        .collect();

    Ok((violations, flags))
}
410
/// Benford's-law stage of the GPU pipeline.
///
/// Runs on the host; `_device` is unused. Builds a histogram of the leading
/// digit of every flow whose absolute amount is finite and at least 1, then
/// applies a chi-squared goodness-of-fit test against Benford's expected
/// frequencies once at least 50 amounts were counted.
///
/// Returns `(histogram, chi_squared, is_anomalous)`. With fewer than 50
/// samples the statistic is 0 and no anomaly is reported.
///
/// The statistic is computed on counts, `sum((O - E)^2 / E)` with
/// `E = total * p`, so that it is comparable with the 15.507 critical value
/// (8 degrees of freedom, 5% level). Computing it on proportions would
/// shrink it by a factor of `total` and make the threshold unreachable for
/// all but extreme distributions.
#[cfg(feature = "cuda")]
fn analyze_benford_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaContext>,
    network: &AccountingNetwork,
) -> crate::Result<([u32; 9], f32, bool)> {
    // Expected leading-digit probabilities for digits 1..=9.
    const EXPECTED: [f32; 9] = [
        0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
    ];
    // Below this sample size the test is not evaluated.
    const MIN_SAMPLES: u32 = 50;
    // Chi-squared critical value for 8 degrees of freedom at the 5% level.
    const CRITICAL_VALUE: f32 = 15.507;

    let mut counts = [0u32; 9];
    for flow in &network.flows {
        let amount = flow.amount.to_f64().abs();
        // The finiteness check keeps an infinite amount from spinning
        // forever in the scaling loop below; NaN already fails `>= 1.0`.
        if amount.is_finite() && amount >= 1.0 {
            let mut v = amount;
            while v >= 10.0 {
                v /= 10.0;
            }
            let digit = v as u32;
            if (1..=9).contains(&digit) {
                counts[(digit - 1) as usize] += 1;
            }
        }
    }

    let total: u32 = counts.iter().sum();
    let enough_samples = total >= MIN_SAMPLES;

    let chi_sq = if enough_samples {
        let n = total as f32;
        counts
            .iter()
            .zip(EXPECTED.iter())
            .map(|(&observed, &probability)| {
                let expected = n * probability;
                (observed as f32 - expected).powi(2) / expected
            })
            .sum::<f32>()
    } else {
        0.0
    };

    let is_anomalous = enough_samples && chi_sq > CRITICAL_VALUE;

    Ok((counts, chi_sq, is_anomalous))
}
452
/// Suspense-account stage of the GPU pipeline.
///
/// Runs on the host; `_device` is unused. Every account gets a score built
/// from three indicators and capped at 1.0:
///
/// * +0.3 — absolute closing balance is at least 1000 and less than 1 above
///   a multiple of 1000;
/// * +0.4 — the account's risk score exceeds 0.5;
/// * +0.3 — the in/out degree ratio exceeds 5 (an account with no outgoing
///   flows is given a ratio of 10).
///
/// Returns the number of accounts scoring above 0.5 and the per-account
/// scores.
#[cfg(feature = "cuda")]
fn detect_suspense_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaContext>,
    network: &AccountingNetwork,
) -> crate::Result<(u32, Vec<f32>)> {
    let scores: Vec<f32> = network
        .accounts
        .iter()
        .map(|account| {
            let balance = account.closing_balance.to_f64().abs();
            let round_balance = balance >= 1000.0 && (balance % 1000.0).abs() < 1.0;
            let high_risk = account.risk_score > 0.5;
            let flow_ratio = if account.out_degree > 0 {
                account.in_degree as f32 / account.out_degree as f32
            } else {
                10.0
            };

            // Indicators are added in a fixed order so the floating-point
            // sum is reproducible.
            let mut score = 0.0f32;
            if round_balance {
                score += 0.3;
            }
            if high_risk {
                score += 0.4;
            }
            if flow_ratio > 5.0 {
                score += 0.3;
            }
            score.min(1.0)
        })
        .collect();

    let suspects = scores.iter().filter(|&&score| score > 0.5).count() as u32;

    Ok((suspects, scores))
}
495
496    /// CPU fallback analysis.
497    fn analyze_cpu(&mut self, network: &AccountingNetwork, result: &mut GpuAnalyticsResult) {
498        let n_accounts = network.accounts.len();
499        let n_flows = network.flows.len();
500
501        // PageRank
502        result.pagerank_scores = network.compute_pagerank(
503            self.coordinator.config.pagerank_iterations as usize,
504            self.coordinator.config.pagerank_damping as f64,
505        );
506
507        // Fraud detection
508        result.fraud_flags = vec![0u32; n_flows];
509        for (i, flow) in network.flows.iter().enumerate() {
510            let amount = flow.amount.to_f64().abs();
511            let mut flag = 0u32;
512
513            if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
514                flag |= 0x01;
515            }
516            if flow.source_account_index == flow.target_account_index {
517                flag |= 0x02;
518            }
519            if (amount - 10000.0).abs() < 500.0 {
520                flag |= 0x04;
521            }
522
523            result.fraud_flags[i] = flag;
524            if flag != 0 {
525                result.fraud_pattern_count += 1;
526            }
527        }
528
529        // GAAP validation
530        result.gaap_flags = vec![0u32; n_flows];
531        for (i, flow) in network.flows.iter().enumerate() {
532            let source_type = network
533                .accounts
534                .get(flow.source_account_index as usize)
535                .map(|a| a.account_type as u8)
536                .unwrap_or(0);
537            let target_type = network
538                .accounts
539                .get(flow.target_account_index as usize)
540                .map(|a| a.account_type as u8)
541                .unwrap_or(0);
542
543            if source_type == 3 && (target_type == 0 || target_type == 4) {
544                result.gaap_flags[i] = 1;
545                result.gaap_violation_count += 1;
546            }
547        }
548
549        // Benford analysis
550        for flow in &network.flows {
551            let amount = flow.amount.to_f64().abs();
552            if amount >= 1.0 {
553                let mut v = amount;
554                while v >= 10.0 {
555                    v /= 10.0;
556                }
557                let digit = v as u32;
558                if (1..=9).contains(&digit) {
559                    result.benford_distribution[(digit - 1) as usize] += 1;
560                }
561            }
562        }
563
564        let total: u32 = result.benford_distribution.iter().sum();
565        if total >= 50 {
566            let expected = [
567                0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
568            ];
569            for (i, &count) in result.benford_distribution.iter().enumerate() {
570                let observed = count as f32 / total as f32;
571                let exp = expected[i];
572                result.benford_chi_squared += (observed - exp).powi(2) / exp;
573            }
574            result.benford_anomaly = result.benford_chi_squared > 15.507;
575        }
576
577        // Suspense detection
578        result.suspense_scores = vec![0.0f32; n_accounts];
579        for (i, account) in network.accounts.iter().enumerate() {
580            let mut score = 0.0f32;
581
582            let balance = account.closing_balance.to_f64().abs();
583            if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
584                score += 0.3;
585            }
586            if account.risk_score > 0.5 {
587                score += 0.4;
588            }
589            let ratio = if account.out_degree > 0 {
590                account.in_degree as f32 / account.out_degree as f32
591            } else {
592                10.0
593            };
594            if ratio > 5.0 {
595                score += 0.3;
596            }
597
598            result.suspense_scores[i] = score.min(1.0);
599            if result.suspense_scores[i] > 0.5 {
600                result.suspense_account_count += 1;
601            }
602        }
603
604        // Overall risk
605        result.overall_risk_score = self.calculate_risk_score(result);
606
607        // Update coordinator
608        self.coordinator.state.pagerank_complete = true;
609        self.coordinator.state.fraud_detection_complete = true;
610        self.coordinator.state.fraud_pattern_count = result.fraud_pattern_count;
611        self.coordinator.state.gaap_validation_complete = true;
612        self.coordinator.state.gaap_violation_count = result.gaap_violation_count;
613        self.coordinator.state.benford_complete = true;
614        self.coordinator.state.benford_anomaly = result.benford_anomaly;
615        self.coordinator.state.suspense_complete = true;
616        self.coordinator.state.suspense_account_count = result.suspense_account_count;
617    }
618
619    fn calculate_risk_score(&self, result: &GpuAnalyticsResult) -> f32 {
620        let fraud_risk = (result.fraud_pattern_count as f32 / 100.0).min(1.0);
621        let gaap_risk = (result.gaap_violation_count as f32 / 50.0).min(1.0);
622        let suspense_risk = (result.suspense_account_count as f32 / 20.0).min(1.0);
623        let benford_risk = if result.benford_anomaly { 0.5 } else { 0.0 };
624
625        (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15).min(1.0)
626    }
627
628    /// Get coordinator reference.
629    pub fn coordinator(&self) -> &AnalyticsCoordinator {
630        &self.coordinator
631    }
632
633    /// Get coordinator mutable reference.
634    pub fn coordinator_mut(&mut self) -> &mut AnalyticsCoordinator {
635        &mut self.coordinator
636    }
637}
638
639impl Default for GpuActorRuntime {
640    fn default() -> Self {
641        Self::new(CoordinatorConfig::default())
642    }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        AccountMetadata, AccountNode, AccountType, Decimal128, HybridTimestamp, TransactionFlow,
    };
    use uuid::Uuid;

    /// Fresh network with no accounts and no flows.
    fn empty_network() -> AccountingNetwork {
        AccountingNetwork::new(Uuid::new_v4(), 2024, 1)
    }

    /// A new runtime has processed no messages, whether or not a GPU exists.
    #[test]
    fn test_runtime_creation() {
        let status = GpuActorRuntime::default().status();
        // GPU availability depends on the system
        assert_eq!(status.messages_processed, 0);
    }

    /// An empty network produces no findings.
    #[test]
    fn test_analyze_empty_network() {
        let mut runtime = GpuActorRuntime::default();
        let network = empty_network();

        let outcome = runtime.analyze(&network);
        assert_eq!(outcome.fraud_pattern_count, 0);
        assert_eq!(outcome.gaap_violation_count, 0);
    }

    /// A single Revenue -> Asset flow is analysed on whichever path the
    /// build and machine provide (CPU unless a CUDA device is usable).
    #[test]
    fn test_cpu_fallback() {
        let mut runtime = GpuActorRuntime::default();
        let mut network = empty_network();

        // Two accounts joined by one flow.
        let cash_account = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Asset, 0),
            AccountMetadata::new("1100", "Cash"),
        );
        let revenue_account = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Revenue, 0),
            AccountMetadata::new("4000", "Revenue"),
        );

        let flow = TransactionFlow::new(
            revenue_account,
            cash_account,
            Decimal128::from_f64(1000.0),
            Uuid::new_v4(),
            HybridTimestamp::now(),
        );
        network.add_flow(flow);

        let outcome = runtime.analyze(&network);
        // Should have detected GAAP violation (Revenue -> Asset)
        assert!(outcome.gaap_violation_count > 0 || outcome.pagerank_scores.len() == 2);
    }

    /// The blended risk score is positive for non-trivial findings and
    /// never exceeds 1.0.
    #[test]
    fn test_risk_score_calculation() {
        let runtime = GpuActorRuntime::default();
        let findings = GpuAnalyticsResult {
            fraud_pattern_count: 50,
            gaap_violation_count: 25,
            suspense_account_count: 10,
            benford_anomaly: true,
            ..Default::default()
        };

        let risk = runtime.calculate_risk_score(&findings);
        assert!(risk > 0.0);
        assert!(risk <= 1.0);
    }
}
712        let risk = runtime.calculate_risk_score(&result);
713        assert!(risk > 0.0);
714        assert!(risk <= 1.0);
715    }
716}