ringkernel-accnet 0.4.2

GPU-accelerated accounting network analytics with real-time visualization
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
//! GPU kernel code generation for accounting analytics actors.
//!
//! Uses ringkernel-cuda-codegen to generate persistent ring kernels
//! that process messages in a continuous loop on the GPU.

use ringkernel_cuda_codegen::{transpile_ring_kernel, RingKernelConfig};

/// Generated CUDA kernel code for all analytics actors.
pub struct AnalyticsKernels {
    /// PageRank computation kernel.
    pub pagerank: String,
    /// Fraud detection kernel.
    pub fraud_detector: String,
    /// GAAP validation kernel.
    pub gaap_validator: String,
    /// Benford analysis kernel.
    pub benford_analyzer: String,
    /// Suspense detection kernel.
    pub suspense_detector: String,
    /// Results aggregator kernel.
    pub results_aggregator: String,
}

impl AnalyticsKernels {
    /// Generate all analytics kernels.
    pub fn generate() -> Result<Self, String> {
        Ok(Self {
            pagerank: generate_pagerank_kernel()?,
            fraud_detector: generate_fraud_detector_kernel()?,
            gaap_validator: generate_gaap_validator_kernel()?,
            benford_analyzer: generate_benford_analyzer_kernel()?,
            suspense_detector: generate_suspense_detector_kernel()?,
            results_aggregator: generate_results_aggregator_kernel()?,
        })
    }
}

/// Generate the PageRank ring kernel.
///
/// This kernel computes PageRank scores using power iteration.
/// It processes PageRankRequest messages and sends PageRankResponse.
fn generate_pagerank_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn pagerank_step(
            ctx: &RingContext,
            // Graph structure (CSR format)
            row_ptr: &[u32],        // row_ptr[i] = start of neighbors for node i
            col_idx: &[u32],        // col_idx[j] = neighbor node index
            out_degree: &[u32],     // out_degree[i] = number of outgoing edges
            // PageRank values
            pr_current: &[f32],     // Current PageRank values
            pr_next: &mut [f32],    // Next iteration values
            // Parameters
            damping: f32,
            n_nodes: u32
        ) -> u32 {
            let tid = ctx.global_thread_id();

            // Each thread handles one node
            if tid >= n_nodes { return 0; }

            let i = tid as usize;

            // Base score for dangling nodes
            let base_score = (1.0 - damping) / (n_nodes as f32);

            // Sum contributions from incoming neighbors
            let start = row_ptr[i] as usize;
            let end = row_ptr[i + 1] as usize;

            let mut sum: f32 = 0.0;

            // Iterate over incoming edges
            let mut j = start;
            while j < end {
                let neighbor = col_idx[j] as usize;
                let neighbor_degree = out_degree[neighbor] as f32;
                if neighbor_degree > 0.0 {
                    sum = sum + pr_current[neighbor] / neighbor_degree;
                }
                j = j + 1;
            }

            // Update PageRank
            pr_next[i] = base_score + damping * sum;

            // Synchronize before next iteration
            ctx.sync_threads();

            1 // Return success
        }
    };

    let config = RingKernelConfig::new("pagerank_actor")
        .with_block_size(256)
        .with_queue_capacity(64)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate pagerank kernel: {}", e))
}

