1use crate::error::{AgentError, Result};
4use crate::runtime::ContainerId;
5use std::time::Duration;
6use zlayer_init_actions::InitAction;
7use zlayer_spec::{ErrorsSpec, InitFailureAction, InitSpec};
8
/// Default number of times `InitOrchestrator` re-runs a failed init sequence
/// under the `Restart` / `Backoff` failure policies.
const DEFAULT_MAX_INIT_RETRIES: u32 = 3;

/// Exponential backoff parameters used between init retry attempts.
///
/// The delay before retry `n` (0-based) is `initial_delay * multiplier^n`,
/// capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Delay used for the first retry (attempt 0).
    pub initial_delay: Duration,
    /// Upper bound on any computed delay.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by on each successive attempt.
    pub multiplier: f64,
}

impl Default for BackoffConfig {
    /// 1 second initial delay, doubling each attempt, capped at 60 seconds.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
        }
    }
}

impl BackoffConfig {
    /// Returns the delay to wait before retry number `attempt` (0-based).
    ///
    /// Computes `initial_delay * multiplier^attempt` and caps it at `max_delay`.
    /// Degenerate configurations never panic:
    /// - a NaN product yields `max_delay`;
    /// - a negative product (e.g. a negative `multiplier`) yields `Duration::ZERO`.
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // `powi` takes an `i32`. Saturate instead of wrapping: a wrapped cast
        // turns huge attempt numbers into negative exponents, which would
        // collapse the delay towards zero instead of holding it at the cap.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let delay_secs = self.initial_delay.as_secs_f64() * self.multiplier.powi(exponent);
        let max_secs = self.max_delay.as_secs_f64();

