Skip to main content

ringkernel_accnet/actors/
runtime.rs

//! GPU-native actor runtime for accounting analytics.
//!
//! This module provides the runtime that manages GPU kernel actors
//! using the RingKernel system.
5
6use std::time::Instant;
7
8#[cfg(feature = "cuda")]
9use std::collections::HashMap;
10#[cfg(feature = "cuda")]
11use std::sync::Arc;
12
13use crate::models::AccountingNetwork;
14
15use super::coordinator::{AnalyticsCoordinator, CoordinatorConfig, CoordinatorStats};
16#[cfg(feature = "cuda")]
17use super::kernels::AnalyticsKernels;
18#[allow(unused_imports)]
19use super::messages::*;
20
/// Descriptor of a GPU memory buffer used to exchange data between the
/// host and the kernels.
///
/// The struct only records the two values below; it performs no
/// allocation or deallocation itself.
#[derive(Debug)]
pub struct GpuBuffer {
    /// Size of the buffer, in bytes.
    pub size: usize,
    /// Opaque device-pointer handle identifying the buffer.
    pub device_ptr: u64,
}
29
/// Analytics result from GPU processing.
///
/// `Default` yields an "empty" result: every counter is zero, every vector
/// is empty, the Benford histogram is all zeros and no anomaly is flagged.
/// The implementation is derived because each field's own `Default` value
/// is exactly that empty value.
#[derive(Debug, Clone, Default)]
pub struct GpuAnalyticsResult {
    /// Snapshot ID.
    pub snapshot_id: u64,
    /// PageRank scores (index -> score).
    pub pagerank_scores: Vec<f64>,
    /// Fraud pattern count.
    pub fraud_pattern_count: u32,
    /// Fraud pattern flags per flow.
    pub fraud_flags: Vec<u32>,
    /// GAAP violation count.
    pub gaap_violation_count: u32,
    /// GAAP violation flags per flow.
    pub gaap_flags: Vec<u32>,
    /// Suspense account count.
    pub suspense_account_count: u32,
    /// Suspense scores per account.
    pub suspense_scores: Vec<f32>,
    /// Benford digit distribution (slot `i` counts leading digit `i + 1`).
    pub benford_distribution: [u32; 9],
    /// Benford chi-squared statistic.
    pub benford_chi_squared: f32,
    /// Benford anomaly detected.
    pub benford_anomaly: bool,
    /// Overall risk score.
    pub overall_risk_score: f32,
    /// Processing time in microseconds.
    pub processing_time_us: u64,
}
80
81/// GPU actor runtime status.
82#[derive(Debug, Clone)]
83pub struct RuntimeStatus {
84    /// Whether CUDA is available and active.
85    pub cuda_active: bool,
86    /// GPU device name.
87    pub device_name: Option<String>,
88    /// Compute capability.
89    pub compute_capability: Option<(u32, u32)>,
90    /// Number of kernels launched.
91    pub kernels_launched: usize,
92    /// Total messages processed.
93    pub messages_processed: u64,
94    /// Coordinator statistics.
95    pub coordinator_stats: CoordinatorStats,
96}
97
98/// GPU-native actor runtime for accounting analytics.
99///
100/// This runtime manages the lifecycle of GPU kernel actors and
101/// provides a high-level API for processing accounting networks.
102pub struct GpuActorRuntime {
103    /// Coordinator for orchestrating the pipeline.
104    coordinator: AnalyticsCoordinator,
105    /// Generated kernel code.
106    #[cfg(feature = "cuda")]
107    kernels: Option<AnalyticsKernels>,
108    /// Whether GPU is active.
109    gpu_active: bool,
110    /// Device name.
111    device_name: Option<String>,
112    /// Compute capability.
113    compute_capability: Option<(u32, u32)>,
114    /// Messages processed counter.
115    messages_processed: u64,
116    /// CUDA device handle (when feature enabled).
117    #[cfg(feature = "cuda")]
118    cuda_device: Option<Arc<cudarc::driver::CudaDevice>>,
119    /// Compiled PTX modules (reserved for future multi-kernel support).
120    #[cfg(feature = "cuda")]
121    #[allow(dead_code)]
122    compiled_modules: HashMap<String, bool>,
123}
124
125impl GpuActorRuntime {
126    /// Create a new GPU actor runtime.
127    pub fn new(config: CoordinatorConfig) -> Self {
128        let coordinator = AnalyticsCoordinator::new(config);
129
130        // Try to initialize GPU
131        let (gpu_active, device_name, compute_capability) = Self::init_gpu();
132
133        // Generate kernel code if GPU is available
134        #[cfg(feature = "cuda")]
135        let kernels = if gpu_active {
136            match AnalyticsKernels::generate() {
137                Ok(k) => {
138                    log::info!("Generated {} analytics kernels", 6);
139                    Some(k)
140                }
141                Err(e) => {
142                    log::warn!("Failed to generate kernels: {}", e);
143                    None
144                }
145            }
146        } else {
147            None
148        };
149
150        Self {
151            coordinator,
152            #[cfg(feature = "cuda")]
153            kernels,
154            gpu_active,
155            device_name,
156            compute_capability,
157            messages_processed: 0,
158            #[cfg(feature = "cuda")]
159            cuda_device: None,
160            #[cfg(feature = "cuda")]
161            compiled_modules: HashMap::new(),
162        }
163    }
164
165    /// Initialize GPU and return status.
166    fn init_gpu() -> (bool, Option<String>, Option<(u32, u32)>) {
167        #[cfg(feature = "cuda")]
168        {
169            match cudarc::driver::CudaDevice::new(0) {
170                Ok(device) => {
171                    let name = device.name().unwrap_or_else(|_| "Unknown GPU".to_string());
172                    // Get compute capability
173                    let cc = device.attribute(
174                        cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR
175                    ).ok().and_then(|major| {
176                        device.attribute(
177                            cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR
178                        ).ok().map(|minor| (major as u32, minor as u32))
179                    });
180                    log::info!("GPU initialized: {} (CC {:?})", name, cc);
181                    (true, Some(name), cc)
182                }
183                Err(e) => {
184                    log::warn!("Failed to initialize GPU: {}", e);
185                    (false, None, None)
186                }
187            }
188        }
189
190        #[cfg(not(feature = "cuda"))]
191        {
192            log::info!("CUDA feature not enabled, using CPU fallback");
193            (false, None, None)
194        }
195    }
196
197    /// Check if GPU is active.
198    pub fn is_gpu_active(&self) -> bool {
199        self.gpu_active
200    }
201
202    /// Get runtime status.
203    pub fn status(&self) -> RuntimeStatus {
204        #[cfg(feature = "cuda")]
205        let kernels_launched = if self.kernels.is_some() { 6 } else { 0 };
206        #[cfg(not(feature = "cuda"))]
207        let kernels_launched = 0;
208
209        RuntimeStatus {
210            cuda_active: self.gpu_active,
211            device_name: self.device_name.clone(),
212            compute_capability: self.compute_capability,
213            kernels_launched,
214            messages_processed: self.messages_processed,
215            coordinator_stats: self.coordinator.stats.clone(),
216        }
217    }
218
219    /// Analyze a network using GPU actors.
220    ///
221    /// This is the main entry point for GPU-accelerated analytics.
222    /// Returns analytics results computed by the GPU kernel actors.
223    pub fn analyze(&mut self, network: &AccountingNetwork) -> GpuAnalyticsResult {
224        let start = Instant::now();
225        let snapshot_id = self.coordinator.begin_snapshot();
226
227        let mut result = GpuAnalyticsResult {
228            snapshot_id,
229            ..Default::default()
230        };
231
232        let n_accounts = network.accounts.len();
233        let n_flows = network.flows.len();
234
235        if n_accounts == 0 || n_flows == 0 {
236            result.processing_time_us = start.elapsed().as_micros() as u64;
237            return result;
238        }
239
240        // GPU path
241        #[cfg(feature = "cuda")]
242        {
243            let has_kernels = self.kernels.is_some();
244            if self.gpu_active && has_kernels {
245                match self.analyze_gpu(network, &mut result) {
246                    Ok(_) => {
247                        result.processing_time_us = start.elapsed().as_micros() as u64;
248                        self.messages_processed += 6; // One message per kernel
249                        return result;
250                    }
251                    Err(e) => {
252                        log::warn!("GPU analysis failed, falling back to CPU: {}", e);
253                    }
254                }
255            }
256        }
257
258        // CPU fallback
259        self.analyze_cpu(network, &mut result);
260        result.processing_time_us = start.elapsed().as_micros() as u64;
261        result
262    }
263
264    /// GPU-accelerated analysis (CUDA).
265    #[cfg(feature = "cuda")]
266    fn analyze_gpu(
267        &mut self,
268        network: &AccountingNetwork,
269        result: &mut GpuAnalyticsResult,
270    ) -> Result<(), String> {
271        use cudarc::driver::*;
272
273        let device = match &self.cuda_device {
274            Some(d) => d.clone(),
275            None => {
276                let d = CudaDevice::new(0).map_err(|e| e.to_string())?;
277                self.cuda_device = Some(d.clone());
278                d
279            }
280        };
281
282        // === PageRank ===
283        result.pagerank_scores = self.compute_pagerank_gpu(&device, network)?;
284
285        // === Fraud Detection ===
286        let (fraud_count, fraud_flags) = self.detect_fraud_gpu(&device, network)?;
287        result.fraud_pattern_count = fraud_count;
288        result.fraud_flags = fraud_flags;
289
290        // === GAAP Validation ===
291        let (gaap_count, gaap_flags) = self.validate_gaap_gpu(&device, network)?;
292        result.gaap_violation_count = gaap_count;
293        result.gaap_flags = gaap_flags;
294
295        // === Benford Analysis ===
296        let (benford_dist, chi_sq, is_anomalous) = self.analyze_benford_gpu(&device, network)?;
297        result.benford_distribution = benford_dist;
298        result.benford_chi_squared = chi_sq;
299        result.benford_anomaly = is_anomalous;
300
301        // === Suspense Detection ===
302        let (suspense_count, suspense_scores) = self.detect_suspense_gpu(&device, network)?;
303        result.suspense_account_count = suspense_count;
304        result.suspense_scores = suspense_scores;
305
306        // Calculate overall risk
307        result.overall_risk_score = self.calculate_risk_score(result);
308
309        // Update coordinator state
310        self.coordinator.state.pagerank_complete = true;
311        self.coordinator.state.fraud_detection_complete = true;
312        self.coordinator.state.fraud_pattern_count = fraud_count;
313        self.coordinator.state.gaap_validation_complete = true;
314        self.coordinator.state.gaap_violation_count = gaap_count;
315        self.coordinator.state.benford_complete = true;
316        self.coordinator.state.benford_anomaly = is_anomalous;
317        self.coordinator.state.suspense_complete = true;
318        self.coordinator.state.suspense_account_count = suspense_count;
319
320        Ok(())
321    }
322
323    #[cfg(feature = "cuda")]
324    fn compute_pagerank_gpu(
325        &self,
326        _device: &Arc<cudarc::driver::CudaDevice>,
327        network: &AccountingNetwork,
328    ) -> Result<Vec<f64>, String> {
329        // For now, use CPU implementation until ring kernel infrastructure is ready
330        // The ring kernel code is generated but requires the full RingKernel runtime
331        Ok(network.compute_pagerank(
332            self.coordinator.config.pagerank_iterations as usize,
333            self.coordinator.config.pagerank_damping as f64,
334        ))
335    }
336
337    #[cfg(feature = "cuda")]
338    fn detect_fraud_gpu(
339        &self,
340        _device: &Arc<cudarc::driver::CudaDevice>,
341        network: &AccountingNetwork,
342    ) -> Result<(u32, Vec<u32>), String> {
343        let n_flows = network.flows.len();
344        let mut flags = vec![0u32; n_flows];
345        let mut count = 0u32;
346
347        // Simple CPU-side fraud detection until ring kernels are fully integrated
348        for (i, flow) in network.flows.iter().enumerate() {
349            let amount = flow.amount.to_f64().abs();
350            let mut flag = 0u32;
351
352            // Round amount check
353            if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
354                flag |= 0x01;
355            }
356
357            // Self-loop check
358            if flow.source_account_index == flow.target_account_index {
359                flag |= 0x02;
360            }
361
362            // Threshold proximity
363            if (amount - 10000.0).abs() < 500.0 {
364                flag |= 0x04;
365            }
366
367            if flag != 0 {
368                count += 1;
369            }
370            flags[i] = flag;
371        }
372
373        Ok((count, flags))
374    }
375
376    #[cfg(feature = "cuda")]
377    fn validate_gaap_gpu(
378        &self,
379        _device: &Arc<cudarc::driver::CudaDevice>,
380        network: &AccountingNetwork,
381    ) -> Result<(u32, Vec<u32>), String> {
382        let n_flows = network.flows.len();
383        let mut flags = vec![0u32; n_flows];
384        let mut count = 0u32;
385
386        for (i, flow) in network.flows.iter().enumerate() {
387            let source_type = network
388                .accounts
389                .get(flow.source_account_index as usize)
390                .map(|a| a.account_type as u8)
391                .unwrap_or(0);
392            let target_type = network
393                .accounts
394                .get(flow.target_account_index as usize)
395                .map(|a| a.account_type as u8)
396                .unwrap_or(0);
397
398            // Revenue (3) -> Asset (0) direct
399            if source_type == 3 && target_type == 0 {
400                flags[i] = 1;
401                count += 1;
402            }
403            // Revenue (3) -> Expense (4) direct
404            else if source_type == 3 && target_type == 4 {
405                flags[i] = 2;
406                count += 1;
407            }
408        }
409
410        Ok((count, flags))
411    }
412
413    #[cfg(feature = "cuda")]
414    fn analyze_benford_gpu(
415        &self,
416        _device: &Arc<cudarc::driver::CudaDevice>,
417        network: &AccountingNetwork,
418    ) -> Result<([u32; 9], f32, bool), String> {
419        let mut counts = [0u32; 9];
420
421        for flow in &network.flows {
422            let amount = flow.amount.to_f64().abs();
423            if amount >= 1.0 {
424                let mut v = amount;
425                while v >= 10.0 {
426                    v /= 10.0;
427                }
428                let digit = v as u32;
429                if (1..=9).contains(&digit) {
430                    counts[(digit - 1) as usize] += 1;
431                }
432            }
433        }
434
435        // Chi-squared test
436        let total: u32 = counts.iter().sum();
437        let expected = [
438            0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
439        ];
440        let mut chi_sq = 0.0f32;
441
442        if total >= 50 {
443            for (i, &count) in counts.iter().enumerate() {
444                let observed = count as f32 / total as f32;
445                let exp = expected[i];
446                chi_sq += (observed - exp).powi(2) / exp;
447            }
448        }
449
450        let is_anomalous = total >= 50 && chi_sq > 15.507;
451
452        Ok((counts, chi_sq, is_anomalous))
453    }
454
455    #[cfg(feature = "cuda")]
456    fn detect_suspense_gpu(
457        &self,
458        _device: &Arc<cudarc::driver::CudaDevice>,
459        network: &AccountingNetwork,
460    ) -> Result<(u32, Vec<f32>), String> {
461        let n_accounts = network.accounts.len();
462        let mut scores = vec![0.0f32; n_accounts];
463        let mut count = 0u32;
464
465        for (i, account) in network.accounts.iter().enumerate() {
466            let mut score = 0.0f32;
467
468            // Round balance check
469            let balance = (account.closing_balance.to_f64()).abs();
470            if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
471                score += 0.3;
472            }
473
474            // High risk score
475            if account.risk_score > 0.5 {
476                score += 0.4;
477            }
478
479            // Flow imbalance
480            let ratio = if account.out_degree > 0 {
481                account.in_degree as f32 / account.out_degree as f32
482            } else {
483                10.0
484            };
485            if ratio > 5.0 {
486                score += 0.3;
487            }
488
489            scores[i] = score.min(1.0);
490            if scores[i] > 0.5 {
491                count += 1;
492            }
493        }
494
495        Ok((count, scores))
496    }
497
498    /// CPU fallback analysis.
499    fn analyze_cpu(&mut self, network: &AccountingNetwork, result: &mut GpuAnalyticsResult) {
500        let n_accounts = network.accounts.len();
501        let n_flows = network.flows.len();
502
503        // PageRank
504        result.pagerank_scores = network.compute_pagerank(
505            self.coordinator.config.pagerank_iterations as usize,
506            self.coordinator.config.pagerank_damping as f64,
507        );
508
509        // Fraud detection
510        result.fraud_flags = vec![0u32; n_flows];
511        for (i, flow) in network.flows.iter().enumerate() {
512            let amount = flow.amount.to_f64().abs();
513            let mut flag = 0u32;
514
515            if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
516                flag |= 0x01;
517            }
518            if flow.source_account_index == flow.target_account_index {
519                flag |= 0x02;
520            }
521            if (amount - 10000.0).abs() < 500.0 {
522                flag |= 0x04;
523            }
524
525            result.fraud_flags[i] = flag;
526            if flag != 0 {
527                result.fraud_pattern_count += 1;
528            }
529        }
530
531        // GAAP validation
532        result.gaap_flags = vec![0u32; n_flows];
533        for (i, flow) in network.flows.iter().enumerate() {
534            let source_type = network
535                .accounts
536                .get(flow.source_account_index as usize)
537                .map(|a| a.account_type as u8)
538                .unwrap_or(0);
539            let target_type = network
540                .accounts
541                .get(flow.target_account_index as usize)
542                .map(|a| a.account_type as u8)
543                .unwrap_or(0);
544
545            if source_type == 3 && (target_type == 0 || target_type == 4) {
546                result.gaap_flags[i] = 1;
547                result.gaap_violation_count += 1;
548            }
549        }
550
551        // Benford analysis
552        for flow in &network.flows {
553            let amount = flow.amount.to_f64().abs();
554            if amount >= 1.0 {
555                let mut v = amount;
556                while v >= 10.0 {
557                    v /= 10.0;
558                }
559                let digit = v as u32;
560                if (1..=9).contains(&digit) {
561                    result.benford_distribution[(digit - 1) as usize] += 1;
562                }
563            }
564        }
565
566        let total: u32 = result.benford_distribution.iter().sum();
567        if total >= 50 {
568            let expected = [
569                0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
570            ];
571            for (i, &count) in result.benford_distribution.iter().enumerate() {
572                let observed = count as f32 / total as f32;
573                let exp = expected[i];
574                result.benford_chi_squared += (observed - exp).powi(2) / exp;
575            }
576            result.benford_anomaly = result.benford_chi_squared > 15.507;
577        }
578
579        // Suspense detection
580        result.suspense_scores = vec![0.0f32; n_accounts];
581        for (i, account) in network.accounts.iter().enumerate() {
582            let mut score = 0.0f32;
583
584            let balance = account.closing_balance.to_f64().abs();
585            if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
586                score += 0.3;
587            }
588            if account.risk_score > 0.5 {
589                score += 0.4;
590            }
591            let ratio = if account.out_degree > 0 {
592                account.in_degree as f32 / account.out_degree as f32
593            } else {
594                10.0
595            };
596            if ratio > 5.0 {
597                score += 0.3;
598            }
599
600            result.suspense_scores[i] = score.min(1.0);
601            if result.suspense_scores[i] > 0.5 {
602                result.suspense_account_count += 1;
603            }
604        }
605
606        // Overall risk
607        result.overall_risk_score = self.calculate_risk_score(result);
608
609        // Update coordinator
610        self.coordinator.state.pagerank_complete = true;
611        self.coordinator.state.fraud_detection_complete = true;
612        self.coordinator.state.fraud_pattern_count = result.fraud_pattern_count;
613        self.coordinator.state.gaap_validation_complete = true;
614        self.coordinator.state.gaap_violation_count = result.gaap_violation_count;
615        self.coordinator.state.benford_complete = true;
616        self.coordinator.state.benford_anomaly = result.benford_anomaly;
617        self.coordinator.state.suspense_complete = true;
618        self.coordinator.state.suspense_account_count = result.suspense_account_count;
619    }
620
621    fn calculate_risk_score(&self, result: &GpuAnalyticsResult) -> f32 {
622        let fraud_risk = (result.fraud_pattern_count as f32 / 100.0).min(1.0);
623        let gaap_risk = (result.gaap_violation_count as f32 / 50.0).min(1.0);
624        let suspense_risk = (result.suspense_account_count as f32 / 20.0).min(1.0);
625        let benford_risk = if result.benford_anomaly { 0.5 } else { 0.0 };
626
627        (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15).min(1.0)
628    }
629
630    /// Get coordinator reference.
631    pub fn coordinator(&self) -> &AnalyticsCoordinator {
632        &self.coordinator
633    }
634
635    /// Get coordinator mutable reference.
636    pub fn coordinator_mut(&mut self) -> &mut AnalyticsCoordinator {
637        &mut self.coordinator
638    }
639}
640
641impl Default for GpuActorRuntime {
642    fn default() -> Self {
643        Self::new(CoordinatorConfig::default())
644    }
645}
646
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// A freshly created runtime has processed no messages, regardless of
    /// whether a GPU is present on the machine running the test.
    #[test]
    fn test_runtime_creation() {
        let status = GpuActorRuntime::default().status();
        // GPU availability depends on the system
        assert_eq!(status.messages_processed, 0);
    }

    /// A network without accounts or flows produces no findings.
    #[test]
    fn test_analyze_empty_network() {
        let empty = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);
        let mut runtime = GpuActorRuntime::default();

        let outcome = runtime.analyze(&empty);

        assert_eq!(outcome.fraud_pattern_count, 0);
        assert_eq!(outcome.gaap_violation_count, 0);
    }

    /// A two-account network with a single Revenue -> Asset flow is
    /// analyzed end to end.
    #[test]
    fn test_cpu_fallback() {
        use crate::models::{
            AccountMetadata, AccountNode, AccountType, Decimal128, HybridTimestamp, TransactionFlow,
        };

        // Build the test network: one asset account, one revenue account.
        let mut network = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);
        let cash = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Asset, 0),
            AccountMetadata::new("1100", "Cash"),
        );
        let revenue = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Revenue, 0),
            AccountMetadata::new("4000", "Revenue"),
        );

        let flow = TransactionFlow::new(
            revenue,
            cash,
            Decimal128::from_f64(1000.0),
            Uuid::new_v4(),
            HybridTimestamp::now(),
        );
        network.add_flow(flow);

        let mut runtime = GpuActorRuntime::default();
        let outcome = runtime.analyze(&network);

        // Should have detected GAAP violation (Revenue -> Asset)
        assert!(outcome.gaap_violation_count > 0 || outcome.pagerank_scores.len() == 2);
    }

    /// The combined risk score is positive for non-trivial findings and
    /// never exceeds 1.0.
    #[test]
    fn test_risk_score_calculation() {
        let findings = GpuAnalyticsResult {
            fraud_pattern_count: 50,
            gaap_violation_count: 25,
            suspense_account_count: 10,
            benford_anomaly: true,
            ..Default::default()
        };
        let runtime = GpuActorRuntime::default();

        let risk = runtime.calculate_risk_score(&findings);

        assert!(risk > 0.0);
        assert!(risk <= 1.0);
    }
}
718}