/// Generate the fraud detection ring kernel.
///
/// Detects fraud patterns: circular flows, velocity anomalies, round amounts.
fn generate_fraud_detector_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn detect_fraud(
            ctx: &RingContext,
            // Flow data
            flow_source: &[u32],      // Source account for each flow
            flow_target: &[u32],      // Target account for each flow
            flow_amount: &[f64],      // Amount for each flow
            flow_timestamp: &[u64],   // Timestamp for each flow
            // Account data
            account_risk: &[f32],     // Current risk score per account
            // Output
            pattern_flags: &mut [u32], // Detected patterns (bitflags)
            risk_delta: &mut [f32],    // Risk score change per flow
            // Parameters
            n_flows: u32,
            velocity_threshold: f32,   // Max flows per time unit
            round_amount_threshold: f64
        ) -> u32 {
            let tid = ctx.global_thread_id();

            if tid >= n_flows { return 0; }

            let i = tid as usize;
            let mut flags: u32 = 0;
            let mut risk: f32 = 0.0;

            let source = flow_source[i];
            let target = flow_target[i];
            let amount = flow_amount[i];
            let _timestamp = flow_timestamp[i];

            // Check for round amount (potential structuring)
            let abs_amount = if amount > 0.0 { amount } else { -amount };
            let thousands = abs_amount / 1000.0;
            let truncated = (thousands as i32) as f64;
            let fractional = thousands - truncated;

            if abs_amount >= 1000.0 && fractional < 0.001 {
                flags = flags | 0x01; // ROUND_AMOUNT flag
                risk = risk + 0.2;
            }

            // Check for self-loop (immediate circular)
            if source == target {
                flags = flags | 0x02; // SELF_LOOP flag
                risk = risk + 0.5;
            }

            // Check for threshold proximity (structuring indicator)
            let dist_10k = (abs_amount - 10000.0) as f32;
            let abs_dist_10k = if dist_10k > 0.0 { dist_10k } else { -dist_10k };
            if abs_dist_10k < 500.0 {
                flags = flags | 0x04; // THRESHOLD_PROXIMITY flag
                risk = risk + 0.3;
            }

            // Add base risk from accounts involved
            risk = risk + account_risk[source as usize] * 0.3;
            risk = risk + account_risk[target as usize] * 0.3;

            // Clamp risk to 1.0
            if risk > 1.0 { risk = 1.0; }

            pattern_flags[i] = flags;
            risk_delta[i] = risk;

            ctx.sync_threads();

            flags
        }
    };

    let config = RingKernelConfig::new("fraud_detector_actor")
        .with_block_size(256)
        .with_queue_capacity(128)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate fraud detector kernel: {}", e))
}

/// Generate the GAAP validation ring kernel.
fn generate_gaap_validator_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn validate_gaap(
            ctx: &RingContext,
            // Flow data
            flow_source: &[u32],
            flow_target: &[u32],
            // Account types (0=Asset, 1=Liability, 2=Equity, 3=Revenue, 4=Expense, 5=Contra)
            account_types: &[u32],
            // Output
            violation_flags: &mut [u32],
            violation_type: &mut [u32],
            // Parameters
            n_flows: u32
        ) -> u32 {
            let tid = ctx.global_thread_id();

            if tid >= n_flows { return 0; }

            let i = tid as usize;
            let source_idx = flow_source[i] as usize;
            let target_idx = flow_target[i] as usize;
            let source_type = account_types[source_idx];
            let target_type = account_types[target_idx];

            let mut violation: u32 = 0;
            let mut vtype: u32 = 0;

            // Rule 1: Revenue (3) -> Asset (0) direct is improper
            // Should go through Accounts Receivable
            if source_type == 3 && target_type == 0 {
                violation = 1;
                vtype = 1; // IMPROPER_REVENUE_RECOGNITION
            }

            // Rule 2: Revenue (3) -> Expense (4) direct is improper netting
            if source_type == 3 && target_type == 4 {
                violation = 1;
                vtype = 2; // IMPROPER_NETTING
            }

            // Rule 3: Expense (4) -> Revenue (3) is reversal (needs review)
            if source_type == 4 && target_type == 3 {
                violation = 1;
                vtype = 3; // SUSPICIOUS_REVERSAL
            }

            // Rule 4: Equity (2) -> Expense (4) bypasses income statement
            if source_type == 2 && target_type == 4 {
                violation = 1;
                vtype = 4; // EQUITY_EXPENSE_BYPASS
            }

            violation_flags[i] = violation;
            violation_type[i] = vtype;

            ctx.sync_threads();

            violation
        }
    };

    let config = RingKernelConfig::new("gaap_validator_actor")
        .with_block_size(256)
        .with_queue_capacity(128)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate GAAP validator kernel: {}", e))
}

