Skip to main content

credit_data_simulator/
core_banking.rs

1//! # Core Banking Simulator
2//!
3//! Simulates a Core Banking data source that provides credit data for SLIK reporting.
4//! Supports:
5//! - Deterministic dataset generation (via POST /generate)
6//! - Read-only listing (GET /credits) without regenerating
7//! - Cutoff-window filtering (cutoff_start/cutoff_end) on last_updated
8//! - Pagination + cursor style (next_cursor)
9//! - SSE streaming (for PoC) + NDJSON streaming (for throughput testing)
10//!
11//! ## Endpoints
12//! - `GET /health` - Health check
13//! - `GET /api/v1/credits` - List credit records (paginated, read-only)
14//! - `GET /api/v1/credits/:id` - Get record by id
15//! - `GET /api/v1/credits/count` - Count records (optionally filtered)
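//! - `POST /api/v1/credits/generate` - Generate a dataset (replace or append)
//! - `POST /api/v1/credits/load` - Load caller-supplied records from a JSON body (replace or append)
//! - `POST /api/v1/credits/load-ndjson` - Load caller-supplied records from an NDJSON body (replace or append)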
17//! - `GET /api/v1/credits/stream` - Stream generated records (SSE) (stateless generator)
18//! - `GET /api/v1/credits/ndjson` - Stream generated records (NDJSON) (stateless generator)
19//! - `GET /api/v1/stats` - Simulator statistics
20//! - `POST /api/v1/reset` - Reset statistics and data
21//! - `PUT /api/v1/config` - Update configuration dynamically
22
23use crate::{
24    config::{CoreBankingConfig, FailureType, LatencyConfig},
25    models::credit::CreditRecord,
26    shared_state, ApiResponse, HealthStatus, ResponseMeta, SharedState, Simulator,
27    SimulatorError, SimulatorResult, SimulatorStats,
28};
29
30use axum::{
31    body::Body,
32    extract::{DefaultBodyLimit, Path, Query, State},
33    http::{header, HeaderMap, StatusCode},
34    response::{sse::Event, IntoResponse, Json, Response, Sse},
35    routing::{get, post, put},
36    Router,
37};
38
39use chrono::{NaiveDate, NaiveDateTime, Utc};
40use futures::stream::Stream;
41use futures::StreamExt;
42use rand::{Rng, SeedableRng};
43use rand_chacha::ChaCha8Rng;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use tokio_util::io::StreamReader;
47use tokio::io::AsyncBufReadExt;
48use std::{
49    convert::Infallible,
50    time::{Duration, Instant},
51};
52use tokio::sync::oneshot;
53
54// ============================================================================
55// Helpers
56// ============================================================================
57
58fn parse_date_opt(s: &Option<String>) -> Result<Option<NaiveDate>, &'static str> {
59    if let Some(v) = s {
60        let d = NaiveDate::parse_from_str(v, "%Y-%m-%d").map_err(|_| "invalid date format")?;
61        Ok(Some(d))
62    } else {
63        Ok(None)
64    }
65}
66
67fn parse_dt_opt(s: &Option<String>) -> Result<Option<NaiveDateTime>, &'static str> {
68    if let Some(v) = s {
69        // Accept RFC3339 with Z, minimal for PoC.
70        let dt = chrono::DateTime::parse_from_rfc3339(v)
71            .map_err(|_| "invalid datetime format")?
72            .naive_utc();
73        Ok(Some(dt))
74    } else {
75        Ok(None)
76    }
77}
78
79fn within_cutoff(last_updated_rfc3339: &str, start: Option<NaiveDate>, end: Option<NaiveDate>) -> bool {
80    // Parse last_updated. If parsing fails, treat as NOT within window.
81    let Ok(dt) = chrono::DateTime::parse_from_rfc3339(last_updated_rfc3339) else {
82        return false;
83    };
84    let d = dt.date_naive();
85    if let Some(s) = start {
86        if d < s {
87            return false;
88        }
89    }
90    if let Some(e) = end {
91        if d > e {
92            return false;
93        }
94    }
95    true
96}
97
/// Decodes a pagination cursor into a record offset.
///
/// The cursor is a plain decimal offset (PoC-level design). A missing or
/// unparsable cursor yields offset `0`.
fn parse_cursor(cursor: &Option<String>) -> usize {
    match cursor.as_deref().map(str::parse::<usize>) {
        Some(Ok(offset)) => offset,
        _ => 0,
    }
}
105
/// Computes the cursor for the page that follows the current one.
///
/// The next offset is `current_offset + returned` (saturating). It is
/// returned as a decimal string only while it is still below `total`;
/// otherwise there is no further page and the result is `None`.
fn next_cursor_if_any(current_offset: usize, returned: usize, total: usize) -> Option<String> {
    let following = current_offset.saturating_add(returned);
    (following < total).then(|| following.to_string())
}
114
115// CreditRecord models moved to crate::models::credit
116
/// Query parameters for the credit listing endpoint; the count endpoint
/// deserializes the same struct but only reads the cutoff fields.
#[derive(Debug, Deserialize)]
pub struct ListCreditsParams {
    /// 1-based page number; the list handler treats a missing value or `0` as page 1.
    pub page: Option<u32>,
    /// Records per page; the list handler defaults to 100, raises `0` to 1, and caps the
    /// value at the configured `max_records_per_request`. Values above 100,000 switch the
    /// response to NDJSON streaming.
    pub page_size: Option<u32>,

    /// Cursor offset for stable paging (optional). If present, `page` is ignored.
    /// The value is a decimal record offset; an unparsable cursor is read as offset 0.
    pub cursor: Option<String>,

    /// Cutoff window (inclusive) for filtering by `last_updated` date.
    /// Both bounds use the `YYYY-MM-DD` format.
    pub cutoff_start: Option<String>,
    pub cutoff_end: Option<String>,

    /// Mode: "normal" (default) or "stream"
    pub mode: Option<String>,

    /// Chunk size for streaming (number of records per flush); defaults to 1000.
    pub stream_chunk_size: Option<usize>,
}
136
/// JSON body of `POST /api/v1/credits/generate` (mutates the stored dataset).
#[derive(Debug, Deserialize)]
pub struct GenerateCreditsRequest {
    /// Number of records to generate; the handler caps it at the configured
    /// `max_records_per_request`.
    pub count: u32,
    /// Fraction of records that receive an injected error. Defaults to the configured
    /// default dirty ratio and is clamped to `[0.0, 1.0]`.
    pub dirty_ratio: Option<f64>,
    /// RNG seed for reproducible output; when omitted the generator is seeded randomly.
    pub seed: Option<u64>,
    /// `true` adds to the existing dataset; `false` or omitted replaces it.
    pub append: Option<bool>,
}
145
/// JSON body of `POST /api/v1/credits/load`: caller-supplied records to store (mutating).
#[derive(Debug, Deserialize)]
pub struct LoadCreditsRequest {
    /// Records to insert, keyed by their `id`; a later record with the same id replaces an earlier one.
    pub records: Vec<CreditRecord>,
    /// `true` adds to the existing dataset; `false` or omitted replaces it.
    pub append: Option<bool>,
}
152
/// Query parameters for the stateless streaming generators
/// (`GET /api/v1/credits/stream` and `GET /api/v1/credits/ndjson`).
#[derive(Debug, Deserialize)]
pub struct StreamCreditsParams {
    /// Number of records to generate; both handlers default to 1000.
    pub count: Option<u32>,
    /// Fraction of records that receive an injected error; defaults to 0.0 and is
    /// clamped to `[0.0, 1.0]`.
    pub dirty_ratio: Option<f64>,
    /// RNG seed for reproducible output; when omitted the generator is seeded from entropy.
    pub seed: Option<u64>,
    /// Records per SSE event (default 100, minimum 1). Not read by the NDJSON handler.
    pub batch_size: Option<u32>,
    /// Pause in milliseconds after each SSE batch (default 10) or each NDJSON line (default 0).
    pub delay_ms: Option<u64>,
}
162
/// Dynamic configuration update request.
///
/// NOTE(review): presumably the JSON body of `PUT /api/v1/config`; the handler that
/// consumes it is outside this view, so the field semantics below are unconfirmed.
#[derive(Debug, Deserialize)]
pub struct UpdateConfigRequest {
    /// New default dirty ratio, if provided (assumed fraction in `[0.0, 1.0]` — TODO confirm).
    pub dirty_ratio: Option<f64>,
    /// New injected latency in milliseconds, if provided — TODO confirm against the handler.
    pub latency_ms: Option<u64>,
    /// New simulated failure rate, if provided (assumed fraction in `[0.0, 1.0]` — TODO confirm).
    pub failure_rate: Option<f64>,
}
170
171// ============================================================================
172// Simulator State
173// ============================================================================
174
/// Internal state for Core Banking Simulator, shared between all HTTP handlers.
pub struct CoreBankingState {
    /// Simulator configuration (limits, latency, failure injection, defaults).
    pub config: CoreBankingConfig,
    /// Stored credit records, keyed by each record's `id`.
    pub records: std::collections::HashMap<String, CreditRecord>,
    /// Request statistics accumulated by the handlers.
    pub stats: SimulatorStats,
    /// Creation time of this state; used to report uptime in health checks.
    pub started_at: Instant,
    /// `false` until the initial dataset has been generated; health reports "Not ready" until then.
    pub ready: bool,

    // current dataset metadata (for reproducibility)
    /// Seed recorded for the current dataset (`None` when it was randomly seeded).
    pub dataset_seed: Option<u64>,
    /// Dirty ratio recorded for the current dataset.
    pub dataset_dirty_ratio: f64,
}
187
188impl CoreBankingState {
189    pub fn new(config: CoreBankingConfig) -> Self {
190        Self {
191            dataset_seed: config.seed,
192            dataset_dirty_ratio: config.default_dirty_ratio,
193            config,
194            records: std::collections::HashMap::new(),
195            stats: SimulatorStats::default(),
196            started_at: Instant::now(),
197            ready: false,
198        }
199    }
200
201    /// Generate records based on configuration
202    pub fn generate_records(
203        count: u32,
204        dirty_ratio: f64,
205        seed: Option<u64>,
206        start_id: u64,
207    ) -> Vec<CreditRecord> {
208        let mut rng: ChaCha8Rng = if let Some(s) = seed {
209            ChaCha8Rng::seed_from_u64(s)
210        } else {
211            ChaCha8Rng::from_rng(rand::thread_rng()).unwrap()
212        };
213
214        let error_types = [
215            "invalid_nik",
216            "negative_amount",
217            "invalid_date",
218            "missing_field",
219            "invalid_currency",
220            "invalid_collectability",
221            "outstanding_gt_plafon",
222        ];
223
224        let mut records = Vec::with_capacity(count as usize);
225        let error_threshold = (dirty_ratio.clamp(0.0, 1.0) * u64::MAX as f64) as u64;
226
227        for i in 0..count {
228            let mut record = CreditRecord::generate_clean(&mut rng, start_id + i as u64);
229
230            if error_threshold > 0 && rng.gen::<u64>() < error_threshold {
231                let error_type = error_types[rng.gen_range(0..error_types.len())];
232                record = record.inject_error(&mut rng, error_type);
233            }
234
235            records.push(record);
236        }
237
238        records
239    }
240}
241
242// ============================================================================
243// Core Banking Simulator
244// ============================================================================
245
/// Core Banking Simulator implementation: owns the shared state and serves the HTTP API.
pub struct CoreBankingSimulator {
    /// Shared, lock-protected state handed to every HTTP handler.
    state: SharedState<CoreBankingState>,
    /// Configuration the simulator was created with; read for the port and bind address.
    config: CoreBankingConfig,
}
251
252impl CoreBankingSimulator {
253    pub fn new(config: CoreBankingConfig) -> Self {
254        let state = shared_state(CoreBankingState::new(config.clone()));
255        Self { state, config }
256    }
257
258    pub async fn run(&self, shutdown_rx: oneshot::Receiver<()>) -> SimulatorResult<()> {
259        // Initialize with default records
260        {
261            let (record_count, dirty_ratio, seed) = {
262                let state = self.state.read().await;
263                (
264                    state.config.default_record_count,
265                    state.config.default_dirty_ratio,
266                    state.config.seed,
267                )
268            };
269
270            // Generate records outside the lock (CPU intensive)
271            let records =
272                CoreBankingState::generate_records(record_count, dirty_ratio, seed, 0);
273
274            let mut state = self.state.write().await;
275            for record in records {
276                state.records.insert(record.id.clone(), record);
277            }
278            state.ready = true;
279            state.dataset_seed = seed;
280            state.dataset_dirty_ratio = dirty_ratio;
281        }
282
283        let app = self.create_router();
284        let addr: std::net::SocketAddr = self
285            .config
286            .socket_addr()
287            .parse()
288            .map_err(|e| SimulatorError::ConfigError(format!("Invalid address: {e}")))?;
289
290        tracing::info!("Core Banking Simulator listening on {addr}");
291
292        let listener = tokio::net::TcpListener::bind(addr)
293            .await
294            .map_err(|e| SimulatorError::BindError(e.to_string()))?;
295
296        axum::serve(listener, app)
297            .with_graceful_shutdown(async {
298                let _ = shutdown_rx.await;
299                tracing::info!("Core Banking Simulator shutting down");
300            })
301            .await
302            .map_err(|e| SimulatorError::StartError(e.to_string()))?;
303
304        Ok(())
305    }
306
307    fn create_router(&self) -> Router {
308        let state = self.state.clone();
309
310        Router::new()
311            .route("/health", get(health_handler))
312            .route("/api/v1/credits", get(list_credits_handler).post(list_credits_handler))
313            .route("/api/v1/credits/:id", get(get_credit_handler))
314            .route("/api/v1/credits/count", get(count_credits_handler))
315            .route("/api/v1/credits/generate", post(generate_credits_handler))
316            .route("/api/v1/credits/load", post(load_credits_handler))
317            .route("/api/v1/credits/load-ndjson", post(load_ndjson_handler))
318            .route("/api/v1/credits/stream", get(stream_credits_handler))
319            .route("/api/v1/credits/ndjson", get(ndjson_credits_handler))
320            .route("/api/v1/stats", get(stats_handler))
321            .route("/api/v1/reset", post(reset_handler))
322            .route("/api/v1/config", put(update_config_handler))
323            .layer(DefaultBodyLimit::max(100 * 1024 * 1024))
324            .with_state(state)
325    }
326
327    pub async fn get_records(&self) -> Vec<CreditRecord> {
328        self.state.read().await.records.values().cloned().collect()
329    }
330
331    pub async fn set_records(&self, records: Vec<CreditRecord>) {
332        let mut state = self.state.write().await;
333        state.records.clear();
334        for record in records {
335            state.records.insert(record.id.clone(), record);
336        }
337    }
338}
339
340#[async_trait::async_trait]
341impl Simulator for CoreBankingSimulator {
342    fn name(&self) -> &str {
343        "core-banking"
344    }
345
346    fn port(&self) -> u16 {
347        self.config.port
348    }
349
350    async fn health(&self) -> HealthStatus {
351        let state = self.state.read().await;
352        let uptime = state.started_at.elapsed().as_secs();
353
354        if state.ready {
355            HealthStatus::healthy(self.name(), "1.0.0", uptime)
356                .with_details("record_count", serde_json::json!(state.records.len()))
357                .with_details("dataset_seed", serde_json::json!(state.dataset_seed))
358                .with_details("dataset_dirty_ratio", serde_json::json!(state.dataset_dirty_ratio))
359        } else {
360            HealthStatus::unhealthy(self.name(), "Not ready")
361        }
362    }
363
364    async fn stats(&self) -> SimulatorStats {
365        self.state.read().await.stats.clone()
366    }
367
368    async fn reset_stats(&self) {
369        self.state.write().await.stats = SimulatorStats::default();
370    }
371
372    async fn is_ready(&self) -> bool {
373        self.state.read().await.ready
374    }
375}
376
377// ============================================================================
378// HTTP Handlers
379// ============================================================================
380
381async fn health_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
382    let state = state.read().await;
383    let uptime = state.started_at.elapsed().as_secs();
384
385    if state.ready {
386        let health = HealthStatus::healthy("core-banking", "1.0.0", uptime)
387            .with_details("record_count", serde_json::json!(state.records.len()))
388            .with_details("dataset_seed", serde_json::json!(state.dataset_seed))
389            .with_details("dataset_dirty_ratio", serde_json::json!(state.dataset_dirty_ratio));
390        (StatusCode::OK, Json(health))
391    } else {
392        let health = HealthStatus::unhealthy("core-banking", "Not ready");
393        (StatusCode::SERVICE_UNAVAILABLE, Json(health))
394    }
395}
396
397/// List credits (READ ONLY) with pagination + cutoff filtering
398async fn list_credits_handler(
399    State(state): State<SharedState<CoreBankingState>>,
400    Query(params): Query<ListCreditsParams>,
401) -> Response {
402    let start = Instant::now();
403
404    // Parse cutoff params early (fail fast)
405    let cutoff_start = match parse_date_opt(&params.cutoff_start) {
406        Ok(v) => v,
407        Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::<Vec<CreditRecord>>::error("BAD_REQUEST", e))).into_response(),
408    };
409    let cutoff_end = match parse_date_opt(&params.cutoff_end) {
410        Ok(v) => v,
411        Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::<Vec<CreditRecord>>::error("BAD_REQUEST", e))).into_response(),
412    };
413
414    // 1. Read lock for config and limits
415    let (failure, latency_config, max_records) = {
416        let state = state.read().await;
417        (
418            state.config.failure_injection.random_failure().cloned(),
419            state.config.latency.clone(),
420            state.config.max_records_per_request,
421        )
422    };
423
424    // Failure injection (optional)
425    if let Some(ref failure) = failure {
426        // Need write lock to record stats
427        let mut state = state.write().await;
428        state
429            .stats
430            .record_request("/api/v1/credits", false, start.elapsed().as_millis() as f64);
431        return match failure {
432            FailureType::InternalError => (
433                StatusCode::INTERNAL_SERVER_ERROR,
434                Json(ApiResponse::<()>::error("INTERNAL_ERROR", "Simulated internal error")),
435            ).into_response(),
436            FailureType::Timeout => {
437                tokio::time::sleep(Duration::from_millis(10000)).await;
438                (
439                    StatusCode::GATEWAY_TIMEOUT,
440                    Json(ApiResponse::<()>::error("TIMEOUT", "Simulated timeout")),
441                ).into_response()
442            }
443            FailureType::ServiceUnavailable => (
444                StatusCode::SERVICE_UNAVAILABLE,
445                Json(ApiResponse::<()>::error(
446                    "SERVICE_UNAVAILABLE",
447                    "Simulated service unavailable",
448                )),
449            ).into_response(),
450            _ => (
451                StatusCode::INTERNAL_SERVER_ERROR,
452                Json(ApiResponse::<()>::error("UNKNOWN", "Unknown failure type")),
453            ).into_response(),
454        };
455    }
456
457    // Latency injection
458    latency_config.apply().await;
459
460    // 2. Data retrieval and response determination
461    let mode = params.mode.as_deref().unwrap_or("normal");
462    let requested_page_size = params.page_size.unwrap_or(100).max(1);
463    
464    // Auto-switch to stream if count > 100,000 or mode is explicit stream
465    if (requested_page_size as u64) > 100_000 || mode == "stream" {
466        return handle_list_stream(state, params, requested_page_size as usize).await.into_response();
467    }
468
469    let (records, total_count, page_size, page, offset) = {
470        let state = state.read().await;
471
472        let page_size = (requested_page_size as u64).min(max_records as u64) as u32;
473
474        tracing::info!(">>> [CORE-BANKING] Processing Credit List - page_size: {}", page_size);
475
476        let (offset, page) = if params.cursor.is_some() {
477            (parse_cursor(&params.cursor), 1u32)
478        } else {
479            let page = params.page.unwrap_or(1).max(1);
480            (((page - 1) * page_size) as usize, page)
481        };
482
483        // Calculate total count (needed for metadata)
484        let total_count = if cutoff_start.is_some() || cutoff_end.is_some() {
485            state
486                .records
487                .values()
488                .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
489                .count() as u64
490        } else {
491            state.records.len() as u64
492        };
493
494        // Get page of records
495        let records: Vec<CreditRecord> = if cutoff_start.is_some() || cutoff_end.is_some() {
496            state
497                .records
498                .values()
499                .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
500                .skip(offset)
501                .take(page_size as usize)
502                .cloned()
503                .collect()
504        } else {
505            state
506                .records
507                .values()
508                .skip(offset)
509                .take(page_size as usize)
510                .cloned()
511                .collect()
512        };
513
514        (records, total_count, page_size, page, offset)
515    };
516
517    let returned = records.len();
518
519    let meta = ResponseMeta::paginated(page, page_size, total_count)
520        .with_timing(start.elapsed().as_millis() as u64)
521        .with_extra(
522            "next_cursor",
523            serde_json::json!(next_cursor_if_any(offset, returned, total_count as usize)),
524        );
525
526    // 3. Write lock only for updating stats
527    {
528        let mut state = state.write().await;
529        state
530            .stats
531            .record_request("/api/v1/credits", true, start.elapsed().as_millis() as f64);
532
533        // Approximate bytes sent to avoid expensive serialization
534        let approx_bytes = (returned * 1024) as u64;
535        state.stats.bytes_sent += approx_bytes;
536    }
537
538    (StatusCode::OK, Json(ApiResponse::success_with_meta(records, meta))).into_response()
539}
540
541/// Helper to handle streaming for GET /api/v1/credits
542async fn handle_list_stream(
543    state: SharedState<CoreBankingState>,
544    params: ListCreditsParams,
545    page_size: usize,
546) -> Response {
547    let chunk_size = params.stream_chunk_size.unwrap_or(1000).max(1);
548    
549    let cutoff_start = parse_date_opt(&params.cutoff_start).ok().flatten();
550    let cutoff_end = parse_date_opt(&params.cutoff_end).ok().flatten();
551
552    let stream = async_stream::stream! {
553        let records: Vec<CreditRecord> = {
554            let state_lock = state.read().await;
555            
556            let offset = if params.cursor.is_some() {
557                parse_cursor(&params.cursor)
558            } else {
559                let page = params.page.unwrap_or(1).max(1);
560                ((page - 1) as usize) * page_size
561            };
562
563            if cutoff_start.is_some() || cutoff_end.is_some() {
564                state_lock.records.values()
565                    .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
566                    .skip(offset)
567                    .take(page_size)
568                    .cloned()
569                    .collect()
570            } else {
571                state_lock.records.values()
572                    .skip(offset)
573                    .take(page_size)
574                    .cloned()
575                    .collect()
576            }
577        };
578
579        let mut current_batch = String::new();
580        let mut count = 0;
581
582        for record in records {
583            if let Ok(line) = serde_json::to_string(&record) {
584                current_batch.push_str(&line);
585                current_batch.push('\n');
586                count += 1;
587
588                if count >= chunk_size {
589                    yield Ok::<_, Infallible>(current_batch.clone());
590                    current_batch.clear();
591                    count = 0;
592                }
593            }
594        }
595
596        if !current_batch.is_empty() {
597            yield Ok::<_, Infallible>(current_batch);
598        }
599    };
600
601    Response::builder()
602        .header(header::CONTENT_TYPE, "application/x-ndjson")
603        .body(Body::from_stream(stream))
604        .unwrap()
605}
606
607/// Get single credit by ID
608async fn get_credit_handler(
609    State(state): State<SharedState<CoreBankingState>>,
610    Path(id): Path<String>,
611) -> impl IntoResponse {
612    let state = state.read().await;
613    if let Some(record) = state.records.get(&id) {
614        (StatusCode::OK, Json(ApiResponse::success(record.clone())))
615    } else {
616        (
617            StatusCode::NOT_FOUND,
618            Json(ApiResponse::<CreditRecord>::error(
619                "NOT_FOUND",
620                "Credit record not found",
621            )),
622        )
623    }
624}
625
626/// Get credit count (optionally filtered by cutoff window)
627async fn count_credits_handler(
628    State(state): State<SharedState<CoreBankingState>>,
629    Query(params): Query<ListCreditsParams>,
630) -> impl IntoResponse {
631    let cutoff_start = match parse_date_opt(&params.cutoff_start) {
632        Ok(v) => v,
633        Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::error("BAD_REQUEST", e))),
634    };
635    let cutoff_end = match parse_date_opt(&params.cutoff_end) {
636        Ok(v) => v,
637        Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::error("BAD_REQUEST", e))),
638    };
639
640    let state = state.read().await;
641    let count = if cutoff_start.is_some() || cutoff_end.is_some() {
642        state
643            .records
644            .values()
645            .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
646            .count()
647    } else {
648        state.records.len()
649    };
650
651    (
652        StatusCode::OK,
653        Json(ApiResponse::success(serde_json::json!({
654            "count": count,
655            "dirty_ratio": state.dataset_dirty_ratio,
656            "seed": state.dataset_seed
657        }))),
658    )
659}
660
661/// Generate new credits (ONLY mutating endpoint)
662async fn generate_credits_handler(
663    State(state): State<SharedState<CoreBankingState>>,
664    Json(request): Json<GenerateCreditsRequest>,
665) -> impl IntoResponse {
666    let start = Instant::now();
667
668    // 1. Read config and current state (lightweight read lock)
669    let (max_records, default_dirty, current_count) = {
670        let state = state.read().await;
671        (
672            state.config.max_records_per_request,
673            state.config.default_dirty_ratio,
674            state.records.len() as u64,
675        )
676    };
677
678    let count = request.count.min(max_records);
679    let dirty_ratio = request
680        .dirty_ratio
681        .unwrap_or(default_dirty)
682        .clamp(0.0, 1.0);
683    let seed = request.seed;
684    let append = request.append.unwrap_or(false);
685    let start_id = if append { current_count } else { 0 };
686
687    // 2. Generate records in blocking thread (CPU heavy, no lock)
688    let new_records = tokio::task::spawn_blocking(move || {
689        CoreBankingState::generate_records(count, dirty_ratio, seed, start_id)
690    })
691    .await
692    .unwrap();
693
694    // 3. Update state (quick write lock)
695    let mut state = state.write().await;
696    if !append {
697        state.records.clear();
698    }
699    // Pre-allocate if appending
700    if append {
701        state.records.reserve(count as usize);
702    }
703
704    for record in new_records {
705        state.records.insert(record.id.clone(), record);
706    }
707
708    let result = serde_json::json!({
709        "generated": count,
710        "dirty_ratio": dirty_ratio,
711        "seed": seed,
712        "total_records": state.records.len()
713    });
714
715    (StatusCode::OK, Json(ApiResponse::success(result)))
716}
717
718/// Load specific credits (ONLY mutating endpoint)
719async fn load_credits_handler(
720    State(state): State<SharedState<CoreBankingState>>,
721    Json(request): Json<LoadCreditsRequest>,
722) -> impl IntoResponse {
723    let append = request.append.unwrap_or(false);
724    let count = request.records.len();
725
726    // Update state (quick write lock)
727    let mut state = state.write().await;
728    if !append {
729        state.records.clear();
730    }
731    // Pre-allocate if appending
732    if append {
733        state.records.reserve(count);
734    }
735
736    for record in request.records {
737        state.records.insert(record.id.clone(), record);
738    }
739
740    let result = serde_json::json!({
741        "loaded": count,
742        "total_records": state.records.len()
743    });
744
745    (StatusCode::OK, Json(ApiResponse::success(result)))
746}
747
748/// Load specific credits from NDJSON stream (high-performance)
749async fn load_ndjson_handler(
750    State(state): State<SharedState<CoreBankingState>>,
751    Query(params): Query<HashMap<String, String>>,
752    body: Body,
753) -> impl IntoResponse {
754    let append = params.get("append").map(|v| v == "true").unwrap_or(false);
755    
756    let stream = body.into_data_stream();
757    let stream = stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
758    let reader = StreamReader::new(stream);
759    let mut lines = tokio::io::BufReader::new(reader).lines();
760    
761    let mut count = 0;
762    let mut records_to_add = Vec::with_capacity(10000);
763    
764    if !append {
765        let mut state_lock = state.write().await;
766        state_lock.records.clear();
767    }
768
769    while let Ok(Some(line)) = lines.next_line().await {
770        let trimmed = line.trim();
771        if trimmed.is_empty() { continue; }
772        if let Ok(record) = serde_json::from_str::<CreditRecord>(trimmed) {
773            records_to_add.push(record);
774            count += 1;
775            
776            if records_to_add.len() >= 10000 {
777                let mut state_lock = state.write().await;
778                for r in records_to_add.drain(..) {
779                    state_lock.records.insert(r.id.clone(), r);
780                }
781            }
782        }
783    }
784    
785    if !records_to_add.is_empty() {
786        let mut state_lock = state.write().await;
787        for r in records_to_add {
788            state_lock.records.insert(r.id.clone(), r);
789        }
790    }
791
792    let total = {
793        let state_lock = state.read().await;
794        state_lock.records.len()
795    };
796
797    let result = serde_json::json!({
798        "loaded": count,
799        "total_records": total
800    });
801
802    (StatusCode::OK, Json(ApiResponse::success(result)))
803}
804
805/// Stream credits using Server-Sent Events (stateless generator)
806async fn stream_credits_handler(
807    State(state): State<SharedState<CoreBankingState>>,
808    Query(params): Query<StreamCreditsParams>,
809) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
810    let count = params.count.unwrap_or(1000);
811    let dirty_ratio = params.dirty_ratio.unwrap_or(0.0).clamp(0.0, 1.0);
812    let seed = params.seed;
813    let batch_size = params.batch_size.unwrap_or(100).max(1);
814    let delay_ms = params.delay_ms.unwrap_or(10);
815
816    let stream = async_stream::stream! {
817        let mut rng: ChaCha8Rng = match seed {
818            Some(s) => ChaCha8Rng::seed_from_u64(s),
819            None => ChaCha8Rng::from_entropy(),
820        };
821
822        let error_types = [
823            "invalid_nik",
824            "negative_amount",
825            "invalid_date",
826            "missing_field",
827            "invalid_currency",
828            "invalid_collectability",
829            "outstanding_gt_plafon",
830        ];
831
832        let mut batch = Vec::with_capacity(batch_size as usize);
833        let mut generated = 0u32;
834
835        while generated < count {
836            let mut record = CreditRecord::generate_clean(&mut rng, generated as u64);
837            if dirty_ratio > 0.0 && rng.gen::<f64>() < dirty_ratio {
838                let error_type = error_types[rng.gen_range(0..error_types.len())];
839                record = record.inject_error(&mut rng, error_type);
840            }
841
842            batch.push(record);
843            generated += 1;
844
845            if batch.len() >= batch_size as usize || generated >= count {
846                let data = serde_json::to_string(&batch).unwrap_or_default();
847                yield Ok(Event::default().event("records").data(data));
848                batch.clear();
849
850                if delay_ms > 0 {
851                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
852                }
853            }
854        }
855
856        yield Ok(Event::default().event("complete").data(format!(r#"{{\"total\":{}}}"#, generated)));
857
858        let mut state = state.write().await;
859        state.stats.record_request("/api/v1/credits/stream", true, 0.0);
860    };
861
862    Sse::new(stream)
863}
864
865/// Stream credits in NDJSON (stateless generator)
866async fn ndjson_credits_handler(
867    State(state): State<SharedState<CoreBankingState>>,
868    Query(params): Query<StreamCreditsParams>,
869) -> Response {
870    let count = params.count.unwrap_or(1000);
871    let dirty_ratio = params.dirty_ratio.unwrap_or(0.0).clamp(0.0, 1.0);
872    let seed = params.seed;
873    let delay_ms = params.delay_ms.unwrap_or(0);
874
875    let stream = async_stream::stream! {
876        let mut rng: ChaCha8Rng = match seed {
877            Some(s) => ChaCha8Rng::seed_from_u64(s),
878            None => ChaCha8Rng::from_entropy(),
879        };
880
881        let error_types = [
882            "invalid_nik",
883            "negative_amount",
884            "invalid_date",
885            "missing_field",
886            "invalid_currency",
887            "invalid_collectability",
888            "outstanding_gt_plafon",
889        ];
890
891        for i in 0..count {
892            let mut record = CreditRecord::generate_clean(&mut rng, i as u64);
893            if dirty_ratio > 0.0 && rng.gen::<f64>() < dirty_ratio {
894                let error_type = error_types[rng.gen_range(0..error_types.len())];
895                record = record.inject_error(&mut rng, error_type);
896            }
897
898            let line = match serde_json::to_string(&record) {
899                Ok(s) => s,
900                Err(_) => "{}".to_string(),
901            };
902
903            yield Ok::<_, Infallible>(format!("{}\n", line));
904
905            if delay_ms > 0 {
906                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
907            }
908        }
909
910        // update stats (best effort)
911        let mut st = state.write().await;
912        st.stats.record_request("/api/v1/credits/ndjson", true, 0.0);
913    };
914
915    let body = Body::from_stream(stream);
916    let mut headers = HeaderMap::new();
917    headers.insert(header::CONTENT_TYPE, "application/x-ndjson".parse().unwrap());
918
919    (StatusCode::OK, headers, body).into_response()
920}
921
922/// Get simulator statistics
923async fn stats_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
924    let state = state.read().await;
925    Json(ApiResponse::success(state.stats.clone()))
926}
927
928/// Reset simulator state
929async fn reset_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
930    let mut state = state.write().await;
931    state.stats = SimulatorStats::default();
932    let record_count = state.config.default_record_count;
933    let dirty_ratio = state.config.default_dirty_ratio;
934    let seed = state.config.seed;
935
936    let records = CoreBankingState::generate_records(record_count, dirty_ratio, seed, 0);
937    state.records.clear();
938    for record in records {
939        state.records.insert(record.id.clone(), record);
940    }
941    state.dataset_seed = seed;
942    state.dataset_dirty_ratio = dirty_ratio;
943
944    Json(ApiResponse::success(serde_json::json!({
945        "reset": true,
946        "record_count": state.records.len(),
947        "dirty_ratio": state.dataset_dirty_ratio,
948        "seed": state.dataset_seed
949    })))
950}
951
952/// Update configuration dynamically
953async fn update_config_handler(
954    State(state): State<SharedState<CoreBankingState>>,
955    Json(request): Json<UpdateConfigRequest>,
956) -> impl IntoResponse {
957    let mut state = state.write().await;
958
959    if let Some(dirty_ratio) = request.dirty_ratio {
960        state.config.default_dirty_ratio = dirty_ratio.clamp(0.0, 1.0);
961    }
962
963    if let Some(latency_ms) = request.latency_ms {
964        state.config.latency = LatencyConfig {
965            enabled: latency_ms > 0,
966            base_ms: latency_ms,
967            jitter_ms: latency_ms / 10,
968            percentiles: None,
969        };
970    }
971
972    if let Some(failure_rate) = request.failure_rate {
973        state.config.failure_injection.enabled = failure_rate > 0.0;
974        state.config.failure_injection.failure_rate = failure_rate.clamp(0.0, 1.0);
975    }
976
977    Json(ApiResponse::success(serde_json::json!({
978        "updated": true,
979        "dirty_ratio": state.config.default_dirty_ratio,
980        "latency_enabled": state.config.latency.enabled,
981        "failure_rate": state.config.failure_injection.failure_rate
982    })))
983}
984
985// ============================================================================
986// Tests
987// ============================================================================
988
#[cfg(test)]
mod tests {
    use super::*;

    /// Seed shared by the record-level tests so any failure is reproducible.
    const TEST_SEED: u64 = 42;

    /// Generates a clean record with sequence number 1 and injects `error_type`
    /// into it, drawing both steps from one RNG seeded with `TEST_SEED`.
    fn corrupted_record(error_type: &'static str) -> CreditRecord {
        let mut rng = ChaCha8Rng::seed_from_u64(TEST_SEED);
        let clean = CreditRecord::generate_clean(&mut rng, 1);
        clean.inject_error(&mut rng, error_type)
    }

    /// Creates a state from `config` and loads it with 100 generated records
    /// (dirty ratio 0.1, seed 12345).
    fn seeded_state(config: CoreBankingConfig) -> CoreBankingState {
        let mut state = CoreBankingState::new(config);
        let generated = CoreBankingState::generate_records(100, 0.1, Some(12345), 0);
        for record in generated {
            state.records.insert(record.id.clone(), record);
        }
        state
    }

    /// Shorthand for a calendar date that is known to be valid.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    #[test]
    fn test_credit_record_generate_clean() {
        let mut rng = ChaCha8Rng::seed_from_u64(TEST_SEED);
        let record = CreditRecord::generate_clean(&mut rng, 1);

        // Identity and NIK format.
        assert_eq!(record.id, "CR0000000001");
        assert_eq!(record.nik.len(), 16);
        assert!(record.nik.chars().all(char::is_numeric));
        assert!(!record.nama_lengkap.is_empty());

        // Financial fields of a clean record are mutually consistent.
        assert_eq!(record.mata_uang, "IDR");
        assert!(record.jumlah_kredit > 0);
        assert!(record.saldo_outstanding <= record.jumlah_kredit);
        assert!(matches!(record.kolektabilitas, 1..=5));

        // Clean records carry no error marker.
        assert!(record._has_error.is_none());
    }

    #[test]
    fn test_credit_record_inject_invalid_nik() {
        let record = corrupted_record("invalid_nik");

        assert_eq!(record._has_error, Some(true));
        assert_eq!(record._error_type.as_deref(), Some("invalid_nik"));

        // The NIK must no longer be a 16-character all-numeric string.
        let well_formed = record.nik.len() == 16 && record.nik.chars().all(char::is_numeric);
        assert!(!well_formed);
    }

    #[test]
    fn test_credit_record_inject_negative_amount() {
        let record = corrupted_record("negative_amount");

        assert_eq!(record._has_error, Some(true));
        assert!(record.jumlah_kredit < 0);
    }

    #[test]
    fn test_credit_record_inject_invalid_date() {
        let record = corrupted_record("invalid_date");

        assert_eq!(record._has_error, Some(true));

        // Either the start date is one of the known bogus values or the
        // maturity date has been blanked.
        let start_is_bogus = ["2024-13-45", "not-a-date"]
            .iter()
            .any(|bad| record.tanggal_mulai == *bad);
        let end_is_blank = record.tanggal_jatuh_tempo.is_empty();
        assert!(start_is_bogus || end_is_blank);
    }

    #[test]
    fn test_state_generate_records_deterministic() {
        let config = CoreBankingConfig::default();

        let first = seeded_state(config.clone());
        let second = seeded_state(config);

        assert_eq!(first.records.len(), second.records.len());
        for (id, left) in &first.records {
            let right = second.records.get(id).expect("Record missing in state2");
            assert_eq!(left.id, right.id);
            assert_eq!(left.nik, right.nik);
            assert_eq!(left.jumlah_kredit, right.jumlah_kredit);
            assert_eq!(left._has_error, right._has_error);
        }
    }

    #[test]
    fn test_within_cutoff() {
        let ts = "2024-01-15T10:00:00Z";

        // Inside a window that brackets the timestamp.
        assert!(within_cutoff(ts, Some(ymd(2024, 1, 1)), Some(ymd(2024, 1, 31))));
        // Before an open-ended window that starts later.
        assert!(!within_cutoff(ts, Some(ymd(2024, 2, 1)), None));
    }
}