Skip to main content

credit_data_simulator/
engine.rs

1//! # Engine Simulator for Paket B Tests
2//!
3//! This module simulates the Vastar-Flow Engine for Submission workflow testing.
4//! It provides webhook endpoints, submission orchestration, and integrates with
5//! the Regulator Endpoint Simulator.
6
7use axum::{
8    extract::{Path, State},
9    http::StatusCode,
10    response::IntoResponse,
11    routing::{get, post},
12    Json, Router,
13};
14use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::RwLock;
19use uuid::Uuid;
20
21// ═══════════════════════════════════════════════════════════════════════════════
22// Constants
23// ═══════════════════════════════════════════════════════════════════════════════
24
25const REGULATOR_ENDPOINT_URL: &str = "http://127.0.0.1:18084";
26
27// ═══════════════════════════════════════════════════════════════════════════════
28// Data Structures (matching test expectations)
29// ═══════════════════════════════════════════════════════════════════════════════
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SubmissionRequest {
33    pub submission_id: String,
34    pub locked_dataset_version: String,
35    pub artifact_format: String,
36    pub options: SubmissionOptions,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct SubmissionOptions {
41    pub retry_policy: RetryPolicy,
42    #[serde(default)]
43    pub off_peak_window: Option<OffPeakWindow>,
44    #[serde(default)]
45    pub batch_size: Option<u64>,
46    #[serde(default)]
47    pub include_evidence_pack: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RetryPolicy {
52    #[serde(default = "default_max_attempts")]
53    pub max_attempts: u32,
54    #[serde(alias = "initial_delay_ms", alias = "initial_backoff_ms", default = "default_initial_delay")]
55    pub initial_backoff_ms: u64,
56    #[serde(alias = "max_delay_ms", alias = "max_backoff_ms", default = "default_max_delay")]
57    pub max_backoff_ms: u64,
58    #[serde(default = "default_multiplier")]
59    pub backoff_multiplier: f64,
60    #[serde(default = "default_send_to_dlq")]
61    pub send_to_dlq: bool,
62}
63
64fn default_max_attempts() -> u32 { 3 }
65fn default_initial_delay() -> u64 { 1000 }
66fn default_max_delay() -> u64 { 30000 }
67fn default_multiplier() -> f64 { 2.0 }
68fn default_send_to_dlq() -> bool { true }
69
70impl Default for RetryPolicy {
71    fn default() -> Self {
72        Self {
73            max_attempts: 3,
74            initial_backoff_ms: 1000,
75            max_backoff_ms: 30000,
76            backoff_multiplier: 2.0,
77            send_to_dlq: true,
78        }
79    }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct OffPeakWindow {
84    pub start_hour: u8,
85    pub end_hour: u8,
86    pub timezone: String,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct SubmissionResponse {
91    pub execution_id: String,
92    pub submission_id: String,
93    pub status: SubmissionStatus,
94    pub artifact: Option<Artifact>,
95    pub attempts: Vec<SubmissionAttempt>,
96    pub evidence_pack: Option<EvidencePack>,
97    pub audit_events: Vec<AuditEvent>,
98    pub duration_ms: u64,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
102#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
103pub enum SubmissionStatus {
104    Pending,
105    Generating,
106    Submitting,
107    Submitted,
108    Acknowledged,
109    Rejected,
110    PendingRetry,
111    Failed,
112    Escalated,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct Artifact {
117    pub artifact_id: String,
118    pub artifact_hash: String,
119    pub format: String,
120    pub row_count: u64,
121    pub file_size: u64,
122    pub locked_dataset_version: String,
123    pub created_at: String,
124    pub precheck_result: Option<PrecheckResult>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct PrecheckResult {
129    pub passed: bool,
130    pub checks: Vec<PrecheckItem>,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct PrecheckItem {
135    pub check_name: String,
136    pub passed: bool,
137    pub message: Option<String>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct SubmissionAttempt {
142    pub attempt_number: u32,
143    pub timestamp: String,
144    pub status: AttemptStatus,
145    pub response_code: Option<u16>,
146    pub response_message: Option<String>,
147    pub correlation_id: String,
148    pub duration_ms: u64,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
152#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
153pub enum AttemptStatus {
154    Success,
155    Timeout,
156    Rejected,
157    Error,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct EvidencePack {
162    pub pack_id: String,
163    pub created_at: String,
164    pub contents: EvidenceContents,
165    pub hash: String,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct EvidenceContents {
170    pub run_id: String,
171    pub dataset_version: String,
172    pub artifact_hash: String,
173    pub submission_attempts: Vec<SubmissionAttempt>,
174    pub audit_log_hash: String,
175    pub approval_trail: Option<serde_json::Value>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct AuditEvent {
180    pub event_type: String,
181    pub timestamp: String,
182    pub actor: String,
183    pub details: serde_json::Value,
184}
185
186// ═══════════════════════════════════════════════════════════════════════════════
187// Engine State
188// ═══════════════════════════════════════════════════════════════════════════════
189
190#[derive(Debug)]
191pub struct EngineState {
192    pub submissions: RwLock<HashMap<String, SubmissionResponse>>,
193    pub dlq: RwLock<Vec<DlqEntry>>,
194    pub start_time: std::time::Instant,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct DlqEntry {
199    pub submission_id: String,
200    pub reason: String,
201    pub timestamp: String,
202    pub attempts: u32,
203    pub last_error: String,
204}
205
206impl EngineState {
207    pub fn new() -> Self {
208        Self {
209            submissions: RwLock::new(HashMap::new()),
210            dlq: RwLock::new(Vec::new()),
211            start_time: std::time::Instant::now(),
212        }
213    }
214}
215
216impl Default for EngineState {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
222// ═══════════════════════════════════════════════════════════════════════════════
223// Engine Simulator
224// ═══════════════════════════════════════════════════════════════════════════════
225
226pub struct EngineSimulator {
227    state: Arc<EngineState>,
228}
229
230impl EngineSimulator {
231    pub fn new() -> Self {
232        Self {
233            state: Arc::new(EngineState::new()),
234        }
235    }
236
237    pub fn router(&self) -> Router {
238        let state = self.state.clone();
239
240        Router::new()
241            // Health endpoints
242            .route("/health", get(health_check))
243            // Webhook endpoint for submissions
244            .route("/webhook/:tenant_id/:workflow_id/:trigger_id", post(handle_webhook))
245            // DLQ management
246            .route("/internal/dlq", get(get_dlq_entries))
247            .route("/internal/dlq/:submission_id/retry", post(retry_dlq_entry))
248            // Submission status
249            .route("/submissions/:submission_id", get(get_submission_status))
250            .with_state(state)
251    }
252
253    pub fn state(&self) -> Arc<EngineState> {
254        self.state.clone()
255    }
256}
257
258impl Default for EngineSimulator {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264// ═══════════════════════════════════════════════════════════════════════════════
265// Admin Simulator (for health check on port 9090)
266// ═══════════════════════════════════════════════════════════════════════════════
267
268pub struct AdminSimulator {
269    state: Arc<EngineState>,
270}
271
272impl AdminSimulator {
273    pub fn new(state: Arc<EngineState>) -> Self {
274        Self { state }
275    }
276
277    pub fn router(&self) -> Router {
278        let state = self.state.clone();
279
280        Router::new()
281            .route("/health", get(admin_health_check))
282            .route("/metrics", get(metrics))
283            .with_state(state)
284    }
285}
286
287// ═══════════════════════════════════════════════════════════════════════════════
288// Handlers
289// ═══════════════════════════════════════════════════════════════════════════════
290
291async fn health_check(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
292    let uptime = state.start_time.elapsed().as_secs();
293    Json(serde_json::json!({
294        "status": "healthy",
295        "service": "engine-simulator",
296        "version": "1.0.0",
297        "uptime_secs": uptime
298    }))
299}
300
301async fn admin_health_check(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
302    let uptime = state.start_time.elapsed().as_secs();
303    Json(serde_json::json!({
304        "status": "healthy",
305        "service": "admin-simulator",
306        "version": "1.0.0",
307        "uptime_secs": uptime
308    }))
309}
310
311async fn metrics(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
312    let submissions = state.submissions.read().await;
313    let dlq = state.dlq.read().await;
314    
315    Json(serde_json::json!({
316        "total_submissions": submissions.len(),
317        "dlq_size": dlq.len(),
318        "uptime_secs": state.start_time.elapsed().as_secs()
319    }))
320}
321
322async fn handle_webhook(
323    State(state): State<Arc<EngineState>>,
324    Path((_tenant_id, _workflow_id, _trigger_id)): Path<(String, String, String)>,
325    Json(request): Json<SubmissionRequest>,
326) -> impl IntoResponse {
327    let start = std::time::Instant::now();
328    let submission_id = request.submission_id.clone();
329    let execution_id = format!("EXEC-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
330
331    // Generate artifact
332    let artifact = generate_artifact(&request);
333
334    // Attempt submission to regulator
335    let (status, attempts, error_reason) = submit_to_regulator(&request, &artifact).await;
336
337    // Generate audit events
338    let audit_events = generate_audit_events(&submission_id, &status, &attempts);
339
340    // Generate evidence pack if requested
341    let evidence_pack = if request.options.include_evidence_pack {
342        Some(generate_evidence_pack(&request, &artifact, &attempts))
343    } else {
344        None
345    };
346
347    let response = SubmissionResponse {
348        execution_id: execution_id.clone(),
349        submission_id: submission_id.clone(),
350        status,
351        artifact: Some(artifact),
352        attempts,
353        evidence_pack,
354        audit_events,
355        duration_ms: start.elapsed().as_millis() as u64,
356    };
357
358    // Store in state
359    {
360        let mut submissions = state.submissions.write().await;
361        submissions.insert(submission_id.clone(), response.clone());
362    }
363
364    // Add to DLQ if failed
365    if status == SubmissionStatus::Escalated || status == SubmissionStatus::Failed {
366        let mut dlq = state.dlq.write().await;
367        dlq.push(DlqEntry {
368            submission_id: submission_id.clone(),
369            reason: error_reason.unwrap_or_else(|| "Unknown".to_string()),
370            timestamp: Utc::now().to_rfc3339(),
371            attempts: response.attempts.len() as u32,
372            last_error: response.attempts.last()
373                .and_then(|a| a.response_message.clone())
374                .unwrap_or_else(|| "No error".to_string()),
375        });
376    }
377
378    (StatusCode::OK, Json(response))
379}
380
381async fn get_dlq_entries(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
382    let dlq = state.dlq.read().await;
383    Json(dlq.clone())
384}
385
386async fn retry_dlq_entry(
387    State(state): State<Arc<EngineState>>,
388    Path(submission_id): Path<String>,
389) -> impl IntoResponse {
390    let mut dlq = state.dlq.write().await;
391    
392    // Find and remove from DLQ
393    if let Some(pos) = dlq.iter().position(|e| e.submission_id == submission_id) {
394        dlq.remove(pos);
395        (StatusCode::OK, Json(serde_json::json!({"status": "retrying", "submission_id": submission_id})))
396    } else {
397        (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Entry not found in DLQ"})))
398    }
399}
400
401async fn get_submission_status(
402    State(state): State<Arc<EngineState>>,
403    Path(submission_id): Path<String>,
404) -> impl IntoResponse {
405    let submissions = state.submissions.read().await;
406    
407    if let Some(submission) = submissions.get(&submission_id) {
408        (StatusCode::OK, Json(serde_json::json!(submission)))
409    } else {
410        (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Submission not found"})))
411    }
412}
413
414// ═══════════════════════════════════════════════════════════════════════════════
415// Helper Functions
416// ═══════════════════════════════════════════════════════════════════════════════
417
418fn generate_artifact(request: &SubmissionRequest) -> Artifact {
419    let artifact_id = format!("ART-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
420    let content = format!("{}:{}:{}", request.submission_id, request.locked_dataset_version, Utc::now());
421    let artifact_hash = format!("{:x}", md5::compute(content.as_bytes()));
422
423    Artifact {
424        artifact_id,
425        artifact_hash,
426        format: request.artifact_format.clone(),
427        row_count: 1000, // Mock row count
428        file_size: 1024 * 50, // 50KB mock size
429        locked_dataset_version: request.locked_dataset_version.clone(),
430        created_at: Utc::now().to_rfc3339(),
431        precheck_result: Some(PrecheckResult {
432            passed: true,
433            checks: vec![
434                PrecheckItem {
435                    check_name: "schema_validation".to_string(),
436                    passed: true,
437                    message: None,
438                },
439                PrecheckItem {
440                    check_name: "hash_verification".to_string(),
441                    passed: true,
442                    message: None,
443                },
444                PrecheckItem {
445                    check_name: "size_limit".to_string(),
446                    passed: true,
447                    message: None,
448                },
449            ],
450        }),
451    }
452}
453
454async fn submit_to_regulator(
455    request: &SubmissionRequest,
456    artifact: &Artifact,
457) -> (SubmissionStatus, Vec<SubmissionAttempt>, Option<String>) {
458    let client = reqwest::Client::builder()
459        .timeout(std::time::Duration::from_secs(30))
460        .build()
461        .unwrap();
462
463    let max_attempts = request.options.retry_policy.max_attempts;
464    let mut attempts = Vec::new();
465    let mut last_error: Option<String> = None;
466
467    for attempt_num in 1..=max_attempts {
468        let attempt_start = std::time::Instant::now();
469        let correlation_id = format!("CORR-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
470        
471        let submit_url = format!("{}/api/v1/submit", REGULATOR_ENDPOINT_URL);
472        
473        // Generate mock records (Regulator expects records array)
474        let mock_records: Vec<serde_json::Value> = (0..artifact.row_count.min(10))
475            .map(|i| serde_json::json!({
476                "id": format!("REC-{:06}", i),
477                "nik": format!("32050219900{:05}", i),
478                "nama_lengkap": format!("Test User {}", i),
479                "jenis_fasilitas": "KMK",
480                "jumlah_kredit": 10000000,
481                "mata_uang": "IDR",
482                "suku_bunga": 12.5,
483                "tanggal_mulai": "2024-01-01",
484                "tanggal_jatuh_tempo": "2025-01-01",
485                "saldo_outstanding": 8000000,
486                "kolektabilitas": 1
487            }))
488            .collect();
489        
490        let result = client
491            .post(&submit_url)
492            .json(&serde_json::json!({
493                "submission_id": request.submission_id,
494                "reporting_period": "202501",
495                "bank_code": "BANK001",
496                "records": mock_records,
497                "metadata": {
498                    "artifact_hash": artifact.artifact_hash,
499                    "format": artifact.format,
500                    "dataset_version": request.locked_dataset_version
501                }
502            }))
503            .send()
504            .await;
505
506        match result {
507            Ok(response) => {
508                let status_code = response.status().as_u16();
509                let duration = attempt_start.elapsed().as_millis() as u64;
510
511                if response.status().is_success() {
512                    attempts.push(SubmissionAttempt {
513                        attempt_number: attempt_num,
514                        timestamp: Utc::now().to_rfc3339(),
515                        status: AttemptStatus::Success,
516                        response_code: Some(status_code),
517                        response_message: None,
518                        correlation_id,
519                        duration_ms: duration,
520                    });
521                    return (SubmissionStatus::Acknowledged, attempts, None);
522                } else if status_code == 400 || status_code == 422 {
523                    // Rejection - don't retry
524                    let body = response.text().await.unwrap_or_default();
525                    attempts.push(SubmissionAttempt {
526                        attempt_number: attempt_num,
527                        timestamp: Utc::now().to_rfc3339(),
528                        status: AttemptStatus::Rejected,
529                        response_code: Some(status_code),
530                        response_message: Some(body.clone()),
531                        correlation_id,
532                        duration_ms: duration,
533                    });
534                    return (SubmissionStatus::Rejected, attempts, Some(body));
535                } else {
536                    // Server error - retry
537                    let body = response.text().await.unwrap_or_default();
538                    last_error = Some(body.clone());
539                    attempts.push(SubmissionAttempt {
540                        attempt_number: attempt_num,
541                        timestamp: Utc::now().to_rfc3339(),
542                        status: AttemptStatus::Error,
543                        response_code: Some(status_code),
544                        response_message: Some(body),
545                        correlation_id,
546                        duration_ms: duration,
547                    });
548                }
549            }
550            Err(e) => {
551                let duration = attempt_start.elapsed().as_millis() as u64;
552                let (error_status, error_msg) = if e.is_timeout() {
553                    (AttemptStatus::Timeout, "Request timeout".to_string())
554                } else if e.is_connect() {
555                    (AttemptStatus::Error, "Connection failed".to_string())
556                } else {
557                    (AttemptStatus::Error, format!("Request error: {}", e))
558                };
559                
560                last_error = Some(error_msg.clone());
561                attempts.push(SubmissionAttempt {
562                    attempt_number: attempt_num,
563                    timestamp: Utc::now().to_rfc3339(),
564                    status: error_status,
565                    response_code: None,
566                    response_message: Some(error_msg),
567                    correlation_id,
568                    duration_ms: duration,
569                });
570            }
571        }
572
573        // Wait before retry (exponential backoff)
574        if attempt_num < max_attempts {
575            let delay = calculate_backoff(attempt_num, &request.options.retry_policy);
576            tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
577        }
578    }
579
580    // All attempts failed
581    if attempts.len() as u32 >= max_attempts {
582        (SubmissionStatus::Escalated, attempts, last_error)
583    } else {
584        (SubmissionStatus::Failed, attempts, last_error)
585    }
586}
587
588fn calculate_backoff(attempt: u32, policy: &RetryPolicy) -> u64 {
589    let delay = (policy.initial_backoff_ms as f64) * policy.backoff_multiplier.powi(attempt as i32 - 1);
590    (delay as u64).min(policy.max_backoff_ms)
591}
592
593fn generate_audit_events(
594    submission_id: &str,
595    status: &SubmissionStatus,
596    attempts: &[SubmissionAttempt],
597) -> Vec<AuditEvent> {
598    let mut events = vec![
599        AuditEvent {
600            event_type: "submission_started".to_string(),
601            timestamp: Utc::now().to_rfc3339(),
602            actor: "engine".to_string(),
603            details: serde_json::json!({
604                "submission_id": submission_id
605            }),
606        },
607        // Artifact generated event
608        AuditEvent {
609            event_type: "artifact_generated".to_string(),
610            timestamp: Utc::now().to_rfc3339(),
611            actor: "engine".to_string(),
612            details: serde_json::json!({
613                "submission_id": submission_id,
614                "format": "XML"
615            }),
616        },
617    ];
618
619    for attempt in attempts {
620        let attempt_event_type = match attempt.status {
621            AttemptStatus::Success => "attempt_success".to_string(),
622            AttemptStatus::Timeout => "attempt_timeout".to_string(),
623            AttemptStatus::Rejected => "attempt_rejected".to_string(),
624            AttemptStatus::Error => "attempt_error".to_string(),
625        };
626        
627        events.push(AuditEvent {
628            event_type: attempt_event_type,
629            timestamp: attempt.timestamp.clone(),
630            actor: "engine".to_string(),
631            details: serde_json::json!({
632                "attempt": attempt.attempt_number,
633                "duration_ms": attempt.duration_ms,
634                "response_code": attempt.response_code,
635                "correlation_id": attempt.correlation_id
636            }),
637        });
638    }
639
640    // Final status event
641    let final_event_type = match status {
642        SubmissionStatus::Acknowledged | SubmissionStatus::Submitted => "submission_success".to_string(),
643        SubmissionStatus::Rejected => "submission_rejected".to_string(),
644        SubmissionStatus::Escalated => "submission_escalated".to_string(),
645        SubmissionStatus::Failed => "submission_failed".to_string(),
646        _ => format!("submission_{:?}", status).to_lowercase(),
647    };
648    
649    events.push(AuditEvent {
650        event_type: final_event_type,
651        timestamp: Utc::now().to_rfc3339(),
652        actor: "engine".to_string(),
653        details: serde_json::json!({
654            "final_status": format!("{:?}", status),
655            "total_attempts": attempts.len()
656        }),
657    });
658
659    events
660}
661
662fn generate_evidence_pack(
663    request: &SubmissionRequest,
664    artifact: &Artifact,
665    attempts: &[SubmissionAttempt],
666) -> EvidencePack {
667    let pack_id = format!("EVP-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
668    
669    let contents = EvidenceContents {
670        run_id: request.submission_id.clone(),
671        dataset_version: request.locked_dataset_version.clone(),
672        artifact_hash: artifact.artifact_hash.clone(),
673        submission_attempts: attempts.to_vec(),
674        audit_log_hash: format!("{:x}", md5::compute(format!("{:?}", attempts).as_bytes())),
675        approval_trail: None,
676    };
677
678    let hash = format!("{:x}", md5::compute(format!("{:?}", contents).as_bytes()));
679
680    EvidencePack {
681        pack_id,
682        created_at: Utc::now().to_rfc3339(),
683        contents,
684        hash,
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::*;
691
692    #[test]
693    fn test_default_retry_policy() {
694        let policy = RetryPolicy::default();
695        assert_eq!(policy.max_attempts, 3);
696        assert_eq!(policy.initial_backoff_ms, 1000);
697    }
698
699    #[test]
700    fn test_calculate_backoff() {
701        let policy = RetryPolicy::default();
702        
703        let delay1 = calculate_backoff(1, &policy);
704        assert_eq!(delay1, 1000);
705        
706        let delay2 = calculate_backoff(2, &policy);
707        assert_eq!(delay2, 2000);
708        
709        let delay3 = calculate_backoff(3, &policy);
710        assert_eq!(delay3, 4000);
711    }
712}