Skip to main content

credit_data_simulator/
regulator_endpoint.rs

1//! # Regulator Endpoint Simulator
2//!
3//! Simulates the OJK (Otoritas Jasa Keuangan) regulator endpoint for SLIK submissions.
4//! Supports various response modes for testing different scenarios including:
5//! - Normal acceptance
6//! - Timeouts
7//! - Rejections
8//! - Rate limiting
9//! - Partial failures
10//! - Idempotency handling
11//!
12//! ## Endpoints
13//!
14//! - `GET /health` - Health check
15//! - `POST /api/v1/submit` - Submit credit data
16//! - `GET /api/v1/submissions/:id` - Get submission status
17//! - `GET /api/v1/submissions` - List submissions
18//! - `POST /api/v1/mode` - Change response mode dynamically
19//! - `GET /api/v1/stats` - Get simulator statistics
20//! - `POST /api/v1/reset` - Reset state and statistics
21//! - `DELETE /api/v1/submissions/:id` - Delete a submission (for testing)
22
23use crate::{
24    config::{FailureType, RegulatorEndpointConfig, RegulatorMode},
25    ApiResponse, HealthStatus, ResponseMeta, SharedState, SimulatorError, SimulatorResult,
26    SimulatorStats, Simulator, shared_state,
27};
28/// # Regulator Endpoint Simulator (OJK/SLIK)
29///
30/// Simulates the external regulatory reporting endpoint (e.g. SLIK/OJK).
31/// This service receives data submissions, validates them, and provides acknowledgments.
32///
33/// ## Business Purpose
34///
35/// The Regulator Endpoint mimics the behavior of the OJK (Otoritas Jasa Keuangan) SLIK system.
36/// It allows the Vastar Workflow Engine to:
37/// 1. Test submission workflows (End-to-End).
38/// 2. Handle various response scenarios (Success, Failure, Timeout).
39/// 3. Verify retry mechanisms and error handling logic.
40/// 4. Validate idempotency (prevent duplicate submissions).
41///
42/// ## Operating Modes
43///
44/// This simulator supports dynamic mode switching to test system resilience:
45///
46/// - **Accept**: Standard success mode. Validates schema and accepts submission.
47/// - **Reject**: Simulates logic/validation errors (400 Bad Request).
48/// - **Timeout**: Simulates network latency or gateway timeouts (504 Gateway Timeout).
49/// - **ServiceUnavailable**: Simulates downtime or maintenance (503 Service Unavailable).
50/// - **RateLimited**: Simulates API rate limits (429 Too Many Requests).
51/// - **Intermittent**: Randomly fails a percentage of requests (Chaos Testing).
52/// - **PartialReject**: Accepts the submission but rejects specific records inside it.
53/// - **Queued**: Simulates async processing (202 Accepted) where status must be polled later.
54/// - **Custom**: Configurable status code and body for edge case testing.
55///
56/// ## Key Features
57///
/// - **Idempotency**: Uses the `Idempotency-Key` header (falling back to the request's `submission_id`) to detect duplicates.
59/// - **Async Processing**: Can simulate long-running validation tasks.
60/// - **Detailed Receipts**: Returns submission ID, timestamp, and record-level status.
61///
62/// ## API Endpoints
63///
/// - `POST /api/v1/submit` - Submit a new regulatory report.
65/// - `GET /api/v1/submissions/:id` - Check status of a submission.
66/// - `GET /api/v1/submissions` - List all received submissions (for verification).
67/// - `POST /api/v1/mode` - Change the simulator's operating mode (Runtime Config).
68/// - `GET /api/v1/mode` - Get current operating mode.
69/// - `GET /api/v1/stats` - Get simulator statistics.
70
71use axum::{
72    extract::{Path, Query, State},
73    http::{HeaderMap, StatusCode},
74    response::{IntoResponse, Json},
75    routing::{delete, get, post},
76    Router,
77};
78use chrono::{DateTime, Utc};
79use serde::{Deserialize, Serialize};
80use std::collections::HashMap;
81
82use std::time::{Duration, Instant};
83use tokio::sync::oneshot;
84use uuid::Uuid;
85
86// ============================================================================
87// Data Models
88// ============================================================================
89
90/// Submission request from clients
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct SubmissionRequest {
93    /// Client-provided submission ID for idempotency
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub submission_id: Option<String>,
96    /// Reporting period (YYYYMM)
97    pub reporting_period: String,
98    /// Bank code
99    pub bank_code: String,
100    /// Credit records to submit
101    pub records: Vec<serde_json::Value>,
102    /// Metadata
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub metadata: Option<HashMap<String, serde_json::Value>>,
105}
106
107/// Submission response
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct SubmissionResponse {
110    /// Submission ID (generated or provided)
111    pub submission_id: String,
112    /// Regulator reference number
113    pub reference_number: String,
114    /// Submission status
115    pub status: SubmissionStatus,
116    /// Number of records accepted
117    pub accepted_count: usize,
118    /// Number of records rejected
119    pub rejected_count: usize,
120    /// Rejected record details
121    #[serde(skip_serializing_if = "Vec::is_empty")]
122    pub rejections: Vec<RecordRejection>,
123    /// Submission timestamp
124    pub submitted_at: String,
125    /// Processing timestamp (if completed)
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub processed_at: Option<String>,
128    /// Estimated processing time (for queued submissions)
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub estimated_processing_time_secs: Option<u64>,
131    /// Additional details
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub details: Option<HashMap<String, serde_json::Value>>,
134}
135
136/// Submission status
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
138#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
139pub enum SubmissionStatus {
140    /// Submission accepted and processed
141    Accepted,
142    /// Submission partially accepted (some records rejected)
143    PartiallyAccepted,
144    /// Submission rejected
145    Rejected,
146    /// Submission queued for processing
147    Queued,
148    /// Submission is being processed
149    Processing,
150    /// Submission failed
151    Failed,
152}
153
154/// Record rejection details
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct RecordRejection {
157    /// Record index in the submission
158    pub record_index: usize,
159    /// Record ID if available
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub record_id: Option<String>,
162    /// Rejection error code
163    pub error_code: String,
164    /// Rejection message
165    pub message: String,
166    /// Field that caused the rejection
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub field: Option<String>,
169}
170
171/// Stored submission record
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct StoredSubmission {
174    pub submission_id: String,
175    pub reference_number: String,
176    pub bank_code: String,
177    pub reporting_period: String,
178    pub status: SubmissionStatus,
179    pub record_count: usize,
180    pub accepted_count: usize,
181    pub rejected_count: usize,
182    pub rejections: Vec<RecordRejection>,
183    pub submitted_at: DateTime<Utc>,
184    pub processed_at: Option<DateTime<Utc>>,
185    pub idempotency_key: Option<String>,
186    pub attempt_count: u32,
187}
188
189/// Query parameters for listing submissions
190#[derive(Debug, Deserialize)]
191pub struct ListSubmissionsParams {
192    pub bank_code: Option<String>,
193    pub reporting_period: Option<String>,
194    pub status: Option<SubmissionStatus>,
195    pub page: Option<u32>,
196    pub page_size: Option<u32>,
197}
198
199/// Request to change the response mode
200#[derive(Debug, Deserialize)]
201pub struct ChangeModeRequest {
202    pub mode: RegulatorMode,
203}
204
205/// Rejection error response
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct RejectionError {
208    pub error_code: String,
209    pub error_message: String,
210    pub details: Option<serde_json::Value>,
211}
212
213// ============================================================================
214// Simulator State
215// ============================================================================
216
217/// Internal state for Regulator Endpoint Simulator
218pub struct RegulatorEndpointState {
219    pub config: RegulatorEndpointConfig,
220    pub submissions: HashMap<String, StoredSubmission>,
221    pub idempotency_cache: HashMap<String, String>, // idempotency_key -> submission_id
222    pub stats: SimulatorStats,
223    pub started_at: Instant,
224    pub ready: bool,
225    pub submission_counter: u64,
226}
227
228impl RegulatorEndpointState {
229    pub fn new(config: RegulatorEndpointConfig) -> Self {
230        Self {
231            config,
232            submissions: HashMap::new(),
233            idempotency_cache: HashMap::new(),
234            stats: SimulatorStats::default(),
235            started_at: Instant::now(),
236            ready: false,
237            submission_counter: 0,
238        }
239    }
240
241    /// Generate a new reference number
242    pub fn generate_reference_number(&mut self) -> String {
243        self.submission_counter += 1;
244        let timestamp = Utc::now().format("%Y%m%d%H%M%S");
245        format!("OJK-SLIK-{}-{:06}", timestamp, self.submission_counter)
246    }
247
248    /// Check if submission is a duplicate (idempotency)
249    pub fn check_idempotency(&self, key: &str) -> Option<&StoredSubmission> {
250        self.idempotency_cache
251            .get(key)
252            .and_then(|id| self.submissions.get(id))
253    }
254
255    /// Store a submission
256    pub fn store_submission(&mut self, submission: StoredSubmission) {
257        if let Some(ref key) = submission.idempotency_key {
258            self.idempotency_cache.insert(key.clone(), submission.submission_id.clone());
259        }
260        self.submissions.insert(submission.submission_id.clone(), submission);
261
262        // Cleanup old idempotency entries if needed
263        if self.idempotency_cache.len() > self.config.max_idempotency_entries {
264            // Remove oldest entries (simple approach: just clear half)
265            let to_remove: Vec<String> = self.idempotency_cache
266                .keys()
267                .take(self.idempotency_cache.len() / 2)
268                .cloned()
269                .collect();
270            for key in to_remove {
271                self.idempotency_cache.remove(&key);
272            }
273        }
274    }
275
276    /// Process submission based on current mode
277    pub fn process_submission(
278        &mut self,
279        request: &SubmissionRequest,
280        idempotency_key: Option<String>,
281    ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
282        // Check idempotency first
283        if let Some(ref key) = idempotency_key {
284            if self.config.enforce_idempotency {
285                if let Some(existing) = self.check_idempotency(key) {
286                    // Return the existing submission response
287                    return Ok(SubmissionResponse {
288                        submission_id: existing.submission_id.clone(),
289                        reference_number: existing.reference_number.clone(),
290                        status: existing.status.clone(),
291                        accepted_count: existing.accepted_count,
292                        rejected_count: existing.rejected_count,
293                        rejections: existing.rejections.clone(),
294                        submitted_at: existing.submitted_at.to_rfc3339(),
295                        processed_at: existing.processed_at.map(|t| t.to_rfc3339()),
296                        estimated_processing_time_secs: None,
297                        details: Some({
298                            let mut d = HashMap::new();
299                            d.insert("idempotent".to_string(), serde_json::json!(true));
300                            d.insert("original_submission".to_string(), serde_json::json!(true));
301                            d
302                        }),
303                    });
304                }
305            }
306        }
307
308        // Check off-peak window
309        if self.config.off_peak_config.enabled
310            && self.config.off_peak_config.reject_outside_window
311            && !self.config.off_peak_config.is_off_peak_now()
312        {
313            return Err((
314                StatusCode::SERVICE_UNAVAILABLE,
315                RejectionError {
316                    error_code: "OUTSIDE_SUBMISSION_WINDOW".to_string(),
317                    error_message: "Submissions are only accepted during off-peak hours".to_string(),
318                    details: Some(serde_json::json!({
319                        "off_peak_start": self.config.off_peak_config.start_hour,
320                        "off_peak_end": self.config.off_peak_config.end_hour,
321                    })),
322                },
323            ));
324        }
325
326        // Process based on mode
327        match &self.config.mode {
328            RegulatorMode::Accept => self.accept_submission(request, idempotency_key),
329            RegulatorMode::Reject { error_code, error_message } => {
330                Err((
331                    StatusCode::BAD_REQUEST,
332                    RejectionError {
333                        error_code: error_code.clone(),
334                        error_message: error_message.clone(),
335                        details: None,
336                    },
337                ))
338            }
339            RegulatorMode::Timeout { delay_ms } => {
340                // This is handled in the handler with actual sleep
341                // Here we just return a timeout error after the delay
342                Err((
343                    StatusCode::GATEWAY_TIMEOUT,
344                    RejectionError {
345                        error_code: "TIMEOUT".to_string(),
346                        error_message: "Request timed out".to_string(),
347                        details: Some(serde_json::json!({ "timeout_ms": delay_ms })),
348                    },
349                ))
350            }
351            RegulatorMode::ServiceUnavailable => {
352                Err((
353                    StatusCode::SERVICE_UNAVAILABLE,
354                    RejectionError {
355                        error_code: "SERVICE_UNAVAILABLE".to_string(),
356                        error_message: "The service is temporarily unavailable".to_string(),
357                        details: None,
358                    },
359                ))
360            }
361            RegulatorMode::RateLimited => {
362                Err((
363                    StatusCode::TOO_MANY_REQUESTS,
364                    RejectionError {
365                        error_code: "RATE_LIMITED".to_string(),
366                        error_message: "Too many requests. Please retry later.".to_string(),
367                        details: Some(serde_json::json!({
368                            "retry_after_secs": self.config.retry_after_secs
369                        })),
370                    },
371                ))
372            }
373            RegulatorMode::Intermittent { failure_rate } => {
374                if rand::random::<f64>() < *failure_rate {
375                    Err((
376                        StatusCode::INTERNAL_SERVER_ERROR,
377                        RejectionError {
378                            error_code: "INTERNAL_ERROR".to_string(),
379                            error_message: "An internal error occurred".to_string(),
380                            details: None,
381                        },
382                    ))
383                } else {
384                    self.accept_submission(request, idempotency_key)
385                }
386            }
387            RegulatorMode::PartialReject { reject_ratio } => {
388                self.partial_accept_submission(request, idempotency_key, *reject_ratio)
389            }
390            RegulatorMode::Queued { queue_delay_ms } => {
391                self.queue_submission(request, idempotency_key, *queue_delay_ms)
392            }
393            RegulatorMode::Custom { status_code, body, .. } => {
394                let status = StatusCode::from_u16(*status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
395                if status.is_success() {
396                    self.accept_submission(request, idempotency_key)
397                } else {
398                    Err((
399                        status,
400                        RejectionError {
401                            error_code: "CUSTOM_ERROR".to_string(),
402                            error_message: body.clone(),
403                            details: None,
404                        },
405                    ))
406                }
407            }
408        }
409    }
410
411    fn accept_submission(
412        &mut self,
413        request: &SubmissionRequest,
414        idempotency_key: Option<String>,
415    ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
416        let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
417        let reference_number = self.generate_reference_number();
418        let now = Utc::now();
419        let record_count = request.records.len();
420
421        let stored = StoredSubmission {
422            submission_id: submission_id.clone(),
423            reference_number: reference_number.clone(),
424            bank_code: request.bank_code.clone(),
425            reporting_period: request.reporting_period.clone(),
426            status: SubmissionStatus::Accepted,
427            record_count,
428            accepted_count: record_count,
429            rejected_count: 0,
430            rejections: Vec::new(),
431            submitted_at: now,
432            processed_at: Some(now),
433            idempotency_key,
434            attempt_count: 1,
435        };
436
437        self.store_submission(stored);
438
439        Ok(SubmissionResponse {
440            submission_id,
441            reference_number,
442            status: SubmissionStatus::Accepted,
443            accepted_count: record_count,
444            rejected_count: 0,
445            rejections: Vec::new(),
446            submitted_at: now.to_rfc3339(),
447            processed_at: Some(now.to_rfc3339()),
448            estimated_processing_time_secs: None,
449            details: None,
450        })
451    }
452
453    fn partial_accept_submission(
454        &mut self,
455        request: &SubmissionRequest,
456        idempotency_key: Option<String>,
457        reject_ratio: f64,
458    ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
459        let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
460        let reference_number = self.generate_reference_number();
461        let now = Utc::now();
462        let record_count = request.records.len();
463
464        // Determine which records to reject
465        let mut rejections = Vec::new();
466        let rejection_errors = [
467            ("INVALID_NIK", "NIK format is invalid"),
468            ("INVALID_AMOUNT", "Credit amount is invalid"),
469            ("DUPLICATE_RECORD", "Duplicate record detected"),
470            ("MISSING_FIELD", "Required field is missing"),
471        ];
472
473        for (idx, record) in request.records.iter().enumerate() {
474            if rand::random::<f64>() < reject_ratio {
475                let error = &rejection_errors[rand::random::<usize>() % rejection_errors.len()];
476                rejections.push(RecordRejection {
477                    record_index: idx,
478                    record_id: record.get("id").and_then(|v| v.as_str()).map(String::from),
479                    error_code: error.0.to_string(),
480                    message: error.1.to_string(),
481                    field: None,
482                });
483            }
484        }
485
486        let rejected_count = rejections.len();
487        let accepted_count = record_count - rejected_count;
488
489        let status = if rejected_count == 0 {
490            SubmissionStatus::Accepted
491        } else if accepted_count == 0 {
492            SubmissionStatus::Rejected
493        } else {
494            SubmissionStatus::PartiallyAccepted
495        };
496
497        let stored = StoredSubmission {
498            submission_id: submission_id.clone(),
499            reference_number: reference_number.clone(),
500            bank_code: request.bank_code.clone(),
501            reporting_period: request.reporting_period.clone(),
502            status: status.clone(),
503            record_count,
504            accepted_count,
505            rejected_count,
506            rejections: rejections.clone(),
507            submitted_at: now,
508            processed_at: Some(now),
509            idempotency_key,
510            attempt_count: 1,
511        };
512
513        self.store_submission(stored);
514
515        Ok(SubmissionResponse {
516            submission_id,
517            reference_number,
518            status,
519            accepted_count,
520            rejected_count,
521            rejections,
522            submitted_at: now.to_rfc3339(),
523            processed_at: Some(now.to_rfc3339()),
524            estimated_processing_time_secs: None,
525            details: None,
526        })
527    }
528
529    fn queue_submission(
530        &mut self,
531        request: &SubmissionRequest,
532        idempotency_key: Option<String>,
533        queue_delay_ms: u64,
534    ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
535        let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
536        let reference_number = self.generate_reference_number();
537        let now = Utc::now();
538        let record_count = request.records.len();
539
540        let stored = StoredSubmission {
541            submission_id: submission_id.clone(),
542            reference_number: reference_number.clone(),
543            bank_code: request.bank_code.clone(),
544            reporting_period: request.reporting_period.clone(),
545            status: SubmissionStatus::Queued,
546            record_count,
547            accepted_count: 0,
548            rejected_count: 0,
549            rejections: Vec::new(),
550            submitted_at: now,
551            processed_at: None,
552            idempotency_key,
553            attempt_count: 1,
554        };
555
556        self.store_submission(stored);
557
558        Ok(SubmissionResponse {
559            submission_id,
560            reference_number,
561            status: SubmissionStatus::Queued,
562            accepted_count: 0,
563            rejected_count: 0,
564            rejections: Vec::new(),
565            submitted_at: now.to_rfc3339(),
566            processed_at: None,
567            estimated_processing_time_secs: Some(queue_delay_ms / 1000),
568            details: Some({
569                let mut d = HashMap::new();
570                d.insert("queue_position".to_string(), serde_json::json!(self.submissions.len()));
571                d
572            }),
573        })
574    }
575}
576
577// ============================================================================
578// Regulator Endpoint Simulator
579// ============================================================================
580
581/// Regulator Endpoint Simulator implementation
582pub struct RegulatorEndpointSimulator {
583    state: SharedState<RegulatorEndpointState>,
584    config: RegulatorEndpointConfig,
585}
586
587impl RegulatorEndpointSimulator {
588    /// Create a new Regulator Endpoint Simulator
589    pub fn new(config: RegulatorEndpointConfig) -> Self {
590        let state = shared_state(RegulatorEndpointState::new(config.clone()));
591        Self { state, config }
592    }
593
594    /// Run the simulator HTTP server
595    pub async fn run(&self, shutdown_rx: oneshot::Receiver<()>) -> SimulatorResult<()> {
596        // Mark as ready
597        {
598            let mut state = self.state.write().await;
599            state.ready = true;
600        }
601
602        let app = self.create_router();
603        let addr: std::net::SocketAddr = self.config.socket_addr().parse()
604            .map_err(|e| SimulatorError::ConfigError(format!("Invalid address: {}", e)))?;
605
606        tracing::info!("Regulator Endpoint Simulator listening on {}", addr);
607
608        let listener = tokio::net::TcpListener::bind(addr).await
609            .map_err(|e| SimulatorError::BindError(e.to_string()))?;
610
611        axum::serve(listener, app)
612            .with_graceful_shutdown(async {
613                let _ = shutdown_rx.await;
614                tracing::info!("Regulator Endpoint Simulator shutting down");
615            })
616            .await
617            .map_err(|e| SimulatorError::StartError(e.to_string()))?;
618
619        Ok(())
620    }
621
622    /// Create the router with all endpoints
623    fn create_router(&self) -> Router {
624        let state = self.state.clone();
625
626        Router::new()
627            .route("/health", get(health_handler))
628            .route("/api/v1/submit", post(submit_handler))
629            .route("/api/v1/submissions", get(list_submissions_handler))
630            .route("/api/v1/submissions/:id", get(get_submission_handler))
631            .route("/api/v1/submissions/:id", delete(delete_submission_handler))
632            .route("/api/v1/mode", post(change_mode_handler))
633            .route("/api/v1/mode", get(get_mode_handler))
634            .route("/api/v1/stats", get(stats_handler))
635            .route("/api/v1/reset", post(reset_handler))
636            .with_state(state)
637    }
638
639    /// Get a submission by ID
640    pub async fn get_submission(&self, id: &str) -> Option<StoredSubmission> {
641        self.state.read().await.submissions.get(id).cloned()
642    }
643
644    /// Set the response mode dynamically
645    pub async fn set_mode(&self, mode: RegulatorMode) {
646        self.state.write().await.config.mode = mode;
647    }
648
649    /// Get current mode
650    pub async fn get_mode(&self) -> RegulatorMode {
651        self.state.read().await.config.mode.clone()
652    }
653
654    /// Get all submissions
655    pub async fn get_all_submissions(&self) -> Vec<StoredSubmission> {
656        self.state.read().await.submissions.values().cloned().collect()
657    }
658
659    /// Clear all submissions
660    pub async fn clear_submissions(&self) {
661        let mut state = self.state.write().await;
662        state.submissions.clear();
663        state.idempotency_cache.clear();
664    }
665}
666
667#[async_trait::async_trait]
668impl Simulator for RegulatorEndpointSimulator {
669    fn name(&self) -> &str {
670        "regulator-endpoint"
671    }
672
673    fn port(&self) -> u16 {
674        self.config.port
675    }
676
677    async fn health(&self) -> HealthStatus {
678        let state = self.state.read().await;
679        let uptime = state.started_at.elapsed().as_secs();
680
681        if state.ready {
682            HealthStatus::healthy(self.name(), "1.0.0", uptime)
683                .with_details("submission_count", serde_json::json!(state.submissions.len()))
684                .with_details("mode", serde_json::json!(format!("{:?}", state.config.mode)))
685        } else {
686            HealthStatus::unhealthy(self.name(), "Not ready")
687        }
688    }
689
690    async fn stats(&self) -> SimulatorStats {
691        self.state.read().await.stats.clone()
692    }
693
694    async fn reset_stats(&self) {
695        self.state.write().await.stats = SimulatorStats::default();
696    }
697
698    async fn is_ready(&self) -> bool {
699        self.state.read().await.ready
700    }
701}
702
703// ============================================================================
704// HTTP Handlers
705// ============================================================================
706
707/// Health check endpoint
708async fn health_handler(
709    State(state): State<SharedState<RegulatorEndpointState>>,
710) -> impl IntoResponse {
711    let state = state.read().await;
712    let uptime = state.started_at.elapsed().as_secs();
713
714    if state.ready {
715        let health = HealthStatus::healthy("regulator-endpoint", "1.0.0", uptime)
716            .with_details("submission_count", serde_json::json!(state.submissions.len()))
717            .with_details("mode", serde_json::json!(format!("{:?}", state.config.mode)));
718        (StatusCode::OK, Json(health))
719    } else {
720        let health = HealthStatus::unhealthy("regulator-endpoint", "Not ready");
721        (StatusCode::SERVICE_UNAVAILABLE, Json(health))
722    }
723}
724
725/// Submit credit data
726async fn submit_handler(
727    State(state): State<SharedState<RegulatorEndpointState>>,
728    headers: HeaderMap,
729    Json(request): Json<SubmissionRequest>,
730) -> impl IntoResponse {
731    let start = Instant::now();
732    let mut state_guard = state.write().await;
733
734    // Extract idempotency key from header or request
735    let idempotency_key = headers
736        .get("Idempotency-Key")
737        .and_then(|v| v.to_str().ok())
738        .map(String::from)
739        .or_else(|| request.submission_id.clone());
740
741    // Check for failure injection
742    let failure = state_guard.config.failure_injection.random_failure().cloned();
743    if let Some(ref failure) = failure {
744        state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
745        return match failure {
746            FailureType::InternalError => {
747                (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiResponse::<SubmissionResponse>::error("ERR500", "Internal server error")))
748            }
749            FailureType::ServiceUnavailable => {
750                (StatusCode::SERVICE_UNAVAILABLE, Json(ApiResponse::<SubmissionResponse>::error("ERR503", "Service unavailable")))
751            }
752            FailureType::Timeout => {
753                tokio::time::sleep(Duration::from_secs(30)).await;
754                (StatusCode::GATEWAY_TIMEOUT, Json(ApiResponse::<SubmissionResponse>::error("ERR504", "Gateway timeout")))
755            }
756            FailureType::RateLimited => {
757                (StatusCode::TOO_MANY_REQUESTS, Json(ApiResponse::<SubmissionResponse>::error("ERR429", "Rate limited")))
758            }
759            _ => {
760                (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiResponse::<SubmissionResponse>::error("ERR500", "Internal server error")))
761            }
762        };
763    }
764
765    // Apply latency
766    state_guard.config.latency.apply().await;
767
768    // Handle timeout mode specially (need to delay before responding)
769    if let RegulatorMode::Timeout { delay_ms } = &state_guard.config.mode {
770        let delay = *delay_ms;
771        drop(state_guard); // Release lock during sleep
772        tokio::time::sleep(Duration::from_millis(delay)).await;
773
774        let mut state_guard = state.write().await;
775        state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
776        state_guard.stats.record_timeout();
777        return (StatusCode::GATEWAY_TIMEOUT, Json(ApiResponse::<SubmissionResponse>::error("TIMEOUT", "Request timed out")));
778    }
779
780    // Process the submission
781    match state_guard.process_submission(&request, idempotency_key) {
782        Ok(response) => {
783            state_guard.stats.record_request("/api/v1/submit", true, start.elapsed().as_millis() as f64);
784            (StatusCode::OK, Json(ApiResponse::success(response)))
785        }
786        Err((status, error)) => {
787            state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
788            (status, Json(ApiResponse::<SubmissionResponse>::error(&error.error_code, &error.error_message)))
789        }
790    }
791}
792
793/// Get submission by ID
794async fn get_submission_handler(
795    State(state): State<SharedState<RegulatorEndpointState>>,
796    Path(id): Path<String>,
797) -> impl IntoResponse {
798    let start = Instant::now();
799    let mut state_guard = state.write().await;
800
801    // Apply latency
802    state_guard.config.latency.apply().await;
803
804    if let Some(submission) = state_guard.submissions.get(&id).cloned() {
805        state_guard.stats.record_request(&format!("/api/v1/submissions/{}", id), true, start.elapsed().as_millis() as f64);
806
807        let response = SubmissionResponse {
808            submission_id: submission.submission_id,
809            reference_number: submission.reference_number,
810            status: submission.status,
811            accepted_count: submission.accepted_count,
812            rejected_count: submission.rejected_count,
813            rejections: submission.rejections,
814            submitted_at: submission.submitted_at.to_rfc3339(),
815            processed_at: submission.processed_at.map(|t| t.to_rfc3339()),
816            estimated_processing_time_secs: None,
817            details: None,
818        };
819
820        (StatusCode::OK, Json(ApiResponse::success(response)))
821    } else {
822        state_guard.stats.record_request(&format!("/api/v1/submissions/{}", id), false, start.elapsed().as_millis() as f64);
823        (StatusCode::NOT_FOUND, Json(ApiResponse::<SubmissionResponse>::error("NOT_FOUND", &format!("Submission '{}' not found", id))))
824    }
825}
826
827/// List submissions
828async fn list_submissions_handler(
829    State(state): State<SharedState<RegulatorEndpointState>>,
830    Query(params): Query<ListSubmissionsParams>,
831) -> impl IntoResponse {
832    let start = Instant::now();
833    let mut state_guard = state.write().await;
834
835    // Apply latency
836    state_guard.config.latency.apply().await;
837
838    let mut submissions: Vec<&StoredSubmission> = state_guard.submissions.values().collect();
839
840    // Apply filters
841    if let Some(ref bank_code) = params.bank_code {
842        submissions.retain(|s| &s.bank_code == bank_code);
843    }
844    if let Some(ref period) = params.reporting_period {
845        submissions.retain(|s| &s.reporting_period == period);
846    }
847    if let Some(ref status) = params.status {
848        submissions.retain(|s| &s.status == status);
849    }
850
851    // Sort by submitted_at descending
852    submissions.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
853
854    let total_count = submissions.len() as u64;
855    let page = params.page.unwrap_or(1).max(1);
856    let page_size = params.page_size.unwrap_or(20).min(100);
857    let start_idx = ((page - 1) * page_size) as usize;
858    let end_idx = (start_idx + page_size as usize).min(submissions.len());
859
860    let paged: Vec<SubmissionSummary> = submissions[start_idx..end_idx]
861        .iter()
862        .map(|s| SubmissionSummary {
863            submission_id: s.submission_id.clone(),
864            reference_number: s.reference_number.clone(),
865            bank_code: s.bank_code.clone(),
866            reporting_period: s.reporting_period.clone(),
867            status: s.status.clone(),
868            record_count: s.record_count,
869            accepted_count: s.accepted_count,
870            rejected_count: s.rejected_count,
871            submitted_at: s.submitted_at.to_rfc3339(),
872        })
873        .collect();
874
875    state_guard.stats.record_request("/api/v1/submissions", true, start.elapsed().as_millis() as f64);
876
877    let meta = ResponseMeta::paginated(page, page_size, total_count)
878        .with_timing(start.elapsed().as_millis() as u64);
879
880    (StatusCode::OK, Json(ApiResponse::success_with_meta(paged, meta)))
881}
882
883/// Delete a submission (for testing)
884async fn delete_submission_handler(
885    State(state): State<SharedState<RegulatorEndpointState>>,
886    Path(id): Path<String>,
887) -> impl IntoResponse {
888    let mut state_guard = state.write().await;
889
890    if let Some(submission) = state_guard.submissions.remove(&id) {
891        // Also remove from idempotency cache
892        if let Some(ref key) = submission.idempotency_key {
893            state_guard.idempotency_cache.remove(key);
894        }
895        Json(ApiResponse::success(serde_json::json!({
896            "deleted": true,
897            "submission_id": id
898        })))
899    } else {
900        Json(ApiResponse::<serde_json::Value>::error("NOT_FOUND", &format!("Submission '{}' not found", id)))
901    }
902}
903
904/// Change response mode
905async fn change_mode_handler(
906    State(state): State<SharedState<RegulatorEndpointState>>,
907    Json(request): Json<ChangeModeRequest>,
908) -> impl IntoResponse {
909    let mut state_guard = state.write().await;
910    state_guard.config.mode = request.mode.clone();
911
912    Json(ApiResponse::success(serde_json::json!({
913        "mode": format!("{:?}", request.mode),
914        "updated": true
915    })))
916}
917
918/// Get current mode
919async fn get_mode_handler(
920    State(state): State<SharedState<RegulatorEndpointState>>,
921) -> impl IntoResponse {
922    let state_guard = state.read().await;
923
924    Json(ApiResponse::success(serde_json::json!({
925        "mode": format!("{:?}", state_guard.config.mode)
926    })))
927}
928
929/// Get simulator statistics
930async fn stats_handler(
931    State(state): State<SharedState<RegulatorEndpointState>>,
932) -> impl IntoResponse {
933    let state_guard = state.read().await;
934
935    let mut stats = state_guard.stats.clone();
936
937    // Add submission-specific stats
938    let mut endpoint_counts = stats.endpoint_counts.clone();
939    endpoint_counts.insert("total_submissions".to_string(), state_guard.submissions.len() as u64);
940
941    let accepted = state_guard.submissions.values()
942        .filter(|s| s.status == SubmissionStatus::Accepted)
943        .count() as u64;
944    endpoint_counts.insert("accepted_submissions".to_string(), accepted);
945
946    let rejected = state_guard.submissions.values()
947        .filter(|s| s.status == SubmissionStatus::Rejected)
948        .count() as u64;
949    endpoint_counts.insert("rejected_submissions".to_string(), rejected);
950
951    stats.endpoint_counts = endpoint_counts;
952
953    Json(ApiResponse::success(stats))
954}
955
956/// Reset simulator state
957async fn reset_handler(
958    State(state): State<SharedState<RegulatorEndpointState>>,
959) -> impl IntoResponse {
960    let mut state_guard = state.write().await;
961    state_guard.stats = SimulatorStats::default();
962    state_guard.submissions.clear();
963    state_guard.idempotency_cache.clear();
964    state_guard.submission_counter = 0;
965    state_guard.config.mode = RegulatorMode::Accept;
966
967    Json(ApiResponse::success(serde_json::json!({
968        "reset": true,
969        "mode": "Accept"
970    })))
971}
972
973/// Summary of a submission for listing
974#[derive(Debug, Clone, Serialize, Deserialize)]
975pub struct SubmissionSummary {
976    pub submission_id: String,
977    pub reference_number: String,
978    pub bank_code: String,
979    pub reporting_period: String,
980    pub status: SubmissionStatus,
981    pub record_count: usize,
982    pub accepted_count: usize,
983    pub rejected_count: usize,
984    pub submitted_at: String,
985}
986
987// ============================================================================
988// Tests
989// ============================================================================
990
#[cfg(test)]
mod tests {
    //! Unit tests for the serialisable types and for
    //! `RegulatorEndpointState::process_submission` in each response mode.

