Skip to main content

dapr_durabletask/worker/
options.rs

1use super::reconnect_policy::ReconnectPolicy;
2use crate::internal::DEFAULT_MAX_IDENTIFIER_LENGTH;
3
4/// Configuration options for [`TaskHubGrpcWorker`](super::TaskHubGrpcWorker).
5#[derive(Debug, Clone)]
6pub struct WorkerOptions {
7    /// Maximum number of concurrent work items (orchestrations + activities)
8    /// processed simultaneously. The worker stops accepting new work items
9    /// until an in-flight task completes.
10    pub max_concurrent_work_items: usize,
11
12    /// Maximum number of distinct event names that can be buffered per
13    /// orchestration. External events arriving before the orchestrator calls
14    /// `wait_for_external_event` are held in a per-name buffer. This cap
15    /// limits the number of unique event names to prevent memory exhaustion
16    /// from a flood of differently-named events.
17    pub max_event_names: usize,
18
19    /// Maximum number of events buffered per event name. When an external
20    /// event arrives but no orchestrator is waiting for it yet, the event
21    /// payload is queued. This cap bounds the queue depth per event name —
22    /// excess events are discarded with a warning.
23    pub max_events_per_name: usize,
24
25    /// Maximum number of pending `wait_for_external_event` tasks per event
26    /// name. If an orchestrator issues more concurrent waits on the same
27    /// event name than this limit, additional waits return an incomplete task.
28    pub max_pending_tasks_per_name: usize,
29
30    /// Maximum JSON payload size in bytes for deserialisation. Payloads
31    /// exceeding this limit are rejected with an error.
32    pub max_json_payload_size: usize,
33
34    /// Maximum allowed length (in bytes) for identifiers such as orchestrator
35    /// names, activity names, instance IDs, and event names.
36    pub max_identifier_length: usize,
37
38    /// Reconnection policy applied when the gRPC connection to the sidecar
39    /// is unavailable or drops. The policy governs both the initial connection
40    /// attempt and every subsequent reconnect.
41    pub reconnect_policy: ReconnectPolicy,
42}
43
44impl Default for WorkerOptions {
45    fn default() -> Self {
46        Self {
47            max_concurrent_work_items: 10_000,
48            max_event_names: 1_000,
49            max_events_per_name: 10_000,
50            max_pending_tasks_per_name: 10_000,
51            max_json_payload_size: 64 * 1024 * 1024, // 64 MiB
52            max_identifier_length: DEFAULT_MAX_IDENTIFIER_LENGTH,
53            reconnect_policy: ReconnectPolicy::default(),
54        }
55    }
56}
57
58impl WorkerOptions {
59    /// Create options with default values.
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Set the maximum number of concurrent work items.
65    pub fn with_max_concurrent_work_items(mut self, limit: usize) -> Self {
66        self.max_concurrent_work_items = limit;
67        self
68    }
69
70    /// Set the maximum number of distinct event names buffered per orchestration.
71    pub fn with_max_event_names(mut self, limit: usize) -> Self {
72        self.max_event_names = limit;
73        self
74    }
75
76    /// Set the maximum number of events buffered per event name.
77    pub fn with_max_events_per_name(mut self, limit: usize) -> Self {
78        self.max_events_per_name = limit;
79        self
80    }
81
82    /// Set the maximum number of pending wait tasks per event name.
83    pub fn with_max_pending_tasks_per_name(mut self, limit: usize) -> Self {
84        self.max_pending_tasks_per_name = limit;
85        self
86    }
87
88    /// Set the maximum JSON payload size in bytes.
89    pub fn with_max_json_payload_size(mut self, limit: usize) -> Self {
90        self.max_json_payload_size = limit;
91        self
92    }
93
94    /// Set the maximum identifier length in bytes.
95    pub fn with_max_identifier_length(mut self, limit: usize) -> Self {
96        self.max_identifier_length = limit;
97        self
98    }
99
100    /// Set the reconnect backoff policy.
101    pub fn with_reconnect_policy(mut self, policy: ReconnectPolicy) -> Self {
102        self.reconnect_policy = policy;
103        self
104    }
105
106    /// Convenience: configure a fast reconnect policy suitable for tests.
107    ///
108    /// Sets a 50 ms initial delay, 500 ms maximum delay, ×2 multiplier, and
109    /// disables jitter.
110    pub fn with_fast_reconnect(self) -> Self {
111        self.with_reconnect_policy(
112            ReconnectPolicy::new()
113                .with_initial_delay(std::time::Duration::from_millis(50))
114                .with_max_delay(std::time::Duration::from_millis(500))
115                .with_multiplier(2.0)
116                .with_jitter(false),
117        )
118    }
119}
120
121#[cfg(test)]
122mod tests {
123    use super::*;
124    use std::time::Duration;
125
126    #[test]
127    fn worker_options_defaults() {
128        let opts = WorkerOptions::default();
129        assert_eq!(opts.max_concurrent_work_items, 10_000);
130        assert_eq!(opts.max_event_names, 1_000);
131        assert_eq!(opts.max_events_per_name, 10_000);
132        assert_eq!(opts.max_pending_tasks_per_name, 10_000);
133        assert_eq!(opts.max_json_payload_size, 64 * 1024 * 1024);
134        assert_eq!(opts.max_identifier_length, 1_024);
135    }
136
137    #[test]
138    fn with_max_concurrent_work_items() {
139        let opts = WorkerOptions::new().with_max_concurrent_work_items(500);
140        assert_eq!(opts.max_concurrent_work_items, 500);
141    }
142
143    #[test]
144    fn with_max_event_names() {
145        let opts = WorkerOptions::new().with_max_event_names(200);
146        assert_eq!(opts.max_event_names, 200);
147    }
148
149    #[test]
150    fn with_max_events_per_name() {
151        let opts = WorkerOptions::new().with_max_events_per_name(5_000);
152        assert_eq!(opts.max_events_per_name, 5_000);
153    }
154
155    #[test]
156    fn with_max_pending_tasks_per_name() {
157        let opts = WorkerOptions::new().with_max_pending_tasks_per_name(2_000);
158        assert_eq!(opts.max_pending_tasks_per_name, 2_000);
159    }
160
161    #[test]
162    fn with_max_json_payload_size() {
163        let opts = WorkerOptions::new().with_max_json_payload_size(1024);
164        assert_eq!(opts.max_json_payload_size, 1024);
165    }
166
167    #[test]
168    fn with_max_identifier_length() {
169        let opts = WorkerOptions::new().with_max_identifier_length(512);
170        assert_eq!(opts.max_identifier_length, 512);
171    }
172
173    #[test]
174    fn with_fast_reconnect() {
175        let opts = WorkerOptions::new().with_fast_reconnect();
176        let rp = &opts.reconnect_policy;
177        assert_eq!(rp.initial_delay, Duration::from_millis(50));
178        assert_eq!(rp.max_delay, Duration::from_millis(500));
179        assert_eq!(rp.multiplier, 2.0);
180        assert!(!rp.jitter);
181    }
182
183    #[test]
184    fn builder_chaining() {
185        let opts = WorkerOptions::new()
186            .with_max_concurrent_work_items(100)
187            .with_max_event_names(50)
188            .with_max_events_per_name(200)
189            .with_max_pending_tasks_per_name(300)
190            .with_max_json_payload_size(4096)
191            .with_max_identifier_length(128)
192            .with_fast_reconnect();
193
194        assert_eq!(opts.max_concurrent_work_items, 100);
195        assert_eq!(opts.max_event_names, 50);
196        assert_eq!(opts.max_events_per_name, 200);
197        assert_eq!(opts.max_pending_tasks_per_name, 300);
198        assert_eq!(opts.max_json_payload_size, 4096);
199        assert_eq!(opts.max_identifier_length, 128);
200        assert_eq!(
201            opts.reconnect_policy.initial_delay,
202            Duration::from_millis(50)
203        );
204    }
205}