1use crate::{
24 config::{CoreBankingConfig, FailureType, LatencyConfig},
25 models::credit::CreditRecord,
26 shared_state, ApiResponse, HealthStatus, ResponseMeta, SharedState, Simulator,
27 SimulatorError, SimulatorResult, SimulatorStats,
28};
29
30use axum::{
31 body::Body,
32 extract::{DefaultBodyLimit, Path, Query, State},
33 http::{header, HeaderMap, StatusCode},
34 response::{sse::Event, IntoResponse, Json, Response, Sse},
35 routing::{get, post, put},
36 Router,
37};
38
39use chrono::{NaiveDate, NaiveDateTime, Utc};
40use futures::stream::Stream;
41use futures::StreamExt;
42use rand::{Rng, SeedableRng};
43use rand_chacha::ChaCha8Rng;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use tokio_util::io::StreamReader;
47use tokio::io::AsyncBufReadExt;
48use std::{
49 convert::Infallible,
50 time::{Duration, Instant},
51};
52use tokio::sync::oneshot;
53
54fn parse_date_opt(s: &Option<String>) -> Result<Option<NaiveDate>, &'static str> {
59 if let Some(v) = s {
60 let d = NaiveDate::parse_from_str(v, "%Y-%m-%d").map_err(|_| "invalid date format")?;
61 Ok(Some(d))
62 } else {
63 Ok(None)
64 }
65}
66
67fn parse_dt_opt(s: &Option<String>) -> Result<Option<NaiveDateTime>, &'static str> {
68 if let Some(v) = s {
69 let dt = chrono::DateTime::parse_from_rfc3339(v)
71 .map_err(|_| "invalid datetime format")?
72 .naive_utc();
73 Ok(Some(dt))
74 } else {
75 Ok(None)
76 }
77}
78
79fn within_cutoff(last_updated_rfc3339: &str, start: Option<NaiveDate>, end: Option<NaiveDate>) -> bool {
80 let Ok(dt) = chrono::DateTime::parse_from_rfc3339(last_updated_rfc3339) else {
82 return false;
83 };
84 let d = dt.date_naive();
85 if let Some(s) = start {
86 if d < s {
87 return false;
88 }
89 }
90 if let Some(e) = end {
91 if d > e {
92 return false;
93 }
94 }
95 true
96}
97
/// Interprets an optional cursor string as a record offset.
///
/// A missing cursor, or one that does not parse as a `usize`, yields offset `0`.
fn parse_cursor(cursor: &Option<String>) -> usize {
    match cursor.as_deref().map(str::parse::<usize>) {
        Some(Ok(offset)) => offset,
        _ => 0,
    }
}
105
/// Computes the cursor for the page following the current one.
///
/// The next offset is `current_offset + returned` (saturating). It is returned
/// as a decimal string only while it is still below `total`; otherwise there is
/// no further page and the result is `None`.
fn next_cursor_if_any(current_offset: usize, returned: usize, total: usize) -> Option<String> {
    let upcoming = current_offset.saturating_add(returned);
    (upcoming < total).then(|| upcoming.to_string())
}
114
115#[derive(Debug, Deserialize)]
119pub struct ListCreditsParams {
120 pub page: Option<u32>,
121 pub page_size: Option<u32>,
122
123 pub cursor: Option<String>,
125
126 pub cutoff_start: Option<String>,
128 pub cutoff_end: Option<String>,
129
130 pub mode: Option<String>,
132
133 pub stream_chunk_size: Option<usize>,
135}
136
137#[derive(Debug, Deserialize)]
139pub struct GenerateCreditsRequest {
140 pub count: u32,
141 pub dirty_ratio: Option<f64>,
142 pub seed: Option<u64>,
143 pub append: Option<bool>,
144}
145
146#[derive(Debug, Deserialize)]
148pub struct LoadCreditsRequest {
149 pub records: Vec<CreditRecord>,
150 pub append: Option<bool>,
151}
152
153#[derive(Debug, Deserialize)]
155pub struct StreamCreditsParams {
156 pub count: Option<u32>,
157 pub dirty_ratio: Option<f64>,
158 pub seed: Option<u64>,
159 pub batch_size: Option<u32>,
160 pub delay_ms: Option<u64>,
161}
162
163#[derive(Debug, Deserialize)]
165pub struct UpdateConfigRequest {
166 pub dirty_ratio: Option<f64>,
167 pub latency_ms: Option<u64>,
168 pub failure_rate: Option<f64>,
169}
170
171pub struct CoreBankingState {
177 pub config: CoreBankingConfig,
178 pub records: std::collections::HashMap<String, CreditRecord>,
179 pub stats: SimulatorStats,
180 pub started_at: Instant,
181 pub ready: bool,
182
183 pub dataset_seed: Option<u64>,
185 pub dataset_dirty_ratio: f64,
186}
187
188impl CoreBankingState {
189 pub fn new(config: CoreBankingConfig) -> Self {
190 Self {
191 dataset_seed: config.seed,
192 dataset_dirty_ratio: config.default_dirty_ratio,
193 config,
194 records: std::collections::HashMap::new(),
195 stats: SimulatorStats::default(),
196 started_at: Instant::now(),
197 ready: false,
198 }
199 }
200
201 pub fn generate_records(
203 count: u32,
204 dirty_ratio: f64,
205 seed: Option<u64>,
206 start_id: u64,
207 ) -> Vec<CreditRecord> {
208 let mut rng: ChaCha8Rng = if let Some(s) = seed {
209 ChaCha8Rng::seed_from_u64(s)
210 } else {
211 ChaCha8Rng::from_rng(rand::thread_rng()).unwrap()
212 };
213
214 let error_types = [
215 "invalid_nik",
216 "negative_amount",
217 "invalid_date",
218 "missing_field",
219 "invalid_currency",
220 "invalid_collectability",
221 "outstanding_gt_plafon",
222 ];
223
224 let mut records = Vec::with_capacity(count as usize);
225 let error_threshold = (dirty_ratio.clamp(0.0, 1.0) * u64::MAX as f64) as u64;
226
227 for i in 0..count {
228 let mut record = CreditRecord::generate_clean(&mut rng, start_id + i as u64);
229
230 if error_threshold > 0 && rng.gen::<u64>() < error_threshold {
231 let error_type = error_types[rng.gen_range(0..error_types.len())];
232 record = record.inject_error(&mut rng, error_type);
233 }
234
235 records.push(record);
236 }
237
238 records
239 }
240}
241
242pub struct CoreBankingSimulator {
248 state: SharedState<CoreBankingState>,
249 config: CoreBankingConfig,
250}
251
252impl CoreBankingSimulator {
253 pub fn new(config: CoreBankingConfig) -> Self {
254 let state = shared_state(CoreBankingState::new(config.clone()));
255 Self { state, config }
256 }
257
258 pub async fn run(&self, shutdown_rx: oneshot::Receiver<()>) -> SimulatorResult<()> {
259 {
261 let (record_count, dirty_ratio, seed) = {
262 let state = self.state.read().await;
263 (
264 state.config.default_record_count,
265 state.config.default_dirty_ratio,
266 state.config.seed,
267 )
268 };
269
270 let records =
272 CoreBankingState::generate_records(record_count, dirty_ratio, seed, 0);
273
274 let mut state = self.state.write().await;
275 for record in records {
276 state.records.insert(record.id.clone(), record);
277 }
278 state.ready = true;
279 state.dataset_seed = seed;
280 state.dataset_dirty_ratio = dirty_ratio;
281 }
282
283 let app = self.create_router();
284 let addr: std::net::SocketAddr = self
285 .config
286 .socket_addr()
287 .parse()
288 .map_err(|e| SimulatorError::ConfigError(format!("Invalid address: {e}")))?;
289
290 tracing::info!("Core Banking Simulator listening on {addr}");
291
292 let listener = tokio::net::TcpListener::bind(addr)
293 .await
294 .map_err(|e| SimulatorError::BindError(e.to_string()))?;
295
296 axum::serve(listener, app)
297 .with_graceful_shutdown(async {
298 let _ = shutdown_rx.await;
299 tracing::info!("Core Banking Simulator shutting down");
300 })
301 .await
302 .map_err(|e| SimulatorError::StartError(e.to_string()))?;
303
304 Ok(())
305 }
306
307 fn create_router(&self) -> Router {
308 let state = self.state.clone();
309
310 Router::new()
311 .route("/health", get(health_handler))
312 .route("/api/v1/credits", get(list_credits_handler).post(list_credits_handler))
313 .route("/api/v1/credits/:id", get(get_credit_handler))
314 .route("/api/v1/credits/count", get(count_credits_handler))
315 .route("/api/v1/credits/generate", post(generate_credits_handler))
316 .route("/api/v1/credits/load", post(load_credits_handler))
317 .route("/api/v1/credits/load-ndjson", post(load_ndjson_handler))
318 .route("/api/v1/credits/stream", get(stream_credits_handler))
319 .route("/api/v1/credits/ndjson", get(ndjson_credits_handler))
320 .route("/api/v1/stats", get(stats_handler))
321 .route("/api/v1/reset", post(reset_handler))
322 .route("/api/v1/config", put(update_config_handler))
323 .layer(DefaultBodyLimit::max(100 * 1024 * 1024))
324 .with_state(state)
325 }
326
327 pub async fn get_records(&self) -> Vec<CreditRecord> {
328 self.state.read().await.records.values().cloned().collect()
329 }
330
331 pub async fn set_records(&self, records: Vec<CreditRecord>) {
332 let mut state = self.state.write().await;
333 state.records.clear();
334 for record in records {
335 state.records.insert(record.id.clone(), record);
336 }
337 }
338}
339
340#[async_trait::async_trait]
341impl Simulator for CoreBankingSimulator {
342 fn name(&self) -> &str {
343 "core-banking"
344 }
345
346 fn port(&self) -> u16 {
347 self.config.port
348 }
349
350 async fn health(&self) -> HealthStatus {
351 let state = self.state.read().await;
352 let uptime = state.started_at.elapsed().as_secs();
353
354 if state.ready {
355 HealthStatus::healthy(self.name(), "1.0.0", uptime)
356 .with_details("record_count", serde_json::json!(state.records.len()))
357 .with_details("dataset_seed", serde_json::json!(state.dataset_seed))
358 .with_details("dataset_dirty_ratio", serde_json::json!(state.dataset_dirty_ratio))
359 } else {
360 HealthStatus::unhealthy(self.name(), "Not ready")
361 }
362 }
363
364 async fn stats(&self) -> SimulatorStats {
365 self.state.read().await.stats.clone()
366 }
367
368 async fn reset_stats(&self) {
369 self.state.write().await.stats = SimulatorStats::default();
370 }
371
372 async fn is_ready(&self) -> bool {
373 self.state.read().await.ready
374 }
375}
376
377async fn health_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
382 let state = state.read().await;
383 let uptime = state.started_at.elapsed().as_secs();
384
385 if state.ready {
386 let health = HealthStatus::healthy("core-banking", "1.0.0", uptime)
387 .with_details("record_count", serde_json::json!(state.records.len()))
388 .with_details("dataset_seed", serde_json::json!(state.dataset_seed))
389 .with_details("dataset_dirty_ratio", serde_json::json!(state.dataset_dirty_ratio));
390 (StatusCode::OK, Json(health))
391 } else {
392 let health = HealthStatus::unhealthy("core-banking", "Not ready");
393 (StatusCode::SERVICE_UNAVAILABLE, Json(health))
394 }
395}
396
397async fn list_credits_handler(
399 State(state): State<SharedState<CoreBankingState>>,
400 Query(params): Query<ListCreditsParams>,
401) -> Response {
402 let start = Instant::now();
403
404 let cutoff_start = match parse_date_opt(¶ms.cutoff_start) {
406 Ok(v) => v,
407 Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::<Vec<CreditRecord>>::error("BAD_REQUEST", e))).into_response(),
408 };
409 let cutoff_end = match parse_date_opt(¶ms.cutoff_end) {
410 Ok(v) => v,
411 Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::<Vec<CreditRecord>>::error("BAD_REQUEST", e))).into_response(),
412 };
413
414 let (failure, latency_config, max_records) = {
416 let state = state.read().await;
417 (
418 state.config.failure_injection.random_failure().cloned(),
419 state.config.latency.clone(),
420 state.config.max_records_per_request,
421 )
422 };
423
424 if let Some(ref failure) = failure {
426 let mut state = state.write().await;
428 state
429 .stats
430 .record_request("/api/v1/credits", false, start.elapsed().as_millis() as f64);
431 return match failure {
432 FailureType::InternalError => (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(ApiResponse::<()>::error("INTERNAL_ERROR", "Simulated internal error")),
435 ).into_response(),
436 FailureType::Timeout => {
437 tokio::time::sleep(Duration::from_millis(10000)).await;
438 (
439 StatusCode::GATEWAY_TIMEOUT,
440 Json(ApiResponse::<()>::error("TIMEOUT", "Simulated timeout")),
441 ).into_response()
442 }
443 FailureType::ServiceUnavailable => (
444 StatusCode::SERVICE_UNAVAILABLE,
445 Json(ApiResponse::<()>::error(
446 "SERVICE_UNAVAILABLE",
447 "Simulated service unavailable",
448 )),
449 ).into_response(),
450 _ => (
451 StatusCode::INTERNAL_SERVER_ERROR,
452 Json(ApiResponse::<()>::error("UNKNOWN", "Unknown failure type")),
453 ).into_response(),
454 };
455 }
456
457 latency_config.apply().await;
459
460 let mode = params.mode.as_deref().unwrap_or("normal");
462 let requested_page_size = params.page_size.unwrap_or(100).max(1);
463
464 if (requested_page_size as u64) > 100_000 || mode == "stream" {
466 return handle_list_stream(state, params, requested_page_size as usize).await.into_response();
467 }
468
469 let (records, total_count, page_size, page, offset) = {
470 let state = state.read().await;
471
472 let page_size = (requested_page_size as u64).min(max_records as u64) as u32;
473
474 tracing::info!(">>> [CORE-BANKING] Processing Credit List - page_size: {}", page_size);
475
476 let (offset, page) = if params.cursor.is_some() {
477 (parse_cursor(¶ms.cursor), 1u32)
478 } else {
479 let page = params.page.unwrap_or(1).max(1);
480 (((page - 1) * page_size) as usize, page)
481 };
482
483 let total_count = if cutoff_start.is_some() || cutoff_end.is_some() {
485 state
486 .records
487 .values()
488 .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
489 .count() as u64
490 } else {
491 state.records.len() as u64
492 };
493
494 let records: Vec<CreditRecord> = if cutoff_start.is_some() || cutoff_end.is_some() {
496 state
497 .records
498 .values()
499 .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
500 .skip(offset)
501 .take(page_size as usize)
502 .cloned()
503 .collect()
504 } else {
505 state
506 .records
507 .values()
508 .skip(offset)
509 .take(page_size as usize)
510 .cloned()
511 .collect()
512 };
513
514 (records, total_count, page_size, page, offset)
515 };
516
517 let returned = records.len();
518
519 let meta = ResponseMeta::paginated(page, page_size, total_count)
520 .with_timing(start.elapsed().as_millis() as u64)
521 .with_extra(
522 "next_cursor",
523 serde_json::json!(next_cursor_if_any(offset, returned, total_count as usize)),
524 );
525
526 {
528 let mut state = state.write().await;
529 state
530 .stats
531 .record_request("/api/v1/credits", true, start.elapsed().as_millis() as f64);
532
533 let approx_bytes = (returned * 1024) as u64;
535 state.stats.bytes_sent += approx_bytes;
536 }
537
538 (StatusCode::OK, Json(ApiResponse::success_with_meta(records, meta))).into_response()
539}
540
541async fn handle_list_stream(
543 state: SharedState<CoreBankingState>,
544 params: ListCreditsParams,
545 page_size: usize,
546) -> Response {
547 let chunk_size = params.stream_chunk_size.unwrap_or(1000).max(1);
548
549 let cutoff_start = parse_date_opt(¶ms.cutoff_start).ok().flatten();
550 let cutoff_end = parse_date_opt(¶ms.cutoff_end).ok().flatten();
551
552 let stream = async_stream::stream! {
553 let records: Vec<CreditRecord> = {
554 let state_lock = state.read().await;
555
556 let offset = if params.cursor.is_some() {
557 parse_cursor(¶ms.cursor)
558 } else {
559 let page = params.page.unwrap_or(1).max(1);
560 ((page - 1) as usize) * page_size
561 };
562
563 if cutoff_start.is_some() || cutoff_end.is_some() {
564 state_lock.records.values()
565 .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
566 .skip(offset)
567 .take(page_size)
568 .cloned()
569 .collect()
570 } else {
571 state_lock.records.values()
572 .skip(offset)
573 .take(page_size)
574 .cloned()
575 .collect()
576 }
577 };
578
579 let mut current_batch = String::new();
580 let mut count = 0;
581
582 for record in records {
583 if let Ok(line) = serde_json::to_string(&record) {
584 current_batch.push_str(&line);
585 current_batch.push('\n');
586 count += 1;
587
588 if count >= chunk_size {
589 yield Ok::<_, Infallible>(current_batch.clone());
590 current_batch.clear();
591 count = 0;
592 }
593 }
594 }
595
596 if !current_batch.is_empty() {
597 yield Ok::<_, Infallible>(current_batch);
598 }
599 };
600
601 Response::builder()
602 .header(header::CONTENT_TYPE, "application/x-ndjson")
603 .body(Body::from_stream(stream))
604 .unwrap()
605}
606
607async fn get_credit_handler(
609 State(state): State<SharedState<CoreBankingState>>,
610 Path(id): Path<String>,
611) -> impl IntoResponse {
612 let state = state.read().await;
613 if let Some(record) = state.records.get(&id) {
614 (StatusCode::OK, Json(ApiResponse::success(record.clone())))
615 } else {
616 (
617 StatusCode::NOT_FOUND,
618 Json(ApiResponse::<CreditRecord>::error(
619 "NOT_FOUND",
620 "Credit record not found",
621 )),
622 )
623 }
624}
625
626async fn count_credits_handler(
628 State(state): State<SharedState<CoreBankingState>>,
629 Query(params): Query<ListCreditsParams>,
630) -> impl IntoResponse {
631 let cutoff_start = match parse_date_opt(¶ms.cutoff_start) {
632 Ok(v) => v,
633 Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::error("BAD_REQUEST", e))),
634 };
635 let cutoff_end = match parse_date_opt(¶ms.cutoff_end) {
636 Ok(v) => v,
637 Err(e) => return (StatusCode::BAD_REQUEST, Json(ApiResponse::error("BAD_REQUEST", e))),
638 };
639
640 let state = state.read().await;
641 let count = if cutoff_start.is_some() || cutoff_end.is_some() {
642 state
643 .records
644 .values()
645 .filter(|r| within_cutoff(&r.last_updated, cutoff_start, cutoff_end))
646 .count()
647 } else {
648 state.records.len()
649 };
650
651 (
652 StatusCode::OK,
653 Json(ApiResponse::success(serde_json::json!({
654 "count": count,
655 "dirty_ratio": state.dataset_dirty_ratio,
656 "seed": state.dataset_seed
657 }))),
658 )
659}
660
661async fn generate_credits_handler(
663 State(state): State<SharedState<CoreBankingState>>,
664 Json(request): Json<GenerateCreditsRequest>,
665) -> impl IntoResponse {
666 let start = Instant::now();
667
668 let (max_records, default_dirty, current_count) = {
670 let state = state.read().await;
671 (
672 state.config.max_records_per_request,
673 state.config.default_dirty_ratio,
674 state.records.len() as u64,
675 )
676 };
677
678 let count = request.count.min(max_records);
679 let dirty_ratio = request
680 .dirty_ratio
681 .unwrap_or(default_dirty)
682 .clamp(0.0, 1.0);
683 let seed = request.seed;
684 let append = request.append.unwrap_or(false);
685 let start_id = if append { current_count } else { 0 };
686
687 let new_records = tokio::task::spawn_blocking(move || {
689 CoreBankingState::generate_records(count, dirty_ratio, seed, start_id)
690 })
691 .await
692 .unwrap();
693
694 let mut state = state.write().await;
696 if !append {
697 state.records.clear();
698 }
699 if append {
701 state.records.reserve(count as usize);
702 }
703
704 for record in new_records {
705 state.records.insert(record.id.clone(), record);
706 }
707
708 let result = serde_json::json!({
709 "generated": count,
710 "dirty_ratio": dirty_ratio,
711 "seed": seed,
712 "total_records": state.records.len()
713 });
714
715 (StatusCode::OK, Json(ApiResponse::success(result)))
716}
717
718async fn load_credits_handler(
720 State(state): State<SharedState<CoreBankingState>>,
721 Json(request): Json<LoadCreditsRequest>,
722) -> impl IntoResponse {
723 let append = request.append.unwrap_or(false);
724 let count = request.records.len();
725
726 let mut state = state.write().await;
728 if !append {
729 state.records.clear();
730 }
731 if append {
733 state.records.reserve(count);
734 }
735
736 for record in request.records {
737 state.records.insert(record.id.clone(), record);
738 }
739
740 let result = serde_json::json!({
741 "loaded": count,
742 "total_records": state.records.len()
743 });
744
745 (StatusCode::OK, Json(ApiResponse::success(result)))
746}
747
748async fn load_ndjson_handler(
750 State(state): State<SharedState<CoreBankingState>>,
751 Query(params): Query<HashMap<String, String>>,
752 body: Body,
753) -> impl IntoResponse {
754 let append = params.get("append").map(|v| v == "true").unwrap_or(false);
755
756 let stream = body.into_data_stream();
757 let stream = stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
758 let reader = StreamReader::new(stream);
759 let mut lines = tokio::io::BufReader::new(reader).lines();
760
761 let mut count = 0;
762 let mut records_to_add = Vec::with_capacity(10000);
763
764 if !append {
765 let mut state_lock = state.write().await;
766 state_lock.records.clear();
767 }
768
769 while let Ok(Some(line)) = lines.next_line().await {
770 let trimmed = line.trim();
771 if trimmed.is_empty() { continue; }
772 if let Ok(record) = serde_json::from_str::<CreditRecord>(trimmed) {
773 records_to_add.push(record);
774 count += 1;
775
776 if records_to_add.len() >= 10000 {
777 let mut state_lock = state.write().await;
778 for r in records_to_add.drain(..) {
779 state_lock.records.insert(r.id.clone(), r);
780 }
781 }
782 }
783 }
784
785 if !records_to_add.is_empty() {
786 let mut state_lock = state.write().await;
787 for r in records_to_add {
788 state_lock.records.insert(r.id.clone(), r);
789 }
790 }
791
792 let total = {
793 let state_lock = state.read().await;
794 state_lock.records.len()
795 };
796
797 let result = serde_json::json!({
798 "loaded": count,
799 "total_records": total
800 });
801
802 (StatusCode::OK, Json(ApiResponse::success(result)))
803}
804
805async fn stream_credits_handler(
807 State(state): State<SharedState<CoreBankingState>>,
808 Query(params): Query<StreamCreditsParams>,
809) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
810 let count = params.count.unwrap_or(1000);
811 let dirty_ratio = params.dirty_ratio.unwrap_or(0.0).clamp(0.0, 1.0);
812 let seed = params.seed;
813 let batch_size = params.batch_size.unwrap_or(100).max(1);
814 let delay_ms = params.delay_ms.unwrap_or(10);
815
816 let stream = async_stream::stream! {
817 let mut rng: ChaCha8Rng = match seed {
818 Some(s) => ChaCha8Rng::seed_from_u64(s),
819 None => ChaCha8Rng::from_entropy(),
820 };
821
822 let error_types = [
823 "invalid_nik",
824 "negative_amount",
825 "invalid_date",
826 "missing_field",
827 "invalid_currency",
828 "invalid_collectability",
829 "outstanding_gt_plafon",
830 ];
831
832 let mut batch = Vec::with_capacity(batch_size as usize);
833 let mut generated = 0u32;
834
835 while generated < count {
836 let mut record = CreditRecord::generate_clean(&mut rng, generated as u64);
837 if dirty_ratio > 0.0 && rng.gen::<f64>() < dirty_ratio {
838 let error_type = error_types[rng.gen_range(0..error_types.len())];
839 record = record.inject_error(&mut rng, error_type);
840 }
841
842 batch.push(record);
843 generated += 1;
844
845 if batch.len() >= batch_size as usize || generated >= count {
846 let data = serde_json::to_string(&batch).unwrap_or_default();
847 yield Ok(Event::default().event("records").data(data));
848 batch.clear();
849
850 if delay_ms > 0 {
851 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
852 }
853 }
854 }
855
856 yield Ok(Event::default().event("complete").data(format!(r#"{{\"total\":{}}}"#, generated)));
857
858 let mut state = state.write().await;
859 state.stats.record_request("/api/v1/credits/stream", true, 0.0);
860 };
861
862 Sse::new(stream)
863}
864
865async fn ndjson_credits_handler(
867 State(state): State<SharedState<CoreBankingState>>,
868 Query(params): Query<StreamCreditsParams>,
869) -> Response {
870 let count = params.count.unwrap_or(1000);
871 let dirty_ratio = params.dirty_ratio.unwrap_or(0.0).clamp(0.0, 1.0);
872 let seed = params.seed;
873 let delay_ms = params.delay_ms.unwrap_or(0);
874
875 let stream = async_stream::stream! {
876 let mut rng: ChaCha8Rng = match seed {
877 Some(s) => ChaCha8Rng::seed_from_u64(s),
878 None => ChaCha8Rng::from_entropy(),
879 };
880
881 let error_types = [
882 "invalid_nik",
883 "negative_amount",
884 "invalid_date",
885 "missing_field",
886 "invalid_currency",
887 "invalid_collectability",
888 "outstanding_gt_plafon",
889 ];
890
891 for i in 0..count {
892 let mut record = CreditRecord::generate_clean(&mut rng, i as u64);
893 if dirty_ratio > 0.0 && rng.gen::<f64>() < dirty_ratio {
894 let error_type = error_types[rng.gen_range(0..error_types.len())];
895 record = record.inject_error(&mut rng, error_type);
896 }
897
898 let line = match serde_json::to_string(&record) {
899 Ok(s) => s,
900 Err(_) => "{}".to_string(),
901 };
902
903 yield Ok::<_, Infallible>(format!("{}\n", line));
904
905 if delay_ms > 0 {
906 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
907 }
908 }
909
910 let mut st = state.write().await;
912 st.stats.record_request("/api/v1/credits/ndjson", true, 0.0);
913 };
914
915 let body = Body::from_stream(stream);
916 let mut headers = HeaderMap::new();
917 headers.insert(header::CONTENT_TYPE, "application/x-ndjson".parse().unwrap());
918
919 (StatusCode::OK, headers, body).into_response()
920}
921
922async fn stats_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
924 let state = state.read().await;
925 Json(ApiResponse::success(state.stats.clone()))
926}
927
928async fn reset_handler(State(state): State<SharedState<CoreBankingState>>) -> impl IntoResponse {
930 let mut state = state.write().await;
931 state.stats = SimulatorStats::default();
932 let record_count = state.config.default_record_count;
933 let dirty_ratio = state.config.default_dirty_ratio;
934 let seed = state.config.seed;
935
936 let records = CoreBankingState::generate_records(record_count, dirty_ratio, seed, 0);
937 state.records.clear();
938 for record in records {
939 state.records.insert(record.id.clone(), record);
940 }
941 state.dataset_seed = seed;
942 state.dataset_dirty_ratio = dirty_ratio;
943
944 Json(ApiResponse::success(serde_json::json!({
945 "reset": true,
946 "record_count": state.records.len(),
947 "dirty_ratio": state.dataset_dirty_ratio,
948 "seed": state.dataset_seed
949 })))
950}
951
952async fn update_config_handler(
954 State(state): State<SharedState<CoreBankingState>>,
955 Json(request): Json<UpdateConfigRequest>,
956) -> impl IntoResponse {
957 let mut state = state.write().await;
958
959 if let Some(dirty_ratio) = request.dirty_ratio {
960 state.config.default_dirty_ratio = dirty_ratio.clamp(0.0, 1.0);
961 }
962
963 if let Some(latency_ms) = request.latency_ms {
964 state.config.latency = LatencyConfig {
965 enabled: latency_ms > 0,
966 base_ms: latency_ms,
967 jitter_ms: latency_ms / 10,
968 percentiles: None,
969 };
970 }
971
972 if let Some(failure_rate) = request.failure_rate {
973 state.config.failure_injection.enabled = failure_rate > 0.0;
974 state.config.failure_injection.failure_rate = failure_rate.clamp(0.0, 1.0);
975 }
976
977 Json(ApiResponse::success(serde_json::json!({
978 "updated": true,
979 "dirty_ratio": state.config.default_dirty_ratio,
980 "latency_enabled": state.config.latency.enabled,
981 "failure_rate": state.config.failure_injection.failure_rate
982 })))
983}
984
#[cfg(test)]
mod tests {
    use super::*;

    /// Seed shared by the single-record tests.
    const RECORD_SEED: u64 = 42;

    /// Generates clean record #1 from the shared seed.
    fn clean_record() -> CreditRecord {
        let mut rng = ChaCha8Rng::seed_from_u64(RECORD_SEED);
        CreditRecord::generate_clean(&mut rng, 1)
    }

    /// Generates record #1 from the shared seed and injects `error_type`,
    /// using the same RNG for both steps.
    fn dirty_record(error_type: &'static str) -> CreditRecord {
        let mut rng = ChaCha8Rng::seed_from_u64(RECORD_SEED);
        CreditRecord::generate_clean(&mut rng, 1).inject_error(&mut rng, error_type)
    }

    /// Shorthand for building a known-valid calendar date.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    #[test]
    fn test_credit_record_generate_clean() {
        let record = clean_record();

        assert_eq!(record.id, "CR0000000001");
        assert_eq!(record.nik.len(), 16);
        assert!(record.nik.chars().all(|c| c.is_numeric()));
        assert!(!record.nama_lengkap.is_empty());
        assert_eq!(record.mata_uang, "IDR");
        assert!(record.jumlah_kredit > 0);
        assert!(record.saldo_outstanding <= record.jumlah_kredit);
        assert!((1..=5).contains(&record.kolektabilitas));
        assert!(record._has_error.is_none());
    }

    #[test]
    fn test_credit_record_inject_invalid_nik() {
        let record = dirty_record("invalid_nik");

        assert_eq!(record._has_error, Some(true));
        assert_eq!(record._error_type, Some("invalid_nik".to_string()));
        assert!(record.nik.len() != 16 || !record.nik.chars().all(|c| c.is_numeric()));
    }

    #[test]
    fn test_credit_record_inject_negative_amount() {
        let record = dirty_record("negative_amount");

        assert_eq!(record._has_error, Some(true));
        assert!(record.jumlah_kredit < 0);
    }

    #[test]
    fn test_credit_record_inject_invalid_date() {
        let record = dirty_record("invalid_date");

        assert_eq!(record._has_error, Some(true));
        // The injected error corrupts either the start date or the due date.
        let invalid_start =
            record.tanggal_mulai == "2024-13-45" || record.tanggal_mulai == "not-a-date";
        let invalid_end = record.tanggal_jatuh_tempo.is_empty();
        assert!(invalid_start || invalid_end);
    }

    #[test]
    fn test_state_generate_records_deterministic() {
        // Builds a state populated from one fixed-seed generation run.
        let populated = |config: CoreBankingConfig| {
            let mut state = CoreBankingState::new(config);
            for r in CoreBankingState::generate_records(100, 0.1, Some(12345), 0) {
                state.records.insert(r.id.clone(), r);
            }
            state
        };

        let config = CoreBankingConfig::default();
        let state1 = populated(config.clone());
        let state2 = populated(config);

        assert_eq!(state1.records.len(), state2.records.len());
        for (id, r1) in &state1.records {
            let r2 = state2.records.get(id).expect("Record missing in state2");
            assert_eq!(r1.id, r2.id);
            assert_eq!(r1.nik, r2.nik);
            assert_eq!(r1.jumlah_kredit, r2.jumlah_kredit);
            assert_eq!(r1._has_error, r2._has_error);
        }
    }

    #[test]
    fn test_within_cutoff() {
        let ts = "2024-01-15T10:00:00Z";
        assert!(within_cutoff(ts, Some(ymd(2024, 1, 1)), Some(ymd(2024, 1, 31))));
        assert!(!within_cutoff(ts, Some(ymd(2024, 2, 1)), None));
    }
}