Skip to main content

zlayer_agent/
init.rs

1//! Init action orchestration
2
3use crate::error::{AgentError, Result};
4use crate::runtime::ContainerId;
5use std::time::Duration;
6use zlayer_init_actions::InitAction;
7use zlayer_spec::{ErrorsSpec, InitFailureAction, InitSpec};
8
/// Number of retries (beyond the first attempt) the restart/backoff init-failure
/// policies allow unless overridden via `InitOrchestrator::with_max_retries`.
const DEFAULT_MAX_INIT_RETRIES: u32 = 3;
11
/// Backoff configuration for exponential backoff.
///
/// The delay before retry `n` (0-indexed) is `initial_delay * multiplier^n`,
/// capped at `max_delay`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Initial delay before first retry
    pub initial_delay: Duration,
    /// Maximum delay between retries
    pub max_delay: Duration,
    /// Multiplier for each subsequent retry
    pub multiplier: f64,
}

impl Default for BackoffConfig {
    /// 1s initial delay, doubling on every attempt, capped at 60s.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
        }
    }
}

impl BackoffConfig {
    /// Calculate the delay for a given retry attempt (0-indexed).
    ///
    /// Returns `initial_delay * multiplier^attempt`, clamped to `[0, max_delay]`.
    /// This never panics: very large attempts saturate at `max_delay`, and a
    /// negative product (only possible with a negative `multiplier`) yields zero.
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // `attempt as i32` would wrap to a negative exponent for attempts above
        // `i32::MAX` and *shrink* the delay; saturate instead (the cap applies anyway).
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let raw_secs = self.initial_delay.as_secs_f64() * self.multiplier.powi(exponent);

