1use crate::{
24 config::{FailureType, RegulatorEndpointConfig, RegulatorMode},
25 ApiResponse, HealthStatus, ResponseMeta, SharedState, SimulatorError, SimulatorResult,
26 SimulatorStats, Simulator, shared_state,
27};
28use axum::{
72 extract::{Path, Query, State},
73 http::{HeaderMap, StatusCode},
74 response::{IntoResponse, Json},
75 routing::{delete, get, post},
76 Router,
77};
78use chrono::{DateTime, Utc};
79use serde::{Deserialize, Serialize};
80use std::collections::HashMap;
81
82use std::time::{Duration, Instant};
83use tokio::sync::oneshot;
84use uuid::Uuid;
85
/// JSON body accepted by the submit endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionRequest {
    /// Optional caller-supplied submission id. When absent, the processing
    /// code generates a UUID. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub submission_id: Option<String>,
    /// Reporting period label (the tests use `"202401"`); not validated here.
    pub reporting_period: String,
    /// Code of the submitting bank; not validated here.
    pub bank_code: String,
    /// Submitted records as arbitrary JSON values. A string `"id"` member, if
    /// present, is echoed back as `record_id` on per-record rejections.
    pub records: Vec<serde_json::Value>,
    /// Free-form metadata; not read by the code in this file.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, serde_json::Value>>,
}
106
/// Result of a submission as returned to API clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionResponse {
    /// Id of the submission (caller-supplied or generated).
    pub submission_id: String,
    /// Simulator-generated reference number (`OJK-SLIK-...`).
    pub reference_number: String,
    /// Overall outcome of the submission.
    pub status: SubmissionStatus,
    /// Number of records accepted.
    pub accepted_count: usize,
    /// Number of records rejected.
    pub rejected_count: usize,
    /// Per-record rejection details; omitted from JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rejections: Vec<RecordRejection>,
    /// Submission timestamp, formatted as RFC 3339.
    pub submitted_at: String,
    /// Processing timestamp (RFC 3339); `None` while the submission is queued.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub processed_at: Option<String>,
    /// Estimated processing time in whole seconds; only set for queued submissions.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub estimated_processing_time_secs: Option<u64>,
    /// Extra key/value details (e.g. idempotent-replay markers, queue position).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<HashMap<String, serde_json::Value>>,
}
135
/// Lifecycle/outcome state of a submission.
///
/// Serialized in SCREAMING_SNAKE_CASE, e.g. `"ACCEPTED"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum SubmissionStatus {
    /// Every record was accepted.
    Accepted,
    /// Some records were accepted and some rejected.
    PartiallyAccepted,
    /// Every record was rejected.
    Rejected,
    /// The submission was stored but has not been processed yet.
    Queued,
    /// Not produced by the code in this file.
    Processing,
    /// Not produced by the code in this file.
    Failed,
}
153
/// Describes why a single record of a submission was rejected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordRejection {
    /// Zero-based position of the record within the request's `records` list.
    pub record_index: usize,
    /// The record's string `"id"` member, when it has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub record_id: Option<String>,
    /// Machine-readable rejection code (e.g. `INVALID_NIK`).
    pub error_code: String,
    /// Human-readable rejection message.
    pub message: String,
    /// Name of the offending field, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub field: Option<String>,
}
170
/// Server-side record of a submission kept in the simulator's in-memory store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredSubmission {
    /// Id of the submission; also the key in the submissions map.
    pub submission_id: String,
    /// Simulator-generated reference number.
    pub reference_number: String,
    /// Bank code copied from the request.
    pub bank_code: String,
    /// Reporting period copied from the request.
    pub reporting_period: String,
    /// Outcome recorded when the submission was stored.
    pub status: SubmissionStatus,
    /// Total number of records in the request.
    pub record_count: usize,
    /// Number of records accepted.
    pub accepted_count: usize,
    /// Number of records rejected.
    pub rejected_count: usize,
    /// Per-record rejection details.
    pub rejections: Vec<RecordRejection>,
    /// When the submission was received.
    pub submitted_at: DateTime<Utc>,
    /// When the submission was processed; `None` for queued submissions.
    pub processed_at: Option<DateTime<Utc>>,
    /// Idempotency key the submission was stored under, if any.
    pub idempotency_key: Option<String>,
    /// Set to 1 on creation; never incremented by the code in this file.
    pub attempt_count: u32,
}
188
/// Query-string parameters of the list-submissions endpoint.
#[derive(Debug, Deserialize)]
pub struct ListSubmissionsParams {
    /// Keep only submissions with exactly this bank code.
    pub bank_code: Option<String>,
    /// Keep only submissions with exactly this reporting period.
    pub reporting_period: Option<String>,
    /// Keep only submissions with this status.
    pub status: Option<SubmissionStatus>,
    /// 1-based page number; the handler defaults it to 1.
    pub page: Option<u32>,
    /// Page size; the handler defaults it to 20 and caps it at 100.
    pub page_size: Option<u32>,
}
198
/// JSON body of the change-mode endpoint.
#[derive(Debug, Deserialize)]
pub struct ChangeModeRequest {
    /// The mode the simulator should switch to.
    pub mode: RegulatorMode,
}
204
/// Error payload produced when a whole submission is refused.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RejectionError {
    /// Machine-readable error code (e.g. `RATE_LIMITED`).
    pub error_code: String,
    /// Human-readable error message.
    pub error_message: String,
    /// Optional structured details (e.g. retry hints).
    pub details: Option<serde_json::Value>,
}
212
/// Mutable in-memory state shared by all request handlers of the simulator.
pub struct RegulatorEndpointState {
    /// Active configuration; `config.mode` can be changed at runtime.
    pub config: RegulatorEndpointConfig,
    /// Stored submissions keyed by submission id.
    pub submissions: HashMap<String, StoredSubmission>,
    /// Maps an idempotency key to the id of the submission stored under it.
    pub idempotency_cache: HashMap<String, String>,
    /// Request statistics recorded by the handlers.
    pub stats: SimulatorStats,
    /// Instant at which this state was created; used to report uptime.
    pub started_at: Instant,
    /// Whether the simulator reports itself as ready/healthy.
    pub ready: bool,
    /// Counter used as the sequence part of reference numbers.
    pub submission_counter: u64,
}
227
228impl RegulatorEndpointState {
229 pub fn new(config: RegulatorEndpointConfig) -> Self {
230 Self {
231 config,
232 submissions: HashMap::new(),
233 idempotency_cache: HashMap::new(),
234 stats: SimulatorStats::default(),
235 started_at: Instant::now(),
236 ready: false,
237 submission_counter: 0,
238 }
239 }
240
241 pub fn generate_reference_number(&mut self) -> String {
243 self.submission_counter += 1;
244 let timestamp = Utc::now().format("%Y%m%d%H%M%S");
245 format!("OJK-SLIK-{}-{:06}", timestamp, self.submission_counter)
246 }
247
248 pub fn check_idempotency(&self, key: &str) -> Option<&StoredSubmission> {
250 self.idempotency_cache
251 .get(key)
252 .and_then(|id| self.submissions.get(id))
253 }
254
255 pub fn store_submission(&mut self, submission: StoredSubmission) {
257 if let Some(ref key) = submission.idempotency_key {
258 self.idempotency_cache.insert(key.clone(), submission.submission_id.clone());
259 }
260 self.submissions.insert(submission.submission_id.clone(), submission);
261
262 if self.idempotency_cache.len() > self.config.max_idempotency_entries {
264 let to_remove: Vec<String> = self.idempotency_cache
266 .keys()
267 .take(self.idempotency_cache.len() / 2)
268 .cloned()
269 .collect();
270 for key in to_remove {
271 self.idempotency_cache.remove(&key);
272 }
273 }
274 }
275
276 pub fn process_submission(
278 &mut self,
279 request: &SubmissionRequest,
280 idempotency_key: Option<String>,
281 ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
282 if let Some(ref key) = idempotency_key {
284 if self.config.enforce_idempotency {
285 if let Some(existing) = self.check_idempotency(key) {
286 return Ok(SubmissionResponse {
288 submission_id: existing.submission_id.clone(),
289 reference_number: existing.reference_number.clone(),
290 status: existing.status.clone(),
291 accepted_count: existing.accepted_count,
292 rejected_count: existing.rejected_count,
293 rejections: existing.rejections.clone(),
294 submitted_at: existing.submitted_at.to_rfc3339(),
295 processed_at: existing.processed_at.map(|t| t.to_rfc3339()),
296 estimated_processing_time_secs: None,
297 details: Some({
298 let mut d = HashMap::new();
299 d.insert("idempotent".to_string(), serde_json::json!(true));
300 d.insert("original_submission".to_string(), serde_json::json!(true));
301 d
302 }),
303 });
304 }
305 }
306 }
307
308 if self.config.off_peak_config.enabled
310 && self.config.off_peak_config.reject_outside_window
311 && !self.config.off_peak_config.is_off_peak_now()
312 {
313 return Err((
314 StatusCode::SERVICE_UNAVAILABLE,
315 RejectionError {
316 error_code: "OUTSIDE_SUBMISSION_WINDOW".to_string(),
317 error_message: "Submissions are only accepted during off-peak hours".to_string(),
318 details: Some(serde_json::json!({
319 "off_peak_start": self.config.off_peak_config.start_hour,
320 "off_peak_end": self.config.off_peak_config.end_hour,
321 })),
322 },
323 ));
324 }
325
326 match &self.config.mode {
328 RegulatorMode::Accept => self.accept_submission(request, idempotency_key),
329 RegulatorMode::Reject { error_code, error_message } => {
330 Err((
331 StatusCode::BAD_REQUEST,
332 RejectionError {
333 error_code: error_code.clone(),
334 error_message: error_message.clone(),
335 details: None,
336 },
337 ))
338 }
339 RegulatorMode::Timeout { delay_ms } => {
340 Err((
343 StatusCode::GATEWAY_TIMEOUT,
344 RejectionError {
345 error_code: "TIMEOUT".to_string(),
346 error_message: "Request timed out".to_string(),
347 details: Some(serde_json::json!({ "timeout_ms": delay_ms })),
348 },
349 ))
350 }
351 RegulatorMode::ServiceUnavailable => {
352 Err((
353 StatusCode::SERVICE_UNAVAILABLE,
354 RejectionError {
355 error_code: "SERVICE_UNAVAILABLE".to_string(),
356 error_message: "The service is temporarily unavailable".to_string(),
357 details: None,
358 },
359 ))
360 }
361 RegulatorMode::RateLimited => {
362 Err((
363 StatusCode::TOO_MANY_REQUESTS,
364 RejectionError {
365 error_code: "RATE_LIMITED".to_string(),
366 error_message: "Too many requests. Please retry later.".to_string(),
367 details: Some(serde_json::json!({
368 "retry_after_secs": self.config.retry_after_secs
369 })),
370 },
371 ))
372 }
373 RegulatorMode::Intermittent { failure_rate } => {
374 if rand::random::<f64>() < *failure_rate {
375 Err((
376 StatusCode::INTERNAL_SERVER_ERROR,
377 RejectionError {
378 error_code: "INTERNAL_ERROR".to_string(),
379 error_message: "An internal error occurred".to_string(),
380 details: None,
381 },
382 ))
383 } else {
384 self.accept_submission(request, idempotency_key)
385 }
386 }
387 RegulatorMode::PartialReject { reject_ratio } => {
388 self.partial_accept_submission(request, idempotency_key, *reject_ratio)
389 }
390 RegulatorMode::Queued { queue_delay_ms } => {
391 self.queue_submission(request, idempotency_key, *queue_delay_ms)
392 }
393 RegulatorMode::Custom { status_code, body, .. } => {
394 let status = StatusCode::from_u16(*status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
395 if status.is_success() {
396 self.accept_submission(request, idempotency_key)
397 } else {
398 Err((
399 status,
400 RejectionError {
401 error_code: "CUSTOM_ERROR".to_string(),
402 error_message: body.clone(),
403 details: None,
404 },
405 ))
406 }
407 }
408 }
409 }
410
411 fn accept_submission(
412 &mut self,
413 request: &SubmissionRequest,
414 idempotency_key: Option<String>,
415 ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
416 let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
417 let reference_number = self.generate_reference_number();
418 let now = Utc::now();
419 let record_count = request.records.len();
420
421 let stored = StoredSubmission {
422 submission_id: submission_id.clone(),
423 reference_number: reference_number.clone(),
424 bank_code: request.bank_code.clone(),
425 reporting_period: request.reporting_period.clone(),
426 status: SubmissionStatus::Accepted,
427 record_count,
428 accepted_count: record_count,
429 rejected_count: 0,
430 rejections: Vec::new(),
431 submitted_at: now,
432 processed_at: Some(now),
433 idempotency_key,
434 attempt_count: 1,
435 };
436
437 self.store_submission(stored);
438
439 Ok(SubmissionResponse {
440 submission_id,
441 reference_number,
442 status: SubmissionStatus::Accepted,
443 accepted_count: record_count,
444 rejected_count: 0,
445 rejections: Vec::new(),
446 submitted_at: now.to_rfc3339(),
447 processed_at: Some(now.to_rfc3339()),
448 estimated_processing_time_secs: None,
449 details: None,
450 })
451 }
452
453 fn partial_accept_submission(
454 &mut self,
455 request: &SubmissionRequest,
456 idempotency_key: Option<String>,
457 reject_ratio: f64,
458 ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
459 let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
460 let reference_number = self.generate_reference_number();
461 let now = Utc::now();
462 let record_count = request.records.len();
463
464 let mut rejections = Vec::new();
466 let rejection_errors = [
467 ("INVALID_NIK", "NIK format is invalid"),
468 ("INVALID_AMOUNT", "Credit amount is invalid"),
469 ("DUPLICATE_RECORD", "Duplicate record detected"),
470 ("MISSING_FIELD", "Required field is missing"),
471 ];
472
473 for (idx, record) in request.records.iter().enumerate() {
474 if rand::random::<f64>() < reject_ratio {
475 let error = &rejection_errors[rand::random::<usize>() % rejection_errors.len()];
476 rejections.push(RecordRejection {
477 record_index: idx,
478 record_id: record.get("id").and_then(|v| v.as_str()).map(String::from),
479 error_code: error.0.to_string(),
480 message: error.1.to_string(),
481 field: None,
482 });
483 }
484 }
485
486 let rejected_count = rejections.len();
487 let accepted_count = record_count - rejected_count;
488
489 let status = if rejected_count == 0 {
490 SubmissionStatus::Accepted
491 } else if accepted_count == 0 {
492 SubmissionStatus::Rejected
493 } else {
494 SubmissionStatus::PartiallyAccepted
495 };
496
497 let stored = StoredSubmission {
498 submission_id: submission_id.clone(),
499 reference_number: reference_number.clone(),
500 bank_code: request.bank_code.clone(),
501 reporting_period: request.reporting_period.clone(),
502 status: status.clone(),
503 record_count,
504 accepted_count,
505 rejected_count,
506 rejections: rejections.clone(),
507 submitted_at: now,
508 processed_at: Some(now),
509 idempotency_key,
510 attempt_count: 1,
511 };
512
513 self.store_submission(stored);
514
515 Ok(SubmissionResponse {
516 submission_id,
517 reference_number,
518 status,
519 accepted_count,
520 rejected_count,
521 rejections,
522 submitted_at: now.to_rfc3339(),
523 processed_at: Some(now.to_rfc3339()),
524 estimated_processing_time_secs: None,
525 details: None,
526 })
527 }
528
529 fn queue_submission(
530 &mut self,
531 request: &SubmissionRequest,
532 idempotency_key: Option<String>,
533 queue_delay_ms: u64,
534 ) -> Result<SubmissionResponse, (StatusCode, RejectionError)> {
535 let submission_id = request.submission_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
536 let reference_number = self.generate_reference_number();
537 let now = Utc::now();
538 let record_count = request.records.len();
539
540 let stored = StoredSubmission {
541 submission_id: submission_id.clone(),
542 reference_number: reference_number.clone(),
543 bank_code: request.bank_code.clone(),
544 reporting_period: request.reporting_period.clone(),
545 status: SubmissionStatus::Queued,
546 record_count,
547 accepted_count: 0,
548 rejected_count: 0,
549 rejections: Vec::new(),
550 submitted_at: now,
551 processed_at: None,
552 idempotency_key,
553 attempt_count: 1,
554 };
555
556 self.store_submission(stored);
557
558 Ok(SubmissionResponse {
559 submission_id,
560 reference_number,
561 status: SubmissionStatus::Queued,
562 accepted_count: 0,
563 rejected_count: 0,
564 rejections: Vec::new(),
565 submitted_at: now.to_rfc3339(),
566 processed_at: None,
567 estimated_processing_time_secs: Some(queue_delay_ms / 1000),
568 details: Some({
569 let mut d = HashMap::new();
570 d.insert("queue_position".to_string(), serde_json::json!(self.submissions.len()));
571 d
572 }),
573 })
574 }
575}
576
/// HTTP simulator of a regulator submission endpoint.
pub struct RegulatorEndpointSimulator {
    /// Shared mutable state handed to every request handler.
    state: SharedState<RegulatorEndpointState>,
    /// Configuration the simulator was created with; used for the listen
    /// address and port. Runtime mode changes are applied to `state`, not here.
    config: RegulatorEndpointConfig,
}
586
587impl RegulatorEndpointSimulator {
588 pub fn new(config: RegulatorEndpointConfig) -> Self {
590 let state = shared_state(RegulatorEndpointState::new(config.clone()));
591 Self { state, config }
592 }
593
594 pub async fn run(&self, shutdown_rx: oneshot::Receiver<()>) -> SimulatorResult<()> {
596 {
598 let mut state = self.state.write().await;
599 state.ready = true;
600 }
601
602 let app = self.create_router();
603 let addr: std::net::SocketAddr = self.config.socket_addr().parse()
604 .map_err(|e| SimulatorError::ConfigError(format!("Invalid address: {}", e)))?;
605
606 tracing::info!("Regulator Endpoint Simulator listening on {}", addr);
607
608 let listener = tokio::net::TcpListener::bind(addr).await
609 .map_err(|e| SimulatorError::BindError(e.to_string()))?;
610
611 axum::serve(listener, app)
612 .with_graceful_shutdown(async {
613 let _ = shutdown_rx.await;
614 tracing::info!("Regulator Endpoint Simulator shutting down");
615 })
616 .await
617 .map_err(|e| SimulatorError::StartError(e.to_string()))?;
618
619 Ok(())
620 }
621
622 fn create_router(&self) -> Router {
624 let state = self.state.clone();
625
626 Router::new()
627 .route("/health", get(health_handler))
628 .route("/api/v1/submit", post(submit_handler))
629 .route("/api/v1/submissions", get(list_submissions_handler))
630 .route("/api/v1/submissions/:id", get(get_submission_handler))
631 .route("/api/v1/submissions/:id", delete(delete_submission_handler))
632 .route("/api/v1/mode", post(change_mode_handler))
633 .route("/api/v1/mode", get(get_mode_handler))
634 .route("/api/v1/stats", get(stats_handler))
635 .route("/api/v1/reset", post(reset_handler))
636 .with_state(state)
637 }
638
639 pub async fn get_submission(&self, id: &str) -> Option<StoredSubmission> {
641 self.state.read().await.submissions.get(id).cloned()
642 }
643
644 pub async fn set_mode(&self, mode: RegulatorMode) {
646 self.state.write().await.config.mode = mode;
647 }
648
649 pub async fn get_mode(&self) -> RegulatorMode {
651 self.state.read().await.config.mode.clone()
652 }
653
654 pub async fn get_all_submissions(&self) -> Vec<StoredSubmission> {
656 self.state.read().await.submissions.values().cloned().collect()
657 }
658
659 pub async fn clear_submissions(&self) {
661 let mut state = self.state.write().await;
662 state.submissions.clear();
663 state.idempotency_cache.clear();
664 }
665}
666
667#[async_trait::async_trait]
668impl Simulator for RegulatorEndpointSimulator {
669 fn name(&self) -> &str {
670 "regulator-endpoint"
671 }
672
673 fn port(&self) -> u16 {
674 self.config.port
675 }
676
677 async fn health(&self) -> HealthStatus {
678 let state = self.state.read().await;
679 let uptime = state.started_at.elapsed().as_secs();
680
681 if state.ready {
682 HealthStatus::healthy(self.name(), "1.0.0", uptime)
683 .with_details("submission_count", serde_json::json!(state.submissions.len()))
684 .with_details("mode", serde_json::json!(format!("{:?}", state.config.mode)))
685 } else {
686 HealthStatus::unhealthy(self.name(), "Not ready")
687 }
688 }
689
690 async fn stats(&self) -> SimulatorStats {
691 self.state.read().await.stats.clone()
692 }
693
694 async fn reset_stats(&self) {
695 self.state.write().await.stats = SimulatorStats::default();
696 }
697
698 async fn is_ready(&self) -> bool {
699 self.state.read().await.ready
700 }
701}
702
703async fn health_handler(
709 State(state): State<SharedState<RegulatorEndpointState>>,
710) -> impl IntoResponse {
711 let state = state.read().await;
712 let uptime = state.started_at.elapsed().as_secs();
713
714 if state.ready {
715 let health = HealthStatus::healthy("regulator-endpoint", "1.0.0", uptime)
716 .with_details("submission_count", serde_json::json!(state.submissions.len()))
717 .with_details("mode", serde_json::json!(format!("{:?}", state.config.mode)));
718 (StatusCode::OK, Json(health))
719 } else {
720 let health = HealthStatus::unhealthy("regulator-endpoint", "Not ready");
721 (StatusCode::SERVICE_UNAVAILABLE, Json(health))
722 }
723}
724
725async fn submit_handler(
727 State(state): State<SharedState<RegulatorEndpointState>>,
728 headers: HeaderMap,
729 Json(request): Json<SubmissionRequest>,
730) -> impl IntoResponse {
731 let start = Instant::now();
732 let mut state_guard = state.write().await;
733
734 let idempotency_key = headers
736 .get("Idempotency-Key")
737 .and_then(|v| v.to_str().ok())
738 .map(String::from)
739 .or_else(|| request.submission_id.clone());
740
741 let failure = state_guard.config.failure_injection.random_failure().cloned();
743 if let Some(ref failure) = failure {
744 state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
745 return match failure {
746 FailureType::InternalError => {
747 (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiResponse::<SubmissionResponse>::error("ERR500", "Internal server error")))
748 }
749 FailureType::ServiceUnavailable => {
750 (StatusCode::SERVICE_UNAVAILABLE, Json(ApiResponse::<SubmissionResponse>::error("ERR503", "Service unavailable")))
751 }
752 FailureType::Timeout => {
753 tokio::time::sleep(Duration::from_secs(30)).await;
754 (StatusCode::GATEWAY_TIMEOUT, Json(ApiResponse::<SubmissionResponse>::error("ERR504", "Gateway timeout")))
755 }
756 FailureType::RateLimited => {
757 (StatusCode::TOO_MANY_REQUESTS, Json(ApiResponse::<SubmissionResponse>::error("ERR429", "Rate limited")))
758 }
759 _ => {
760 (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiResponse::<SubmissionResponse>::error("ERR500", "Internal server error")))
761 }
762 };
763 }
764
765 state_guard.config.latency.apply().await;
767
768 if let RegulatorMode::Timeout { delay_ms } = &state_guard.config.mode {
770 let delay = *delay_ms;
771 drop(state_guard); tokio::time::sleep(Duration::from_millis(delay)).await;
773
774 let mut state_guard = state.write().await;
775 state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
776 state_guard.stats.record_timeout();
777 return (StatusCode::GATEWAY_TIMEOUT, Json(ApiResponse::<SubmissionResponse>::error("TIMEOUT", "Request timed out")));
778 }
779
780 match state_guard.process_submission(&request, idempotency_key) {
782 Ok(response) => {
783 state_guard.stats.record_request("/api/v1/submit", true, start.elapsed().as_millis() as f64);
784 (StatusCode::OK, Json(ApiResponse::success(response)))
785 }
786 Err((status, error)) => {
787 state_guard.stats.record_request("/api/v1/submit", false, start.elapsed().as_millis() as f64);
788 (status, Json(ApiResponse::<SubmissionResponse>::error(&error.error_code, &error.error_message)))
789 }
790 }
791}
792
793async fn get_submission_handler(
795 State(state): State<SharedState<RegulatorEndpointState>>,
796 Path(id): Path<String>,
797) -> impl IntoResponse {
798 let start = Instant::now();
799 let mut state_guard = state.write().await;
800
801 state_guard.config.latency.apply().await;
803
804 if let Some(submission) = state_guard.submissions.get(&id).cloned() {
805 state_guard.stats.record_request(&format!("/api/v1/submissions/{}", id), true, start.elapsed().as_millis() as f64);
806
807 let response = SubmissionResponse {
808 submission_id: submission.submission_id,
809 reference_number: submission.reference_number,
810 status: submission.status,
811 accepted_count: submission.accepted_count,
812 rejected_count: submission.rejected_count,
813 rejections: submission.rejections,
814 submitted_at: submission.submitted_at.to_rfc3339(),
815 processed_at: submission.processed_at.map(|t| t.to_rfc3339()),
816 estimated_processing_time_secs: None,
817 details: None,
818 };
819
820 (StatusCode::OK, Json(ApiResponse::success(response)))
821 } else {
822 state_guard.stats.record_request(&format!("/api/v1/submissions/{}", id), false, start.elapsed().as_millis() as f64);
823 (StatusCode::NOT_FOUND, Json(ApiResponse::<SubmissionResponse>::error("NOT_FOUND", &format!("Submission '{}' not found", id))))
824 }
825}
826
827async fn list_submissions_handler(
829 State(state): State<SharedState<RegulatorEndpointState>>,
830 Query(params): Query<ListSubmissionsParams>,
831) -> impl IntoResponse {
832 let start = Instant::now();
833 let mut state_guard = state.write().await;
834
835 state_guard.config.latency.apply().await;
837
838 let mut submissions: Vec<&StoredSubmission> = state_guard.submissions.values().collect();
839
840 if let Some(ref bank_code) = params.bank_code {
842 submissions.retain(|s| &s.bank_code == bank_code);
843 }
844 if let Some(ref period) = params.reporting_period {
845 submissions.retain(|s| &s.reporting_period == period);
846 }
847 if let Some(ref status) = params.status {
848 submissions.retain(|s| &s.status == status);
849 }
850
851 submissions.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
853
854 let total_count = submissions.len() as u64;
855 let page = params.page.unwrap_or(1).max(1);
856 let page_size = params.page_size.unwrap_or(20).min(100);
857 let start_idx = ((page - 1) * page_size) as usize;
858 let end_idx = (start_idx + page_size as usize).min(submissions.len());
859
860 let paged: Vec<SubmissionSummary> = submissions[start_idx..end_idx]
861 .iter()
862 .map(|s| SubmissionSummary {
863 submission_id: s.submission_id.clone(),
864 reference_number: s.reference_number.clone(),
865 bank_code: s.bank_code.clone(),
866 reporting_period: s.reporting_period.clone(),
867 status: s.status.clone(),
868 record_count: s.record_count,
869 accepted_count: s.accepted_count,
870 rejected_count: s.rejected_count,
871 submitted_at: s.submitted_at.to_rfc3339(),
872 })
873 .collect();
874
875 state_guard.stats.record_request("/api/v1/submissions", true, start.elapsed().as_millis() as f64);
876
877 let meta = ResponseMeta::paginated(page, page_size, total_count)
878 .with_timing(start.elapsed().as_millis() as u64);
879
880 (StatusCode::OK, Json(ApiResponse::success_with_meta(paged, meta)))
881}
882
883async fn delete_submission_handler(
885 State(state): State<SharedState<RegulatorEndpointState>>,
886 Path(id): Path<String>,
887) -> impl IntoResponse {
888 let mut state_guard = state.write().await;
889
890 if let Some(submission) = state_guard.submissions.remove(&id) {
891 if let Some(ref key) = submission.idempotency_key {
893 state_guard.idempotency_cache.remove(key);
894 }
895 Json(ApiResponse::success(serde_json::json!({
896 "deleted": true,
897 "submission_id": id
898 })))
899 } else {
900 Json(ApiResponse::<serde_json::Value>::error("NOT_FOUND", &format!("Submission '{}' not found", id)))
901 }
902}
903
904async fn change_mode_handler(
906 State(state): State<SharedState<RegulatorEndpointState>>,
907 Json(request): Json<ChangeModeRequest>,
908) -> impl IntoResponse {
909 let mut state_guard = state.write().await;
910 state_guard.config.mode = request.mode.clone();
911
912 Json(ApiResponse::success(serde_json::json!({
913 "mode": format!("{:?}", request.mode),
914 "updated": true
915 })))
916}
917
918async fn get_mode_handler(
920 State(state): State<SharedState<RegulatorEndpointState>>,
921) -> impl IntoResponse {
922 let state_guard = state.read().await;
923
924 Json(ApiResponse::success(serde_json::json!({
925 "mode": format!("{:?}", state_guard.config.mode)
926 })))
927}
928
929async fn stats_handler(
931 State(state): State<SharedState<RegulatorEndpointState>>,
932) -> impl IntoResponse {
933 let state_guard = state.read().await;
934
935 let mut stats = state_guard.stats.clone();
936
937 let mut endpoint_counts = stats.endpoint_counts.clone();
939 endpoint_counts.insert("total_submissions".to_string(), state_guard.submissions.len() as u64);
940
941 let accepted = state_guard.submissions.values()
942 .filter(|s| s.status == SubmissionStatus::Accepted)
943 .count() as u64;
944 endpoint_counts.insert("accepted_submissions".to_string(), accepted);
945
946 let rejected = state_guard.submissions.values()
947 .filter(|s| s.status == SubmissionStatus::Rejected)
948 .count() as u64;
949 endpoint_counts.insert("rejected_submissions".to_string(), rejected);
950
951 stats.endpoint_counts = endpoint_counts;
952
953 Json(ApiResponse::success(stats))
954}
955
956async fn reset_handler(
958 State(state): State<SharedState<RegulatorEndpointState>>,
959) -> impl IntoResponse {
960 let mut state_guard = state.write().await;
961 state_guard.stats = SimulatorStats::default();
962 state_guard.submissions.clear();
963 state_guard.idempotency_cache.clear();
964 state_guard.submission_counter = 0;
965 state_guard.config.mode = RegulatorMode::Accept;
966
967 Json(ApiResponse::success(serde_json::json!({
968 "reset": true,
969 "mode": "Accept"
970 })))
971}
972
/// Condensed view of a stored submission used by the list endpoint
/// (no per-record rejection details).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionSummary {
    /// Id of the submission.
    pub submission_id: String,
    /// Simulator-generated reference number.
    pub reference_number: String,
    /// Bank code of the submitter.
    pub bank_code: String,
    /// Reporting period of the submission.
    pub reporting_period: String,
    /// Outcome recorded for the submission.
    pub status: SubmissionStatus,
    /// Total number of records submitted.
    pub record_count: usize,
    /// Number of records accepted.
    pub accepted_count: usize,
    /// Number of records rejected.
    pub rejected_count: usize,
    /// Submission timestamp, formatted as RFC 3339.
    pub submitted_at: String,
}
986
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a request for period `202401` and bank `BANKXYZ`.
    fn build_request(
        submission_id: Option<&str>,
        records: Vec<serde_json::Value>,
    ) -> SubmissionRequest {
        SubmissionRequest {
            submission_id: submission_id.map(str::to_string),
            reporting_period: "202401".to_string(),
            bank_code: "BANKXYZ".to_string(),
            records,
            metadata: None,
        }
    }

    /// Request without a submission id carrying the single record `{"id": "1"}`.
    fn single_record_request() -> SubmissionRequest {
        build_request(None, vec![serde_json::json!({"id": "1"})])
    }

    /// Fresh state using the default configuration with `mode` overridden.
    fn state_in_mode(mode: RegulatorMode) -> RegulatorEndpointState {
        let mut config = RegulatorEndpointConfig::default();
        config.mode = mode;
        RegulatorEndpointState::new(config)
    }

    #[test]
    fn test_submission_status_serialization() {
        let json = serde_json::to_string(&SubmissionStatus::Accepted).unwrap();
        assert_eq!(json, "\"ACCEPTED\"");
    }

    #[test]
    fn test_submission_request() {
        let request = build_request(Some("test-123"), vec![serde_json::json!({"id": "1"})]);

        assert_eq!(request.submission_id, Some("test-123".to_string()));
        assert_eq!(request.records.len(), 1);
    }

    #[test]
    fn test_state_generate_reference_number() {
        let mut state = RegulatorEndpointState::new(RegulatorEndpointConfig::default());

        let first = state.generate_reference_number();
        let second = state.generate_reference_number();

        assert!(first.starts_with("OJK-SLIK-"));
        assert!(second.starts_with("OJK-SLIK-"));
        assert_ne!(first, second);
    }

    #[test]
    fn test_state_accept_submission() {
        let mut state = RegulatorEndpointState::new(RegulatorEndpointConfig::default());
        let request = build_request(
            None,
            vec![serde_json::json!({"id": "1"}), serde_json::json!({"id": "2"})],
        );

        let response = state
            .process_submission(&request, Some("key-1".to_string()))
            .expect("default configuration should accept the submission");

        assert_eq!(response.status, SubmissionStatus::Accepted);
        assert_eq!(response.accepted_count, 2);
        assert_eq!(response.rejected_count, 0);
    }

    #[test]
    fn test_state_idempotency() {
        let mut state = RegulatorEndpointState::new(RegulatorEndpointConfig::default());
        let request = build_request(Some("sub-123"), vec![serde_json::json!({"id": "1"})]);

        let original = state
            .process_submission(&request, Some("idem-key-1".to_string()))
            .expect("first submission should succeed");
        let replay = state
            .process_submission(&request, Some("idem-key-1".to_string()))
            .expect("replayed submission should succeed");

        // The replay must return the original submission, not a new one.
        assert_eq!(original.submission_id, replay.submission_id);
        assert_eq!(original.reference_number, replay.reference_number);

        assert!(replay.details.as_ref().unwrap().get("idempotent").is_some());
    }

    #[test]
    fn test_state_reject_mode() {
        let mut state = state_in_mode(RegulatorMode::Reject {
            error_code: "INVALID_DATA".to_string(),
            error_message: "Data validation failed".to_string(),
        });

        let (status, error) = state
            .process_submission(&single_record_request(), None)
            .expect_err("reject mode should refuse the submission");

        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert_eq!(error.error_code, "INVALID_DATA");
    }

    #[test]
    fn test_state_service_unavailable_mode() {
        let mut state = state_in_mode(RegulatorMode::ServiceUnavailable);

        let (status, _) = state
            .process_submission(&single_record_request(), None)
            .expect_err("service-unavailable mode should refuse the submission");

        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
    }

    #[test]
    fn test_state_rate_limited_mode() {
        let mut state = state_in_mode(RegulatorMode::RateLimited);

        let (status, error) = state
            .process_submission(&single_record_request(), None)
            .expect_err("rate-limited mode should refuse the submission");

        assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);
        assert_eq!(error.error_code, "RATE_LIMITED");
    }

    #[test]
    fn test_state_queued_mode() {
        let mut state = state_in_mode(RegulatorMode::Queued { queue_delay_ms: 5000 });

        let response = state
            .process_submission(&single_record_request(), None)
            .expect("queued mode should accept the submission into the queue");

        assert_eq!(response.status, SubmissionStatus::Queued);
        assert!(response.estimated_processing_time_secs.is_some());
    }

    #[test]
    fn test_state_partial_reject_mode() {
        let mut state = state_in_mode(RegulatorMode::PartialReject { reject_ratio: 0.5 });
        let request = build_request(
            None,
            (0..100).map(|i| serde_json::json!({"id": i})).collect(),
        );

        let response = state
            .process_submission(&request, None)
            .expect("partial-reject mode should still return a response");

        // The split is random; only the totals are deterministic.
        assert!(response.accepted_count > 0 || response.rejected_count > 0);
        assert_eq!(response.accepted_count + response.rejected_count, 100);
    }

    #[test]
    fn test_record_rejection() {
        let rejection = RecordRejection {
            record_index: 5,
            record_id: Some("CR0005".to_string()),
            error_code: "INVALID_NIK".to_string(),
            message: "NIK format is invalid".to_string(),
            field: Some("debtor_nik".to_string()),
        };

        assert_eq!(rejection.record_index, 5);
        assert_eq!(rejection.error_code, "INVALID_NIK");
    }

    #[test]
    fn test_stored_submission() {
        let submission = StoredSubmission {
            submission_id: "sub-123".to_string(),
            reference_number: "OJK-SLIK-20240101120000-000001".to_string(),
            bank_code: "BANKXYZ".to_string(),
            reporting_period: "202401".to_string(),
            status: SubmissionStatus::Accepted,
            record_count: 100,
            accepted_count: 100,
            rejected_count: 0,
            rejections: Vec::new(),
            submitted_at: Utc::now(),
            processed_at: Some(Utc::now()),
            idempotency_key: Some("idem-123".to_string()),
            attempt_count: 1,
        };

        assert_eq!(submission.status, SubmissionStatus::Accepted);
        assert_eq!(submission.record_count, 100);
    }
}