1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::pin::Pin;
use async_trait::async_trait;
use futures::Stream;
use tokio::sync::broadcast;
use crate::error::ClusterError;
use crate::runner::Runner;
use crate::types::{MachineId, RunnerAddress, ShardId};
/// Snapshot of the runner's lease/registration keep-alive health.
///
/// `RunnerStorage` implementations publish these values so that sharding can
/// watch lease health and trigger detachment once connectivity degrades.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeaseHealth {
    /// `true` when the most recent keep-alive attempt succeeded.
    pub healthy: bool,
    /// How many keep-alive attempts have failed back-to-back.
    pub failure_streak: u32,
}
/// Result of a batch shard lock refresh, reporting successes, lost locks, and
/// per-shard errors.
///
/// `Default` yields an empty result (all three lists empty), which backend
/// overrides of the batch refresh can fill in incrementally.
#[derive(Debug, Default)]
pub struct BatchRefreshResult {
    /// Shard IDs whose lock TTL was successfully refreshed.
    pub refreshed: Vec<ShardId>,
    /// Shard IDs where the lock was no longer held (refresh returned `false`).
    pub lost: Vec<ShardId>,
    /// Per-shard errors for shards that failed to refresh due to storage errors.
    pub failures: Vec<(ShardId, ClusterError)>,
}
/// Result of a batch shard acquisition, reporting both successes and per-shard
/// errors.
///
/// Shards that were simply held by another runner (a normal `Ok(false)` from a
/// single acquire) appear in neither list. `Default` yields an empty result,
/// which backend overrides of the batch acquire can fill in incrementally.
#[derive(Debug, Default)]
pub struct BatchAcquireResult {
    /// Shard IDs that were successfully acquired.
    pub acquired: Vec<ShardId>,
    /// Per-shard errors for shards that failed to acquire due to storage errors
    /// (as opposed to being held by another runner, which is a normal `Ok(false)`).
    pub failures: Vec<(ShardId, ClusterError)>,
}
/// Storage backend for runner registration and shard locking.
#[async_trait]
pub trait RunnerStorage: Send + Sync {
/// Register this runner. Returns assigned machine ID.
async fn register(&self, runner: &Runner) -> Result<MachineId, ClusterError>;
/// Unregister this runner.
async fn unregister(&self, address: &RunnerAddress) -> Result<(), ClusterError>;
/// Get all registered runners.
async fn get_runners(&self) -> Result<Vec<Runner>, ClusterError>;
/// Set runner health status.
async fn set_runner_health(
&self,
address: &RunnerAddress,
healthy: bool,
) -> Result<(), ClusterError>;
/// Try to acquire a shard lock.
async fn acquire(
&self,
shard_id: &ShardId,
runner: &RunnerAddress,
) -> Result<bool, ClusterError>;
/// Refresh shard lock TTL.
async fn refresh(
&self,
shard_id: &ShardId,
runner: &RunnerAddress,
) -> Result<bool, ClusterError>;
/// Release a shard lock.
async fn release(&self, shard_id: &ShardId, runner: &RunnerAddress)
-> Result<(), ClusterError>;
/// Release all locks held by this runner.
async fn release_all(&self, runner: &RunnerAddress) -> Result<(), ClusterError>;
/// Watch for changes to runner registrations.
async fn watch_runners(
&self,
) -> Result<Pin<Box<dyn Stream<Item = Vec<Runner>> + Send>>, ClusterError>;
/// Refresh multiple shard lock TTLs in a batch. Returns the list of
/// shard IDs that were successfully refreshed. Shards where the lock was
/// not held (refresh returned `false`) are not included in the result.
///
/// The default implementation calls `refresh` in a loop. Backends may
/// override this for better performance (e.g., fewer mutex acquisitions).
async fn refresh_batch(
&self,
shard_ids: &[ShardId],
runner: &RunnerAddress,
) -> Result<BatchRefreshResult, ClusterError> {
let mut refreshed = Vec::new();
let mut lost = Vec::new();
let mut failures = Vec::new();
for shard_id in shard_ids {
match self.refresh(shard_id, runner).await {
Ok(true) => refreshed.push(shard_id.clone()),
Ok(false) => lost.push(shard_id.clone()),
Err(e) => {
tracing::warn!(shard_id = %shard_id, error = %e, "failed to refresh shard in batch");
failures.push((shard_id.clone(), e));
}
}
}
Ok(BatchRefreshResult {
refreshed,
lost,
failures,
})
}
/// Try to acquire multiple shard locks in a batch. Returns the list of
/// shard IDs that were successfully acquired.
///
/// The default implementation calls `acquire` in a loop. Backends may
/// override this for better performance (e.g., etcd multi-key transactions).
async fn acquire_batch(
&self,
shard_ids: &[ShardId],
runner: &RunnerAddress,
) -> Result<BatchAcquireResult, ClusterError> {
let mut acquired = Vec::new();
let mut failures = Vec::new();
for shard_id in shard_ids {
match self.acquire(shard_id, runner).await {
Ok(true) => acquired.push(shard_id.clone()),
Ok(false) => {}
Err(e) => {
tracing::warn!(shard_id = %shard_id, error = %e, "failed to acquire shard in batch");
failures.push((shard_id.clone(), e));
}
}
}
Ok(BatchAcquireResult { acquired, failures })
}
/// Subscribe to lease health updates.
///
/// The returned receiver will receive [`LeaseHealth`] updates whenever the
/// keep-alive status changes (success or failure). Sharding uses these
/// updates to detect connectivity degradation and trigger detachment.
///
/// The default implementation returns `None`, indicating the backend does
/// not support health notifications (e.g., in-memory implementations).
/// Backends that use leases (etcd, etc.) should override this.
fn lease_health_receiver(&self) -> Option<broadcast::Receiver<LeaseHealth>> {
None
}
}