Skip to main content

lightshuttle_runtime/lifecycle/
manager.rs

1//! Coordinated startup and shutdown of every resource declared in a
2//! [`crate::LifecyclePlan`].
3
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime};
7
8use lightshuttle_manifest::{InterpolationContext, Interpolator};
9use tokio::sync::{broadcast, watch};
10use tracing::{Instrument, debug, info, info_span, instrument, warn};
11
/// Capacity of the lifecycle event broadcast channel.
///
/// A subscriber that falls more than this many events behind the sender
/// receives a lagged error from the channel and must resynchronise its view.
const EVENT_CHANNEL_CAPACITY: usize = 256;
16
17use crate::error::RuntimeError;
18use crate::lifecycle::error::LifecycleError;
19use crate::lifecycle::plan::LifecyclePlan;
20use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
21use crate::runtime::{ContainerId, ContainerRuntime};
22use lightshuttle_spec::{ContainerSpec, ResourceOutputs};
23
/// Timeout passed to `ContainerRuntime::wait_healthy` when starting a
/// resource; it is used for every resource started by this module.
///
/// NOTE(review): an earlier comment claimed this only applies when the
/// manifest provides no timeout of its own, but nothing in this file reads a
/// manifest-level value. Confirm whether the runtime applies an override.
/// Kept conservative for v0.1.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(60);
27
28/// Per-resource shared state.
29#[derive(Clone)]
30struct NodeHandle {
31    status_tx: Arc<watch::Sender<NodeStatus>>,
32    status_rx: watch::Receiver<NodeStatus>,
33    outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
34    outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
35    container_id: Arc<Mutex<Option<ContainerId>>>,
36    started_at: Arc<Mutex<Option<SystemTime>>>,
37}
38
39/// Point-in-time snapshot of one managed resource, consumed by the
40/// control plane via [`super::handle::ManagerHandle`].
41pub(super) struct NodeSnapshot {
42    /// Lifecycle status at the moment of the snapshot.
43    pub(super) status: NodeStatus,
44    /// Wall-clock time at which the runtime accepted the start request.
45    pub(super) started_at: Option<SystemTime>,
46    /// Container identifier returned by the runtime, when known.
47    pub(super) container_id: Option<ContainerId>,
48}
49
50/// Coordinates the startup, supervision and shutdown of every resource
51/// declared in a [`LifecyclePlan`].
52pub struct LifecycleManager<R: ContainerRuntime + 'static> {
53    plan: Arc<LifecyclePlan>,
54    runtime: Arc<R>,
55    nodes: HashMap<String, NodeHandle>,
56    event_tx: broadcast::Sender<LifecycleEvent>,
57    extra_env: Arc<HashMap<String, String>>,
58}
59
60impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
61    /// Build a manager bound to `plan` and `runtime`. Returns a fresh
62    /// event subscriber alongside; further subscribers can be obtained
63    /// from [`Self::subscribe_events`].
64    #[must_use]
65    pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, broadcast::Receiver<LifecycleEvent>) {
66        let (event_tx, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
67        let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
68        for node in plan.nodes() {
69            let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
70            let (outputs_tx, outputs_rx) = watch::channel(None);
71            nodes.insert(
72                node.name.clone(),
73                NodeHandle {
74                    status_tx: Arc::new(status_tx),
75                    status_rx,
76                    outputs_tx: Arc::new(outputs_tx),
77                    outputs_rx,
78                    container_id: Arc::new(Mutex::new(None)),
79                    started_at: Arc::new(Mutex::new(None)),
80                },
81            );
82        }
83        let manager = Self {
84            plan: Arc::new(plan),
85            runtime: Arc::new(runtime),
86            nodes,
87            event_tx,
88            extra_env: Arc::new(HashMap::new()),
89        };
90        (manager, event_rx)
91    }
92
93    /// Merge additional environment variables into the interpolation
94    /// context used for every resource.
95    ///
96    /// Variables provided here take precedence over system environment
97    /// variables of the same name. Call before [`start_all`].
98    ///
99    /// [`start_all`]: Self::start_all
100    #[must_use]
101    pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
102        self.extra_env = Arc::new(env);
103        self
104    }
105
106    /// Scan every resource spec for `${env.VAR}` references that cannot
107    /// be resolved and return a single error listing all missing names.
108    ///
109    /// Delegates to [`LifecyclePlan::env_report`] so the fail-fast check and
110    /// the `secrets check` diagnostic share one source of truth. Call before
111    /// [`start_all`] to surface missing variables before any container is
112    /// started.
113    ///
114    /// [`start_all`]: Self::start_all
115    pub fn check_required_env(&self) -> Result<(), LifecycleError> {
116        let report = self.plan.env_report(&self.extra_env);
117        if report.has_missing() {
118            Err(LifecycleError::MissingEnvVars {
119                names: report.missing(),
120            })
121        } else {
122            Ok(())
123        }
124    }
125
126    /// Start every resource in topological order. Independent branches
127    /// start in parallel. On the first failure, the resources that
128    /// already started are stopped automatically before the error is
129    /// returned.
130    pub async fn start_all(&self) -> Result<(), LifecycleError> {
131        let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
132            Vec::with_capacity(self.plan.nodes().len());
133
134        for node in self.plan.nodes() {
135            let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
136            let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
137                HashMap::new();
138            for dep in &node.depends_on {
139                let handle = self
140                    .nodes
141                    .get(dep)
142                    .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
143                dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
144                dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
145            }
146
147            let node_handle = self.nodes[&node.name].clone();
148            let spec = node.spec.clone();
149            let own_outputs = node.outputs.clone();
150            let name = node.name.clone();
151            let runtime = Arc::clone(&self.runtime);
152            let event_tx = self.event_tx.clone();
153            let extra_env = Arc::clone(&self.extra_env);
154
155            let task = tokio::spawn(async move {
156                start_one(
157                    name,
158                    spec,
159                    own_outputs,
160                    runtime,
161                    node_handle,
162                    dep_status_rxs,
163                    dep_outputs_rxs,
164                    event_tx,
165                    extra_env,
166                )
167                .await
168            });
169            handles.push(task);
170        }
171
172        let mut first_error: Option<LifecycleError> = None;
173        for handle in handles {
174            match handle.await {
175                Ok(Ok(())) => {}
176                Ok(Err(err)) => {
177                    if first_error.is_none() {
178                        first_error = Some(err);
179                    }
180                }
181                Err(join_err) => {
182                    if first_error.is_none() {
183                        first_error = Some(LifecycleError::Start {
184                            resource: "<panicked task>".to_owned(),
185                            source: RuntimeError::InvalidSpec(join_err.to_string()),
186                        });
187                    }
188                }
189            }
190        }
191
192        if let Some(err) = first_error {
193            warn!(error = %err, "start_all failed; rolling back");
194            let _ = self.stop_all(Duration::from_secs(10)).await;
195            return Err(err);
196        }
197
198        let _ = self.event_tx.send(LifecycleEvent::StackStarted);
199        info!(
200            "stack started: {} resource(s) healthy",
201            self.plan.nodes().len()
202        );
203        Ok(())
204    }
205
206    /// Stop every resource in reverse topological order with the given
207    /// SIGTERM-to-SIGKILL grace window.
208    #[instrument(skip_all, fields(resources = self.plan.nodes().len()))]
209    pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
210        let _ = self.event_tx.send(LifecycleEvent::StackStopping);
211
212        let mut errors: Vec<(String, RuntimeError)> = Vec::new();
213        for node in self.plan.nodes().iter().rev() {
214            let Some(handle) = self.nodes.get(&node.name) else {
215                continue;
216            };
217            let id = {
218                let guard = handle
219                    .container_id
220                    .lock()
221                    .expect("container_id mutex poisoned");
222                guard.clone()
223            };
224            let Some(id) = id else { continue };
225            let stop_span = info_span!("stop", resource = %node.name);
226            match self.runtime.stop(&id, grace).instrument(stop_span).await {
227                Ok(()) => {
228                    let _ = handle.status_tx.send(NodeStatus::Stopped);
229                    let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
230                        name: node.name.clone(),
231                    });
232                }
233                Err(e) => errors.push((node.name.clone(), e)),
234            }
235        }
236
237        let _ = self.event_tx.send(LifecycleEvent::StackStopped);
238
239        // Remove the per-project bridge network. Containers that failed
240        // to stop may still hold endpoints, causing Docker to reject the
241        // request — log the failure and continue so callers always see
242        // the primary stop errors, not a secondary network error.
243        if let Some(project) = self.plan.nodes().first().map(|n| n.spec.project.as_str()) {
244            if let Err(e) = self.runtime.teardown_project_network(project).await {
245                warn!(error = %e, "could not remove project network");
246            }
247        }
248
249        if let Some((resource, source)) = errors.into_iter().next() {
250            return Err(LifecycleError::Stop { resource, source });
251        }
252        Ok(())
253    }
254
255    /// Opinionated entry point used by `lightshuttle up`: starts the
256    /// stack, waits for `SIGINT` or `SIGTERM`, then stops the stack
257    /// cleanly with the configured grace window.
258    pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
259        self.start_all().await?;
260        wait_for_shutdown_signal().await;
261        self.stop_all(grace).await
262    }
263
264    /// Restart a single resource without touching its dependents.
265    ///
266    /// The target is stopped via `SIGTERM` (10-second grace window),
267    /// its container id and started-at timestamp are cleared, then
268    /// `start_one` is re-run from the same cached spec. Three events
269    /// are emitted on the lifecycle channel in order: `ResourceStopped`,
270    /// `ResourceStarted`, `ResourceHealthy`.
271    ///
272    /// Dependents keep running. Their `watch` channels observe the
273    /// target's status going `Stopped` → `Pending` → `Starting` →
274    /// `Running` → `Healthy`, so callers that depend on the target can
275    /// pause their work locally until it is healthy again.
276    #[instrument(skip(self), fields(resource = %resource))]
277    pub async fn restart_one(&self, resource: &str) -> Result<(), LifecycleError> {
278        let node = self
279            .plan
280            .nodes()
281            .iter()
282            .find(|n| n.name == resource)
283            .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
284        let handle = self
285            .nodes
286            .get(resource)
287            .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
288
289        // Stop the running container if any.
290        let id = {
291            let guard = handle
292                .container_id
293                .lock()
294                .expect("container_id mutex poisoned");
295            guard.clone()
296        };
297        if let Some(id) = id {
298            self.runtime
299                .stop(&id, Duration::from_secs(10))
300                .await
301                .map_err(|source| LifecycleError::Stop {
302                    resource: resource.to_owned(),
303                    source,
304                })?;
305            *handle
306                .container_id
307                .lock()
308                .expect("container_id mutex poisoned") = None;
309            *handle.started_at.lock().expect("started_at mutex poisoned") = None;
310            let _ = handle.status_tx.send(NodeStatus::Stopped);
311            let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
312                name: resource.to_owned(),
313            });
314        }
315
316        // Reset to Pending so start_one drives the full restart cycle.
317        let _ = handle.status_tx.send(NodeStatus::Pending);
318
319        // Collect dependency watch receivers. Deps are already Healthy,
320        // so start_one's wait loop returns instantly.
321        let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
322        let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
323            HashMap::new();
324        for dep in &node.depends_on {
325            let dep_handle = self
326                .nodes
327                .get(dep)
328                .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
329            dep_status_rxs.insert(dep.clone(), dep_handle.status_rx.clone());
330            dep_outputs_rxs.insert(dep.clone(), dep_handle.outputs_rx.clone());
331        }
332
333        start_one(
334            resource.to_owned(),
335            node.spec.clone(),
336            node.outputs.clone(),
337            Arc::clone(&self.runtime),
338            handle.clone(),
339            dep_status_rxs,
340            dep_outputs_rxs,
341            self.event_tx.clone(),
342            Arc::clone(&self.extra_env),
343        )
344        .await
345    }
346
347    /// Open a new subscription on the lifecycle event broadcast.
348    ///
349    /// Multiple subscribers can read concurrently. Subscribers that
350    /// fall more than [`EVENT_CHANNEL_CAPACITY`] events behind will
351    /// observe a `RecvError::Lagged` and have to resynchronise.
352    #[must_use]
353    pub fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent> {
354        self.event_tx.subscribe()
355    }
356
357    /// Shared reference to the underlying execution plan.
358    pub(super) fn plan_arc(&self) -> &Arc<LifecyclePlan> {
359        &self.plan
360    }
361
362    /// Shared reference to the underlying container runtime.
363    pub(super) fn runtime_arc(&self) -> &Arc<R> {
364        &self.runtime
365    }
366
367    /// Point-in-time snapshot of one resource, or `None` when the name
368    /// is not part of the plan.
369    pub(super) fn snapshot(&self, name: &str) -> Option<NodeSnapshot> {
370        let handle = self.nodes.get(name)?;
371        let status = handle.status_rx.borrow().clone();
372        let started_at = *handle.started_at.lock().expect("started_at mutex poisoned");
373        let container_id = handle
374            .container_id
375            .lock()
376            .expect("container_id mutex poisoned")
377            .clone();
378        Some(NodeSnapshot {
379            status,
380            started_at,
381            container_id,
382        })
383    }
384}
385
386#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
387#[instrument(name = "start", skip_all, fields(resource = %name))]
388async fn start_one<R: ContainerRuntime + 'static>(
389    name: String,
390    spec: ContainerSpec,
391    own_outputs: ResourceOutputs,
392    runtime: Arc<R>,
393    handle: NodeHandle,
394    dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
395    mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
396    event_tx: broadcast::Sender<LifecycleEvent>,
397    extra_env: Arc<HashMap<String, String>>,
398) -> Result<(), LifecycleError> {
399    // 1. Wait for every dependency to become ready.
400    for (dep_name, mut rx) in dep_status_rxs {
401        loop {
402            let status = rx.borrow_and_update().clone();
403            if status.is_ready() {
404                debug!(node = %name, dep = %dep_name, "dependency ready");
405                break;
406            }
407            if let NodeStatus::Failed { reason } = status {
408                let _ = handle.status_tx.send(NodeStatus::Failed {
409                    reason: format!("dependency `{dep_name}` failed: {reason}"),
410                });
411                return Err(LifecycleError::DependencyFailed {
412                    resource: name,
413                    dependency: dep_name,
414                    reason,
415                });
416            }
417            if rx.changed().await.is_err() {
418                let reason = format!("dependency `{dep_name}` watch channel closed");
419                let _ = handle.status_tx.send(NodeStatus::Failed {
420                    reason: reason.clone(),
421                });
422                return Err(LifecycleError::DependencyFailed {
423                    resource: name,
424                    dependency: dep_name,
425                    reason,
426                });
427            }
428        }
429    }
430
431    // 2. Collect dependency outputs.
432    let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
433    for (dep_name, rx) in &mut dep_outputs_rxs {
434        loop {
435            if let Some(out) = rx.borrow_and_update().clone() {
436                dep_outputs.insert(dep_name.clone(), out);
437                break;
438            }
439            if rx.changed().await.is_err() {
440                let reason = format!("dependency `{dep_name}` outputs channel closed");
441                let _ = handle.status_tx.send(NodeStatus::Failed {
442                    reason: reason.clone(),
443                });
444                return Err(LifecycleError::DependencyFailed {
445                    resource: name,
446                    dependency: dep_name.clone(),
447                    reason,
448                });
449            }
450        }
451    }
452
453    // 3. Resolve interpolations and inject LSH_<DEP>_<PROP> env vars.
454    let resolved_spec = match interpolate_and_inject(spec, &dep_outputs, &extra_env) {
455        Ok(s) => s,
456        Err(reason) => {
457            let _ = handle.status_tx.send(NodeStatus::Failed {
458                reason: reason.clone(),
459            });
460            return Err(LifecycleError::Start {
461                resource: name,
462                source: RuntimeError::InvalidSpec(reason),
463            });
464        }
465    };
466
467    // 4. Remove any container left over from a previous run so the
468    //    create call below never collides with a stale name.
469    let _ = handle.status_tx.send(NodeStatus::Starting);
470    if let Err(source) = runtime.remove(&resolved_spec.name).await {
471        let _ = handle.status_tx.send(NodeStatus::Failed {
472            reason: source.to_string(),
473        });
474        let _ = event_tx.send(LifecycleEvent::ResourceFailed {
475            name: name.clone(),
476            error: source.to_string(),
477        });
478        return Err(LifecycleError::Start {
479            resource: name,
480            source,
481        });
482    }
483
484    // 5. Start the container.
485    let id = match runtime.start(&resolved_spec).await {
486        Ok(id) => id,
487        Err(source) => {
488            let _ = handle.status_tx.send(NodeStatus::Failed {
489                reason: source.to_string(),
490            });
491            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
492                name: name.clone(),
493                error: source.to_string(),
494            });
495            return Err(LifecycleError::Start {
496                resource: name,
497                source,
498            });
499        }
500    };
501
502    {
503        let mut guard = handle
504            .container_id
505            .lock()
506            .expect("container_id mutex poisoned");
507        *guard = Some(id.clone());
508    }
509    {
510        let mut guard = handle.started_at.lock().expect("started_at mutex poisoned");
511        *guard = Some(SystemTime::now());
512    }
513    let _ = handle.status_tx.send(NodeStatus::Running);
514    let _ = event_tx.send(LifecycleEvent::ResourceStarted {
515        name: name.clone(),
516        container_id: id.to_string(),
517    });
518
519    // 6. Wait for the healthcheck.
520    let wait_span = info_span!("wait_healthy", resource = %name);
521    match runtime
522        .wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT)
523        .instrument(wait_span)
524        .await
525    {
526        Ok(()) => {
527            let _ = handle.outputs_tx.send(Some(own_outputs));
528            let _ = handle.status_tx.send(NodeStatus::Healthy);
529            let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
530            Ok(())
531        }
532        Err(RuntimeError::Timeout { .. }) => {
533            let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
534            let _ = handle.status_tx.send(NodeStatus::Failed {
535                reason: reason.clone(),
536            });
537            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
538                name: name.clone(),
539                error: reason,
540            });
541            Err(LifecycleError::HealthcheckTimeout {
542                resource: name,
543                timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
544            })
545        }
546        Err(source) => {
547            let _ = handle.status_tx.send(NodeStatus::Failed {
548                reason: source.to_string(),
549            });
550            let _ = event_tx.send(LifecycleEvent::ResourceFailed {
551                name: name.clone(),
552                error: source.to_string(),
553            });
554            Err(LifecycleError::Start {
555                resource: name,
556                source,
557            })
558        }
559    }
560}
561
562/// Apply two-pass interpolation to `spec`: resolve every
563/// `${resources.<name>.<property>}` against `dep_outputs`, then inject
564/// `LSH_<DEP>_<PROPERTY>` automatic environment variables.
565///
566/// Returns the resolved spec or a human-readable diagnostic when an
567/// interpolation references an unknown resource or property.
568fn interpolate_and_inject(
569    mut spec: ContainerSpec,
570    dep_outputs: &HashMap<String, ResourceOutputs>,
571    extra_env: &HashMap<String, String>,
572) -> std::result::Result<ContainerSpec, String> {
573    let mut ctx = InterpolationContext::from_env()
574        .with_env(extra_env.iter().map(|(k, v)| (k.clone(), v.clone())));
575    for (name, outputs) in dep_outputs {
576        ctx = ctx.with_resource(name.clone(), outputs.clone());
577    }
578    let interpolator = Interpolator::new(&ctx);
579
580    // Resolve env values.
581    let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
582    for (k, v) in spec.env.drain() {
583        let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
584        resolved_env.insert(k, resolved);
585    }
586
587    // Inject LSH_<DEP>_<PROPERTY> variables.
588    for (dep_name, outputs) in dep_outputs {
589        let dep_upper = dep_name.to_uppercase().replace('-', "_");
590        for (prop, value) in outputs {
591            let prop_upper = prop.to_uppercase().replace('-', "_");
592            let key = format!("LSH_{dep_upper}_{prop_upper}");
593            resolved_env.entry(key).or_insert_with(|| value.clone());
594        }
595    }
596    spec.env = resolved_env;
597
598    // Resolve command arguments.
599    if let Some(args) = spec.command.as_mut() {
600        for arg in args.iter_mut() {
601            *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
602        }
603    }
604
605    Ok(spec)
606}
607
608#[cfg(unix)]
609async fn wait_for_shutdown_signal() {
610    use tokio::signal::unix::{SignalKind, signal};
611    let mut sigterm = match signal(SignalKind::terminate()) {
612        Ok(s) => s,
613        Err(e) => {
614            warn!("failed to install SIGTERM handler: {e}");
615            let _ = tokio::signal::ctrl_c().await;
616            return;
617        }
618    };
619    tokio::select! {
620        _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
621        _ = sigterm.recv() => info!("received SIGTERM"),
622    }
623}
624
/// Resolve once Ctrl+C is pressed; on Windows this is the only shutdown
/// trigger awaited here.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    // An error from the Ctrl+C listener is discarded. The function then
    // returns anyway, so shutdown proceeds.
    tokio::signal::ctrl_c().await.ok();
    info!("received Ctrl+C");
}