use futures::Stream;
use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::error::Result;
use crate::http::{send, send_empty, RequestOpts};
use crate::pagination::Page;
use crate::resources::catalog::DatastreamInfo;
use crate::timestamp::Timestamp;
/// Status of a subscription as reported by the API.
///
/// Values are deserialized from their `snake_case` names. A value that does
/// not match any named variant deserializes to [`SubscriptionStatus::Unknown`]
/// instead of producing an error. The enum is `#[non_exhaustive]`, so matches
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum SubscriptionStatus {
/// Wire value `"active"`.
Active,
/// Wire value `"paused"`.
Paused,
/// Wire value `"deleted"`.
Deleted,
/// Catch-all for any status string not listed above.
#[serde(other)]
Unknown,
}
/// A subscription record.
///
/// This is the type returned by `SubscriptionsResource::get` and the item
/// type produced by `ListSubscriptionsRequest::send` and `stream`.
#[derive(Debug, Clone, Deserialize)]
pub struct Subscription {
/// Subscription identifier.
pub id: String,
/// Numeric identifier of the datastream this subscription refers to.
pub datastream_id: i64,
/// Current status; unrecognized values become `SubscriptionStatus::Unknown`.
pub status: SubscriptionStatus,
/// Creation timestamp as reported by the API.
pub created_at: Timestamp,
/// Embedded datastream details; `None` when the field is missing from the
/// payload.
#[serde(default)]
pub datastream: Option<DatastreamInfo>,
}
/// Response body of `CreateSubscriptionRequest::send` (`POST /subscriptions`).
///
/// Unlike [`Subscription`], the `datastream` field is required here and there
/// is no separate `datastream_id` field.
#[derive(Debug, Clone, Deserialize)]
pub struct SubscriptionResponse {
/// Subscription identifier.
pub id: String,
/// Current status; unrecognized values become `SubscriptionStatus::Unknown`.
pub status: SubscriptionStatus,
/// Details of the subscribed datastream.
pub datastream: DatastreamInfo,
/// Creation timestamp as reported by the API.
pub created_at: Timestamp,
/// Spans attached to the subscription; an empty vector when the field is
/// missing from the payload.
#[serde(default)]
pub spans: Vec<SubscriptionSpan>,
}
/// One span of a subscription, as returned by
/// `SubscriptionsResource::list_spans` and embedded in
/// [`SubscriptionResponse::spans`].
#[derive(Debug, Clone, Deserialize)]
pub struct SubscriptionSpan {
/// Span identifier.
pub id: String,
/// Start timestamp of the span.
pub started_at: Timestamp,
/// End timestamp of the span; `None` when the field is missing from the
/// payload.
// NOTE(review): presumably `None` means the span is still open — confirm
// against the API documentation.
#[serde(default)]
pub ended_at: Option<Timestamp>,
}
/// Handle for the `/subscriptions` endpoints.
///
/// Borrows a [`Client`] for `'a`; every request and request builder created
/// from this handle uses that same client reference.
pub struct SubscriptionsResource<'a> {
/// Client through which all requests are issued.
pub(crate) client: &'a Client,
}
impl<'a> SubscriptionsResource<'a> {
    /// Fetches a single subscription with `GET /subscriptions/{id}`.
    ///
    /// `id` is inserted into the request path verbatim (no escaping is
    /// applied here). The request uses default [`RequestOpts`].
    pub async fn get(&self, id: &str) -> Result<Subscription> {
        let endpoint = format!("/subscriptions/{}", id);
        let opts = RequestOpts::default();
        send::<_, ()>(self.client, reqwest::Method::GET, &endpoint, None, None, opts).await
    }

