use futures::Stream;
use serde::{Deserialize, Serialize};
use crate::client::Client;
use crate::error::Result;
use crate::http::{send, send_empty, RequestOpts};
use crate::pagination::Page;
use crate::timestamp::{IntoTimestamp, Nanos, Timestamp};
/// Status of an export job as reported by the API.
///
/// Values are deserialized from `snake_case` strings (for example
/// `"queued"`). Any string that does not match a listed variant deserializes
/// to [`ExportStatus::Unknown`] instead of producing an error. The enum is
/// `#[non_exhaustive]`, so `match`es outside this crate need a wildcard arm.
///
/// `Hash` is derived alongside `Eq` so a status can be used as a `HashMap` /
/// `HashSet` key (e.g. to group jobs by status).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ExportStatus {
    /// Wire value `"queued"`.
    Queued,
    /// Wire value `"running"`.
    Running,
    /// Wire value `"succeeded"`.
    Succeeded,
    /// Wire value `"failed"`.
    Failed,
    /// Wire value `"canceled"`.
    Canceled,
    /// Wire value `"deleted"`.
    Deleted,
    /// Catch-all for any status string this client version does not list.
    #[serde(other)]
    Unknown,
}
/// An export job record as deserialized from an API response body.
///
/// This is the type returned by the get, create and list operations on the
/// exports resource in this file.
///
/// NOTE(review): the field meanings below are inferred from field names and
/// from how the request builders in this file use them; confirm against the
/// service's API documentation.
#[derive(Debug, Clone, Deserialize)]
pub struct ExportJob {
/// Job identifier; presumably the value used as `{id}` in `/exports/{id}`.
pub id: String,
/// Presumably the datastream the export was created for — confirm.
pub datastream_id: i64,
/// Start of the exported range, as a `Nanos` value.
pub start_time: Nanos,
/// End of the exported range, as a `Nanos` value.
pub end_time: Nanos,
/// Export format, kept as the raw string from the payload.
pub format: String,
/// Current job status; unrecognized strings become `ExportStatus::Unknown`.
pub status: ExportStatus,
/// Optional reason string; `None` when absent from the payload.
/// Presumably explains a failure or cancellation — confirm.
#[serde(default)]
pub reason: Option<String>,
/// Creation timestamp; required in the payload.
pub created_at: Timestamp,
/// `None` when absent from the payload.
#[serde(default)]
pub started_at: Option<Timestamp>,
/// `None` when absent from the payload.
#[serde(default)]
pub finished_at: Option<Timestamp>,
}
/// One entry of the `artifacts` list in an [`ExportDownloadResponse`].
///
/// NOTE(review): field meanings are inferred from their names; confirm against
/// the service's API documentation.
#[derive(Debug, Clone, Deserialize)]
pub struct ArtifactDownload {
/// Artifact identifier.
pub id: String,
/// Presumably the URL the artifact can be downloaded from — confirm
/// whether it is pre-signed / time-limited.
pub url: String,
/// Presumably the artifact size in bytes — confirm.
pub bytes: u64,
/// Presumably a suggested file name for the downloaded artifact — confirm.
pub filename: String,
}
/// Response body of the `/exports/{id}/download` endpoint.
///
/// All three fields are required in the payload (none has a serde default).
#[derive(Debug, Clone, Deserialize)]
pub struct ExportDownloadResponse {
/// Downloadable artifacts, in the order the payload lists them.
pub artifacts: Vec<ArtifactDownload>,
/// Count as reported by the server. Presumably equals `artifacts.len()`;
/// this type does not check that.
pub count: u32,
/// Total as reported by the server. Presumably the sum of the artifacts'
/// `bytes`; this type does not check that.
pub total_bytes: u64,
}
impl ExportDownloadResponse {
    /// Returns the artifact when the response holds exactly one.
    ///
    /// Yields `None` both for an empty `artifacts` list and for a list with
    /// two or more entries. Only `artifacts` is inspected; the `count` field
    /// is not consulted.
    pub fn single(&self) -> Option<&ArtifactDownload> {
        match self.artifacts.len() {
            1 => self.artifacts.first(),
            _ => None,
        }
    }
}
/// Entry point for the `/exports` endpoints.
///
/// Holds a borrowed [`Client`]; every request or request builder created from
/// this resource uses that same client and cannot outlive it (`'a`).
pub struct ExportsResource<'a> {
// Client handed to the `send` / `send_empty` helpers for every request.
pub(crate) client: &'a Client,
}
impl<'a> ExportsResource<'a> {
    /// Starts building a `POST /exports` request.
    ///
    /// Both time bounds are converted to strings immediately through
    /// [`IntoTimestamp::into_timestamp_string`]. Nothing is sent until the
    /// returned builder's `send` is awaited.
    pub fn create(
        &self,
        datastream_id: i64,
        start_time: impl IntoTimestamp,
        end_time: impl IntoTimestamp,
    ) -> CreateExportRequest<'a> {
        let start_time = start_time.into_timestamp_string();
        let end_time = end_time.into_timestamp_string();
        CreateExportRequest {
            client: self.client,
            datastream_id,
            start_time,
            end_time,
            idempotency_key: None,
            schema: None,
        }
    }

    /// Starts building a `GET /exports` listing request with no `limit` and
    /// no `page_token` set.
    pub fn list(&self) -> ListExportsRequest<'a> {
        let client = self.client;
        ListExportsRequest {
            client,
            limit: None,
            page_token: None,
        }
    }

    /// Fetches a single export job with `GET /exports/{id}`.
    ///
    /// NOTE(review): `id` is interpolated into the path verbatim; confirm that
    /// callers or the `send` helper handle any escaping that may be needed.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send` helper returns.
    pub async fn get(&self, id: &str) -> Result<ExportJob> {
        let endpoint = format!("/exports/{id}");
        let opts = RequestOpts::default();
        send::<_, ()>(self.client, reqwest::Method::GET, &endpoint, None, None, opts).await
    }

    /// Fetches download information with `GET /exports/{id}/download`.
    ///
    /// NOTE(review): `id` is interpolated into the path verbatim; confirm that
    /// callers or the `send` helper handle any escaping that may be needed.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send` helper returns.
    pub async fn get_download(&self, id: &str) -> Result<ExportDownloadResponse> {
        let endpoint = format!("/exports/{id}/download");
        let opts = RequestOpts::default();
        send::<_, ()>(self.client, reqwest::Method::GET, &endpoint, None, None, opts).await
    }

    /// Starts building a cancel request for the export `id`, with no
    /// idempotency key set.
    pub fn cancel(&self, id: impl Into<String>) -> CancelExportRequest<'a> {
        let id = id.into();
        CancelExportRequest {
            client: self.client,
            id,
            idempotency_key: None,
        }
    }

    /// Starts building a delete request for the export `id`, with no
    /// idempotency key set.
    pub fn delete(&self, id: impl Into<String>) -> DeleteExportRequest<'a> {
        let id = id.into();
        DeleteExportRequest {
            client: self.client,
            id,
            idempotency_key: None,
        }
    }
}
/// Builder for a `POST /exports` request.
///
/// Created by `ExportsResource::create`; optional settings are applied with
/// the chained setter methods and the request is only issued when `send` is
/// awaited. The type is `#[must_use]` so that a builder which is constructed
/// but never sent produces a compiler warning instead of silently doing
/// nothing.
#[must_use = "request builders do nothing until `.send()` is awaited"]
pub struct CreateExportRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Sent as the `datastream_id` body field.
    datastream_id: i64,
    /// Start bound, already converted to its string form.
    start_time: String,
    /// End bound, already converted to its string form.
    end_time: String,
    /// Optional idempotency key forwarded through `RequestOpts`.
    idempotency_key: Option<String>,
    /// Optional schema; omitted from the body when `None`.
    schema: Option<SchemaArg>,
}
/// Schema argument stored by `CreateExportRequest`: either a string reference
/// or an inline JSON value. `CreateExportRequest::send` serializes whichever
/// one is set into the request body's `schema` field.
enum SchemaArg {
/// Set through `CreateExportRequest::schema`; serialized as a plain string.
Ref(String),
/// Set through `CreateExportRequest::inline_schema`; serialized as the
/// JSON value itself.
Inline(serde_json::Value),
}
impl<'a> CreateExportRequest<'a> {
    /// Attaches an idempotency key, replacing any key set earlier.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Uses a schema given as a string reference, replacing any schema
    /// (reference or inline) set earlier.
    pub fn schema(self, s: impl Into<String>) -> Self {
        Self {
            schema: Some(SchemaArg::Ref(s.into())),
            ..self
        }
    }

