ticksupply 0.2.1

Official Rust client for the Ticksupply market data API
Documentation
//! datastreams — List available datastreams across exchanges and instruments.
//!
//! # Examples
//!
//! ```no_run
//! # async fn example() -> ticksupply::Result<()> {
//! let client = ticksupply::Client::new()?;
//! let page = client.datastreams()
//!     .list()
//!     .exchange("binance")
//!     .stream_type("trades")
//!     .send().await?;
//! for ds in &page.items {
//!     println!("id={} {} {}", ds.datastream_id, ds.instrument, ds.stream_type);
//! }
//! # Ok(()) }
//! ```

use futures::Stream;

use crate::client::Client;
use crate::error::Result;
use crate::http::{send, RequestOpts};
use crate::pagination::Page;
use crate::resources::catalog::DatastreamInfo;

/// Accessor for `/datastreams`.
///
/// Borrows the [`Client`] that requests are issued through; call
/// [`DatastreamsResource::list`] to start building a listing request.
pub struct DatastreamsResource<'a> {
    /// Client handed to every request builder created from this accessor.
    pub(crate) client: &'a Client,
}

impl<'a> DatastreamsResource<'a> {
    /// Starts a request builder for listing datastreams.
    ///
    /// The builder is created with no filters, no page size and no page
    /// token. Every filter is optional; an unfiltered request returns every
    /// datastream visible to the account.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn example() -> ticksupply::Result<()> {
    /// let client = ticksupply::Client::new()?;
    /// let page = client.datastreams()
    ///     .list()
    ///     .exchange("binance")
    ///     .instrument("BTCUSDT")
    ///     .send().await?;
    /// # let _ = page;
    /// # Ok(()) }
    /// ```
    pub fn list(&self) -> ListDatastreamsRequest<'a> {
        // Copy the `'a` client reference out so the builder does not borrow
        // from this (possibly shorter-lived) accessor.
        let client = self.client;
        ListDatastreamsRequest {
            client,
            page_token: None,
            limit: None,
            wire_format: None,
            stream_type: None,
            instrument: None,
            exchange: None,
        }
    }
}

/// Builder for `GET /v1/datastreams`.
///
/// Created by [`DatastreamsResource::list`]. Every optional field starts as
/// `None` and is only added to the query string once it has been set.
/// Nothing is sent until the builder is consumed by `send` or `stream`.
#[must_use = "a request builder does nothing until `.send()` or `.stream()` is called"]
pub struct ListDatastreamsRequest<'a> {
    /// Client the request is issued through.
    client: &'a Client,
    /// Value for the `exchange` query parameter.
    exchange: Option<String>,
    /// Value for the `instrument` query parameter.
    instrument: Option<String>,
    /// Value for the `stream_type` query parameter.
    stream_type: Option<String>,
    /// Value for the `wire_format` query parameter.
    wire_format: Option<String>,
    /// Value for the `limit` query parameter (results per page).
    limit: Option<u32>,
    /// Value for the `page_token` query parameter.
    page_token: Option<String>,
}

impl<'a> ListDatastreamsRequest<'a> {
    /// Restricts results to the given exchange code.
    pub fn exchange(self, e: impl Into<String>) -> Self {
        Self {
            exchange: Some(e.into()),
            ..self
        }
    }

    /// Restricts results to the given instrument symbol.
    pub fn instrument(self, i: impl Into<String>) -> Self {
        Self {
            instrument: Some(i.into()),
            ..self
        }
    }

    /// Restricts results to the given stream type code (e.g. `"trades"`, `"depth"`).
    pub fn stream_type(self, s: impl Into<String>) -> Self {
        Self {
            stream_type: Some(s.into()),
            ..self
        }
    }

    /// Restricts results to the given wire format (e.g. `"json"`).
    pub fn wire_format(self, w: impl Into<String>) -> Self {
        Self {
            wire_format: Some(w.into()),
            ..self
        }
    }