    /// Starts a builder for `POST /subscriptions/{id}/pause`.
    ///
    /// Nothing is sent until the builder's `send` is awaited.
    pub fn pause(&self, id: impl Into<String>) -> PauseSubscriptionRequest<'a> {
        let client = self.client;
        let id = id.into();
        PauseSubscriptionRequest { client, id, idempotency_key: None }
    }

    /// Starts a builder for `POST /subscriptions/{id}/resume`.
    ///
    /// Nothing is sent until the builder's `send` is awaited.
    pub fn resume(&self, id: impl Into<String>) -> ResumeSubscriptionRequest<'a> {
        let client = self.client;
        let id = id.into();
        ResumeSubscriptionRequest { client, id, idempotency_key: None }
    }

    /// Starts a builder for `DELETE /subscriptions/{id}`.
    ///
    /// Nothing is sent until the builder's `send` is awaited.
    pub fn delete(&self, id: impl Into<String>) -> DeleteSubscriptionRequest<'a> {
        let client = self.client;
        let id = id.into();
        DeleteSubscriptionRequest { client, id, idempotency_key: None }
    }

    /// Fetches the spans of a subscription with
    /// `GET /subscriptions/{id}/spans`.
    ///
    /// `id` is inserted into the request path verbatim (no escaping is
    /// applied here). The request uses default [`RequestOpts`].
    pub async fn list_spans(&self, id: &str) -> Result<Vec<SubscriptionSpan>> {
        let endpoint = format!("/subscriptions/{}/spans", id);
        let opts = RequestOpts::default();
        send::<_, ()>(self.client, reqwest::Method::GET, &endpoint, None, None, opts).await
    }

    /// Starts a builder for `POST /subscriptions` targeting `datastream_id`.
    ///
    /// Nothing is sent until the builder's `send` is awaited.
    pub fn create(&self, datastream_id: i64) -> CreateSubscriptionRequest<'a> {
        let client = self.client;
        CreateSubscriptionRequest { client, datastream_id, idempotency_key: None }
    }

    /// Starts a builder for `GET /subscriptions` with no limit and no page
    /// token set.
    pub fn list(&self) -> ListSubscriptionsRequest<'a> {
        let client = self.client;
        ListSubscriptionsRequest { client, limit: None, page_token: None }
    }
}
/// Builder for a `POST /subscriptions` request, created by
/// `SubscriptionsResource::create`.
///
/// The builder is inert: no request is made until `send` is called and the
/// returned future is awaited, hence the `#[must_use]`.
#[must_use = "request builders do nothing until `.send()` is called and awaited"]
pub struct CreateSubscriptionRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Datastream id placed in the JSON request body.
    datastream_id: i64,
    /// Optional idempotency key passed along through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> CreateSubscriptionRequest<'a> {
    /// Sets the idempotency key that `send` forwards through [`RequestOpts`].
    ///
    /// Calling this again replaces the previously set key.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `POST /subscriptions` with a body containing only
    /// `datastream_id`, and returns the deserialized response.
    pub async fn send(self) -> Result<SubscriptionResponse> {
        /// Request payload; serialized with the single field `datastream_id`.
        #[derive(Serialize)]
        struct Body {
            datastream_id: i64,
        }

        let Self {
            client,
            datastream_id,
            idempotency_key,
        } = self;
        let payload = Body { datastream_id };
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send(client, reqwest::Method::POST, "/subscriptions", None, Some(&payload), opts).await
    }
}
/// Builder for a `GET /subscriptions` request, created by
/// `SubscriptionsResource::list`.
///
/// The builder is inert: no request is made until `send` is awaited or the
/// stream returned by `stream` is polled, hence the `#[must_use]`.
#[must_use = "request builders do nothing until `.send()` or `.stream()` is used"]
pub struct ListSubscriptionsRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Value of the `limit` query parameter; omitted from the query when `None`.
    limit: Option<u32>,
    /// Value of the `page_token` query parameter; omitted when `None`.
    page_token: Option<String>,
}
impl<'a> ListSubscriptionsRequest<'a> {
    /// Sets the `limit` query parameter.
    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    /// Sets the `page_token` query parameter, i.e. the page to start from.
    ///
    /// Honored by both [`send`](Self::send) and [`stream`](Self::stream).
    pub fn page_token(mut self, t: impl Into<String>) -> Self {
        self.page_token = Some(t.into());
        self
    }

    /// Builds the query pairs: `limit` first, then `page_token`, each only
    /// when it has been set.
    fn query(&self) -> Vec<(&'static str, String)> {
        let limit = self.limit.map(|n| ("limit", n.to_string()));
        let token = self.page_token.as_ref().map(|t| ("page_token", t.clone()));
        limit.into_iter().chain(token).collect()
    }

