databento 0.46.0

Official Databento client library
Documentation
//! The Live client and related API types. Used for both real-time data and intraday historical.

mod client;
pub mod protocol;

use std::{fmt::Display, net::SocketAddr, sync::Arc};

use dbn::{Compression, SType, Schema, VersionUpgradePolicy};
use time::{Duration, OffsetDateTime};
use tokio::net::{lookup_host, ToSocketAddrs};
use tracing::warn;
use typed_builder::TypedBuilder;

use crate::{ApiKey, DateTimeLike, Symbols};

pub use client::Client;

/// Timeout settings applied while the Live client connects to and authenticates
/// with the gateway.
///
/// Both phases take an optional limit; see the [`Default`] implementation for the
/// values used when none are configured explicitly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutConf {
    /// Optional limit on establishing the TCP connection to the gateway.
    /// Defaults to 10 seconds.
    pub connect: Option<Duration>,
    /// Optional limit on completing CRAM authentication with the gateway.
    /// Defaults to 30 seconds.
    pub auth: Option<Duration>,
}

impl Default for TimeoutConf {
    /// Returns a configuration with a 10-second connect timeout and a 30-second
    /// authentication timeout.
    fn default() -> Self {
        const CONNECT_SECS: i64 = 10;
        const AUTH_SECS: i64 = 30;

        let connect = Some(Duration::seconds(CONNECT_SECS));
        let auth = Some(Duration::seconds(AUTH_SECS));
        Self { connect, auth }
    }
}

/// Live session option that tells the gateway how to react when this client falls
/// behind real time.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SlowReaderBehavior {
    /// Emit a warning and continue reading.
    Warn,
    /// Skip records so the client can catch back up.
    Skip,
}

/// Describes one subscription request for real-time or intraday historical data.
///
/// Construct it with [`Subscription::builder()`]; `symbols` and `schema` are
/// required, every other field has a default.
#[derive(Clone, Debug, Eq, PartialEq, TypedBuilder)]
pub struct Subscription {
    /// Instruments to subscribe to. The builder setter accepts anything convertible
    /// into [`Symbols`].
    #[builder(setter(into))]
    pub symbols: Symbols,
    /// Record schema of the requested data.
    pub schema: Schema,
    /// Symbology type used to interpret [`symbols`](Self::symbols). Defaults to
    /// [`SType::RawSymbol`].
    #[builder(default = SType::RawSymbol)]
    pub stype_in: SType,
    /// Inclusive start time for replaying the subscription.
    ///
    /// Pass [`OffsetDateTime::UNIX_EPOCH`](time::OffsetDateTime::UNIX_EPOCH) to request
    /// all available data. Leave it as `None` to receive only real-time data. The
    /// builder setter accepts any [`DateTimeLike`] value.
    ///
    /// This cannot be set once the session has been started with
    /// [`LiveClient::start`](crate::LiveClient::start).
    /// See [`Intraday Replay`](https://databento.com/docs/api-reference-live/basics/intraday-replay).
    #[builder(default, setter(transform = |dt: impl DateTimeLike| Some(dt.to_date_time())))]
    pub start: Option<OffsetDateTime>,
    #[doc(hidden)]
    /// Requests the subscription together with a snapshot. Only supported with the
    /// `Mbo` schema. Defaults to `false`. Conflicts with the `start` parameter.
    #[builder(setter(strip_bool))]
    pub use_snapshot: bool,
    /// Optional numeric identifier attached to this subscription.
    #[builder(default, setter(strip_option))]
    pub id: Option<u32>,
}

/// Marker type for a required [`ClientBuilder`] field that has not been provided yet.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct Unset;

/// Type-state builder for the Live [`Client`].
///
/// The `AK` and `D` type parameters record whether the API key and dataset have been
/// supplied; [`Self::build()`] is only available once both of these required fields
/// are set:
/// - `key`
/// - `dataset`
#[derive(Clone, Debug)]
pub struct ClientBuilder<AK, D> {
    /// Resolved gateway addresses supplied through `addr()`; `None` when not overridden.
    addr: Option<Arc<Vec<SocketAddr>>>,
    /// [`Unset`] until an API key is provided.
    key: AK,
    /// [`Unset`] until a dataset is provided.
    dataset: D,
    /// Whether to request a send timestamp after every record.
    send_ts_out: bool,
    /// How data from prior DBN versions is decoded.
    upgrade_policy: VersionUpgradePolicy,
    /// Requested heartbeat interval; `None` leaves it to the gateway.
    heartbeat_interval: Option<Duration>,
    /// Initial size of the socket read buffer, if overridden.
    buf_size: Option<usize>,
    /// Optional suffix appended to the user agent.
    user_agent_ext: Option<String>,
    /// Compression mode for the read stream.
    compression: Compression,
    /// Requested gateway behavior for slow readers, if any.
    slow_reader_behavior: Option<SlowReaderBehavior>,
    /// Connect and authentication timeouts.
    timeout_conf: TimeoutConf,
}