    use super::*;

    /// A one-element record list, the payload most tests submit.
    fn one_record() -> Vec<serde_json::Value> {
        vec![serde_json::json!({"id": "1"})]
    }

    /// Build a request for bank `BANKXYZ`, period `202401`, with the given
    /// optional submission ID and records and no metadata.
    fn request_with(
        submission_id: Option<&str>,
        records: Vec<serde_json::Value>,
    ) -> SubmissionRequest {
        SubmissionRequest {
            submission_id: submission_id.map(str::to_string),
            reporting_period: "202401".to_string(),
            bank_code: "BANKXYZ".to_string(),
            records,
            metadata: None,
        }
    }

    /// Fresh state built from the default configuration.
    fn default_state() -> RegulatorEndpointState {
        RegulatorEndpointState::new(RegulatorEndpointConfig::default())
    }

    /// Fresh state built from the default configuration with `mode` overridden.
    fn state_in_mode(mode: RegulatorMode) -> RegulatorEndpointState {
        let mut config = RegulatorEndpointConfig::default();
        config.mode = mode;
        RegulatorEndpointState::new(config)
    }

    #[test]
    fn test_submission_status_serialization() {
        let status = SubmissionStatus::Accepted;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, "\"ACCEPTED\"");
    }

    #[test]
    fn test_submission_request() {
        let request = request_with(Some("test-123"), one_record());

        assert_eq!(request.submission_id, Some("test-123".to_string()));
        assert_eq!(request.records.len(), 1);
    }

    #[test]
    fn test_state_generate_reference_number() {
        let mut state = default_state();

        let ref1 = state.generate_reference_number();
        let ref2 = state.generate_reference_number();

        assert!(ref1.starts_with("OJK-SLIK-"));
        assert!(ref2.starts_with("OJK-SLIK-"));
        assert_ne!(ref1, ref2);
    }

    #[test]
    fn test_state_accept_submission() {
        let mut state = default_state();

        let request = request_with(
            None,
            vec![
                serde_json::json!({"id": "1"}),
                serde_json::json!({"id": "2"}),
            ],
        );

        let result = state.process_submission(&request, Some("key-1".to_string()));
        assert!(result.is_ok());

        let response = result.unwrap();
        assert_eq!(response.status, SubmissionStatus::Accepted);
        assert_eq!(response.accepted_count, 2);
        assert_eq!(response.rejected_count, 0);
    }

    #[test]
    fn test_state_idempotency() {
        let mut state = default_state();

        let request = request_with(Some("sub-123"), one_record());

        // First submission
        let result1 = state.process_submission(&request, Some("idem-key-1".to_string()));
        assert!(result1.is_ok());
        let response1 = result1.unwrap();

        // Duplicate submission with same idempotency key
        let result2 = state.process_submission(&request, Some("idem-key-1".to_string()));
        assert!(result2.is_ok());
        let response2 = result2.unwrap();

        // Should return the same submission ID
        assert_eq!(response1.submission_id, response2.submission_id);
        assert_eq!(response1.reference_number, response2.reference_number);

        // Should indicate it's an idempotent response
        assert!(response2.details.as_ref().unwrap().get("idempotent").is_some());
    }

    #[test]
    fn test_state_reject_mode() {
        let mut state = state_in_mode(RegulatorMode::Reject {
            error_code: "INVALID_DATA".to_string(),
            error_message: "Data validation failed".to_string(),
        });

        let request = request_with(None, one_record());

        let result = state.process_submission(&request, None);
        assert!(result.is_err());

        let (status, error) = result.unwrap_err();
        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert_eq!(error.error_code, "INVALID_DATA");
    }

    #[test]
    fn test_state_service_unavailable_mode() {
        let mut state = state_in_mode(RegulatorMode::ServiceUnavailable);

        let request = request_with(None, one_record());

        let result = state.process_submission(&request, None);
        assert!(result.is_err());

        let (status, _) = result.unwrap_err();
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
    }

    #[test]
    fn test_state_rate_limited_mode() {
        let mut state = state_in_mode(RegulatorMode::RateLimited);

        let request = request_with(None, one_record());

        let result = state.process_submission(&request, None);
        assert!(result.is_err());

        let (status, error) = result.unwrap_err();
        assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);
        assert_eq!(error.error_code, "RATE_LIMITED");
    }

    #[test]
    fn test_state_queued_mode() {
        let mut state = state_in_mode(RegulatorMode::Queued { queue_delay_ms: 5000 });

        let request = request_with(None, one_record());

        let result = state.process_submission(&request, None);
        assert!(result.is_ok());

        let response = result.unwrap();
        assert_eq!(response.status, SubmissionStatus::Queued);
        assert!(response.estimated_processing_time_secs.is_some());
    }

    #[test]
    fn test_state_partial_reject_mode() {
        let mut state = state_in_mode(RegulatorMode::PartialReject { reject_ratio: 0.5 });

        let request = request_with(
            None,
            (0..100).map(|i| serde_json::json!({"id": i})).collect(),
        );

        let result = state.process_submission(&request, None);
        assert!(result.is_ok());

        let response = result.unwrap();
        // With a 50% reject ratio over 100 records, both outcomes must occur
        // (all-accepted or all-rejected is statistically negligible). Both
        // counts are checked with `&&`: an `||` would be vacuously true once
        // the sum below equals 100.
        assert!(response.accepted_count > 0 && response.rejected_count > 0);
        assert_eq!(response.accepted_count + response.rejected_count, 100);
    }

    #[test]
    fn test_record_rejection() {
        let rejection = RecordRejection {
            record_index: 5,
            record_id: Some("CR0005".to_string()),
            error_code: "INVALID_NIK".to_string(),
            message: "NIK format is invalid".to_string(),
            field: Some("debtor_nik".to_string()),
        };

        assert_eq!(rejection.record_index, 5);
        assert_eq!(rejection.error_code, "INVALID_NIK");
    }

    #[test]
    fn test_stored_submission() {
        let submission = StoredSubmission {
            submission_id: "sub-123".to_string(),
            reference_number: "OJK-SLIK-20240101120000-000001".to_string(),
            bank_code: "BANKXYZ".to_string(),
            reporting_period: "202401".to_string(),
            status: SubmissionStatus::Accepted,
            record_count: 100,
            accepted_count: 100,
            rejected_count: 0,
            rejections: Vec::new(),
            submitted_at: Utc::now(),
            processed_at: Some(Utc::now()),
            idempotency_key: Some("idem-123".to_string()),
            attempt_count: 1,
        };

        assert_eq!(submission.status, SubmissionStatus::Accepted);
        assert_eq!(submission.record_count, 100);
    }
}