Skip to main content

aion_worker/
config.rs

//! `WorkerConfig`: endpoint, task queue, identity, concurrency, and TLS/credentials passthrough.
2
3use std::fmt;
4use std::time::Duration;
5
/// Caller-supplied secret bytes handed through to the worker transport layer.
///
/// The worker SDK neither interprets these bytes nor defines an
/// authentication scheme around them. The type exists only so operators can
/// hand transport-specific credentials to the session implementation that
/// knows how to apply them.
#[derive(Clone, PartialEq, Eq)]
pub struct TransportCredentials {
    secret: Vec<u8>,
}

impl TransportCredentials {
    /// Wraps caller-supplied bytes as opaque transport credentials.
    ///
    /// Anything convertible into `Vec<u8>` is accepted and stored as-is.
    #[must_use]
    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
        let secret = secret.into();
        Self { secret }
    }

    /// Borrows the raw credential bytes so the transport can forward them.
    #[must_use]
    pub fn secret(&self) -> &[u8] {
        self.secret.as_slice()
    }
}

/// Hand-written so that `{:?}` formatting never prints the secret bytes.
impl fmt::Debug for TransportCredentials {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Keep the field name visible but substitute a fixed placeholder value.
        let placeholder = "<redacted>";
        let mut rendered = formatter.debug_struct("TransportCredentials");
        rendered.field("secret", &placeholder);
        rendered.finish()
    }
}
40
/// Reconnect backoff policy chosen by the operator.
///
/// One policy covers two things: establishing a session, and the cumulative
/// budget the run loop spends on sessions that drop mid-run.
///
/// **Budget reset:** once an established session proves healthy, the
/// cumulative drop budget goes back to zero. A session counts as healthy when
/// it served at least one task, or when it stayed connected longer than
/// `max_backoff`. That connected time is measured monotonically from
/// successful registration to the moment the stream ended or dropped;
/// draining in-flight activities after a drop never extends it. The cap is
/// the policy's own definition of the longest pause, so a session that
/// outlives it is demonstrably past the flapping regime, while a served task
/// proves end-to-end health. A genuinely flapping server (no session ever
/// serves a task or outlives `max_backoff`) exhausts the budget after exactly
/// `max_attempts` drops.
///
/// **Drains and clean closes:** a drain announced by the server (the wire
/// `DrainRequest` frame) is a drop that costs no budget: the worker finishes
/// in-flight work and redials after `initial_backoff`. The drain
/// classification latches for the session, so even an abrupt end after the
/// frame stays drain-class. A clean stream close that was *not* announced is
/// still a budgeted, retryable drop: the worker redials through the same
/// budgeted, backed-off cycle, and only a persistent loop of unannounced
/// clean closes exhausts the budget (surfacing
/// [`crate::error::WorkerError::CleanCloseExhausted`]).
///
/// **Shutdown during a drop backoff:** every SDK races the backoff sleep
/// against the shutdown signal and returns promptly. The run outcome is
/// aligned across the Rust, Python, and TypeScript workers: when the pending
/// drop is drain-class or a clean close, the run ends cleanly; when the
/// pending drop is error-class, its error is surfaced. A supervisor can
/// therefore tell "this worker was mid-fault" apart from "this worker
/// drained cleanly".
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReconnectConfig {
    /// Delay before the first reconnect attempt. Must be non-zero before
    /// reconnecting.
    pub initial_backoff: Duration,
    /// Upper bound on the reconnect delay. Must be non-zero before
    /// reconnecting. Also serves as the session-health threshold for the
    /// drop-budget reset described on this type.
    pub max_backoff: Duration,
    /// Number of reconnect attempts allowed before the last connection error
    /// is surfaced.
    pub max_attempts: usize,
}

