use std::time::{Duration, Instant};
use ringkernel_core::MessageId;
use super::messages::*;
/// Identifies one of the analytics actor kernels known to the coordinator.
///
/// Each variant maps to a fixed actor name, available through
/// [`AnalyticsKernelId::name`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AnalyticsKernelId {
    /// PageRank kernel (`pagerank_actor`).
    PageRank,
    /// Fraud detection kernel (`fraud_detector_actor`).
    FraudDetector,
    /// GAAP validation kernel (`gaap_validator_actor`).
    GaapValidator,
    /// Benford analysis kernel (`benford_analyzer_actor`).
    BenfordAnalyzer,
    /// Suspense account detection kernel (`suspense_detector_actor`).
    SuspenseDetector,
    /// Results aggregation kernel (`results_aggregator_actor`).
    ResultsAggregator,
}

impl AnalyticsKernelId {
    /// Returns the static actor name associated with this kernel id.
    pub fn name(&self) -> &'static str {
        let actor_name = match *self {
            AnalyticsKernelId::PageRank => "pagerank_actor",
            AnalyticsKernelId::FraudDetector => "fraud_detector_actor",
            AnalyticsKernelId::GaapValidator => "gaap_validator_actor",
            AnalyticsKernelId::BenfordAnalyzer => "benford_analyzer_actor",
            AnalyticsKernelId::SuspenseDetector => "suspense_detector_actor",
            AnalyticsKernelId::ResultsAggregator => "results_aggregator_actor",
        };
        actor_name
    }

    /// Returns every kernel id, in declaration order.
    pub fn all() -> &'static [Self] {
        // Kept as a named constant so the full list lives in one place.
        const ALL_KERNELS: &[AnalyticsKernelId] = &[
            AnalyticsKernelId::PageRank,
            AnalyticsKernelId::FraudDetector,
            AnalyticsKernelId::GaapValidator,
            AnalyticsKernelId::BenfordAnalyzer,
            AnalyticsKernelId::SuspenseDetector,
            AnalyticsKernelId::ResultsAggregator,
        ];
        ALL_KERNELS
    }
}
/// Tunable settings for the analytics coordinator.
///
/// Only the PageRank settings are read in this file; the remaining fields are
/// presumably consumed by kernel launch/runtime code elsewhere — TODO confirm.
#[derive(Debug, Clone)]
pub struct CoordinatorConfig {
    /// Damping factor copied into every PageRank request (default `0.85`).
    pub pagerank_damping: f32,
    /// Iteration count copied into every PageRank request (default `20`).
    pub pagerank_iterations: u32,
    /// Velocity threshold (default `10.0`); presumably used by fraud
    /// detection — not read in this file.
    pub velocity_threshold: f32,
    /// Round-amount threshold (default `1000.0`); presumably used by fraud
    /// detection — not read in this file.
    pub round_amount_threshold: f64,
    /// Queue capacity (default `256`); not read in this file.
    pub queue_capacity: usize,
    /// Block size (default `256`); not read in this file.
    pub block_size: u32,
    /// Enables K2K (default `true`); not read in this file.
    pub enable_k2k: bool,
    /// Enables HLC (default `true`); not read in this file.
    pub enable_hlc: bool,
}

