use futures::Stream;
use serde::Deserialize;
use crate::client::Client;
use crate::error::Result;
use crate::http::{send, RequestOpts};
use crate::pagination::Page;
/// One exchange entry, deserialized from the response of
/// `ExchangesResource::list` (`GET /exchanges`).
#[derive(Debug, Clone, Deserialize)]
pub struct Exchange {
/// Exchange code.
// NOTE(review): presumably the value callers pass as `exchange` to the
// per-exchange endpoints — confirm against the API contract.
pub code: String,
/// Display name of the exchange.
pub display_name: String,
}
/// One instrument entry, deserialized from the pages returned by
/// `ListInstrumentsRequest::send`.
///
/// Only `symbol` is required in the payload; the remaining fields are marked
/// `#[serde(default)]` and deserialize to `None` when absent.
#[derive(Debug, Clone, Deserialize)]
pub struct Instrument {
/// Instrument symbol.
pub symbol: String,
/// Optional `base` value; shares its name with the `base` list filter.
#[serde(default)]
pub base: Option<String>,
/// Optional `quote` value; shares its name with the `quote` list filter.
#[serde(default)]
pub quote: Option<String>,
/// Optional instrument type; shares its name with the `instrument_type`
/// list filter.
#[serde(default)]
pub instrument_type: Option<String>,
}
/// One datastream entry, deserialized from the pages returned by
/// `ListExchangeDatastreamsRequest::send`.
///
/// Every field is required in the payload (none carries `#[serde(default)]`).
#[derive(Debug, Clone, Deserialize)]
pub struct DatastreamInfo {
/// Numeric identifier of the datastream.
pub datastream_id: i64,
/// Exchange the datastream is reported for.
pub exchange: String,
/// Instrument the datastream is reported for.
pub instrument: String,
/// Stream type; shares its name with the `stream_type` list filter.
pub stream_type: String,
/// Wire format; shares its name with the `wire_format` list filter.
pub wire_format: String,
}
/// Handle for the `/exchanges` endpoints, borrowing the [`Client`] that
/// performs the requests.
pub struct ExchangesResource<'a> {
/// Client every request issued through this resource is sent with.
pub(crate) client: &'a Client,
}
impl<'a> ExchangesResource<'a> {
    /// Fetches the exchange list with a single `GET /exchanges` request.
    ///
    /// No query parameters and no request body are sent, and the default
    /// [`RequestOpts`] are used.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `send` call reports.
    pub async fn list(&self) -> Result<Vec<Exchange>> {
        let method = reqwest::Method::GET;
        let opts = RequestOpts::default();
        send::<_, ()>(self.client, method, "/exchanges", None, None, opts).await
    }

    /// Starts building a request for the instruments of `exchange`.
    ///
    /// The returned builder has no filters, limit or page token set; nothing
    /// is sent until `send` or `stream` is called on it.
    pub fn list_instruments(&self, exchange: impl Into<String>) -> ListInstrumentsRequest<'a> {
        let client = self.client;
        let exchange = exchange.into();
        ListInstrumentsRequest {
            client,
            exchange,
            search: None,
            base: None,
            quote: None,
            instrument_type: None,
            limit: None,
            page_token: None,
        }
    }

    /// Starts building a request for the datastreams of `instrument` on
    /// `exchange`.
    ///
    /// The returned builder has no filters, limit or page token set; nothing
    /// is sent until `send` or `stream` is called on it.
    pub fn list_datastreams(
        &self,
        exchange: impl Into<String>,
        instrument: impl Into<String>,
    ) -> ListExchangeDatastreamsRequest<'a> {
        let client = self.client;
        let exchange = exchange.into();
        let instrument = instrument.into();
        ListExchangeDatastreamsRequest {
            client,
            exchange,
            instrument,
            stream_type: None,
            wire_format: None,
            limit: None,
            page_token: None,
        }
    }
}
/// Builder for a `GET /exchanges/{exchange}/instruments` request.
///
/// Every `Option` field that is `Some` is sent as a query parameter of the
/// same name; `None` fields are omitted from the query string.
pub struct ListInstrumentsRequest<'a> {
/// Client the request is sent with.
client: &'a Client,
/// Exchange value interpolated into the request path.
exchange: String,
/// Value for the `search` query parameter.
search: Option<String>,
/// Value for the `base` query parameter.
base: Option<String>,
/// Value for the `quote` query parameter.
quote: Option<String>,
/// Value for the `instrument_type` query parameter.
instrument_type: Option<String>,
/// Value for the `limit` query parameter.
limit: Option<u32>,
/// Value for the `page_token` query parameter.
page_token: Option<String>,
}
impl<'a> ListInstrumentsRequest<'a> {
    /// Sets the `search` query parameter.
    pub fn search(mut self, s: impl Into<String>) -> Self {
        self.search = Some(s.into());
        self
    }

