1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
use crate::error::ClusterError;
use crate::shard_assigner::ShardAssignmentStrategy;
use crate::types::RunnerAddress;
use std::time::Duration;
/// Configuration for the sharding system.
///
/// Construct with [`ShardingConfig::default`] plus struct-update syntax to
/// override individual fields, then call [`ShardingConfig::validate`] to
/// reject values the runtime cannot work with.
#[derive(Debug, Clone)]
pub struct ShardingConfig {
    /// Address this runner advertises to other runners.
    pub runner_address: RunnerAddress,
    /// Address this runner listens on (defaults to runner_address).
    pub runner_listen_address: Option<RunnerAddress>,
    /// Weight of this runner for shard assignment. Default: 1.
    pub runner_weight: i32,
    /// Shard groups to participate in. Default: ["default"].
    pub shard_groups: Vec<String>,
    /// Number of shards per group. Default: 2048.
    pub shards_per_group: i32,
    /// TTL for runner lock in storage. Default: 30s.
    pub runner_lock_ttl: Duration,
    /// How often to refresh runner lock. Should be shorter than
    /// `runner_lock_ttl` so the lock is renewed before it expires.
    /// Default: 10s.
    pub runner_lock_refresh_interval: Duration,
    /// Max idle time before an entity is reaped. Default: 60s.
    pub entity_max_idle_time: Duration,
    /// How long to wait for entity termination. Default: 15s.
    pub entity_termination_timeout: Duration,
    /// Mailbox capacity per entity. Default: 100.
    pub entity_mailbox_capacity: usize,
    /// Maximum number of concurrent requests per entity instance. Default: 1 (serial).
    /// 0 = unbounded concurrency. Per-entity `Entity::concurrency()` overrides this.
    /// When > 1, requests are dispatched concurrently under a read lock; crash
    /// recovery takes a write lock to swap the handler and replays all in-flight requests.
    pub entity_max_concurrent_requests: usize,
    /// How often to poll storage for unprocessed messages. Default: 500ms.
    pub storage_poll_interval: Duration,
    /// Max inbox size from storage polling. Default: 1000.
    pub storage_inbox_size: usize,
    /// How often to poll for runner changes. Default: 5s.
    pub runner_poll_interval: Duration,
    /// Debounce interval for shard rebalancing. Default: 3s.
    pub shard_rebalance_debounce: Duration,
    /// Retry interval for shard rebalancing. Default: 2s.
    pub shard_rebalance_retry_interval: Duration,
    /// Maximum number of consecutive crash recovery attempts for an entity
    /// before giving up and dropping the entity instance. 0 = unlimited
    /// retries (matching TS behavior). Default: 0 (unlimited).
    pub entity_crash_max_retries: u32,
    /// Initial backoff duration between crash recovery attempts. Uses
    /// exponential backoff with factor 1.5, capped at
    /// `entity_crash_max_backoff`. Default: 500ms.
    pub entity_crash_initial_backoff: Duration,
    /// Maximum backoff duration between crash recovery attempts. Default: 10s.
    pub entity_crash_max_backoff: Duration,
    /// Maximum number of delivery attempts for a persisted message before
    /// it is marked as dead-lettered (processed with a failure reply).
    /// 0 = unlimited retries. Default: 10.
    pub storage_message_max_retries: u32,
    /// Maximum consecutive lock refresh failures before a shard is released.
    /// When a shard lock fails to refresh this many times in a row, the shard
    /// is removed from owned shards and its entities are interrupted to prevent
    /// split-brain (another runner may have already acquired the expired lock).
    /// Default: 3.
    pub shard_lock_refresh_max_failures: u32,
    /// How long to wait for an entity type to be registered before returning
    /// an error. During startup, messages may arrive before all entity types
    /// are registered. This timeout allows the router to wait for late
    /// registrations instead of immediately failing. Default: 5s.
    pub entity_registration_timeout: Duration,
    /// Retry interval for delivering messages to entities with full mailboxes.
    /// When a storage-polled message encounters `MailboxFull`, a per-entity
    /// resumption task retries delivery at this interval. Default: 100ms.
    pub send_retry_interval: Duration,
    /// Base backoff duration for singleton crash recovery. When a singleton
    /// fails (returns Err or panics), re-spawn is delayed by
    /// `base * 2^min(consecutive_failures - 1, 10)`. Default: 1s.
    pub singleton_crash_backoff_base: Duration,
    /// Number of retry attempts for `send()` and `notify()` when routing
    /// fails with `EntityNotAssignedToRunner` or `RunnerUnavailable` during
    /// shard rebalancing. 0 = no retries. Default: 3.
    pub send_retry_count: u32,
    /// Maximum number of MailboxFull retries in the storage resumption task
    /// before giving up on a message. When exhausted, a failure reply is saved
    /// to storage and the message is considered dead-lettered.
    /// 0 = unlimited (retry indefinitely until shard is lost or shutdown).
    /// Default: 0 (matches TS behavior of infinite retry).
    pub storage_resumption_max_retries: u32,
    /// Timeout for establishing gRPC connections to remote runners. Prevents
    /// blocking indefinitely when a runner host is unresponsive. Default: 5s.
    pub grpc_connect_timeout: Duration,
    /// Guard interval for the `last_read` dedup mechanism in SQL message storage.
    /// After a message is read by `unprocessed_messages`, it will not be re-read
    /// until this interval has elapsed, preventing duplicate dispatch of messages
    /// that are still being processed. Default: 10 minutes (matches TS).
    pub last_read_guard_interval: Duration,
    /// Strategy for assigning shards to runners. Default: Rendezvous.
    ///
    /// - `Rendezvous`: Best distribution (each node gets exactly 1/n shards, ±1),
    ///   optimal rebalancing. Recommended for clusters with < 1000 nodes.
    pub shard_assignment_strategy: ShardAssignmentStrategy,
    /// Window before detaching from the cluster after a storage error or keep-alive
    /// failure. If connectivity is restored within this window, the runner stays
    /// attached. If not, owned shards are cleared and entities are interrupted.
    /// Default: 200ms.
    pub detachment_window: Duration,
    /// Duration of sustained healthy status required before re-attaching to the
    /// cluster after detachment. The runner must observe healthy keep-alive and
    /// storage operations for this duration before it can acquire shards again.
    /// Default: 500ms.
    pub detachment_recover_window: Duration,
    /// Whether detachment is enabled. When false, storage errors are logged but
    /// do not trigger detachment. Use this to preserve current behavior during
    /// rollout. Default: false.
    pub detachment_enabled: bool,
    /// Number of consecutive keep-alive failures before triggering detachment.
    /// When the lease keep-alive fails this many times in a row, the runner
    /// detaches from the cluster to prevent split-brain. Default: 3.
    pub keepalive_failure_threshold: u32,
    /// Interval between acquire retries for shards held by other runners.
    /// After the initial acquire_batch, shards that were not acquired due to
    /// being held elsewhere are retried at this interval. Default: 200ms.
    pub acquire_retry_interval: Duration,
    /// Maximum duration to retry acquiring shards before giving up for this
    /// rebalance cycle. Retries continue until either all shards are acquired
    /// or this window elapses. 0 = no retries (single attempt). Default: 2s.
    pub acquire_retry_window: Duration,
}
impl ShardingConfig {
/// Validate configuration values. Returns an error message if any value is invalid.
///
/// Checks:
/// - `shards_per_group >= 1` (prevents division-by-zero in shard assignment)
/// - `runner_weight >= 0` (negative weight has no meaning)
/// - `entity_mailbox_capacity >= 1` (zero-capacity mailbox can never accept messages)
/// - `storage_inbox_size >= 1` (zero inbox can never process messages)
/// - All durations are non-zero where required
pub fn validate(&self) -> Result<(), ClusterError> {
if self.shards_per_group < 1 {
return Err(ClusterError::InvalidConfig {
reason: format!(
"shards_per_group must be >= 1, got {}",
self.shards_per_group
),
});
}
if self.runner_weight < 0 {
return Err(ClusterError::InvalidConfig {
reason: format!("runner_weight must be >= 0, got {}", self.runner_weight),
});
}
if self.entity_mailbox_capacity == 0 {
return Err(ClusterError::InvalidConfig {
reason: "entity_mailbox_capacity must be >= 1".to_string(),
});
}
if self.storage_inbox_size == 0 {
return Err(ClusterError::InvalidConfig {
reason: "storage_inbox_size must be >= 1".to_string(),
});
}
if self.runner_lock_ttl.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "runner_lock_ttl must be > 0".to_string(),
});
}
if self.runner_lock_refresh_interval.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "runner_lock_refresh_interval must be > 0".to_string(),
});
}
if self.entity_max_idle_time.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "entity_max_idle_time must be > 0".to_string(),
});
}
if self.entity_termination_timeout.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "entity_termination_timeout must be > 0".to_string(),
});
}
if self.storage_poll_interval.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "storage_poll_interval must be > 0".to_string(),
});
}
if self.runner_poll_interval.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "runner_poll_interval must be > 0".to_string(),
});
}
if self.shard_rebalance_debounce.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "shard_rebalance_debounce must be > 0".to_string(),
});
}
if self.shard_rebalance_retry_interval.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "shard_rebalance_retry_interval must be > 0".to_string(),
});
}
if self.entity_registration_timeout.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "entity_registration_timeout must be > 0".to_string(),
});
}
if self.send_retry_interval.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "send_retry_interval must be > 0".to_string(),
});
}
if self.singleton_crash_backoff_base.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "singleton_crash_backoff_base must be > 0".to_string(),
});
}
if self.entity_crash_initial_backoff.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "entity_crash_initial_backoff must be > 0".to_string(),
});
}
if self.entity_crash_max_backoff.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "entity_crash_max_backoff must be > 0".to_string(),
});
}
if self.shard_groups.is_empty() {
return Err(ClusterError::InvalidConfig {
reason: "shard_groups must not be empty".to_string(),
});
}
if self.grpc_connect_timeout.is_zero() {
return Err(ClusterError::InvalidConfig {
reason: "grpc_connect_timeout must be > 0".to_string(),
});
}
Ok(())
}
}
impl Default for ShardingConfig {
    /// Returns the stock settings: one `"default"` shard group of 2048 shards,
    /// a runner advertised as `RunnerAddress::new("127.0.0.1", 9000)` with
    /// weight 1, serial entity request handling, and detachment switched off.
    fn default() -> Self {
        // Short aliases keep the timing table below readable.
        let secs = Duration::from_secs;
        let millis = Duration::from_millis;

        Self {
            // Identity and placement.
            runner_address: RunnerAddress::new("127.0.0.1", 9000),
            runner_listen_address: None,
            runner_weight: 1,
            shard_groups: vec![String::from("default")],
            shards_per_group: 2048,
            // Runner lock lifecycle.
            runner_lock_ttl: secs(30),
            runner_lock_refresh_interval: secs(10),
            // Entity lifecycle and mailboxes.
            entity_max_idle_time: secs(60),
            entity_termination_timeout: secs(15),
            entity_mailbox_capacity: 100,
            entity_max_concurrent_requests: 1,
            // Storage / runner polling and rebalancing cadence.
            storage_poll_interval: millis(500),
            storage_inbox_size: 1000,
            runner_poll_interval: secs(5),
            shard_rebalance_debounce: secs(3),
            shard_rebalance_retry_interval: secs(2),
            // Crash recovery and retry budgets (0 = unlimited where the field says so).
            entity_crash_max_retries: 0,
            entity_crash_initial_backoff: millis(500),
            entity_crash_max_backoff: secs(10),
            storage_message_max_retries: 10,
            shard_lock_refresh_max_failures: 3,
            entity_registration_timeout: secs(5),
            send_retry_interval: millis(100),
            singleton_crash_backoff_base: secs(1),
            send_retry_count: 3,
            storage_resumption_max_retries: 0,
            grpc_connect_timeout: secs(5),
            last_read_guard_interval: secs(10 * 60),
            shard_assignment_strategy: ShardAssignmentStrategy::default(),
            // Detachment and shard acquisition.
            detachment_window: millis(200),
            detachment_recover_window: millis(500),
            detachment_enabled: false,
            keepalive_failure_threshold: 3,
            acquire_retry_interval: millis(200),
            acquire_retry_window: secs(2),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts from the default config, applies `mutate`, and asserts that
    /// `validate` rejects the result with a message naming `field`.
    fn assert_rejected(field: &str, mutate: impl FnOnce(&mut ShardingConfig)) {
        let mut config = ShardingConfig::default();
        mutate(&mut config);
        let msg = config.validate().unwrap_err().to_string();
        assert!(
            msg.contains(field),
            "expected error mentioning {field}, got: {msg}"
        );
    }

    #[test]
    fn default_values() {
        let config = ShardingConfig::default();
        assert_eq!(config.runner_weight, 1);
        assert_eq!(config.shard_groups, vec!["default".to_string()]);
        assert_eq!(config.shards_per_group, 2048);
        assert_eq!(config.runner_lock_ttl, Duration::from_secs(30));
        assert_eq!(config.runner_lock_refresh_interval, Duration::from_secs(10));
        assert_eq!(config.entity_max_idle_time, Duration::from_secs(60));
        assert_eq!(config.entity_mailbox_capacity, 100);
        assert_eq!(config.entity_max_concurrent_requests, 1);
        assert_eq!(config.storage_poll_interval, Duration::from_millis(500));
        assert_eq!(config.storage_inbox_size, 1000);
        assert_eq!(config.entity_crash_max_retries, 0);
        assert_eq!(
            config.entity_crash_initial_backoff,
            Duration::from_millis(500)
        );
        assert_eq!(config.entity_crash_max_backoff, Duration::from_secs(10));
        assert_eq!(config.entity_registration_timeout, Duration::from_secs(5));
        assert_eq!(config.grpc_connect_timeout, Duration::from_secs(5));
        assert_eq!(config.last_read_guard_interval, Duration::from_secs(600));
        assert!(!config.detachment_enabled);
        assert_eq!(config.acquire_retry_window, Duration::from_secs(2));
    }

    #[test]
    fn custom_config() {
        let config = ShardingConfig {
            shards_per_group: 600,
            entity_mailbox_capacity: 200,
            ..Default::default()
        };
        assert_eq!(config.shards_per_group, 600);
        assert_eq!(config.entity_mailbox_capacity, 200);
        // Other fields keep defaults
        assert_eq!(config.runner_weight, 1);
    }

    #[test]
    fn default_config_is_valid() {
        ShardingConfig::default().validate().unwrap();
    }

    #[test]
    fn validate_shards_per_group_zero() {
        let config = ShardingConfig {
            shards_per_group: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("shards_per_group"), "got: {msg}");
    }

    #[test]
    fn validate_negative_runner_weight() {
        let config = ShardingConfig {
            runner_weight: -1,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("runner_weight"), "got: {msg}");
    }

    #[test]
    fn validate_zero_mailbox_capacity() {
        let config = ShardingConfig {
            entity_mailbox_capacity: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("entity_mailbox_capacity"), "got: {msg}");
    }

    #[test]
    fn validate_zero_duration() {
        let config = ShardingConfig {
            runner_lock_ttl: Duration::ZERO,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("runner_lock_ttl"), "got: {msg}");
    }

    #[test]
    fn validate_zero_storage_inbox_size() {
        let config = ShardingConfig {
            storage_inbox_size: 0,
            ..Default::default()
        };
        let err = config.validate().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("storage_inbox_size"), "got: {msg}");
    }

    #[test]
    fn validate_runner_weight_zero_is_valid() {
        // Weight 0 = drain mode, valid
        let config = ShardingConfig {
            runner_weight: 0,
            ..Default::default()
        };
        config.validate().unwrap();
    }

    #[test]
    fn validate_empty_shard_groups() {
        assert_rejected("shard_groups", |c| c.shard_groups.clear());
    }

    #[test]
    fn validate_every_required_duration_is_nonzero() {
        assert_rejected("runner_lock_ttl", |c| c.runner_lock_ttl = Duration::ZERO);
        assert_rejected("runner_lock_refresh_interval", |c| {
            c.runner_lock_refresh_interval = Duration::ZERO
        });
        assert_rejected("entity_max_idle_time", |c| {
            c.entity_max_idle_time = Duration::ZERO
        });
        assert_rejected("entity_termination_timeout", |c| {
            c.entity_termination_timeout = Duration::ZERO
        });
        assert_rejected("storage_poll_interval", |c| {
            c.storage_poll_interval = Duration::ZERO
        });
        assert_rejected("runner_poll_interval", |c| {
            c.runner_poll_interval = Duration::ZERO
        });
        assert_rejected("shard_rebalance_debounce", |c| {
            c.shard_rebalance_debounce = Duration::ZERO
        });
        assert_rejected("shard_rebalance_retry_interval", |c| {
            c.shard_rebalance_retry_interval = Duration::ZERO
        });
        assert_rejected("entity_registration_timeout", |c| {
            c.entity_registration_timeout = Duration::ZERO
        });
        assert_rejected("send_retry_interval", |c| {
            c.send_retry_interval = Duration::ZERO
        });
        assert_rejected("singleton_crash_backoff_base", |c| {
            c.singleton_crash_backoff_base = Duration::ZERO
        });
        assert_rejected("entity_crash_initial_backoff", |c| {
            c.entity_crash_initial_backoff = Duration::ZERO
        });
        assert_rejected("entity_crash_max_backoff", |c| {
            c.entity_crash_max_backoff = Duration::ZERO
        });
        assert_rejected("grpc_connect_timeout", |c| {
            c.grpc_connect_timeout = Duration::ZERO
        });
    }
}