impl Default for CoordinatorConfig {
    /// Returns the stock configuration used when no overrides are supplied.
    fn default() -> Self {
        const DEFAULT_DAMPING: f32 = 0.85;
        const DEFAULT_ITERATIONS: u32 = 20;
        const DEFAULT_VELOCITY_THRESHOLD: f32 = 10.0;
        const DEFAULT_ROUND_AMOUNT_THRESHOLD: f64 = 1000.0;
        const DEFAULT_QUEUE_CAPACITY: usize = 256;
        const DEFAULT_BLOCK_SIZE: u32 = 256;

        CoordinatorConfig {
            pagerank_damping: DEFAULT_DAMPING,
            pagerank_iterations: DEFAULT_ITERATIONS,
            velocity_threshold: DEFAULT_VELOCITY_THRESHOLD,
            round_amount_threshold: DEFAULT_ROUND_AMOUNT_THRESHOLD,
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
            block_size: DEFAULT_BLOCK_SIZE,
            enable_k2k: true,
            enable_hlc: true,
        }
    }
}
/// Progress flags and per-snapshot counters for one analytics pipeline run.
///
/// The default value has every flag cleared, every counter at zero and no
/// start time.
#[derive(Debug, Default)]
pub struct PipelineState {
    /// Id of the snapshot this state describes; set by [`PipelineState::reset`].
    pub current_snapshot_id: u64,
    /// Whether the PageRank stage has finished for this snapshot.
    pub pagerank_complete: bool,
    /// Whether the fraud detection stage has finished for this snapshot.
    pub fraud_detection_complete: bool,
    /// Whether the GAAP validation stage has finished for this snapshot.
    pub gaap_validation_complete: bool,
    /// Whether the Benford analysis stage has finished for this snapshot.
    pub benford_complete: bool,
    /// Whether the suspense detection stage has finished for this snapshot.
    pub suspense_complete: bool,
    /// Instant recorded by the most recent `reset`; `None` before any reset.
    pub start_time: Option<Instant>,
    /// Number of fraud patterns recorded for this snapshot.
    pub fraud_pattern_count: u32,
    /// Number of GAAP violations recorded for this snapshot.
    pub gaap_violation_count: u32,
    /// Number of suspense accounts recorded for this snapshot.
    pub suspense_account_count: u32,
    /// Whether the Benford analysis flagged this snapshot as anomalous.
    pub benford_anomaly: bool,
}

impl PipelineState {
    /// Returns `true` only when all five stage flags are set.
    pub fn is_complete(&self) -> bool {
        let stage_flags = [
            self.pagerank_complete,
            self.fraud_detection_complete,
            self.gaap_validation_complete,
            self.benford_complete,
            self.suspense_complete,
        ];
        stage_flags.iter().all(|&done| done)
    }

    /// Returns the time elapsed since the last `reset`, or `None` if the
    /// state has never been reset.
    pub fn processing_time(&self) -> Option<Duration> {
        let started_at = self.start_time?;
        Some(started_at.elapsed())
    }

    /// Re-initialises the state for `snapshot_id`: clears every flag and
    /// counter and records the current instant as the start time.
    pub fn reset(&mut self, snapshot_id: u64) {
        // Everything not listed explicitly falls back to its default
        // (false / 0), which is exactly the cleared state.
        *self = PipelineState {
            current_snapshot_id: snapshot_id,
            start_time: Some(Instant::now()),
            ..PipelineState::default()
        };
    }
}
/// Drives one analytics snapshot at a time: hands out snapshot ids, builds
/// the per-kernel request messages, records kernel responses in `state`, and
/// accumulates lifetime statistics in `stats`.
///
/// `Debug` is derived so the coordinator can be logged and embedded in other
/// `Debug` types; every field type already implements it.
#[derive(Debug)]
pub struct AnalyticsCoordinator {
    /// Settings supplied at construction time.
    pub config: CoordinatorConfig,
    /// Progress and counters for the snapshot currently being processed.
    pub state: PipelineState,
    /// Id that the next `begin_snapshot` call will hand out.
    next_snapshot_id: u64,
    /// Totals accumulated across all finalized snapshots.
    pub stats: CoordinatorStats,
}
/// Totals accumulated by the coordinator across every finalized snapshot.
///
/// All fields start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct CoordinatorStats {
    /// Number of snapshots that have been finalized.
    pub snapshots_processed: u64,
    /// Sum of per-snapshot processing times, in microseconds.
    pub total_processing_time_us: u64,
    /// `total_processing_time_us` divided by `snapshots_processed`.
    pub avg_processing_time_us: f64,
    /// Sum of fraud pattern counts over all finalized snapshots.
    pub total_fraud_patterns: u64,
    /// Sum of GAAP violation counts over all finalized snapshots.
    pub total_gaap_violations: u64,
    /// Sum of suspense account counts over all finalized snapshots.
    pub total_suspense_accounts: u64,
}
impl AnalyticsCoordinator {
    /// Creates a coordinator with the given configuration, a default
    /// (empty) pipeline state, zeroed statistics, and snapshot ids starting
    /// at 1.
    pub fn new(config: CoordinatorConfig) -> Self {
        Self {
            config,
            state: PipelineState::default(),
            next_snapshot_id: 1,
            stats: CoordinatorStats::default(),
        }
    }

