ringkernel_accnet/fabric/pipeline.rs

//! Streaming data pipeline connecting generation to analysis.
//!
//! The pipeline orchestrates data flow from generation through
//! transformation and analysis, emitting events for visualization.
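//!
//! A minimal usage sketch (illustrative only; `DataFabricPipeline`, `PipelineConfig`,
//! and the other names are defined in this module, while the crate import path and
//! the surrounding setup are assumptions):
//!
//! ```ignore
//! use ringkernel_accnet::fabric::{
//!     CompanyArchetype, DataFabricPipeline, GeneratorConfig, PipelineConfig,
//! };
//!
//! let mut pipeline = DataFabricPipeline::new(
//!     CompanyArchetype::retail_standard(),
//!     GeneratorConfig::default(),
//!     PipelineConfig::educational(),
//! );
//! let mut events = pipeline.subscribe();
//!
//! // Each tick generates a batch, updates the network, and broadcasts events.
//! let flows = pipeline.tick();
//! ```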

use super::{
    AccountTypeInfo, AnomalyInjectionConfig, AnomalyInjector, ChartOfAccountsTemplate,
    CompanyArchetype, GeneratorConfig, TransactionGenerator,
};
use crate::models::{
    AccountingNetwork, Decimal128, FraudPattern, GaapViolation, HybridTimestamp, NetworkSnapshot,
    TemporalAlert, TransactionFlow,
};
use std::time::Duration;
use tokio::sync::broadcast;
use uuid::Uuid;

/// Configuration for the data pipeline.
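///
/// A hypothetical override of the defaults (the field names are the real ones;
/// the values are only for illustration):
///
/// ```ignore
/// let config = PipelineConfig {
///     batch_size: 25,
///     tick_duration: std::time::Duration::from_millis(250),
///     ..Default::default()
/// };
/// ```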
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// How often to emit batches
    pub tick_duration: Duration,
    /// Entries per batch
    pub batch_size: usize,
    /// Channel buffer size
    pub channel_buffer: usize,
    /// Enable anomaly injection
    pub inject_anomalies: bool,
    /// Anomaly injection configuration
    pub anomaly_config: AnomalyInjectionConfig,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            tick_duration: Duration::from_millis(100),
            batch_size: 50,
            channel_buffer: 1000,
            inject_anomalies: true,
            anomaly_config: AnomalyInjectionConfig::default(),
        }
    }
}

impl PipelineConfig {
    /// Create a fast configuration for testing.
    pub fn fast() -> Self {
        Self {
            tick_duration: Duration::from_millis(10),
            batch_size: 100,
            ..Default::default()
        }
    }

    /// Create a slow configuration for educational demos.
    pub fn educational() -> Self {
        Self {
            tick_duration: Duration::from_millis(500),
            batch_size: 5,
            ..Default::default()
        }
    }
}

/// Events emitted by the pipeline.
#[derive(Debug, Clone)]
pub enum PipelineEvent {
    /// New journal entries were generated.
    EntriesGenerated {
        /// Number of entries generated.
        count: usize,
        /// Timestamp of generation.
        timestamp: HybridTimestamp,
    },

    /// Entries were transformed into flows.
    FlowsCreated {
        /// Generated transaction flows.
        flows: Vec<TransactionFlow>,
        /// Timestamp of transformation.
        timestamp: HybridTimestamp,
    },

    /// Network was updated with new data
    NetworkUpdated(NetworkSnapshot),

    /// Anomaly was detected
    AnomalyDetected(Alert),

    /// Fraud pattern was identified
    FraudPatternDetected(FraudPattern),

    /// GAAP violation was found
    GaapViolationDetected(GaapViolation),

    /// Temporal anomaly was detected
    TemporalAnomalyDetected(TemporalAlert),

    /// Pipeline statistics update
    StatsUpdated(PipelineStats),

    /// Pipeline paused
    Paused,

    /// Pipeline resumed
    Resumed,

    /// Pipeline stopped
    Stopped,
}

/// An alert from the analysis engine.
#[derive(Debug, Clone)]
pub struct Alert {
    /// Unique identifier
    pub id: Uuid,
    /// Alert severity
    pub severity: AlertSeverity,
    /// Alert type
    pub alert_type: String,
    /// Human-readable message
    pub message: String,
    /// Involved account indices
    pub accounts: Vec<u16>,
    /// Amount involved (if applicable)
    pub amount: Option<Decimal128>,
    /// When the alert was raised
    pub timestamp: HybridTimestamp,
}

/// Alert severity levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Informational only
    Info,
    /// Low risk
    Low,
    /// Medium risk
    Medium,
    /// High risk
    High,
    /// Critical - immediate attention needed
    Critical,
}

impl AlertSeverity {
    /// Get color for this severity.
    pub fn color(&self) -> [u8; 3] {
        match self {
            AlertSeverity::Info => [100, 181, 246],   // Light blue
            AlertSeverity::Low => [255, 235, 59],     // Yellow
            AlertSeverity::Medium => [255, 152, 0],   // Orange
            AlertSeverity::High => [244, 67, 54],     // Red
            AlertSeverity::Critical => [183, 28, 28], // Dark red
        }
    }

    /// Get icon for this severity.
    pub fn icon(&self) -> &'static str {
        match self {
            AlertSeverity::Info => "ℹ️",
            AlertSeverity::Low => "⚠️",
            AlertSeverity::Medium => "🔶",
            AlertSeverity::High => "🔴",
            AlertSeverity::Critical => "🚨",
        }
    }
}

/// Pipeline statistics.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total entries generated
    pub entries_generated: u64,
    /// Total flows created
    pub flows_created: u64,
    /// Total anomalies detected
    pub anomalies_detected: u64,
    /// Entries per second (recent)
    pub entries_per_second: f64,
    /// Flows per second (recent)
    pub flows_per_second: f64,
    /// Method distribution
    pub method_distribution: [u32; 5],
    /// Pipeline running time (seconds)
    pub running_time_seconds: f64,
}

/// The data fabric pipeline.
pub struct DataFabricPipeline {
    /// Entity ID for this pipeline
    entity_id: Uuid,
    /// Transaction generator
    generator: TransactionGenerator,
    /// Anomaly injector
    injector: Option<AnomalyInjector>,
    /// The accounting network being built
    network: AccountingNetwork,
    /// Configuration
    config: PipelineConfig,
    /// Event broadcaster
    event_sender: broadcast::Sender<PipelineEvent>,
    /// Running state
    is_running: bool,
    /// Paused state
    is_paused: bool,
    /// Statistics
    stats: PipelineStats,
    /// Start time
    start_time: Option<std::time::Instant>,
}

impl DataFabricPipeline {
    /// Create a new pipeline.
    pub fn new(
        archetype: CompanyArchetype,
        generator_config: GeneratorConfig,
        pipeline_config: PipelineConfig,
    ) -> Self {
        let entity_id = Uuid::new_v4();
        let generator = TransactionGenerator::new(archetype.clone(), generator_config);

        // Initialize network with chart of accounts
        let coa = ChartOfAccountsTemplate::for_archetype(&archetype);
        let mut network = AccountingNetwork::new(entity_id, 2024, 1);

        // Add accounts from template
        for account_def in &coa.accounts {
            let (node, metadata) = account_def.to_account(network.accounts.len() as u16);
            network.add_account(node, metadata);
        }

        // Create anomaly injector if enabled
        let injector = if pipeline_config.inject_anomalies {
            let mut inj = AnomalyInjector::new(pipeline_config.anomaly_config.clone(), None);

            // Register account types
            for (idx, def) in coa.accounts.iter().enumerate() {
                use crate::models::AccountType;
                let info = AccountTypeInfo {
                    is_asset: def.account_type == AccountType::Asset,
                    is_liability: def.account_type == AccountType::Liability,
                    is_revenue: def.account_type == AccountType::Revenue,
                    is_expense: def.account_type == AccountType::Expense,
                    is_equity: def.account_type == AccountType::Equity,
                    is_cash: def.semantics & crate::models::AccountSemantics::IS_CASH != 0,
                    is_suspense: def.semantics & crate::models::AccountSemantics::IS_SUSPENSE != 0,
                };
                inj.register_account(idx as u16, info);
            }

            Some(inj)
        } else {
            None
        };

        let (event_sender, _) = broadcast::channel(pipeline_config.channel_buffer);

        Self {
            entity_id,
            generator,
            injector,
            network,
            config: pipeline_config,
            event_sender,
            is_running: false,
            is_paused: false,
            stats: PipelineStats::default(),
            start_time: None,
        }
    }

    /// Subscribe to pipeline events.
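    ///
    /// Receiving events is the usual `tokio::sync::broadcast` pattern; the
    /// sketch below is illustrative and assumes an async context:
    ///
    /// ```ignore
    /// let mut rx = pipeline.subscribe();
    /// while let Ok(event) = rx.recv().await {
    ///     if let PipelineEvent::AnomalyDetected(alert) = event {
    ///         println!("{} {}", alert.severity.icon(), alert.message);
    ///     }
    /// }
    /// ```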
    pub fn subscribe(&self) -> broadcast::Receiver<PipelineEvent> {
        self.event_sender.subscribe()
    }

    /// Get the current network snapshot.
    pub fn network_snapshot(&self) -> NetworkSnapshot {
        self.network.snapshot()
    }

    /// Get the full network (for analysis).
    pub fn network(&self) -> &AccountingNetwork {
        &self.network
    }

    /// Get mutable network access.
    pub fn network_mut(&mut self) -> &mut AccountingNetwork {
        &mut self.network
    }

    /// Get current statistics.
    pub fn stats(&self) -> &PipelineStats {
        &self.stats
    }

    /// Check if pipeline is running.
    pub fn is_running(&self) -> bool {
        self.is_running
    }

    /// Check if pipeline is paused.
    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    /// Process one batch of data.
    /// Returns the generated flows.
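    ///
    /// A possible driver loop (illustrative; the timer setup and the
    /// `pipeline_config` binding are assumptions, while `tick_duration` and
    /// `tick()` are defined in this module):
    ///
    /// ```ignore
    /// let mut interval = tokio::time::interval(pipeline_config.tick_duration);
    /// loop {
    ///     interval.tick().await;
    ///     let flows = pipeline.tick();
    ///     println!("generated {} flows this tick", flows.len());
    /// }
    /// ```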
    pub fn tick(&mut self) -> Vec<TransactionFlow> {
        if self.is_paused {
            return Vec::new();
        }

        // Record start time if first tick
        if self.start_time.is_none() {
            self.start_time = Some(std::time::Instant::now());
        }

        // Generate entries
        let entries = self.generator.generate_batch(self.config.batch_size);
        let entry_count = entries.len();
        self.stats.entries_generated += entry_count as u64;

        // Emit generation event
        let _ = self.event_sender.send(PipelineEvent::EntriesGenerated {
            count: entry_count,
            timestamp: HybridTimestamp::now(),
        });

        // Process each entry through injection and transformation
        let mut all_flows = Vec::new();

        for entry in entries {
            // Optionally inject anomalies
            let (final_entry, debit_lines, credit_lines, _anomaly_label) =
                if let Some(ref mut injector) = self.injector {
                    let result =
                        injector.process(entry.entry, entry.debit_lines, entry.credit_lines);

                    if result.anomaly_injected {
                        self.stats.anomalies_detected += 1;

                        // Emit anomaly alert
                        if let Some(ref label) = result.anomaly_label {
                            let alert = self.create_alert_from_label(label, &result.entry);
                            let _ = self
                                .event_sender
                                .send(PipelineEvent::AnomalyDetected(alert));
                        }
                    }

                    (
                        result.entry,
                        result.debit_lines,
                        result.credit_lines,
                        result.anomaly_label,
                    )
                } else {
                    (entry.entry, entry.debit_lines, entry.credit_lines, None)
                };

            // Transform to flows
            let flows = self.transform_to_flows(&final_entry, &debit_lines, &credit_lines);
            self.stats.flows_created += flows.len() as u64;

            // Update method distribution
            let method_idx = final_entry.solving_method as usize;
            if method_idx < 5 {
                self.stats.method_distribution[method_idx] += 1;
            }

            // Add flows to network
            for flow in &flows {
                self.network.add_flow(flow.clone());
            }

            all_flows.extend(flows);
        }

        // Emit flows event
        if !all_flows.is_empty() {
            let _ = self.event_sender.send(PipelineEvent::FlowsCreated {
                flows: all_flows.clone(),
                timestamp: HybridTimestamp::now(),
            });
        }

        // Update network statistics
        self.network.update_statistics();

        // Emit network update
        let _ = self
            .event_sender
            .send(PipelineEvent::NetworkUpdated(self.network.snapshot()));

        // Update timing stats
        if let Some(start) = self.start_time {
            self.stats.running_time_seconds = start.elapsed().as_secs_f64();
            if self.stats.running_time_seconds > 0.0 {
                self.stats.entries_per_second =
                    self.stats.entries_generated as f64 / self.stats.running_time_seconds;
                self.stats.flows_per_second =
                    self.stats.flows_created as f64 / self.stats.running_time_seconds;
            }
        }

        all_flows
    }

    /// Transform a journal entry to transaction flows.
    fn transform_to_flows(
        &self,
        entry: &crate::models::JournalEntry,
        debit_lines: &[crate::models::JournalLineItem],
        credit_lines: &[crate::models::JournalLineItem],
    ) -> Vec<TransactionFlow> {
        use crate::models::SolvingMethod;

        match entry.solving_method {
            SolvingMethod::MethodA => {
                // 1-to-1: single flow
                if let (Some(debit), Some(credit)) = (debit_lines.first(), credit_lines.first()) {
                    vec![TransactionFlow::with_provenance(
                        debit.account_index,
                        credit.account_index,
                        debit.amount,
                        entry.id,
                        0,
                        0,
                        entry.posting_date,
                        SolvingMethod::MethodA,
                        1.0,
                    )]
                } else {
                    Vec::new()
                }
            }

            SolvingMethod::MethodB => {
                // n-to-n: match by position (simplified)
                let n = debit_lines.len().min(credit_lines.len());
                (0..n)
                    .map(|i| {
                        TransactionFlow::with_provenance(
                            debit_lines[i].account_index,
                            credit_lines[i].account_index,
                            debit_lines[i].amount,
                            entry.id,
                            i as u16,
                            i as u16,
                            entry.posting_date,
                            SolvingMethod::MethodB,
                            1.0,
                        )
                    })
                    .collect()
            }

            _ => {
                // For C/D/E, create flows from each debit to proportional credits
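                //
                // Illustrative with hypothetical numbers: a debit of 100 against
                // credits of 60 and 40 gives credit ratios of 0.6 and 0.4, so the
                // debit fans out into flows of 60 and 40 to those credit accounts.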
                let total_credit: f64 = credit_lines.iter().map(|c| c.amount.to_f64()).sum();

                if total_credit == 0.0 {
                    return Vec::new();
                }

                let mut flows = Vec::new();
                for debit in debit_lines {
                    let debit_amount = debit.amount.to_f64();
                    for credit in credit_lines {
                        let credit_ratio = credit.amount.to_f64() / total_credit;
                        let flow_amount = Decimal128::from_f64(debit_amount * credit_ratio);
                        let confidence = entry.average_confidence * credit_ratio as f32;

                        flows.push(TransactionFlow::with_provenance(
                            debit.account_index,
                            credit.account_index,
                            flow_amount,
                            entry.id,
                            0,
                            0,
                            entry.posting_date,
                            entry.solving_method,
                            confidence,
                        ));
                    }
                }
                flows
            }
        }
    }

    /// Create an alert from an anomaly label.
    fn create_alert_from_label(
        &self,
        label: &super::AnomalyLabel,
        entry: &crate::models::JournalEntry,
    ) -> Alert {
        let (alert_type, message, severity) = match label {
            super::AnomalyLabel::FraudPattern(pattern) => {
                let severity = match pattern {
                    crate::models::FraudPatternType::CircularFlow => AlertSeverity::Critical,
                    crate::models::FraudPatternType::HighVelocity => AlertSeverity::High,
                    crate::models::FraudPatternType::ThresholdClustering => AlertSeverity::High,
                    _ => AlertSeverity::Medium,
                };
                (
                    format!("Fraud: {:?}", pattern),
                    pattern.description().to_string(),
                    severity,
                )
            }
            super::AnomalyLabel::GaapViolation(violation) => {
                let severity = match violation.default_severity() {
                    crate::models::ViolationSeverity::Critical => AlertSeverity::Critical,
                    crate::models::ViolationSeverity::High => AlertSeverity::High,
                    crate::models::ViolationSeverity::Medium => AlertSeverity::Medium,
                    crate::models::ViolationSeverity::Low => AlertSeverity::Low,
                };
                (
                    format!("GAAP: {:?}", violation),
                    violation.description().to_string(),
                    severity,
                )
            }
            super::AnomalyLabel::TimingAnomaly(desc) => (
                "Timing".to_string(),
                format!("Timing anomaly: {}", desc),
                AlertSeverity::Medium,
            ),
            super::AnomalyLabel::AmountAnomaly(desc) => (
                "Amount".to_string(),
                format!("Amount anomaly: {}", desc),
                AlertSeverity::Medium,
            ),
        };

        Alert {
            id: Uuid::new_v4(),
            severity,
            alert_type,
            message,
            accounts: vec![],
            amount: Some(entry.total_debits),
            timestamp: entry.posting_date,
        }
    }

    /// Pause the pipeline.
    pub fn pause(&mut self) {
        self.is_paused = true;
        let _ = self.event_sender.send(PipelineEvent::Paused);
    }

    /// Resume the pipeline.
    pub fn resume(&mut self) {
        self.is_paused = false;
        let _ = self.event_sender.send(PipelineEvent::Resumed);
    }

    /// Stop the pipeline.
    pub fn stop(&mut self) {
        self.is_running = false;
        let _ = self.event_sender.send(PipelineEvent::Stopped);
    }

    /// Reset the pipeline (clear network and stats).
    pub fn reset(&mut self) {
        self.network = AccountingNetwork::new(self.entity_id, 2024, 1);

        // Re-add accounts from chart of accounts
        // (In a real implementation, we'd store the CoA template)

        self.stats = PipelineStats::default();
        self.start_time = None;

        if let Some(ref mut injector) = self.injector {
            injector.reset_stats();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_creation() {
        let archetype = CompanyArchetype::retail_standard();
        let gen_config = GeneratorConfig::default();
        let pipe_config = PipelineConfig::default();

        let pipeline = DataFabricPipeline::new(archetype, gen_config, pipe_config);
        assert!(!pipeline.is_running());
        assert!(!pipeline.is_paused());
    }

    #[test]
    fn test_pipeline_tick() {
        let archetype = CompanyArchetype::retail_standard();
        let gen_config = GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        };
        let pipe_config = PipelineConfig {
            batch_size: 10,
            inject_anomalies: false,
            ..Default::default()
        };

        let mut pipeline = DataFabricPipeline::new(archetype, gen_config, pipe_config);

        let flows = pipeline.tick();
        assert!(!flows.is_empty());
        assert!(pipeline.stats().entries_generated > 0);
        assert!(pipeline.stats().flows_created > 0);
    }

    #[test]
    fn test_pipeline_pause_resume() {
        let archetype = CompanyArchetype::retail_standard();
        let gen_config = GeneratorConfig::default();
        let pipe_config = PipelineConfig::default();

        let mut pipeline = DataFabricPipeline::new(archetype, gen_config, pipe_config);

        // Generate some data
        pipeline.tick();
        let initial_count = pipeline.stats().entries_generated;

        // Pause - should generate nothing
        pipeline.pause();
        assert!(pipeline.is_paused());
        pipeline.tick();
        assert_eq!(pipeline.stats().entries_generated, initial_count);

        // Resume - should generate again
        pipeline.resume();
        assert!(!pipeline.is_paused());
        pipeline.tick();
        assert!(pipeline.stats().entries_generated > initial_count);
    }

    #[test]
    fn test_pipeline_with_anomalies() {
        let archetype = CompanyArchetype::retail_standard();
        let gen_config = GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        };
        let pipe_config = PipelineConfig {
            batch_size: 100,
            inject_anomalies: true,
            anomaly_config: AnomalyInjectionConfig {
                injection_rate: 0.5, // High rate for testing
                ..Default::default()
            },
            ..Default::default()
        };

        let mut pipeline = DataFabricPipeline::new(archetype, gen_config, pipe_config);

        // Generate multiple batches
        for _ in 0..10 {
            pipeline.tick();
        }

        // Should have detected some anomalies
        assert!(pipeline.stats().anomalies_detected > 0);
    }
}