/// Generate the Benford analysis ring kernel.
fn generate_benford_analyzer_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn analyze_benford(
            ctx: &RingContext,
            // Input amounts
            amounts: &[f64],
            // Output digit counts (shared, use atomics)
            digit_counts: &mut [u32],
            // Parameters
            n_amounts: u32
        ) -> u32 {
            let tid = ctx.global_thread_id();

            if tid >= n_amounts { return 0; }

            let i = tid as usize;
            let amount = amounts[i];
            let abs_amount = if amount > 0.0 { amount } else { -amount };

            // Skip amounts less than 1
            if abs_amount < 1.0 { return 0; }

            // Extract first digit
            let v1 = if abs_amount >= 10000000000000.0 { abs_amount / 10000000000000.0 } else { abs_amount };
            let v2 = if v1 >= 1000000.0 { v1 / 1000000.0 } else { v1 };
            let v3 = if v2 >= 1000.0 { v2 / 1000.0 } else { v2 };
            let v4 = if v3 >= 100.0 { v3 / 100.0 } else { v3 };
            let v5 = if v4 >= 10.0 { v4 / 10.0 } else { v4 };

            let first_digit = v5 as i32;

            // Valid digits are 1-9
            if first_digit < 1 { return 0; }
            if first_digit > 9 { return 0; }

            // Atomically increment the count for this digit
            atomic_add(&mut digit_counts[(first_digit - 1) as usize], 1);

            1
        }
    };

    let config = RingKernelConfig::new("benford_analyzer_actor")
        .with_block_size(256)
        .with_queue_capacity(64)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate Benford analyzer kernel: {}", e))
}

/// Generate the suspense detection ring kernel.
fn generate_suspense_detector_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn detect_suspense(
            ctx: &RingContext,
            // Account data
            balance_debit: &[f64],
            balance_credit: &[f64],
            risk_scores: &[f32],
            inflow_counts: &[u32],
            outflow_counts: &[u32],
            // Output
            suspense_scores: &mut [f32],
            // Parameters
            n_accounts: u32
        ) -> u32 {
            let tid = ctx.global_thread_id();

            if tid >= n_accounts { return 0; }

            let i = tid as usize;
            let mut score: f32 = 0.0;

            // Calculate balance and check for round numbers
            let balance = balance_debit[i] - balance_credit[i];
            let abs_balance = if balance > 0.0 { balance } else { -balance };

            // Round number check (divisible by 1000)
            let thousands = abs_balance / 1000.0;
            let truncated = (thousands as i32) as f64;
            let fractional = thousands - truncated;

            if abs_balance >= 1000.0 && fractional < 0.001 {
                score = score + 0.3;
            }

            // High existing risk
            if risk_scores[i] > 0.5 {
                score = score + 0.4;
            }

            // Flow imbalance (many inflows, few outflows = holding pattern)
            let inflows = inflow_counts[i] as f32;
            let outflows = outflow_counts[i] as f32;
            let flow_ratio = if outflows > 0.0 { inflows / outflows } else { 10.0 };

            if flow_ratio > 5.0 {
                score = score + 0.3;
            }

            // Clamp
            if score > 1.0 { score = 1.0; }

            suspense_scores[i] = score;

            ctx.sync_threads();

            if score > 0.5 { 1 } else { 0 }
        }
    };

    let config = RingKernelConfig::new("suspense_detector_actor")
        .with_block_size(256)
        .with_queue_capacity(64)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate suspense detector kernel: {}", e))
}