    /// Starts a new snapshot and returns its id.
    ///
    /// Allocates the next sequential id and resets the pipeline state for
    /// it, discarding any progress recorded for the previous snapshot.
    pub fn begin_snapshot(&mut self) -> u64 {
        let snapshot_id = self.next_snapshot_id;
        self.next_snapshot_id += 1;
        self.state.reset(snapshot_id);
        snapshot_id
    }

    /// Builds a PageRank request with a freshly generated message id.
    ///
    /// Damping and iteration count come from the coordinator configuration;
    /// `account_count`, `edge_count` and `graph_offset` are forwarded
    /// unchanged.
    pub fn create_pagerank_request(
        &self,
        account_count: u32,
        edge_count: u32,
        graph_offset: u64,
    ) -> PageRankRequest {
        PageRankRequest {
            id: MessageId::generate(),
            account_count,
            edge_count,
            damping: self.config.pagerank_damping,
            iterations: self.config.pagerank_iterations,
            graph_offset,
        }
    }

    /// Builds a fraud detection request with a freshly generated message id.
    ///
    /// Unlike the other request builders, this one also stamps the request
    /// with the current snapshot id and `High` priority. The counts and
    /// offsets are forwarded unchanged.
    pub fn create_fraud_detection_request(
        &self,
        flow_count: u32,
        flows_offset: u64,
        accounts_offset: u64,
        account_count: u32,
    ) -> FraudDetectionRequest {
        FraudDetectionRequest {
            id: MessageId::generate(),
            priority: ringkernel_core::Priority::High,
            snapshot_id: self.state.current_snapshot_id,
            flow_count,
            flows_offset,
            accounts_offset,
            account_count,
        }
    }

    /// Builds a GAAP validation request with a freshly generated message id;
    /// all arguments are forwarded unchanged.
    pub fn create_gaap_validation_request(
        &self,
        flow_count: u32,
        flows_offset: u64,
        account_types_offset: u64,
    ) -> GaapValidationRequest {
        GaapValidationRequest {
            id: MessageId::generate(),
            flow_count,
            flows_offset,
            account_types_offset,
        }
    }

    /// Builds a Benford analysis request with a freshly generated message
    /// id; all arguments are forwarded unchanged.
    pub fn create_benford_analysis_request(
        &self,
        amount_count: u32,
        amounts_offset: u64,
    ) -> BenfordAnalysisRequest {
        BenfordAnalysisRequest {
            id: MessageId::generate(),
            amount_count,
            amounts_offset,
        }
    }

    /// Builds a suspense detection request with a freshly generated message
    /// id; all arguments are forwarded unchanged.
    pub fn create_suspense_detection_request(
        &self,
        account_count: u32,
        balances_offset: u64,
        risk_scores_offset: u64,
        flow_counts_offset: u64,
    ) -> SuspenseDetectionRequest {
        SuspenseDetectionRequest {
            id: MessageId::generate(),
            account_count,
            balances_offset,
            risk_scores_offset,
            flow_counts_offset,
        }
    }

    /// Marks the PageRank stage complete. The response payload is not
    /// inspected.
    pub fn handle_pagerank_response(&mut self, _response: PageRankResponse) {
        self.state.pagerank_complete = true;
    }

    /// Marks fraud detection complete and records the reported pattern count.
    pub fn handle_fraud_response(&mut self, response: FraudDetectionResponse) {
        self.state.fraud_detection_complete = true;
        self.state.fraud_pattern_count = response.pattern_count;
    }

    /// Marks GAAP validation complete and records the reported violation
    /// count.
    pub fn handle_gaap_response(&mut self, response: GaapValidationResponse) {
        self.state.gaap_validation_complete = true;
        self.state.gaap_violation_count = response.violation_count;
    }