    /// Sets the maximum results per page (default 100, max 1000).
    pub fn limit(self, n: u32) -> Self {
        Self {
            limit: Some(n),
            ..self
        }
    }

    /// Sets the page token returned by a prior response.
    pub fn page_token(self, t: impl Into<String>) -> Self {
        Self {
            page_token: Some(t.into()),
            ..self
        }
    }

    /// Builds the query-string pairs for this request.
    ///
    /// Only parameters that have been set are emitted, always in the order
    /// `exchange`, `instrument`, `stream_type`, `wire_format`, `limit`,
    /// `page_token`.
    fn query(&self) -> Vec<(&'static str, String)> {
        // Render the numeric limit once so it can sit in the same
        // `Option<&str>` table as the string-valued parameters.
        let limit = self.limit.map(|n| n.to_string());
        let params = [
            ("exchange", self.exchange.as_deref()),
            ("instrument", self.instrument.as_deref()),
            ("stream_type", self.stream_type.as_deref()),
            ("wire_format", self.wire_format.as_deref()),
            ("limit", limit.as_deref()),
            ("page_token", self.page_token.as_deref()),
        ];
        params
            .iter()
            .filter_map(|&(key, value)| value.map(|v| (key, v.to_owned())))
            .collect()
    }

    /// Fetches a single page of results.
    ///
    /// Issues a `GET` to `/datastreams` with the configured filters as query
    /// parameters and no request body.
    ///
    /// # Errors
    ///
    /// - [`crate::Error::Authentication`] on invalid credentials.
    /// - [`crate::Error::Validation`] if `limit` is out of range.
    /// - [`crate::Error::Network`] on transport failure.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn example() -> ticksupply::Result<()> {
    /// let client = ticksupply::Client::new()?;
    /// let page = client.datastreams()
    ///     .list()
    ///     .exchange("binance")
    ///     .send().await?;
    /// # let _ = page;
    /// # Ok(()) }
    /// ```
    pub async fn send(self) -> Result<Page<DatastreamInfo>> {
        let params = self.query();
        send::<_, ()>(
            self.client,
            reqwest::Method::GET,
            "/datastreams",
            Some(params.as_slice()),
            None,
            RequestOpts::default(),
        )
        .await
    }

    /// Auto-paginates across all matching pages, yielding each datastream.
    ///
    /// Each yielded item is a [`Result`] that surfaces the same errors as
    /// [`Self::send`] if a page fetch fails.
    ///
    /// Streaming always starts from the first page; any `page_token`
    /// previously set on the builder is ignored. Filters and `limit` are
    /// preserved across pages. The stream ends after the first page whose
    /// response carries no `next_page_token`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn example() -> ticksupply::Result<()> {
    /// use futures::StreamExt;
    /// let client = ticksupply::Client::new()?;
    /// let mut s = Box::pin(client.datastreams().list().stream());
    /// while let Some(ds) = s.next().await {
    ///     let _ds = ds?;
    /// }
    /// # Ok(()) }
    /// ```
    pub fn stream(self) -> impl Stream<Item = Result<DatastreamInfo>> + 'a {
        // The caller-supplied page token is deliberately discarded: streaming
        // always restarts from the first page.
        let Self {
            client,
            exchange,
            instrument,
            stream_type,
            wire_format,
            limit,
            page_token: _,
        } = self;
        async_stream::try_stream! {
            let mut next_token: Option<String> = None;
            loop {
                let req = ListDatastreamsRequest {
                    client,
                    exchange: exchange.clone(),
                    instrument: instrument.clone(),
                    stream_type: stream_type.clone(),
                    wire_format: wire_format.clone(),
                    limit,
                    // Move the token into the request instead of cloning it;
                    // it is replaced (or the loop ends) before the next read.
                    page_token: next_token.take(),
                };
                let page = req.send().await?;
                for item in page.items {
                    yield item;
                }
                match page.next_page_token {
                    Some(token) => next_token = Some(token),
                    None => break,
                }
            }
        }
    }
}