1use std::time::Instant;
7
8#[cfg(feature = "cuda")]
9use std::collections::HashMap;
10#[cfg(feature = "cuda")]
11use std::sync::Arc;
12
13use crate::models::AccountingNetwork;
14
15use super::coordinator::{AnalyticsCoordinator, CoordinatorConfig, CoordinatorStats};
16#[cfg(feature = "cuda")]
17use super::kernels::AnalyticsKernels;
18#[allow(unused_imports)]
19use super::messages::*;
20
/// Descriptor for a buffer that lives on the GPU.
///
/// The type only carries the two values below; nothing in this file
/// allocates or frees device memory through it.
#[derive(Debug)]
pub struct GpuBuffer {
    /// Size of the buffer.
    // NOTE(review): presumably measured in bytes — confirm with the allocator.
    pub size: usize,
    /// Raw device pointer value for the buffer.
    pub device_ptr: u64,
}
29
/// Aggregated output of one analytics pass over an accounting network.
///
/// `Default` yields an "empty" result: every counter is zero, every vector is
/// empty, the Benford histogram is all zeros and the anomaly flag is `false`.
/// This is exactly what the previous hand-written `Default` impl produced, so
/// the impl is now derived.
#[derive(Debug, Clone, Default)]
pub struct GpuAnalyticsResult {
    /// Identifier of the coordinator snapshot this result belongs to.
    pub snapshot_id: u64,
    /// PageRank scores as returned by the network's PageRank computation.
    pub pagerank_scores: Vec<f64>,
    /// Number of flows with a non-zero entry in `fraud_flags`.
    pub fraud_pattern_count: u32,
    /// Per-flow fraud bitmask (`0x01` round amount, `0x02` self-loop,
    /// `0x04` amount within 500 of 10 000); `0` means no pattern matched.
    pub fraud_flags: Vec<u32>,
    /// Number of flows with a non-zero entry in `gaap_flags`.
    pub gaap_violation_count: u32,
    /// Per-flow GAAP violation code; `0` means no violation was detected.
    pub gaap_flags: Vec<u32>,
    /// Number of accounts whose suspense score exceeds `0.5`.
    pub suspense_account_count: u32,
    /// Per-account suspense score, capped at `1.0`.
    pub suspense_scores: Vec<f32>,
    /// Histogram of leading digits of flow amounts; index `i` counts digit `i + 1`.
    pub benford_distribution: [u32; 9],
    /// Chi-squared statistic of the observed leading digits against Benford's law.
    pub benford_chi_squared: f32,
    /// Whether the Benford statistic exceeded the anomaly threshold.
    pub benford_anomaly: bool,
    /// Weighted combination of the individual findings, in `[0, 1]`.
    pub overall_risk_score: f32,
    /// Wall-clock duration of the analysis in microseconds.
    pub processing_time_us: u64,
}
80
81#[derive(Debug, Clone)]
83pub struct RuntimeStatus {
84 pub cuda_active: bool,
86 pub device_name: Option<String>,
88 pub compute_capability: Option<(u32, u32)>,
90 pub kernels_launched: usize,
92 pub messages_processed: u64,
94 pub coordinator_stats: CoordinatorStats,
96}
97
98pub struct GpuActorRuntime {
103 coordinator: AnalyticsCoordinator,
105 #[cfg(feature = "cuda")]
107 kernels: Option<AnalyticsKernels>,
108 gpu_active: bool,
110 device_name: Option<String>,
112 compute_capability: Option<(u32, u32)>,
114 messages_processed: u64,
116 #[cfg(feature = "cuda")]
118 cuda_device: Option<Arc<cudarc::driver::CudaDevice>>,
119 #[cfg(feature = "cuda")]
121 #[allow(dead_code)]
122 compiled_modules: HashMap<String, bool>,
123}
124
125impl GpuActorRuntime {
126 pub fn new(config: CoordinatorConfig) -> Self {
128 let coordinator = AnalyticsCoordinator::new(config);
129
130 let (gpu_active, device_name, compute_capability) = Self::init_gpu();
132
133 #[cfg(feature = "cuda")]
135 let kernels = if gpu_active {
136 match AnalyticsKernels::generate() {
137 Ok(k) => {
138 log::info!("Generated {} analytics kernels", 6);
139 Some(k)
140 }
141 Err(e) => {
142 log::warn!("Failed to generate kernels: {}", e);
143 None
144 }
145 }
146 } else {
147 None
148 };
149
150 Self {
151 coordinator,
152 #[cfg(feature = "cuda")]
153 kernels,
154 gpu_active,
155 device_name,
156 compute_capability,
157 messages_processed: 0,
158 #[cfg(feature = "cuda")]
159 cuda_device: None,
160 #[cfg(feature = "cuda")]
161 compiled_modules: HashMap::new(),
162 }
163 }
164
165 fn init_gpu() -> (bool, Option<String>, Option<(u32, u32)>) {
167 #[cfg(feature = "cuda")]
168 {
169 match cudarc::driver::CudaDevice::new(0) {
170 Ok(device) => {
171 let name = device.name().unwrap_or_else(|_| "Unknown GPU".to_string());
172 let cc = device.attribute(
174 cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR
175 ).ok().and_then(|major| {
176 device.attribute(
177 cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR
178 ).ok().map(|minor| (major as u32, minor as u32))
179 });
180 log::info!("GPU initialized: {} (CC {:?})", name, cc);
181 (true, Some(name), cc)
182 }
183 Err(e) => {
184 log::warn!("Failed to initialize GPU: {}", e);
185 (false, None, None)
186 }
187 }
188 }
189
190 #[cfg(not(feature = "cuda"))]
191 {
192 log::info!("CUDA feature not enabled, using CPU fallback");
193 (false, None, None)
194 }
195 }
196
197 pub fn is_gpu_active(&self) -> bool {
199 self.gpu_active
200 }
201
202 pub fn status(&self) -> RuntimeStatus {
204 #[cfg(feature = "cuda")]
205 let kernels_launched = if self.kernels.is_some() { 6 } else { 0 };
206 #[cfg(not(feature = "cuda"))]
207 let kernels_launched = 0;
208
209 RuntimeStatus {
210 cuda_active: self.gpu_active,
211 device_name: self.device_name.clone(),
212 compute_capability: self.compute_capability,
213 kernels_launched,
214 messages_processed: self.messages_processed,
215 coordinator_stats: self.coordinator.stats.clone(),
216 }
217 }
218
219 pub fn analyze(&mut self, network: &AccountingNetwork) -> GpuAnalyticsResult {
224 let start = Instant::now();
225 let snapshot_id = self.coordinator.begin_snapshot();
226
227 let mut result = GpuAnalyticsResult {
228 snapshot_id,
229 ..Default::default()
230 };
231
232 let n_accounts = network.accounts.len();
233 let n_flows = network.flows.len();
234
235 if n_accounts == 0 || n_flows == 0 {
236 result.processing_time_us = start.elapsed().as_micros() as u64;
237 return result;
238 }
239
240 #[cfg(feature = "cuda")]
242 {
243 let has_kernels = self.kernels.is_some();
244 if self.gpu_active && has_kernels {
245 match self.analyze_gpu(network, &mut result) {
246 Ok(_) => {
247 result.processing_time_us = start.elapsed().as_micros() as u64;
248 self.messages_processed += 6; return result;
250 }
251 Err(e) => {
252 log::warn!("GPU analysis failed, falling back to CPU: {}", e);
253 }
254 }
255 }
256 }
257
258 self.analyze_cpu(network, &mut result);
260 result.processing_time_us = start.elapsed().as_micros() as u64;
261 result
262 }
263
/// GPU analysis pipeline: runs every stage in order and writes its output
/// into `result`, then marks all stages complete on the coordinator.
///
/// The CUDA device handle is created on first use and cached on the runtime.
///
/// # Errors
///
/// Returns the stringified error if the device cannot be created or if any
/// stage fails. Stages that completed before the failure have already
/// written their output into `result`.
#[cfg(feature = "cuda")]
fn analyze_gpu(
    &mut self,
    network: &AccountingNetwork,
    result: &mut GpuAnalyticsResult,
) -> Result<(), String> {
    use cudarc::driver::*;

    let device = match self.cuda_device.clone() {
        Some(cached) => cached,
        None => {
            let fresh = CudaDevice::new(0).map_err(|e| e.to_string())?;
            self.cuda_device = Some(fresh.clone());
            fresh
        }
    };

    // Stage 1: PageRank.
    let pagerank = self.compute_pagerank_gpu(&device, network)?;
    result.pagerank_scores = pagerank;

    // Stage 2: fraud patterns.
    let fraud = self.detect_fraud_gpu(&device, network)?;
    let fraud_count = fraud.0;
    result.fraud_pattern_count = fraud_count;
    result.fraud_flags = fraud.1;

    // Stage 3: GAAP validation.
    let gaap = self.validate_gaap_gpu(&device, network)?;
    let gaap_count = gaap.0;
    result.gaap_violation_count = gaap_count;
    result.gaap_flags = gaap.1;

    // Stage 4: Benford analysis.
    let (histogram, chi_squared, is_anomalous) = self.analyze_benford_gpu(&device, network)?;
    result.benford_distribution = histogram;
    result.benford_chi_squared = chi_squared;
    result.benford_anomaly = is_anomalous;

    // Stage 5: suspense accounts.
    let suspense = self.detect_suspense_gpu(&device, network)?;
    let suspense_count = suspense.0;
    result.suspense_account_count = suspense_count;
    result.suspense_scores = suspense.1;

    result.overall_risk_score = self.calculate_risk_score(result);

    // Publish completion flags and headline numbers to the coordinator.
    let state = &mut self.coordinator.state;
    state.pagerank_complete = true;
    state.fraud_detection_complete = true;
    state.fraud_pattern_count = fraud_count;
    state.gaap_validation_complete = true;
    state.gaap_violation_count = gaap_count;
    state.benford_complete = true;
    state.benford_anomaly = is_anomalous;
    state.suspense_complete = true;
    state.suspense_account_count = suspense_count;

    Ok(())
}
322
/// PageRank stage of the GPU pipeline.
///
/// The device handle is currently unused: the scores come from the
/// network's own `compute_pagerank`, driven by the coordinator's configured
/// iteration count and damping factor. This implementation never returns
/// `Err`.
#[cfg(feature = "cuda")]
fn compute_pagerank_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaDevice>,
    network: &AccountingNetwork,
) -> Result<Vec<f64>, String> {
    let config = &self.coordinator.config;
    let iterations = config.pagerank_iterations as usize;
    let damping = config.pagerank_damping as f64;

    Ok(network.compute_pagerank(iterations, damping))
}
336
/// Fraud-pattern stage of the GPU pipeline (evaluated on the host; the
/// device handle is unused).
///
/// Returns `(flagged flow count, per-flow bitmask)`. Bits, based on the
/// absolute flow amount:
/// * `0x01` — at least 1000 and within 1 above a multiple of 1000,
/// * `0x02` — source and target account are the same,
/// * `0x04` — within 500 of 10 000.
///
/// This implementation never returns `Err`.
#[cfg(feature = "cuda")]
fn detect_fraud_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaDevice>,
    network: &AccountingNetwork,
) -> Result<(u32, Vec<u32>), String> {
    const ROUND_AMOUNT: u32 = 0x01;
    const SELF_LOOP: u32 = 0x02;
    const NEAR_TEN_THOUSAND: u32 = 0x04;

    let flags: Vec<u32> = network
        .flows
        .iter()
        .map(|flow| {
            let magnitude = flow.amount.to_f64().abs();
            let mut mask = 0u32;

            if magnitude >= 1000.0 && (magnitude % 1000.0).abs() < 1.0 {
                mask |= ROUND_AMOUNT;
            }
            if flow.source_account_index == flow.target_account_index {
                mask |= SELF_LOOP;
            }
            if (magnitude - 10000.0).abs() < 500.0 {
                mask |= NEAR_TEN_THOUSAND;
            }

            mask
        })
        .collect();

    let flagged = flags.iter().filter(|&&mask| mask != 0).count() as u32;

    Ok((flagged, flags))
}
375
/// GAAP-validation stage of the GPU pipeline (evaluated on the host; the
/// device handle is unused).
///
/// Returns `(violation count, per-flow code)`. Account types are compared
/// by their numeric discriminant; an out-of-range account index is treated
/// as type `0`. Codes:
/// * `1` — source type `3`, target type `0`,
/// * `2` — source type `3`, target type `4`,
/// * `0` — no violation.
///
/// This implementation never returns `Err`.
// NOTE(review): 3/0/4 are presumably Revenue/Asset/Expense — confirm against
// the `AccountType` discriminants.
#[cfg(feature = "cuda")]
fn validate_gaap_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaDevice>,
    network: &AccountingNetwork,
) -> Result<(u32, Vec<u32>), String> {
    let type_at = |index: usize| -> u8 {
        network
            .accounts
            .get(index)
            .map(|account| account.account_type as u8)
            .unwrap_or(0)
    };

    let flags: Vec<u32> = network
        .flows
        .iter()
        .map(|flow| {
            let source = type_at(flow.source_account_index as usize);
            let target = type_at(flow.target_account_index as usize);
            match (source, target) {
                (3, 0) => 1,
                (3, 4) => 2,
                _ => 0,
            }
        })
        .collect();

    let violations = flags.iter().filter(|&&code| code != 0).count() as u32;

    Ok((violations, flags))
}
412
/// Benford's-law stage of the GPU pipeline (evaluated on the host; the
/// device handle is unused).
///
/// Returns `(leading-digit histogram, chi-squared statistic, anomaly flag)`.
/// Only flows whose absolute amount is finite and at least `1.0` contribute.
/// With fewer than 50 contributing flows the statistic is `0.0` and the
/// flag is `false`.
///
/// The statistic is Pearson's chi-squared computed on *counts*,
/// `Σ (Oᵢ − N·pᵢ)² / (N·pᵢ)`, which is what the 15.507 threshold
/// (8 degrees of freedom, 5 % significance) applies to. Computing it on
/// proportions understates the statistic by a factor of `N`, so the
/// anomaly test would almost never fire.
///
/// This implementation never returns `Err`.
#[cfg(feature = "cuda")]
fn analyze_benford_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaDevice>,
    network: &AccountingNetwork,
) -> Result<([u32; 9], f32, bool), String> {
    // Expected leading-digit proportions for digits 1..=9.
    const EXPECTED: [f32; 9] = [
        0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
    ];
    // Below this sample size the test is skipped.
    const MIN_SAMPLE: u32 = 50;
    // Chi-squared critical value for 8 degrees of freedom at p = 0.05.
    const CRITICAL_VALUE: f32 = 15.507;

    let mut counts = [0u32; 9];

    for flow in &network.flows {
        let amount = flow.amount.to_f64().abs();
        // The finiteness check keeps the normalisation loop below from
        // spinning forever on an infinite amount.
        if amount.is_finite() && amount >= 1.0 {
            let mut v = amount;
            while v >= 10.0 {
                v /= 10.0;
            }
            let digit = v as usize;
            if (1..=9).contains(&digit) {
                counts[digit - 1] += 1;
            }
        }
    }

    let total: u32 = counts.iter().sum();

    let chi_sq = if total >= MIN_SAMPLE {
        let n = total as f32;
        counts
            .iter()
            .zip(EXPECTED.iter())
            .map(|(&observed, &proportion)| {
                let expected = n * proportion;
                (observed as f32 - expected).powi(2) / expected
            })
            .sum::<f32>()
    } else {
        0.0
    };

    let is_anomalous = total >= MIN_SAMPLE && chi_sq > CRITICAL_VALUE;

    Ok((counts, chi_sq, is_anomalous))
}
454
/// Suspense-account stage of the GPU pipeline (evaluated on the host; the
/// device handle is unused).
///
/// Returns `(number of accounts scoring above 0.5, per-account score)`.
/// Each score is the sum of the indicators below, capped at `1.0`:
/// * `+0.3` — absolute closing balance is at least 1000 and within 1 above
///   a multiple of 1000,
/// * `+0.4` — the account's risk score exceeds `0.5`,
/// * `+0.3` — in-degree / out-degree exceeds `5`; an account with no
///   outgoing edges is given a ratio of `10`.
///
/// This implementation never returns `Err`.
#[cfg(feature = "cuda")]
fn detect_suspense_gpu(
    &self,
    _device: &Arc<cudarc::driver::CudaDevice>,
    network: &AccountingNetwork,
) -> Result<(u32, Vec<f32>), String> {
    let scores: Vec<f32> = network
        .accounts
        .iter()
        .map(|account| {
            let mut total = 0.0f32;

            let magnitude = (account.closing_balance.to_f64()).abs();
            let round_balance = magnitude >= 1000.0 && (magnitude % 1000.0).abs() < 1.0;
            if round_balance {
                total += 0.3;
            }

            if account.risk_score > 0.5 {
                total += 0.4;
            }

            let in_out_ratio = if account.out_degree > 0 {
                account.in_degree as f32 / account.out_degree as f32
            } else {
                10.0
            };
            if in_out_ratio > 5.0 {
                total += 0.3;
            }

            total.min(1.0)
        })
        .collect();

    let suspicious = scores.iter().filter(|&&score| score > 0.5).count() as u32;

    Ok((suspicious, scores))
}
497
498 fn analyze_cpu(&mut self, network: &AccountingNetwork, result: &mut GpuAnalyticsResult) {
500 let n_accounts = network.accounts.len();
501 let n_flows = network.flows.len();
502
503 result.pagerank_scores = network.compute_pagerank(
505 self.coordinator.config.pagerank_iterations as usize,
506 self.coordinator.config.pagerank_damping as f64,
507 );
508
509 result.fraud_flags = vec![0u32; n_flows];
511 for (i, flow) in network.flows.iter().enumerate() {
512 let amount = flow.amount.to_f64().abs();
513 let mut flag = 0u32;
514
515 if amount >= 1000.0 && (amount % 1000.0).abs() < 1.0 {
516 flag |= 0x01;
517 }
518 if flow.source_account_index == flow.target_account_index {
519 flag |= 0x02;
520 }
521 if (amount - 10000.0).abs() < 500.0 {
522 flag |= 0x04;
523 }
524
525 result.fraud_flags[i] = flag;
526 if flag != 0 {
527 result.fraud_pattern_count += 1;
528 }
529 }
530
531 result.gaap_flags = vec![0u32; n_flows];
533 for (i, flow) in network.flows.iter().enumerate() {
534 let source_type = network
535 .accounts
536 .get(flow.source_account_index as usize)
537 .map(|a| a.account_type as u8)
538 .unwrap_or(0);
539 let target_type = network
540 .accounts
541 .get(flow.target_account_index as usize)
542 .map(|a| a.account_type as u8)
543 .unwrap_or(0);
544
545 if source_type == 3 && (target_type == 0 || target_type == 4) {
546 result.gaap_flags[i] = 1;
547 result.gaap_violation_count += 1;
548 }
549 }
550
551 for flow in &network.flows {
553 let amount = flow.amount.to_f64().abs();
554 if amount >= 1.0 {
555 let mut v = amount;
556 while v >= 10.0 {
557 v /= 10.0;
558 }
559 let digit = v as u32;
560 if (1..=9).contains(&digit) {
561 result.benford_distribution[(digit - 1) as usize] += 1;
562 }
563 }
564 }
565
566 let total: u32 = result.benford_distribution.iter().sum();
567 if total >= 50 {
568 let expected = [
569 0.301f32, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046,
570 ];
571 for (i, &count) in result.benford_distribution.iter().enumerate() {
572 let observed = count as f32 / total as f32;
573 let exp = expected[i];
574 result.benford_chi_squared += (observed - exp).powi(2) / exp;
575 }
576 result.benford_anomaly = result.benford_chi_squared > 15.507;
577 }
578
579 result.suspense_scores = vec![0.0f32; n_accounts];
581 for (i, account) in network.accounts.iter().enumerate() {
582 let mut score = 0.0f32;
583
584 let balance = account.closing_balance.to_f64().abs();
585 if balance >= 1000.0 && (balance % 1000.0).abs() < 1.0 {
586 score += 0.3;
587 }
588 if account.risk_score > 0.5 {
589 score += 0.4;
590 }
591 let ratio = if account.out_degree > 0 {
592 account.in_degree as f32 / account.out_degree as f32
593 } else {
594 10.0
595 };
596 if ratio > 5.0 {
597 score += 0.3;
598 }
599
600 result.suspense_scores[i] = score.min(1.0);
601 if result.suspense_scores[i] > 0.5 {
602 result.suspense_account_count += 1;
603 }
604 }
605
606 result.overall_risk_score = self.calculate_risk_score(result);
608
609 self.coordinator.state.pagerank_complete = true;
611 self.coordinator.state.fraud_detection_complete = true;
612 self.coordinator.state.fraud_pattern_count = result.fraud_pattern_count;
613 self.coordinator.state.gaap_validation_complete = true;
614 self.coordinator.state.gaap_violation_count = result.gaap_violation_count;
615 self.coordinator.state.benford_complete = true;
616 self.coordinator.state.benford_anomaly = result.benford_anomaly;
617 self.coordinator.state.suspense_complete = true;
618 self.coordinator.state.suspense_account_count = result.suspense_account_count;
619 }
620
621 fn calculate_risk_score(&self, result: &GpuAnalyticsResult) -> f32 {
622 let fraud_risk = (result.fraud_pattern_count as f32 / 100.0).min(1.0);
623 let gaap_risk = (result.gaap_violation_count as f32 / 50.0).min(1.0);
624 let suspense_risk = (result.suspense_account_count as f32 / 20.0).min(1.0);
625 let benford_risk = if result.benford_anomaly { 0.5 } else { 0.0 };
626
627 (fraud_risk * 0.35 + gaap_risk * 0.25 + suspense_risk * 0.25 + benford_risk * 0.15).min(1.0)
628 }
629
630 pub fn coordinator(&self) -> &AnalyticsCoordinator {
632 &self.coordinator
633 }
634
635 pub fn coordinator_mut(&mut self) -> &mut AnalyticsCoordinator {
637 &mut self.coordinator
638 }
639}
640
641impl Default for GpuActorRuntime {
642 fn default() -> Self {
643 Self::new(CoordinatorConfig::default())
644 }
645}
646
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// A freshly constructed runtime has not processed any messages.
    #[test]
    fn test_runtime_creation() {
        let status = GpuActorRuntime::default().status();
        assert_eq!(status.messages_processed, 0);
    }

    /// An empty network yields no fraud patterns and no GAAP violations.
    #[test]
    fn test_analyze_empty_network() {
        let empty = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);
        let mut runtime = GpuActorRuntime::default();

        let outcome = runtime.analyze(&empty);

        assert_eq!(outcome.fraud_pattern_count, 0);
        assert_eq!(outcome.gaap_violation_count, 0);
    }

    /// A two-account network with a single revenue-to-cash flow is analysed
    /// end to end.
    #[test]
    fn test_cpu_fallback() {
        use crate::models::{
            AccountMetadata, AccountNode, AccountType, Decimal128, HybridTimestamp, TransactionFlow,
        };

        let mut runtime = GpuActorRuntime::default();
        let mut network = AccountingNetwork::new(Uuid::new_v4(), 2024, 1);

        // Registration order matters: cash is added before revenue.
        let cash = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Asset, 0),
            AccountMetadata::new("1100", "Cash"),
        );
        let revenue = network.add_account(
            AccountNode::new(Uuid::new_v4(), AccountType::Revenue, 0),
            AccountMetadata::new("4000", "Revenue"),
        );

        let flow = TransactionFlow::new(
            revenue,
            cash,
            Decimal128::from_f64(1000.0),
            Uuid::new_v4(),
            HybridTimestamp::now(),
        );
        network.add_flow(flow);

        let outcome = runtime.analyze(&network);
        assert!(outcome.gaap_violation_count > 0 || outcome.pagerank_scores.len() == 2);
    }

    /// The combined risk score is positive for non-trivial findings and
    /// never exceeds one.
    #[test]
    fn test_risk_score_calculation() {
        let findings = GpuAnalyticsResult {
            fraud_pattern_count: 50,
            gaap_violation_count: 25,
            suspense_account_count: 10,
            benford_anomaly: true,
            ..Default::default()
        };

        let risk = GpuActorRuntime::default().calculate_risk_score(&findings);

        assert!(risk > 0.0);
        assert!(risk <= 1.0);
    }
}