1use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime};
7
8use lightshuttle_manifest::{InterpolationContext, Interpolator};
9use tokio::sync::{broadcast, watch};
10use tracing::{Instrument, debug, info, info_span, instrument, warn};
11
/// Capacity of the broadcast channel carrying [`LifecycleEvent`]s.
///
/// Per tokio `broadcast` semantics, a receiver that falls more than this many
/// events behind observes `RecvError::Lagged` and misses the oldest events.
const EVENT_CHANNEL_CAPACITY: usize = 1 << 8;
16
17use crate::error::RuntimeError;
18use crate::lifecycle::error::LifecycleError;
19use crate::lifecycle::plan::LifecyclePlan;
20use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
21use crate::runtime::{ContainerId, ContainerRuntime};
22use lightshuttle_spec::{ContainerSpec, ResourceOutputs};
23
/// How long `start_one` waits for a freshly started container to report
/// healthy before failing the resource with a healthcheck timeout.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::from_millis(60_000);
27
28#[derive(Clone)]
30struct NodeHandle {
31 status_tx: Arc<watch::Sender<NodeStatus>>,
32 status_rx: watch::Receiver<NodeStatus>,
33 outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
34 outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
35 container_id: Arc<Mutex<Option<ContainerId>>>,
36 started_at: Arc<Mutex<Option<SystemTime>>>,
37}
38
39pub(super) struct NodeSnapshot {
42 pub(super) status: NodeStatus,
44 pub(super) started_at: Option<SystemTime>,
46 pub(super) container_id: Option<ContainerId>,
48}
49
50pub struct LifecycleManager<R: ContainerRuntime + 'static> {
53 plan: Arc<LifecyclePlan>,
54 runtime: Arc<R>,
55 nodes: HashMap<String, NodeHandle>,
56 event_tx: broadcast::Sender<LifecycleEvent>,
57 extra_env: Arc<HashMap<String, String>>,
58}
59
60impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
61 #[must_use]
65 pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, broadcast::Receiver<LifecycleEvent>) {
66 let (event_tx, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
67 let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
68 for node in plan.nodes() {
69 let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
70 let (outputs_tx, outputs_rx) = watch::channel(None);
71 nodes.insert(
72 node.name.clone(),
73 NodeHandle {
74 status_tx: Arc::new(status_tx),
75 status_rx,
76 outputs_tx: Arc::new(outputs_tx),
77 outputs_rx,
78 container_id: Arc::new(Mutex::new(None)),
79 started_at: Arc::new(Mutex::new(None)),
80 },
81 );
82 }
83 let manager = Self {
84 plan: Arc::new(plan),
85 runtime: Arc::new(runtime),
86 nodes,
87 event_tx,
88 extra_env: Arc::new(HashMap::new()),
89 };
90 (manager, event_rx)
91 }
92
93 #[must_use]
101 pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
102 self.extra_env = Arc::new(env);
103 self
104 }
105
106 pub fn check_required_env(&self) -> Result<(), LifecycleError> {
116 let report = self.plan.env_report(&self.extra_env);
117 if report.has_missing() {
118 Err(LifecycleError::MissingEnvVars {
119 names: report.missing(),
120 })
121 } else {
122 Ok(())
123 }
124 }
125
126 pub async fn start_all(&self) -> Result<(), LifecycleError> {
131 let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
132 Vec::with_capacity(self.plan.nodes().len());
133
134 for node in self.plan.nodes() {
135 let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
136 let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
137 HashMap::new();
138 for dep in &node.depends_on {
139 let handle = self
140 .nodes
141 .get(dep)
142 .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
143 dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
144 dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
145 }
146
147 let node_handle = self.nodes[&node.name].clone();
148 let spec = node.spec.clone();
149 let own_outputs = node.outputs.clone();
150 let name = node.name.clone();
151 let runtime = Arc::clone(&self.runtime);
152 let event_tx = self.event_tx.clone();
153 let extra_env = Arc::clone(&self.extra_env);
154
155 let task = tokio::spawn(async move {
156 start_one(
157 name,
158 spec,
159 own_outputs,
160 runtime,
161 node_handle,
162 dep_status_rxs,
163 dep_outputs_rxs,
164 event_tx,
165 extra_env,
166 )
167 .await
168 });
169 handles.push(task);
170 }
171
172 let mut first_error: Option<LifecycleError> = None;
173 for handle in handles {
174 match handle.await {
175 Ok(Ok(())) => {}
176 Ok(Err(err)) => {
177 if first_error.is_none() {
178 first_error = Some(err);
179 }
180 }
181 Err(join_err) => {
182 if first_error.is_none() {
183 first_error = Some(LifecycleError::Start {
184 resource: "<panicked task>".to_owned(),
185 source: RuntimeError::InvalidSpec(join_err.to_string()),
186 });
187 }
188 }
189 }
190 }
191
192 if let Some(err) = first_error {
193 warn!(error = %err, "start_all failed; rolling back");
194 let _ = self.stop_all(Duration::from_secs(10)).await;
195 return Err(err);
196 }
197
198 let _ = self.event_tx.send(LifecycleEvent::StackStarted);
199 info!(
200 "stack started: {} resource(s) healthy",
201 self.plan.nodes().len()
202 );
203 Ok(())
204 }
205
206 #[instrument(skip_all, fields(resources = self.plan.nodes().len()))]
209 pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
210 let _ = self.event_tx.send(LifecycleEvent::StackStopping);
211
212 let mut errors: Vec<(String, RuntimeError)> = Vec::new();
213 for node in self.plan.nodes().iter().rev() {
214 let Some(handle) = self.nodes.get(&node.name) else {
215 continue;
216 };
217 let id = {
218 let guard = handle
219 .container_id
220 .lock()
221 .expect("container_id mutex poisoned");
222 guard.clone()
223 };
224 let Some(id) = id else { continue };
225 let stop_span = info_span!("stop", resource = %node.name);
226 match self.runtime.stop(&id, grace).instrument(stop_span).await {
227 Ok(()) => {
228 let _ = handle.status_tx.send(NodeStatus::Stopped);
229 let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
230 name: node.name.clone(),
231 });
232 }
233 Err(e) => errors.push((node.name.clone(), e)),
234 }
235 }
236
237 let _ = self.event_tx.send(LifecycleEvent::StackStopped);
238
239 if let Some(project) = self.plan.nodes().first().map(|n| n.spec.project.as_str()) {
244 if let Err(e) = self.runtime.teardown_project_network(project).await {
245 warn!(error = %e, "could not remove project network");
246 }
247 }
248
249 if let Some((resource, source)) = errors.into_iter().next() {
250 return Err(LifecycleError::Stop { resource, source });
251 }
252 Ok(())
253 }
254
255 pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
259 self.start_all().await?;
260 wait_for_shutdown_signal().await;
261 self.stop_all(grace).await
262 }
263
264 #[instrument(skip(self), fields(resource = %resource))]
277 pub async fn restart_one(&self, resource: &str) -> Result<(), LifecycleError> {
278 let node = self
279 .plan
280 .nodes()
281 .iter()
282 .find(|n| n.name == resource)
283 .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
284 let handle = self
285 .nodes
286 .get(resource)
287 .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
288
289 let id = {
291 let guard = handle
292 .container_id
293 .lock()
294 .expect("container_id mutex poisoned");
295 guard.clone()
296 };
297 if let Some(id) = id {
298 self.runtime
299 .stop(&id, Duration::from_secs(10))
300 .await
301 .map_err(|source| LifecycleError::Stop {
302 resource: resource.to_owned(),
303 source,
304 })?;
305 *handle
306 .container_id
307 .lock()
308 .expect("container_id mutex poisoned") = None;
309 *handle.started_at.lock().expect("started_at mutex poisoned") = None;
310 let _ = handle.status_tx.send(NodeStatus::Stopped);
311 let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
312 name: resource.to_owned(),
313 });
314 }
315
316 let _ = handle.status_tx.send(NodeStatus::Pending);
318
319 let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
322 let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
323 HashMap::new();
324 for dep in &node.depends_on {
325 let dep_handle = self
326 .nodes
327 .get(dep)
328 .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
329 dep_status_rxs.insert(dep.clone(), dep_handle.status_rx.clone());
330 dep_outputs_rxs.insert(dep.clone(), dep_handle.outputs_rx.clone());
331 }
332
333 start_one(
334 resource.to_owned(),
335 node.spec.clone(),
336 node.outputs.clone(),
337 Arc::clone(&self.runtime),
338 handle.clone(),
339 dep_status_rxs,
340 dep_outputs_rxs,
341 self.event_tx.clone(),
342 Arc::clone(&self.extra_env),
343 )
344 .await
345 }
346
347 #[must_use]
353 pub fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent> {
354 self.event_tx.subscribe()
355 }
356
357 pub(super) fn plan_arc(&self) -> &Arc<LifecyclePlan> {
359 &self.plan
360 }
361
362 pub(super) fn runtime_arc(&self) -> &Arc<R> {
364 &self.runtime
365 }
366
367 pub(super) fn snapshot(&self, name: &str) -> Option<NodeSnapshot> {
370 let handle = self.nodes.get(name)?;
371 let status = handle.status_rx.borrow().clone();
372 let started_at = *handle.started_at.lock().expect("started_at mutex poisoned");
373 let container_id = handle
374 .container_id
375 .lock()
376 .expect("container_id mutex poisoned")
377 .clone();
378 Some(NodeSnapshot {
379 status,
380 started_at,
381 container_id,
382 })
383 }
384}
385
386#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
387#[instrument(name = "start", skip_all, fields(resource = %name))]
388async fn start_one<R: ContainerRuntime + 'static>(
389 name: String,
390 spec: ContainerSpec,
391 own_outputs: ResourceOutputs,
392 runtime: Arc<R>,
393 handle: NodeHandle,
394 dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
395 mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
396 event_tx: broadcast::Sender<LifecycleEvent>,
397 extra_env: Arc<HashMap<String, String>>,
398) -> Result<(), LifecycleError> {
399 for (dep_name, mut rx) in dep_status_rxs {
401 loop {
402 let status = rx.borrow_and_update().clone();
403 if status.is_ready() {
404 debug!(node = %name, dep = %dep_name, "dependency ready");
405 break;
406 }
407 if let NodeStatus::Failed { reason } = status {
408 let _ = handle.status_tx.send(NodeStatus::Failed {
409 reason: format!("dependency `{dep_name}` failed: {reason}"),
410 });
411 return Err(LifecycleError::DependencyFailed {
412 resource: name,
413 dependency: dep_name,
414 reason,
415 });
416 }
417 if rx.changed().await.is_err() {
418 let reason = format!("dependency `{dep_name}` watch channel closed");
419 let _ = handle.status_tx.send(NodeStatus::Failed {
420 reason: reason.clone(),
421 });
422 return Err(LifecycleError::DependencyFailed {
423 resource: name,
424 dependency: dep_name,
425 reason,
426 });
427 }
428 }
429 }
430
431 let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
433 for (dep_name, rx) in &mut dep_outputs_rxs {
434 loop {
435 if let Some(out) = rx.borrow_and_update().clone() {
436 dep_outputs.insert(dep_name.clone(), out);
437 break;
438 }
439 if rx.changed().await.is_err() {
440 let reason = format!("dependency `{dep_name}` outputs channel closed");
441 let _ = handle.status_tx.send(NodeStatus::Failed {
442 reason: reason.clone(),
443 });
444 return Err(LifecycleError::DependencyFailed {
445 resource: name,
446 dependency: dep_name.clone(),
447 reason,
448 });
449 }
450 }
451 }
452
453 let resolved_spec = match interpolate_and_inject(spec, &dep_outputs, &extra_env) {
455 Ok(s) => s,
456 Err(reason) => {
457 let _ = handle.status_tx.send(NodeStatus::Failed {
458 reason: reason.clone(),
459 });
460 return Err(LifecycleError::Start {
461 resource: name,
462 source: RuntimeError::InvalidSpec(reason),
463 });
464 }
465 };
466
467 let _ = handle.status_tx.send(NodeStatus::Starting);
470 if let Err(source) = runtime.remove(&resolved_spec.name).await {
471 let _ = handle.status_tx.send(NodeStatus::Failed {
472 reason: source.to_string(),
473 });
474 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
475 name: name.clone(),
476 error: source.to_string(),
477 });
478 return Err(LifecycleError::Start {
479 resource: name,
480 source,
481 });
482 }
483
484 let id = match runtime.start(&resolved_spec).await {
486 Ok(id) => id,
487 Err(source) => {
488 let _ = handle.status_tx.send(NodeStatus::Failed {
489 reason: source.to_string(),
490 });
491 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
492 name: name.clone(),
493 error: source.to_string(),
494 });
495 return Err(LifecycleError::Start {
496 resource: name,
497 source,
498 });
499 }
500 };
501
502 {
503 let mut guard = handle
504 .container_id
505 .lock()
506 .expect("container_id mutex poisoned");
507 *guard = Some(id.clone());
508 }
509 {
510 let mut guard = handle.started_at.lock().expect("started_at mutex poisoned");
511 *guard = Some(SystemTime::now());
512 }
513 let _ = handle.status_tx.send(NodeStatus::Running);
514 let _ = event_tx.send(LifecycleEvent::ResourceStarted {
515 name: name.clone(),
516 container_id: id.to_string(),
517 });
518
519 let wait_span = info_span!("wait_healthy", resource = %name);
521 match runtime
522 .wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT)
523 .instrument(wait_span)
524 .await
525 {
526 Ok(()) => {
527 let _ = handle.outputs_tx.send(Some(own_outputs));
528 let _ = handle.status_tx.send(NodeStatus::Healthy);
529 let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
530 Ok(())
531 }
532 Err(RuntimeError::Timeout { .. }) => {
533 let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
534 let _ = handle.status_tx.send(NodeStatus::Failed {
535 reason: reason.clone(),
536 });
537 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
538 name: name.clone(),
539 error: reason,
540 });
541 Err(LifecycleError::HealthcheckTimeout {
542 resource: name,
543 timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
544 })
545 }
546 Err(source) => {
547 let _ = handle.status_tx.send(NodeStatus::Failed {
548 reason: source.to_string(),
549 });
550 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
551 name: name.clone(),
552 error: source.to_string(),
553 });
554 Err(LifecycleError::Start {
555 resource: name,
556 source,
557 })
558 }
559 }
560}
561
562fn interpolate_and_inject(
569 mut spec: ContainerSpec,
570 dep_outputs: &HashMap<String, ResourceOutputs>,
571 extra_env: &HashMap<String, String>,
572) -> std::result::Result<ContainerSpec, String> {
573 let mut ctx = InterpolationContext::from_env()
574 .with_env(extra_env.iter().map(|(k, v)| (k.clone(), v.clone())));
575 for (name, outputs) in dep_outputs {
576 ctx = ctx.with_resource(name.clone(), outputs.clone());
577 }
578 let interpolator = Interpolator::new(&ctx);
579
580 let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
582 for (k, v) in spec.env.drain() {
583 let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
584 resolved_env.insert(k, resolved);
585 }
586
587 for (dep_name, outputs) in dep_outputs {
589 let dep_upper = dep_name.to_uppercase().replace('-', "_");
590 for (prop, value) in outputs {
591 let prop_upper = prop.to_uppercase().replace('-', "_");
592 let key = format!("LSH_{dep_upper}_{prop_upper}");
593 resolved_env.entry(key).or_insert_with(|| value.clone());
594 }
595 }
596 spec.env = resolved_env;
597
598 if let Some(args) = spec.command.as_mut() {
600 for arg in args.iter_mut() {
601 *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
602 }
603 }
604
605 Ok(spec)
606}
607
608#[cfg(unix)]
609async fn wait_for_shutdown_signal() {
610 use tokio::signal::unix::{SignalKind, signal};
611 let mut sigterm = match signal(SignalKind::terminate()) {
612 Ok(s) => s,
613 Err(e) => {
614 warn!("failed to install SIGTERM handler: {e}");
615 let _ = tokio::signal::ctrl_c().await;
616 return;
617 }
618 };
619 tokio::select! {
620 _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
621 _ = sigterm.recv() => info!("received SIGTERM"),
622 }
623}
624
/// Waits for Ctrl+C.
///
/// If the Ctrl+C listener cannot be registered, the error is logged, not
/// reported as a received signal, and the function returns immediately.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => info!("received Ctrl+C"),
        Err(e) => warn!("failed to listen for Ctrl+C: {e}"),
    }
}