    /// Issues a single `GET /subscriptions` request and returns that page.
    pub async fn send(self) -> Result<Page<Subscription>> {
        let q = self.query();
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            "/subscriptions",
            Some(q.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Returns a stream over subscriptions that fetches pages on demand.
    ///
    /// The first request uses the `page_token` configured on this builder
    /// (previously it was silently dropped and streaming always restarted
    /// from the first page). Each later request uses the `next_page_token`
    /// of the preceding page, and the stream ends when a page carries no
    /// next token. The configured `limit` is sent with every page request.
    /// A failed page request is yielded as an `Err` item.
    pub fn stream(self) -> impl Stream<Item = Result<Subscription>> + 'a {
        let Self {
            client,
            limit,
            page_token,
        } = self;
        async_stream::try_stream! {
            // Seed the cursor with the caller-supplied token, if any.
            let mut cursor: Option<String> = page_token;
            loop {
                let req = ListSubscriptionsRequest {
                    client,
                    limit,
                    // `take` moves the token into the request; the cursor is
                    // re-filled from the response below, so no clone is needed.
                    page_token: cursor.take(),
                };
                let page = req.send().await?;
                for item in page.items { yield item; }
                match page.next_page_token {
                    Some(t) => cursor = Some(t),
                    None => break,
                }
            }
        }
    }
}
/// Builder for a `POST /subscriptions/{id}/pause` request, created by
/// `SubscriptionsResource::pause`.
///
/// The builder is inert: no request is made until `send` is called and the
/// returned future is awaited, hence the `#[must_use]`.
#[must_use = "request builders do nothing until `.send()` is called and awaited"]
pub struct PauseSubscriptionRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Subscription id interpolated into the request path.
    id: String,
    /// Optional idempotency key passed along through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> PauseSubscriptionRequest<'a> {
    /// Sets the idempotency key that `send` forwards through [`RequestOpts`].
    ///
    /// Calling this again replaces the previously set key.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `POST /subscriptions/{id}/pause` without a body and discards
    /// any response payload.
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            id,
            idempotency_key,
        } = self;
        let endpoint = format!("/subscriptions/{id}/pause");
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send_empty::<()>(client, reqwest::Method::POST, &endpoint, None, None, opts).await
    }
}
/// Builder for a `POST /subscriptions/{id}/resume` request, created by
/// `SubscriptionsResource::resume`.
///
/// The builder is inert: no request is made until `send` is called and the
/// returned future is awaited, hence the `#[must_use]`.
#[must_use = "request builders do nothing until `.send()` is called and awaited"]
pub struct ResumeSubscriptionRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Subscription id interpolated into the request path.
    id: String,
    /// Optional idempotency key passed along through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> ResumeSubscriptionRequest<'a> {
    /// Sets the idempotency key that `send` forwards through [`RequestOpts`].
    ///
    /// Calling this again replaces the previously set key.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `POST /subscriptions/{id}/resume` without a body and discards
    /// any response payload.
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            id,
            idempotency_key,
        } = self;
        let endpoint = format!("/subscriptions/{id}/resume");
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send_empty::<()>(client, reqwest::Method::POST, &endpoint, None, None, opts).await
    }
}
/// Builder for a `DELETE /subscriptions/{id}` request, created by
/// `SubscriptionsResource::delete`.
///
/// The builder is inert: no request is made until `send` is called and the
/// returned future is awaited, hence the `#[must_use]`.
#[must_use = "request builders do nothing until `.send()` is called and awaited"]
pub struct DeleteSubscriptionRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Subscription id interpolated into the request path.
    id: String,
    /// Optional idempotency key passed along through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> DeleteSubscriptionRequest<'a> {
    /// Sets the idempotency key that `send` forwards through [`RequestOpts`].
    ///
    /// Calling this again replaces the previously set key.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `DELETE /subscriptions/{id}` without a body and discards any
    /// response payload.
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            id,
            idempotency_key,
        } = self;
        let endpoint = format!("/subscriptions/{id}");
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send_empty::<()>(client, reqwest::Method::DELETE, &endpoint, None, None, opts).await
    }
}