Skip to main content

dapr_durabletask/worker/
options.rs

1use super::reconnect_policy::ReconnectPolicy;
2
3/// Configuration options for [`TaskHubGrpcWorker`](super::TaskHubGrpcWorker).
4#[derive(Debug, Clone)]
5pub struct WorkerOptions {
6    /// Maximum number of concurrent work items (orchestrations + activities)
7    /// processed simultaneously. The worker stops accepting new work items
8    /// until an in-flight task completes.
9    pub max_concurrent_work_items: usize,
10
11    /// Maximum number of distinct event names that can be buffered per
12    /// orchestration. External events arriving before the orchestrator calls
13    /// `wait_for_external_event` are held in a per-name buffer. This cap
14    /// limits the number of unique event names to prevent memory exhaustion
15    /// from a flood of differently-named events.
16    pub max_event_names: usize,
17
18    /// Maximum number of events buffered per event name. When an external
19    /// event arrives but no orchestrator is waiting for it yet, the event
20    /// payload is queued. This cap bounds the queue depth per event name —
21    /// excess events are discarded with a warning.
22    pub max_events_per_name: usize,
23
24    /// Maximum number of pending `wait_for_external_event` tasks per event
25    /// name. If an orchestrator issues more concurrent waits on the same
26    /// event name than this limit, additional waits return an incomplete task.
27    pub max_pending_tasks_per_name: usize,
28
29    /// Maximum JSON payload size in bytes for deserialisation. Payloads
30    /// exceeding this limit are rejected with an error.
31    pub max_json_payload_size: usize,
32
33    /// Maximum allowed length (in bytes) for identifiers such as orchestrator
34    /// names, activity names, instance IDs, and event names.
35    pub max_identifier_length: usize,
36
37    /// Reconnection policy applied when the gRPC connection to the sidecar
38    /// is unavailable or drops. The policy governs both the initial connection
39    /// attempt and every subsequent reconnect.
40    pub reconnect_policy: ReconnectPolicy,
41}
42
43impl Default for WorkerOptions {
44    fn default() -> Self {
45        Self {
46            max_concurrent_work_items: 10_000,
47            max_event_names: 1_000,
48            max_events_per_name: 10_000,
49            max_pending_tasks_per_name: 10_000,
50            max_json_payload_size: 64 * 1024 * 1024, // 64 MiB
51            max_identifier_length: 1_024,
52            reconnect_policy: ReconnectPolicy::default(),
53        }
54    }
55}
56
57impl WorkerOptions {
58    /// Create options with default values.
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    /// Set the maximum number of concurrent work items.
64    pub fn with_max_concurrent_work_items(mut self, limit: usize) -> Self {
65        self.max_concurrent_work_items = limit;
66        self
67    }
68
69    /// Set the maximum number of distinct event names buffered per orchestration.
70    pub fn with_max_event_names(mut self, limit: usize) -> Self {
71        self.max_event_names = limit;
72        self
73    }
74
75    /// Set the maximum number of events buffered per event name.
76    pub fn with_max_events_per_name(mut self, limit: usize) -> Self {
77        self.max_events_per_name = limit;
78        self
79    }
80
81    /// Set the maximum number of pending wait tasks per event name.
82    pub fn with_max_pending_tasks_per_name(mut self, limit: usize) -> Self {
83        self.max_pending_tasks_per_name = limit;
84        self
85    }
86
87    /// Set the maximum JSON payload size in bytes.
88    pub fn with_max_json_payload_size(mut self, limit: usize) -> Self {
89        self.max_json_payload_size = limit;
90        self
91    }
92
93    /// Set the maximum identifier length in bytes.
94    pub fn with_max_identifier_length(mut self, limit: usize) -> Self {
95        self.max_identifier_length = limit;
96        self
97    }
98
99    /// Set the reconnect backoff policy.
100    pub fn with_reconnect_policy(mut self, policy: ReconnectPolicy) -> Self {
101        self.reconnect_policy = policy;
102        self
103    }
104
105    /// Convenience: configure a fast reconnect policy suitable for tests.
106    ///
107    /// Sets a 50 ms initial delay, 500 ms maximum delay, ×2 multiplier, and
108    /// disables jitter.
109    pub fn with_fast_reconnect(self) -> Self {
110        self.with_reconnect_policy(
111            ReconnectPolicy::new()
112                .with_initial_delay(std::time::Duration::from_millis(50))
113                .with_max_delay(std::time::Duration::from_millis(500))
114                .with_multiplier(2.0)
115                .with_jitter(false),
116        )
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123    use std::time::Duration;
124
125    #[test]
126    fn worker_options_defaults() {
127        let opts = WorkerOptions::default();
128        assert_eq!(opts.max_concurrent_work_items, 10_000);
129        assert_eq!(opts.max_event_names, 1_000);
130        assert_eq!(opts.max_events_per_name, 10_000);
131        assert_eq!(opts.max_pending_tasks_per_name, 10_000);
132        assert_eq!(opts.max_json_payload_size, 64 * 1024 * 1024);
133        assert_eq!(opts.max_identifier_length, 1_024);
134    }
135
136    #[test]
137    fn with_max_concurrent_work_items() {
138        let opts = WorkerOptions::new().with_max_concurrent_work_items(500);
139        assert_eq!(opts.max_concurrent_work_items, 500);
140    }
141
142    #[test]
143    fn with_max_event_names() {
144        let opts = WorkerOptions::new().with_max_event_names(200);
145        assert_eq!(opts.max_event_names, 200);
146    }
147
148    #[test]
149    fn with_max_events_per_name() {
150        let opts = WorkerOptions::new().with_max_events_per_name(5_000);
151        assert_eq!(opts.max_events_per_name, 5_000);
152    }
153
154    #[test]
155    fn with_max_pending_tasks_per_name() {
156        let opts = WorkerOptions::new().with_max_pending_tasks_per_name(2_000);
157        assert_eq!(opts.max_pending_tasks_per_name, 2_000);
158    }
159
160    #[test]
161    fn with_max_json_payload_size() {
162        let opts = WorkerOptions::new().with_max_json_payload_size(1024);
163        assert_eq!(opts.max_json_payload_size, 1024);
164    }
165
166    #[test]
167    fn with_max_identifier_length() {
168        let opts = WorkerOptions::new().with_max_identifier_length(512);
169        assert_eq!(opts.max_identifier_length, 512);
170    }
171
172    #[test]
173    fn with_fast_reconnect() {
174        let opts = WorkerOptions::new().with_fast_reconnect();
175        let rp = &opts.reconnect_policy;
176        assert_eq!(rp.initial_delay, Duration::from_millis(50));
177        assert_eq!(rp.max_delay, Duration::from_millis(500));
178        assert_eq!(rp.multiplier, 2.0);
179        assert!(!rp.jitter);
180    }
181
182    #[test]
183    fn builder_chaining() {
184        let opts = WorkerOptions::new()
185            .with_max_concurrent_work_items(100)
186            .with_max_event_names(50)
187            .with_max_events_per_name(200)
188            .with_max_pending_tasks_per_name(300)
189            .with_max_json_payload_size(4096)
190            .with_max_identifier_length(128)
191            .with_fast_reconnect();
192
193        assert_eq!(opts.max_concurrent_work_items, 100);
194        assert_eq!(opts.max_event_names, 50);
195        assert_eq!(opts.max_events_per_name, 200);
196        assert_eq!(opts.max_pending_tasks_per_name, 300);
197        assert_eq!(opts.max_json_payload_size, 4096);
198        assert_eq!(opts.max_identifier_length, 128);
199        assert_eq!(
200            opts.reconnect_policy.initial_delay,
201            Duration::from_millis(50)
202        );
203    }
204}