1use crate::error::{AgentError, Result};
4use crate::runtime::ContainerId;
5use std::time::Duration;
6use zlayer_init_actions::InitAction;
7use zlayer_spec::{ErrorsSpec, InitFailureAction, InitSpec};
8
/// Default number of times the whole init sequence is re-run after a failure
/// under the `Restart` and `Backoff` failure policies (so up to 4 attempts in
/// total). Can be overridden per orchestrator with `with_max_retries`.
const DEFAULT_MAX_INIT_RETRIES: u32 = 3;
11
/// Exponential backoff parameters used between init retry attempts.
///
/// The delay before retry `n` (0-based) is `initial_delay * multiplier^n`,
/// capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Delay used before the first retry (attempt 0).
    pub initial_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Factor by which the delay grows with each subsequent attempt.
    pub multiplier: f64,
}

impl Default for BackoffConfig {
    /// 1 second initial delay, doubling on each attempt, capped at 60 seconds.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
        }
    }
}

impl BackoffConfig {
    /// Returns the delay to wait before the given (0-based) retry attempt.
    ///
    /// Computes `initial_delay * multiplier^attempt` and caps it at `max_delay`.
    /// This never panics:
    /// - a product at or above the cap, infinite, or NaN (e.g. `0 * inf`)
    ///   yields exactly `max_delay`;
    /// - a negative product (e.g. from a negative `multiplier`) yields
    ///   [`Duration::ZERO`].
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // `powi` takes an `i32`. Saturate instead of `as`-casting, because a
        // wrapped (negative) exponent would shrink the delay to almost nothing
        // for very large attempt counts.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let delay_secs = self.initial_delay.as_secs_f64() * self.multiplier.powi(exponent);

        // Hand back `max_delay` itself once the cap is hit, so the result is exact
        // and never goes through an f64 -> Duration conversion that could overflow.
        if delay_secs.is_nan() || delay_secs >= self.max_delay.as_secs_f64() {
            return self.max_delay;
        }

