1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use camel_api::CamelError;
use camel_component_api::{Consumer, ConsumerContext, is_retryable_camel_error};
use tokio::time::{interval, timeout};
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use crate::consumer::{DelegateState, MasterConsumer};
use crate::leadership::{ReconcileContext, reconcile_event, stop_delegate};
/// Period of the leadership loop's `retry_tick` interval (200 ms).
///
/// On each tick the loop in `MasterConsumer::start` checks whether the delegate
/// task has finished and, while leading with an inactive delegate, attempts to
/// (re)start it. Backoff delays longer than this interval are shortened by this
/// amount before sleeping, because the tick itself is meant to supply that part
/// of the wait.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
#[async_trait]
impl Consumer for MasterConsumer {
/// Starts leader election for this consumer's lock and spawns the leadership loop.
///
/// The spawned task reacts to leadership events by calling `reconcile_event`,
/// and on every `DELEGATE_RETRY_INTERVAL` tick it reaps a finished delegate and,
/// while leading with an inactive delegate, retries starting it according to the
/// `reconnect` policy. When the loop exits normally it stops the delegate and
/// then always attempts to step down from leadership.
///
/// Calling `start` while a leadership task handle is already stored is a no-op.
///
/// # Errors
///
/// Returns `CamelError::EndpointCreationFailed` when leader election cannot be
/// started. Errors raised inside the spawned loop are reported through the
/// task's join handle, not through this return value.
async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
    if self.leadership_task.is_some() {
        return Ok(());
    }

    let handle = self
        .platform_service
        .leadership()
        .start(&self.lock_name)
        .await
        .map_err(|e| {
            CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
        })?;

    // Owned copies / shared handles that are moved into the spawned task.
    let lock_name = self.lock_name.clone();
    let delegate_uri = self.delegate_uri.clone();
    let delegate_component = Arc::clone(&self.delegate_component);
    let metrics = Arc::clone(&self.metrics);
    let platform_service = Arc::clone(&self.platform_service);
    let sender = context.sender();
    let parent_cancel = context.cancel_token();
    let route_id = context.route_id().to_string();
    let drain_timeout = self.drain_timeout;
    let reconnect = self.reconnect.clone();
    let runtime = Arc::clone(&self.runtime);
    let mut events = handle.events.clone();

    // `stop()` cancels this token to request a cooperative shutdown of the loop.
    let stop_token = CancellationToken::new();
    let stop_token_loop = stop_token.clone();
    let leadership_handle = handle;

    let task = tokio::spawn(async move {
        let mut state = DelegateState::Inactive;
        let mut is_leading = false;
        let mut delegate_attempts = 0u32;
        let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
        let rctx = ReconcileContext {
            lock_name: &lock_name,
            delegate_component: &delegate_component,
            delegate_uri: &delegate_uri,
            route_id,
            sender: &sender,
            parent_cancel: &parent_cancel,
            drain_timeout,
            metrics: &metrics,
            platform_service: &platform_service,
            runtime: Arc::clone(&runtime),
        };

        // Read the current event and mark it as seen, so the first
        // `events.changed()` below does not hand the same value back and make
        // us reconcile it twice.
        // NOTE(review): assumes `events` is a tokio `watch::Receiver` — confirm.
        let initial_event = { events.borrow_and_update().clone() };
        if let Some(initial_event) = initial_event {
            is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
            if is_leading {
                delegate_attempts = 0;
            }
            if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
                // log-policy: system-broken
                error!(lock = %lock_name, "master delegate error: {err}");
                return Err(err);
            }
        }

        loop {
            tokio::select! {
                _ = stop_token_loop.cancelled() => {
                    break;
                }
                _ = context.cancelled() => {
                    break;
                }
                changed = events.changed() => {
                    // An error from `changed()` ends the loop; the delegate is
                    // then stopped by the cleanup below.
                    if changed.is_err() {
                        break;
                    }
                    let event = { events.borrow().clone() };
                    if let Some(event) = event {
                        let was_leading = is_leading;
                        is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
                        // A fresh leadership term gets a fresh retry budget.
                        if !was_leading && is_leading {
                            delegate_attempts = 0;
                        }
                        if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
                            // log-policy: system-broken
                            error!(lock = %lock_name, "master delegate error: {err}");
                            return Err(err);
                        }
                    }
                }
                _ = retry_tick.tick() => {
                    // Reap a delegate whose task has already finished.
                    if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
                        && let Err(err) = stop_delegate(&mut state, drain_timeout).await
                    {
                        // log-policy: system-broken
                        error!(lock = %lock_name, "master delegate task failed: {err}");
                        return Err(err);
                    }
                    if is_leading && matches!(state, DelegateState::Inactive) {
                        // Manual retry loop (not retry_async) because:
                        // - The retry logic is embedded inside a periodic
                        //   retry_tick.tick() handler; the outer select! runs
                        //   every DELEGATE_RETRY_INTERVAL regardless, so the
                        //   delay is applied as an additive sleep on top of
                        //   the tick interval, not as a replacement for it.
                        // - reconcile_event() requires &mut state, and the
                        //   inter-attempt logic checks handle.is_finished()
                        //   before retrying — both require state access
                        //   between iterations that retry_async cannot provide.
                        // - Classifies errors (rc-i1z): permanent → fail-fast,
                        //   transient → retry with backoff.
                        // Use NetworkRetryPolicy for bounded retries.
                        // delegate_attempts tracks the next zero-based attempt index.
                        if !reconnect.should_retry(delegate_attempts) {
                            warn!(
                                lock = %lock_name,
                                attempts = delegate_attempts,
                                "delegate start exceeded max attempts, stopping consumer"
                            );
                            break;
                        }
                        // Apply backoff delay for retries (skip first attempt).
                        if delegate_attempts > 0 {
                            let delay = reconnect.delay_for(delegate_attempts - 1);
                            if delay > DELEGATE_RETRY_INTERVAL {
                                // Watch both shutdown signals here, exactly like
                                // the outer select!, so a long backoff cannot
                                // postpone a parent cancellation.
                                tokio::select! {
                                    _ = stop_token_loop.cancelled() => break,
                                    _ = context.cancelled() => break,
                                    _ = tokio::time::sleep(delay.saturating_sub(DELEGATE_RETRY_INTERVAL)) => {}
                                }
                            }
                        }
                        delegate_attempts = delegate_attempts.saturating_add(1);
                        if let Err(err) = reconcile_event(
                            camel_api::LeadershipEvent::StartedLeading,
                            &mut state,
                            &rctx,
                        )
                        .await
                        {
                            if is_retryable_camel_error(&err) {
                                // log-policy: system-broken
                                error!(
                                    lock = %lock_name,
                                    error = %err,
                                    attempt = delegate_attempts,
                                    "master delegate transient error, will retry"
                                );
                                // Don't return — let the next tick attempt retry.
                            } else {
                                // log-policy: system-broken
                                error!(
                                    lock = %lock_name,
                                    error = %err,
                                    "master delegate permanent error, terminating"
                                );
                                return Err(err);
                            }
                        }
                    }
                }
            }
        }