    /// Marks Benford analysis complete and records whether it was anomalous.
    pub fn handle_benford_response(&mut self, response: BenfordAnalysisResponse) {
        self.state.benford_complete = true;
        self.state.benford_anomaly = response.is_anomalous;
    }

    /// Marks suspense detection complete and records the reported suspense
    /// account count.
    pub fn handle_suspense_response(&mut self, response: SuspenseDetectionResponse) {
        self.state.suspense_complete = true;
        self.state.suspense_account_count = response.suspense_count;
    }

    /// Folds the current snapshot into the lifetime statistics and returns
    /// its summary.
    ///
    /// The processing time is the time elapsed since `begin_snapshot`, in
    /// microseconds, or 0 if no snapshot was ever started. This method does
    /// not check that every stage has completed; the completion flags are
    /// simply copied into the result. Lifetime totals saturate at `u64::MAX`
    /// rather than overflowing.
    pub fn finalize_snapshot(&mut self) -> AnalyticsResult {
        // `Duration::as_micros` yields a `u128`. Clamp to the `u64` range
        // first so the narrowing cast below is lossless (saturating) instead
        // of silently wrapping.
        let processing_time = self
            .state
            .processing_time()
            .map(|elapsed| elapsed.as_micros().min(u128::from(u64::MAX)) as u64)
            .unwrap_or(0);

        let stats = &mut self.stats;
        stats.snapshots_processed = stats.snapshots_processed.saturating_add(1);
        stats.total_processing_time_us =
            stats.total_processing_time_us.saturating_add(processing_time);
        stats.avg_processing_time_us =
            stats.total_processing_time_us as f64 / stats.snapshots_processed as f64;
        stats.total_fraud_patterns = stats
            .total_fraud_patterns
            .saturating_add(u64::from(self.state.fraud_pattern_count));
        stats.total_gaap_violations = stats
            .total_gaap_violations
            .saturating_add(u64::from(self.state.gaap_violation_count));
        stats.total_suspense_accounts = stats
            .total_suspense_accounts
            .saturating_add(u64::from(self.state.suspense_account_count));

        let overall_risk = self.compute_overall_risk();

        // NOTE(review): `suspense_complete` is tracked in the state but is
        // not copied into the result here — confirm whether
        // `AnalyticsResult` should carry it.
        AnalyticsResult {
            id: MessageId::generate(),
            snapshot_id: self.state.current_snapshot_id,
            pagerank_complete: self.state.pagerank_complete,
            fraud_detection_complete: self.state.fraud_detection_complete,
            gaap_validation_complete: self.state.gaap_validation_complete,
            benford_complete: self.state.benford_complete,
            fraud_pattern_count: self.state.fraud_pattern_count,
            gaap_violation_count: self.state.gaap_violation_count,
            suspense_account_count: self.state.suspense_account_count,
            overall_risk_score: overall_risk,
            benford_anomaly: self.state.benford_anomaly,
            processing_time_us: processing_time,
        }
    }

