use magnetar_admin::{AdminClient, AdminError, LongRunningProcessStatus};
use wiremock::matchers::{body_json, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
fn client(mock: &MockServer) -> AdminClient {
AdminClient::builder()
.service_url(mock.uri().parse().unwrap())
.build()
.unwrap()
}
#[tokio::test]
async fn topic_stats_decodes_rates_throughput_and_sizes() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/admin/v2/persistent/public/default/orders/stats"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"msgRateIn": 42.5,
"msgRateOut": 17.25,
"msgThroughputIn": 5_000.0,
"msgThroughputOut": 2_048.0,
"averageMsgSize": 512.0,
"msgInCounter": 1000,
"bytesInCounter": 64_000,
"storageSize": 1_048_576,
"backlogSize": 4_096,
"publishers": [],
"subscriptions": {},
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let stats = admin
.topic_stats("public/default/orders")
.await
.expect("stats returns 200");
assert!((stats.msg_rate_in - 42.5).abs() < f64::EPSILON);
assert!((stats.msg_rate_out - 17.25).abs() < f64::EPSILON);
assert!((stats.msg_throughput_in - 5_000.0).abs() < f64::EPSILON);
assert!((stats.msg_throughput_out - 2_048.0).abs() < f64::EPSILON);
assert!((stats.average_msg_size - 512.0).abs() < f64::EPSILON);
assert_eq!(stats.msg_in_counter, 1000);
assert_eq!(stats.bytes_in_counter, 64_000);
assert_eq!(stats.storage_size, 1_048_576);
assert_eq!(stats.backlog_size, 4_096);
}
#[tokio::test]
async fn topic_stats_scalar_fields_default_to_zero_when_absent() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/admin/v2/persistent/public/default/idle/stats"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"publishers": [],
"subscriptions": {},
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let stats = admin
.topic_stats("public/default/idle")
.await
.expect("stats returns 200");
assert!(stats.msg_rate_in.abs() < f64::EPSILON);
assert!(stats.msg_rate_out.abs() < f64::EPSILON);
assert!(stats.msg_throughput_in.abs() < f64::EPSILON);
assert!(stats.msg_throughput_out.abs() < f64::EPSILON);
assert!(stats.average_msg_size.abs() < f64::EPSILON);
assert_eq!(stats.msg_in_counter, 0);
assert_eq!(stats.bytes_in_counter, 0);
assert_eq!(stats.storage_size, 0);
assert_eq!(stats.backlog_size, 0);
}
#[tokio::test]
async fn topic_partitioned_stats_decodes_aggregated_metrics() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path(
"/admin/v2/persistent/public/default/orders/partitioned-stats",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"msgRateIn": 128.0,
"msgRateOut": 64.0,
"msgThroughputIn": 16_384.0,
"msgThroughputOut": 8_192.0,
"averageMsgSize": 256.0,
"msgInCounter": 4000,
"bytesInCounter": 256_000,
"storageSize": 4_194_304,
"backlogSize": 16_384,
"publishers": [],
"subscriptions": {},
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let stats = admin
.topic_partitioned_stats("public/default/orders")
.await
.expect("partitioned-stats returns 200");
assert!((stats.msg_rate_in - 128.0).abs() < f64::EPSILON);
assert!((stats.msg_rate_out - 64.0).abs() < f64::EPSILON);
assert_eq!(stats.msg_in_counter, 4000);
assert_eq!(stats.storage_size, 4_194_304);
assert_eq!(stats.backlog_size, 16_384);
}
#[tokio::test]
async fn topic_compact_puts_to_compaction_path() {
let mock = MockServer::start().await;
Mock::given(method("PUT"))
.and(path(
"/admin/v2/persistent/public/default/orders/compaction",
))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
admin
.topic_compact("public/default/orders")
.await
.expect("compaction trigger returns 204");
}
#[tokio::test]
async fn topic_compaction_status_decodes_long_running_process_status() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path(
"/admin/v2/persistent/public/default/orders/compaction",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"status": "RUNNING",
"lastError": "",
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let st: LongRunningProcessStatus = admin
.topic_compaction_status("public/default/orders")
.await
.expect("status returns 200");
assert_eq!(st.status, "RUNNING");
assert!(st.last_error.is_empty());
}
#[tokio::test]
async fn topic_unload_puts_to_unload_path() {
let mock = MockServer::start().await;
Mock::given(method("PUT"))
.and(path("/admin/v2/persistent/public/default/orders/unload"))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
admin
.topic_unload("public/default/orders")
.await
.expect("unload returns 204");
}
#[tokio::test]
async fn topic_terminate_posts_and_returns_last_message_id() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/admin/v2/persistent/public/default/orders/terminate"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"ledgerId": 123,
"entryId": 456,
"partitionIndex": -1,
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let last = admin
.topic_terminate("public/default/orders")
.await
.expect("terminate returns the last message id")
.expect("non-sentinel ledgerId/entryId → Some");
assert_eq!(last.ledger_id, 123);
assert_eq!(last.entry_id, 456);
assert_eq!(last.partition, -1);
}
#[tokio::test]
async fn topic_terminate_sentinel_negative_one_is_none() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/admin/v2/persistent/public/default/empty/terminate"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"ledgerId": -1,
"entryId": -1,
"partitionIndex": -1,
})))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
let last = admin
.topic_terminate("public/default/empty")
.await
.expect("terminate sentinel must not surface as Protocol error");
assert!(last.is_none(), "sentinel (-1, -1) should map to None");
}
#[tokio::test]
async fn topic_update_partitions_posts_bare_integer_body() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.and(path(
"/admin/v2/persistent/public/default/orders/partitions",
))
.and(body_json(serde_json::json!(8)))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&mock)
.await;
let admin = client(&mock);
admin
.topic_update_partitions("public/default/orders", 8)
.await
.expect("update-partitions returns 204");
}
#[tokio::test]
async fn topic_update_partitions_propagates_409_on_shrink() {
let mock = MockServer::start().await;
Mock::given(method("POST"))
.and(path(
"/admin/v2/persistent/public/default/orders/partitions",
))
.respond_with(
ResponseTemplate::new(409)
.set_body_string("Number of partitions can only be increased"),
)
.mount(&mock)
.await;
let admin = client(&mock);
let err = admin
.topic_update_partitions("public/default/orders", 2)
.await
.unwrap_err();
assert!(matches!(err, AdminError::Status { code: 409, .. }));
}
#[tokio::test]
async fn topic_delete_auto_detects_partitioned_route() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path(
"/admin/v2/persistent/public/default/orders/partitions",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"partitions": 4,
})))
.expect(1)
.mount(&mock)
.await;
Mock::given(method("DELETE"))
.and(path(
"/admin/v2/persistent/public/default/orders/partitions",
))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&mock)
.await;
let admin = AdminClient::builder()
.service_url(mock.uri().parse().unwrap())
.build()
.unwrap();
admin
.topic_delete("public/default/orders", false)
.await
.expect("partitioned delete returns 204");
}
#[tokio::test]
async fn topic_delete_auto_detects_non_partitioned_route() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path(
"/admin/v2/persistent/public/default/oneoff/partitions",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"partitions": 0,
})))
.expect(1)
.mount(&mock)
.await;
Mock::given(method("DELETE"))
.and(path("/admin/v2/persistent/public/default/oneoff"))
.respond_with(ResponseTemplate::new(204))
.expect(1)
.mount(&mock)
.await;
let admin = AdminClient::builder()
.service_url(mock.uri().parse().unwrap())
.build()
.unwrap();
admin
.topic_delete("public/default/oneoff", true)
.await
.expect("non-partitioned delete returns 204");
}