mod client;
pub mod protocol;
use std::{fmt::Display, net::SocketAddr, sync::Arc};
use dbn::{Compression, SType, Schema, VersionUpgradePolicy};
use time::{Duration, OffsetDateTime};
use tokio::net::{lookup_host, ToSocketAddrs};
use tracing::warn;
use typed_builder::TypedBuilder;
use crate::{ApiKey, DateTimeLike, Symbols};
pub use client::Client;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutConf {
pub connect: Option<Duration>,
pub auth: Option<Duration>,
}
impl Default for TimeoutConf {
fn default() -> Self {
Self {
connect: Some(Duration::seconds(10)),
auth: Some(Duration::seconds(30)),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlowReaderBehavior {
Warn,
Skip,
}
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct Subscription {
#[builder(setter(into))]
pub symbols: Symbols,
pub schema: Schema,
#[builder(default = SType::RawSymbol)]
pub stype_in: SType,
#[builder(default, setter(transform = |dt: impl DateTimeLike| Some(dt.to_date_time())))]
pub start: Option<OffsetDateTime>,
#[doc(hidden)]
#[builder(setter(strip_bool))]
pub use_snapshot: bool,
#[builder(default, setter(strip_option))]
pub id: Option<u32>,
}
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct Unset;
#[derive(Debug, Clone)]
pub struct ClientBuilder<AK, D> {
addr: Option<Arc<Vec<SocketAddr>>>,
key: AK,
dataset: D,
send_ts_out: bool,
upgrade_policy: VersionUpgradePolicy,
heartbeat_interval: Option<Duration>,
buf_size: Option<usize>,
user_agent_ext: Option<String>,
compression: Compression,
slow_reader_behavior: Option<SlowReaderBehavior>,
timeout_conf: TimeoutConf,
}
impl Default for ClientBuilder<Unset, Unset> {
fn default() -> Self {
Self {
addr: None,
key: Unset,
dataset: Unset,
send_ts_out: false,
upgrade_policy: VersionUpgradePolicy::default(),
heartbeat_interval: None,
buf_size: None,
user_agent_ext: None,
compression: Compression::None,
slow_reader_behavior: None,
timeout_conf: TimeoutConf::default(),
}
}
}
impl<AK, D> ClientBuilder<AK, D> {
pub fn send_ts_out(mut self, send_ts_out: bool) -> Self {
self.send_ts_out = send_ts_out;
self
}
pub fn upgrade_policy(mut self, upgrade_policy: VersionUpgradePolicy) -> Self {
self.upgrade_policy = upgrade_policy;
self
}
pub fn heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
if heartbeat_interval.subsec_nanoseconds() > 0 {
warn!(
"heartbeat_interval subsecond precision ignored: {}ns",
heartbeat_interval.subsec_nanoseconds()
)
}
self.heartbeat_interval = Some(heartbeat_interval);
self
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.buf_size = Some(size);
self
}
pub async fn addr(mut self, addr: impl ToSocketAddrs) -> crate::Result<Self> {
const PARAM_NAME: &str = "addr";
let addrs: Vec<_> = lookup_host(addr)
.await
.map_err(|e| crate::Error::bad_arg(PARAM_NAME, format!("{e}")))?
.collect();
self.addr = Some(Arc::new(addrs));
Ok(self)
}
pub fn user_agent_extension(mut self, extension: String) -> Self {
self.user_agent_ext = Some(extension);
self
}
pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
pub fn slow_reader_behavior(mut self, slow_reader_behavior: SlowReaderBehavior) -> Self {
self.slow_reader_behavior = Some(slow_reader_behavior);
self
}
pub fn timeout_conf(mut self, timeout_conf: TimeoutConf) -> Self {
self.timeout_conf = timeout_conf;
self
}
}
impl ClientBuilder<Unset, Unset> {
pub fn new() -> Self {
Self::default()
}
}
impl<D> ClientBuilder<Unset, D> {
pub fn key(self, key: impl ToString) -> crate::Result<ClientBuilder<ApiKey, D>> {
Ok(ClientBuilder {
addr: self.addr,
key: ApiKey::new(key.to_string())?,
dataset: self.dataset,
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
buf_size: self.buf_size,
user_agent_ext: self.user_agent_ext,
compression: self.compression,
slow_reader_behavior: self.slow_reader_behavior,
timeout_conf: self.timeout_conf,
})
}
pub fn key_from_env(self) -> crate::Result<ClientBuilder<ApiKey, D>> {
let key = crate::key_from_env()?;
self.key(key)
}
}
impl<AK> ClientBuilder<AK, Unset> {
pub fn dataset(self, dataset: impl ToString) -> ClientBuilder<AK, String> {
ClientBuilder {
addr: self.addr,
key: self.key,
dataset: dataset.to_string(),
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
buf_size: self.buf_size,
user_agent_ext: self.user_agent_ext,
compression: self.compression,
slow_reader_behavior: self.slow_reader_behavior,
timeout_conf: self.timeout_conf,
}
}
}
impl ClientBuilder<ApiKey, String> {
pub async fn build(self) -> crate::Result<Client> {
Client::new(self).await
}
}
impl Display for SlowReaderBehavior {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Warn => write!(f, "warn"),
Self::Skip => write!(f, "skip"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use dbn::Schema;
use time::macros::datetime;
#[test]
fn subscription_with_time_offset_datetime() {
let start = datetime!(2024-03-15 09:30:00 UTC);
let sub = Subscription::builder()
.symbols("AAPL")
.schema(Schema::Trades)
.start(start)
.build();
assert_eq!(sub.start, Some(start));
}
#[test]
fn subscription_with_time_date() {
let date = time::macros::date!(2024 - 03 - 15);
let sub = Subscription::builder()
.symbols("AAPL")
.schema(Schema::Trades)
.start(date)
.build();
assert_eq!(sub.start, Some(datetime!(2024-03-15 00:00:00 UTC)));
}
#[cfg(feature = "chrono")]
mod chrono_tests {
use super::*;
use chrono::{TimeZone, Utc};
#[test]
fn subscription_with_chrono_datetime_utc() {
let start = Utc.with_ymd_and_hms(2024, 3, 15, 9, 30, 0).unwrap();
let sub = Subscription::builder()
.symbols("AAPL")
.schema(Schema::Trades)
.start(start)
.build();
assert_eq!(sub.start, Some(datetime!(2024-03-15 09:30:00 UTC)));
}
#[test]
fn subscription_with_chrono_datetime_fixed_offset() {
use chrono::FixedOffset;
let est = FixedOffset::west_opt(5 * 3600).unwrap();
let start = est.with_ymd_and_hms(2024, 3, 15, 9, 30, 0).unwrap();
let sub = Subscription::builder()
.symbols("AAPL")
.schema(Schema::Trades)
.start(start)
.build();
assert_eq!(sub.start, Some(datetime!(2024-03-15 14:30:00 UTC)));
}
#[test]
fn subscription_with_chrono_naive_date() {
let date = chrono::NaiveDate::from_ymd_opt(2024, 3, 15).unwrap();
let sub = Subscription::builder()
.symbols("AAPL")
.schema(Schema::Trades)
.start(date)
.build();
assert_eq!(sub.start, Some(datetime!(2024-03-15 00:00:00 UTC)));
}
}
}