1use std::fmt;
4use std::time::Duration;
5
/// Secret byte string used as transport credentials.
///
/// The bytes are stored exactly as supplied. `Debug` is implemented by hand
/// (instead of derived) so that formatting a value never prints the secret.
#[derive(Clone, PartialEq, Eq)]
pub struct TransportCredentials {
    /// Raw secret bytes as passed to [`TransportCredentials::new`].
    secret: Vec<u8>,
}

impl TransportCredentials {
    /// Creates credentials from anything convertible into a `Vec<u8>`.
    ///
    /// The input is converted once and stored without any validation.
    #[must_use]
    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
        let secret = secret.into();
        Self { secret }
    }

    /// Returns the stored secret bytes as a borrowed slice.
    #[must_use]
    pub fn secret(&self) -> &[u8] {
        self.secret.as_slice()
    }
}

impl fmt::Debug for TransportCredentials {
    /// Writes the struct name with a fixed `"<redacted>"` placeholder in
    /// place of the secret bytes.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut output = formatter.debug_struct("TransportCredentials");
        // The real bytes are deliberately never handed to the formatter.
        output.field("secret", &"<redacted>");
        output.finish()
    }
}
40
/// Reconnect settings: two backoff durations and an attempt limit.
///
/// This type only stores the values. How they are interpreted (for example
/// how the backoff grows, or what a limit of `0` means) is decided by the
/// code that consumes the configuration, which is not part of this type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReconnectConfig {
    /// Initial backoff duration.
    pub initial_backoff: Duration,
    /// Maximum backoff duration.
    pub max_backoff: Duration,
    /// Maximum number of attempts.
    pub max_attempts: usize,
}