    /// Sets the `base` query parameter.
    pub fn base(mut self, b: impl Into<String>) -> Self {
        self.base = Some(b.into());
        self
    }

    /// Sets the `quote` query parameter.
    pub fn quote(mut self, q: impl Into<String>) -> Self {
        self.quote = Some(q.into());
        self
    }

    /// Sets the `instrument_type` query parameter.
    pub fn instrument_type(mut self, t: impl Into<String>) -> Self {
        self.instrument_type = Some(t.into());
        self
    }

    /// Sets the `limit` query parameter.
    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    /// Sets the `page_token` query parameter.
    ///
    /// [`send`](Self::send) requests exactly this page;
    /// [`stream`](Self::stream) uses it as the starting point.
    pub fn page_token(mut self, t: impl Into<String>) -> Self {
        self.page_token = Some(t.into());
        self
    }

    /// Collects the parameters that are set into `(name, value)` pairs.
    ///
    /// Unset parameters are left out entirely rather than sent empty.
    fn query(&self) -> Vec<(&'static str, String)> {
        let mut q = Vec::new();
        if let Some(s) = &self.search {
            q.push(("search", s.clone()));
        }
        if let Some(s) = &self.base {
            q.push(("base", s.clone()));
        }
        if let Some(s) = &self.quote {
            q.push(("quote", s.clone()));
        }
        if let Some(s) = &self.instrument_type {
            q.push(("instrument_type", s.clone()));
        }
        if let Some(n) = self.limit {
            q.push(("limit", n.to_string()));
        }
        if let Some(t) = &self.page_token {
            q.push(("page_token", t.clone()));
        }
        q
    }

    /// Sends one `GET /exchanges/{exchange}/instruments` request and returns
    /// the resulting page.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `send` call reports.
    pub async fn send(self) -> Result<Page<Instrument>> {
        // NOTE(review): `exchange` is interpolated verbatim; confirm whether
        // the HTTP layer percent-encodes path segments (e.g. values with `/`).
        let path = format!("/exchanges/{}/instruments", self.exchange);
        let q = self.query();
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            &path,
            Some(q.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Turns the request into a stream of instruments spanning several pages.
    ///
    /// The first request uses the `page_token` configured on this builder, if
    /// any; otherwise it is sent without one. Each following request uses the
    /// `next_page_token` of the previous page, and the stream ends once a
    /// page carries no `next_page_token`. All other parameters are repeated
    /// unchanged on every request.
    ///
    /// # Errors
    ///
    /// A failed page request is yielded as an `Err` item and ends the stream.
    pub fn stream(self) -> impl Stream<Item = Result<Instrument>> + 'a {
        let Self {
            client,
            exchange,
            search,
            base,
            quote,
            instrument_type,
            limit,
            page_token,
        } = self;
        async_stream::try_stream! {
            // Seed the cursor from the builder so a caller-supplied token is
            // honoured instead of being silently discarded.
            let mut cursor: Option<String> = page_token;
            loop {
                let req = ListInstrumentsRequest {
                    client,
                    exchange: exchange.clone(),
                    search: search.clone(),
                    base: base.clone(),
                    quote: quote.clone(),
                    instrument_type: instrument_type.clone(),
                    limit,
                    // The cursor is reassigned below, so move it out rather
                    // than cloning it for every page.
                    page_token: cursor.take(),
                };
                let page = req.send().await?;
                for item in page.items { yield item; }
                match page.next_page_token {
                    Some(next) => cursor = Some(next),
                    None => break,
                }
            }
        }
    }
}
/// Builder for a
/// `GET /exchanges/{exchange}/instruments/{instrument}/datastreams` request.
///
/// Every `Option` field that is `Some` is sent as a query parameter of the
/// same name; `None` fields are omitted from the query string.
pub struct ListExchangeDatastreamsRequest<'a> {
/// Client the request is sent with.
client: &'a Client,
/// Exchange value interpolated into the request path.
exchange: String,
/// Instrument value interpolated into the request path.
instrument: String,
/// Value for the `stream_type` query parameter.
stream_type: Option<String>,
/// Value for the `wire_format` query parameter.
wire_format: Option<String>,
/// Value for the `limit` query parameter.
limit: Option<u32>,
/// Value for the `page_token` query parameter.
page_token: Option<String>,
}
impl<'a> ListExchangeDatastreamsRequest<'a> {
    /// Sets the `stream_type` query parameter.
    pub fn stream_type(mut self, t: impl Into<String>) -> Self {
        self.stream_type = Some(t.into());
        self
    }

