use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Json, Response};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sonda_core::encoder::prometheus::PrometheusText;
use sonda_core::encoder::Encoder;
use sonda_core::ScenarioStats;
use tracing::{info, warn};
use uuid::Uuid;
use sonda_core::config::{LogScenarioConfig, ScenarioConfig, ScenarioEntry};
use sonda_core::schedule::launch::{launch_scenario, validate_entry};
use crate::state::AppState;
/// JSON body returned by `post_scenario` on success (sent with `201 Created`).
#[derive(Debug, Serialize)]
pub struct CreatedScenario {
/// Identifier assigned to the scenario; `post_scenario` generates a UUID v4 string.
pub id: String,
/// Scenario name copied from the submitted configuration.
pub name: String,
/// Status label; `post_scenario` always sets this to `"running"`.
pub status: &'static str,
}
/// One entry of the list produced by `list_scenarios`.
#[derive(Debug, Serialize)]
pub struct ScenarioSummary {
/// Key under which the scenario is stored in the application state.
pub id: String,
/// Name taken from the scenario handle.
pub name: String,
/// `"running"` or `"stopped"`, as produced by `status_string`.
pub status: String,
/// Value of the handle's `elapsed()` expressed in seconds.
pub elapsed_secs: f64,
}
/// JSON envelope returned by `list_scenarios`.
#[derive(Debug, Serialize)]
pub struct ListScenariosResponse {
/// One summary per scenario currently held in the application state.
pub scenarios: Vec<ScenarioSummary>,
}
/// JSON body returned by `get_scenario` for a single scenario.
#[derive(Debug, Serialize)]
pub struct ScenarioDetail {
/// Identifier taken from the request path.
pub id: String,
/// Name taken from the scenario handle.
pub name: String,
/// `"running"` or `"stopped"`, as produced by `status_string`.
pub status: String,
/// Value of the handle's `elapsed()` expressed in seconds.
pub elapsed_secs: f64,
/// Counters converted from the handle's stats snapshot.
pub stats: StatsResponse,
}
/// JSON body returned by `delete_scenario`.
#[derive(Debug, Serialize)]
pub struct DeletedScenario {
/// Identifier taken from the request path.
pub id: String,
/// `"stopped"`, or `"force_stopped"` when the handle still reports running after the join.
pub status: String,
/// `total_events` from the stats snapshot taken just before the handle is removed.
pub total_events: u64,
}
/// Compact statistics object embedded in `ScenarioDetail`.
///
/// Each field mirrors the `ScenarioStats` field of the same name.
#[derive(Debug, Serialize)]
pub struct StatsResponse {
pub total_events: u64,
pub current_rate: f64,
pub bytes_emitted: u64,
pub errors: u64,
}
impl From<ScenarioStats> for StatsResponse {
fn from(s: ScenarioStats) -> Self {
Self {
total_events: s.total_events,
current_rate: s.current_rate,
bytes_emitted: s.bytes_emitted,
errors: s.errors,
}
}
}
/// JSON body returned by `get_scenario_stats`.
#[derive(Debug, Serialize)]
pub struct DetailedStatsResponse {
/// Copied from the stats snapshot.
pub total_events: u64,
/// Copied from the stats snapshot.
pub current_rate: f64,
/// Copied from the handle's `target_rate` field.
pub target_rate: f64,
/// Copied from the stats snapshot.
pub bytes_emitted: u64,
/// Copied from the stats snapshot.
pub errors: u64,
/// Value of the handle's `elapsed()` expressed in seconds.
pub uptime_secs: f64,
/// `"running"` or `"stopped"`, as produced by `status_string`.
pub state: String,
/// Copied from the stats snapshot's `in_gap` flag.
pub in_gap: bool,
/// Copied from the stats snapshot's `in_burst` flag.
pub in_burst: bool,
}
/// Builds a `400 Bad Request` response whose JSON body carries the error
/// code `"bad_request"` and the rendered `detail`.
fn bad_request(detail: impl std::fmt::Display) -> Response {
    let payload = Json(json!({
        "error": "bad_request",
        "detail": detail.to_string(),
    }));
    (StatusCode::BAD_REQUEST, payload).into_response()
}
/// Builds a `422 Unprocessable Entity` response whose JSON body carries the
/// error code `"unprocessable_entity"` and the rendered `detail`.
fn unprocessable(detail: impl std::fmt::Display) -> Response {
    let payload = Json(json!({
        "error": "unprocessable_entity",
        "detail": detail.to_string(),
    }));
    (StatusCode::UNPROCESSABLE_ENTITY, payload).into_response()
}
/// Builds a `404 Not Found` response whose JSON body carries the error code
/// `"not_found"` and the rendered `detail`.
fn not_found(detail: impl std::fmt::Display) -> Response {
    let payload = Json(json!({
        "error": "not_found",
        "detail": detail.to_string(),
    }));
    (StatusCode::NOT_FOUND, payload).into_response()
}
/// Builds a `500 Internal Server Error` response whose JSON body carries the
/// error code `"internal_server_error"` and the rendered `detail`.
fn internal_error(detail: impl std::fmt::Display) -> Response {
    let payload = Json(json!({
        "error": "internal_server_error",
        "detail": detail.to_string(),
    }));
    (StatusCode::INTERNAL_SERVER_ERROR, payload).into_response()
}
/// Maps a liveness flag to the status label used in API responses:
/// `"running"` for `true`, `"stopped"` for `false`.
fn status_string(running: bool) -> String {
    let label = match running {
        true => "running",
        false => "stopped",
    };
    String::from(label)
}
/// Decides whether a request body should be parsed as YAML.
///
/// Returns `true` when the `Content-Type` header contains `yaml` (compared
/// case-insensitively, because media type names are case-insensitive), and
/// also when the header is missing or is not valid visible ASCII — YAML is
/// the default format. Any other readable content type yields `false`, which
/// routes the body to the JSON parser.
fn is_yaml_content_type(headers: &HeaderMap) -> bool {
    headers
        .get(axum::http::header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        // Lower-case first so that e.g. `application/X-YAML` is still YAML.
        .map(|content_type| content_type.to_ascii_lowercase().contains("yaml"))
        .unwrap_or(true)
}
fn parse_body(body: &[u8], headers: &HeaderMap) -> Result<ScenarioEntry, String> {
if is_yaml_content_type(headers) {
parse_yaml_body(body)
} else {
parse_json_body(body)
}
}
/// Parses a YAML request body into a [`ScenarioEntry`].
///
/// Three shapes are tried in order:
/// 1. a full [`ScenarioEntry`],
/// 2. a bare [`ScenarioConfig`], wrapped as `ScenarioEntry::Metrics`,
/// 3. a bare [`LogScenarioConfig`], wrapped as `ScenarioEntry::Logs`.
///
/// # Errors
///
/// Returns a description when the body is not valid UTF-8, or when none of
/// the three shapes matches. In the latter case the message reports the
/// error from the first (`ScenarioEntry`) attempt.
fn parse_yaml_body(body: &[u8]) -> Result<ScenarioEntry, String> {
    let text =
        std::str::from_utf8(body).map_err(|e| format!("request body is not valid UTF-8: {e}"))?;

    // Keep the error from the primary attempt so it can be reported later
    // without parsing the same text a second time.
    let entry_err = match serde_yaml_ng::from_str::<ScenarioEntry>(text) {
        Ok(entry) => return Ok(entry),
        Err(e) => e,
    };
    if let Ok(config) = serde_yaml_ng::from_str::<ScenarioConfig>(text) {
        return Ok(ScenarioEntry::Metrics(config));
    }
    if let Ok(config) = serde_yaml_ng::from_str::<LogScenarioConfig>(text) {
        return Ok(ScenarioEntry::Logs(config));
    }
    Err(format!("invalid YAML scenario body: {entry_err}"))
}
fn parse_json_body(body: &[u8]) -> Result<ScenarioEntry, String> {
if let Ok(entry) = serde_json::from_slice::<ScenarioEntry>(body) {
return Ok(entry);
}
if let Ok(config) = serde_json::from_slice::<ScenarioConfig>(body) {
return Ok(ScenarioEntry::Metrics(config));
}
let json_err = serde_json::from_slice::<serde_json::Value>(body)
.map(|_| "JSON parsed but did not match any scenario schema".to_string())
.unwrap_or_else(|e| format!("invalid JSON: {e}"));
Err(format!("invalid JSON scenario body: {json_err}"))
}
/// Handler for `POST /scenarios`: parses, validates and launches a scenario.
///
/// The body is parsed with `parse_body` (YAML or JSON, chosen from the
/// `Content-Type` header), validated with `validate_entry`, given a fresh
/// UUID v4 identifier and launched with `launch_scenario`. The resulting
/// handle is stored in the shared scenario map under that identifier.
///
/// # Responses
///
/// * `201 Created` with a [`CreatedScenario`] body on success.
/// * `400 Bad Request` when the body cannot be parsed.
/// * `422 Unprocessable Entity` when validation fails.
/// * `500 Internal Server Error` when the launch fails or the scenario map
///   lock is poisoned.
pub async fn post_scenario(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: axum::body::Bytes,
) -> Result<Response, Response> {
    let entry = parse_body(&body, &headers).map_err(|msg| {
        warn!(error = %msg, "POST /scenarios: invalid request body");
        bad_request(msg)
    })?;
    validate_entry(&entry).map_err(|e| {
        warn!(error = %e, "POST /scenarios: validation failed");
        unprocessable(e)
    })?;

    let id = Uuid::new_v4().to_string();
    let name = match &entry {
        ScenarioEntry::Metrics(c) => c.name.clone(),
        ScenarioEntry::Logs(c) => c.name.clone(),
    };

    // NOTE(review): despite its name the flag starts as `true`; the test
    // helper `make_handle` in this file treats `true` as "keep running".
    // Confirm against `launch_scenario`.
    let shutdown = Arc::new(AtomicBool::new(true));
    let handle = launch_scenario(id.clone(), entry, shutdown, None).map_err(|e| {
        warn!(error = %e, "POST /scenarios: failed to launch scenario");
        internal_error(e)
    })?;
    info!(id = %id, name = %name, "scenario launched");

    match state.scenarios.write() {
        Ok(mut scenarios) => {
            scenarios.insert(id.clone(), handle);
        }
        Err(e) => {
            warn!(error = %e, "POST /scenarios: scenarios lock is poisoned");
            // The scenario is already running but can never be registered.
            // Signal it to stop so it does not keep emitting with no way to
            // reach it through the API.
            handle.stop();
            return Err(internal_error("internal state lock is poisoned"));
        }
    }

    let response_body = CreatedScenario {
        id,
        name,
        status: "running",
    };
    Ok((StatusCode::CREATED, Json(response_body)).into_response())
}
/// Handler for `GET /scenarios`: returns a summary of every scenario held in
/// the shared map.
///
/// Responds with a [`ListScenariosResponse`], or with
/// `500 Internal Server Error` when the scenario map lock is poisoned.
pub async fn list_scenarios(State(state): State<AppState>) -> Result<impl IntoResponse, Response> {
    let guard = match state.scenarios.read() {
        Ok(guard) => guard,
        Err(e) => return Err(internal_error(format!("scenarios lock is poisoned: {e}"))),
    };

    let mut summaries: Vec<ScenarioSummary> = Vec::new();
    for (id, handle) in guard.iter() {
        summaries.push(ScenarioSummary {
            id: id.clone(),
            name: handle.name.clone(),
            status: status_string(handle.is_running()),
            elapsed_secs: handle.elapsed().as_secs_f64(),
        });
    }

    let payload = ListScenariosResponse {
        scenarios: summaries,
    };
    Ok(Json(payload))
}
pub async fn get_scenario(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<impl IntoResponse, Response> {
let scenarios = state
.scenarios
.read()
.map_err(|e| internal_error(format!("scenarios lock is poisoned: {e}")))?;
let handle = scenarios
.get(&id)
.ok_or_else(|| not_found(format!("scenario not found: {id}")))?;
let detail = ScenarioDetail {
id: id.clone(),
name: handle.name.clone(),
status: status_string(handle.is_running()),
elapsed_secs: handle.elapsed().as_secs_f64(),
stats: handle.stats_snapshot().into(),
};
Ok(Json(detail))
}
/// Handler for `DELETE /scenarios/{id}`: stops a scenario and removes it from
/// the shared map.
///
/// The handle is taken out of the map under a short write lock, and the lock
/// is released before the scenario is stopped and joined. The join waits up
/// to five seconds; holding the map lock across it would stall every other
/// request that needs the map.
///
/// # Responses
///
/// * `200 OK` with a [`DeletedScenario`] body. `status` is `"stopped"`, or
///   `"force_stopped"` when the handle still reports running after the join.
/// * `404 Not Found` when the id is unknown.
/// * `500 Internal Server Error` when the scenario map lock is poisoned.
pub async fn delete_scenario(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, Response> {
    let mut scenarios = state
        .scenarios
        .write()
        .map_err(|e| internal_error(format!("scenarios lock is poisoned: {e}")))?;
    let removed = scenarios.remove(&id);
    // Release the lock before the potentially slow stop/join below.
    drop(scenarios);
    let mut handle = removed.ok_or_else(|| not_found(format!("scenario not found: {id}")))?;

    handle.stop();
    if let Err(e) = handle.join(Some(Duration::from_secs(5))) {
        warn!(id = %id, error = %e, "DELETE /scenarios/{id}: scenario thread returned an error");
    }

    let status = if handle.is_running() {
        warn!(id = %id, "DELETE /scenarios/{id}: join timed out after 5s, scenario force-stopped");
        "force_stopped".to_string()
    } else {
        "stopped".to_string()
    };

    let final_stats = handle.stats_snapshot();
    info!(id = %id, status = %status, total_events = final_stats.total_events, "scenario deleted");
    Ok(Json(DeletedScenario {
        id,
        status,
        total_events: final_stats.total_events,
    }))
}
/// Handler for `GET /scenarios/{id}/stats`: returns the detailed statistics
/// of one scenario.
///
/// Responds with a [`DetailedStatsResponse`], with `404 Not Found` when the
/// id is unknown, or with `500 Internal Server Error` when the scenario map
/// lock is poisoned.
pub async fn get_scenario_stats(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, Response> {
    let guard = match state.scenarios.read() {
        Ok(guard) => guard,
        Err(e) => return Err(internal_error(format!("scenarios lock is poisoned: {e}"))),
    };
    let handle = match guard.get(&id) {
        Some(found) => found,
        None => return Err(not_found(format!("scenario not found: {id}"))),
    };

    let snapshot = handle.stats_snapshot();
    let uptime_secs = handle.elapsed().as_secs_f64();
    let state_label = status_string(handle.is_running());

    Ok(Json(DetailedStatsResponse {
        total_events: snapshot.total_events,
        current_rate: snapshot.current_rate,
        target_rate: handle.target_rate,
        bytes_emitted: snapshot.bytes_emitted,
        errors: snapshot.errors,
        uptime_secs,
        state: state_label,
        in_gap: snapshot.in_gap,
        in_burst: snapshot.in_burst,
    }))
}
/// Query-string parameters accepted by `get_scenario_metrics`.
#[derive(Debug, Deserialize)]
pub struct MetricsQuery {
/// Maximum number of most recent events to return; `get_scenario_metrics`
/// uses 100 when absent and caps the value at 1000.
pub limit: Option<usize>,
}
/// `Content-Type` header value sent by `get_scenario_metrics` with its
/// Prometheus-text-encoded response body.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
/// Handler for `GET /scenarios/{id}/metrics`: returns the scenario's most
/// recent metric events encoded as Prometheus text.
///
/// The optional `limit` query parameter selects how many of the newest events
/// are returned; it defaults to 100 and is capped at 1000. Events that fail
/// to encode are logged and skipped.
///
/// # Responses
///
/// * `200 OK` with the encoded events and the Prometheus text content type.
/// * `204 No Content` when the scenario has no recent events.
/// * `404 Not Found` when the id is unknown.
/// * `500 Internal Server Error` when the scenario map lock is poisoned.
pub async fn get_scenario_metrics(
    State(state): State<AppState>,
    Path(id): Path<String>,
    Query(query): Query<MetricsQuery>,
) -> Result<Response, Response> {
    let limit = match query.limit {
        Some(requested) => requested.min(1000),
        None => 100,
    };

    let guard = state
        .scenarios
        .read()
        .map_err(|e| internal_error(format!("scenarios lock is poisoned: {e}")))?;
    let handle = match guard.get(&id) {
        Some(found) => found,
        None => return Err(not_found(format!("scenario not found: {id}"))),
    };

    let events = handle.recent_metrics();
    if events.is_empty() {
        return Ok(StatusCode::NO_CONTENT.into_response());
    }

    // Keep only the newest `limit` events; a start of 0 keeps them all.
    let start = events.len().saturating_sub(limit);
    let selected = &events[start..];

    let encoder = PrometheusText::new(None);
    let mut buf = Vec::with_capacity(selected.len() * 128);
    for event in selected {
        if let Err(e) = encoder.encode_metric(event, &mut buf) {
            warn!(id = %id, error = %e, "GET /scenarios/{id}/metrics: failed to encode metric event");
        }
    }

    let headers = [(axum::http::header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)];
    Ok((StatusCode::OK, headers, buf).into_response())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::routes::router;
use crate::state::AppState;
use axum::body::Body;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use sonda_core::ScenarioHandle;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use tower::ServiceExt;
/// Builds a `ScenarioHandle` backed by a real worker thread, without going
/// through `launch_scenario`.
///
/// The thread loops up to `event_count` times: it exits early when the shared
/// flag reads `false`, otherwise it sleeps for `interval` and then adds 1 to
/// `total_events` and 64 to `bytes_emitted`. The handle's `target_rate` is
/// fixed at 100.0.
fn make_handle(id: &str, name: &str, event_count: u64, interval: Duration) -> ScenarioHandle {
// The flag starts as `true`; the loop below treats `true` as "keep running".
let shutdown = Arc::new(AtomicBool::new(true));
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let shutdown_clone = Arc::clone(&shutdown);
let stats_clone = Arc::clone(&stats);
let thread = thread::Builder::new()
.name(format!("test-{name}"))
.spawn(move || -> Result<(), sonda_core::SondaError> {
for _ in 0..event_count {
if !shutdown_clone.load(Ordering::SeqCst) {
break;
}
thread::sleep(interval);
// A poisoned stats lock is ignored; the iteration simply records nothing.
if let Ok(mut st) = stats_clone.write() {
st.total_events += 1;
st.bytes_emitted += 64;
}
}
Ok(())
})
.expect("thread must spawn");
ScenarioHandle {
id: id.to_string(),
name: name.to_string(),
shutdown,
thread: Some(thread),
started_at: Instant::now(),
stats,
target_rate: 100.0,
}
}
/// Builds a `ScenarioHandle` whose worker thread returns immediately and
/// whose shared flag is `false` from the start.
fn make_stopped_handle(id: &str, name: &str) -> ScenarioHandle {
let shutdown = Arc::new(AtomicBool::new(false));
let stats = Arc::new(RwLock::new(ScenarioStats::default()));
let shutdown_clone = Arc::clone(&shutdown);
let thread = thread::Builder::new()
.name(format!("test-stopped-{name}"))
.spawn(move || -> Result<(), sonda_core::SondaError> {
let _ = shutdown_clone.load(Ordering::SeqCst);
Ok(())
})
.expect("thread must spawn");
// Presumably gives the thread time to finish before the handle is used.
thread::sleep(Duration::from_millis(50));
ScenarioHandle {
id: id.to_string(),
name: name.to_string(),
shutdown,
thread: Some(thread),
started_at: Instant::now(),
stats,
target_rate: 100.0,
}
}
/// Creates a router over a fresh `AppState` pre-populated with `handles`,
/// each stored under its own `id`.
fn router_with_handles(handles: Vec<ScenarioHandle>) -> axum::Router {
    let state = AppState::new();
    // The write guard is a temporary, so the lock is released at the end of
    // this statement, before the state is handed to the router.
    state
        .scenarios
        .write()
        .unwrap()
        .extend(handles.into_iter().map(|handle| (handle.id.clone(), handle)));
    router(state)
}
/// Creates a router over a fresh `AppState` and also returns the state so a
/// test can inspect or clean up the scenarios it created.
fn test_router() -> (axum::Router, AppState) {
    let state = AppState::new();
    (router(state.clone()), state)
}
/// Reads a response body to completion and parses it as JSON.
///
/// Panics when the body cannot be collected or is not valid JSON.
async fn body_json(response: axum::response::Response) -> serde_json::Value {
    let collected = response.into_body().collect().await.unwrap();
    let raw = collected.to_bytes();
    serde_json::from_slice(&raw).expect("body must be valid JSON")
}
/// Stops every scenario in `state`, then joins each one with a two-second
/// timeout. Poisoned locks and join errors are ignored.
fn cleanup_scenarios(state: &AppState) {
    // First pass: signal every scenario to stop under a read lock.
    if let Ok(running) = state.scenarios.read() {
        running.values().for_each(|handle| {
            handle.stop();
        });
    }
    // Second pass: joining needs mutable access to each handle.
    if let Ok(mut stopping) = state.scenarios.write() {
        stopping.values_mut().for_each(|handle| {
            let _ = handle.join(Some(Duration::from_secs(2)));
        });
    }
}
/// Sends `POST /scenarios` to `app` with the given `Content-Type` header and
/// body, and returns the response.
async fn post_scenarios(
    app: axum::Router,
    content_type: &str,
    body: &str,
) -> hyper::Response<axum::body::Body> {
    let payload = Body::from(body.to_string());
    let builder = Request::builder()
        .method("POST")
        .uri("/scenarios")
        .header("content-type", content_type);
    let request = builder.body(payload).unwrap();
    app.oneshot(request).await.unwrap()
}
/// Metrics scenario fixture without a `signal_type` tag, named `test_metric`.
const VALID_METRICS_YAML: &str = "\
name: test_metric
rate: 10
duration: 200ms
generator:
type: constant
value: 42.0
encoder:
type: prometheus_text
sink:
type: stdout
";
/// Logs scenario fixture without a `signal_type` tag, named `test_logs`.
const VALID_LOGS_YAML: &str = "\
name: test_logs
rate: 10
duration: 200ms
generator:
type: template
templates:
- message: \"test log event\"
field_pools: {}
seed: 0
encoder:
type: json_lines
sink:
type: stdout
";
/// Metrics scenario fixture carrying an explicit `signal_type: metrics` tag,
/// named `tagged_metric`.
const VALID_TAGGED_METRICS_YAML: &str = "\
signal_type: metrics
name: tagged_metric
rate: 10
duration: 200ms
generator:
type: constant
value: 1.0
encoder:
type: prometheus_text
sink:
type: stdout
";
/// Metrics scenario fixture with `rate: 0`; the tests expect it to parse but
/// fail validation.
const ZERO_RATE_YAML: &str = "\
name: bad_rate
rate: 0
duration: 1s
generator:
type: constant
value: 1.0
encoder:
type: prometheus_text
sink:
type: stdout
";
/// `GET /scenarios` on an empty state returns 200 with an empty `scenarios` array.
#[tokio::test]
async fn list_scenarios_empty_returns_empty_array() {
let app = router_with_handles(vec![]);
let req = Request::builder()
.uri("/scenarios")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
let scenarios = body["scenarios"]
.as_array()
.expect("scenarios must be an array");
assert!(
scenarios.is_empty(),
"empty state must return empty scenarios array"
);
}
/// `GET /scenarios` lists both registered scenarios with their ids and names.
#[tokio::test]
async fn list_scenarios_returns_both_when_two_present() {
let h1 = make_handle("id-aaa", "scenario_alpha", 1000, Duration::from_millis(50));
let h2 = make_handle("id-bbb", "scenario_beta", 1000, Duration::from_millis(50));
let app = router_with_handles(vec![h1, h2]);
let req = Request::builder()
.uri("/scenarios")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
let scenarios = body["scenarios"]
.as_array()
.expect("scenarios must be an array");
assert_eq!(
scenarios.len(),
2,
"must list exactly 2 scenarios, got {}",
scenarios.len()
);
// Sort before comparing: the response order is not asserted.
let mut ids: Vec<&str> = scenarios
.iter()
.map(|s| s["id"].as_str().unwrap())
.collect();
ids.sort();
assert_eq!(ids, vec!["id-aaa", "id-bbb"]);
let mut names: Vec<&str> = scenarios
.iter()
.map(|s| s["name"].as_str().unwrap())
.collect();
names.sort();
assert_eq!(names, vec!["scenario_alpha", "scenario_beta"]);
}
/// Each list entry exposes `id`, `name`, `status` as strings and `elapsed_secs` as a float.
#[tokio::test]
async fn list_scenarios_response_shape_has_required_fields() {
let h = make_handle("id-shape", "shape_test", 1000, Duration::from_millis(50));
let app = router_with_handles(vec![h]);
let req = Request::builder()
.uri("/scenarios")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = body_json(resp).await;
let entry = &body["scenarios"][0];
assert!(entry["id"].is_string(), "id must be a string");
assert!(entry["name"].is_string(), "name must be a string");
assert!(entry["status"].is_string(), "status must be a string");
assert!(
entry["elapsed_secs"].is_f64(),
"elapsed_secs must be a number"
);
}
/// `GET /scenarios/{id}` for a live scenario reports its id, name, `running`
/// status and a positive elapsed time.
#[tokio::test]
async fn get_scenario_returns_correct_name_status_elapsed() {
let h = make_handle(
"id-detail",
"detail_scenario",
1000,
Duration::from_millis(50),
);
let app = router_with_handles(vec![h]);
// Let some wall-clock time pass so elapsed_secs is strictly positive.
thread::sleep(Duration::from_millis(20));
let req = Request::builder()
.uri("/scenarios/id-detail")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["id"].as_str().unwrap(), "id-detail");
assert_eq!(body["name"].as_str().unwrap(), "detail_scenario");
assert_eq!(
body["status"].as_str().unwrap(),
"running",
"a live scenario must have status 'running'"
);
let elapsed = body["elapsed_secs"].as_f64().unwrap();
assert!(
elapsed > 0.0,
"elapsed_secs must be positive, got {elapsed}"
);
}
/// The detail response embeds a `stats` object with all four counters present.
#[tokio::test]
async fn get_scenario_response_has_stats_fields() {
let h = make_handle(
"id-stats-fields",
"stats_check",
1000,
Duration::from_millis(50),
);
let app = router_with_handles(vec![h]);
let req = Request::builder()
.uri("/scenarios/id-stats-fields")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = body_json(resp).await;
let stats = &body["stats"];
assert!(stats.is_object(), "response must include a stats object");
assert!(
stats.get("total_events").is_some(),
"stats must have total_events"
);
assert!(
stats.get("current_rate").is_some(),
"stats must have current_rate"
);
assert!(
stats.get("bytes_emitted").is_some(),
"stats must have bytes_emitted"
);
assert!(stats.get("errors").is_some(), "stats must have errors");
}
/// After the worker has run for a while, `stats.total_events` in the detail
/// response is greater than zero.
#[tokio::test]
async fn get_scenario_stats_total_events_positive_after_running() {
let h = make_handle("id-events", "events_check", 500, Duration::from_millis(10));
let state = AppState::new();
{
let mut map = state.scenarios.write().unwrap();
map.insert(h.id.clone(), h);
}
// Wait long enough for the 10 ms worker loop to record several events.
thread::sleep(Duration::from_millis(200));
let app = router(state);
let req = Request::builder()
.uri("/scenarios/id-events")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = body_json(resp).await;
let total_events = body["stats"]["total_events"].as_u64().unwrap();
assert!(
total_events > 0,
"stats.total_events must be > 0 after running, got {total_events}"
);
}
/// An unknown id yields 404 with `error: "not_found"` and a detail naming the id.
#[tokio::test]
async fn get_scenario_nonexistent_returns_404_with_json_body() {
let app = router_with_handles(vec![]);
let req = Request::builder()
.uri("/scenarios/nonexistent-id")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(
resp.status(),
StatusCode::NOT_FOUND,
"nonexistent scenario ID must return 404"
);
let body = body_json(resp).await;
assert_eq!(
body["error"].as_str().unwrap(),
"not_found",
"404 response must have error field set to 'not_found'"
);
assert_eq!(
body["detail"].as_str().unwrap(),
"scenario not found: nonexistent-id",
"404 response detail must include the requested scenario ID"
);
}
/// The 404 response for an unknown id is served as `application/json`.
#[tokio::test]
async fn get_scenario_nonexistent_returns_json_content_type() {
let app = router_with_handles(vec![]);
let req = Request::builder()
.uri("/scenarios/some-missing-id")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let ct = resp
.headers()
.get("content-type")
.expect("404 response must have Content-Type header")
.to_str()
.unwrap();
assert!(
ct.contains("application/json"),
"404 Content-Type must be application/json, got: {ct}"
);
}
/// A scenario built with `make_stopped_handle` is reported with status `stopped`.
#[tokio::test]
async fn get_scenario_stopped_reports_stopped_status() {
let h = make_stopped_handle("id-stopped", "stopped_scenario");
let app = router_with_handles(vec![h]);
let req = Request::builder()
.uri("/scenarios/id-stopped")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(
body["status"].as_str().unwrap(),
"stopped",
"a finished scenario must have status 'stopped'"
);
}
/// The reported `elapsed_secs` stays within one second of wall-clock time
/// measured from just after the handle was created.
#[tokio::test]
async fn elapsed_secs_tracks_real_time_within_one_second() {
let h = make_handle(
"id-elapsed",
"elapsed_test",
10000,
Duration::from_millis(50),
);
let created_at = Instant::now();
let state = AppState::new();
{
let mut map = state.scenarios.write().unwrap();
map.insert(h.id.clone(), h);
}
thread::sleep(Duration::from_millis(500));
let app = router(state);
let req = Request::builder()
.uri("/scenarios/id-elapsed")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = body_json(resp).await;
let reported_elapsed = body["elapsed_secs"].as_f64().unwrap();
let actual_elapsed = created_at.elapsed().as_secs_f64();
let diff = (reported_elapsed - actual_elapsed).abs();
assert!(
diff < 1.0,
"elapsed_secs must be within 1 second of real time: reported={reported_elapsed:.3}, actual={actual_elapsed:.3}, diff={diff:.3}"
);
}
/// `GET /scenarios` is served as `application/json`.
#[tokio::test]
async fn list_scenarios_sets_json_content_type() {
let app = router_with_handles(vec![]);
let req = Request::builder()
.uri("/scenarios")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
let ct = resp
.headers()
.get("content-type")
.expect("response must have Content-Type")
.to_str()
.unwrap();
assert!(
ct.contains("application/json"),
"Content-Type must be application/json, got: {ct}"
);
}
/// The `From<ScenarioStats>` conversion copies all four counters into `StatsResponse`.
#[test]
fn stats_response_from_scenario_stats_converts_all_fields() {
let stats = ScenarioStats {
total_events: 42,
bytes_emitted: 1024,
current_rate: 10.5,
errors: 3,
in_gap: true,
in_burst: false,
..Default::default()
};
let resp: StatsResponse = stats.into();
assert_eq!(resp.total_events, 42);
assert_eq!(resp.bytes_emitted, 1024);
// Compare the float after scaling and rounding rather than directly.
assert_eq!((resp.current_rate * 10.0).round(), 105.0);
assert_eq!(resp.errors, 3);
}
/// `status_string(true)` yields `"running"`.
#[test]
fn status_string_true_returns_running() {
assert_eq!(status_string(true), "running");
}
/// `status_string(false)` yields `"stopped"`.
#[test]
fn status_string_false_returns_stopped() {
assert_eq!(status_string(false), "stopped");
}
/// `ScenarioSummary` serialises each field under its own name.
#[test]
fn scenario_summary_serializes_correctly() {
let s = ScenarioSummary {
id: "abc".to_string(),
name: "test".to_string(),
status: "running".to_string(),
elapsed_secs: 1.5,
};
let json = serde_json::to_value(&s).unwrap();
assert_eq!(json["id"], "abc");
assert_eq!(json["name"], "test");
assert_eq!(json["status"], "running");
assert_eq!(json["elapsed_secs"], 1.5);
}
/// `ScenarioDetail` serialises its counters under a nested `stats` object.
#[test]
fn scenario_detail_serializes_with_nested_stats() {
let d = ScenarioDetail {
id: "xyz".to_string(),
name: "detail".to_string(),
status: "stopped".to_string(),
elapsed_secs: 42.0,
stats: StatsResponse {
total_events: 100,
current_rate: 5.0,
bytes_emitted: 2048,
errors: 1,
},
};
let json = serde_json::to_value(&d).unwrap();
assert_eq!(json["id"], "xyz");
assert_eq!(json["stats"]["total_events"], 100);
assert_eq!(json["stats"]["errors"], 1);
}
/// Posting valid metrics YAML returns 201 with id, name and `running` status,
/// and registers the handle in the shared state.
#[tokio::test]
async fn post_valid_metrics_yaml_returns_201_with_id() {
let (app, state) = test_router();
let response = post_scenarios(app, "application/x-yaml", VALID_METRICS_YAML).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"POST valid metrics YAML must return 201 Created"
);
let body = body_json(response).await;
assert!(
body["id"].is_string() && !body["id"].as_str().unwrap().is_empty(),
"response must contain a non-empty 'id' string, got: {body}"
);
assert_eq!(
body["name"], "test_metric",
"response name must match the scenario name"
);
assert_eq!(
body["status"], "running",
"status must be 'running' for a freshly launched scenario"
);
// Scope the read guard so it is released before cleanup takes the locks.
{
let scenarios = state.scenarios.read().expect("lock must not be poisoned");
let id = body["id"].as_str().unwrap();
assert!(
scenarios.contains_key(id),
"AppState must contain the handle for the newly created scenario ID"
);
}
cleanup_scenarios(&state);
}
/// Posting valid logs YAML as `text/yaml` returns 201 with the logs scenario name.
#[tokio::test]
async fn post_valid_logs_yaml_returns_201() {
let (app, state) = test_router();
let response = post_scenarios(app, "text/yaml", VALID_LOGS_YAML).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"POST valid logs YAML must return 201 Created"
);
let body = body_json(response).await;
assert!(
body["id"].is_string() && !body["id"].as_str().unwrap().is_empty(),
"response must contain a non-empty 'id' for logs scenario"
);
assert_eq!(
body["name"], "test_logs",
"response name must match the logs scenario name"
);
assert_eq!(body["status"], "running");
cleanup_scenarios(&state);
}
/// Posting YAML that carries an explicit `signal_type: metrics` tag returns 201.
#[tokio::test]
async fn post_tagged_metrics_yaml_returns_201() {
let (app, state) = test_router();
let response = post_scenarios(app, "application/x-yaml", VALID_TAGGED_METRICS_YAML).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"POST YAML with signal_type: metrics must return 201 Created"
);
let body = body_json(response).await;
assert_eq!(
body["name"], "tagged_metric",
"name must match the tagged scenario name"
);
assert_eq!(body["status"], "running");
cleanup_scenarios(&state);
}
/// Malformed YAML yields 400 with `error: "bad_request"` and a non-empty detail.
#[tokio::test]
async fn post_invalid_yaml_returns_400() {
let (app, _state) = test_router();
let response =
post_scenarios(app, "application/x-yaml", "this is not valid yaml: [}{").await;
assert_eq!(
response.status(),
StatusCode::BAD_REQUEST,
"POST invalid YAML must return 400 Bad Request"
);
let body = body_json(response).await;
assert_eq!(
body["error"], "bad_request",
"error field must be 'bad_request'"
);
assert!(
body["detail"].is_string() && !body["detail"].as_str().unwrap().is_empty(),
"detail field must contain a non-empty error description"
);
}
/// An empty request body yields 400.
#[tokio::test]
async fn post_empty_body_returns_400() {
let (app, _state) = test_router();
let response = post_scenarios(app, "application/x-yaml", "").await;
assert_eq!(
response.status(),
StatusCode::BAD_REQUEST,
"POST empty body must return 400 Bad Request"
);
}
/// Well-formed YAML that matches no scenario shape yields 400.
#[tokio::test]
async fn post_yaml_missing_required_fields_returns_400() {
let (app, _state) = test_router();
let response = post_scenarios(app, "text/yaml", "foo: bar\nbaz: 123\n").await;
assert_eq!(
response.status(),
StatusCode::BAD_REQUEST,
"POST YAML missing required fields must return 400"
);
}
/// A scenario with `rate: 0` parses but fails validation, yielding 422 with
/// `error: "unprocessable_entity"` and a non-empty detail.
#[tokio::test]
async fn post_yaml_with_zero_rate_returns_422() {
let (app, _state) = test_router();
let response = post_scenarios(app, "application/x-yaml", ZERO_RATE_YAML).await;
assert_eq!(
response.status(),
StatusCode::UNPROCESSABLE_ENTITY,
"POST YAML with rate=0 must return 422 Unprocessable Entity"
);
let body = body_json(response).await;
assert_eq!(
body["error"], "unprocessable_entity",
"error field must be 'unprocessable_entity'"
);
assert!(
body["detail"].is_string() && !body["detail"].as_str().unwrap().is_empty(),
"detail must contain a description of the validation failure"
);
}
/// Immediately after a successful POST the stored handle reports `is_running()`.
#[tokio::test]
async fn post_scenario_thread_is_running() {
let (app, state) = test_router();
let response = post_scenarios(app, "text/yaml", VALID_METRICS_YAML).await;
assert_eq!(response.status(), StatusCode::CREATED);
let body = body_json(response).await;
let id = body["id"].as_str().unwrap().to_string();
let scenarios = state.scenarios.read().expect("lock must not be poisoned");
let handle = scenarios
.get(&id)
.expect("handle must exist in AppState after POST");
assert!(
handle.is_running(),
"scenario thread must be running after POST (is_running() must return true)"
);
// Release the read guard before cleanup acquires the write lock.
drop(scenarios);
cleanup_scenarios(&state);
}
/// The `application/x-yaml` content type is accepted for a valid YAML scenario.
#[tokio::test]
async fn post_with_application_x_yaml_content_type_returns_201() {
let (app, state) = test_router();
let response = post_scenarios(app, "application/x-yaml", VALID_METRICS_YAML).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"application/x-yaml content type must be accepted"
);
cleanup_scenarios(&state);
}
/// The `text/yaml` content type is accepted for a valid YAML scenario.
#[tokio::test]
async fn post_with_text_yaml_content_type_returns_201() {
let (app, state) = test_router();
let response = post_scenarios(app, "text/yaml", VALID_METRICS_YAML).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"text/yaml content type must be accepted"
);
cleanup_scenarios(&state);
}
/// A tagged metrics scenario sent as `application/json` is accepted with 201.
#[tokio::test]
async fn post_with_json_content_type_returns_201() {
let json_body = serde_json::json!({
"signal_type": "metrics",
"name": "json_metric",
"rate": 10,
"duration": "200ms",
"generator": { "type": "constant", "value": 1.0 },
"encoder": { "type": "prometheus_text" },
"sink": { "type": "stdout" }
});
let (app, state) = test_router();
let response = post_scenarios(app, "application/json", &json_body.to_string()).await;
assert_eq!(
response.status(),
StatusCode::CREATED,
"application/json content type must be accepted for valid JSON scenario"
);
let body = body_json(response).await;
assert_eq!(body["name"], "json_metric");
assert_eq!(body["status"], "running");
cleanup_scenarios(&state);
}
/// A syntactically invalid JSON body yields 400.
#[tokio::test]
async fn post_invalid_json_returns_400() {
let (app, _state) = test_router();
let response = post_scenarios(app, "application/json", "not json {{{").await;
assert_eq!(
response.status(),
StatusCode::BAD_REQUEST,
"invalid JSON body must return 400"
);
}
/// Without a `Content-Type` header the body is treated as YAML and a valid
/// scenario is accepted.
#[tokio::test]
async fn post_with_no_content_type_defaults_to_yaml() {
let (app, state) = test_router();
// Built by hand because `post_scenarios` always sets a content-type header.
let request = Request::builder()
.method("POST")
.uri("/scenarios")
.body(Body::from(VALID_METRICS_YAML.to_string()))
.unwrap();
let response = app.oneshot(request).await.unwrap();
assert_eq!(
response.status(),
StatusCode::CREATED,
"POST with no Content-Type header must default to YAML and succeed for valid YAML"
);
cleanup_scenarios(&state);
}
/// The 201 response body contains exactly the keys `id`, `name` and `status`.
#[tokio::test]
async fn post_response_body_has_expected_keys() {
let (app, state) = test_router();
let response = post_scenarios(app, "text/yaml", VALID_METRICS_YAML).await;
assert_eq!(response.status(), StatusCode::CREATED);
let body = body_json(response).await;
let obj = body
.as_object()
.expect("response body must be a JSON object");
assert!(obj.contains_key("id"), "response must contain key 'id'");
assert!(obj.contains_key("name"), "response must contain key 'name'");
assert!(
obj.contains_key("status"),
"response must contain key 'status'"
);
assert_eq!(
obj.len(),
3,
"response must contain exactly 3 keys (id, name, status)"
);
cleanup_scenarios(&state);
}
/// The `id` returned by a successful POST parses as a UUID.
#[tokio::test]
async fn post_response_id_is_valid_uuid() {
let (app, state) = test_router();
let response = post_scenarios(app, "text/yaml", VALID_METRICS_YAML).await;
assert_eq!(response.status(), StatusCode::CREATED);
let body = body_json(response).await;
let id_str = body["id"].as_str().expect("id must be a string");
let parsed = uuid::Uuid::parse_str(id_str);
assert!(parsed.is_ok(), "id must be a valid UUID, got: {id_str}");
cleanup_scenarios(&state);
}
/// A scenario with a negative `rate` is expected to yield 422.
#[tokio::test]
async fn post_yaml_with_negative_rate_returns_422() {
let yaml = "\
name: neg_rate
rate: -5
duration: 1s
generator:
type: constant
value: 1.0
encoder:
type: prometheus_text
sink:
type: stdout
";
let (app, _state) = test_router();
let response = post_scenarios(app, "text/yaml", yaml).await;
assert_eq!(
response.status(),
StatusCode::UNPROCESSABLE_ENTITY,
"negative rate must return 422"
);
}
/// `parse_yaml_body` maps untagged metrics YAML to `ScenarioEntry::Metrics`.
#[test]
fn parse_yaml_body_accepts_bare_metrics_config() {
let result = parse_yaml_body(VALID_METRICS_YAML.as_bytes());
assert!(
result.is_ok(),
"parse_yaml_body must accept bare metrics YAML: {result:?}"
);
match result.unwrap() {
ScenarioEntry::Metrics(c) => assert_eq!(c.name, "test_metric"),
other => panic!("expected ScenarioEntry::Metrics, got: {other:?}"),
}
}
/// `parse_yaml_body` maps untagged logs YAML to `ScenarioEntry::Logs`.
#[test]
fn parse_yaml_body_accepts_bare_logs_config() {
let result = parse_yaml_body(VALID_LOGS_YAML.as_bytes());
assert!(
result.is_ok(),
"parse_yaml_body must accept bare logs YAML: {result:?}"
);
match result.unwrap() {
ScenarioEntry::Logs(c) => assert_eq!(c.name, "test_logs"),
other => panic!("expected ScenarioEntry::Logs, got: {other:?}"),
}
}
/// `parse_yaml_body` accepts YAML carrying an explicit `signal_type` tag.
#[test]
fn parse_yaml_body_accepts_tagged_entry() {
let result = parse_yaml_body(VALID_TAGGED_METRICS_YAML.as_bytes());
assert!(
result.is_ok(),
"parse_yaml_body must accept tagged ScenarioEntry YAML: {result:?}"
);
match result.unwrap() {
ScenarioEntry::Metrics(c) => assert_eq!(c.name, "tagged_metric"),
other => panic!("expected ScenarioEntry::Metrics, got: {other:?}"),
}
}
/// `parse_yaml_body` rejects malformed input with a message containing "invalid YAML".
#[test]
fn parse_yaml_body_rejects_garbage() {
let result = parse_yaml_body(b"not valid: [}{");
assert!(result.is_err(), "parse_yaml_body must reject garbage input");
let err = result.unwrap_err();
assert!(
err.contains("invalid YAML"),
"error message must mention invalid YAML, got: {err}"
);
}
/// `parse_json_body` accepts a JSON scenario carrying an explicit `signal_type` tag.
#[test]
fn parse_json_body_accepts_tagged_json() {
let json = serde_json::json!({
"signal_type": "metrics",
"name": "json_test",
"rate": 10,
"duration": "1s",
"generator": { "type": "constant", "value": 1.0 },
"encoder": { "type": "prometheus_text" },
"sink": { "type": "stdout" }
});
let result = parse_json_body(json.to_string().as_bytes());
assert!(
result.is_ok(),
"parse_json_body must accept tagged JSON: {result:?}"
);
}
/// `parse_json_body` rejects input that is not JSON.
#[test]
fn parse_json_body_rejects_invalid_json() {
let result = parse_json_body(b"not json");
assert!(result.is_err(), "parse_json_body must reject invalid JSON");
}
/// `Content-Type: application/x-yaml` must be classified as YAML.
#[test]
fn is_yaml_content_type_returns_true_for_application_x_yaml() {
    let mut headers = HeaderMap::new();
    let content_type = "application/x-yaml".parse().unwrap();
    headers.insert("content-type", content_type);

    assert!(is_yaml_content_type(&headers));
}
/// `Content-Type: text/yaml` must be classified as YAML.
#[test]
fn is_yaml_content_type_returns_true_for_text_yaml() {
    let mut headers = HeaderMap::new();
    let content_type = "text/yaml".parse().unwrap();
    headers.insert("content-type", content_type);

    assert!(is_yaml_content_type(&headers));
}
/// `Content-Type: application/json` must NOT be classified as YAML.
#[test]
fn is_yaml_content_type_returns_false_for_application_json() {
    let mut headers = HeaderMap::new();
    let content_type = "application/json".parse().unwrap();
    headers.insert("content-type", content_type);

    let treated_as_yaml = is_yaml_content_type(&headers);
    assert!(!treated_as_yaml);
}
/// With no `Content-Type` header at all, the body must be treated as YAML.
#[test]
fn is_yaml_content_type_defaults_to_true_when_missing() {
    let empty_headers = HeaderMap::new();
    let treated_as_yaml = is_yaml_content_type(&empty_headers);
    assert!(
        treated_as_yaml,
        "must default to YAML when no Content-Type header is present"
    );
}
/// `CreatedScenario` must serialize its three fields under the keys
/// `id`, `name` and `status`.
#[test]
fn created_scenario_serializes_to_expected_json() {
    let created = CreatedScenario {
        id: "abc-123".to_string(),
        name: "my_scenario".to_string(),
        status: "running",
    };

    let value = serde_json::to_value(&created).expect("must serialize");

    assert_eq!(value["id"], "abc-123");
    assert_eq!(value["name"], "my_scenario");
    assert_eq!(value["status"], "running");
}
/// Sends `DELETE /scenarios/{id}` through `app` (one-shot) and returns the
/// raw response.
///
/// Panics if the request cannot be built or the service call fails.
async fn delete_scenario_req(app: axum::Router, id: &str) -> hyper::Response<axum::body::Body> {
    let uri = format!("/scenarios/{id}");
    let builder = Request::builder().method("DELETE").uri(uri);
    let request = builder.body(Body::empty()).unwrap();
    app.oneshot(request).await.unwrap()
}
/// Registers a `make_handle` scenario and deletes it: the response must be
/// `200 OK` with `status == "stopped"`.
#[tokio::test]
async fn delete_running_scenario_returns_stopped_status() {
    let state = AppState::new();
    let handle = make_handle("id-del-run", "del_running", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-del-run").await;
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "DELETE a running scenario must return 200 OK"
    );

    let payload = body_json(response).await;
    assert_eq!(
        payload["status"].as_str().unwrap(),
        "stopped",
        "DELETE a running scenario must return status 'stopped'"
    );
}
/// DELETE must report a non-zero `total_events` for a scenario that was
/// registered 200 ms before being deleted.
#[tokio::test]
async fn delete_returns_final_stats_with_total_events() {
    let state = AppState::new();
    let handle = make_handle("id-del-stats", "del_stats", 1000, Duration::from_millis(10));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    // Wait before deleting; the assertion below requires a non-zero count.
    thread::sleep(Duration::from_millis(200));

    let response = delete_scenario_req(router(state.clone()), "id-del-stats").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    let total_events = payload["total_events"]
        .as_u64()
        .expect("total_events must be present and numeric");
    assert!(
        total_events > 0,
        "DELETE must return final stats with total_events > 0, got {total_events}"
    );
}
/// Deleting a handle built by `make_stopped_handle` must still answer
/// `200 OK` with `status == "stopped"`.
#[tokio::test]
async fn delete_already_stopped_returns_200_ok() {
    let state = AppState::new();
    let handle = make_stopped_handle("id-del-stopped", "del_stopped");
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-del-stopped").await;
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "DELETE on already-stopped scenario must return 200 OK"
    );

    let payload = body_json(response).await;
    assert_eq!(
        payload["status"].as_str().unwrap(),
        "stopped",
        "DELETE on already-stopped scenario must return status 'stopped'"
    );
}
/// DELETE for an ID that was never registered must return `404` with a JSON
/// body whose `error` is `not_found` and whose `detail` echoes the ID.
#[tokio::test]
async fn delete_unknown_scenario_returns_404() {
    let empty_app = router_with_handles(vec![]);
    let response = delete_scenario_req(empty_app, "nonexistent-id").await;
    assert_eq!(
        response.status(),
        StatusCode::NOT_FOUND,
        "DELETE on unknown scenario ID must return 404"
    );

    let payload = body_json(response).await;
    let error_kind = payload["error"].as_str().unwrap();
    assert_eq!(
        error_kind, "not_found",
        "404 response must have error field 'not_found'"
    );

    let detail = payload["detail"].as_str().unwrap();
    assert!(
        detail.contains("nonexistent-id"),
        "404 detail must include the requested ID"
    );
}
/// The DELETE response body must be a JSON object with exactly the keys
/// `id`, `status` and `total_events`.
#[tokio::test]
async fn delete_response_has_expected_json_shape() {
    let state = AppState::new();
    let handle = make_handle("id-del-shape", "del_shape", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-del-shape").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    let obj = payload
        .as_object()
        .expect("response body must be a JSON object");

    // Each required key is checked on its own so a failure names the culprit.
    assert!(obj.contains_key("id"), "response must contain key 'id'");
    assert!(
        obj.contains_key("status"),
        "response must contain key 'status'"
    );
    assert!(
        obj.contains_key("total_events"),
        "response must contain key 'total_events'"
    );

    // ...and no extra keys may be present.
    assert_eq!(
        obj.len(),
        3,
        "response must contain exactly 3 keys (id, status, total_events), got: {:?}",
        obj.keys().collect::<Vec<_>>()
    );
}
/// The `id` field of the DELETE response must echo the ID in the request path.
#[tokio::test]
async fn delete_response_id_matches_requested_id() {
    let state = AppState::new();
    let handle = make_handle("id-del-match", "del_match", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-del-match").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    let returned_id = payload["id"].as_str().unwrap();
    assert_eq!(
        returned_id, "id-del-match",
        "response id must match the requested scenario ID"
    );
}
/// The first DELETE of an ID succeeds with `stopped`; a second DELETE of the
/// same ID must return `404`.
#[tokio::test]
async fn delete_twice_on_same_id_returns_404_on_second() {
    let state = AppState::new();
    let handle = make_handle("id-del-twice", "del_twice", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    // First attempt: the scenario exists.
    let first = delete_scenario_req(router(state.clone()), "id-del-twice").await;
    assert_eq!(
        first.status(),
        StatusCode::OK,
        "first DELETE must return 200 OK"
    );
    let first_body = body_json(first).await;
    assert_eq!(first_body["status"].as_str().unwrap(), "stopped");

    // Second attempt against the same shared state.
    let second = delete_scenario_req(router(state.clone()), "id-del-twice").await;
    assert_eq!(
        second.status(),
        StatusCode::NOT_FOUND,
        "second DELETE on same ID must return 404 after handle removal"
    );
}
/// A successful DELETE must remove the handle from `state.scenarios`.
#[tokio::test]
async fn delete_removes_handle_from_hashmap() {
    let state = AppState::new();
    let handle = make_handle("id-del-map", "del_map", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let count_before = state.scenarios.read().unwrap().len();
    assert_eq!(
        count_before, 1,
        "precondition: map must have 1 entry before DELETE"
    );

    let response = delete_scenario_req(router(state.clone()), "id-del-map").await;
    assert_eq!(response.status(), StatusCode::OK);

    let remaining = state.scenarios.read().unwrap();
    assert_eq!(remaining.len(), 0, "map must be empty after DELETE");
    assert!(
        !remaining.contains_key("id-del-map"),
        "deleted scenario must not be present in the map"
    );
}
/// After deleting one of two registered scenarios, `GET /scenarios` must list
/// only the survivor.
#[tokio::test]
async fn delete_scenario_excluded_from_list() {
    let survivor = make_handle("id-keep", "keep_scenario", 1000, Duration::from_millis(50));
    let victim = make_handle(
        "id-delete",
        "delete_scenario",
        1000,
        Duration::from_millis(50),
    );

    let state = AppState::new();
    {
        let mut registry = state.scenarios.write().unwrap();
        registry.insert(survivor.id.clone(), survivor);
        registry.insert(victim.id.clone(), victim);
    }

    let delete_resp = delete_scenario_req(router(state.clone()), "id-delete").await;
    assert_eq!(
        delete_resp.status(),
        StatusCode::OK,
        "DELETE must return 200"
    );

    let list_req = Request::builder()
        .uri("/scenarios")
        .body(Body::empty())
        .unwrap();
    let list_resp = router(state.clone()).oneshot(list_req).await.unwrap();
    assert_eq!(list_resp.status(), StatusCode::OK);

    let payload = body_json(list_resp).await;
    let scenarios = payload["scenarios"]
        .as_array()
        .expect("response must have a scenarios array");
    assert_eq!(
        scenarios.len(),
        1,
        "only one scenario should remain after DELETE"
    );
    assert_eq!(
        scenarios[0]["id"].as_str().unwrap(),
        "id-keep",
        "the remaining scenario must be 'id-keep'"
    );

    // The surviving handle is still registered; release it via the shared helper.
    cleanup_scenarios(&state);
}
/// `DeletedScenario` must serialize under the keys `id`, `status` and
/// `total_events`.
#[test]
fn deleted_scenario_serializes_to_expected_json() {
    let deleted = DeletedScenario {
        id: "del-123".to_string(),
        status: "stopped".to_string(),
        total_events: 42,
    };

    let value = serde_json::to_value(&deleted).expect("must serialize");

    assert_eq!(value["id"], "del-123");
    assert_eq!(value["status"], "stopped");
    assert_eq!(value["total_events"], 42);
}
/// The `force_stopped` status string must survive serialization unchanged.
#[test]
fn deleted_scenario_force_stopped_serializes_correctly() {
    let deleted = DeletedScenario {
        id: "force-123".to_string(),
        status: "force_stopped".to_string(),
        total_events: 100,
    };

    let value = serde_json::to_value(&deleted).expect("must serialize");

    assert_eq!(value["status"], "force_stopped");
    assert_eq!(value["total_events"], 100);
}
/// A successful DELETE must carry an `application/json` Content-Type header.
#[tokio::test]
async fn delete_scenario_returns_json_content_type() {
    let state = AppState::new();
    let handle = make_handle("id-del-ct", "del_ct", 1000, Duration::from_millis(50));
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-del-ct").await;
    assert_eq!(response.status(), StatusCode::OK);

    let header = response
        .headers()
        .get("content-type")
        .expect("DELETE response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert!(
        ct.contains("application/json"),
        "DELETE Content-Type must be application/json, got: {ct}"
    );
}
/// The `404` answer to a DELETE for an unknown ID must also be JSON.
#[tokio::test]
async fn delete_unknown_returns_json_content_type() {
    let empty_app = router_with_handles(vec![]);
    let response = delete_scenario_req(empty_app, "missing-id").await;
    assert_eq!(response.status(), StatusCode::NOT_FOUND);

    let header = response
        .headers()
        .get("content-type")
        .expect("404 response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert!(
        ct.contains("application/json"),
        "404 Content-Type must be application/json, got: {ct}"
    );
}
/// Builds a `ScenarioHandle` whose stats start out as `initial_stats`.
///
/// The handle's `shutdown` flag is initialised to `running`, and the backing
/// thread loops (sleeping 10 ms per iteration) for as long as that flag reads
/// `true`:
///
/// * `running == true`: the thread stays alive until the flag is cleared.
/// * `running == false`: the flag starts out `false`, so the thread returns
///   on its first check. The function then waits until the thread has
///   actually exited, so the returned handle is backed by a finished thread.
///
/// The wait polls `JoinHandle::is_finished` (bounded by a 5 s deadline)
/// rather than sleeping a fixed 50 ms, so the result does not depend on how
/// quickly the new thread happens to be scheduled.
///
/// Panics if the thread cannot be spawned.
fn make_handle_with_stats(
    id: &str,
    name: &str,
    target_rate: f64,
    initial_stats: ScenarioStats,
    running: bool,
) -> ScenarioHandle {
    let shutdown = Arc::new(AtomicBool::new(running));
    let stats = Arc::new(RwLock::new(initial_stats));
    let flag = Arc::clone(&shutdown);

    // Thread names are kept distinct so live and stopped fixtures can be told
    // apart in a debugger or panic message.
    let thread_name = if running {
        format!("test-stats-{name}")
    } else {
        format!("test-stats-stopped-{name}")
    };

    // One body serves both modes: with the flag initially `false` the loop
    // condition fails on the first check and the thread exits right away.
    let thread = thread::Builder::new()
        .name(thread_name)
        .spawn(move || -> Result<(), sonda_core::SondaError> {
            while flag.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(10));
            }
            Ok(())
        })
        .expect("thread must spawn");

    if !running {
        // Wait for the thread to finish; the deadline only guards against a
        // hang and is not expected to be reached.
        let deadline = Instant::now() + Duration::from_secs(5);
        while !thread.is_finished() && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(1));
        }
    }

    ScenarioHandle {
        id: id.to_string(),
        name: name.to_string(),
        shutdown,
        thread: Some(thread),
        started_at: Instant::now(),
        stats,
        target_rate,
    }
}
/// Issues `GET /scenarios/{id}/stats` against `app` (one-shot) and returns
/// the raw response.
///
/// Panics if the request cannot be built or the service call fails.
async fn get_stats_req(app: axum::Router, id: &str) -> hyper::Response<axum::body::Body> {
    let uri = format!("/scenarios/{id}/stats");
    let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
    app.oneshot(request).await.unwrap()
}
/// The stats endpoint must expose every field of the detailed stats payload,
/// with values taken from the handle's seeded `ScenarioStats` and its
/// `target_rate`.
#[tokio::test]
async fn stats_endpoint_returns_all_expected_fields() {
    let stats = ScenarioStats {
        total_events: 500,
        bytes_emitted: 32000,
        current_rate: 99.5,
        errors: 2,
        in_gap: false,
        in_burst: true,
        ..Default::default()
    };
    let h = make_handle_with_stats("id-stats-all", "all_fields", 100.0, stats, true);
    let app = router_with_handles(vec![h]);
    let resp = get_stats_req(app, "id-stats-all").await;
    assert_eq!(resp.status(), StatusCode::OK);
    let body = body_json(resp).await;

    assert_eq!(
        body["total_events"].as_u64().unwrap(),
        500,
        "total_events must be 500"
    );
    assert!(
        (body["current_rate"].as_f64().unwrap() - 99.5).abs() < f64::EPSILON,
        "current_rate must be 99.5"
    );
    assert!(
        (body["target_rate"].as_f64().unwrap() - 100.0).abs() < f64::EPSILON,
        "target_rate must be 100.0"
    );
    assert_eq!(
        body["bytes_emitted"].as_u64().unwrap(),
        32000,
        "bytes_emitted must be 32000"
    );
    assert_eq!(body["errors"].as_u64().unwrap(), 2, "errors must be 2");
    assert!(
        body["uptime_secs"].as_f64().unwrap() >= 0.0,
        "uptime_secs must be non-negative"
    );
    assert_eq!(
        body["state"].as_str().unwrap(),
        "running",
        "state must be 'running' for a live scenario"
    );
    // Booleans are asserted directly rather than compared against literals
    // (clippy::bool_assert_comparison).
    assert!(
        !body["in_gap"].as_bool().unwrap(),
        "in_gap must be false"
    );
    assert!(
        body["in_burst"].as_bool().unwrap(),
        "in_burst must be true"
    );
}
/// Two stats snapshots taken 150 ms apart must show strictly increasing
/// `total_events` and `bytes_emitted`.
#[tokio::test]
async fn stats_endpoint_fields_update_as_scenario_progresses() {
    let state = AppState::new();
    let handle = make_handle(
        "id-stats-progress",
        "progress",
        500,
        Duration::from_millis(10),
    );
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    // First snapshot, taken after an initial 100 ms wait.
    thread::sleep(Duration::from_millis(100));
    let first = get_stats_req(router(state.clone()), "id-stats-progress").await;
    assert_eq!(first.status(), StatusCode::OK);
    let first_body = body_json(first).await;
    let events1 = first_body["total_events"].as_u64().unwrap();
    let bytes1 = first_body["bytes_emitted"].as_u64().unwrap();
    assert!(
        events1 > 0,
        "total_events must be > 0 after 100ms, got {events1}"
    );

    // Second snapshot, 150 ms later.
    thread::sleep(Duration::from_millis(150));
    let second = get_stats_req(router(state.clone()), "id-stats-progress").await;
    assert_eq!(second.status(), StatusCode::OK);
    let second_body = body_json(second).await;
    let events2 = second_body["total_events"].as_u64().unwrap();
    let bytes2 = second_body["bytes_emitted"].as_u64().unwrap();

    assert!(
        events2 > events1,
        "total_events must increase over time: first={events1}, second={events2}"
    );
    assert!(
        bytes2 > bytes1,
        "bytes_emitted must increase over time: first={bytes1}, second={bytes2}"
    );

    cleanup_scenarios(&state);
}
/// When the seeded stats have `in_gap = true` and `in_burst = false`, the
/// stats endpoint must report exactly those flags.
#[tokio::test]
async fn stats_endpoint_in_gap_true_when_stats_indicate_gap() {
    let stats = ScenarioStats {
        total_events: 10,
        bytes_emitted: 640,
        current_rate: 0.0,
        errors: 0,
        in_gap: true,
        in_burst: false,
        ..Default::default()
    };
    let h = make_handle_with_stats("id-stats-gap", "gap_test", 50.0, stats, true);
    let app = router_with_handles(vec![h]);
    let resp = get_stats_req(app, "id-stats-gap").await;
    assert_eq!(resp.status(), StatusCode::OK);
    let body = body_json(resp).await;

    // Booleans are asserted directly rather than compared against literals
    // (clippy::bool_assert_comparison).
    assert!(
        body["in_gap"].as_bool().unwrap(),
        "in_gap must be true when the scenario is in a gap window"
    );
    assert!(
        !body["in_burst"].as_bool().unwrap(),
        "in_burst must be false when only in_gap is set"
    );
}
/// For a handle created with `running = false`, the stats endpoint must
/// report `state == "stopped"` while still returning the seeded counters and
/// the configured `target_rate`.
#[tokio::test]
async fn stats_endpoint_returns_stopped_state_for_finished_scenario() {
    let seeded = ScenarioStats {
        total_events: 1000,
        bytes_emitted: 64000,
        current_rate: 0.0,
        errors: 5,
        in_gap: false,
        in_burst: false,
        ..Default::default()
    };
    let handle = make_handle_with_stats("id-stats-stopped", "stopped_test", 200.0, seeded, false);

    let response = get_stats_req(router_with_handles(vec![handle]), "id-stats-stopped").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    assert_eq!(
        payload["state"].as_str().unwrap(),
        "stopped",
        "state must be 'stopped' for a finished scenario"
    );
    assert_eq!(
        payload["total_events"].as_u64().unwrap(),
        1000,
        "total_events must reflect final count"
    );
    assert_eq!(
        payload["errors"].as_u64().unwrap(),
        5,
        "errors must reflect final count"
    );

    let target_rate = payload["target_rate"].as_f64().unwrap();
    assert!(
        (target_rate - 200.0).abs() < f64::EPSILON,
        "target_rate must be preserved even after stop"
    );
}
/// Requesting stats for an unregistered ID must return `404` with
/// `error == "not_found"`.
#[tokio::test]
async fn stats_endpoint_unknown_id_returns_404() {
    let empty_app = router_with_handles(vec![]);
    let response = get_stats_req(empty_app, "nonexistent-stats-id").await;
    assert_eq!(
        response.status(),
        StatusCode::NOT_FOUND,
        "unknown ID must return 404"
    );

    let payload = body_json(response).await;
    let error_kind = payload["error"].as_str().unwrap();
    assert_eq!(
        error_kind, "not_found",
        "404 error body must have error='not_found'"
    );
}
/// The stats endpoint's `404` response must be served as JSON.
#[tokio::test]
async fn stats_endpoint_404_returns_json_content_type() {
    let empty_app = router_with_handles(vec![]);
    let response = get_stats_req(empty_app, "missing-stats-id").await;
    assert_eq!(response.status(), StatusCode::NOT_FOUND);

    let header = response
        .headers()
        .get("content-type")
        .expect("404 response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert!(
        ct.contains("application/json"),
        "404 Content-Type must be application/json, got: {ct}"
    );
}
/// `target_rate` must come from the handle (500.0) while `current_rate` must
/// come from the seeded stats (45.0); the two must not be conflated.
#[tokio::test]
async fn stats_endpoint_target_rate_reflects_configured_rate() {
    let seeded = ScenarioStats {
        total_events: 0,
        bytes_emitted: 0,
        current_rate: 45.0,
        errors: 0,
        in_gap: false,
        in_burst: false,
        ..Default::default()
    };
    let handle = make_handle_with_stats("id-stats-rate", "rate_test", 500.0, seeded, true);

    let response = get_stats_req(router_with_handles(vec![handle]), "id-stats-rate").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    let target_rate = payload["target_rate"].as_f64().unwrap();
    let current_rate = payload["current_rate"].as_f64().unwrap();
    assert!(
        (target_rate - 500.0).abs() < f64::EPSILON,
        "target_rate must be the configured rate (500.0)"
    );
    assert!(
        (current_rate - 45.0).abs() < f64::EPSILON,
        "current_rate must be the measured rate (45.0)"
    );
}
/// `uptime_secs` must be strictly positive when stats are requested 20 ms
/// after the handle was created.
#[tokio::test]
async fn stats_endpoint_uptime_secs_is_positive() {
    let handle = make_handle_with_stats(
        "id-stats-uptime",
        "uptime_test",
        10.0,
        ScenarioStats::default(),
        true,
    );
    let app = router_with_handles(vec![handle]);

    // Let some time pass since the handle's `started_at` was taken.
    thread::sleep(Duration::from_millis(20));

    let response = get_stats_req(app, "id-stats-uptime").await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = body_json(response).await;
    let uptime = payload["uptime_secs"].as_f64().unwrap();
    assert!(
        uptime > 0.0,
        "uptime_secs must be positive for a running scenario, got {uptime}"
    );
}
/// Every field of `DetailedStatsResponse` must appear in the serialized JSON
/// under its own name with its value unchanged.
#[test]
fn detailed_stats_response_serializes_all_fields() {
    let resp = DetailedStatsResponse {
        total_events: 42,
        current_rate: 10.5,
        target_rate: 100.0,
        bytes_emitted: 2048,
        errors: 1,
        // An arbitrary value that does not resemble a well-known constant
        // (`3.14` trips clippy::approx_constant).
        uptime_secs: 2.5,
        state: "running".to_string(),
        in_gap: true,
        in_burst: false,
    };
    let json = serde_json::to_value(&resp).expect("must serialize");
    assert_eq!(json["total_events"], 42);
    assert_eq!(json["current_rate"], 10.5);
    assert_eq!(json["target_rate"], 100.0);
    assert_eq!(json["bytes_emitted"], 2048);
    assert_eq!(json["errors"], 1);
    assert_eq!(json["uptime_secs"], 2.5);
    assert_eq!(json["state"], "running");
    assert_eq!(json["in_gap"], true);
    assert_eq!(json["in_burst"], false);
}
/// A `200` answer from the stats endpoint must be served as JSON.
#[tokio::test]
async fn stats_endpoint_success_returns_json_content_type() {
    let handle = make_handle_with_stats(
        "id-stats-ct",
        "ct_test",
        10.0,
        ScenarioStats::default(),
        true,
    );

    let response = get_stats_req(router_with_handles(vec![handle]), "id-stats-ct").await;
    assert_eq!(response.status(), StatusCode::OK);

    let header = response
        .headers()
        .get("content-type")
        .expect("200 response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert!(
        ct.contains("application/json"),
        "Content-Type must be application/json, got: {ct}"
    );
}
/// Creates a `MetricEvent` named `name` with the given `value` and default
/// (`Labels::default()`) labels.
///
/// Panics if `MetricEvent::new` rejects the input.
fn make_metric_event(name: &str, value: f64) -> sonda_core::model::metric::MetricEvent {
    use sonda_core::model::metric::{Labels, MetricEvent};

    let labels = Labels::default();
    MetricEvent::new(name.to_string(), value, labels).expect("test metric name must be valid")
}
/// Builds a `ScenarioHandle` whose stats have had every item of `events`
/// pushed into them via `push_metric`, in order.
///
/// The `shutdown` flag starts as `true` and the backing thread loops
/// (sleeping 10 ms per iteration) until that flag reads `false`. The handle's
/// `target_rate` is fixed at `10.0`.
///
/// Panics if the thread cannot be spawned.
fn make_handle_with_metrics(
    id: &str,
    name: &str,
    events: Vec<sonda_core::model::metric::MetricEvent>,
) -> ScenarioHandle {
    // Seed the stats before sharing them behind the lock.
    let mut seeded = ScenarioStats::default();
    events
        .into_iter()
        .for_each(|event| seeded.push_metric(event));
    let stats = Arc::new(RwLock::new(seeded));

    let shutdown = Arc::new(AtomicBool::new(true));
    let flag = Arc::clone(&shutdown);
    let worker = thread::Builder::new()
        .name(format!("test-metrics-{name}"))
        .spawn(move || -> Result<(), sonda_core::SondaError> {
            loop {
                if !flag.load(Ordering::SeqCst) {
                    break;
                }
                thread::sleep(Duration::from_millis(10));
            }
            Ok(())
        })
        .expect("thread must spawn");

    ScenarioHandle {
        id: id.to_string(),
        name: name.to_string(),
        shutdown,
        thread: Some(worker),
        started_at: Instant::now(),
        stats,
        target_rate: 10.0,
    }
}
/// Issues `GET /scenarios/{id}/metrics` (no query string) against `app`
/// (one-shot) and returns the raw response.
///
/// Panics if the request cannot be built or the service call fails.
async fn get_metrics_req(app: axum::Router, id: &str) -> hyper::Response<axum::body::Body> {
    let uri = format!("/scenarios/{id}/metrics");
    let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
    app.oneshot(request).await.unwrap()
}
/// Issues `GET /scenarios/{id}/metrics?limit={limit}` against `app`
/// (one-shot) and returns the raw response.
///
/// Panics if the request cannot be built or the service call fails.
async fn get_metrics_with_limit(
    app: axum::Router,
    id: &str,
    limit: usize,
) -> hyper::Response<axum::body::Body> {
    let uri = format!("/scenarios/{id}/metrics?limit={limit}");
    let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
    app.oneshot(request).await.unwrap()
}
/// Collects the whole response body and decodes it as UTF-8.
///
/// Panics if the body cannot be collected or is not valid UTF-8.
async fn body_string(response: axum::response::Response) -> String {
    let collected = response.into_body().collect().await.unwrap();
    let raw: Vec<u8> = collected.to_bytes().to_vec();
    String::from_utf8(raw).expect("body must be valid UTF-8")
}
/// Requesting metrics for an unregistered ID must return `404` with
/// `error == "not_found"`.
#[tokio::test]
async fn metrics_endpoint_unknown_id_returns_404() {
    let empty_app = router_with_handles(vec![]);
    let response = get_metrics_req(empty_app, "nonexistent-metrics-id").await;
    assert_eq!(
        response.status(),
        StatusCode::NOT_FOUND,
        "unknown scenario ID must return 404"
    );

    let payload = body_json(response).await;
    let error_kind = payload["error"].as_str().unwrap();
    assert_eq!(
        error_kind, "not_found",
        "404 error body must have error='not_found'"
    );
}
/// A scenario with no pushed metric events must answer `204 No Content`.
#[tokio::test]
async fn metrics_endpoint_empty_buffer_returns_204() {
    let handle = make_handle_with_metrics("id-metrics-empty", "empty_metrics", vec![]);
    let response = get_metrics_req(router_with_handles(vec![handle]), "id-metrics-empty").await;
    assert_eq!(
        response.status(),
        StatusCode::NO_CONTENT,
        "empty metrics buffer must return 204 No Content"
    );
}
/// With two `up` events pushed, the body must contain at least two lines and
/// every line must start with the metric name `up`.
#[tokio::test]
async fn metrics_endpoint_returns_prometheus_text_format() {
    let samples = vec![make_metric_event("up", 1.0), make_metric_event("up", 2.0)];
    let handle = make_handle_with_metrics("id-metrics-prom", "prom_text", samples);

    let response = get_metrics_req(router_with_handles(vec![handle]), "id-metrics-prom").await;
    assert_eq!(response.status(), StatusCode::OK);

    let text = body_string(response).await;
    let lines: Vec<&str> = text.lines().collect();
    assert!(
        lines.len() >= 2,
        "must have at least 2 lines of Prometheus text, got {}",
        lines.len()
    );
    for line in lines.iter() {
        assert!(
            line.starts_with("up"),
            "each Prometheus line must start with the metric name 'up', got: {line}"
        );
    }
}
/// A successful metrics scrape must carry exactly the Prometheus text
/// exposition Content-Type.
#[tokio::test]
async fn metrics_endpoint_sets_prometheus_content_type() {
    let samples = vec![make_metric_event("cpu_usage", 42.0)];
    let handle = make_handle_with_metrics("id-metrics-ct", "ct_check", samples);

    let response = get_metrics_req(router_with_handles(vec![handle]), "id-metrics-ct").await;
    assert_eq!(response.status(), StatusCode::OK);

    let header = response
        .headers()
        .get("content-type")
        .expect("response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert_eq!(
        ct, "text/plain; version=0.0.4; charset=utf-8",
        "Content-Type must be the Prometheus exposition format MIME type"
    );
}
/// With five events pushed, `?limit=2` must produce exactly two output lines.
#[tokio::test]
async fn metrics_endpoint_limit_parameter_caps_event_count() {
    let samples: Vec<_> = (0..5).map(|i| make_metric_event("up", i as f64)).collect();
    let handle = make_handle_with_metrics("id-metrics-limit", "limit_test", samples);

    let app = router_with_handles(vec![handle]);
    let response = get_metrics_with_limit(app, "id-metrics-limit", 2).await;
    assert_eq!(response.status(), StatusCode::OK);

    let text = body_string(response).await;
    let lines: Vec<&str> = text.lines().collect();
    assert_eq!(
        lines.len(),
        2,
        "limit=2 must produce exactly 2 lines of output, got {}",
        lines.len()
    );
}
/// With events valued 0..=4 pushed in order, `?limit=2` must return the two
/// most recent ones (values 3 and 4) and nothing else.
///
/// The sample value is parsed out of each line instead of searching the raw
/// body for the characters "3" and "4", which could also match digits in
/// other parts of the text (for example a timestamp).
#[tokio::test]
async fn metrics_endpoint_limit_returns_most_recent_events() {
    let events: Vec<_> = (0..5).map(|i| make_metric_event("val", i as f64)).collect();
    let h = make_handle_with_metrics("id-metrics-recent", "recent_test", events);
    let app = router_with_handles(vec![h]);
    let resp = get_metrics_with_limit(app, "id-metrics-recent", 2).await;
    assert_eq!(resp.status(), StatusCode::OK);
    let body = body_string(resp).await;

    // Assumes the Prometheus text layout `name[{labels}] value [timestamp]`,
    // where the value is the second whitespace-separated field.
    let values: Vec<f64> = body
        .lines()
        .map(|line| {
            line.split_whitespace()
                .nth(1)
                .and_then(|field| field.parse::<f64>().ok())
                .unwrap_or_else(|| panic!("cannot parse a sample value from line: {line}"))
        })
        .collect();

    assert!(
        values.contains(&3.0),
        "limited output must contain the second-to-last event value (3.0)"
    );
    assert!(
        values.contains(&4.0),
        "limited output must contain the last event value (4.0)"
    );
    assert_eq!(
        values.len(),
        2,
        "limit=2 must return only the two most recent events, got: {values:?}"
    );
}
/// With one event pushed, `?limit=0` must answer `200 OK`.
///
/// NOTE(review): the test name says "no_content", but the assertion (and its
/// message) expect `200 OK` with an empty body. Confirm which is intended.
#[tokio::test]
async fn metrics_endpoint_limit_zero_returns_no_content_after_drain() {
    let samples = vec![make_metric_event("up", 1.0)];
    let handle = make_handle_with_metrics("id-metrics-lim0", "lim0_test", samples);

    let app = router_with_handles(vec![handle]);
    let response = get_metrics_with_limit(app, "id-metrics-lim0", 0).await;
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "limit=0 with non-empty buffer drains events but encodes zero, returns 200 with empty body"
    );
}
/// The first scrape returns the buffered metrics (`200`, non-empty body); a
/// second scrape against the same state must return `204 No Content`.
#[tokio::test]
async fn metrics_endpoint_drain_clears_buffer_second_request_returns_204() {
    let samples = vec![make_metric_event("up", 1.0), make_metric_event("up", 2.0)];
    let handle = make_handle_with_metrics("id-metrics-drain", "drain_test", samples);

    let state = AppState::new();
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    // First scrape: the pushed events are still there.
    let first = get_metrics_req(router(state.clone()), "id-metrics-drain").await;
    assert_eq!(
        first.status(),
        StatusCode::OK,
        "first scrape must return 200 with metrics"
    );
    let first_text = body_string(first).await;
    assert!(
        !first_text.is_empty(),
        "first scrape must return non-empty Prometheus text"
    );

    // Second scrape against the same shared state.
    let second = get_metrics_req(router(state.clone()), "id-metrics-drain").await;
    assert_eq!(
        second.status(),
        StatusCode::NO_CONTENT,
        "second scrape must return 204 No Content because buffer was drained"
    );

    cleanup_scenarios(&state);
}
/// The metrics endpoint's `404` response must be served as JSON.
#[tokio::test]
async fn metrics_endpoint_404_returns_json_content_type() {
    let empty_app = router_with_handles(vec![]);
    let response = get_metrics_req(empty_app, "missing-metrics-id").await;
    assert_eq!(response.status(), StatusCode::NOT_FOUND);

    let header = response
        .headers()
        .get("content-type")
        .expect("404 response must have Content-Type header");
    let ct = header.to_str().unwrap();
    assert!(
        ct.contains("application/json"),
        "404 Content-Type must be application/json, got: {ct}"
    );
}
/// Without a `limit` query parameter, all five pushed events must be
/// returned, one per line.
#[tokio::test]
async fn metrics_endpoint_default_limit_returns_all_buffered_events() {
    let samples: Vec<_> = (0..5).map(|i| make_metric_event("up", i as f64)).collect();
    let handle = make_handle_with_metrics("id-metrics-nomax", "nomax_test", samples);

    let response = get_metrics_req(router_with_handles(vec![handle]), "id-metrics-nomax").await;
    assert_eq!(response.status(), StatusCode::OK);

    let text = body_string(response).await;
    let line_count = text.lines().count();
    assert_eq!(
        line_count, 5,
        "all 5 buffered events must be returned when no limit is specified"
    );
}
/// A `limit` (500) larger than the number of pushed events (2) must simply
/// return both events.
#[tokio::test]
async fn metrics_endpoint_limit_larger_than_buffer_returns_all() {
    let samples = vec![make_metric_event("up", 1.0), make_metric_event("up", 2.0)];
    let handle = make_handle_with_metrics("id-metrics-biglim", "biglim_test", samples);

    let app = router_with_handles(vec![handle]);
    let response = get_metrics_with_limit(app, "id-metrics-biglim", 500).await;
    assert_eq!(response.status(), StatusCode::OK);

    let text = body_string(response).await;
    let line_count = text.lines().count();
    assert_eq!(
        line_count, 2,
        "when limit > buffer size, all buffered events must be returned"
    );
}
/// The metrics text returned by the endpoint must end with `\n`.
#[tokio::test]
async fn metrics_endpoint_output_ends_with_newline() {
    let samples = vec![make_metric_event("up", 1.0)];
    let handle = make_handle_with_metrics("id-metrics-nl", "newline_test", samples);

    let response = get_metrics_req(router_with_handles(vec![handle]), "id-metrics-nl").await;
    assert_eq!(response.status(), StatusCode::OK);

    let text = body_string(response).await;
    assert!(
        text.ends_with('\n'),
        "Prometheus text output must end with a newline"
    );
}
/// Deserializing an empty object must leave `MetricsQuery::limit` as `None`.
#[test]
fn metrics_query_default_limit_is_none() {
    let query: MetricsQuery = serde_json::from_str("{}").expect("must deserialize");
    assert!(
        query.limit.is_none(),
        "limit must default to None when not specified"
    );
}
/// An explicit `limit` value must be deserialized into `Some(value)`.
#[test]
fn metrics_query_with_limit_deserializes() {
    let raw = r#"{"limit": 50}"#;
    let query: MetricsQuery = serde_json::from_str(raw).expect("must deserialize");
    assert_eq!(query.limit, Some(50));
}
/// Pins the exact value of `PROMETHEUS_CONTENT_TYPE`.
#[test]
fn prometheus_content_type_constant_has_correct_value() {
    let expected = "text/plain; version=0.0.4; charset=utf-8";
    assert_eq!(
        PROMETHEUS_CONTENT_TYPE, expected,
        "PROMETHEUS_CONTENT_TYPE must match the Prometheus exposition format MIME type"
    );
}
/// Builds a `ScenarioHandle` whose thread never looks at the `shutdown` flag:
/// it simply sleeps for 300 seconds and then returns `Ok(())`.
///
/// The handle's stats are `ScenarioStats::default()` and its `target_rate`
/// is `50.0`. Panics if the thread cannot be spawned.
fn make_unjoinable_handle(id: &str, name: &str) -> ScenarioHandle {
    let worker = thread::Builder::new()
        .name(format!("test-unjoinable-{name}"))
        .spawn(move || -> Result<(), sonda_core::SondaError> {
            // Deliberately ignores the shutdown flag.
            thread::sleep(Duration::from_secs(300));
            Ok(())
        })
        .expect("thread must spawn");

    ScenarioHandle {
        id: id.to_string(),
        name: name.to_string(),
        shutdown: Arc::new(AtomicBool::new(true)),
        thread: Some(worker),
        started_at: Instant::now(),
        stats: Arc::new(RwLock::new(ScenarioStats::default())),
        target_rate: 50.0,
    }
}
/// Builds a `ScenarioHandle` whose thread panics immediately.
///
/// Before returning, the function waits until the thread has actually
/// terminated, so the returned handle is backed by a finished thread. The
/// wait polls `JoinHandle::is_finished` (bounded by a 5 s deadline) rather
/// than sleeping a fixed 50 ms, so the result does not depend on how quickly
/// the new thread happens to be scheduled.
///
/// The handle's stats are `ScenarioStats::default()` and its `target_rate`
/// is `10.0`. Panics if the thread cannot be spawned.
fn make_panicking_handle(id: &str, name: &str) -> ScenarioHandle {
    let shutdown = Arc::new(AtomicBool::new(true));
    let stats = Arc::new(RwLock::new(ScenarioStats::default()));
    let thread = thread::Builder::new()
        .name(format!("test-panic-{name}"))
        .spawn(move || -> Result<(), sonda_core::SondaError> {
            panic!("intentional panic for testing");
        })
        .expect("thread must spawn");

    // Wait for the panic to unwind the thread; the deadline only guards
    // against a hang and is not expected to be reached.
    let deadline = Instant::now() + Duration::from_secs(5);
    while !thread.is_finished() && Instant::now() < deadline {
        thread::sleep(Duration::from_millis(1));
    }

    ScenarioHandle {
        id: id.to_string(),
        name: name.to_string(),
        shutdown,
        thread: Some(thread),
        started_at: Instant::now(),
        stats,
        target_rate: 10.0,
    }
}
/// Deleting a handle from `make_unjoinable_handle` must still answer
/// `200 OK`, report `status == "force_stopped"`, echo the ID, and remove the
/// entry from the map.
#[tokio::test]
async fn delete_unjoinable_thread_returns_force_stopped() {
    let state = AppState::new();
    let handle = make_unjoinable_handle("id-force", "force_stop");
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-force").await;
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "DELETE on unjoinable thread must still return 200 OK"
    );

    let payload = body_json(response).await;
    assert_eq!(
        payload["status"].as_str().unwrap(),
        "force_stopped",
        "DELETE on unjoinable thread must return status 'force_stopped'"
    );
    assert_eq!(
        payload["id"].as_str().unwrap(),
        "id-force",
        "response must contain the correct scenario ID"
    );

    let remaining = state.scenarios.read().unwrap();
    assert!(
        !remaining.contains_key("id-force"),
        "force-stopped scenario must still be removed from the map"
    );
}
/// Deleting a handle from `make_panicking_handle` must answer `200 OK` with
/// `status == "stopped"` and remove the entry from the map.
#[tokio::test]
async fn delete_panicked_thread_returns_stopped() {
    let state = AppState::new();
    let handle = make_panicking_handle("id-panic", "panic_scenario");
    state
        .scenarios
        .write()
        .unwrap()
        .insert(handle.id.clone(), handle);

    let response = delete_scenario_req(router(state.clone()), "id-panic").await;
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "DELETE on panicked thread must return 200 OK"
    );

    let payload = body_json(response).await;
    assert_eq!(
        payload["status"].as_str().unwrap(),
        "stopped",
        "DELETE on panicked thread must return status 'stopped' (thread already exited)"
    );

    let remaining = state.scenarios.read().unwrap();
    assert!(
        !remaining.contains_key("id-panic"),
        "panicked scenario must be removed from the map"
    );
}
/// Returns an `AppState` whose `scenarios` lock has been poisoned.
///
/// Poisoning is done by panicking while the write guard is held; the panic is
/// contained with `catch_unwind`. Both the panic and the resulting poisoned
/// state are asserted before the state is handed back.
fn make_poisoned_state() -> AppState {
    let state = AppState::new();
    let lock = Arc::clone(&state.scenarios);

    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        // Unwinding while `_guard` is alive is what marks the lock poisoned.
        let _guard = lock.write().unwrap();
        panic!("intentional panic to poison map lock");
    }));
    assert!(outcome.is_err(), "panic must have occurred");

    let read_attempt_failed = state.scenarios.read().is_err();
    assert!(
        read_attempt_failed,
        "map lock must be poisoned after panic"
    );

    state
}
/// `GET /scenarios` on a state with a poisoned lock must return `500` with
/// `error == "internal_server_error"`.
#[tokio::test]
async fn list_scenarios_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let request = Request::builder()
        .uri("/scenarios")
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on list must return 500"
    );

    let payload = body_json(response).await;
    let error_kind = payload["error"].as_str().unwrap();
    assert_eq!(
        error_kind, "internal_server_error",
        "500 response must have error='internal_server_error'"
    );
}
/// `GET /scenarios/{id}` on a state with a poisoned lock must return `500`.
#[tokio::test]
async fn get_scenario_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let request = Request::builder()
        .uri("/scenarios/any-id")
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on get must return 500"
    );
}
/// The stats endpoint on a state with a poisoned lock must return `500`.
#[tokio::test]
async fn get_scenario_stats_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let response = get_stats_req(app, "any-id").await;
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on stats must return 500"
    );
}
/// The metrics endpoint on a state with a poisoned lock must return `500`.
#[tokio::test]
async fn get_scenario_metrics_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let response = get_metrics_req(app, "any-id").await;
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on metrics must return 500"
    );
}
/// `DELETE /scenarios/{id}` on a state with a poisoned lock must return `500`.
#[tokio::test]
async fn delete_scenario_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let response = delete_scenario_req(app, "any-id").await;
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on delete must return 500"
    );
}
/// Posting the `VALID_METRICS_YAML` fixture to a state with a poisoned lock
/// must return `500` with `error == "internal_server_error"`.
#[tokio::test]
async fn post_scenario_poisoned_lock_returns_500() {
    let app = router(make_poisoned_state());
    let response = post_scenarios(app, "application/x-yaml", VALID_METRICS_YAML).await;
    assert_eq!(
        response.status(),
        StatusCode::INTERNAL_SERVER_ERROR,
        "poisoned map lock on post must return 500"
    );

    let payload = body_json(response).await;
    let error_kind = payload["error"].as_str().unwrap();
    assert_eq!(
        error_kind, "internal_server_error",
        "500 response must have error='internal_server_error'"
    );
}
}