use std::time::Duration;
use axum::extract::ws::{Message, WebSocket};
use futures::{SinkExt, StreamExt};
use serde::Serialize;
use tracing::{error, info, warn};
use super::routes::AppState;
use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
#[derive(Debug, Serialize)]
pub struct MetricsUpdate {
pub timestamp: String,
pub total_entries: u64,
pub total_anomalies: u64,
pub entries_per_second: f64,
pub active_streams: u32,
pub uptime_seconds: u64,
}
#[derive(Debug, Serialize)]
pub struct EventUpdate {
pub sequence: u64,
pub timestamp: String,
pub event_type: String,
pub document_id: String,
pub company_code: String,
pub amount: String,
pub is_anomaly: bool,
}
pub struct MetricsStream;
pub async fn handle_metrics_socket(socket: WebSocket, state: AppState) {
let (mut sender, mut receiver) = socket.split();
info!("Metrics WebSocket connected");
let state_clone = state.clone();
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = interval.tick() => {
let uptime = state_clone.server_state.uptime_seconds();
let total_entries = state_clone.server_state.total_entries.load(std::sync::atomic::Ordering::Relaxed);
let entries_per_second = if uptime > 0 {
total_entries as f64 / uptime as f64
} else {
0.0
};
let update = MetricsUpdate {
timestamp: chrono::Utc::now().to_rfc3339(),
total_entries,
total_anomalies: state_clone.server_state.total_anomalies.load(std::sync::atomic::Ordering::Relaxed),
entries_per_second,
active_streams: state_clone.server_state.active_streams.load(std::sync::atomic::Ordering::Relaxed) as u32,
uptime_seconds: uptime,
};
match serde_json::to_string(&update) {
Ok(json) => {
if sender.send(Message::Text(json.into())).await.is_err() {
info!("Metrics WebSocket client disconnected");
break;
}
}
Err(e) => {
error!("Failed to serialize metrics: {}", e);
}
}
}
msg = receiver.next() => {
match msg {
Some(Ok(Message::Close(_))) | None => {
info!("Metrics WebSocket closed by client");
break;
}
Some(Ok(Message::Ping(data)))
if sender.send(Message::Pong(data.clone())).await.is_err() =>
{
break;
}
Some(Err(e)) => {
warn!("Metrics WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
}
}
pub async fn handle_events_socket(socket: WebSocket, state: AppState) {
let (mut sender, mut receiver) = socket.split();
info!("Events WebSocket connected");
state
.server_state
.active_streams
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let config = state.server_state.config.read().await.clone();
let phase_config = PhaseConfig {
generate_master_data: false,
generate_document_flows: false,
generate_journal_entries: true,
inject_anomalies: false,
show_progress: false,
..Default::default()
};
let mut sequence = 0u64;
let delay = Duration::from_millis(100);
let mut orchestrator = match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
Ok(o) => o,
Err(e) => {
error!("Failed to create orchestrator: {}", e);
state
.server_state
.active_streams
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
return;
}
};
loop {
if state
.server_state
.stream_stopped
.load(std::sync::atomic::Ordering::Relaxed)
{
info!("Events stream stopped by control command");
break;
}
while state
.server_state
.stream_paused
.load(std::sync::atomic::Ordering::Relaxed)
{
tokio::time::sleep(Duration::from_millis(100)).await;
if state
.server_state
.stream_stopped
.load(std::sync::atomic::Ordering::Relaxed)
{
break;
}
}
tokio::select! {
msg = receiver.next() => {
match msg {
Some(Ok(Message::Close(_))) | None => {
info!("Events WebSocket closed by client");
break;
}
Some(Ok(Message::Ping(data)))
if sender.send(Message::Pong(data.clone())).await.is_err() =>
{
break;
}
Some(Err(e)) => {
warn!("Events WebSocket error: {}", e);
break;
}
_ => {}
}
}
_ = tokio::time::sleep(delay) => {
let result = match orchestrator.generate() {
Ok(r) => r,
Err(e) => {
error!("Generation failed: {}", e);
break;
}
};
for entry in result.journal_entries.iter() {
sequence += 1;
state.server_state.total_stream_events.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
state.server_state.total_entries.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let total_amount: rust_decimal::Decimal = entry.lines.iter()
.map(|l| l.debit_amount)
.sum();
let event = EventUpdate {
sequence,
timestamp: chrono::Utc::now().to_rfc3339(),
event_type: "JournalEntry".to_string(),
document_id: entry.header.document_id.to_string(),
company_code: entry.header.company_code.clone(),
amount: total_amount.to_string(),
is_anomaly: entry.header.is_fraud,
};
match serde_json::to_string(&event) {
Ok(json) => {
if sender.send(Message::Text(json.into())).await.is_err() {
info!("Events WebSocket client disconnected");
break;
}
}
Err(e) => {
error!("Failed to serialize event: {}", e);
}
}
}
}
}
}
state
.server_state
.active_streams
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
#[test]
fn test_metrics_update_serialization() {
let update = MetricsUpdate {
timestamp: "2024-01-01T00:00:00Z".to_string(),
total_entries: 1000,
total_anomalies: 10,
entries_per_second: 16.67,
active_streams: 1,
uptime_seconds: 60,
};
let json = serde_json::to_string(&update).unwrap();
assert!(json.contains("total_entries"));
assert!(json.contains("1000"));
}
#[test]
fn test_event_update_serialization() {
let event = EventUpdate {
sequence: 1,
timestamp: "2024-01-01T00:00:00Z".to_string(),
event_type: "JournalEntry".to_string(),
document_id: "12345".to_string(),
company_code: "1000".to_string(),
amount: "1000.00".to_string(),
is_anomaly: false,
};
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("JournalEntry"));
assert!(json.contains("12345"));
}
}