    /// Sets the `wire_format` query parameter.
    pub fn wire_format(mut self, w: impl Into<String>) -> Self {
        self.wire_format = Some(w.into());
        self
    }

    /// Sets the `limit` query parameter.
    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    /// Sets the `page_token` query parameter.
    ///
    /// [`send`](Self::send) requests exactly this page;
    /// [`stream`](Self::stream) uses it as the starting point.
    pub fn page_token(mut self, t: impl Into<String>) -> Self {
        self.page_token = Some(t.into());
        self
    }

    /// Collects the parameters that are set into `(name, value)` pairs.
    ///
    /// Unset parameters are left out entirely rather than sent empty.
    fn query(&self) -> Vec<(&'static str, String)> {
        let mut q = Vec::new();
        if let Some(s) = &self.stream_type {
            q.push(("stream_type", s.clone()));
        }
        if let Some(s) = &self.wire_format {
            q.push(("wire_format", s.clone()));
        }
        if let Some(n) = self.limit {
            q.push(("limit", n.to_string()));
        }
        if let Some(t) = &self.page_token {
            q.push(("page_token", t.clone()));
        }
        q
    }

    /// Sends one
    /// `GET /exchanges/{exchange}/instruments/{instrument}/datastreams`
    /// request and returns the resulting page.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying `send` call reports.
    pub async fn send(self) -> Result<Page<DatastreamInfo>> {
        // NOTE(review): `exchange` and `instrument` are interpolated verbatim;
        // confirm whether the HTTP layer percent-encodes path segments
        // (e.g. instrument values containing `/`).
        let path = format!(
            "/exchanges/{}/instruments/{}/datastreams",
            self.exchange, self.instrument
        );
        let q = self.query();
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            &path,
            Some(q.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Turns the request into a stream of datastreams spanning several pages.
    ///
    /// The first request uses the `page_token` configured on this builder, if
    /// any; otherwise it is sent without one. Each following request uses the
    /// `next_page_token` of the previous page, and the stream ends once a
    /// page carries no `next_page_token`. All other parameters are repeated
    /// unchanged on every request.
    ///
    /// # Errors
    ///
    /// A failed page request is yielded as an `Err` item and ends the stream.
    pub fn stream(self) -> impl Stream<Item = Result<DatastreamInfo>> + 'a {
        let Self {
            client,
            exchange,
            instrument,
            stream_type,
            wire_format,
            limit,
            page_token,
        } = self;
        async_stream::try_stream! {
            // Seed the cursor from the builder so a caller-supplied token is
            // honoured instead of being silently discarded.
            let mut cursor: Option<String> = page_token;
            loop {
                let req = ListExchangeDatastreamsRequest {
                    client,
                    exchange: exchange.clone(),
                    instrument: instrument.clone(),
                    stream_type: stream_type.clone(),
                    wire_format: wire_format.clone(),
                    limit,
                    // The cursor is reassigned below, so move it out rather
                    // than cloning it for every page.
                    page_token: cursor.take(),
                };
                let page = req.send().await?;
                for item in page.items { yield item; }
                match page.next_page_token {
                    Some(next) => cursor = Some(next),
                    None => break,
                }
            }
        }
    }
}