Skip to main content

aion_worker/
config.rs

1//! `WorkerConfig` endpoint, task queue, identity, concurrency, and TLS/credentials passthrough.
2
3use std::fmt;
4use std::time::Duration;
5
6/// Opaque credentials forwarded to the worker transport layer.
7///
8/// The worker SDK does not interpret this value or define an authentication
9/// scheme. It exists only so operators can pass transport-specific credentials
10/// to the session implementation that knows how to apply them.
11#[derive(Clone, PartialEq, Eq)]
12pub struct TransportCredentials {
13    secret: Vec<u8>,
14}
15
16impl TransportCredentials {
17    /// Creates opaque transport credentials from caller-supplied bytes.
18    #[must_use]
19    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
20        Self {
21            secret: secret.into(),
22        }
23    }
24
25    /// Returns the opaque credential bytes for transport-specific forwarding.
26    #[must_use]
27    pub fn secret(&self) -> &[u8] {
28        &self.secret
29    }
30}
31
32impl fmt::Debug for TransportCredentials {
33    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34        formatter
35            .debug_struct("TransportCredentials")
36            .field("secret", &"<redacted>")
37            .finish()
38    }
39}
40
41/// Operator-supplied reconnect backoff settings.
42///
43/// The settings govern both session establishment and the run loop's
44/// cumulative mid-run session-drop budget.
45///
46/// **Budget reset:** the cumulative drop budget resets to zero once an
47/// established session proves healthy — it served at least one task, or it
48/// stayed connected longer than `max_backoff` (measured monotonically from
49/// successful registration to the moment the stream ended or dropped;
50/// post-drop draining of in-flight activities never extends it). The cap is
51/// the policy's own definition of the longest pause, so a session outliving
52/// it is demonstrably past the flapping regime, and a served task proves
53/// end-to-end health. A genuinely flapping server — no session ever serves
54/// a task or outlives `max_backoff` — exhausts the budget after exactly
55/// `max_attempts` drops.
56///
57/// **Clean closes:** a clean/graceful server-side stream close is a
58/// retryable drop, not a run end: the worker redials through the same
59/// budgeted, backed-off cycle, so routine server deploys cost at most
60/// transient budget that heals. Only a persistent clean-close loop exhausts
61/// the budget (surfacing [`crate::error::WorkerError::CleanCloseExhausted`]).
62/// An explicit protocol drain signal ("closing, do not reconnect") is
63/// planned for the worker-protocol ack wave and will refine the clean-close
64/// case.
65///
66/// **Shutdown during a drop backoff:** every SDK races the backoff sleep
67/// against the shutdown signal and returns promptly, but the run outcome
68/// currently diverges: this SDK surfaces the pending drop error (a clean
69/// close pending recovery still ends `Ok`), while the Python and TypeScript
70/// workers return cleanly. Aligning the outcome cross-SDK is deferred to
71/// the protocol drain-signal wave.
72#[derive(Clone, Debug, PartialEq, Eq)]
73pub struct ReconnectConfig {
74    /// Initial reconnect backoff delay. Must be non-zero before reconnecting.
75    pub initial_backoff: Duration,
76    /// Maximum reconnect backoff delay cap. Must be non-zero before
77    /// reconnecting. Doubles as the session-health threshold for the
78    /// drop-budget reset described on this type.
79    pub max_backoff: Duration,
80    /// Maximum reconnect attempts before surfacing the last connection error.
81    pub max_attempts: usize,
82}
83
84impl ReconnectConfig {
85    /// Creates reconnect settings with every field supplied explicitly.
86    #[must_use]
87    pub const fn new(
88        initial_backoff: Duration,
89        max_backoff: Duration,
90        max_attempts: usize,
91    ) -> Self {
92        Self {
93            initial_backoff,
94            max_backoff,
95            max_attempts,
96        }
97    }
98}
99
100/// Operator-supplied worker connection and serving configuration.
101///
102/// Tunable fields remain caller-supplied. Namespace authorization metadata
103/// defaults to `default`/`worker` so development workers can register against
104/// the default task queue without an explicit auth setup.
105#[derive(Clone, Debug, PartialEq, Eq)]
106pub struct WorkerConfig {
107    /// Namespace advertised in `x-aion-namespaces` worker stream metadata.
108    pub namespace: String,
109    /// Subject advertised in `x-aion-subject` worker stream metadata.
110    pub subject: String,
111    /// Engine worker endpoint URI.
112    pub endpoint: String,
113    /// Task queue advertised to the engine. The current AW wire names this field
114    /// `namespace`; this SDK maps the task queue value to that owned wire shape.
115    pub task_queue: String,
116    /// Worker identity used by operators and future wire metadata.
117    pub identity: String,
118    /// Maximum concurrent activities this worker may serve.
119    pub max_concurrency: usize,
120    /// Operator-supplied reconnect settings.
121    pub reconnect: ReconnectConfig,
122    /// Opaque credentials for the transport implementation.
123    pub transport_credentials: Option<TransportCredentials>,
124}
125
126const DEFAULT_WORKER_NAMESPACE: &str = "default";
127const DEFAULT_WORKER_SUBJECT: &str = "worker";
128
129impl WorkerConfig {
130    /// Starts an explicit builder. The caller must provide every required field
131    /// before calling [`WorkerConfigBuilder::build`].
132    #[must_use]
133    pub const fn builder() -> WorkerConfigBuilder {
134        WorkerConfigBuilder::new()
135    }
136
137    /// Creates a worker config with default authorization metadata.
138    #[must_use]
139    pub fn new(
140        endpoint: impl Into<String>,
141        task_queue: impl Into<String>,
142        identity: impl Into<String>,
143        max_concurrency: usize,
144        reconnect: ReconnectConfig,
145        transport_credentials: Option<TransportCredentials>,
146    ) -> Self {
147        Self {
148            namespace: String::from(DEFAULT_WORKER_NAMESPACE),
149            subject: String::from(DEFAULT_WORKER_SUBJECT),
150            endpoint: endpoint.into(),
151            task_queue: task_queue.into(),
152            identity: identity.into(),
153            max_concurrency,
154            reconnect,
155            transport_credentials,
156        }
157    }
158}
159
160/// Builder for [`WorkerConfig`] with auth metadata defaults and explicit required fields.
161#[derive(Clone, Debug, Default)]
162pub struct WorkerConfigBuilder {
163    namespace: Option<String>,
164    subject: Option<String>,
165    endpoint: Option<String>,
166    task_queue: Option<String>,
167    identity: Option<String>,
168    max_concurrency: Option<usize>,
169    reconnect_initial_backoff: Option<Duration>,
170    reconnect_max_backoff: Option<Duration>,
171    reconnect_max_attempts: Option<usize>,
172    transport_credentials: Option<TransportCredentials>,
173}
174
175impl WorkerConfigBuilder {
176    /// Creates an empty config builder.
177    #[must_use]
178    pub const fn new() -> Self {
179        Self {
180            namespace: None,
181            subject: None,
182            endpoint: None,
183            task_queue: None,
184            identity: None,
185            max_concurrency: None,
186            reconnect_initial_backoff: None,
187            reconnect_max_backoff: None,
188            reconnect_max_attempts: None,
189            transport_credentials: None,
190        }
191    }
192
193    /// Sets the namespace advertised in worker stream authorization metadata.
194    #[must_use]
195    pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
196        self.namespace = Some(namespace.into());
197        self
198    }
199
200    /// Sets the subject advertised in worker stream authorization metadata.
201    #[must_use]
202    pub fn subject(mut self, subject: impl Into<String>) -> Self {
203        self.subject = Some(subject.into());
204        self
205    }
206
207    /// Sets the engine worker endpoint URI.
208    #[must_use]
209    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
210        self.endpoint = Some(endpoint.into());
211        self
212    }
213
214    /// Sets the task queue advertised to the engine.
215    #[must_use]
216    pub fn task_queue(mut self, task_queue: impl Into<String>) -> Self {
217        self.task_queue = Some(task_queue.into());
218        self
219    }
220
221    /// Sets the worker identity.
222    #[must_use]
223    pub fn identity(mut self, identity: impl Into<String>) -> Self {
224        self.identity = Some(identity.into());
225        self
226    }
227
228    /// Sets the operator-configured maximum concurrency.
229    #[must_use]
230    pub const fn max_concurrency(mut self, max_concurrency: usize) -> Self {
231        self.max_concurrency = Some(max_concurrency);
232        self
233    }
234
235    /// Sets the operator-configured initial reconnect backoff delay.
236    #[must_use]
237    pub const fn reconnect_initial_backoff(mut self, delay: Duration) -> Self {
238        self.reconnect_initial_backoff = Some(delay);
239        self
240    }
241
242    /// Sets the operator-configured reconnect backoff cap.
243    #[must_use]
244    pub const fn reconnect_max_backoff(mut self, delay: Duration) -> Self {
245        self.reconnect_max_backoff = Some(delay);
246        self
247    }
248
249    /// Sets the operator-configured maximum reconnect attempts.
250    #[must_use]
251    pub const fn reconnect_max_attempts(mut self, attempts: usize) -> Self {
252        self.reconnect_max_attempts = Some(attempts);
253        self
254    }
255
256    /// Sets optional opaque transport credentials.
257    #[must_use]
258    pub fn transport_credentials(mut self, credentials: TransportCredentials) -> Self {
259        self.transport_credentials = Some(credentials);
260        self
261    }
262
263    /// Builds a [`WorkerConfig`] when every required field has been supplied.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`WorkerConfigBuildError`] naming the missing required field.
268    pub fn build(self) -> Result<WorkerConfig, WorkerConfigBuildError> {
269        Ok(WorkerConfig {
270            namespace: self
271                .namespace
272                .unwrap_or_else(|| String::from(DEFAULT_WORKER_NAMESPACE)),
273            subject: self
274                .subject
275                .unwrap_or_else(|| String::from(DEFAULT_WORKER_SUBJECT)),
276            endpoint: self
277                .endpoint
278                .ok_or(WorkerConfigBuildError::MissingEndpoint)?,
279            task_queue: self
280                .task_queue
281                .ok_or(WorkerConfigBuildError::MissingTaskQueue)?,
282            identity: self
283                .identity
284                .ok_or(WorkerConfigBuildError::MissingIdentity)?,
285            max_concurrency: self
286                .max_concurrency
287                .ok_or(WorkerConfigBuildError::MissingMaxConcurrency)?,
288            reconnect: ReconnectConfig {
289                initial_backoff: self
290                    .reconnect_initial_backoff
291                    .ok_or(WorkerConfigBuildError::MissingReconnectInitialBackoff)?,
292                max_backoff: self
293                    .reconnect_max_backoff
294                    .ok_or(WorkerConfigBuildError::MissingReconnectMaxBackoff)?,
295                max_attempts: self
296                    .reconnect_max_attempts
297                    .ok_or(WorkerConfigBuildError::MissingReconnectMaxAttempts)?,
298            },
299            transport_credentials: self.transport_credentials,
300        })
301    }
302}
303
304/// Errors produced while building [`WorkerConfig`].
305#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
306pub enum WorkerConfigBuildError {
307    /// The endpoint was not supplied.
308    #[error("worker endpoint is required")]
309    MissingEndpoint,
310    /// The task queue was not supplied.
311    #[error("worker task queue is required")]
312    MissingTaskQueue,
313    /// The worker identity was not supplied.
314    #[error("worker identity is required")]
315    MissingIdentity,
316    /// The max concurrency was not supplied.
317    #[error("worker max_concurrency is required")]
318    MissingMaxConcurrency,
319    /// The reconnect initial backoff was not supplied.
320    #[error("worker reconnect_initial_backoff is required")]
321    MissingReconnectInitialBackoff,
322    /// The reconnect max backoff was not supplied.
323    #[error("worker reconnect_max_backoff is required")]
324    MissingReconnectMaxBackoff,
325    /// The reconnect max attempts value was not supplied.
326    #[error("worker reconnect_max_attempts is required")]
327    MissingReconnectMaxAttempts,
328}
329
330#[cfg(test)]
331mod tests {
332    use std::time::Duration;
333
334    use super::{TransportCredentials, WorkerConfig};
335
336    #[test]
337    fn worker_config_builder_round_trips_fields() -> Result<(), Box<dyn std::error::Error>> {
338        let credentials = TransportCredentials::new(b"secret-token".to_vec());
339        let config = WorkerConfig::builder()
340            .endpoint("http://127.0.0.1:50051")
341            .task_queue("payments")
342            .identity("worker-a")
343            .max_concurrency(7)
344            .reconnect_initial_backoff(Duration::from_millis(10))
345            .reconnect_max_backoff(Duration::from_millis(100))
346            .reconnect_max_attempts(3)
347            .namespace("payments")
348            .subject("worker-a")
349            .transport_credentials(credentials.clone())
350            .build()?;
351
352        assert_eq!(config.namespace, "payments");
353        assert_eq!(config.subject, "worker-a");
354        assert_eq!(config.endpoint, "http://127.0.0.1:50051");
355        assert_eq!(config.task_queue, "payments");
356        assert_eq!(config.identity, "worker-a");
357        assert_eq!(config.max_concurrency, 7);
358        assert_eq!(config.reconnect.initial_backoff, Duration::from_millis(10));
359        assert_eq!(config.reconnect.max_backoff, Duration::from_millis(100));
360        assert_eq!(config.reconnect.max_attempts, 3);
361        assert_eq!(config.transport_credentials, Some(credentials));
362        assert!(!format!("{config:?}").contains("secret-token"));
363
364        Ok(())
365    }
366
367    #[test]
368    fn worker_config_new_uses_auth_metadata_defaults() {
369        let config = WorkerConfig::new(
370            "http://127.0.0.1:50051",
371            "default",
372            "worker-a",
373            4,
374            super::ReconnectConfig::new(Duration::from_millis(10), Duration::from_millis(100), 3),
375            None,
376        );
377
378        assert_eq!(config.namespace, "default");
379        assert_eq!(config.subject, "worker");
380    }
381}