1use std::fmt;
4use std::time::Duration;
5
/// Secret bytes that can be attached to a [`WorkerConfig`].
///
/// `Debug` is implemented by hand (not derived) so that formatting a value
/// never prints the stored bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct TransportCredentials {
    /// Raw secret material, read back through [`TransportCredentials::secret`].
    secret: Vec<u8>,
}

impl TransportCredentials {
    /// Builds credentials from anything convertible into a `Vec<u8>`.
    #[must_use]
    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
        let secret = secret.into();
        Self { secret }
    }

    /// Borrows the stored secret bytes.
    #[must_use]
    pub fn secret(&self) -> &[u8] {
        self.secret.as_slice()
    }
}

impl fmt::Debug for TransportCredentials {
    /// Emits the struct name with the fixed placeholder `"<redacted>"` in the
    /// `secret` field instead of the real bytes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("TransportCredentials");
        out.field("secret", &"<redacted>");
        out.finish()
    }
}
40
/// Reconnect settings: two backoff durations and an attempt count.
///
/// This type only stores the values; nothing here validates them (for
/// example, `initial_backoff` is not checked against `max_backoff`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReconnectConfig {
    /// NOTE(review): presumably the delay before the first retry — confirm
    /// against the code that consumes this config.
    pub initial_backoff: Duration,
    /// NOTE(review): presumably the upper bound for the retry delay — confirm
    /// against the code that consumes this config.
    pub max_backoff: Duration,
    /// NOTE(review): presumably the number of reconnect attempts allowed —
    /// confirm against the code that consumes this config.
    pub max_attempts: usize,
}