impl ReconnectConfig {
    /// Builds reconnect settings from three explicitly supplied values.
    ///
    /// The values are stored exactly as given; this constructor performs no
    /// validation of its own.
    #[must_use]
    pub const fn new(
        initial_backoff: Duration,
        max_backoff: Duration,
        max_attempts: usize,
    ) -> Self {
        Self {
            initial_backoff,
            max_backoff,
            max_attempts,
        }
    }
}
100
101/// Operator-supplied worker connection and serving configuration.
102///
103/// Tunable fields remain caller-supplied. Namespace authorization metadata
104/// defaults to `default`/`worker` so development workers can register against
105/// the default task queue without an explicit auth setup.
106#[derive(Clone, Debug, PartialEq, Eq)]
107pub struct WorkerConfig {
108    /// Namespace advertised in `x-aion-namespaces` worker stream metadata.
109    pub namespace: String,
110    /// Subject advertised in `x-aion-subject` worker stream metadata.
111    pub subject: String,
112    /// Engine worker endpoint URI.
113    pub endpoint: String,
114    /// Task queue advertised to the engine. The current AW wire names this field
115    /// `namespace`; this SDK maps the task queue value to that owned wire shape.
116    pub task_queue: String,
117    /// Worker identity used by operators and future wire metadata.
118    pub identity: String,
119    /// Maximum concurrent activities this worker may serve.
120    pub max_concurrency: usize,
121    /// Operator-supplied reconnect settings.
122    pub reconnect: ReconnectConfig,
123    /// Opaque credentials for the transport implementation.
124    pub transport_credentials: Option<TransportCredentials>,
125}
126
127const DEFAULT_WORKER_NAMESPACE: &str = "default";
128const DEFAULT_WORKER_SUBJECT: &str = "worker";
129
130impl WorkerConfig {
131    /// Starts an explicit builder. The caller must provide every required field
132    /// before calling [`WorkerConfigBuilder::build`].
133    #[must_use]
134    pub const fn builder() -> WorkerConfigBuilder {
135        WorkerConfigBuilder::new()
136    }
137
138    /// Creates a worker config with default authorization metadata.
139    #[must_use]
140    pub fn new(
141        endpoint: impl Into<String>,
142        task_queue: impl Into<String>,
143        identity: impl Into<String>,
144        max_concurrency: usize,
145        reconnect: ReconnectConfig,
146        transport_credentials: Option<TransportCredentials>,
147    ) -> Self {
148        Self {
149            namespace: String::from(DEFAULT_WORKER_NAMESPACE),
150            subject: String::from(DEFAULT_WORKER_SUBJECT),
151            endpoint: endpoint.into(),
152            task_queue: task_queue.into(),
153            identity: identity.into(),
154            max_concurrency,
155            reconnect,
156            transport_credentials,
157        }
158    }
159}
160
161/// Builder for [`WorkerConfig`] with auth metadata defaults and explicit required fields.
162#[derive(Clone, Debug, Default)]
163pub struct WorkerConfigBuilder {
164    namespace: Option<String>,
165    subject: Option<String>,
166    endpoint: Option<String>,
167    task_queue: Option<String>,
168    identity: Option<String>,
169    max_concurrency: Option<usize>,
170    reconnect_initial_backoff: Option<Duration>,
171    reconnect_max_backoff: Option<Duration>,
172    reconnect_max_attempts: Option<usize>,
173    transport_credentials: Option<TransportCredentials>,
174}
175
176impl WorkerConfigBuilder {
177    /// Creates an empty config builder.
178    #[must_use]
179    pub const fn new() -> Self {
180        Self {
181            namespace: None,
182            subject: None,
183            endpoint: None,
184            task_queue: None,
185            identity: None,
186            max_concurrency: None,
187            reconnect_initial_backoff: None,
188            reconnect_max_backoff: None,
189            reconnect_max_attempts: None,
190            transport_credentials: None,
191        }
192    }
193
194    /// Sets the namespace advertised in worker stream authorization metadata.
195    #[must_use]
196    pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
197        self.namespace = Some(namespace.into());
198        self
199    }
200
201    /// Sets the subject advertised in worker stream authorization metadata.
202    #[must_use]
203    pub fn subject(mut self, subject: impl Into<String>) -> Self {
204        self.subject = Some(subject.into());
205        self
206    }
207
208    /// Sets the engine worker endpoint URI.
209    #[must_use]
210    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
211        self.endpoint = Some(endpoint.into());
212        self
213    }
214
215    /// Sets the task queue advertised to the engine.
216    #[must_use]
217    pub fn task_queue(mut self, task_queue: impl Into<String>) -> Self {
218        self.task_queue = Some(task_queue.into());
219        self
220    }
221
222    /// Sets the worker identity.
223    #[must_use]
224    pub fn identity(mut self, identity: impl Into<String>) -> Self {
225        self.identity = Some(identity.into());
226        self
227    }
228
229    /// Sets the operator-configured maximum concurrency.
230    #[must_use]
231    pub const fn max_concurrency(mut self, max_concurrency: usize) -> Self {
232        self.max_concurrency = Some(max_concurrency);
233        self
234    }
235
236    /// Sets the operator-configured initial reconnect backoff delay.
237    #[must_use]
238    pub const fn reconnect_initial_backoff(mut self, delay: Duration) -> Self {
239        self.reconnect_initial_backoff = Some(delay);
240        self
241    }
242
243    /// Sets the operator-configured reconnect backoff cap.
244    #[must_use]
245    pub const fn reconnect_max_backoff(mut self, delay: Duration) -> Self {
246        self.reconnect_max_backoff = Some(delay);
247        self
248    }
249
250    /// Sets the operator-configured maximum reconnect attempts.
251    #[must_use]
252    pub const fn reconnect_max_attempts(mut self, attempts: usize) -> Self {
253        self.reconnect_max_attempts = Some(attempts);
254        self
255    }
256
257    /// Sets optional opaque transport credentials.
258    #[must_use]
259    pub fn transport_credentials(mut self, credentials: TransportCredentials) -> Self {
260        self.transport_credentials = Some(credentials);
261        self
262    }
263
264    /// Builds a [`WorkerConfig`] when every required field has been supplied.
265    ///
266    /// # Errors
267    ///
268    /// Returns [`WorkerConfigBuildError`] naming the missing required field.
269    pub fn build(self) -> Result<WorkerConfig, WorkerConfigBuildError> {
270        Ok(WorkerConfig {
271            namespace: self
272                .namespace
273                .unwrap_or_else(|| String::from(DEFAULT_WORKER_NAMESPACE)),
274            subject: self
275                .subject
276                .unwrap_or_else(|| String::from(DEFAULT_WORKER_SUBJECT)),
277            endpoint: self
278                .endpoint
279                .ok_or(WorkerConfigBuildError::MissingEndpoint)?,
280            task_queue: self
281                .task_queue
282                .ok_or(WorkerConfigBuildError::MissingTaskQueue)?,
283            identity: self
284                .identity
285                .ok_or(WorkerConfigBuildError::MissingIdentity)?,
286            max_concurrency: self
287                .max_concurrency
288                .ok_or(WorkerConfigBuildError::MissingMaxConcurrency)?,
289            reconnect: ReconnectConfig {
290                initial_backoff: self
291                    .reconnect_initial_backoff
292                    .ok_or(WorkerConfigBuildError::MissingReconnectInitialBackoff)?,
293                max_backoff: self
294                    .reconnect_max_backoff
295                    .ok_or(WorkerConfigBuildError::MissingReconnectMaxBackoff)?,
296                max_attempts: self
297                    .reconnect_max_attempts
298                    .ok_or(WorkerConfigBuildError::MissingReconnectMaxAttempts)?,
299            },
300            transport_credentials: self.transport_credentials,
301        })
302    }
303}
304
305/// Errors produced while building [`WorkerConfig`].
306#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
307pub enum WorkerConfigBuildError {
308    /// The endpoint was not supplied.
309    #[error("worker endpoint is required")]
310    MissingEndpoint,
311    /// The task queue was not supplied.
312    #[error("worker task queue is required")]
313    MissingTaskQueue,
314    /// The worker identity was not supplied.
315    #[error("worker identity is required")]
316    MissingIdentity,
317    /// The max concurrency was not supplied.
318    #[error("worker max_concurrency is required")]
319    MissingMaxConcurrency,
320    /// The reconnect initial backoff was not supplied.
321    #[error("worker reconnect_initial_backoff is required")]
322    MissingReconnectInitialBackoff,
323    /// The reconnect max backoff was not supplied.
324    #[error("worker reconnect_max_backoff is required")]
325    MissingReconnectMaxBackoff,
326    /// The reconnect max attempts value was not supplied.
327    #[error("worker reconnect_max_attempts is required")]
328    MissingReconnectMaxAttempts,
329}
330
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{ReconnectConfig, TransportCredentials, WorkerConfig, WorkerConfigBuildError};

    /// Every setter value must land in its own config field.
    ///
    /// Each string field gets a distinct value, so a setter wired to the
    /// wrong field (for example namespace vs. task queue) fails the test
    /// instead of passing by coincidence.
    #[test]
    fn worker_config_builder_round_trips_fields() -> Result<(), Box<dyn std::error::Error>> {
        let credentials = TransportCredentials::new(b"secret-token".to_vec());
        let config = WorkerConfig::builder()
            .endpoint("http://127.0.0.1:50051")
            .task_queue("payments")
            .identity("worker-a")
            .max_concurrency(7)
            .reconnect_initial_backoff(Duration::from_millis(10))
            .reconnect_max_backoff(Duration::from_millis(100))
            .reconnect_max_attempts(3)
            .namespace("billing")
            .subject("svc-payments")
            .transport_credentials(credentials.clone())
            .build()?;

        assert_eq!(config.namespace, "billing");
        assert_eq!(config.subject, "svc-payments");
        assert_eq!(config.endpoint, "http://127.0.0.1:50051");
        assert_eq!(config.task_queue, "payments");
        assert_eq!(config.identity, "worker-a");
        assert_eq!(config.max_concurrency, 7);
        assert_eq!(config.reconnect.initial_backoff, Duration::from_millis(10));
        assert_eq!(config.reconnect.max_backoff, Duration::from_millis(100));
        assert_eq!(config.reconnect.max_attempts, 3);
        assert_eq!(config.transport_credentials, Some(credentials));
        // The secret must never leak through the derived `Debug` of the config.
        assert!(!format!("{config:?}").contains("secret-token"));

        Ok(())
    }

    /// Unset authorization metadata and credentials fall back to their defaults.
    #[test]
    fn worker_config_builder_defaults_auth_metadata() -> Result<(), Box<dyn std::error::Error>> {
        let config = WorkerConfig::builder()
            .endpoint("http://127.0.0.1:50051")
            .task_queue("payments")
            .identity("worker-a")
            .max_concurrency(7)
            .reconnect_initial_backoff(Duration::from_millis(10))
            .reconnect_max_backoff(Duration::from_millis(100))
            .reconnect_max_attempts(3)
            .build()?;

        assert_eq!(config.namespace, "default");
        assert_eq!(config.subject, "worker");
        assert_eq!(config.transport_credentials, None);

        Ok(())
    }

    /// Each missing required field is reported, in the order `build` checks them.
    #[test]
    fn worker_config_builder_reports_missing_required_fields() {
        let builder = WorkerConfig::builder();
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingEndpoint)
        );

        let builder = builder.endpoint("http://127.0.0.1:50051");
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingTaskQueue)
        );

        let builder = builder.task_queue("payments");
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingIdentity)
        );

        let builder = builder.identity("worker-a");
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingMaxConcurrency)
        );

        let builder = builder.max_concurrency(7);
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingReconnectInitialBackoff)
        );

        let builder = builder.reconnect_initial_backoff(Duration::from_millis(10));
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingReconnectMaxBackoff)
        );

        let builder = builder.reconnect_max_backoff(Duration::from_millis(100));
        assert_eq!(
            builder.clone().build().err(),
            Some(WorkerConfigBuildError::MissingReconnectMaxAttempts)
        );

        let builder = builder.reconnect_max_attempts(3);
        assert!(builder.build().is_ok());
    }

    /// `WorkerConfig::new` fills in default auth metadata and keeps the rest as given.
    #[test]
    fn worker_config_new_uses_auth_metadata_defaults() {
        let reconnect =
            ReconnectConfig::new(Duration::from_millis(10), Duration::from_millis(100), 3);
        let config = WorkerConfig::new(
            "http://127.0.0.1:50051",
            "default",
            "worker-a",
            4,
            reconnect.clone(),
            None,
        );

        assert_eq!(config.namespace, "default");
        assert_eq!(config.subject, "worker");
        assert_eq!(config.endpoint, "http://127.0.0.1:50051");
        assert_eq!(config.task_queue, "default");
        assert_eq!(config.identity, "worker-a");
        assert_eq!(config.max_concurrency, 4);
        assert_eq!(config.reconnect, reconnect);
        assert_eq!(config.transport_credentials, None);
    }
}