impl Default for ClientBuilder<Unset, Unset> {
    fn default() -> Self {
        Self {
            addr: None,
            key: Unset,
            dataset: Unset,
            send_ts_out: false,
            upgrade_policy: VersionUpgradePolicy::default(),
            heartbeat_interval: None,
            buf_size: None,
            user_agent_ext: None,
            compression: Compression::None,
            slow_reader_behavior: None,
            timeout_conf: TimeoutConf::default(),
        }
    }
}

impl<AK, D> ClientBuilder<AK, D> {
    /// Sets `ts_out`, which when enabled instructs the gateway to send a send timestamp
    /// after every record. These can be decoded with the special [`WithTsOut`](dbn::record::WithTsOut) type.
    pub fn send_ts_out(mut self, send_ts_out: bool) -> Self {
        self.send_ts_out = send_ts_out;
        self
    }

    /// Sets `upgrade_policy`, which controls how to decode data from prior DBN
    /// versions. The current default is to upgrade them to the latest version while
    /// decoding.
    pub fn upgrade_policy(mut self, upgrade_policy: VersionUpgradePolicy) -> Self {
        self.upgrade_policy = upgrade_policy;
        self
    }

    /// Sets `heartbeat_interval`, which controls the interval at which the gateway
    /// will send heartbeat records if no other data records are sent. If no heartbeat
    /// interval is configured, the gateway default will be used. Minimum interval
    /// is 5 seconds.
    ///
    /// Note that granularity of less than a second is not supported and will be
    /// ignored.
    pub fn heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
        if heartbeat_interval.subsec_nanoseconds() > 0 {
            warn!(
                "heartbeat_interval subsecond precision ignored: {}ns",
                heartbeat_interval.subsec_nanoseconds()
            )
        }
        self.heartbeat_interval = Some(heartbeat_interval);
        self
    }

    /// Sets the initial size of the internal buffer used for reading data from the
    /// TCP socket.
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.buf_size = Some(size);
        self
    }

    /// Overrides the address of the gateway the client will connect to. This is an
    /// advanced method.
    ///
    /// # Errors
    /// This function returns an error when `addr` fails to resolve.
    pub async fn addr(mut self, addr: impl ToSocketAddrs) -> crate::Result<Self> {
        const PARAM_NAME: &str = "addr";
        let addrs: Vec<_> = lookup_host(addr)
            .await
            .map_err(|e| crate::Error::bad_arg(PARAM_NAME, format!("{e}")))?
            .collect();
        self.addr = Some(Arc::new(addrs));
        Ok(self)
    }

    /// Extends the user agent. Intended for library authors.
    pub fn user_agent_extension(mut self, extension: String) -> Self {
        self.user_agent_ext = Some(extension);
        self
    }

    /// Sets the compression mode for the read stream. Default is [`Compression::None`].
    pub fn compression(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Sets the behavior of the gateway when the client falls behind real time.
    pub fn slow_reader_behavior(mut self, slow_reader_behavior: SlowReaderBehavior) -> Self {
        self.slow_reader_behavior = Some(slow_reader_behavior);
        self
    }

    /// Sets the timeouts for connecting and authenticating with the gateway.
    /// Defaults to 10 seconds for connect and 30 seconds for auth.
    pub fn timeout_conf(mut self, timeout_conf: TimeoutConf) -> Self {
        self.timeout_conf = timeout_conf;
        self
    }
}

impl ClientBuilder<Unset, Unset> {
    /// Returns a fresh [`ClientBuilder`] with no API key or dataset set.
    ///
    /// Equivalent to calling [`Default::default`].
    pub fn new() -> Self {
        Default::default()
    }
}

impl<D> ClientBuilder<Unset, D> {
    /// Validates and sets the API key. This moves the builder into its "key set"
    /// state and carries every other setting over unchanged.
    ///
    /// # Errors
    /// This function returns an error when the API key is invalid.
    pub fn key(self, key: impl ToString) -> crate::Result<ClientBuilder<ApiKey, D>> {
        let api_key = ApiKey::new(key.to_string())?;
        let ClientBuilder {
            addr,
            key: _,
            dataset,
            send_ts_out,
            upgrade_policy,
            heartbeat_interval,
            buf_size,
            user_agent_ext,
            compression,
            slow_reader_behavior,
            timeout_conf,
        } = self;
        Ok(ClientBuilder {
            addr,
            key: api_key,
            dataset,
            send_ts_out,
            upgrade_policy,
            heartbeat_interval,
            buf_size,
            user_agent_ext,
            compression,
            slow_reader_behavior,
            timeout_conf,
        })
    }

    /// Sets the API key, reading it from the `DATABENTO_API_KEY` environment
    /// variable.
    ///
    /// # Errors
    /// This function returns an error when the environment variable is not set or the
    /// API key is invalid.
    pub fn key_from_env(self) -> crate::Result<ClientBuilder<ApiKey, D>> {
        self.key(crate::key_from_env()?)
    }
}

