use super::super::router;
use super::test_state;
use crate::{ActivitySource, DaemonEvent};
use axum::body::{to_bytes, Body};
use axum::http::{header, Request, StatusCode};
use serde_json::{json, Value};
use tower::util::ServiceExt;
use trusty_common::memory_core::palace::PalaceId;
#[tokio::test]
async fn sse_broadcast_emits_palace_created() {
let state = test_state();
let mut rx = state.events.subscribe();
let app = router().with_state(state.clone());
let body = json!({"name": "sse-test"}).to_string();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let event = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv())
.await
.expect("event received within timeout")
.expect("event channel still open");
match event {
DaemonEvent::PalaceCreated { id, name, source } => {
assert_eq!(id, "sse-test");
assert_eq!(name, "sse-test");
assert_eq!(source, ActivitySource::Http);
}
other => panic!("expected PalaceCreated, got {other:?}"),
}
}
#[tokio::test]
async fn sse_endpoint_emits_connected_frame() {
use axum::routing::get;
let state = test_state();
let app = router()
.route("/sse", get(crate::sse_handler))
.with_state(state);
let resp = app
.oneshot(Request::builder().uri("/sse").body(Body::empty()).unwrap())
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok()),
Some("text/event-stream")
);
let body = resp.into_body();
let bytes = tokio::time::timeout(std::time::Duration::from_millis(500), to_bytes(body, 4096))
.await
.ok()
.and_then(|r| r.ok())
.unwrap_or_default();
let text = String::from_utf8_lossy(&bytes);
assert!(
text.contains("\"type\":\"connected\""),
"expected connected frame, got: {text}"
);
}
#[tokio::test]
async fn dream_status_aggregates_across_palaces() {
use trusty_common::memory_core::dream::{DreamStats, PersistedDreamStats};
let state = test_state();
for (id, stats, ts) in [
(
"palace-a",
DreamStats {
merged: 1,
pruned: 2,
compacted: 3,
closets_updated: 4,
duration_ms: 100,
..DreamStats::default()
},
chrono::Utc::now() - chrono::Duration::seconds(60),
),
(
"palace-b",
DreamStats {
merged: 10,
pruned: 20,
compacted: 30,
closets_updated: 40,
duration_ms: 200,
..DreamStats::default()
},
chrono::Utc::now(),
),
] {
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new(id),
name: id.to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join(id),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create palace");
let persisted = PersistedDreamStats {
last_run_at: ts,
stats,
};
persisted
.save(&state.data_root.join(id))
.expect("save dream stats");
}
let later = chrono::Utc::now();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/dream/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["merged"], 11);
assert_eq!(v["pruned"], 22);
assert_eq!(v["compacted"], 33);
assert_eq!(v["closets_updated"], 44);
assert_eq!(v["duration_ms"], 300);
let last = v["last_run_at"].as_str().expect("last_run_at is string");
let parsed: chrono::DateTime<chrono::Utc> = last
.parse()
.expect("last_run_at parses as RFC3339 timestamp");
assert!(
parsed <= later,
"last_run_at ({parsed}) should not exceed wall clock ({later})"
);
let cutoff = chrono::Utc::now() - chrono::Duration::seconds(30);
assert!(
parsed >= cutoff,
"expected the newer (palace-b) timestamp; got {parsed}"
);
}
#[tokio::test]
async fn dream_run_aggregates_stats() {
let state = test_state();
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("dream-run-test"),
name: "dream-run-test".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("dream-run-test"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create palace");
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/dream/run")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
for key in [
"merged",
"pruned",
"compacted",
"closets_updated",
"duration_ms",
] {
assert!(
v.get(key).is_some(),
"missing key {key} in dream_run payload: {v}"
);
assert!(
v[key].is_u64() || v[key].is_i64(),
"{key} should be integer, got {}",
v[key]
);
}
assert!(
v["last_run_at"].is_string(),
"last_run_at must be set by dream_run; got {v}"
);
}