1use std::time::Instant;
7
8#[cfg(feature = "cuda")]
9use std::collections::HashMap;
10#[cfg(feature = "cuda")]
11use std::sync::Arc;
12
13#[cfg(feature = "cuda")]
14use crate::error::AccNetError;
15use crate::models::AccountingNetwork;
16
17use super::coordinator::{AnalyticsCoordinator, CoordinatorConfig, CoordinatorStats};
18#[cfg(feature = "cuda")]
19use super::kernels::AnalyticsKernels;
20#[allow(unused_imports)]
21use super::messages::*;
22
23#[derive(Debug)]
25pub struct GpuBuffer {
26 pub size: usize,
28 pub device_ptr: u64,
30}
31
32#[derive(Debug, Clone)]
34pub struct GpuAnalyticsResult {
35 pub snapshot_id: u64,
37 pub pagerank_scores: Vec<f64>,
39 pub fraud_pattern_count: u32,
41 pub fraud_flags: Vec<u32>,
43 pub gaap_violation_count: u32,
45 pub gaap_flags: Vec<u32>,
47 pub suspense_account_count: u32,
49 pub suspense_scores: Vec<f32>,
51 pub benford_distribution: [u32; 9],
53 pub benford_chi_squared: f32,
55 pub benford_anomaly: bool,
57 pub overall_risk_score: f32,
59 pub processing_time_us: u64,
61}
62
63impl Default for GpuAnalyticsResult {
64 fn default() -> Self {
65 Self {
66 snapshot_id: 0,
67 pagerank_scores: Vec::new(),
68 fraud_pattern_count: 0,
69 fraud_flags: Vec::new(),
70 gaap_violation_count: 0,
71 gaap_flags: Vec::new(),
72 suspense_account_count: 0,
73 suspense_scores: Vec::new(),
74 benford_distribution: [0; 9],
75 benford_chi_squared: 0.0,
76 benford_anomaly: false,
77 overall_risk_score: 0.0,
78 processing_time_us: 0,
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
85pub struct RuntimeStatus {
86 pub cuda_active: bool,
88 pub device_name: Option<String>,
90 pub compute_capability: Option<(u32, u32)>,
92 pub kernels_launched: usize,
94 pub messages_processed: u64,
96 pub coordinator_stats: CoordinatorStats,
98}
99
100pub struct GpuActorRuntime {
105 coordinator: AnalyticsCoordinator,
107 #[cfg(feature = "cuda")]
109 kernels: Option<AnalyticsKernels>,
110 gpu_active: bool,
112 device_name: Option<String>,
114 compute_capability: Option<(u32, u32)>,
116 messages_processed: u64,
118 #[cfg(feature = "cuda")]
120 cuda_device: Option<Arc<cudarc::driver::CudaContext>>,
121 #[cfg(feature = "cuda")]
123 #[allow(dead_code)]
124 compiled_modules: HashMap<String, bool>,
125}
126
127impl GpuActorRuntime {
128 pub fn new(config: CoordinatorConfig) -> Self {
130 let coordinator = AnalyticsCoordinator::new(config);
131
132 let (gpu_active, device_name, compute_capability) = Self::init_gpu();
134
135 #[cfg(feature = "cuda")]
137 let kernels = if gpu_active {
138 match AnalyticsKernels::generate() {
139 Ok(k) => {
140 log::info!("Generated {} analytics kernels", 6);
141 Some(k)
142 }
143 Err(e) => {
144 log::warn!("Failed to generate kernels: {}", e);
145 None
146 }
147 }
148 } else {
149 None
150 };
151
152 Self {
153 coordinator,
154 #[cfg(feature = "cuda")]
155 kernels,
156 gpu_active,
157 device_name,
158 compute_capability,
159 messages_processed: 0,
160 #[cfg(feature = "cuda")]
161 cuda_device: None,
162 #[cfg(feature = "cuda")]
163 compiled_modules: HashMap::new(),
164 }
165 }
166
167 fn init_gpu() -> (bool, Option<String>, Option<(u32, u32)>) {
169 #[cfg(feature = "cuda")]
170 {
171 match cudarc::driver::CudaContext::new(0) {
172 Ok(context) => {
173 let name = context.name().unwrap_or_else(|_| "Unknown GPU".to_string());
174 let cc = context
176 .compute_capability()
177 .ok()
178 .map(|(major, minor)| (major as u32, minor as u32));
179 log::info!("GPU initialized: {} (CC {:?})", name, cc);
180 (true, Some(name), cc)
181 }
182 Err(e) => {
183 log::warn!("Failed to initialize GPU: {}", e);
184 (false, None, None)
185 }
186 }
187 }
188
189 #[cfg(not(feature = "cuda"))]
190 {
191 log::info!("CUDA feature not enabled, using CPU fallback");
192 (false, None, None)
193 }
194 }
195
196 pub fn is_gpu_active(&self) -> bool {
198 self.gpu_active
199 }
200
201 pub fn status(&self) -> RuntimeStatus {
203 #[cfg(feature = "cuda")]
204 let kernels_launched = if self.kernels.is_some() { 6 } else { 0 };
205 #[cfg(not(feature = "cuda"))]
206 let kernels_launched = 0;
207
208 RuntimeStatus {
209 cuda_active: self.gpu_active,
210 device_name: self.device_name.clone(),
211 compute_capability: self.compute_capability,
212 kernels_launched,
213 messages_processed: self.messages_processed,
214 coordinator_stats: self.coordinator.stats.clone(),
215 }
216 }
217
218 pub fn analyze(&mut self, network: &AccountingNetwork) -> GpuAnalyticsResult {
223 let start = Instant::now();
224 let snapshot_id = self.coordinator.begin_snapshot();
225
226 let mut result = GpuAnalyticsResult {
227 snapshot_id,
228 ..Default::default()
229 };
230
231 let n_accounts = network.accounts.len();
232 let n_flows = network.flows.len();
233
234 if n_accounts == 0 || n_flows == 0 {
235 result.processing_time_us = start.elapsed().as_micros() as u64;
236 return result;
237 }
238
239 #[cfg(feature = "cuda")]
241 {
242 let has_kernels = self.kernels.is_some();
243 if self.gpu_active && has_kernels {
244 match self.analyze_gpu(network, &mut result) {
245 Ok(_) => {
246 result.processing_time_us = start.elapsed().as_micros() as u64;
247 self.messages_processed += 6; return result;
249 }
250 Err(e) => {
251 log::warn!("GPU analysis failed, falling back to CPU: {}", e);
252 }
253 }
254 }
255 }
256
257 self.analyze_cpu(network, &mut result);
259 result.processing_time_us = start.elapsed().as_micros() as u64;
260 result
261 }
262
263 #[cfg(feature = "cuda")]
265 fn analyze_gpu(
266 &mut self,
267 network: &AccountingNetwork,
268 result: &mut GpuAnalyticsResult,
269 ) -> crate::Result<()> {
270 let device = match &self.cuda_device {
271 Some(d) => d.clone(),
272 None => {
273 let d = cudarc::driver::CudaContext::new(0)
274 .map_err(|e| AccNetError::DeviceCreation(e.to_string()))?;
275 self.cuda_device = Some(d.clone());
276 d
277 }
278 };
279
280 result.pagerank_scores = self.compute_pagerank_gpu(&device, network)?;
282
283 let (fraud_count, fraud_flags) = self.detect_fraud_gpu(&device, network)?;
285 result.fraud_pattern_count = fraud_count;
286 result.fraud_flags = fraud_flags;
287
288 let (gaap_count, gaap_flags) = self.validate_gaap_gpu(&device, network)?;
290 result.gaap_violation_count = gaap_count;
291 result.gaap_flags = gaap_flags;
292
293 let (benford_dist, chi_sq, is_anomalous) = self.analyze_benford_gpu(&device, network)?;
295 result.benford_distribution = benford_dist;
296 result.benford_chi_squared = chi_sq;
297 result.benford_anomaly = is_anomalous;
298
299 let (suspense_count, suspense_scores) = self.detect_suspense_gpu(&device, network)?;
301 result.suspense_account_count = suspense_count;
302 result.suspense_scores = suspense_scores;
303
304 result.overall_risk_score = self.calculate_risk_score(result);
306
307 self.coordinator.state.pagerank_complete = true;
309 self.coordinator.state.fraud_detection_complete = true;
310 self.coordinator.state.fraud_pattern_count = fraud_count;
311 self.coordinator.state.gaap_validation_complete = true;
312 self.coordinator.state.gaap_violation_count = gaap_count;
313 self.coordinator.state.benford_complete = true;
314 self.coordinator.state.benford_anomaly = is_anomalous;
315 self.coordinator.state.suspense_complete = true;
316 self.coordinator.state.suspense_account_count = suspense_count;
317
318 Ok(())
319 }
320
321 #[cfg(feature = "cuda")]
322 fn compute_pagerank_gpu(
323 &self,
324 _device: &Arc<cudarc::driver::CudaContext>,
325 network: &AccountingNetwork,
326 ) -> crate::Result<Vec<f64>> {
327 Ok(network.compute_pagerank(
330 self.coordinator.config.pagerank_iterations as usize,
331 self.coordinator.config.pagerank_damping as f64,
332 ))
333 }
334
335 #[cfg(feature = "cuda")]
336 fn detect_fraud_gpu(
337 &self,
338 _device: &Arc<cudarc::driver::CudaContext>,
339 network: &AccountingNetwork,
340 ) -> crate::Result<(u32, Vec<u32>)> {
341 let n_flows = network.flows.len();
342 let mut flags = vec![0u32; n_flows];
343 let mut count = 0u32;
344
345 for (i, flow) in network.flows.iter().enumerate() {
347 let amount = flow.amount.to_f64().abs();
348 let mut flag = 0u32;
349
350 if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
352 flag |= 0x01;
353 }
354
355 if flow.source_account_index == flow.target_account_index {
357 flag |= 0x02;
358 }
359
360 if (amount - 10000.0).abs() < 500.0 {
362 flag |= 0x04;
363 }
364
365 if flag != 0 {
366 count += 1;
367 }
368 flags[i] = flag;
369 }
370
371 Ok((count, flags))
372 }
373
374 #[cfg(feature = "cuda")]
375 fn validate_gaap_gpu(
376 &self,
377 _device: &Arc<cudarc::driver::CudaContext>,
378 network: &AccountingNetwork,
379 ) -> crate::Result<(u32, Vec<u32>)> {
380 let n_flows = network.flows.len();
381 let mut flags = vec![0u32; n_flows];
382 let mut count = 0u32;
383
384 for (i, flow) in network.flows.iter().enumerate() {
385 let source_type = network
386 .accounts
387 .get(flow.source_account_index as usize)
388 .map(|a| a.account_type as u8)
389 .unwrap_or(0);
390 let target_type = network
391 .accounts
392 .get(flow.target_account_index as usize)
393 .map(|a| a.account_type as u8)
394 .unwrap_or(0);
395
396 if source_type == 3 && target_type == 0 {
398 flags[i] = 1;
399 count += 1;
400 }
401 else if source_type == 3 && target_type == 4 {
403 flags[i] = 2;
404 count += 1;
405 }
406 }
407
408 Ok((count, flags))
409 }
410
411 #[cfg(feature = "cuda")]
412 fn analyze_benford_gpu(
413 &self,
414 _device: &Arc<cudarc::driver::CudaContext>,
415 network: &AccountingNetwork,
416 ) -> crate::Result<([u32; 9], f32, bool)> {
417 let mut counts = [0u32; 9];
418
419 for flow in &network.flows {
420 let amount = flow.amount.to_f64().abs();
421 if amount >= 1.0 {
422 let mut v = amount;
423 while v >= 10.0 {
424 v /= 10.0;
425 }
426 let digit = v as u32;
427 if (1..=9).contains(&digit) {
428 counts[(digit - 1) as usize] += 1;
429 }
430 }
431 }
432
433 let total: u32 = counts.iter().sum();
435 let expected = [
436 0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
437 ];
438 let mut chi_sq = 0.0f32;
439
440 if total >= 50 {
441 for (i, &count) in counts.iter().enumerate() {
442 let observed = count as f32 / total as f32;
443 let exp = expected[i];
444 chi_sq += (observed - exp).powi(2) / exp;
445 }
446 }
447
448 let is_anomalous = total >= 50 && chi_sq > 15.507;
449
450 Ok((counts, chi_sq, is_anomalous))
451 }
452
453 #[cfg(feature = "cuda")]
454 fn detect_suspense_gpu(
455 &self,
456 _device: &Arc<cudarc::driver::CudaContext>,
457 network: &AccountingNetwork,
458 ) -> crate::Result<(u32, Vec<f32>)> {
459 let n_accounts = network.accounts.len();
460 let mut scores = vec![0.0f32; n_accounts];
461 let mut count = 0u32;
462
463 for (i, account) in network.accounts.iter().enumerate() {
464 let mut score = 0.0f32;
465
466 let balance = (account.closing_balance.to_f64()).abs();
468 if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
469 score += 0.3;
470 }
471
472 if account.risk_score > 0.5 {
474 score += 0.4;
475 }
476
477 let ratio = if account.out_degree > 0 {
479 account.in_degree as f32 / account.out_degree as f32
480 } else {
481 10.0
482 };
483 if ratio > 5.0 {
484 score += 0.3;
485 }
486
487 scores[i] = score.min(1.0);
488 if scores[i] > 0.5 {
489 count += 1;
490 }
491 }
492
493 Ok((count, scores))
494 }
495
496 fn analyze_cpu(&mut self, network: &AccountingNetwork, result: &mut GpuAnalyticsResult) {
498 let n_accounts = network.accounts.len();
499 let n_flows = network.flows.len();
500
501 result.pagerank_scores = network.compute_pagerank(
503 self.coordinator.config.pagerank_iterations as usize,
504 self.coordinator.config.pagerank_damping as f64,
505 );
506
507 result.fraud_flags = vec![0u32; n_flows];
509 for (i, flow) in network.flows.iter().enumerate() {
510 let amount = flow.amount.to_f64().abs();
511 let mut flag = 0u32;
512
513 if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
514 flag |= 0x01;
515 }
516 if flow.source_account_index == flow.target_account_index {
517 flag |= 0x02;
518 }
519 if (amount - 10000.0).abs() < 500.0 {
520 flag |= 0x04;
521 }
522
523 result.fraud_flags[i] = flag;
524 if flag != 0 {
525 result.fraud_pattern_count += 1;
526 }
527 }
528
529 result.gaap_flags = vec![0u32; n_flows];
531 for (i, flow) in network.flows.iter().enumerate() {
532 let source_type = network
533 .accounts
534 .get(flow.source_account_index as usize)
535 .map(|a| a.account_type as u8)
536 .unwrap_or(0);
537 let target_type = network
538 .accounts
539 .get(flow.target_account_index as usize)
540 .map(|a| a.account_type as u8)
541 .unwrap_or(0);
542
543 if source_type == 3 && (target_type == 0 || target_type == 4) {
544 result.gaap_flags[i] = 1;
545 result.gaap_violation_count += 1;
546 }
547 }
548
549 for flow in &network.flows {
551 let amount = flow.amount.to_f64().abs();
552 if amount >= 1.0 {
553 let mut v = amount;
554 while v >= 10.0 {
555 v /= 10.0;
556 }
557 let digit = v as u32;
558 if (1..=9).contains(&digit) {
559 result.benford_distribution[(digit - 1) as usize] += 1;
560 }
561 }
562 }
563
564 let total: u32 = result.benford_distribution.iter().sum();
565 if total >= 50 {
566 let expected = [
567 0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
568 ];
569 for (i, &count) in result.benford_distribution.iter().enumerate() {
570 let observed = count as f32 / total as f32;
571 let exp = expected[i];
572 result.benford_chi_squared += (observed - exp).powi(2) / exp;
573 }
574 result.benford_anomaly = result.benford_chi_squared > 15.507;
575 }
576
577 result.suspense_scores = vec![0.0f32; n_accounts];
579 for (i, account) in network.accounts.iter().enumerate() {
580 let mut score = 0.0f32;
581
582 let balance = account.closing_balance.to_f64().abs();
583 if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
584 score += 0.3;
585 }
586 if account.risk_score > 0.5 {
587 score += 0.4;
588 }
589 let ratio = if account.out_degree > 0 {
590 account.in_degree as f32 / account.out_degree as f32
591 } else {
592 10.0
593 };
594 if ratio > 5.0 {
595 score += 0.3;
596 }
597
598 result.suspense_scores[i] = score.min(1.0);
599 if result.suspense_scores[i] > 0.5 {
600 result.suspense_account_count += 1;
601 }
602 }
603
604 result.overall_risk_score = self.calculate_risk_score(result);
606
607 self.coordinator.state.pagerank_complete = true;
609 self.coordinator.state.fraud_detection_complete = true;
610 self.coordinator.state.fraud_pattern_count = result.fraud_pattern_count;
611 self.coordinator.state.gaap_validation_complete = true;
612 self.coordinator.state.gaap_violation_count = result.gaap_violation_count;
613 self.coordinator.state.benford_complete = true;
614 self.coordinator.state.benford_anomaly = result.benford_anomaly;
615 self.coordinator.state.suspense_complete = true;
616 self.coordinator.state.suspense_account_count = result.suspense_account_count;
617 }
618
619 fn calculate_risk_score(&self, result: &GpuAnalyticsResult) -> f32 {
620 let fraud_risk = (result.fraud_pattern_count as f32 / 100.0).min(1.0);
621 let gaap_risk = (result.gaap_violation_count as f32 / 50.0).min(1.0);
622 let suspense_risk = (result.suspense_account_count as f32 / 20.0).min(1.0);
623 let benford_risk = if result.benford_anomaly { 0.5 } else { 0.0 };
624
625 (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15).min(1.0)
626 }
627
628 pub fn coordinator(&self) -> &AnalyticsCoordinator {
630 &self.coordinator
631 }
632
633 pub fn coordinator_mut(&mut self) -> &mut AnalyticsCoordinator {
635 &mut self.coordinator
636 }
637}
638
639impl Default for GpuActorRuntime {
640 fn default() -> Self {
641 Self::new(CoordinatorConfig::default())
642 }
643}
644
645#[cfg(test)]
646mod tests {
647 use super::*;
648 use uuid::Uuid;
649
650 #[test]
651 fn test_runtime_creation() {
652 let runtime = GpuActorRuntime::default();
653 let status = runtime.status();
654 assert_eq!(status.messages_processed, 0);
656 }
657
658 #[test]
659 fn test_analyze_empty_network() {
660 let mut runtime = GpuActorRuntime::default();
661 let network = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);
662
663 let result = runtime.analyze(&network);
664 assert_eq!(result.fraud_pattern_count, 0);
665 assert_eq!(result.gaap_violation_count, 0);
666 }
667
668 #[test]
669 fn test_cpu_fallback() {
670 let mut runtime = GpuActorRuntime::default();
671 let mut network = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);
673
674 use crate::models::{
676 AccountMetadata, AccountNode, AccountType, Decimal128, HybridTimestamp, TransactionFlow,
677 };
678
679 let cash = network.add_account(
680 AccountNode::new(Uuid::new_v4(), AccountType::Asset, 0),
681 AccountMetadata::new("1100", "Cash"),
682 );
683 let revenue = network.add_account(
684 AccountNode::new(Uuid::new_v4(), AccountType::Revenue, 0),
685 AccountMetadata::new("4000", "Revenue"),
686 );
687
688 network.add_flow(TransactionFlow::new(
689 revenue,
690 cash,
691 Decimal128::from_f64(1000.0),
692 Uuid::new_v4(),
693 HybridTimestamp::now(),
694 ));
695
696 let result = runtime.analyze(&network);
697 assert!(result.gaap_violation_count > 0 || result.pagerank_scores.len() == 2);
699 }
700
701 #[test]
702 fn test_risk_score_calculation() {
703 let runtime = GpuActorRuntime::default();
704 let result = GpuAnalyticsResult {
705 fraud_pattern_count: 50,
706 gaap_violation_count: 25,
707 suspense_account_count: 10,
708 benford_anomaly: true,
709 ..Default::default()
710 };
711
712 let risk = runtime.calculate_risk_score(&result);
713 assert!(risk > 0.0);
714 assert!(risk <= 1.0);
715 }
716}