use std::time::Duration;
use url::Url;
/// Log verbosity levels that can be stored in a `TapConfig`.
///
/// `Info` is the variant produced by `LogLevel::default()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum LogLevel {
    /// Rendered as `"debug"` by [`LogLevel::as_str`].
    Debug,
    /// Rendered as `"info"` by [`LogLevel::as_str`]; the default level.
    #[default]
    Info,
    /// Rendered as `"warn"` by [`LogLevel::as_str`].
    Warn,
    /// Rendered as `"error"` by [`LogLevel::as_str`].
    Error,
}

impl LogLevel {
    /// Returns the lowercase name of this level as a static string.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Debug => "debug",
            Self::Info => "info",
            Self::Warn => "warn",
            Self::Error => "error",
        }
    }
}
/// Optional settings that `TapConfigBuilder` collects and that
/// `TapConfig::to_env_vars` renders as environment-variable pairs.
///
/// Every field is an `Option`; a `None` field produces no environment
/// variable. Each field below notes the variable name it is exported under,
/// or states that it is not exported.
#[derive(Debug, Clone, Default)]
pub struct TapConfig {
/// Exported as `TAP_DATABASE_URL`.
pub database_url: Option<String>,
/// Exported as `TAP_MAX_DB_CONNS`.
pub max_db_conns: Option<u32>,
/// Exported as `TAP_BIND`.
pub bind: Option<String>,
/// Exported as `TAP_ADMIN_PASSWORD`.
pub admin_password: Option<String>,
/// Exported as `TAP_METRICS_LISTEN`.
pub metrics_listen: Option<String>,
/// Exported as `TAP_LOG_LEVEL` using `LogLevel::as_str`.
pub log_level: Option<LogLevel>,
/// Exported as `TAP_PLC_URL`.
pub plc_url: Option<Url>,
/// Exported as `TAP_RELAY_URL`.
pub relay_url: Option<Url>,
/// Exported as `TAP_FIREHOSE_PARALLELISM`.
pub firehose_parallelism: Option<u32>,
/// Exported as `TAP_RESYNC_PARALLELISM`.
pub resync_parallelism: Option<u32>,
/// Exported as `TAP_OUTBOX_PARALLELISM`.
pub outbox_parallelism: Option<u32>,
/// Exported as `TAP_CURSOR_SAVE_INTERVAL`, rendered by `format_duration`.
pub cursor_save_interval: Option<Duration>,
/// Exported as `TAP_REPO_FETCH_TIMEOUT`, rendered by `format_duration`.
pub repo_fetch_timeout: Option<Duration>,
/// Exported as `RELAY_IDENT_CACHE_SIZE`.
// NOTE(review): the only exported name without the `TAP_` prefix — confirm it
// matches what the consuming process actually reads.
pub ident_cache_size: Option<u32>,
/// Exported as `TAP_OUTBOX_CAPACITY`.
pub outbox_capacity: Option<u32>,
/// Exported as `TAP_RETRY_TIMEOUT`, rendered by `format_duration`.
pub retry_timeout: Option<Duration>,
/// Exported as `TAP_FULL_NETWORK` (`"true"` / `"false"`).
pub full_network: Option<bool>,
/// Exported as `TAP_SIGNAL_COLLECTION`.
pub signal_collection: Option<String>,
/// Exported as `TAP_COLLECTION_FILTERS`, entries joined with `,`.
pub collection_filters: Option<Vec<String>>,
/// Exported as `TAP_DISABLE_ACKS` (`"true"` / `"false"`).
pub disable_acks: Option<bool>,
/// Exported as `TAP_WEBHOOK_URL`.
pub webhook_url: Option<Url>,
/// Exported as `TAP_OUTBOX_ONLY` (`"true"` / `"false"`).
pub outbox_only: Option<bool>,
/// Not exported by `to_env_vars`; the `inherit_stdio()` accessor falls back
/// to `false` when unset.
pub inherit_stdio: Option<bool>,
/// Not exported by `to_env_vars`; the `shutdown_timeout()` accessor falls
/// back to 5 seconds when unset.
pub shutdown_timeout: Option<Duration>,
/// Not exported by `to_env_vars`; the `request_timeout()` accessor falls
/// back to 30 seconds when unset.
pub request_timeout: Option<Duration>,
/// Not exported by `to_env_vars`; the `startup_timeout()` accessor falls
/// back to 30 seconds when unset.
pub startup_timeout: Option<Duration>,
}
impl TapConfig {
pub fn new() -> Self {
Self::default()
}
pub fn builder() -> TapConfigBuilder {
TapConfigBuilder::default()
}
pub fn shutdown_timeout(&self) -> Duration {
self.shutdown_timeout.unwrap_or(Duration::from_secs(5))
}
pub fn request_timeout(&self) -> Duration {
self.request_timeout.unwrap_or(Duration::from_secs(30))
}
pub fn startup_timeout(&self) -> Duration {
self.startup_timeout.unwrap_or(Duration::from_secs(30))
}
pub fn inherit_stdio(&self) -> bool {
self.inherit_stdio.unwrap_or(false)
}
pub fn to_env_vars(&self) -> Vec<(String, String)> {
let mut vars = Vec::new();
macro_rules! push_env {
($field:expr, $name:literal, clone) => {
if let Some(ref v) = $field {
vars.push(($name.into(), v.clone()));
}
};
($field:expr, $name:literal, string) => {
if let Some(v) = $field {
vars.push(($name.into(), v.to_string()));
}
};
($field:expr, $name:literal, ref_string) => {
if let Some(ref v) = $field {
vars.push(($name.into(), v.to_string()));
}
};
($field:expr, $name:literal, as_str) => {
if let Some(ref v) = $field {
vars.push(($name.into(), v.as_str().into()));
}
};
($field:expr, $name:literal, duration) => {
if let Some(v) = $field {
vars.push(($name.into(), format_duration(v)));
}
};
}
push_env!(self.database_url, "TAP_DATABASE_URL", clone);
push_env!(self.max_db_conns, "TAP_MAX_DB_CONNS", string);
push_env!(self.bind, "TAP_BIND", clone);
push_env!(self.admin_password, "TAP_ADMIN_PASSWORD", clone);
push_env!(self.metrics_listen, "TAP_METRICS_LISTEN", clone);
push_env!(self.log_level, "TAP_LOG_LEVEL", as_str);
push_env!(self.plc_url, "TAP_PLC_URL", ref_string);
push_env!(self.relay_url, "TAP_RELAY_URL", ref_string);
push_env!(
self.firehose_parallelism,
"TAP_FIREHOSE_PARALLELISM",
string
);
push_env!(self.resync_parallelism, "TAP_RESYNC_PARALLELISM", string);
push_env!(self.outbox_parallelism, "TAP_OUTBOX_PARALLELISM", string);
push_env!(
self.cursor_save_interval,
"TAP_CURSOR_SAVE_INTERVAL",
duration
);
push_env!(self.repo_fetch_timeout, "TAP_REPO_FETCH_TIMEOUT", duration);
push_env!(self.ident_cache_size, "RELAY_IDENT_CACHE_SIZE", string);
push_env!(self.outbox_capacity, "TAP_OUTBOX_CAPACITY", string);
push_env!(self.retry_timeout, "TAP_RETRY_TIMEOUT", duration);
push_env!(self.full_network, "TAP_FULL_NETWORK", string);
push_env!(self.signal_collection, "TAP_SIGNAL_COLLECTION", clone);
push_env!(self.disable_acks, "TAP_DISABLE_ACKS", string);
push_env!(self.webhook_url, "TAP_WEBHOOK_URL", ref_string);
push_env!(self.outbox_only, "TAP_OUTBOX_ONLY", string);
if let Some(ref v) = self.collection_filters {
vars.push(("TAP_COLLECTION_FILTERS".into(), v.join(",")));
}
vars
}
}
/// Renders a duration as an integer followed by a unit suffix.
///
/// The coarsest unit that represents `d` exactly is chosen:
///
/// * whole hours -> `"{n}h"`, whole minutes -> `"{n}m"`, other whole
///   seconds (including zero) -> `"{n}s"`;
/// * whole milliseconds -> `"{n}ms"`;
/// * whole microseconds -> `"{n}us"`;
/// * anything finer -> `"{n}ns"`.
///
/// Sub-millisecond components were previously truncated, so a non-zero value
/// such as 500µs was rendered as `"0s"`. They are now kept.
// NOTE(review): assumes the consumer of these strings accepts the `us` and
// `ns` suffixes alongside `h`/`m`/`s`/`ms` — confirm against its parser.
fn format_duration(d: Duration) -> String {
    let nanos = d.subsec_nanos();
    if nanos == 0 {
        let secs = d.as_secs();
        // `secs > 0` keeps a zero duration as "0s" rather than "0h".
        if secs > 0 && secs.is_multiple_of(3600) {
            format!("{}h", secs / 3600)
        } else if secs > 0 && secs.is_multiple_of(60) {
            format!("{}m", secs / 60)
        } else {
            format!("{}s", secs)
        }
    } else if nanos.is_multiple_of(1_000_000) {
        format!("{}ms", d.as_millis())
    } else if nanos.is_multiple_of(1_000) {
        format!("{}us", d.as_micros())
    } else {
        format!("{}ns", d.as_nanos())
    }
}
/// Chainable builder for `TapConfig`.
///
/// Created by `TapConfig::builder()` (or `Default`); each setter consumes and
/// returns the builder, and `build()` yields the accumulated configuration.
#[derive(Debug, Clone, Default)]
pub struct TapConfigBuilder {
// Configuration accumulated so far; `build()` returns it unchanged.
config: TapConfig,
}
impl TapConfigBuilder {
    /// Applies `change` to the configuration under construction and hands
    /// the builder back so calls can be chained.
    fn edit(mut self, change: impl FnOnce(&mut TapConfig)) -> Self {
        change(&mut self.config);
        self
    }

