Skip to main content

zlayer_agent/
init.rs

1//! Init action orchestration
2
3use crate::error::{AgentError, Result};
4use crate::runtime::ContainerId;
5use std::time::Duration;
6use zlayer_init_actions::InitAction;
7use zlayer_spec::{ErrorsSpec, InitFailureAction, InitSpec};
8
/// How many times the whole init sequence may be re-run after its first
/// failure, when the failure policy is `Restart` or `Backoff`.
///
/// Used by both `InitOrchestrator` constructors; override with `with_max_retries`.
const DEFAULT_MAX_INIT_RETRIES: u32 = 3;
11
/// Backoff configuration for exponential backoff.
///
/// The delay before retry `n` (0-indexed) is `initial_delay * multiplier^n`,
/// capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Initial delay before first retry
    pub initial_delay: Duration,
    /// Maximum delay between retries
    pub max_delay: Duration,
    /// Multiplier for each subsequent retry
    pub multiplier: f64,
}

impl Default for BackoffConfig {
    /// 1s initial delay, doubling on every attempt, capped at 60s.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
        }
    }
}

impl BackoffConfig {
    /// Calculate the delay for a given retry attempt (0-indexed).
    ///
    /// Returns `initial_delay * multiplier^attempt`, clamped to `[0, max_delay]`:
    ///
    /// * Attempts beyond `i32::MAX` saturate the exponent instead of wrapping to a
    ///   negative power, so very large attempt counts yield `max_delay`.
    /// * A negative product (e.g. a negative `multiplier` on an odd attempt) yields
    ///   `Duration::ZERO` instead of panicking inside `Duration::from_secs_f64`.
    /// * A NaN product (e.g. a NaN `multiplier`) yields `max_delay`.
    /// * When the cap applies, `max_delay` is returned exactly (no f64 round-trip).
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // `powi` takes an i32; saturate rather than letting an `as` cast wrap negative.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let delay_secs = self.initial_delay.as_secs_f64() * self.multiplier.powi(exponent);

