use std::fmt;
use std::time::Duration;
#[derive(Clone, PartialEq, Eq)]
pub struct TransportCredentials {
secret: Vec<u8>,
}
impl TransportCredentials {
#[must_use]
pub fn new(secret: impl Into<Vec<u8>>) -> Self {
Self {
secret: secret.into(),
}
}
#[must_use]
pub fn secret(&self) -> &[u8] {
&self.secret
}
}
impl fmt::Debug for TransportCredentials {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("TransportCredentials")
.field("secret", &"<redacted>")
.finish()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReconnectConfig {
pub initial_backoff: Duration,
pub max_backoff: Duration,
pub max_attempts: usize,
}
impl ReconnectConfig {
#[must_use]
pub const fn new(
initial_backoff: Duration,
max_backoff: Duration,
max_attempts: usize,
) -> Self {
Self {
initial_backoff,
max_backoff,
max_attempts,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkerConfig {
pub namespace: String,
pub subject: String,
pub endpoint: String,
pub task_queue: String,
pub identity: String,
pub max_concurrency: usize,
pub reconnect: ReconnectConfig,
pub transport_credentials: Option<TransportCredentials>,
}
const DEFAULT_WORKER_NAMESPACE: &str = "default";
const DEFAULT_WORKER_SUBJECT: &str = "worker";
impl WorkerConfig {
#[must_use]
pub const fn builder() -> WorkerConfigBuilder {
WorkerConfigBuilder::new()
}
#[must_use]
pub fn new(
endpoint: impl Into<String>,
task_queue: impl Into<String>,
identity: impl Into<String>,
max_concurrency: usize,
reconnect: ReconnectConfig,
transport_credentials: Option<TransportCredentials>,
) -> Self {
Self {
namespace: String::from(DEFAULT_WORKER_NAMESPACE),
subject: String::from(DEFAULT_WORKER_SUBJECT),
endpoint: endpoint.into(),
task_queue: task_queue.into(),
identity: identity.into(),
max_concurrency,
reconnect,
transport_credentials,
}
}
}
#[derive(Clone, Debug, Default)]
pub struct WorkerConfigBuilder {
namespace: Option<String>,
subject: Option<String>,
endpoint: Option<String>,
task_queue: Option<String>,
identity: Option<String>,
max_concurrency: Option<usize>,
reconnect_initial_backoff: Option<Duration>,
reconnect_max_backoff: Option<Duration>,
reconnect_max_attempts: Option<usize>,
transport_credentials: Option<TransportCredentials>,
}
impl WorkerConfigBuilder {
#[must_use]
pub const fn new() -> Self {
Self {
namespace: None,
subject: None,
endpoint: None,
task_queue: None,
identity: None,
max_concurrency: None,
reconnect_initial_backoff: None,
reconnect_max_backoff: None,
reconnect_max_attempts: None,
transport_credentials: None,
}
}
#[must_use]
pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = Some(namespace.into());
self
}
#[must_use]
pub fn subject(mut self, subject: impl Into<String>) -> Self {
self.subject = Some(subject.into());
self
}
#[must_use]
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
#[must_use]
pub fn task_queue(mut self, task_queue: impl Into<String>) -> Self {
self.task_queue = Some(task_queue.into());
self
}
#[must_use]
pub fn identity(mut self, identity: impl Into<String>) -> Self {
self.identity = Some(identity.into());
self
}
#[must_use]
pub const fn max_concurrency(mut self, max_concurrency: usize) -> Self {
self.max_concurrency = Some(max_concurrency);
self
}
#[must_use]
pub const fn reconnect_initial_backoff(mut self, delay: Duration) -> Self {
self.reconnect_initial_backoff = Some(delay);
self
}
#[must_use]
pub const fn reconnect_max_backoff(mut self, delay: Duration) -> Self {
self.reconnect_max_backoff = Some(delay);
self
}
#[must_use]
pub const fn reconnect_max_attempts(mut self, attempts: usize) -> Self {
self.reconnect_max_attempts = Some(attempts);
self
}
#[must_use]
pub fn transport_credentials(mut self, credentials: TransportCredentials) -> Self {
self.transport_credentials = Some(credentials);
self
}
pub fn build(self) -> Result<WorkerConfig, WorkerConfigBuildError> {
Ok(WorkerConfig {
namespace: self
.namespace
.unwrap_or_else(|| String::from(DEFAULT_WORKER_NAMESPACE)),
subject: self
.subject
.unwrap_or_else(|| String::from(DEFAULT_WORKER_SUBJECT)),
endpoint: self
.endpoint
.ok_or(WorkerConfigBuildError::MissingEndpoint)?,
task_queue: self
.task_queue
.ok_or(WorkerConfigBuildError::MissingTaskQueue)?,
identity: self
.identity
.ok_or(WorkerConfigBuildError::MissingIdentity)?,
max_concurrency: self
.max_concurrency
.ok_or(WorkerConfigBuildError::MissingMaxConcurrency)?,
reconnect: ReconnectConfig {
initial_backoff: self
.reconnect_initial_backoff
.ok_or(WorkerConfigBuildError::MissingReconnectInitialBackoff)?,
max_backoff: self
.reconnect_max_backoff
.ok_or(WorkerConfigBuildError::MissingReconnectMaxBackoff)?,
max_attempts: self
.reconnect_max_attempts
.ok_or(WorkerConfigBuildError::MissingReconnectMaxAttempts)?,
},
transport_credentials: self.transport_credentials,
})
}
}
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerConfigBuildError {
#[error("worker endpoint is required")]
MissingEndpoint,
#[error("worker task queue is required")]
MissingTaskQueue,
#[error("worker identity is required")]
MissingIdentity,
#[error("worker max_concurrency is required")]
MissingMaxConcurrency,
#[error("worker reconnect_initial_backoff is required")]
MissingReconnectInitialBackoff,
#[error("worker reconnect_max_backoff is required")]
MissingReconnectMaxBackoff,
#[error("worker reconnect_max_attempts is required")]
MissingReconnectMaxAttempts,
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::{TransportCredentials, WorkerConfig};
#[test]
fn worker_config_builder_round_trips_fields() -> Result<(), Box<dyn std::error::Error>> {
let credentials = TransportCredentials::new(b"secret-token".to_vec());
let config = WorkerConfig::builder()
.endpoint("http://127.0.0.1:50051")
.task_queue("payments")
.identity("worker-a")
.max_concurrency(7)
.reconnect_initial_backoff(Duration::from_millis(10))
.reconnect_max_backoff(Duration::from_millis(100))
.reconnect_max_attempts(3)
.namespace("payments")
.subject("worker-a")
.transport_credentials(credentials.clone())
.build()?;
assert_eq!(config.namespace, "payments");
assert_eq!(config.subject, "worker-a");
assert_eq!(config.endpoint, "http://127.0.0.1:50051");
assert_eq!(config.task_queue, "payments");
assert_eq!(config.identity, "worker-a");
assert_eq!(config.max_concurrency, 7);
assert_eq!(config.reconnect.initial_backoff, Duration::from_millis(10));
assert_eq!(config.reconnect.max_backoff, Duration::from_millis(100));
assert_eq!(config.reconnect.max_attempts, 3);
assert_eq!(config.transport_credentials, Some(credentials));
assert!(!format!("{config:?}").contains("secret-token"));
Ok(())
}
#[test]
fn worker_config_new_uses_auth_metadata_defaults() {
let config = WorkerConfig::new(
"http://127.0.0.1:50051",
"default",
"worker-a",
4,
super::ReconnectConfig::new(Duration::from_millis(10), Duration::from_millis(100), 3),
None,
);
assert_eq!(config.namespace, "default");
assert_eq!(config.subject, "worker");
}
}