    /// Sets the `database_url` field.
    pub fn database_url(self, url: impl Into<String>) -> Self {
        self.edit(|cfg| cfg.database_url = Some(url.into()))
    }

    /// Sets the `max_db_conns` field.
    pub fn max_db_conns(self, n: u32) -> Self {
        self.edit(|cfg| cfg.max_db_conns = Some(n))
    }

    /// Sets the `bind` field.
    pub fn bind(self, addr: impl Into<String>) -> Self {
        self.edit(|cfg| cfg.bind = Some(addr.into()))
    }

    /// Sets the `admin_password` field.
    pub fn admin_password(self, password: impl Into<String>) -> Self {
        self.edit(|cfg| cfg.admin_password = Some(password.into()))
    }

    /// Sets the `metrics_listen` field.
    pub fn metrics_listen(self, addr: impl Into<String>) -> Self {
        self.edit(|cfg| cfg.metrics_listen = Some(addr.into()))
    }

    /// Sets the `log_level` field.
    pub fn log_level(self, level: LogLevel) -> Self {
        self.edit(|cfg| cfg.log_level = Some(level))
    }

    /// Sets the `plc_url` field.
    pub fn plc_url(self, url: Url) -> Self {
        self.edit(|cfg| cfg.plc_url = Some(url))
    }

