mod client;
pub mod protocol;
use std::{net::SocketAddr, sync::Arc};
use dbn::{SType, Schema, VersionUpgradePolicy};
use time::{Duration, OffsetDateTime};
use tokio::net::{lookup_host, ToSocketAddrs};
use tracing::warn;
use typed_builder::TypedBuilder;
use crate::{ApiKey, Symbols};
pub use client::Client;
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct Subscription {
#[builder(setter(into))]
pub symbols: Symbols,
pub schema: Schema,
#[builder(default = SType::RawSymbol)]
pub stype_in: SType,
#[builder(default, setter(strip_option))]
pub start: Option<OffsetDateTime>,
#[doc(hidden)]
#[builder(setter(strip_bool))]
pub use_snapshot: bool,
#[builder(default, setter(strip_option))]
pub id: Option<u32>,
}
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct Unset;
#[derive(Debug, Clone)]
pub struct ClientBuilder<AK, D> {
addr: Option<Arc<Vec<SocketAddr>>>,
key: AK,
dataset: D,
send_ts_out: bool,
upgrade_policy: VersionUpgradePolicy,
heartbeat_interval: Option<Duration>,
}
impl Default for ClientBuilder<Unset, Unset> {
fn default() -> Self {
Self {
addr: None,
key: Unset,
dataset: Unset,
send_ts_out: false,
upgrade_policy: VersionUpgradePolicy::UpgradeToV2,
heartbeat_interval: None,
}
}
}
impl<AK, D> ClientBuilder<AK, D> {
pub fn send_ts_out(mut self, send_ts_out: bool) -> Self {
self.send_ts_out = send_ts_out;
self
}
pub fn upgrade_policy(mut self, upgrade_policy: VersionUpgradePolicy) -> Self {
self.upgrade_policy = upgrade_policy;
self
}
pub fn heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
if heartbeat_interval.subsec_nanoseconds() > 0 {
warn!(
"heartbeat_interval subsecond precision ignored: {}ns",
heartbeat_interval.subsec_nanoseconds()
)
}
self.heartbeat_interval = Some(heartbeat_interval);
self
}
pub async fn addr(mut self, addr: impl ToSocketAddrs) -> crate::Result<Self> {
const PARAM_NAME: &str = "addr";
let addrs: Vec<_> = lookup_host(addr)
.await
.map_err(|e| crate::Error::bad_arg(PARAM_NAME, format!("{e}")))?
.collect();
self.addr = Some(Arc::new(addrs));
Ok(self)
}
}
impl ClientBuilder<Unset, Unset> {
pub fn new() -> Self {
Self::default()
}
}
impl<D> ClientBuilder<Unset, D> {
pub fn key(self, key: impl ToString) -> crate::Result<ClientBuilder<ApiKey, D>> {
Ok(ClientBuilder {
addr: self.addr,
key: ApiKey::new(key.to_string())?,
dataset: self.dataset,
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
})
}
pub fn key_from_env(self) -> crate::Result<ClientBuilder<ApiKey, D>> {
let key = crate::key_from_env()?;
self.key(key)
}
}
impl<AK> ClientBuilder<AK, Unset> {
pub fn dataset(self, dataset: impl ToString) -> ClientBuilder<AK, String> {
ClientBuilder {
addr: self.addr,
key: self.key,
dataset: dataset.to_string(),
send_ts_out: self.send_ts_out,
upgrade_policy: self.upgrade_policy,
heartbeat_interval: self.heartbeat_interval,
}
}
}
impl ClientBuilder<ApiKey, String> {
pub async fn build(self) -> crate::Result<Client> {
if let Some(addr) = self.addr {
Client::connect_with_addr(
addr.as_slice(),
self.key.0,
self.dataset,
self.send_ts_out,
self.upgrade_policy,
self.heartbeat_interval,
)
.await
} else {
Client::connect(
self.key.0,
self.dataset,
self.send_ts_out,
self.upgrade_policy,
self.heartbeat_interval,
)
.await
}
}
}