impl<AK> ClientBuilder<AK, Unset> {
    /// Sets the dataset. This moves the builder into its "dataset set" state and
    /// carries every other setting over unchanged.
    pub fn dataset(self, dataset: impl ToString) -> ClientBuilder<AK, String> {
        let ClientBuilder {
            addr,
            key,
            dataset: _,
            send_ts_out,
            upgrade_policy,
            heartbeat_interval,
            buf_size,
            user_agent_ext,
            compression,
            slow_reader_behavior,
            timeout_conf,
        } = self;
        ClientBuilder {
            addr,
            key,
            dataset: dataset.to_string(),
            send_ts_out,
            upgrade_policy,
            heartbeat_interval,
            buf_size,
            user_agent_ext,
            compression,
            slow_reader_behavior,
            timeout_conf,
        }
    }
}

impl ClientBuilder<ApiKey, String> {
    /// Consumes the fully configured builder and creates a [`Client`], which
    /// connects to the Live gateway and authenticates.
    ///
    /// # Errors
    /// This function returns an error when it is unable to connect to and
    /// authenticate with the Live gateway.
    pub async fn build(self) -> crate::Result<Client> {
        Client::new(self).await
    }
}

impl Display for SlowReaderBehavior {
    /// Writes the lowercase name of the behavior: `warn` or `skip`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Warn => "warn",
            Self::Skip => "skip",
        };
        f.write_str(name)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use dbn::Schema;
    use time::macros::datetime;

    #[test]
    fn subscription_with_time_offset_datetime() {
        let start = datetime!(2024-03-15 09:30:00 UTC);
        let sub = Subscription::builder()
            .symbols("AAPL")
            .schema(Schema::Trades)
            .start(start)
            .build();
        assert_eq!(sub.start, Some(start));
    }

    #[test]
    fn subscription_with_time_date() {
        let date = time::macros::date!(2024 - 03 - 15);
        let sub = Subscription::builder()
            .symbols("AAPL")
            .schema(Schema::Trades)
            .start(date)
            .build();
        assert_eq!(sub.start, Some(datetime!(2024-03-15 00:00:00 UTC)));
    }

    /// Pins the builder defaults documented on `Subscription`.
    #[test]
    fn subscription_defaults() {
        let sub = Subscription::builder()
            .symbols("AAPL")
            .schema(Schema::Trades)
            .build();
        assert_eq!(sub.stype_in, SType::RawSymbol);
        assert_eq!(sub.start, None);
        assert!(!sub.use_snapshot);
        assert_eq!(sub.id, None);
    }

    /// `use_snapshot` is a no-argument flag setter and `id` strips the `Option`.
    #[test]
    fn subscription_with_snapshot_and_id() {
        let sub = Subscription::builder()
            .symbols("AAPL")
            .schema(Schema::Mbo)
            .use_snapshot()
            .id(7)
            .build();
        assert!(sub.use_snapshot);
        assert_eq!(sub.id, Some(7));
    }

    /// Pins the 10s connect / 30s auth defaults documented on `TimeoutConf`.
    #[test]
    fn timeout_conf_default() {
        let conf = TimeoutConf::default();
        assert_eq!(conf.connect, Some(Duration::seconds(10)));
        assert_eq!(conf.auth, Some(Duration::seconds(30)));
    }

    /// The `Display` strings are sent to the gateway as session parameters, so pin them.
    #[test]
    fn slow_reader_behavior_display() {
        assert_eq!(SlowReaderBehavior::Warn.to_string(), "warn");
        assert_eq!(SlowReaderBehavior::Skip.to_string(), "skip");
    }

    #[cfg(feature = "chrono")]
    mod chrono_tests {
        use super::*;
        use chrono::{TimeZone, Utc};

        #[test]
        fn subscription_with_chrono_datetime_utc() {
            let start = Utc.with_ymd_and_hms(2024, 3, 15, 9, 30, 0).unwrap();
            let sub = Subscription::builder()
                .symbols("AAPL")
                .schema(Schema::Trades)
                .start(start)
                .build();
            assert_eq!(sub.start, Some(datetime!(2024-03-15 09:30:00 UTC)));
        }

        #[test]
        fn subscription_with_chrono_datetime_fixed_offset() {
            use chrono::FixedOffset;
            let est = FixedOffset::west_opt(5 * 3600).unwrap();
            let start = est.with_ymd_and_hms(2024, 3, 15, 9, 30, 0).unwrap();
            let sub = Subscription::builder()
                .symbols("AAPL")
                .schema(Schema::Trades)
                .start(start)
                .build();
            // 09:30 EST = 14:30 UTC
            assert_eq!(sub.start, Some(datetime!(2024-03-15 14:30:00 UTC)));
        }

        #[test]
        fn subscription_with_chrono_naive_date() {
            let date = chrono::NaiveDate::from_ymd_opt(2024, 3, 15).unwrap();
            let sub = Subscription::builder()
                .symbols("AAPL")
                .schema(Schema::Trades)
                .start(date)
                .build();
            assert_eq!(sub.start, Some(datetime!(2024-03-15 00:00:00 UTC)));
        }
    }
}