    /// Uses a schema given as an inline JSON value, replacing any schema
    /// (reference or inline) set earlier.
    pub fn inline_schema(self, v: serde_json::Value) -> Self {
        Self {
            schema: Some(SchemaArg::Inline(v)),
            ..self
        }
    }

    /// Issues `POST /exports` and returns the job from the response.
    ///
    /// The body carries `datastream_id`, `start_time`, `end_time` and, only
    /// when one was configured, `schema` (a string for a reference, the JSON
    /// value itself for an inline schema). The idempotency key, if any, is
    /// passed through `RequestOpts`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send` helper returns.
    pub async fn send(self) -> Result<ExportJob> {
        // Untagged: each variant serializes as its inner value, with no wrapper.
        #[derive(Serialize)]
        #[serde(untagged)]
        enum SchemaBody<'b> {
            Ref(&'b str),
            Inline(&'b serde_json::Value),
        }

        #[derive(Serialize)]
        struct Body<'b> {
            datastream_id: i64,
            start_time: &'b str,
            end_time: &'b str,
            #[serde(skip_serializing_if = "Option::is_none")]
            schema: Option<SchemaBody<'b>>,
        }

        // Borrow the stored schema rather than cloning it into the body.
        let schema = match &self.schema {
            Some(SchemaArg::Ref(name)) => Some(SchemaBody::Ref(name)),
            Some(SchemaArg::Inline(value)) => Some(SchemaBody::Inline(value)),
            None => None,
        };
        let payload = Body {
            datastream_id: self.datastream_id,
            start_time: &self.start_time,
            end_time: &self.end_time,
            schema,
        };
        let opts = RequestOpts {
            idempotency_key: self.idempotency_key.as_deref(),
        };

        send(
            self.client,
            reqwest::Method::POST,
            "/exports",
            None,
            Some(&payload),
            opts,
        )
        .await
    }
}
/// Builder for a `GET /exports` listing request.
///
/// Created by `ExportsResource::list`. Nothing is requested until `send` is
/// awaited or the stream returned by `stream` is polled. The type is
/// `#[must_use]` so that a builder which is constructed but never used
/// produces a compiler warning.
#[must_use = "request builders do nothing until `.send()` is awaited or `.stream()` is polled"]
pub struct ListExportsRequest<'a> {
    /// Client used to issue the request(s).
    client: &'a Client,
    /// Sent as the `limit` query parameter when set.
    limit: Option<u32>,
    /// Sent as the `page_token` query parameter when set.
    page_token: Option<String>,
}
impl<'a> ListExportsRequest<'a> {
    /// Sets the `limit` query parameter, replacing any earlier value.
    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    /// Sets the `page_token` query parameter, replacing any earlier value.
    ///
    /// For [`send`](Self::send) this selects the page to fetch; for
    /// [`stream`](Self::stream) it selects the first page of the stream.
    pub fn page_token(mut self, t: impl Into<String>) -> Self {
        self.page_token = Some(t.into());
        self
    }

