Skip to main content

lightshuttle_runtime/lifecycle/
manager.rs

1//! Coordinated startup and shutdown of every resource declared in a
2//! [`crate::LifecyclePlan`].
3
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime};
7
8use lightshuttle_manifest::{InterpolationContext, Interpolator};
9use tokio::sync::{broadcast, watch};
10use tracing::{Instrument, debug, info, info_span, instrument, warn};
11
/// Capacity of the lifecycle event broadcast channel created in
/// `LifecycleManager::new`.
///
/// A subscriber that falls more than this many events behind observes
/// a lagged notification and has to resynchronise.
const EVENT_CHANNEL_CAPACITY: usize = 256;
16
17use crate::error::RuntimeError;
18use crate::lifecycle::error::LifecycleError;
19use crate::lifecycle::plan::LifecyclePlan;
20use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
21use crate::runtime::{ContainerId, ContainerRuntime};
22use lightshuttle_spec::{ContainerSpec, ResourceOutputs};
23
/// Default healthcheck timeout. Kept conservative for v0.1.
///
/// `start_one` passes this value to the runtime's `wait_healthy` call
/// for every resource; nothing in this file reads a per-resource
/// override from the manifest.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(60);
27
28/// Per-resource shared state.
29#[derive(Clone)]
30struct NodeHandle {
31    status_tx: Arc<watch::Sender<NodeStatus>>,
32    status_rx: watch::Receiver<NodeStatus>,
33    outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
34    outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
35    container_id: Arc<Mutex<Option<ContainerId>>>,
36    started_at: Arc<Mutex<Option<SystemTime>>>,
37}
38
39/// Point-in-time snapshot of one managed resource, consumed by the
40/// control plane via [`super::handle::ManagerHandle`].
41pub(super) struct NodeSnapshot {
42    /// Lifecycle status at the moment of the snapshot.
43    pub(super) status: NodeStatus,
44    /// Wall-clock time at which the runtime accepted the start request.
45    pub(super) started_at: Option<SystemTime>,
46    /// Container identifier returned by the runtime, when known.
47    pub(super) container_id: Option<ContainerId>,
48}
49
50/// Coordinates the startup, supervision and shutdown of every resource
51/// declared in a [`LifecyclePlan`].
52pub struct LifecycleManager<R: ContainerRuntime + 'static> {
53    plan: Arc<LifecyclePlan>,
54    runtime: Arc<R>,
55    nodes: HashMap<String, NodeHandle>,
56    event_tx: broadcast::Sender<LifecycleEvent>,
57}
58
59impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
60    /// Build a manager bound to `plan` and `runtime`. Returns a fresh
61    /// event subscriber alongside; further subscribers can be obtained
62    /// from [`Self::subscribe_events`].
63    #[must_use]
64    pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, broadcast::Receiver<LifecycleEvent>) {
65        let (event_tx, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
66        let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
67        for node in plan.nodes() {
68            let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
69            let (outputs_tx, outputs_rx) = watch::channel(None);
70            nodes.insert(
71                node.name.clone(),
72                NodeHandle {
73                    status_tx: Arc::new(status_tx),
74                    status_rx,
75                    outputs_tx: Arc::new(outputs_tx),
76                    outputs_rx,
77                    container_id: Arc::new(Mutex::new(None)),
78                    started_at: Arc::new(Mutex::new(None)),
79                },
80            );
81        }
82        let manager = Self {
83            plan: Arc::new(plan),
84            runtime: Arc::new(runtime),
85            nodes,
86            event_tx,
87        };
88        (manager, event_rx)
89    }
90
91    /// Start every resource in topological order. Independent branches
92    /// start in parallel. On the first failure, the resources that
93    /// already started are stopped automatically before the error is
94    /// returned.
95    pub async fn start_all(&self) -> Result<(), LifecycleError> {
96        let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
97            Vec::with_capacity(self.plan.nodes().len());
98
99        for node in self.plan.nodes() {
100            let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
101            let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
102                HashMap::new();
103            for dep in &node.depends_on {
104                let handle = self
105                    .nodes
106                    .get(dep)
107                    .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
108                dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
109                dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
110            }
111
112            let node_handle = self.nodes[&node.name].clone();
113            let spec = node.spec.clone();
114            let own_outputs = node.outputs.clone();
115            let name = node.name.clone();
116            let runtime = Arc::clone(&self.runtime);
117            let event_tx = self.event_tx.clone();
118
119            let task = tokio::spawn(async move {
120                start_one(
121                    name,
122                    spec,
123                    own_outputs,
124                    runtime,
125                    node_handle,
126                    dep_status_rxs,
127                    dep_outputs_rxs,
128                    event_tx,
129                )
130                .await
131            });
132            handles.push(task);
133        }
134
135        let mut first_error: Option<LifecycleError> = None;
136        for handle in handles {
137            match handle.await {
138                Ok(Ok(())) => {}
139                Ok(Err(err)) => {
140                    if first_error.is_none() {
141                        first_error = Some(err);
142                    }
143                }
144                Err(join_err) => {
145                    if first_error.is_none() {
146                        first_error = Some(LifecycleError::Start {
147                            resource: "<panicked task>".to_owned(),
148                            source: RuntimeError::InvalidSpec(join_err.to_string()),
149                        });
150                    }
151                }
152            }
153        }
154
155        if let Some(err) = first_error {
156            warn!(error = %err, "start_all failed; rolling back");
157            let _ = self.stop_all(Duration::from_secs(10)).await;
158            return Err(err);
159        }
160
161        let _ = self.event_tx.send(LifecycleEvent::StackStarted);
162        info!(
163            "stack started: {} resource(s) healthy",
164            self.plan.nodes().len()
165        );
166        Ok(())
167    }
168
169    /// Stop every resource in reverse topological order with the given
170    /// SIGTERM-to-SIGKILL grace window.
171    #[instrument(skip_all, fields(resources = self.plan.nodes().len()))]
172    pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
173        let _ = self.event_tx.send(LifecycleEvent::StackStopping);
174
175        let mut errors: Vec<(String, RuntimeError)> = Vec::new();
176        for node in self.plan.nodes().iter().rev() {
177            let Some(handle) = self.nodes.get(&node.name) else {
178                continue;
179            };
180            let id = {
181                let guard = handle
182                    .container_id
183                    .lock()
184                    .expect("container_id mutex poisoned");
185                guard.clone()
186            };
187            let Some(id) = id else { continue };
188            let stop_span = info_span!("stop", resource = %node.name);
189            match self.runtime.stop(&id, grace).instrument(stop_span).await {
190                Ok(()) => {
191                    let _ = handle.status_tx.send(NodeStatus::Stopped);
192                    let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
193                        name: node.name.clone(),
194                    });
195                }
196                Err(e) => errors.push((node.name.clone(), e)),
197            }
198        }
199
200        let _ = self.event_tx.send(LifecycleEvent::StackStopped);
201
202        if let Some((resource, source)) = errors.into_iter().next() {
203            return Err(LifecycleError::Stop { resource, source });
204        }
205        Ok(())
206    }
207
208    /// Opinionated entry point used by `lightshuttle up`: starts the
209    /// stack, waits for `SIGINT` or `SIGTERM`, then stops the stack
210    /// cleanly with the configured grace window.
211    pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
212        self.start_all().await?;
213        wait_for_shutdown_signal().await;
214        self.stop_all(grace).await
215    }
216
217    /// Restart a single resource without touching its dependents.
218    ///
219    /// The target is stopped via `SIGTERM` (10-second grace window),
220    /// its container id and started-at timestamp are cleared, then
221    /// `start_one` is re-run from the same cached spec. Three events
222    /// are emitted on the lifecycle channel in order: `ResourceStopped`,
223    /// `ResourceStarted`, `ResourceHealthy`.
224    ///
225    /// Dependents keep running. Their `watch` channels observe the
226    /// target's status going `Stopped` → `Pending` → `Starting` →
227    /// `Running` → `Healthy`, so callers that depend on the target can
228    /// pause their work locally until it is healthy again.
229    #[instrument(skip(self), fields(resource = %resource))]
230    pub async fn restart_one(&self, resource: &str) -> Result<(), LifecycleError> {
231        let node = self
232            .plan
233            .nodes()
234            .iter()
235            .find(|n| n.name == resource)
236            .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
237        let handle = self
238            .nodes
239            .get(resource)
240            .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
241
242        // Stop the running container if any.
243        let id = {
244            let guard = handle
245                .container_id
246                .lock()
247                .expect("container_id mutex poisoned");
248            guard.clone()
249        };
250        if let Some(id) = id {
251            self.runtime
252                .stop(&id, Duration::from_secs(10))
253                .await
254                .map_err(|source| LifecycleError::Stop {
255                    resource: resource.to_owned(),
256                    source,
257                })?;
258            *handle
259                .container_id
260                .lock()
261                .expect("container_id mutex poisoned") = None;
262            *handle.started_at.lock().expect("started_at mutex poisoned") = None;
263            let _ = handle.status_tx.send(NodeStatus::Stopped);
264            let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
265                name: resource.to_owned(),
266            });
267        }
268
269        // Reset to Pending so start_one drives the full restart cycle.
270        let _ = handle.status_tx.send(NodeStatus::Pending);
271
272        // Collect dependency watch receivers. Deps are already Healthy,
273        // so start_one's wait loop returns instantly.
274        let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
275        let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
276            HashMap::new();
277        for dep in &node.depends_on {
278            let dep_handle = self
279                .nodes
280                .get(dep)
281                .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
282            dep_status_rxs.insert(dep.clone(), dep_handle.status_rx.clone());
283            dep_outputs_rxs.insert(dep.clone(), dep_handle.outputs_rx.clone());
284        }
285
286        start_one(
287            resource.to_owned(),
288            node.spec.clone(),
289            node.outputs.clone(),
290            Arc::clone(&self.runtime),
291            handle.clone(),
292            dep_status_rxs,
293            dep_outputs_rxs,
294            self.event_tx.clone(),
295        )
296        .await
297    }
298
299    /// Open a new subscription on the lifecycle event broadcast.
300    ///
301    /// Multiple subscribers can read concurrently. Subscribers that
302    /// fall more than [`EVENT_CHANNEL_CAPACITY`] events behind will
303    /// observe a `RecvError::Lagged` and have to resynchronise.
304    #[must_use]
305    pub fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent> {
306        self.event_tx.subscribe()
307    }
308
309    /// Shared reference to the underlying execution plan.
310    pub(super) fn plan_arc(&self) -> &Arc<LifecyclePlan> {
311        &self.plan
312    }
313
314    /// Shared reference to the underlying container runtime.
315    pub(super) fn runtime_arc(&self) -> &Arc<R> {
316        &self.runtime
317    }
318
319    /// Point-in-time snapshot of one resource, or `None` when the name
320    /// is not part of the plan.
321    pub(super) fn snapshot(&self, name: &str) -> Option<NodeSnapshot> {
322        let handle = self.nodes.get(name)?;
323        let status = handle.status_rx.borrow().clone();
324        let started_at = *handle.started_at.lock().expect("started_at mutex poisoned");
325        let container_id = handle
326            .container_id
327            .lock()
328            .expect("container_id mutex poisoned")
329            .clone();
330        Some(NodeSnapshot {
331            status,
332            started_at,
333            container_id,
334        })
335    }
336}
337
338#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
339#[instrument(name = "start", skip_all, fields(resource = %name))]
340async fn start_one<R: ContainerRuntime + 'static>(
341    name: String,
342    spec: ContainerSpec,
343    own_outputs: ResourceOutputs,
344    runtime: Arc<R>,
345    handle: NodeHandle,
346    dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
347    mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
348    event_tx: broadcast::Sender<LifecycleEvent>,
349) -> Result<(), LifecycleError> {
350    // 1. Wait for every dependency to become ready.
351    for (dep_name, mut rx) in dep_status_rxs {
352        loop {
353            let status = rx.borrow_and_update().clone();
354            if status.is_ready() {
355                debug!(node = %name, dep = %dep_name, "dependency ready");
356                break;
357            }
358            if let NodeStatus::Failed { reason } = status {
359                let _ = handle.status_tx.send(NodeStatus::Failed {
360                    reason: format!("dependency `{dep_name}` failed: {reason}"),
361                });
362                return Err(LifecycleError::DependencyFailed {
363                    resource: name,
364                    dependency: dep_name,
365                    reason,
366                });
367            }
368            if rx.changed().await.is_err() {
369                let reason = format!("dependency `{dep_name}` watch channel closed");
370                let _ = handle.status_tx.send(NodeStatus::Failed {
371                    reason: reason.clone(),
372                });
373                return Err(LifecycleError::DependencyFailed {
374                    resource: name,
375                    dependency: dep_name,
376                    reason,
377                });
378            }
379        }
380    }
381
382    // 2. Collect dependency outputs.
383    let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
384    for (dep_name, rx) in &mut dep_outputs_rxs {
385        loop {
386            if let Some(out) = rx.borrow_and_update().clone() {
387                dep_outputs.insert(dep_name.clone(), out);
388                break;
389            }
390            if rx.changed().await.is_err() {
391                let reason = format!("dependency `{dep_name}` outputs channel closed");
392                let _ = handle.status_tx.send(NodeStatus::Failed {
393                    reason: reason.clone(),
394                });
395                return Err(LifecycleError::DependencyFailed {
396                    resource: name,
397                    dependency: dep_name.clone(),
398                    reason,
399                });
400            }
401        }
402    }
403
404    // 3. Resolve interpolations and inject LSH_<DEP>_<PROP> env vars.
405    let resolved_spec = match interpolate_and_inject(spec, &dep_outputs) {
406        Ok(s) => s,
407        Err(reason) => {
408            let _ = handle.status_tx.send(NodeStatus::Failed {
409                reason: reason.clone(),
410            });
411            return Err(LifecycleError::Start {
412                resource: name,
413                source: RuntimeError::InvalidSpec(reason),
414            });
415        }
416    };
417
418    // 4. Remove any container left over from a previous run so the
419    //    create call below never collides with a stale name.
420    let _ = handle.status_tx.send(NodeStatus::Starting);
421    if let Err(source) = runtime.remove(&resolved_spec.name).await {
422        let _ = handle.status_tx.send(NodeStatus::Failed {
423            reason: source.to_string(),
424        });
425        let _ = event_tx.send(LifecycleEvent::ResourceFailed {
426            name: name.clone(),
427            error: source.to_string(),
428        });
429        return Err(LifecycleError::Start {
430            resource: name,
431            source,
432        });
433    }
434
435    // 5. Start the container.
436    let id = match runtime.start(&resolved_spec).await {
437        Ok(id) => id,
438        Err(source) => {
439            let _ = handle.status_tx.send(NodeStatus::Failed {
440                reason: source.to_string(),
441            });
442            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
443                name: name.clone(),
444                error: source.to_string(),
445            });
446            return Err(LifecycleError::Start {
447                resource: name,
448                source,
449            });
450        }
451    };
452
453    {
454        let mut guard = handle
455            .container_id
456            .lock()
457            .expect("container_id mutex poisoned");
458        *guard = Some(id.clone());
459    }
460    {
461        let mut guard = handle.started_at.lock().expect("started_at mutex poisoned");
462        *guard = Some(SystemTime::now());
463    }
464    let _ = handle.status_tx.send(NodeStatus::Running);
465    let _ = event_tx.send(LifecycleEvent::ResourceStarted {
466        name: name.clone(),
467        container_id: id.to_string(),
468    });
469
470    // 6. Wait for the healthcheck.
471    let wait_span = info_span!("wait_healthy", resource = %name);
472    match runtime
473        .wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT)
474        .instrument(wait_span)
475        .await
476    {
477        Ok(()) => {
478            let _ = handle.outputs_tx.send(Some(own_outputs));
479            let _ = handle.status_tx.send(NodeStatus::Healthy);
480            let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
481            Ok(())
482        }
483        Err(RuntimeError::Timeout { .. }) => {
484            let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
485            let _ = handle.status_tx.send(NodeStatus::Failed {
486                reason: reason.clone(),
487            });
488            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
489                name: name.clone(),
490                error: reason,
491            });
492            Err(LifecycleError::HealthcheckTimeout {
493                resource: name,
494                timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
495            })
496        }
497        Err(source) => {
498            let _ = handle.status_tx.send(NodeStatus::Failed {
499                reason: source.to_string(),
500            });
501            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
502                name: name.clone(),
503                error: source.to_string(),
504            });
505            Err(LifecycleError::Start {
506                resource: name,
507                source,
508            })
509        }
510    }
511}
512
513/// Apply two-pass interpolation to `spec`: resolve every
514/// `${resources.<name>.<property>}` against `dep_outputs`, then inject
515/// `LSH_<DEP>_<PROPERTY>` automatic environment variables.
516///
517/// Returns the resolved spec or a human-readable diagnostic when an
518/// interpolation references an unknown resource or property.
519fn interpolate_and_inject(
520    mut spec: ContainerSpec,
521    dep_outputs: &HashMap<String, ResourceOutputs>,
522) -> std::result::Result<ContainerSpec, String> {
523    let mut ctx = InterpolationContext::from_env();
524    for (name, outputs) in dep_outputs {
525        ctx = ctx.with_resource(name.clone(), outputs.clone());
526    }
527    let interpolator = Interpolator::new(&ctx);
528
529    // Resolve env values.
530    let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
531    for (k, v) in spec.env.drain() {
532        let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
533        resolved_env.insert(k, resolved);
534    }
535
536    // Inject LSH_<DEP>_<PROPERTY> variables.
537    for (dep_name, outputs) in dep_outputs {
538        let dep_upper = dep_name.to_uppercase().replace('-', "_");
539        for (prop, value) in outputs {
540            let prop_upper = prop.to_uppercase().replace('-', "_");
541            let key = format!("LSH_{dep_upper}_{prop_upper}");
542            resolved_env.entry(key).or_insert_with(|| value.clone());
543        }
544    }
545    spec.env = resolved_env;
546
547    // Resolve command arguments.
548    if let Some(args) = spec.command.as_mut() {
549        for arg in args.iter_mut() {
550            *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
551        }
552    }
553
554    Ok(spec)
555}
556
557#[cfg(unix)]
558async fn wait_for_shutdown_signal() {
559    use tokio::signal::unix::{SignalKind, signal};
560    let mut sigterm = match signal(SignalKind::terminate()) {
561        Ok(s) => s,
562        Err(e) => {
563            warn!("failed to install SIGTERM handler: {e}");
564            let _ = tokio::signal::ctrl_c().await;
565            return;
566        }
567    };
568    tokio::select! {
569        _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
570        _ = sigterm.recv() => info!("received SIGTERM"),
571    }
572}
573
/// Wait until the process receives Ctrl+C.
///
/// A failure to listen for Ctrl+C is logged as such rather than being
/// reported as a received signal; the function returns in both cases.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => info!("received Ctrl+C"),
        Err(e) => warn!("failed to listen for Ctrl+C: {e}"),
    }
}