        // Cooperative shutdown: stop the delegate, then release leadership.
        // The step-down is attempted even when stopping the delegate failed,
        // so a drain error cannot skip releasing leadership; the delegate
        // error is still what the task reports.
        let delegate_result = stop_delegate(&mut state, drain_timeout).await;
        let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
        delegate_result?;
        Ok::<(), CamelError>(())
    });

    self.stop_token = Some(stop_token);
    self.leadership_task = Some(task);
    Ok(())
}
/// Stops the leadership loop started by `start`.
///
/// Shutdown is attempted in two phases:
/// 1. The stop token is cancelled and the task gets up to `drain_timeout` to
///    leave its loop and run its own cleanup (stop the delegate, step down).
/// 2. If the task is still running after that, it is aborted and awaited for
///    up to another `drain_timeout` as a safety net.
///
/// Calling `stop` when no task handle is stored only cancels the stop token
/// (if one is present) and returns `Ok(())`.
///
/// # Errors
///
/// Returns the task's own error when it finished with `Err`. If the task had
/// already finished before `stop` was called, a join failure or join timeout is
/// reported as `CamelError::ProcessorError`; for a task that was still running,
/// panics, cancellations and timeouts are only logged.
async fn stop(&mut self) -> Result<(), CamelError> {
    if let Some(token) = self.stop_token.take() {
        token.cancel();
    }
    let Some(mut handle) = self.leadership_task.take() else {
        return Ok(());
    };

    // Task already done: just collect its result, reporting join problems as errors.
    if handle.is_finished() {
        return match timeout(self.drain_timeout, handle).await {
            Ok(Ok(result)) => result,
            Ok(Err(e)) => Err(CamelError::ProcessorError(format!(
                "leadership task join failed: {e}"
            ))),
            Err(_) => Err(CamelError::ProcessorError(
                "leadership task join timed out".to_string(),
            )),
        };
    }

    // Phase 1: give the loop a bounded chance to observe the cancelled token
    // and run its cleanup. Aborting straight away would drop the task at its
    // next await point, before it can stop the delegate or step down.
    let joined = match timeout(self.drain_timeout, &mut handle).await {
        Ok(joined) => joined,
        Err(_) => {
            warn!(
                lock = %self.lock_name,
                "master leadership loop did not stop within drain timeout, aborting"
            );
            // Phase 2: force the task to stop, then await with a timeout as a
            // safety net in case the abort takes a moment to land.
            handle.abort();
            match timeout(self.drain_timeout, handle).await {
                Ok(joined) => joined,
                Err(_) => {
                    warn!("master leadership loop shutdown timed out after abort");
                    return Ok(());
                }
            }
        }
    };

    match joined {
        Ok(result) => result,
        Err(e) if e.is_panic() => {
            // log-policy: system-broken
            error!(lock = %self.lock_name, error = %e, "leadership task panicked");
            Ok(())
        }
        Err(e) => {
            warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
            Ok(())
        }
    }
}
/// Hands the leadership task's join handle over to the caller.
///
/// The stored handle is moved out and replaced with `None`, so a later call
/// returns `None` until `start` stores a new task.
fn background_task_handle(
    &mut self,
) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
    std::mem::take(&mut self.leadership_task)
}
}