use axum::{
extract::{Query, State},
Json,
};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use crate::error::AppError;
use crate::services::replay::{
ReplayCutoff, ReplayProjection, ReplayService, ReplayState,
};
/// Query-string parameters accepted by the [`replay_state`] handler.
///
/// Only `execution_id` is mandatory; every other field falls back to a
/// default when it is absent from the request.
///
/// The type is deserialize-only, so no serialization-side serde attributes
/// are attached to its fields.
#[derive(Debug, Clone, Deserialize)]
pub struct ReplayStateQuery {
    /// Identifier of the execution whose state is replayed. Required.
    pub execution_id: i64,
    /// Tenant scope; `"default"` when omitted.
    #[serde(default = "default_tenant_id")]
    pub tenant_id: String,
    /// Organization scope; `"default"` when omitted.
    #[serde(default = "default_org_id")]
    pub organization_id: String,
    /// Optional replay cutoff expressed as an event id.
    ///
    /// The handler rejects requests that set more than one of the three
    /// `as_of_*` cutoffs.
    #[serde(default)]
    pub as_of_event_id: Option<i64>,
    /// Optional replay cutoff expressed as a position.
    #[serde(default)]
    pub as_of_position: Option<i64>,
    /// Optional replay cutoff expressed as a UTC timestamp.
    #[serde(default)]
    pub as_of_time: Option<DateTime<Utc>>,
    /// Wire name of the requested projection; `"all"` when omitted.
    ///
    /// Kept as a raw string here and parsed by the handler through
    /// `ReplayProjection::parse_wire`.
    #[serde(default = "default_projection")]
    pub projection: String,
    /// Limit forwarded unchanged to `ReplayService::replay_state`;
    /// `10_000` when omitted.
    #[serde(default = "default_limit")]
    pub limit: i64,
    /// Accepted for wire compatibility; `false` when omitted. The handler
    /// currently reads this flag but does not act on it.
    #[serde(default)]
    pub resolve_payloads: bool,
}
/// Serde fallback for `ReplayStateQuery::tenant_id` when the request omits it.
fn default_tenant_id() -> String {
    String::from("default")
}
/// Serde fallback for `ReplayStateQuery::organization_id` when the request omits it.
fn default_org_id() -> String {
    String::from("default")
}
/// Serde fallback for `ReplayStateQuery::projection` when the request omits it.
fn default_projection() -> String {
    String::from("all")
}
/// Serde fallback for `ReplayStateQuery::limit` when the request omits it.
fn default_limit() -> i64 {
    const DEFAULT_REPLAY_LIMIT: i64 = 10_000;
    DEFAULT_REPLAY_LIMIT
}
pub async fn replay_state(
State(service): State<ReplayService>,
Query(query): Query<ReplayStateQuery>,
) -> Result<Json<ReplayState>, AppError> {
let cutoff = ReplayCutoff {
as_of_event_id: query.as_of_event_id,
as_of_position: query.as_of_position,
as_of_time: query.as_of_time,
};
if cutoff.set_count() > 1 {
return Err(AppError::BadRequest(
"Use only one replay cutoff: as_of_event_id, as_of_position, or as_of_time".to_string(),
));
}
let projection = ReplayProjection::parse_wire(&query.projection).ok_or_else(|| {
AppError::BadRequest(format!(
"unknown projection {:?}; expected one of: execution, stage, frame, command, business_object, loop, all",
query.projection
))
})?;
let _ = query.resolve_payloads;
let state = service
.replay_state(
&query.tenant_id,
&query.organization_id,
query.execution_id,
cutoff,
projection,
query.limit,
)
.await?;
Ok(Json(state))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload carrying only `execution_id` must deserialize with every
    /// other field at its declared default, including all three optional
    /// replay cutoffs.
    #[test]
    fn query_defaults_match_python_endpoint() {
        let raw = r#"{"execution_id":42}"#;
        let q: ReplayStateQuery = serde_json::from_str(raw).unwrap();
        assert_eq!(q.execution_id, 42);
        assert_eq!(q.tenant_id, "default");
        assert_eq!(q.organization_id, "default");
        assert_eq!(q.projection, "all");
        assert_eq!(q.limit, 10_000);
        // None of the cutoffs may be set implicitly.
        assert!(q.as_of_event_id.is_none());
        assert!(q.as_of_position.is_none());
        assert!(q.as_of_time.is_none());
        assert!(!q.resolve_payloads);
    }
}