    /// Sets the `relay_url` field.
    pub fn relay_url(self, url: Url) -> Self {
        self.edit(|cfg| cfg.relay_url = Some(url))
    }

    /// Sets the `firehose_parallelism` field.
    pub fn firehose_parallelism(self, n: u32) -> Self {
        self.edit(|cfg| cfg.firehose_parallelism = Some(n))
    }

    /// Sets the `resync_parallelism` field.
    pub fn resync_parallelism(self, n: u32) -> Self {
        self.edit(|cfg| cfg.resync_parallelism = Some(n))
    }

    /// Sets the `outbox_parallelism` field.
    pub fn outbox_parallelism(self, n: u32) -> Self {
        self.edit(|cfg| cfg.outbox_parallelism = Some(n))
    }

    /// Sets the `cursor_save_interval` field.
    pub fn cursor_save_interval(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.cursor_save_interval = Some(d))
    }

    /// Sets the `repo_fetch_timeout` field.
    pub fn repo_fetch_timeout(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.repo_fetch_timeout = Some(d))
    }

    /// Sets the `ident_cache_size` field.
    pub fn ident_cache_size(self, n: u32) -> Self {
        self.edit(|cfg| cfg.ident_cache_size = Some(n))
    }

    /// Sets the `outbox_capacity` field.
    pub fn outbox_capacity(self, n: u32) -> Self {
        self.edit(|cfg| cfg.outbox_capacity = Some(n))
    }

    /// Sets the `retry_timeout` field.
    pub fn retry_timeout(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.retry_timeout = Some(d))
    }

    /// Sets the `full_network` field.
    pub fn full_network(self, enabled: bool) -> Self {
        self.edit(|cfg| cfg.full_network = Some(enabled))
    }

    /// Sets the `signal_collection` field.
    pub fn signal_collection(self, collection: impl Into<String>) -> Self {
        self.edit(|cfg| cfg.signal_collection = Some(collection.into()))
    }

    /// Appends one entry to `collection_filters`, creating the list first if
    /// it is still unset.
    pub fn collection_filter(self, filter: impl Into<String>) -> Self {
        self.edit(|cfg| {
            cfg.collection_filters
                .get_or_insert_with(Vec::new)
                .push(filter.into());
        })
    }

    /// Replaces `collection_filters` wholesale, discarding any entries added
    /// earlier.
    pub fn collection_filters(self, filters: Vec<String>) -> Self {
        self.edit(|cfg| cfg.collection_filters = Some(filters))
    }

    /// Sets the `disable_acks` field.
    pub fn disable_acks(self, disabled: bool) -> Self {
        self.edit(|cfg| cfg.disable_acks = Some(disabled))
    }

    /// Sets the `webhook_url` field.
    pub fn webhook_url(self, url: Url) -> Self {
        self.edit(|cfg| cfg.webhook_url = Some(url))
    }

    /// Sets the `outbox_only` field.
    pub fn outbox_only(self, enabled: bool) -> Self {
        self.edit(|cfg| cfg.outbox_only = Some(enabled))
    }

    /// Sets the `shutdown_timeout` field.
    pub fn shutdown_timeout(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.shutdown_timeout = Some(d))
    }

    /// Sets the `request_timeout` field.
    pub fn request_timeout(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.request_timeout = Some(d))
    }

    /// Sets the `startup_timeout` field.
    pub fn startup_timeout(self, d: Duration) -> Self {
        self.edit(|cfg| cfg.startup_timeout = Some(d))
    }

    /// Sets the `inherit_stdio` field.
    pub fn inherit_stdio(self, inherit: bool) -> Self {
        self.edit(|cfg| cfg.inherit_stdio = Some(inherit))
    }

    /// Finishes the builder and returns the accumulated [`TapConfig`].
    pub fn build(self) -> TapConfig {
        let Self { config } = self;
        config
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Whole hours, minutes and seconds use the coarsest exact unit;
    /// fractional seconds fall back to milliseconds.
    #[test]
    fn test_format_duration() {
        let cases = [
            (Duration::from_secs(30), "30s"),
            (Duration::from_secs(60), "1m"),
            (Duration::from_secs(3600), "1h"),
            (Duration::from_secs(90), "90s"),
            (Duration::from_millis(500), "500ms"),
        ];
        for (input, expected) in cases {
            assert_eq!(format_duration(input), expected);
        }
    }

    /// Fields set through the builder show up as the matching
    /// environment-variable pairs.
    #[test]
    fn test_config_to_env_vars() {
        let filters = vec![
            String::from("app.bsky.feed.post"),
            String::from("app.bsky.graph.*"),
        ];
        let vars = TapConfig::builder()
            .database_url("sqlite://./test.db")
            .bind(":3000")
            .signal_collection("app.bsky.feed.post")
            .collection_filters(filters)
            .disable_acks(true)
            .build()
            .to_env_vars();

        // True when the exact `(name, value)` pair was emitted.
        let has = |name: &str, value: &str| vars.iter().any(|(k, v)| k == name && v == value);

        assert!(has("TAP_DATABASE_URL", "sqlite://./test.db"));
        assert!(has("TAP_BIND", ":3000"));
        assert!(has("TAP_SIGNAL_COLLECTION", "app.bsky.feed.post"));
        assert!(has(
            "TAP_COLLECTION_FILTERS",
            "app.bsky.feed.post,app.bsky.graph.*"
        ));
        assert!(has("TAP_DISABLE_ACKS", "true"));
    }
}