    /// Builds the query pairs: `limit` first, then `page_token`, each only
    /// when set.
    fn query(&self) -> Vec<(&'static str, String)> {
        let limit = self.limit.map(|n| ("limit", n.to_string()));
        let token = self.page_token.as_ref().map(|t| ("page_token", t.clone()));
        limit.into_iter().chain(token).collect()
    }

    /// Fetches one page with `GET /exports`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send` helper returns.
    pub async fn send(self) -> Result<Page<ExportJob>> {
        let q = self.query();
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            "/exports",
            Some(q.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Streams export jobs across pages, fetching each page on demand.
    ///
    /// The first request uses the `page_token` configured on this builder
    /// (none by default); previously a configured token was silently
    /// discarded and the stream always restarted from the first page. Every
    /// request uses the configured `limit`. The stream ends after a page
    /// that carries no `next_page_token`; the first failed request is yielded
    /// as an `Err` and ends the stream.
    pub fn stream(self) -> impl Stream<Item = Result<ExportJob>> + 'a {
        let Self {
            client,
            limit,
            page_token,
        } = self;
        async_stream::try_stream! {
            let mut next_token = page_token;
            loop {
                // `take` moves the token into the request instead of cloning it.
                let request = ListExportsRequest {
                    client,
                    limit,
                    page_token: next_token.take(),
                };
                let page = request.send().await?;
                for item in page.items { yield item; }
                next_token = page.next_page_token;
                if next_token.is_none() {
                    break;
                }
            }
        }
    }
}
/// Builder for a `POST /exports/{id}/cancel` request.
///
/// Created by `ExportsResource::cancel`. Nothing is sent until `send` is
/// awaited. The type is `#[must_use]` so that a cancel request which is built
/// but never sent produces a compiler warning.
#[must_use = "request builders do nothing until `.send()` is awaited"]
pub struct CancelExportRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Export identifier placed in the request path.
    id: String,
    /// Optional idempotency key forwarded through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> CancelExportRequest<'a> {
    /// Attaches an idempotency key, replacing any key set earlier.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `POST /exports/{id}/cancel` with no request body.
    ///
    /// The stored `id` is interpolated into the path verbatim. The
    /// idempotency key, if any, is passed through `RequestOpts`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send_empty` helper returns.
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            id,
            idempotency_key,
        } = self;
        let endpoint = format!("/exports/{id}/cancel");
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send_empty::<()>(client, reqwest::Method::POST, &endpoint, None, None, opts).await
    }
}
/// Builder for a `DELETE /exports/{id}` request.
///
/// Created by `ExportsResource::delete`. Nothing is sent until `send` is
/// awaited. The type is `#[must_use]` so that a delete request which is built
/// but never sent produces a compiler warning.
#[must_use = "request builders do nothing until `.send()` is awaited"]
pub struct DeleteExportRequest<'a> {
    /// Client used to issue the request.
    client: &'a Client,
    /// Export identifier placed in the request path.
    id: String,
    /// Optional idempotency key forwarded through `RequestOpts`.
    idempotency_key: Option<String>,
}
impl<'a> DeleteExportRequest<'a> {
    /// Attaches an idempotency key, replacing any key set earlier.
    pub fn idempotency_key(self, k: impl Into<String>) -> Self {
        Self {
            idempotency_key: Some(k.into()),
            ..self
        }
    }

    /// Issues `DELETE /exports/{id}` with no request body.
    ///
    /// The stored `id` is interpolated into the path verbatim. The
    /// idempotency key, if any, is passed through `RequestOpts`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the `send_empty` helper returns.
    pub async fn send(self) -> Result<()> {
        let Self {
            client,
            id,
            idempotency_key,
        } = self;
        let endpoint = format!("/exports/{id}");
        let opts = RequestOpts {
            idempotency_key: idempotency_key.as_deref(),
        };
        send_empty::<()>(client, reqwest::Method::DELETE, &endpoint, None, None, opts).await
    }
}