impl ReconnectConfig {
    /// Stores the three values as given; usable in `const` contexts.
    #[must_use]
    pub const fn new(
        initial_backoff: Duration,
        max_backoff: Duration,
        max_attempts: usize,
    ) -> Self {
        ReconnectConfig { initial_backoff, max_backoff, max_attempts }
    }
}
100
101#[derive(Clone, Debug, PartialEq, Eq)]
107pub struct WorkerConfig {
108 pub namespace: String,
110 pub subject: String,
112 pub endpoint: String,
114 pub task_queue: String,
117 pub identity: String,
119 pub max_concurrency: usize,
121 pub reconnect: ReconnectConfig,
123 pub transport_credentials: Option<TransportCredentials>,
125}
126
127const DEFAULT_WORKER_NAMESPACE: &str = "default";
128const DEFAULT_WORKER_SUBJECT: &str = "worker";
129
130impl WorkerConfig {
131 #[must_use]
134 pub const fn builder() -> WorkerConfigBuilder {
135 WorkerConfigBuilder::new()
136 }
137
138 #[must_use]
140 pub fn new(
141 endpoint: impl Into<String>,
142 task_queue: impl Into<String>,
143 identity: impl Into<String>,
144 max_concurrency: usize,
145 reconnect: ReconnectConfig,
146 transport_credentials: Option<TransportCredentials>,
147 ) -> Self {
148 Self {
149 namespace: String::from(DEFAULT_WORKER_NAMESPACE),
150 subject: String::from(DEFAULT_WORKER_SUBJECT),
151 endpoint: endpoint.into(),
152 task_queue: task_queue.into(),
153 identity: identity.into(),
154 max_concurrency,
155 reconnect,
156 transport_credentials,
157 }
158 }
159}
160
161#[derive(Clone, Debug, Default)]
163pub struct WorkerConfigBuilder {
164 namespace: Option<String>,
165 subject: Option<String>,
166 endpoint: Option<String>,
167 task_queue: Option<String>,
168 identity: Option<String>,
169 max_concurrency: Option<usize>,
170 reconnect_initial_backoff: Option<Duration>,
171 reconnect_max_backoff: Option<Duration>,
172 reconnect_max_attempts: Option<usize>,
173 transport_credentials: Option<TransportCredentials>,
174}
175
176impl WorkerConfigBuilder {
177 #[must_use]
179 pub const fn new() -> Self {
180 Self {
181 namespace: None,
182 subject: None,
183 endpoint: None,
184 task_queue: None,
185 identity: None,
186 max_concurrency: None,
187 reconnect_initial_backoff: None,
188 reconnect_max_backoff: None,
189 reconnect_max_attempts: None,
190 transport_credentials: None,
191 }
192 }
193
194 #[must_use]
196 pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
197 self.namespace = Some(namespace.into());
198 self
199 }
200
201 #[must_use]
203 pub fn subject(mut self, subject: impl Into<String>) -> Self {
204 self.subject = Some(subject.into());
205 self
206 }
207
208 #[must_use]
210 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
211 self.endpoint = Some(endpoint.into());
212 self
213 }
214
215 #[must_use]
217 pub fn task_queue(mut self, task_queue: impl Into<String>) -> Self {
218 self.task_queue = Some(task_queue.into());
219 self
220 }
221
222 #[must_use]
224 pub fn identity(mut self, identity: impl Into<String>) -> Self {
225 self.identity = Some(identity.into());
226 self
227 }
228
229 #[must_use]
231 pub const fn max_concurrency(mut self, max_concurrency: usize) -> Self {
232 self.max_concurrency = Some(max_concurrency);
233 self
234 }
235
236 #[must_use]
238 pub const fn reconnect_initial_backoff(mut self, delay: Duration) -> Self {
239 self.reconnect_initial_backoff = Some(delay);
240 self
241 }
242
243 #[must_use]
245 pub const fn reconnect_max_backoff(mut self, delay: Duration) -> Self {
246 self.reconnect_max_backoff = Some(delay);
247 self
248 }
249
250 #[must_use]
252 pub const fn reconnect_max_attempts(mut self, attempts: usize) -> Self {
253 self.reconnect_max_attempts = Some(attempts);
254 self
255 }
256
257 #[must_use]
259 pub fn transport_credentials(mut self, credentials: TransportCredentials) -> Self {
260 self.transport_credentials = Some(credentials);
261 self
262 }
263
264 pub fn build(self) -> Result<WorkerConfig, WorkerConfigBuildError> {
270 Ok(WorkerConfig {
271 namespace: self
272 .namespace
273 .unwrap_or_else(|| String::from(DEFAULT_WORKER_NAMESPACE)),
274 subject: self
275 .subject
276 .unwrap_or_else(|| String::from(DEFAULT_WORKER_SUBJECT)),
277 endpoint: self
278 .endpoint
279 .ok_or(WorkerConfigBuildError::MissingEndpoint)?,
280 task_queue: self
281 .task_queue
282 .ok_or(WorkerConfigBuildError::MissingTaskQueue)?,
283 identity: self
284 .identity
285 .ok_or(WorkerConfigBuildError::MissingIdentity)?,
286 max_concurrency: self
287 .max_concurrency
288 .ok_or(WorkerConfigBuildError::MissingMaxConcurrency)?,
289 reconnect: ReconnectConfig {
290 initial_backoff: self
291 .reconnect_initial_backoff
292 .ok_or(WorkerConfigBuildError::MissingReconnectInitialBackoff)?,
293 max_backoff: self
294 .reconnect_max_backoff
295 .ok_or(WorkerConfigBuildError::MissingReconnectMaxBackoff)?,
296 max_attempts: self
297 .reconnect_max_attempts
298 .ok_or(WorkerConfigBuildError::MissingReconnectMaxAttempts)?,
299 },
300 transport_credentials: self.transport_credentials,
301 })
302 }
303}
304
305#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
307pub enum WorkerConfigBuildError {
308 #[error("worker endpoint is required")]
310 MissingEndpoint,
311 #[error("worker task queue is required")]
313 MissingTaskQueue,
314 #[error("worker identity is required")]
316 MissingIdentity,
317 #[error("worker max_concurrency is required")]
319 MissingMaxConcurrency,
320 #[error("worker reconnect_initial_backoff is required")]
322 MissingReconnectInitialBackoff,
323 #[error("worker reconnect_max_backoff is required")]
325 MissingReconnectMaxBackoff,
326 #[error("worker reconnect_max_attempts is required")]
328 MissingReconnectMaxAttempts,
329}
330
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{ReconnectConfig, TransportCredentials, WorkerConfig};

    /// Every value given to the builder must come back unchanged from the
    /// built config, and the secret must not show up in `Debug` output.
    #[test]
    fn worker_config_builder_round_trips_fields() -> Result<(), Box<dyn std::error::Error>> {
        let initial_backoff = Duration::from_millis(10);
        let max_backoff = Duration::from_millis(100);
        let credentials = TransportCredentials::new(b"secret-token".to_vec());

        let builder = WorkerConfig::builder()
            .endpoint("http://127.0.0.1:50051")
            .task_queue("payments")
            .identity("worker-a")
            .max_concurrency(7)
            .reconnect_initial_backoff(initial_backoff)
            .reconnect_max_backoff(max_backoff)
            .reconnect_max_attempts(3)
            .namespace("payments")
            .subject("worker-a")
            .transport_credentials(credentials.clone());
        let config = builder.build()?;

        assert_eq!(config.namespace, "payments");
        assert_eq!(config.subject, "worker-a");
        assert_eq!(config.endpoint, "http://127.0.0.1:50051");
        assert_eq!(config.task_queue, "payments");
        assert_eq!(config.identity, "worker-a");
        assert_eq!(config.max_concurrency, 7);
        assert_eq!(config.reconnect.initial_backoff, initial_backoff);
        assert_eq!(config.reconnect.max_backoff, max_backoff);
        assert_eq!(config.reconnect.max_attempts, 3);
        assert_eq!(config.transport_credentials, Some(credentials));

        // The credentials' Debug impl must keep the raw bytes out of the output.
        let rendered = format!("{config:?}");
        assert!(!rendered.contains("secret-token"));

        Ok(())
    }

    /// `WorkerConfig::new` takes no namespace or subject, so both must carry
    /// the built-in defaults.
    #[test]
    fn worker_config_new_uses_auth_metadata_defaults() {
        let reconnect =
            ReconnectConfig::new(Duration::from_millis(10), Duration::from_millis(100), 3);
        let config = WorkerConfig::new(
            "http://127.0.0.1:50051",
            "default",
            "worker-a",
            4,
            reconnect,
            None,
        );

        assert_eq!(config.namespace, "default");
        assert_eq!(config.subject, "worker");
    }
}