    /// Combines the current snapshot's counters into a single risk score in
    /// `[0.0, 1.0]`.
    ///
    /// Each count is scaled linearly and capped at 1.0, a Benford anomaly
    /// contributes a fixed component value, and the components are blended
    /// with fixed weights.
    fn compute_overall_risk(&self) -> f32 {
        // Counts at which each component reaches its maximum of 1.0.
        const FRAUD_SATURATION: f32 = 100.0;
        const GAAP_SATURATION: f32 = 50.0;
        const SUSPENSE_SATURATION: f32 = 20.0;
        // Component value used when the Benford analysis flags an anomaly.
        const BENFORD_ANOMALY_RISK: f32 = 0.5;
        // Blend weights of the four components; they sum to 1.0.
        const FRAUD_WEIGHT: f32 = 0.35;
        const GAAP_WEIGHT: f32 = 0.25;
        const SUSPENSE_WEIGHT: f32 = 0.25;
        const BENFORD_WEIGHT: f32 = 0.15;

        let state = &self.state;
        let fraud_risk = (state.fraud_pattern_count as f32 / FRAUD_SATURATION).min(1.0);
        let gaap_risk = (state.gaap_violation_count as f32 / GAAP_SATURATION).min(1.0);
        let suspense_risk =
            (state.suspense_account_count as f32 / SUSPENSE_SATURATION).min(1.0);
        let benford_risk = if state.benford_anomaly {
            BENFORD_ANOMALY_RISK
        } else {
            0.0
        };

        (fraud_risk * FRAUD_WEIGHT
            + gaap_risk * GAAP_WEIGHT
            + suspense_risk * SUSPENSE_WEIGHT
            + benford_risk * BENFORD_WEIGHT)
            .min(1.0)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a coordinator that uses the default configuration.
    fn default_coordinator() -> AnalyticsCoordinator {
        AnalyticsCoordinator::new(CoordinatorConfig::default())
    }

    /// A new coordinator exposes the default PageRank settings.
    #[test]
    fn test_coordinator_creation() {
        let coordinator = default_coordinator();
        let config = &coordinator.config;
        assert_eq!(config.pagerank_damping, 0.85);
        assert_eq!(config.pagerank_iterations, 20);
    }

    /// Snapshot ids are handed out sequentially, starting at 1.
    #[test]
    fn test_begin_snapshot() {
        let mut coordinator = default_coordinator();
        let first_id = coordinator.begin_snapshot();
        let second_id = coordinator.begin_snapshot();
        assert_eq!(first_id, 1);
        assert_eq!(second_id, 2);
    }

    /// The pipeline is complete only once every stage flag is set.
    #[test]
    fn test_pipeline_state() {
        let mut pipeline = PipelineState::default();
        pipeline.reset(1);
        assert!(!pipeline.is_complete());

        pipeline.pagerank_complete = true;
        pipeline.fraud_detection_complete = true;
        pipeline.gaap_validation_complete = true;
        pipeline.benford_complete = true;
        pipeline.suspense_complete = true;
        assert!(pipeline.is_complete());
    }

    /// Request builders forward their arguments and the configured damping.
    #[test]
    fn test_create_requests() {
        let coordinator = default_coordinator();

        let pagerank = coordinator.create_pagerank_request(100, 500, 0);
        assert_eq!(pagerank.account_count, 100);
        assert_eq!(pagerank.edge_count, 500);
        assert_eq!(pagerank.damping, 0.85);

        let fraud = coordinator.create_fraud_detection_request(500, 0, 1000, 100);
        assert_eq!(fraud.flow_count, 500);
    }

    /// Finalizing copies the snapshot counters and yields a risk score in
    /// `(0.0, 1.0]`.
    #[test]
    fn test_finalize_snapshot() {
        let mut coordinator = default_coordinator();
        coordinator.begin_snapshot();

        {
            let pipeline = &mut coordinator.state;
            pipeline.pagerank_complete = true;
            pipeline.fraud_detection_complete = true;
            pipeline.fraud_pattern_count = 5;
            pipeline.gaap_validation_complete = true;
            pipeline.gaap_violation_count = 3;
            pipeline.benford_complete = true;
            pipeline.benford_anomaly = false;
            pipeline.suspense_complete = true;
            pipeline.suspense_account_count = 2;
        }

        let summary = coordinator.finalize_snapshot();
        assert_eq!(summary.snapshot_id, 1);
        assert_eq!(summary.fraud_pattern_count, 5);
        assert_eq!(summary.gaap_violation_count, 3);
        assert_eq!(summary.suspense_account_count, 2);
        assert!(summary.overall_risk_score > 0.0);
        assert!(summary.overall_risk_score <= 1.0);
    }

    /// Kernel ids map to their actor names and `all` lists six kernels.
    #[test]
    fn test_kernel_ids() {
        let pagerank_name = AnalyticsKernelId::PageRank.name();
        let fraud_name = AnalyticsKernelId::FraudDetector.name();
        assert_eq!(pagerank_name, "pagerank_actor");
        assert_eq!(fraud_name, "fraud_detector_actor");
        assert_eq!(AnalyticsKernelId::all().len(), 6);
    }
}