Skip to main content

lightshuttle_runtime/lifecycle/
manager.rs

1//! Coordinated startup and shutdown of every resource declared in a
2//! [`crate::LifecyclePlan`].
3
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
8use lightshuttle_manifest::{InterpolationContext, Interpolator};
9use tokio::sync::{mpsc, watch};
10use tracing::{debug, info, warn};
11
12use crate::error::RuntimeError;
13use crate::lifecycle::error::LifecycleError;
14use crate::lifecycle::plan::LifecyclePlan;
15use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
16use crate::runtime::{ContainerId, ContainerRuntime};
17use crate::spec::{ContainerSpec, ResourceOutputs};
18
/// Upper bound on how long a freshly started container may take to
/// report healthy.
///
/// In this file the value is passed unconditionally to
/// `ContainerRuntime::wait_healthy` for every resource; a per-resource
/// override coming from the manifest is not wired in here yet.
/// Deliberately generous for v0.1.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(60);
22
23/// Per-resource shared state.
24#[derive(Clone)]
25struct NodeHandle {
26    status_tx: Arc<watch::Sender<NodeStatus>>,
27    status_rx: watch::Receiver<NodeStatus>,
28    outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
29    outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
30    container_id: Arc<Mutex<Option<ContainerId>>>,
31}
32
33/// Coordinates the startup, supervision and shutdown of every resource
34/// declared in a [`LifecyclePlan`].
35pub struct LifecycleManager<R: ContainerRuntime + 'static> {
36    plan: Arc<LifecyclePlan>,
37    runtime: Arc<R>,
38    nodes: HashMap<String, NodeHandle>,
39    event_tx: mpsc::UnboundedSender<LifecycleEvent>,
40}
41
42impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
43    /// Build a manager bound to `plan` and `runtime`. Returns the event
44    /// stream receiver alongside.
45    #[must_use]
46    pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, mpsc::UnboundedReceiver<LifecycleEvent>) {
47        let (event_tx, event_rx) = mpsc::unbounded_channel();
48        let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
49        for node in plan.nodes() {
50            let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
51            let (outputs_tx, outputs_rx) = watch::channel(None);
52            nodes.insert(
53                node.name.clone(),
54                NodeHandle {
55                    status_tx: Arc::new(status_tx),
56                    status_rx,
57                    outputs_tx: Arc::new(outputs_tx),
58                    outputs_rx,
59                    container_id: Arc::new(Mutex::new(None)),
60                },
61            );
62        }
63        let manager = Self {
64            plan: Arc::new(plan),
65            runtime: Arc::new(runtime),
66            nodes,
67            event_tx,
68        };
69        (manager, event_rx)
70    }
71
72    /// Start every resource in topological order. Independent branches
73    /// start in parallel. On the first failure, the resources that
74    /// already started are stopped automatically before the error is
75    /// returned.
76    pub async fn start_all(&self) -> Result<(), LifecycleError> {
77        let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
78            Vec::with_capacity(self.plan.nodes().len());
79
80        for node in self.plan.nodes() {
81            let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
82            let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
83                HashMap::new();
84            for dep in &node.depends_on {
85                let handle = self
86                    .nodes
87                    .get(dep)
88                    .ok_or_else(|| LifecycleError::UnknownResource(dep.clone()))?;
89                dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
90                dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
91            }
92
93            let node_handle = self.nodes[&node.name].clone();
94            let spec = node.spec.clone();
95            let own_outputs = node.outputs.clone();
96            let name = node.name.clone();
97            let runtime = Arc::clone(&self.runtime);
98            let event_tx = self.event_tx.clone();
99
100            let task = tokio::spawn(async move {
101                start_one(
102                    name,
103                    spec,
104                    own_outputs,
105                    runtime,
106                    node_handle,
107                    dep_status_rxs,
108                    dep_outputs_rxs,
109                    event_tx,
110                )
111                .await
112            });
113            handles.push(task);
114        }
115
116        let mut first_error: Option<LifecycleError> = None;
117        for handle in handles {
118            match handle.await {
119                Ok(Ok(())) => {}
120                Ok(Err(err)) => {
121                    if first_error.is_none() {
122                        first_error = Some(err);
123                    }
124                }
125                Err(join_err) => {
126                    if first_error.is_none() {
127                        first_error = Some(LifecycleError::Start {
128                            resource: "<panicked task>".to_owned(),
129                            source: RuntimeError::InvalidSpec(join_err.to_string()),
130                        });
131                    }
132                }
133            }
134        }
135
136        if let Some(err) = first_error {
137            warn!(error = %err, "start_all failed; rolling back");
138            let _ = self.stop_all(Duration::from_secs(10)).await;
139            return Err(err);
140        }
141
142        let _ = self.event_tx.send(LifecycleEvent::StackStarted);
143        info!(
144            "stack started: {} resource(s) healthy",
145            self.plan.nodes().len()
146        );
147        Ok(())
148    }
149
150    /// Stop every resource in reverse topological order with the given
151    /// SIGTERM-to-SIGKILL grace window.
152    pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
153        let _ = self.event_tx.send(LifecycleEvent::StackStopping);
154
155        let mut errors: Vec<(String, RuntimeError)> = Vec::new();
156        for node in self.plan.nodes().iter().rev() {
157            let Some(handle) = self.nodes.get(&node.name) else {
158                continue;
159            };
160            let id = {
161                let guard = handle
162                    .container_id
163                    .lock()
164                    .expect("container_id mutex poisoned");
165                guard.clone()
166            };
167            let Some(id) = id else { continue };
168            match self.runtime.stop(&id, grace).await {
169                Ok(()) => {
170                    let _ = handle.status_tx.send(NodeStatus::Stopped);
171                    let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
172                        name: node.name.clone(),
173                    });
174                }
175                Err(e) => errors.push((node.name.clone(), e)),
176            }
177        }
178
179        let _ = self.event_tx.send(LifecycleEvent::StackStopped);
180
181        if let Some((resource, source)) = errors.into_iter().next() {
182            return Err(LifecycleError::Stop { resource, source });
183        }
184        Ok(())
185    }
186
187    /// Opinionated entry point used by `lightshuttle up`: starts the
188    /// stack, waits for `SIGINT` or `SIGTERM`, then stops the stack
189    /// cleanly with the configured grace window.
190    pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
191        self.start_all().await?;
192        wait_for_shutdown_signal().await;
193        self.stop_all(grace).await
194    }
195}
196
197#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
198async fn start_one<R: ContainerRuntime + 'static>(
199    name: String,
200    spec: ContainerSpec,
201    own_outputs: ResourceOutputs,
202    runtime: Arc<R>,
203    handle: NodeHandle,
204    dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
205    mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
206    event_tx: mpsc::UnboundedSender<LifecycleEvent>,
207) -> Result<(), LifecycleError> {
208    // 1. Wait for every dependency to become ready.
209    for (dep_name, mut rx) in dep_status_rxs {
210        loop {
211            let status = rx.borrow_and_update().clone();
212            if status.is_ready() {
213                debug!(node = %name, dep = %dep_name, "dependency ready");
214                break;
215            }
216            if let NodeStatus::Failed { reason } = status {
217                let _ = handle.status_tx.send(NodeStatus::Failed {
218                    reason: format!("dependency `{dep_name}` failed: {reason}"),
219                });
220                return Err(LifecycleError::DependencyFailed {
221                    resource: name,
222                    dependency: dep_name,
223                    reason,
224                });
225            }
226            if rx.changed().await.is_err() {
227                let reason = format!("dependency `{dep_name}` watch channel closed");
228                let _ = handle.status_tx.send(NodeStatus::Failed {
229                    reason: reason.clone(),
230                });
231                return Err(LifecycleError::DependencyFailed {
232                    resource: name,
233                    dependency: dep_name,
234                    reason,
235                });
236            }
237        }
238    }
239
240    // 2. Collect dependency outputs.
241    let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
242    for (dep_name, rx) in &mut dep_outputs_rxs {
243        loop {
244            if let Some(out) = rx.borrow_and_update().clone() {
245                dep_outputs.insert(dep_name.clone(), out);
246                break;
247            }
248            if rx.changed().await.is_err() {
249                let reason = format!("dependency `{dep_name}` outputs channel closed");
250                let _ = handle.status_tx.send(NodeStatus::Failed {
251                    reason: reason.clone(),
252                });
253                return Err(LifecycleError::DependencyFailed {
254                    resource: name,
255                    dependency: dep_name.clone(),
256                    reason,
257                });
258            }
259        }
260    }
261
262    // 3. Resolve interpolations and inject LSH_<DEP>_<PROP> env vars.
263    let resolved_spec = match interpolate_and_inject(spec, &dep_outputs) {
264        Ok(s) => s,
265        Err(reason) => {
266            let _ = handle.status_tx.send(NodeStatus::Failed {
267                reason: reason.clone(),
268            });
269            return Err(LifecycleError::Start {
270                resource: name,
271                source: RuntimeError::InvalidSpec(reason),
272            });
273        }
274    };
275
276    // 4. Start the container.
277    let _ = handle.status_tx.send(NodeStatus::Starting);
278    let id = match runtime.start(&resolved_spec).await {
279        Ok(id) => id,
280        Err(source) => {
281            let _ = handle.status_tx.send(NodeStatus::Failed {
282                reason: source.to_string(),
283            });
284            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
285                name: name.clone(),
286                error: source.to_string(),
287            });
288            return Err(LifecycleError::Start {
289                resource: name,
290                source,
291            });
292        }
293    };
294
295    {
296        let mut guard = handle
297            .container_id
298            .lock()
299            .expect("container_id mutex poisoned");
300        *guard = Some(id.clone());
301    }
302    let _ = handle.status_tx.send(NodeStatus::Running);
303    let _ = event_tx.send(LifecycleEvent::ResourceStarted {
304        name: name.clone(),
305        container_id: id.to_string(),
306    });
307
308    // 5. Wait for the healthcheck.
309    match runtime.wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT).await {
310        Ok(()) => {
311            let _ = handle.outputs_tx.send(Some(own_outputs));
312            let _ = handle.status_tx.send(NodeStatus::Healthy);
313            let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
314            Ok(())
315        }
316        Err(RuntimeError::Timeout { .. }) => {
317            let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
318            let _ = handle.status_tx.send(NodeStatus::Failed {
319                reason: reason.clone(),
320            });
321            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
322                name: name.clone(),
323                error: reason,
324            });
325            Err(LifecycleError::HealthcheckTimeout {
326                resource: name,
327                timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
328            })
329        }
330        Err(source) => {
331            let _ = handle.status_tx.send(NodeStatus::Failed {
332                reason: source.to_string(),
333            });
334            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
335                name: name.clone(),
336                error: source.to_string(),
337            });
338            Err(LifecycleError::Start {
339                resource: name,
340                source,
341            })
342        }
343    }
344}
345
346/// Apply two-pass interpolation to `spec`: resolve every
347/// `${resources.<name>.<property>}` against `dep_outputs`, then inject
348/// `LSH_<DEP>_<PROPERTY>` automatic environment variables.
349///
350/// Returns the resolved spec or a human-readable diagnostic when an
351/// interpolation references an unknown resource or property.
352fn interpolate_and_inject(
353    mut spec: ContainerSpec,
354    dep_outputs: &HashMap<String, ResourceOutputs>,
355) -> std::result::Result<ContainerSpec, String> {
356    let mut ctx = InterpolationContext::from_env();
357    for (name, outputs) in dep_outputs {
358        ctx = ctx.with_resource(name.clone(), outputs.clone());
359    }
360    let interpolator = Interpolator::new(&ctx);
361
362    // Resolve env values.
363    let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
364    for (k, v) in spec.env.drain() {
365        let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
366        resolved_env.insert(k, resolved);
367    }
368
369    // Inject LSH_<DEP>_<PROPERTY> variables.
370    for (dep_name, outputs) in dep_outputs {
371        let dep_upper = dep_name.to_uppercase().replace('-', "_");
372        for (prop, value) in outputs {
373            let prop_upper = prop.to_uppercase().replace('-', "_");
374            let key = format!("LSH_{dep_upper}_{prop_upper}");
375            resolved_env.entry(key).or_insert_with(|| value.clone());
376        }
377    }
378    spec.env = resolved_env;
379
380    // Resolve command arguments.
381    if let Some(args) = spec.command.as_mut() {
382        for arg in args.iter_mut() {
383            *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
384        }
385    }
386
387    Ok(spec)
388}
389
390#[cfg(unix)]
391async fn wait_for_shutdown_signal() {
392    use tokio::signal::unix::{SignalKind, signal};
393    let mut sigterm = match signal(SignalKind::terminate()) {
394        Ok(s) => s,
395        Err(e) => {
396            warn!("failed to install SIGTERM handler: {e}");
397            let _ = tokio::signal::ctrl_c().await;
398            return;
399        }
400    };
401    tokio::select! {
402        _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
403        _ = sigterm.recv() => info!("received SIGTERM"),
404    }
405}
406
/// Windows: resolve once Ctrl+C has been received.
///
/// The result of waiting is ignored: the function returns and logs the
/// same message whether or not the wait itself reported an error.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    let _ctrl_c_outcome = tokio::signal::ctrl_c().await;
    info!("received Ctrl+C");
}