impl ReconnectConfig {
    /// Bundles the three reconnect settings into a `ReconnectConfig`.
    ///
    /// No validation is performed; the arguments are stored as given. Being a
    /// `const fn`, this can also be used to build constants.
    #[must_use]
    pub const fn new(
        initial_backoff: Duration,
        max_backoff: Duration,
        max_attempts: usize,
    ) -> Self {
        Self {
            initial_backoff,
            max_backoff,
            max_attempts,
        }
    }
}
99
100#[derive(Clone, Debug, PartialEq, Eq)]
106pub struct WorkerConfig {
107 pub namespace: String,
109 pub subject: String,
111 pub endpoint: String,
113 pub task_queue: String,
116 pub identity: String,
118 pub max_concurrency: usize,
120 pub reconnect: ReconnectConfig,
122 pub transport_credentials: Option<TransportCredentials>,
124}
125
126const DEFAULT_WORKER_NAMESPACE: &str = "default";
127const DEFAULT_WORKER_SUBJECT: &str = "worker";
128
129impl WorkerConfig {
130 #[must_use]
133 pub const fn builder() -> WorkerConfigBuilder {
134 WorkerConfigBuilder::new()
135 }
136
137 #[must_use]
139 pub fn new(
140 endpoint: impl Into<String>,
141 task_queue: impl Into<String>,
142 identity: impl Into<String>,
143 max_concurrency: usize,
144 reconnect: ReconnectConfig,
145 transport_credentials: Option<TransportCredentials>,
146 ) -> Self {
147 Self {
148 namespace: String::from(DEFAULT_WORKER_NAMESPACE),
149 subject: String::from(DEFAULT_WORKER_SUBJECT),
150 endpoint: endpoint.into(),
151 task_queue: task_queue.into(),
152 identity: identity.into(),
153 max_concurrency,
154 reconnect,
155 transport_credentials,
156 }
157 }
158}
159
160#[derive(Clone, Debug, Default)]
162pub struct WorkerConfigBuilder {
163 namespace: Option<String>,
164 subject: Option<String>,
165 endpoint: Option<String>,
166 task_queue: Option<String>,
167 identity: Option<String>,
168 max_concurrency: Option<usize>,
169 reconnect_initial_backoff: Option<Duration>,
170 reconnect_max_backoff: Option<Duration>,
171 reconnect_max_attempts: Option<usize>,
172 transport_credentials: Option<TransportCredentials>,
173}
174
175impl WorkerConfigBuilder {
176 #[must_use]
178 pub const fn new() -> Self {
179 Self {
180 namespace: None,
181 subject: None,
182 endpoint: None,
183 task_queue: None,
184 identity: None,
185 max_concurrency: None,
186 reconnect_initial_backoff: None,
187 reconnect_max_backoff: None,
188 reconnect_max_attempts: None,
189 transport_credentials: None,
190 }
191 }
192
193 #[must_use]
195 pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
196 self.namespace = Some(namespace.into());
197 self
198 }
199
200 #[must_use]
202 pub fn subject(mut self, subject: impl Into<String>) -> Self {
203 self.subject = Some(subject.into());
204 self
205 }
206
207 #[must_use]
209 pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
210 self.endpoint = Some(endpoint.into());
211 self
212 }
213
214 #[must_use]
216 pub fn task_queue(mut self, task_queue: impl Into<String>) -> Self {
217 self.task_queue = Some(task_queue.into());
218 self
219 }
220
221 #[must_use]
223 pub fn identity(mut self, identity: impl Into<String>) -> Self {
224 self.identity = Some(identity.into());
225 self
226 }
227
228 #[must_use]
230 pub const fn max_concurrency(mut self, max_concurrency: usize) -> Self {
231 self.max_concurrency = Some(max_concurrency);
232 self
233 }
234
235 #[must_use]
237 pub const fn reconnect_initial_backoff(mut self, delay: Duration) -> Self {
238 self.reconnect_initial_backoff = Some(delay);
239 self
240 }
241
242 #[must_use]
244 pub const fn reconnect_max_backoff(mut self, delay: Duration) -> Self {
245 self.reconnect_max_backoff = Some(delay);
246 self
247 }
248
249 #[must_use]
251 pub const fn reconnect_max_attempts(mut self, attempts: usize) -> Self {
252 self.reconnect_max_attempts = Some(attempts);
253 self
254 }
255
256 #[must_use]
258 pub fn transport_credentials(mut self, credentials: TransportCredentials) -> Self {
259 self.transport_credentials = Some(credentials);
260 self
261 }
262
263 pub fn build(self) -> Result<WorkerConfig, WorkerConfigBuildError> {
269 Ok(WorkerConfig {
270 namespace: self
271 .namespace
272 .unwrap_or_else(|| String::from(DEFAULT_WORKER_NAMESPACE)),
273 subject: self
274 .subject
275 .unwrap_or_else(|| String::from(DEFAULT_WORKER_SUBJECT)),
276 endpoint: self
277 .endpoint
278 .ok_or(WorkerConfigBuildError::MissingEndpoint)?,
279 task_queue: self
280 .task_queue
281 .ok_or(WorkerConfigBuildError::MissingTaskQueue)?,
282 identity: self
283 .identity
284 .ok_or(WorkerConfigBuildError::MissingIdentity)?,
285 max_concurrency: self
286 .max_concurrency
287 .ok_or(WorkerConfigBuildError::MissingMaxConcurrency)?,
288 reconnect: ReconnectConfig {
289 initial_backoff: self
290 .reconnect_initial_backoff
291 .ok_or(WorkerConfigBuildError::MissingReconnectInitialBackoff)?,
292 max_backoff: self
293 .reconnect_max_backoff
294 .ok_or(WorkerConfigBuildError::MissingReconnectMaxBackoff)?,
295 max_attempts: self
296 .reconnect_max_attempts
297 .ok_or(WorkerConfigBuildError::MissingReconnectMaxAttempts)?,
298 },
299 transport_credentials: self.transport_credentials,
300 })
301 }
302}
303
304#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
306pub enum WorkerConfigBuildError {
307 #[error("worker endpoint is required")]
309 MissingEndpoint,
310 #[error("worker task queue is required")]
312 MissingTaskQueue,
313 #[error("worker identity is required")]
315 MissingIdentity,
316 #[error("worker max_concurrency is required")]
318 MissingMaxConcurrency,
319 #[error("worker reconnect_initial_backoff is required")]
321 MissingReconnectInitialBackoff,
322 #[error("worker reconnect_max_backoff is required")]
324 MissingReconnectMaxBackoff,
325 #[error("worker reconnect_max_attempts is required")]
327 MissingReconnectMaxAttempts,
328}
329
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{ReconnectConfig, TransportCredentials, WorkerConfig};

    // Every value set on the builder must come back unchanged from the built
    // config, and the secret must not appear in the config's `Debug` output.
    #[test]
    fn worker_config_builder_round_trips_fields() -> Result<(), Box<dyn std::error::Error>> {
        let credentials = TransportCredentials::new(b"secret-token".to_vec());

        let builder = WorkerConfig::builder()
            .namespace("payments")
            .subject("worker-a")
            .endpoint("http://127.0.0.1:50051")
            .task_queue("payments")
            .identity("worker-a")
            .max_concurrency(7)
            .reconnect_initial_backoff(Duration::from_millis(10))
            .reconnect_max_backoff(Duration::from_millis(100))
            .reconnect_max_attempts(3)
            .transport_credentials(credentials.clone());
        let config = builder.build()?;

        assert_eq!(config.namespace, "payments");
        assert_eq!(config.subject, "worker-a");
        assert_eq!(config.endpoint, "http://127.0.0.1:50051");
        assert_eq!(config.task_queue, "payments");
        assert_eq!(config.identity, "worker-a");
        assert_eq!(config.max_concurrency, 7);
        assert_eq!(config.reconnect.initial_backoff, Duration::from_millis(10));
        assert_eq!(config.reconnect.max_backoff, Duration::from_millis(100));
        assert_eq!(config.reconnect.max_attempts, 3);
        assert_eq!(config.transport_credentials, Some(credentials));

        let rendered = format!("{config:?}");
        assert!(!rendered.contains("secret-token"));

        Ok(())
    }

    // `WorkerConfig::new` takes no namespace/subject arguments, so both must
    // end up with their default values.
    #[test]
    fn worker_config_new_uses_auth_metadata_defaults() {
        let reconnect =
            ReconnectConfig::new(Duration::from_millis(10), Duration::from_millis(100), 3);
        let config = WorkerConfig::new(
            "http://127.0.0.1:50051",
            "default",
            "worker-a",
            4,
            reconnect,
            None,
        );

        assert_eq!(config.namespace, "default");
        assert_eq!(config.subject, "worker");
    }
}