        // Return the cap itself rather than round-tripping it through f64:
        // the result stays exact, and `from_secs_f64` cannot overflow when
        // `max_delay` is close to `Duration::MAX`. NaN lands here as well,
        // matching what `f64::min` would have selected.
        if delay_secs.is_nan() || delay_secs >= max_secs {
            return self.max_delay;
        }
        // `from_secs_f64` panics on negative input.
        if delay_secs <= 0.0 {
            return Duration::ZERO;
        }
        Duration::from_secs_f64(delay_secs)
    }
}
43
44pub struct InitOrchestrator {
46 id: ContainerId,
47 spec: InitSpec,
48 error_policy: ErrorsSpec,
50 max_retries: u32,
52 backoff_config: BackoffConfig,
54}
55
56impl InitOrchestrator {
57 #[must_use]
59 pub fn new(id: ContainerId, spec: InitSpec) -> Self {
60 Self {
61 id,
62 spec,
63 error_policy: ErrorsSpec::default(),
64 max_retries: DEFAULT_MAX_INIT_RETRIES,
65 backoff_config: BackoffConfig::default(),
66 }
67 }
68
69 #[must_use]
71 pub fn with_error_policy(id: ContainerId, spec: InitSpec, error_policy: ErrorsSpec) -> Self {
72 Self {
73 id,
74 spec,
75 error_policy,
76 max_retries: DEFAULT_MAX_INIT_RETRIES,
77 backoff_config: BackoffConfig::default(),
78 }
79 }
80
81 #[must_use]
83 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
84 self.max_retries = max_retries;
85 self
86 }
87
88 #[must_use]
90 pub fn with_backoff_config(mut self, config: BackoffConfig) -> Self {
91 self.backoff_config = config;
92 self
93 }
94
95 pub async fn run(&self) -> Result<()> {
105 match self.error_policy.on_init_failure.action {
106 InitFailureAction::Fail => {
107 self.run_init_steps().await
109 }
110 InitFailureAction::Restart => {
111 self.run_with_retries(false).await
113 }
114 InitFailureAction::Backoff => {
115 self.run_with_retries(true).await
117 }
118 }
119 }
120
121 async fn run_with_retries(&self, use_backoff: bool) -> Result<()> {
123 let mut last_error = None;
124
125 for attempt in 0..=self.max_retries {
126 if attempt > 0 {
127 let delay = if use_backoff {
128 self.backoff_config.delay_for_attempt(attempt - 1)
129 } else {
130 Duration::from_millis(100) };
132
133 tracing::info!(
134 container = %self.id,
135 attempt = attempt,
136 max_retries = self.max_retries,
137 delay_ms = delay.as_millis(),
138 "Retrying init steps after failure"
139 );
140
141 tokio::time::sleep(delay).await;
142 }
143
144 match self.run_init_steps().await {
145 Ok(()) => {
146 if attempt > 0 {
147 tracing::info!(
148 container = %self.id,
149 attempt = attempt,
150 "Init steps succeeded after retry"
151 );
152 }
153 return Ok(());
154 }
155 Err(e) => {
156 tracing::warn!(
157 container = %self.id,
158 attempt = attempt,
159 max_retries = self.max_retries,
160 error = %e,
161 "Init steps failed"
162 );
163 last_error = Some(e);
164 }
165 }
166 }
167
168 Err(last_error.unwrap_or_else(|| AgentError::InitActionFailed {
170 id: self.id.to_string(),
171 reason: "Init failed after all retries".to_string(),
172 }))
173 }
174
175 async fn run_init_steps(&self) -> Result<()> {
177 let _start_grace = std::time::Instant::now();
178
179 for step in &self.spec.steps {
180 let _step_start = std::time::Instant::now();
181
182 let action = zlayer_init_actions::from_spec(
184 &step.uses,
185 &step.with,
186 Duration::from_secs(30), )
188 .map_err(|e| AgentError::InitActionFailed {
189 id: self.id.to_string(),
190 reason: e.to_string(),
191 })?;
192
193 let timeout = step.timeout.unwrap_or(Duration::from_secs(300));
195 let result = tokio::time::timeout(timeout, self.execute_step(&action, step)).await;
196
197 match result {
198 Ok(Ok(())) => {
199 }
201 Ok(Err(e)) => {
202 return match step.on_failure {
204 zlayer_spec::FailureAction::Fail => Err(AgentError::InitActionFailed {
205 id: self.id.to_string(),
206 reason: format!("step '{}' failed: {}", step.id, e),
207 }),
208 zlayer_spec::FailureAction::Warn => {
209 tracing::warn!(
210 container = %self.id,
211 step = %step.id,
212 error = %e,
213 "Init step failed (continuing due to warn policy)"
214 );
215 continue; }
217 zlayer_spec::FailureAction::Continue => {
218 continue;
220 }
221 };
222 }
223 Err(_) => {
224 return match step.on_failure {
226 zlayer_spec::FailureAction::Fail => Err(AgentError::Timeout { timeout }),
227 zlayer_spec::FailureAction::Warn => {
228 tracing::warn!(
229 container = %self.id,
230 step = %step.id,
231 timeout_secs = timeout.as_secs(),
232 "Init step timed out (continuing due to warn policy)"
233 );
234 continue; }
236 zlayer_spec::FailureAction::Continue => {
237 continue;
239 }
240 };
241 }
242 }
243
244 if let Some(retry_count) = step.retry {
246 let _ = retry_count;
249 }
250 }
251
252 Ok(())
253 }
254
255 async fn execute_step(&self, action: &InitAction, _step: &zlayer_spec::InitStep) -> Result<()> {
256 match action {
257 InitAction::WaitTcp(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
258 id: self.id.to_string(),
259 reason: e.to_string(),
260 }),
261 InitAction::WaitHttp(a) => {
262 a.execute().await.map_err(|e| AgentError::InitActionFailed {
263 id: self.id.to_string(),
264 reason: e.to_string(),
265 })
266 }
267 InitAction::Run(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
268 id: self.id.to_string(),
269 reason: e.to_string(),
270 }),
271 #[cfg(feature = "s3")]
272 InitAction::S3Push(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
273 id: self.id.to_string(),
274 reason: e.to_string(),
275 }),
276 #[cfg(feature = "s3")]
277 InitAction::S3Pull(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
278 id: self.id.to_string(),
279 reason: e.to_string(),
280 }),
281 }
282 }
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Container id shared by every test in this module.
    fn test_container_id() -> ContainerId {
        ContainerId::new("test".to_string(), 1)
    }

    /// An init spec without steps; running it must always succeed.
    fn empty_spec() -> InitSpec {
        InitSpec { steps: vec![] }
    }

    #[tokio::test]
    async fn test_init_orchestrator_success() {
        InitOrchestrator::new(test_container_id(), empty_spec())
            .run()
            .await
            .expect("an empty init spec should run successfully");
    }

    #[tokio::test]
    async fn test_init_orchestrator_with_error_policy() {
        let orchestrator = InitOrchestrator::with_error_policy(
            test_container_id(),
            empty_spec(),
            ErrorsSpec::default(),
        );
        orchestrator
            .run()
            .await
            .expect("an empty init spec should run successfully under the default policy");
    }

    #[test]
    fn test_backoff_config_default() {
        let BackoffConfig {
            initial_delay,
            max_delay,
            multiplier,
        } = BackoffConfig::default();

        assert_eq!(initial_delay, Duration::from_secs(1));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert!(
            (multiplier - 2.0).abs() < f64::EPSILON,
            "multiplier should be 2.0"
        );
    }

    #[test]
    fn test_backoff_delay_calculation() {
        let config = BackoffConfig::default();

        // Doubles from 1s per attempt until the 60s cap kicks in at attempt 6.
        let expected_secs = [1, 2, 4, 8, 16, 32, 60, 60];
        for (attempt, &secs) in (0_u32..).zip(expected_secs.iter()) {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_secs(secs),
                "unexpected delay for attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_backoff_custom_config() {
        let config = BackoffConfig {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            multiplier: 3.0,
        };

        // Triples from 100ms per attempt until the 10s cap kicks in at attempt 5.
        let expected_millis = [100, 300, 900, 2700, 8100, 10_000];
        for (attempt, &millis) in (0_u32..).zip(expected_millis.iter()) {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_millis(millis),
                "unexpected delay for attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_orchestrator_builder_pattern() {
        let backoff = BackoffConfig {
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            multiplier: 1.5,
        };

        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec())
            .with_max_retries(5)
            .with_backoff_config(backoff);

        assert_eq!(orchestrator.max_retries, 5);
        assert_eq!(
            orchestrator.backoff_config.initial_delay,
            Duration::from_millis(500)
        );
    }
}