/// Generate the results aggregator ring kernel.
///
/// This kernel aggregates results from all analytics kernels and
/// prepares the final AnalyticsResult message for the host.
fn generate_results_aggregator_kernel() -> Result<String, String> {
    let handler: syn::ItemFn = syn::parse_quote! {
        fn aggregate_results(
            ctx: &RingContext,
            // Results from various analyzers
            fraud_flags: &[u32],
            gaap_violations: &[u32],
            suspense_scores: &[f32],
            // Counts (single element arrays for atomic accumulation)
            fraud_count: &mut [u32],
            gaap_count: &mut [u32],
            suspense_count: &mut [u32],
            risk_sum: &mut [f32],
            // Parameters
            n_flows: u32,
            n_accounts: u32
        ) -> u32 {
            let tid = ctx.global_thread_id();

            // Count fraud patterns (each thread processes one flow)
            if tid < n_flows {
                let i = tid as usize;
                if fraud_flags[i] != 0 {
                    atomic_add(&mut fraud_count[0], 1);
                }
                if gaap_violations[i] != 0 {
                    atomic_add(&mut gaap_count[0], 1);
                }
            }

            ctx.sync_threads();

            // Count suspense accounts (use threads for accounts)
            if tid < n_accounts {
                let i = tid as usize;
                if suspense_scores[i] > 0.5 {
                    atomic_add(&mut suspense_count[0], 1);
                }
                // Note: atomic_add for f32 would need special handling
                // For now, we skip risk_sum or use integer approximation
            }

            ctx.sync_threads();

            1
        }
    };

    let config = RingKernelConfig::new("results_aggregator_actor")
        .with_block_size(256)
        .with_queue_capacity(32)
        .with_hlc(true)
        .with_k2k(true);

    transpile_ring_kernel(&handler, &config)
        .map_err(|e| format!("Failed to generate results aggregator kernel: {}", e))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_all_kernels() {
        let kernels = AnalyticsKernels::generate();
        assert!(
            kernels.is_ok(),
            "Failed to generate kernels: {:?}",
            kernels.err()
        );

        let kernels = kernels.unwrap();
        assert!(kernels.pagerank.contains("pagerank_actor"));
        assert!(kernels.fraud_detector.contains("fraud_detector_actor"));
        assert!(kernels.gaap_validator.contains("gaap_validator_actor"));
        assert!(kernels.benford_analyzer.contains("benford_analyzer_actor"));
        assert!(kernels
            .suspense_detector
            .contains("suspense_detector_actor"));
        assert!(kernels
            .results_aggregator
            .contains("results_aggregator_actor"));
    }

    #[test]
    fn test_pagerank_kernel() {
        let kernel = generate_pagerank_kernel();
        assert!(kernel.is_ok(), "PageRank kernel failed: {:?}", kernel.err());
        let code = kernel.unwrap();
        assert!(code.contains("damping"));
        assert!(code.contains("pr_current"));
        assert!(code.contains("pr_next"));
    }

    #[test]
    fn test_fraud_detector_kernel() {
        let kernel = generate_fraud_detector_kernel();
        assert!(
            kernel.is_ok(),
            "Fraud detector kernel failed: {:?}",
            kernel.err()
        );
        let code = kernel.unwrap();
        assert!(code.contains("pattern_flags"));
        assert!(code.contains("risk_delta"));
    }

    #[test]
    fn test_gaap_validator_kernel() {
        let kernel = generate_gaap_validator_kernel();
        assert!(
            kernel.is_ok(),
            "GAAP validator kernel failed: {:?}",
            kernel.err()
        );
        let code = kernel.unwrap();
        assert!(code.contains("violation_flags"));
        assert!(code.contains("account_types"));
    }

    #[test]
    fn test_benford_analyzer_kernel() {
        let kernel = generate_benford_analyzer_kernel();
        assert!(
            kernel.is_ok(),
            "Benford analyzer kernel failed: {:?}",
            kernel.err()
        );
        let code = kernel.unwrap();
        assert!(code.contains("digit_counts"));
        assert!(code.contains("atomic"));
    }

    #[test]
    fn test_suspense_detector_kernel() {
        let kernel = generate_suspense_detector_kernel();
        assert!(
            kernel.is_ok(),
            "Suspense detector kernel failed: {:?}",
            kernel.err()
        );
        let code = kernel.unwrap();
        assert!(code.contains("suspense_scores"));
        assert!(code.contains("balance_debit"));
    }
}