1use axum::{
8 extract::{Path, State},
9 http::StatusCode,
10 response::IntoResponse,
11 routing::{get, post},
12 Json, Router,
13};
14use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::RwLock;
19use uuid::Uuid;
20
21const REGULATOR_ENDPOINT_URL: &str = "http://127.0.0.1:18084";
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SubmissionRequest {
33 pub submission_id: String,
34 pub locked_dataset_version: String,
35 pub artifact_format: String,
36 pub options: SubmissionOptions,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct SubmissionOptions {
41 pub retry_policy: RetryPolicy,
42 #[serde(default)]
43 pub off_peak_window: Option<OffPeakWindow>,
44 #[serde(default)]
45 pub batch_size: Option<u64>,
46 #[serde(default)]
47 pub include_evidence_pack: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RetryPolicy {
52 #[serde(default = "default_max_attempts")]
53 pub max_attempts: u32,
54 #[serde(alias = "initial_delay_ms", alias = "initial_backoff_ms", default = "default_initial_delay")]
55 pub initial_backoff_ms: u64,
56 #[serde(alias = "max_delay_ms", alias = "max_backoff_ms", default = "default_max_delay")]
57 pub max_backoff_ms: u64,
58 #[serde(default = "default_multiplier")]
59 pub backoff_multiplier: f64,
60 #[serde(default = "default_send_to_dlq")]
61 pub send_to_dlq: bool,
62}
63
64fn default_max_attempts() -> u32 { 3 }
65fn default_initial_delay() -> u64 { 1000 }
66fn default_max_delay() -> u64 { 30000 }
67fn default_multiplier() -> f64 { 2.0 }
68fn default_send_to_dlq() -> bool { true }
69
70impl Default for RetryPolicy {
71 fn default() -> Self {
72 Self {
73 max_attempts: 3,
74 initial_backoff_ms: 1000,
75 max_backoff_ms: 30000,
76 backoff_multiplier: 2.0,
77 send_to_dlq: true,
78 }
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct OffPeakWindow {
84 pub start_hour: u8,
85 pub end_hour: u8,
86 pub timezone: String,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct SubmissionResponse {
91 pub execution_id: String,
92 pub submission_id: String,
93 pub status: SubmissionStatus,
94 pub artifact: Option<Artifact>,
95 pub attempts: Vec<SubmissionAttempt>,
96 pub evidence_pack: Option<EvidencePack>,
97 pub audit_events: Vec<AuditEvent>,
98 pub duration_ms: u64,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
102#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
103pub enum SubmissionStatus {
104 Pending,
105 Generating,
106 Submitting,
107 Submitted,
108 Acknowledged,
109 Rejected,
110 PendingRetry,
111 Failed,
112 Escalated,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct Artifact {
117 pub artifact_id: String,
118 pub artifact_hash: String,
119 pub format: String,
120 pub row_count: u64,
121 pub file_size: u64,
122 pub locked_dataset_version: String,
123 pub created_at: String,
124 pub precheck_result: Option<PrecheckResult>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct PrecheckResult {
129 pub passed: bool,
130 pub checks: Vec<PrecheckItem>,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct PrecheckItem {
135 pub check_name: String,
136 pub passed: bool,
137 pub message: Option<String>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct SubmissionAttempt {
142 pub attempt_number: u32,
143 pub timestamp: String,
144 pub status: AttemptStatus,
145 pub response_code: Option<u16>,
146 pub response_message: Option<String>,
147 pub correlation_id: String,
148 pub duration_ms: u64,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
152#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
153pub enum AttemptStatus {
154 Success,
155 Timeout,
156 Rejected,
157 Error,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct EvidencePack {
162 pub pack_id: String,
163 pub created_at: String,
164 pub contents: EvidenceContents,
165 pub hash: String,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct EvidenceContents {
170 pub run_id: String,
171 pub dataset_version: String,
172 pub artifact_hash: String,
173 pub submission_attempts: Vec<SubmissionAttempt>,
174 pub audit_log_hash: String,
175 pub approval_trail: Option<serde_json::Value>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct AuditEvent {
180 pub event_type: String,
181 pub timestamp: String,
182 pub actor: String,
183 pub details: serde_json::Value,
184}
185
186#[derive(Debug)]
191pub struct EngineState {
192 pub submissions: RwLock<HashMap<String, SubmissionResponse>>,
193 pub dlq: RwLock<Vec<DlqEntry>>,
194 pub start_time: std::time::Instant,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct DlqEntry {
199 pub submission_id: String,
200 pub reason: String,
201 pub timestamp: String,
202 pub attempts: u32,
203 pub last_error: String,
204}
205
206impl EngineState {
207 pub fn new() -> Self {
208 Self {
209 submissions: RwLock::new(HashMap::new()),
210 dlq: RwLock::new(Vec::new()),
211 start_time: std::time::Instant::now(),
212 }
213 }
214}
215
216impl Default for EngineState {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
222pub struct EngineSimulator {
227 state: Arc<EngineState>,
228}
229
230impl EngineSimulator {
231 pub fn new() -> Self {
232 Self {
233 state: Arc::new(EngineState::new()),
234 }
235 }
236
237 pub fn router(&self) -> Router {
238 let state = self.state.clone();
239
240 Router::new()
241 .route("/health", get(health_check))
243 .route("/webhook/:tenant_id/:workflow_id/:trigger_id", post(handle_webhook))
245 .route("/internal/dlq", get(get_dlq_entries))
247 .route("/internal/dlq/:submission_id/retry", post(retry_dlq_entry))
248 .route("/submissions/:submission_id", get(get_submission_status))
250 .with_state(state)
251 }
252
253 pub fn state(&self) -> Arc<EngineState> {
254 self.state.clone()
255 }
256}
257
258impl Default for EngineSimulator {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264pub struct AdminSimulator {
269 state: Arc<EngineState>,
270}
271
272impl AdminSimulator {
273 pub fn new(state: Arc<EngineState>) -> Self {
274 Self { state }
275 }
276
277 pub fn router(&self) -> Router {
278 let state = self.state.clone();
279
280 Router::new()
281 .route("/health", get(admin_health_check))
282 .route("/metrics", get(metrics))
283 .with_state(state)
284 }
285}
286
287async fn health_check(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
292 let uptime = state.start_time.elapsed().as_secs();
293 Json(serde_json::json!({
294 "status": "healthy",
295 "service": "engine-simulator",
296 "version": "1.0.0",
297 "uptime_secs": uptime
298 }))
299}
300
301async fn admin_health_check(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
302 let uptime = state.start_time.elapsed().as_secs();
303 Json(serde_json::json!({
304 "status": "healthy",
305 "service": "admin-simulator",
306 "version": "1.0.0",
307 "uptime_secs": uptime
308 }))
309}
310
311async fn metrics(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
312 let submissions = state.submissions.read().await;
313 let dlq = state.dlq.read().await;
314
315 Json(serde_json::json!({
316 "total_submissions": submissions.len(),
317 "dlq_size": dlq.len(),
318 "uptime_secs": state.start_time.elapsed().as_secs()
319 }))
320}
321
322async fn handle_webhook(
323 State(state): State<Arc<EngineState>>,
324 Path((_tenant_id, _workflow_id, _trigger_id)): Path<(String, String, String)>,
325 Json(request): Json<SubmissionRequest>,
326) -> impl IntoResponse {
327 let start = std::time::Instant::now();
328 let submission_id = request.submission_id.clone();
329 let execution_id = format!("EXEC-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
330
331 let artifact = generate_artifact(&request);
333
334 let (status, attempts, error_reason) = submit_to_regulator(&request, &artifact).await;
336
337 let audit_events = generate_audit_events(&submission_id, &status, &attempts);
339
340 let evidence_pack = if request.options.include_evidence_pack {
342 Some(generate_evidence_pack(&request, &artifact, &attempts))
343 } else {
344 None
345 };
346
347 let response = SubmissionResponse {
348 execution_id: execution_id.clone(),
349 submission_id: submission_id.clone(),
350 status,
351 artifact: Some(artifact),
352 attempts,
353 evidence_pack,
354 audit_events,
355 duration_ms: start.elapsed().as_millis() as u64,
356 };
357
358 {
360 let mut submissions = state.submissions.write().await;
361 submissions.insert(submission_id.clone(), response.clone());
362 }
363
364 if status == SubmissionStatus::Escalated || status == SubmissionStatus::Failed {
366 let mut dlq = state.dlq.write().await;
367 dlq.push(DlqEntry {
368 submission_id: submission_id.clone(),
369 reason: error_reason.unwrap_or_else(|| "Unknown".to_string()),
370 timestamp: Utc::now().to_rfc3339(),
371 attempts: response.attempts.len() as u32,
372 last_error: response.attempts.last()
373 .and_then(|a| a.response_message.clone())
374 .unwrap_or_else(|| "No error".to_string()),
375 });
376 }
377
378 (StatusCode::OK, Json(response))
379}
380
381async fn get_dlq_entries(State(state): State<Arc<EngineState>>) -> impl IntoResponse {
382 let dlq = state.dlq.read().await;
383 Json(dlq.clone())
384}
385
386async fn retry_dlq_entry(
387 State(state): State<Arc<EngineState>>,
388 Path(submission_id): Path<String>,
389) -> impl IntoResponse {
390 let mut dlq = state.dlq.write().await;
391
392 if let Some(pos) = dlq.iter().position(|e| e.submission_id == submission_id) {
394 dlq.remove(pos);
395 (StatusCode::OK, Json(serde_json::json!({"status": "retrying", "submission_id": submission_id})))
396 } else {
397 (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Entry not found in DLQ"})))
398 }
399}
400
401async fn get_submission_status(
402 State(state): State<Arc<EngineState>>,
403 Path(submission_id): Path<String>,
404) -> impl IntoResponse {
405 let submissions = state.submissions.read().await;
406
407 if let Some(submission) = submissions.get(&submission_id) {
408 (StatusCode::OK, Json(serde_json::json!(submission)))
409 } else {
410 (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Submission not found"})))
411 }
412}
413
414fn generate_artifact(request: &SubmissionRequest) -> Artifact {
419 let artifact_id = format!("ART-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
420 let content = format!("{}:{}:{}", request.submission_id, request.locked_dataset_version, Utc::now());
421 let artifact_hash = format!("{:x}", md5::compute(content.as_bytes()));
422
423 Artifact {
424 artifact_id,
425 artifact_hash,
426 format: request.artifact_format.clone(),
427 row_count: 1000, file_size: 1024 * 50, locked_dataset_version: request.locked_dataset_version.clone(),
430 created_at: Utc::now().to_rfc3339(),
431 precheck_result: Some(PrecheckResult {
432 passed: true,
433 checks: vec![
434 PrecheckItem {
435 check_name: "schema_validation".to_string(),
436 passed: true,
437 message: None,
438 },
439 PrecheckItem {
440 check_name: "hash_verification".to_string(),
441 passed: true,
442 message: None,
443 },
444 PrecheckItem {
445 check_name: "size_limit".to_string(),
446 passed: true,
447 message: None,
448 },
449 ],
450 }),
451 }
452}
453
454async fn submit_to_regulator(
455 request: &SubmissionRequest,
456 artifact: &Artifact,
457) -> (SubmissionStatus, Vec<SubmissionAttempt>, Option<String>) {
458 let client = reqwest::Client::builder()
459 .timeout(std::time::Duration::from_secs(30))
460 .build()
461 .unwrap();
462
463 let max_attempts = request.options.retry_policy.max_attempts;
464 let mut attempts = Vec::new();
465 let mut last_error: Option<String> = None;
466
467 for attempt_num in 1..=max_attempts {
468 let attempt_start = std::time::Instant::now();
469 let correlation_id = format!("CORR-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
470
471 let submit_url = format!("{}/api/v1/submit", REGULATOR_ENDPOINT_URL);
472
473 let mock_records: Vec<serde_json::Value> = (0..artifact.row_count.min(10))
475 .map(|i| serde_json::json!({
476 "id": format!("REC-{:06}", i),
477 "nik": format!("32050219900{:05}", i),
478 "nama_lengkap": format!("Test User {}", i),
479 "jenis_fasilitas": "KMK",
480 "jumlah_kredit": 10000000,
481 "mata_uang": "IDR",
482 "suku_bunga": 12.5,
483 "tanggal_mulai": "2024-01-01",
484 "tanggal_jatuh_tempo": "2025-01-01",
485 "saldo_outstanding": 8000000,
486 "kolektabilitas": 1
487 }))
488 .collect();
489
490 let result = client
491 .post(&submit_url)
492 .json(&serde_json::json!({
493 "submission_id": request.submission_id,
494 "reporting_period": "202501",
495 "bank_code": "BANK001",
496 "records": mock_records,
497 "metadata": {
498 "artifact_hash": artifact.artifact_hash,
499 "format": artifact.format,
500 "dataset_version": request.locked_dataset_version
501 }
502 }))
503 .send()
504 .await;
505
506 match result {
507 Ok(response) => {
508 let status_code = response.status().as_u16();
509 let duration = attempt_start.elapsed().as_millis() as u64;
510
511 if response.status().is_success() {
512 attempts.push(SubmissionAttempt {
513 attempt_number: attempt_num,
514 timestamp: Utc::now().to_rfc3339(),
515 status: AttemptStatus::Success,
516 response_code: Some(status_code),
517 response_message: None,
518 correlation_id,
519 duration_ms: duration,
520 });
521 return (SubmissionStatus::Acknowledged, attempts, None);
522 } else if status_code == 400 || status_code == 422 {
523 let body = response.text().await.unwrap_or_default();
525 attempts.push(SubmissionAttempt {
526 attempt_number: attempt_num,
527 timestamp: Utc::now().to_rfc3339(),
528 status: AttemptStatus::Rejected,
529 response_code: Some(status_code),
530 response_message: Some(body.clone()),
531 correlation_id,
532 duration_ms: duration,
533 });
534 return (SubmissionStatus::Rejected, attempts, Some(body));
535 } else {
536 let body = response.text().await.unwrap_or_default();
538 last_error = Some(body.clone());
539 attempts.push(SubmissionAttempt {
540 attempt_number: attempt_num,
541 timestamp: Utc::now().to_rfc3339(),
542 status: AttemptStatus::Error,
543 response_code: Some(status_code),
544 response_message: Some(body),
545 correlation_id,
546 duration_ms: duration,
547 });
548 }
549 }
550 Err(e) => {
551 let duration = attempt_start.elapsed().as_millis() as u64;
552 let (error_status, error_msg) = if e.is_timeout() {
553 (AttemptStatus::Timeout, "Request timeout".to_string())
554 } else if e.is_connect() {
555 (AttemptStatus::Error, "Connection failed".to_string())
556 } else {
557 (AttemptStatus::Error, format!("Request error: {}", e))
558 };
559
560 last_error = Some(error_msg.clone());
561 attempts.push(SubmissionAttempt {
562 attempt_number: attempt_num,
563 timestamp: Utc::now().to_rfc3339(),
564 status: error_status,
565 response_code: None,
566 response_message: Some(error_msg),
567 correlation_id,
568 duration_ms: duration,
569 });
570 }
571 }
572
573 if attempt_num < max_attempts {
575 let delay = calculate_backoff(attempt_num, &request.options.retry_policy);
576 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
577 }
578 }
579
580 if attempts.len() as u32 >= max_attempts {
582 (SubmissionStatus::Escalated, attempts, last_error)
583 } else {
584 (SubmissionStatus::Failed, attempts, last_error)
585 }
586}
587
588fn calculate_backoff(attempt: u32, policy: &RetryPolicy) -> u64 {
589 let delay = (policy.initial_backoff_ms as f64) * policy.backoff_multiplier.powi(attempt as i32 - 1);
590 (delay as u64).min(policy.max_backoff_ms)
591}
592
593fn generate_audit_events(
594 submission_id: &str,
595 status: &SubmissionStatus,
596 attempts: &[SubmissionAttempt],
597) -> Vec<AuditEvent> {
598 let mut events = vec![
599 AuditEvent {
600 event_type: "submission_started".to_string(),
601 timestamp: Utc::now().to_rfc3339(),
602 actor: "engine".to_string(),
603 details: serde_json::json!({
604 "submission_id": submission_id
605 }),
606 },
607 AuditEvent {
609 event_type: "artifact_generated".to_string(),
610 timestamp: Utc::now().to_rfc3339(),
611 actor: "engine".to_string(),
612 details: serde_json::json!({
613 "submission_id": submission_id,
614 "format": "XML"
615 }),
616 },
617 ];
618
619 for attempt in attempts {
620 let attempt_event_type = match attempt.status {
621 AttemptStatus::Success => "attempt_success".to_string(),
622 AttemptStatus::Timeout => "attempt_timeout".to_string(),
623 AttemptStatus::Rejected => "attempt_rejected".to_string(),
624 AttemptStatus::Error => "attempt_error".to_string(),
625 };
626
627 events.push(AuditEvent {
628 event_type: attempt_event_type,
629 timestamp: attempt.timestamp.clone(),
630 actor: "engine".to_string(),
631 details: serde_json::json!({
632 "attempt": attempt.attempt_number,
633 "duration_ms": attempt.duration_ms,
634 "response_code": attempt.response_code,
635 "correlation_id": attempt.correlation_id
636 }),
637 });
638 }
639
640 let final_event_type = match status {
642 SubmissionStatus::Acknowledged | SubmissionStatus::Submitted => "submission_success".to_string(),
643 SubmissionStatus::Rejected => "submission_rejected".to_string(),
644 SubmissionStatus::Escalated => "submission_escalated".to_string(),
645 SubmissionStatus::Failed => "submission_failed".to_string(),
646 _ => format!("submission_{:?}", status).to_lowercase(),
647 };
648
649 events.push(AuditEvent {
650 event_type: final_event_type,
651 timestamp: Utc::now().to_rfc3339(),
652 actor: "engine".to_string(),
653 details: serde_json::json!({
654 "final_status": format!("{:?}", status),
655 "total_attempts": attempts.len()
656 }),
657 });
658
659 events
660}
661
662fn generate_evidence_pack(
663 request: &SubmissionRequest,
664 artifact: &Artifact,
665 attempts: &[SubmissionAttempt],
666) -> EvidencePack {
667 let pack_id = format!("EVP-{}", Uuid::new_v4().to_string()[..8].to_uppercase());
668
669 let contents = EvidenceContents {
670 run_id: request.submission_id.clone(),
671 dataset_version: request.locked_dataset_version.clone(),
672 artifact_hash: artifact.artifact_hash.clone(),
673 submission_attempts: attempts.to_vec(),
674 audit_log_hash: format!("{:x}", md5::compute(format!("{:?}", attempts).as_bytes())),
675 approval_trail: None,
676 };
677
678 let hash = format!("{:x}", md5::compute(format!("{:?}", contents).as_bytes()));
679
680 EvidencePack {
681 pack_id,
682 created_at: Utc::now().to_rfc3339(),
683 contents,
684 hash,
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691
692 #[test]
693 fn test_default_retry_policy() {
694 let policy = RetryPolicy::default();
695 assert_eq!(policy.max_attempts, 3);
696 assert_eq!(policy.initial_backoff_ms, 1000);
697 }
698
699 #[test]
700 fn test_calculate_backoff() {
701 let policy = RetryPolicy::default();
702
703 let delay1 = calculate_backoff(1, &policy);
704 assert_eq!(delay1, 1000);
705
706 let delay2 = calculate_backoff(2, &policy);
707 assert_eq!(delay2, 2000);
708
709 let delay3 = calculate_backoff(3, &policy);
710 assert_eq!(delay3, 4000);
711 }
712}