        // NaN (e.g. 0s * inf) maps to the cap, matching the previous `f64::min` behavior.
        // Returning `max_delay` directly also avoids an f64 round-trip losing precision.
        if raw_secs.is_nan() || raw_secs >= self.max_delay.as_secs_f64() {
            return self.max_delay;
        }
        // `Duration::from_secs_f64` panics on negative input.
        if raw_secs <= 0.0 {
            return Duration::ZERO;
        }
        // Here 0 < raw_secs < max_delay (<= 2^64 s), so the conversion cannot overflow.
        Duration::from_secs_f64(raw_secs)
    }
}
43
44/// Orchestrates init actions for a container
45pub struct InitOrchestrator {
46    id: ContainerId,
47    spec: InitSpec,
48    /// Error handling policy from the service spec
49    error_policy: ErrorsSpec,
50    /// Maximum retry attempts for init failure
51    max_retries: u32,
52    /// Backoff configuration for exponential backoff
53    backoff_config: BackoffConfig,
54}
55
56impl InitOrchestrator {
57    /// Create a new init orchestrator
58    #[must_use]
59    pub fn new(id: ContainerId, spec: InitSpec) -> Self {
60        Self {
61            id,
62            spec,
63            error_policy: ErrorsSpec::default(),
64            max_retries: DEFAULT_MAX_INIT_RETRIES,
65            backoff_config: BackoffConfig::default(),
66        }
67    }
68
69    /// Create a new init orchestrator with error policy
70    #[must_use]
71    pub fn with_error_policy(id: ContainerId, spec: InitSpec, error_policy: ErrorsSpec) -> Self {
72        Self {
73            id,
74            spec,
75            error_policy,
76            max_retries: DEFAULT_MAX_INIT_RETRIES,
77            backoff_config: BackoffConfig::default(),
78        }
79    }
80
81    /// Set the maximum retry attempts
82    #[must_use]
83    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
84        self.max_retries = max_retries;
85        self
86    }
87
88    /// Set the backoff configuration
89    #[must_use]
90    pub fn with_backoff_config(mut self, config: BackoffConfig) -> Self {
91        self.backoff_config = config;
92        self
93    }
94
95    /// Run all init steps with error policy enforcement
96    ///
97    /// The behavior on init failure depends on the `errors.on_init_failure` policy:
98    /// - `Fail` (default): Stop container creation, return error immediately
99    /// - `Restart`: Retry init steps (up to `max_retries`) with immediate retry
100    /// - `Backoff`: Retry with exponential backoff (1s, 2s, 4s, 8s, max 60s)
101    ///
102    /// # Errors
103    /// Returns an error if init steps fail and the error policy is `Fail` or all retries are exhausted.
104    pub async fn run(&self) -> Result<()> {
105        match self.error_policy.on_init_failure.action {
106            InitFailureAction::Fail => {
107                // Fail immediately on any error
108                self.run_init_steps().await
109            }
110            InitFailureAction::Restart => {
111                // Retry immediately on failure
112                self.run_with_retries(false).await
113            }
114            InitFailureAction::Backoff => {
115                // Retry with exponential backoff
116                self.run_with_retries(true).await
117            }
118        }
119    }
120
121    /// Run init steps with retry logic
122    async fn run_with_retries(&self, use_backoff: bool) -> Result<()> {
123        let mut last_error = None;
124
125        for attempt in 0..=self.max_retries {
126            if attempt > 0 {
127                let delay = if use_backoff {
128                    self.backoff_config.delay_for_attempt(attempt - 1)
129                } else {
130                    Duration::from_millis(100) // Small delay for immediate retry
131                };
132
133                tracing::info!(
134                    container = %self.id,
135                    attempt = attempt,
136                    max_retries = self.max_retries,
137                    delay_ms = delay.as_millis(),
138                    "Retrying init steps after failure"
139                );
140
141                tokio::time::sleep(delay).await;
142            }
143
144            match self.run_init_steps().await {
145                Ok(()) => {
146                    if attempt > 0 {
147                        tracing::info!(
148                            container = %self.id,
149                            attempt = attempt,
150                            "Init steps succeeded after retry"
151                        );
152                    }
153                    return Ok(());
154                }
155                Err(e) => {
156                    tracing::warn!(
157                        container = %self.id,
158                        attempt = attempt,
159                        max_retries = self.max_retries,
160                        error = %e,
161                        "Init steps failed"
162                    );
163                    last_error = Some(e);
164                }
165            }
166        }
167
168        // All retries exhausted
169        Err(last_error.unwrap_or_else(|| AgentError::InitActionFailed {
170            id: self.id.to_string(),
171            reason: "Init failed after all retries".to_string(),
172        }))
173    }
174
175    /// Run all init steps once (no retry logic)
176    async fn run_init_steps(&self) -> Result<()> {
177        let _start_grace = std::time::Instant::now();
178
179        for step in &self.spec.steps {
180            let _step_start = std::time::Instant::now();
181
182            // Parse action
183            let action = zlayer_init_actions::from_spec(
184                &step.uses,
185                &step.with,
186                Duration::from_secs(30), // default
187            )
188            .map_err(|e| AgentError::InitActionFailed {
189                id: self.id.to_string(),
190                reason: e.to_string(),
191            })?;
192
193            // Execute with timeout
194            let timeout = step.timeout.unwrap_or(Duration::from_secs(300));
195            let result = tokio::time::timeout(timeout, self.execute_step(&action, step)).await;
196
197            match result {
198                Ok(Ok(())) => {
199                    // Step succeeded
200                }
201                Ok(Err(e)) => {
202                    // Action failed
203                    return match step.on_failure {
204                        zlayer_spec::FailureAction::Fail => Err(AgentError::InitActionFailed {
205                            id: self.id.to_string(),
206                            reason: format!("step '{}' failed: {}", step.id, e),
207                        }),
208                        zlayer_spec::FailureAction::Warn => {
209                            tracing::warn!(
210                                container = %self.id,
211                                step = %step.id,
212                                error = %e,
213                                "Init step failed (continuing due to warn policy)"
214                            );
215                            continue; // Continue to next step
216                        }
217                        zlayer_spec::FailureAction::Continue => {
218                            // Continue to next step silently
219                            continue;
220                        }
221                    };
222                }
223                Err(_) => {
224                    // Timeout
225                    return match step.on_failure {
226                        zlayer_spec::FailureAction::Fail => Err(AgentError::Timeout { timeout }),
227                        zlayer_spec::FailureAction::Warn => {
228                            tracing::warn!(
229                                container = %self.id,
230                                step = %step.id,
231                                timeout_secs = timeout.as_secs(),
232                                "Init step timed out (continuing due to warn policy)"
233                            );
234                            continue; // Continue to next step
235                        }
236                        zlayer_spec::FailureAction::Continue => {
237                            // Continue to next step silently
238                            continue;
239                        }
240                    };
241                }
242            }
243
244            // Handle retries if specified
245            if let Some(retry_count) = step.retry {
246                // For now, retries are handled by the action itself
247                // A more sophisticated implementation would retry the entire step
248                let _ = retry_count;
249            }
250        }
251
252        Ok(())
253    }
254
255    async fn execute_step(&self, action: &InitAction, _step: &zlayer_spec::InitStep) -> Result<()> {
256        match action {
257            InitAction::WaitTcp(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
258                id: self.id.to_string(),
259                reason: e.to_string(),
260            }),
261            InitAction::WaitHttp(a) => {
262                a.execute().await.map_err(|e| AgentError::InitActionFailed {
263                    id: self.id.to_string(),
264                    reason: e.to_string(),
265                })
266            }
267            InitAction::Run(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
268                id: self.id.to_string(),
269                reason: e.to_string(),
270            }),
271            #[cfg(feature = "s3")]
272            InitAction::S3Push(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
273                id: self.id.to_string(),
274                reason: e.to_string(),
275            }),
276            #[cfg(feature = "s3")]
277            InitAction::S3Pull(a) => a.execute().await.map_err(|e| AgentError::InitActionFailed {
278                id: self.id.to_string(),
279                reason: e.to_string(),
280            }),
281        }
282    }
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Container id shared by every test.
    fn test_container_id() -> ContainerId {
        ContainerId::new("test".to_string(), 1)
    }

    /// An init spec with no steps.
    fn empty_spec() -> InitSpec {
        InitSpec { steps: vec![] }
    }

    #[tokio::test]
    async fn test_init_orchestrator_success() {
        InitOrchestrator::new(test_container_id(), empty_spec())
            .run()
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_init_orchestrator_with_error_policy() {
        let orchestrator = InitOrchestrator::with_error_policy(
            test_container_id(),
            empty_spec(),
            ErrorsSpec::default(),
        );
        orchestrator.run().await.unwrap();
    }

    #[test]
    fn test_backoff_config_default() {
        let BackoffConfig {
            initial_delay,
            max_delay,
            multiplier,
        } = BackoffConfig::default();
        assert_eq!(initial_delay, Duration::from_secs(1));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert!(
            (multiplier - 2.0).abs() < f64::EPSILON,
            "multiplier should be 2.0"
        );
    }

    #[test]
    fn test_backoff_delay_calculation() {
        let config = BackoffConfig::default();
        // (attempt, expected seconds): doubles from 1s until the 60s cap from attempt 6 on.
        let table = [(0, 1), (1, 2), (2, 4), (3, 8), (4, 16), (5, 32), (6, 60), (7, 60)];
        for (attempt, secs) in table {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_secs(secs),
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_backoff_custom_config() {
        let config = BackoffConfig {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            multiplier: 3.0,
        };
        // (attempt, expected millis): triples from 100ms; attempt 5 (24300ms) hits the 10s cap.
        let table = [(0, 100), (1, 300), (2, 900), (3, 2700), (4, 8100), (5, 10_000)];
        for (attempt, millis) in table {
            assert_eq!(
                config.delay_for_attempt(attempt),
                Duration::from_millis(millis),
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_orchestrator_builder_pattern() {
        let custom_backoff = BackoffConfig {
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            multiplier: 1.5,
        };
        let orchestrator = InitOrchestrator::new(test_container_id(), empty_spec())
            .with_max_retries(5)
            .with_backoff_config(custom_backoff);

        assert_eq!(orchestrator.max_retries, 5);
        assert_eq!(
            orchestrator.backoff_config.initial_delay,
            Duration::from_millis(500)
        );
    }
}
382            orchestrator.backoff_config.initial_delay,
383            Duration::from_millis(500)
384        );
385    }
386}