        // `Duration::from_secs_f64` panics on negative input.
        Duration::from_secs_f64(delay_secs.max(0.0))
    }
}
43
44pub struct InitOrchestrator {
46 id: ContainerId,
47 spec: InitSpec,
48 error_policy: ErrorsSpec,
50 max_retries: u32,
52 backoff_config: BackoffConfig,
54}
55
56impl InitOrchestrator {
57 #[must_use]
59 pub fn new(id: ContainerId, spec: InitSpec) -> Self {
60 Self {
61 id,
62 spec,
63 error_policy: ErrorsSpec::default(),
64 max_retries: DEFAULT_MAX_INIT_RETRIES,
65 backoff_config: BackoffConfig::default(),
66 }
67 }
68
69 #[must_use]
71 pub fn with_error_policy(id: ContainerId, spec: InitSpec, error_policy: ErrorsSpec) -> Self {
72 Self {
73 id,
74 spec,
75 error_policy,
76 max_retries: DEFAULT_MAX_INIT_RETRIES,
77 backoff_config: BackoffConfig::default(),
78 }
79 }
80
81 #[must_use]
83 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
84 self.max_retries = max_retries;
85 self
86 }
87
88 #[must_use]
90 pub fn with_backoff_config(mut self, config: BackoffConfig) -> Self {
91 self.backoff_config = config;
92 self
93 }
94
95 pub async fn run(&self) -> Result<()> {
105 match self.error_policy.on_init_failure.action {
106 InitFailureAction::Fail => {
107 self.run_init_steps().await
109 }
110 InitFailureAction::Restart => {
111 self.run_with_retries(false).await
113 }
114 InitFailureAction::Backoff => {
115 self.run_with_retries(true).await
117 }
118 }
119 }
120
121 async fn run_with_retries(&self, use_backoff: bool) -> Result<()> {
123 let mut last_error = None;
124
125 for attempt in 0..=self.max_retries {
126 if attempt > 0 {
127 let delay = if use_backoff {
128 self.backoff_config.delay_for_attempt(attempt - 1)
129 } else {
130 Duration::from_millis(100) };
132
133 tracing::info!(
134 container = %self.id,
135 attempt = attempt,
136 max_retries = self.max_retries,
137 delay_ms = delay.as_millis(),
138 "Retrying init steps after failure"
139 );
140
141 tokio::time::sleep(delay).await;
142 }
143
144 match self.run_init_steps().await {
145 Ok(()) => {
146 if attempt > 0 {
147 tracing::info!(
148 container = %self.id,
149 attempt = attempt,
150 "Init steps succeeded after retry"
151 );
152 }
153 return Ok(());
154 }
155 Err(e) => {
156 tracing::warn!(
157 container = %self.id,
158 attempt = attempt,
159 max_retries = self.max_retries,
160 error = %e,
161 "Init steps failed"
162 );
163 last_error = Some(e);
164 }
165 }
166 }
167
168 Err(last_error.unwrap_or_else(|| AgentError::InitActionFailed {
170 id: self.id.to_string(),
171 reason: "Init failed after all retries".to_string(),
172 }))
173 }
174
175 async fn run_init_steps(&self) -> Result<()> {
177 let _start_grace = std::time::Instant::now();
178
179 for step in &self.spec.steps {
180 let _step_start = std::time::Instant::now();
181
182 let action = zlayer_init_actions::from_spec(
184 &step.uses,
185 &step.with,
186 Duration::from_secs(30), )
188 .map_err(|e| AgentError::InitActionFailed {
189 id: self.id.to_string(),
190 reason: e.to_string(),
191 })?;
192
193 let timeout = step.timeout.unwrap_or(Duration::from_secs(300));
195 let result = tokio::time::timeout(timeout, self.execute_step(&action, step)).await;
196
197 match result {
198 Ok(Ok(())) => {
199 }
201 Ok(Err(e)) => {
202 return match step.on_failure {
204 zlayer_spec::FailureAction::Fail => Err(AgentError::InitActionFailed {
205 id: self.id.to_string(),
206 reason: format!("step '{}' failed: {}", step.id, e),
207 }),
208 zlayer_spec::FailureAction::Warn => {
209 tracing::warn!(
210 container = %self.id,
211 step = %step.id,
212 error = %e,
213 "Init step failed (continuing due to warn policy)"
214 );
215 continue; }
217 zlayer_spec::FailureAction::Continue => {
218 continue;
220 }
221 };
222 }
223 Err(_) => {
224 return match step.on_failure {
226 zlayer_spec::FailureAction::Fail => Err(AgentError::Timeout { timeout }),
227 zlayer_spec::FailureAction::Warn => {
228 tracing::warn!(
229 container = %self.id,
230 step = %step.id,
231 timeout_secs = timeout.as_secs(),
232 "Init step timed out (continuing due to warn policy)"
233 );
234 continue; }
236 zlayer_spec::FailureAction::Continue => {
237 continue;
239 }
240 };
241 }
242 }
243
244 if let Some(retry_count) = step.retry {
246 let _ = retry_count;
249 }
250 }
251
252 Ok(())
253 }
254
255 async fn execute_step(&self, action: &InitAction, _step: &zlayer_spec::InitStep) -> Result<()> {
256 match action {
257 InitAction::WaitTcp(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
258 id: self.id.to_string(),
259 reason: e.to_string(),
260 }),
261 InitAction::WaitHttp(a) => {
262 a.execute().await.map_err(|e| AgentError::InitActionFailed {
263 id: self.id.to_string(),
264 reason: e.to_string(),
265 })
266 }
267 InitAction::Run(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
268 id: self.id.to_string(),
269 reason: e.to_string(),
270 }),
271 #[cfg(feature = "s3")]
272 InitAction::S3Push(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
273 id: self.id.to_string(),
274 reason: e.to_string(),
275 }),
276 #[cfg(feature = "s3")]
277 InitAction::S3Pull(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
278 id: self.id.to_string(),
279 reason: e.to_string(),
280 }),
281 }
282 }
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed container id shared by the tests.
    fn test_container_id() -> ContainerId {
        ContainerId {
            service: "test".to_string(),
            replica: 1,
        }
    }

    /// An init spec with no steps, which trivially succeeds.
    fn empty_spec() -> InitSpec {
        InitSpec { steps: vec![] }
    }

    #[tokio::test]
    async fn test_init_orchestrator_success() {
        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec());
        orchestrator.run().await.unwrap();
    }

    #[tokio::test]
    async fn test_init_orchestrator_with_error_policy() {
        let error_policy = ErrorsSpec::default();

        let orchestrator =
            InitOrchestrator::with_error_policy(test_container_id(), empty_spec(), error_policy);
        orchestrator.run().await.unwrap();
    }

    #[tokio::test]
    async fn test_init_orchestrator_retry_policies_with_no_steps() {
        // Both retrying policies must succeed on the first attempt (no sleeps)
        // when there is nothing to run.
        for action in [InitFailureAction::Restart, InitFailureAction::Backoff] {
            let mut error_policy = ErrorsSpec::default();
            error_policy.on_init_failure.action = action;

            let orchestrator = InitOrchestrator::with_error_policy(
                test_container_id(),
                empty_spec(),
                error_policy,
            );
            orchestrator.run().await.unwrap();
        }
    }

    #[test]
    fn test_backoff_config_default() {
        let config = BackoffConfig::default();
        assert_eq!(config.initial_delay, Duration::from_secs(1));
        assert_eq!(config.max_delay, Duration::from_secs(60));
        assert!(
            (config.multiplier - 2.0).abs() < f64::EPSILON,
            "multiplier should be 2.0"
        );
    }

    #[test]
    fn test_backoff_delay_calculation() {
        let config = BackoffConfig::default();

        // 1s * 2^n ...
        assert_eq!(config.delay_for_attempt(0), Duration::from_secs(1));
        assert_eq!(config.delay_for_attempt(1), Duration::from_secs(2));
        assert_eq!(config.delay_for_attempt(2), Duration::from_secs(4));
        assert_eq!(config.delay_for_attempt(3), Duration::from_secs(8));
        assert_eq!(config.delay_for_attempt(4), Duration::from_secs(16));
        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(32));
        // ... until 64s exceeds the 60s cap.
        assert_eq!(config.delay_for_attempt(6), Duration::from_secs(60));
        assert_eq!(config.delay_for_attempt(7), Duration::from_secs(60));
    }

    #[test]
    fn test_backoff_custom_config() {
        let config = BackoffConfig {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            multiplier: 3.0,
        };

        // 100ms * 3^n ...
        assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100));
        assert_eq!(config.delay_for_attempt(1), Duration::from_millis(300));
        assert_eq!(config.delay_for_attempt(2), Duration::from_millis(900));
        assert_eq!(config.delay_for_attempt(3), Duration::from_millis(2700));
        assert_eq!(config.delay_for_attempt(4), Duration::from_millis(8100));
        // ... 24.3s is capped at 10s.
        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(10));
    }

    #[test]
    fn test_orchestrator_defaults() {
        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec());

        assert_eq!(orchestrator.max_retries, DEFAULT_MAX_INIT_RETRIES);
        assert_eq!(
            orchestrator.backoff_config.initial_delay,
            BackoffConfig::default().initial_delay
        );
    }

    #[test]
    fn test_orchestrator_builder_pattern() {
        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec())
            .with_max_retries(5)
            .with_backoff_config(BackoffConfig {
                initial_delay: Duration::from_millis(500),
                max_delay: Duration::from_secs(30),
                multiplier: 1.5,
            });

        assert_eq!(orchestrator.max_retries, 5);
        assert_eq!(
            orchestrator.backoff_config.initial_delay,
            Duration::from_millis(500)
        );
        assert_eq!(orchestrator.backoff_config.max_delay, Duration::from_secs(30));
        assert!((orchestrator.backoff_config.multiplier - 1.5).abs() < f64::EPSILON);
    }
}