use futures::Stream;
use crate::client::Client;
use crate::error::Result;
use crate::http::{send, RequestOpts};
use crate::pagination::Page;
use crate::resources::catalog::DatastreamInfo;
/// Entry point for datastream operations, borrowing the [`Client`] it was created from.
///
/// The `list` method on this type produces a [`ListDatastreamsRequest`] that
/// carries the same client reference.
pub struct DatastreamsResource<'a> {
/// Borrowed client that is handed to every request built from this resource.
pub(crate) client: &'a Client,
}
impl<'a> DatastreamsResource<'a> {
    /// Begins a datastream listing request with no filters applied.
    ///
    /// The returned [`ListDatastreamsRequest`] borrows this resource's client;
    /// every optional field (filters, `limit`, `page_token`) starts out as `None`
    /// and can be filled in through the builder methods before the request is
    /// sent or streamed.
    pub fn list(&self) -> ListDatastreamsRequest<'a> {
        // Copy the shared reference out so the builder is tied to the client's
        // lifetime `'a` rather than to the borrow of `self`.
        let client = self.client;
        ListDatastreamsRequest {
            client,
            page_token: None,
            limit: None,
            wire_format: None,
            stream_type: None,
            instrument: None,
            exchange: None,
        }
    }
}
/// Builder for a `GET /datastreams` listing request.
///
/// Every optional field is omitted from the query string while it is `None`
/// and sent under the query key named below once it has been set.
pub struct ListDatastreamsRequest<'a> {
/// Client the request is issued through.
client: &'a Client,
/// Value for the `exchange` query parameter.
exchange: Option<String>,
/// Value for the `instrument` query parameter.
instrument: Option<String>,
/// Value for the `stream_type` query parameter.
stream_type: Option<String>,
/// Value for the `wire_format` query parameter.
wire_format: Option<String>,
/// Value for the `limit` query parameter (presumably the page size — confirm against the API docs).
limit: Option<u32>,
/// Value for the `page_token` query parameter.
page_token: Option<String>,
}
impl<'a> ListDatastreamsRequest<'a> {
    /// Sets the `exchange` query parameter.
    pub fn exchange(mut self, e: impl Into<String>) -> Self {
        self.exchange = Some(e.into());
        self
    }

    /// Sets the `instrument` query parameter.
    pub fn instrument(mut self, i: impl Into<String>) -> Self {
        self.instrument = Some(i.into());
        self
    }

    /// Sets the `stream_type` query parameter.
    pub fn stream_type(mut self, s: impl Into<String>) -> Self {
        self.stream_type = Some(s.into());
        self
    }

    /// Sets the `wire_format` query parameter.
    pub fn wire_format(mut self, w: impl Into<String>) -> Self {
        self.wire_format = Some(w.into());
        self
    }

    /// Sets the `limit` query parameter.
    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    /// Sets the `page_token` query parameter.
    ///
    /// [`send`](Self::send) fetches exactly the page this token points at;
    /// [`stream`](Self::stream) starts iterating from it.
    pub fn page_token(mut self, t: impl Into<String>) -> Self {
        self.page_token = Some(t.into());
        self
    }

    /// Builds the query-string pairs for every option that has been set.
    ///
    /// Unset options are left out. Pairs are emitted in a fixed order:
    /// `exchange`, `instrument`, `stream_type`, `wire_format`, `limit`,
    /// `page_token`.
    fn query(&self) -> Vec<(&'static str, String)> {
        let text_filters = [
            ("exchange", &self.exchange),
            ("instrument", &self.instrument),
            ("stream_type", &self.stream_type),
            ("wire_format", &self.wire_format),
        ];
        let mut q: Vec<(&'static str, String)> = text_filters
            .iter()
            .filter_map(|&(key, value)| value.as_ref().map(|v| (key, v.clone())))
            .collect();
        q.extend(self.limit.map(|n| ("limit", n.to_string())));
        q.extend(self.page_token.as_ref().map(|t| ("page_token", t.clone())));
        q
    }

    /// Issues a single `GET /datastreams` request and returns the resulting page.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the crate's HTTP `send` helper returns.
    pub async fn send(self) -> Result<Page<DatastreamInfo>> {
        let q = self.query();
        // NOTE(review): `()` and the `None` argument presumably mean "no request
        // body" — confirm against `crate::http::send`.
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            "/datastreams",
            Some(q.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Streams every matching item, fetching pages on demand.
    ///
    /// Iteration starts at the page token configured on the builder, or at the
    /// first page when none was set. After each page is drained, the page's
    /// `next_page_token` is used for the following request; the stream ends
    /// when a page carries no such token.
    ///
    /// # Errors
    ///
    /// A failed page request is yielded as an `Err` item, after which the
    /// stream terminates.
    pub fn stream(self) -> impl Stream<Item = Result<DatastreamInfo>> + 'a {
        let Self {
            client,
            exchange,
            instrument,
            stream_type,
            wire_format,
            limit,
            page_token,
        } = self;
        async_stream::try_stream! {
            // Seed from the builder so a caller-supplied token resumes
            // pagination instead of being silently discarded.
            let mut next_token: Option<String> = page_token;
            loop {
                let req = ListDatastreamsRequest {
                    client,
                    exchange: exchange.clone(),
                    instrument: instrument.clone(),
                    stream_type: stream_type.clone(),
                    wire_format: wire_format.clone(),
                    limit,
                    // The slot is always overwritten (or the loop exits) below,
                    // so the token can be moved out rather than cloned.
                    page_token: next_token.take(),
                };
                let page = req.send().await?;
                for item in page.items {
                    yield item;
                }
                match page.next_page_token {
                    Some(t) => next_token = Some(t),
                    None => break,
                }
            }
        }
    }
}