mod common;
use common::mock_client;
use ticksupply::resources::subscriptions::SubscriptionStatus;
use wiremock::matchers::{body_json, header, method, path, query_param};
use wiremock::{Mock, ResponseTemplate};
fn sub_json(id: &str, status: &str) -> serde_json::Value {
serde_json::json!({
"id": id,
"datastream_id": 42,
"status": status,
"created_at": "2024-01-01T00:00:00Z"
})
}
fn sub_detail_json(id: &str, status: &str) -> serde_json::Value {
serde_json::json!({
"id": id,
"status": status,
"datastream": {
"datastream_id": 42,
"exchange": "binance",
"instrument": "BTCUSDT",
"stream_type": "trades",
"wire_format": "json"
},
"created_at": "2024-01-01T00:00:00Z",
"spans": []
})
}
#[tokio::test]
async fn create_subscription_sends_body_and_decodes() {
let (server, client) = mock_client().await;
Mock::given(method("POST"))
.and(path("/v1/subscriptions"))
.and(body_json(serde_json::json!({ "datastream_id": 42 })))
.respond_with(
ResponseTemplate::new(200).set_body_json(sub_detail_json("sub_new", "active")),
)
.mount(&server)
.await;
let sub = client.subscriptions().create(42).send().await.unwrap();
assert_eq!(sub.id, "sub_new");
assert!(matches!(sub.status, SubscriptionStatus::Active));
}
#[tokio::test]
async fn create_subscription_sends_idempotency_header() {
let (server, client) = mock_client().await;
Mock::given(method("POST"))
.and(path("/v1/subscriptions"))
.and(header("Idempotency-Key", "abcd-1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(sub_detail_json("sub_ok", "active")))
.mount(&server)
.await;
let sub = client
.subscriptions()
.create(42)
.idempotency_key("abcd-1234")
.send()
.await
.unwrap();
assert_eq!(sub.id, "sub_ok");
}
#[tokio::test]
async fn get_pause_resume_delete_flow() {
let (server, client) = mock_client().await;
Mock::given(method("GET"))
.and(path("/v1/subscriptions/sub_x"))
.respond_with(ResponseTemplate::new(200).set_body_json(sub_json("sub_x", "active")))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/subscriptions/sub_x/pause"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/subscriptions/sub_x/resume"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
Mock::given(method("DELETE"))
.and(path("/v1/subscriptions/sub_x"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
let sub = client.subscriptions().get("sub_x").await.unwrap();
assert!(matches!(sub.status, SubscriptionStatus::Active));
client.subscriptions().pause("sub_x").send().await.unwrap();
client.subscriptions().resume("sub_x").send().await.unwrap();
client.subscriptions().delete("sub_x").send().await.unwrap();
}
#[tokio::test]
async fn pause_forwards_idempotency_key() {
let (server, client) = mock_client().await;
Mock::given(method("POST"))
.and(path("/v1/subscriptions/sub_x/pause"))
.and(header("Idempotency-Key", "key-1"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
client
.subscriptions()
.pause("sub_x")
.idempotency_key("key-1")
.send()
.await
.unwrap();
}
#[tokio::test]
async fn list_spans_decodes_items() {
let (server, client) = mock_client().await;
Mock::given(method("GET"))
.and(path("/v1/subscriptions/sub_x/spans"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
{ "id": "spn_1", "started_at": "2024-01-01T00:00:00Z", "ended_at": "2024-01-02T00:00:00Z" },
{ "id": "spn_2", "started_at": "2024-01-03T00:00:00Z" }
])))
.mount(&server)
.await;
let spans = client.subscriptions().list_spans("sub_x").await.unwrap();
assert_eq!(spans.len(), 2);
assert!(spans[1].ended_at.is_none());
}
#[tokio::test]
async fn list_sends_pagination_params() {
let (server, client) = mock_client().await;
Mock::given(method("GET"))
.and(path("/v1/subscriptions"))
.and(query_param("limit", "10"))
.and(query_param("page_token", "tok_1"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"items": [sub_json("sub_a", "active")],
"total": 1
})))
.mount(&server)
.await;
let page = client
.subscriptions()
.list()
.limit(10)
.page_token("tok_1")
.send()
.await
.unwrap();
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn get_accepts_server_shape_with_expanded_datastream() {
let (server, client) = mock_client().await;
let body = serde_json::json!({
"id": "sub_x",
"datastream_id": 42,
"status": "active",
"created_at": "2024-01-01T00:00:00Z",
"datastream": {
"datastream_id": 42,
"exchange": "binance",
"instrument": "BTCUSDT",
"stream_type": "trades",
"wire_format": "json"
}
});
Mock::given(method("GET"))
.and(path("/v1/subscriptions/sub_x"))
.respond_with(ResponseTemplate::new(200).set_body_json(body))
.mount(&server)
.await;
let sub = client.subscriptions().get("sub_x").await.unwrap();
assert_eq!(sub.id, "sub_x");
assert_eq!(sub.datastream_id, 42);
assert!(matches!(sub.status, SubscriptionStatus::Active));
assert!(sub.datastream.is_some());
}
#[tokio::test]
async fn get_accepts_response_without_datastream_field() {
let (server, client) = mock_client().await;
Mock::given(method("GET"))
.and(path("/v1/subscriptions/sub_x"))
.respond_with(ResponseTemplate::new(200).set_body_json(sub_json("sub_x", "active")))
.mount(&server)
.await;
let sub = client.subscriptions().get("sub_x").await.unwrap();
assert!(sub.datastream.is_none());
}