        if delay_secs.is_nan() || delay_secs >= self.max_delay.as_secs_f64() {
            return self.max_delay;
        }
        if delay_secs <= 0.0 {
            // `from_secs_f64` panics on negative input.
            return Duration::ZERO;
        }
        // Strictly below `max_delay`, so this cannot overflow `Duration`.
        Duration::from_secs_f64(delay_secs)
    }
}
43
44/// Orchestrates init actions for a container
45pub struct InitOrchestrator {
46    id: ContainerId,
47    spec: InitSpec,
48    /// Error handling policy from the service spec
49    error_policy: ErrorsSpec,
50    /// Maximum retry attempts for init failure
51    max_retries: u32,
52    /// Backoff configuration for exponential backoff
53    backoff_config: BackoffConfig,
54}
55
56impl InitOrchestrator {
57    /// Create a new init orchestrator
58    #[must_use]
59    pub fn new(id: ContainerId, spec: InitSpec) -> Self {
60        Self {
61            id,
62            spec,
63            error_policy: ErrorsSpec::default(),
64            max_retries: DEFAULT_MAX_INIT_RETRIES,
65            backoff_config: BackoffConfig::default(),
66        }
67    }
68
69    /// Create a new init orchestrator with error policy
70    #[must_use]
71    pub fn with_error_policy(id: ContainerId, spec: InitSpec, error_policy: ErrorsSpec) -> Self {
72        Self {
73            id,
74            spec,
75            error_policy,
76            max_retries: DEFAULT_MAX_INIT_RETRIES,
77            backoff_config: BackoffConfig::default(),
78        }
79    }
80
81    /// Set the maximum retry attempts
82    #[must_use]
83    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
84        self.max_retries = max_retries;
85        self
86    }
87
88    /// Set the backoff configuration
89    #[must_use]
90    pub fn with_backoff_config(mut self, config: BackoffConfig) -> Self {
91        self.backoff_config = config;
92        self
93    }
94
95    /// Run all init steps with error policy enforcement
96    ///
97    /// The behavior on init failure depends on the `errors.on_init_failure` policy:
98    /// - `Fail` (default): Stop container creation, return error immediately
99    /// - `Restart`: Retry init steps (up to `max_retries`) with immediate retry
100    /// - `Backoff`: Retry with exponential backoff (1s, 2s, 4s, 8s, max 60s)
101    ///
102    /// # Errors
103    /// Returns an error if init steps fail and the error policy is `Fail` or all retries are exhausted.
104    pub async fn run(&self) -> Result<()> {
105        match self.error_policy.on_init_failure.action {
106            InitFailureAction::Fail => {
107                // Fail immediately on any error
108                self.run_init_steps().await
109            }
110            InitFailureAction::Restart => {
111                // Retry immediately on failure
112                self.run_with_retries(false).await
113            }
114            InitFailureAction::Backoff => {
115                // Retry with exponential backoff
116                self.run_with_retries(true).await
117            }
118        }
119    }
120
121    /// Run init steps with retry logic
122    async fn run_with_retries(&self, use_backoff: bool) -> Result<()> {
123        let mut last_error = None;
124
125        for attempt in 0..=self.max_retries {
126            if attempt > 0 {
127                let delay = if use_backoff {
128                    self.backoff_config.delay_for_attempt(attempt - 1)
129                } else {
130                    Duration::from_millis(100) // Small delay for immediate retry
131                };
132
133                tracing::info!(
134                    container = %self.id,
135                    attempt = attempt,
136                    max_retries = self.max_retries,
137                    delay_ms = delay.as_millis(),
138                    "Retrying init steps after failure"
139                );
140
141                tokio::time::sleep(delay).await;
142            }
143
144            match self.run_init_steps().await {
145                Ok(()) => {
146                    if attempt > 0 {
147                        tracing::info!(
148                            container = %self.id,
149                            attempt = attempt,
150                            "Init steps succeeded after retry"
151                        );
152                    }
153                    return Ok(());
154                }
155                Err(e) => {
156                    tracing::warn!(
157                        container = %self.id,
158                        attempt = attempt,
159                        max_retries = self.max_retries,
160                        error = %e,
161                        "Init steps failed"
162                    );
163                    last_error = Some(e);
164                }
165            }
166        }
167
168        // All retries exhausted
169        Err(last_error.unwrap_or_else(|| AgentError::InitActionFailed {
170            id: self.id.to_string(),
171            reason: "Init failed after all retries".to_string(),
172        }))
173    }
174
175    /// Run all init steps once (no retry logic)
176    async fn run_init_steps(&self) -> Result<()> {
177        let _start_grace = std::time::Instant::now();
178
179        for step in &self.spec.steps {
180            let _step_start = std::time::Instant::now();
181
182            // Parse action
183            let action = zlayer_init_actions::from_spec(
184                &step.uses,
185                &step.with,
186                Duration::from_secs(30), // default
187            )
188            .map_err(|e| AgentError::InitActionFailed {
189                id: self.id.to_string(),
190                reason: e.to_string(),
191            })?;
192
193            // Execute with timeout
194            let timeout = step.timeout.unwrap_or(Duration::from_secs(300));
195            let result = tokio::time::timeout(timeout, self.execute_step(&action, step)).await;
196
197            match result {
198                Ok(Ok(())) => {
199                    // Step succeeded
200                }
201                Ok(Err(e)) => {
202                    // Action failed
203                    return match step.on_failure {
204                        zlayer_spec::FailureAction::Fail => Err(AgentError::InitActionFailed {
205                            id: self.id.to_string(),
206                            reason: format!("step '{}' failed: {}", step.id, e),
207                        }),
208                        zlayer_spec::FailureAction::Warn => {
209                            tracing::warn!(
210                                container = %self.id,
211                                step = %step.id,
212                                error = %e,
213                                "Init step failed (continuing due to warn policy)"
214                            );
215                            continue; // Continue to next step
216                        }
217                        zlayer_spec::FailureAction::Continue => {
218                            // Continue to next step silently
219                            continue;
220                        }
221                    };
222                }
223                Err(_) => {
224                    // Timeout
225                    return match step.on_failure {
226                        zlayer_spec::FailureAction::Fail => Err(AgentError::Timeout { timeout }),
227                        zlayer_spec::FailureAction::Warn => {
228                            tracing::warn!(
229                                container = %self.id,
230                                step = %step.id,
231                                timeout_secs = timeout.as_secs(),
232                                "Init step timed out (continuing due to warn policy)"
233                            );
234                            continue; // Continue to next step
235                        }
236                        zlayer_spec::FailureAction::Continue => {
237                            // Continue to next step silently
238                            continue;
239                        }
240                    };
241                }
242            }
243
244            // Handle retries if specified
245            if let Some(retry_count) = step.retry {
246                // For now, retries are handled by the action itself
247                // A more sophisticated implementation would retry the entire step
248                let _ = retry_count;
249            }
250        }
251
252        Ok(())
253    }
254
255    async fn execute_step(&self, action: &InitAction, _step: &zlayer_spec::InitStep) -> Result<()> {
256        match action {
257            InitAction::WaitTcp(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
258                id: self.id.to_string(),
259                reason: e.to_string(),
260            }),
261            InitAction::WaitHttp(a) => {
262                a.execute().await.map_err(|e| AgentError::InitActionFailed {
263                    id: self.id.to_string(),
264                    reason: e.to_string(),
265                })
266            }
267            InitAction::Run(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
268                id: self.id.to_string(),
269                reason: e.to_string(),
270            }),
271            #[cfg(feature = "s3")]
272            InitAction::S3Push(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
273                id: self.id.to_string(),
274                reason: e.to_string(),
275            }),
276            #[cfg(feature = "s3")]
277            InitAction::S3Pull(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
278                id: self.id.to_string(),
279                reason: e.to_string(),
280            }),
281        }
282    }
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Container id shared by every test below.
    fn test_container_id() -> ContainerId {
        ContainerId {
            service: String::from("test"),
            replica: 1,
        }
    }

    /// An init spec with no steps, so `run` has nothing to execute.
    fn empty_spec() -> InitSpec {
        InitSpec { steps: Vec::new() }
    }

    #[tokio::test]
    async fn test_init_orchestrator_success() {
        let outcome = InitOrchestrator::new(test_container_id(), empty_spec())
            .run()
            .await;
        outcome.unwrap();
    }

    #[tokio::test]
    async fn test_init_orchestrator_with_error_policy() {
        let orchestrator = InitOrchestrator::with_error_policy(
            test_container_id(),
            empty_spec(),
            ErrorsSpec::default(),
        );
        orchestrator.run().await.unwrap();
    }

    #[test]
    fn test_backoff_config_default() {
        let BackoffConfig {
            initial_delay,
            max_delay,
            multiplier,
        } = BackoffConfig::default();

        assert_eq!(initial_delay, Duration::from_secs(1));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert!((multiplier - 2.0).abs() < f64::EPSILON, "multiplier should be 2.0");
    }

    #[test]
    fn test_backoff_delay_calculation() {
        let config = BackoffConfig::default();

        // Doubles from 1s; attempt 6 would be 64s but is capped at 60s, as is attempt 7.
        let expected_secs = [1, 2, 4, 8, 16, 32, 60, 60];
        for (attempt, secs) in (0_u32..).zip(expected_secs) {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_secs(secs),
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_backoff_custom_config() {
        let config = BackoffConfig {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            multiplier: 3.0,
        };

        // Triples from 100ms; attempt 5 would be 24300ms but is capped at 10s.
        let expected = [
            Duration::from_millis(100),
            Duration::from_millis(300),
            Duration::from_millis(900),
            Duration::from_millis(2700),
            Duration::from_millis(8100),
            Duration::from_secs(10),
        ];
        for (attempt, want) in (0_u32..).zip(expected) {
            assert_eq!(config.delay_for_attempt(attempt), want, "attempt {attempt}");
        }
    }

    #[test]
    fn test_orchestrator_builder_pattern() {
        let backoff = BackoffConfig {
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            multiplier: 1.5,
        };

        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec())
            .with_max_retries(5)
            .with_backoff_config(backoff);

        assert_eq!(orchestrator.max_retries, 5);
        assert_eq!(
            orchestrator.backoff_config.initial_delay,
            Duration::from_millis(500)
        );
    }
}
385            orchestrator.backoff_config.initial_delay,
386            Duration::from_millis(500)
387        );
388    }
389}