1use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime};
7
8use lightshuttle_manifest::{InterpolationContext, Interpolator};
9use tokio::sync::{broadcast, watch};
10use tracing::{Instrument, debug, info, info_span, instrument, warn};
11
/// Capacity of the lifecycle event broadcast channel created in
/// `LifecycleManager::new`.
///
/// With tokio's `broadcast` channel, a receiver that falls more than this many
/// events behind loses the oldest ones (it observes a lag error) instead of
/// blocking senders.
const EVENT_CHANNEL_CAPACITY: usize = 256;
16
17use crate::error::RuntimeError;
18use crate::lifecycle::error::LifecycleError;
19use crate::lifecycle::plan::LifecyclePlan;
20use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
21use crate::runtime::{ContainerId, ContainerRuntime};
22use crate::spec::{ContainerSpec, ResourceOutputs};
23
/// How long `start_one` waits for a freshly started container to report
/// healthy before failing the resource with `LifecycleError::HealthcheckTimeout`.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(60);
27
/// Shared per-resource state: status and outputs channels plus the slots that
/// record the resource's current container.
///
/// Cloned into every start task; all fields are `Arc`s or channel handles, so
/// clones share the same underlying channels and slots.
#[derive(Clone)]
struct NodeHandle {
    /// Publishes this node's lifecycle status. Wrapped in `Arc` so every clone
    /// of the handle drives the same sender.
    status_tx: Arc<watch::Sender<NodeStatus>>,
    /// Kept so dependents can clone a receiver for this node's status.
    status_rx: watch::Receiver<NodeStatus>,
    /// Publishes this node's outputs. Starts as `None` and is set to `Some`
    /// once the node first reports healthy.
    outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
    /// Kept so dependents can clone a receiver for this node's outputs.
    outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
    /// Id of the container started for this node, if one is recorded.
    /// `std` mutex: in this file it is only locked in short, synchronous
    /// sections, never across an `.await`.
    container_id: Arc<Mutex<Option<ContainerId>>>,
    /// Wall-clock time at which the recorded container was started.
    started_at: Arc<Mutex<Option<SystemTime>>>,
}
38
/// Point-in-time copy of one node's state, produced by
/// `LifecycleManager::snapshot`.
pub(super) struct NodeSnapshot {
    /// Current value of the node's status channel.
    pub(super) status: NodeStatus,
    /// When the node's container was last started, if recorded.
    pub(super) started_at: Option<SystemTime>,
    /// Id of the container recorded for this node, if any.
    pub(super) container_id: Option<ContainerId>,
}
49
/// Starts, stops and restarts the resources of a [`LifecyclePlan`] through a
/// [`ContainerRuntime`], broadcasting [`LifecycleEvent`]s as resources change
/// state.
pub struct LifecycleManager<R: ContainerRuntime + 'static> {
    /// The plan being executed; shared so other modules can hold it cheaply.
    plan: Arc<LifecyclePlan>,
    /// Container backend; an `Arc` clone is moved into every start task.
    runtime: Arc<R>,
    /// Per-resource state keyed by resource name.
    nodes: HashMap<String, NodeHandle>,
    /// Sender side of the lifecycle event broadcast channel.
    event_tx: broadcast::Sender<LifecycleEvent>,
}
58
59impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
60 #[must_use]
64 pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, broadcast::Receiver<LifecycleEvent>) {
65 let (event_tx, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
66 let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
67 for node in plan.nodes() {
68 let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
69 let (outputs_tx, outputs_rx) = watch::channel(None);
70 nodes.insert(
71 node.name.clone(),
72 NodeHandle {
73 status_tx: Arc::new(status_tx),
74 status_rx,
75 outputs_tx: Arc::new(outputs_tx),
76 outputs_rx,
77 container_id: Arc::new(Mutex::new(None)),
78 started_at: Arc::new(Mutex::new(None)),
79 },
80 );
81 }
82 let manager = Self {
83 plan: Arc::new(plan),
84 runtime: Arc::new(runtime),
85 nodes,
86 event_tx,
87 };
88 (manager, event_rx)
89 }
90
91 pub async fn start_all(&self) -> Result<(), LifecycleError> {
96 let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
97 Vec::with_capacity(self.plan.nodes().len());
98
99 for node in self.plan.nodes() {
100 let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
101 let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
102 HashMap::new();
103 for dep in &node.depends_on {
104 let handle = self
105 .nodes
106 .get(dep)
107 .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
108 dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
109 dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
110 }
111
112 let node_handle = self.nodes[&node.name].clone();
113 let spec = node.spec.clone();
114 let own_outputs = node.outputs.clone();
115 let name = node.name.clone();
116 let runtime = Arc::clone(&self.runtime);
117 let event_tx = self.event_tx.clone();
118
119 let task = tokio::spawn(async move {
120 start_one(
121 name,
122 spec,
123 own_outputs,
124 runtime,
125 node_handle,
126 dep_status_rxs,
127 dep_outputs_rxs,
128 event_tx,
129 )
130 .await
131 });
132 handles.push(task);
133 }
134
135 let mut first_error: Option<LifecycleError> = None;
136 for handle in handles {
137 match handle.await {
138 Ok(Ok(())) => {}
139 Ok(Err(err)) => {
140 if first_error.is_none() {
141 first_error = Some(err);
142 }
143 }
144 Err(join_err) => {
145 if first_error.is_none() {
146 first_error = Some(LifecycleError::Start {
147 resource: "<panicked task>".to_owned(),
148 source: RuntimeError::InvalidSpec(join_err.to_string()),
149 });
150 }
151 }
152 }
153 }
154
155 if let Some(err) = first_error {
156 warn!(error = %err, "start_all failed; rolling back");
157 let _ = self.stop_all(Duration::from_secs(10)).await;
158 return Err(err);
159 }
160
161 let _ = self.event_tx.send(LifecycleEvent::StackStarted);
162 info!(
163 "stack started: {} resource(s) healthy",
164 self.plan.nodes().len()
165 );
166 Ok(())
167 }
168
169 #[instrument(skip_all, fields(resources = self.plan.nodes().len()))]
172 pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
173 let _ = self.event_tx.send(LifecycleEvent::StackStopping);
174
175 let mut errors: Vec<(String, RuntimeError)> = Vec::new();
176 for node in self.plan.nodes().iter().rev() {
177 let Some(handle) = self.nodes.get(&node.name) else {
178 continue;
179 };
180 let id = {
181 let guard = handle
182 .container_id
183 .lock()
184 .expect("container_id mutex poisoned");
185 guard.clone()
186 };
187 let Some(id) = id else { continue };
188 let stop_span = info_span!("stop", resource = %node.name);
189 match self.runtime.stop(&id, grace).instrument(stop_span).await {
190 Ok(()) => {
191 let _ = handle.status_tx.send(NodeStatus::Stopped);
192 let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
193 name: node.name.clone(),
194 });
195 }
196 Err(e) => errors.push((node.name.clone(), e)),
197 }
198 }
199
200 let _ = self.event_tx.send(LifecycleEvent::StackStopped);
201
202 if let Some((resource, source)) = errors.into_iter().next() {
203 return Err(LifecycleError::Stop { resource, source });
204 }
205 Ok(())
206 }
207
208 pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
212 self.start_all().await?;
213 wait_for_shutdown_signal().await;
214 self.stop_all(grace).await
215 }
216
217 #[instrument(skip(self), fields(resource = %resource))]
230 pub async fn restart_one(&self, resource: &str) -> Result<(), LifecycleError> {
231 let node = self
232 .plan
233 .nodes()
234 .iter()
235 .find(|n| n.name == resource)
236 .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
237 let handle = self
238 .nodes
239 .get(resource)
240 .ok_or_else(|| LifecycleError::ResourceNotFound(resource.to_owned()))?;
241
242 let id = {
244 let guard = handle
245 .container_id
246 .lock()
247 .expect("container_id mutex poisoned");
248 guard.clone()
249 };
250 if let Some(id) = id {
251 self.runtime
252 .stop(&id, Duration::from_secs(10))
253 .await
254 .map_err(|source| LifecycleError::Stop {
255 resource: resource.to_owned(),
256 source,
257 })?;
258 *handle
259 .container_id
260 .lock()
261 .expect("container_id mutex poisoned") = None;
262 *handle.started_at.lock().expect("started_at mutex poisoned") = None;
263 let _ = handle.status_tx.send(NodeStatus::Stopped);
264 let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
265 name: resource.to_owned(),
266 });
267 }
268
269 let _ = handle.status_tx.send(NodeStatus::Pending);
271
272 let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
275 let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
276 HashMap::new();
277 for dep in &node.depends_on {
278 let dep_handle = self
279 .nodes
280 .get(dep)
281 .ok_or_else(|| LifecycleError::ResourceNotFound(dep.clone()))?;
282 dep_status_rxs.insert(dep.clone(), dep_handle.status_rx.clone());
283 dep_outputs_rxs.insert(dep.clone(), dep_handle.outputs_rx.clone());
284 }
285
286 start_one(
287 resource.to_owned(),
288 node.spec.clone(),
289 node.outputs.clone(),
290 Arc::clone(&self.runtime),
291 handle.clone(),
292 dep_status_rxs,
293 dep_outputs_rxs,
294 self.event_tx.clone(),
295 )
296 .await
297 }
298
299 #[must_use]
305 pub fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent> {
306 self.event_tx.subscribe()
307 }
308
309 pub(super) fn plan_arc(&self) -> &Arc<LifecyclePlan> {
311 &self.plan
312 }
313
314 pub(super) fn runtime_arc(&self) -> &Arc<R> {
316 &self.runtime
317 }
318
319 pub(super) fn snapshot(&self, name: &str) -> Option<NodeSnapshot> {
322 let handle = self.nodes.get(name)?;
323 let status = handle.status_rx.borrow().clone();
324 let started_at = *handle.started_at.lock().expect("started_at mutex poisoned");
325 let container_id = handle
326 .container_id
327 .lock()
328 .expect("container_id mutex poisoned")
329 .clone();
330 Some(NodeSnapshot {
331 status,
332 started_at,
333 container_id,
334 })
335 }
336}
337
338#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
339#[instrument(name = "start", skip_all, fields(resource = %name))]
340async fn start_one<R: ContainerRuntime + 'static>(
341 name: String,
342 spec: ContainerSpec,
343 own_outputs: ResourceOutputs,
344 runtime: Arc<R>,
345 handle: NodeHandle,
346 dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
347 mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
348 event_tx: broadcast::Sender<LifecycleEvent>,
349) -> Result<(), LifecycleError> {
350 for (dep_name, mut rx) in dep_status_rxs {
352 loop {
353 let status = rx.borrow_and_update().clone();
354 if status.is_ready() {
355 debug!(node = %name, dep = %dep_name, "dependency ready");
356 break;
357 }
358 if let NodeStatus::Failed { reason } = status {
359 let _ = handle.status_tx.send(NodeStatus::Failed {
360 reason: format!("dependency `{dep_name}` failed: {reason}"),
361 });
362 return Err(LifecycleError::DependencyFailed {
363 resource: name,
364 dependency: dep_name,
365 reason,
366 });
367 }
368 if rx.changed().await.is_err() {
369 let reason = format!("dependency `{dep_name}` watch channel closed");
370 let _ = handle.status_tx.send(NodeStatus::Failed {
371 reason: reason.clone(),
372 });
373 return Err(LifecycleError::DependencyFailed {
374 resource: name,
375 dependency: dep_name,
376 reason,
377 });
378 }
379 }
380 }
381
382 let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
384 for (dep_name, rx) in &mut dep_outputs_rxs {
385 loop {
386 if let Some(out) = rx.borrow_and_update().clone() {
387 dep_outputs.insert(dep_name.clone(), out);
388 break;
389 }
390 if rx.changed().await.is_err() {
391 let reason = format!("dependency `{dep_name}` outputs channel closed");
392 let _ = handle.status_tx.send(NodeStatus::Failed {
393 reason: reason.clone(),
394 });
395 return Err(LifecycleError::DependencyFailed {
396 resource: name,
397 dependency: dep_name.clone(),
398 reason,
399 });
400 }
401 }
402 }
403
404 let resolved_spec = match interpolate_and_inject(spec, &dep_outputs) {
406 Ok(s) => s,
407 Err(reason) => {
408 let _ = handle.status_tx.send(NodeStatus::Failed {
409 reason: reason.clone(),
410 });
411 return Err(LifecycleError::Start {
412 resource: name,
413 source: RuntimeError::InvalidSpec(reason),
414 });
415 }
416 };
417
418 let _ = handle.status_tx.send(NodeStatus::Starting);
421 if let Err(source) = runtime.remove(&resolved_spec.name).await {
422 let _ = handle.status_tx.send(NodeStatus::Failed {
423 reason: source.to_string(),
424 });
425 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
426 name: name.clone(),
427 error: source.to_string(),
428 });
429 return Err(LifecycleError::Start {
430 resource: name,
431 source,
432 });
433 }
434
435 let id = match runtime.start(&resolved_spec).await {
437 Ok(id) => id,
438 Err(source) => {
439 let _ = handle.status_tx.send(NodeStatus::Failed {
440 reason: source.to_string(),
441 });
442 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
443 name: name.clone(),
444 error: source.to_string(),
445 });
446 return Err(LifecycleError::Start {
447 resource: name,
448 source,
449 });
450 }
451 };
452
453 {
454 let mut guard = handle
455 .container_id
456 .lock()
457 .expect("container_id mutex poisoned");
458 *guard = Some(id.clone());
459 }
460 {
461 let mut guard = handle.started_at.lock().expect("started_at mutex poisoned");
462 *guard = Some(SystemTime::now());
463 }
464 let _ = handle.status_tx.send(NodeStatus::Running);
465 let _ = event_tx.send(LifecycleEvent::ResourceStarted {
466 name: name.clone(),
467 container_id: id.to_string(),
468 });
469
470 let wait_span = info_span!("wait_healthy", resource = %name);
472 match runtime
473 .wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT)
474 .instrument(wait_span)
475 .await
476 {
477 Ok(()) => {
478 let _ = handle.outputs_tx.send(Some(own_outputs));
479 let _ = handle.status_tx.send(NodeStatus::Healthy);
480 let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
481 Ok(())
482 }
483 Err(RuntimeError::Timeout { .. }) => {
484 let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
485 let _ = handle.status_tx.send(NodeStatus::Failed {
486 reason: reason.clone(),
487 });
488 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
489 name: name.clone(),
490 error: reason,
491 });
492 Err(LifecycleError::HealthcheckTimeout {
493 resource: name,
494 timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
495 })
496 }
497 Err(source) => {
498 let _ = handle.status_tx.send(NodeStatus::Failed {
499 reason: source.to_string(),
500 });
501 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
502 name: name.clone(),
503 error: source.to_string(),
504 });
505 Err(LifecycleError::Start {
506 resource: name,
507 source,
508 })
509 }
510 }
511}
512
513fn interpolate_and_inject(
520 mut spec: ContainerSpec,
521 dep_outputs: &HashMap<String, ResourceOutputs>,
522) -> std::result::Result<ContainerSpec, String> {
523 let mut ctx = InterpolationContext::from_env();
524 for (name, outputs) in dep_outputs {
525 ctx = ctx.with_resource(name.clone(), outputs.clone());
526 }
527 let interpolator = Interpolator::new(&ctx);
528
529 let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
531 for (k, v) in spec.env.drain() {
532 let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
533 resolved_env.insert(k, resolved);
534 }
535
536 for (dep_name, outputs) in dep_outputs {
538 let dep_upper = dep_name.to_uppercase().replace('-', "_");
539 for (prop, value) in outputs {
540 let prop_upper = prop.to_uppercase().replace('-', "_");
541 let key = format!("LSH_{dep_upper}_{prop_upper}");
542 resolved_env.entry(key).or_insert_with(|| value.clone());
543 }
544 }
545 spec.env = resolved_env;
546
547 if let Some(args) = spec.command.as_mut() {
549 for arg in args.iter_mut() {
550 *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
551 }
552 }
553
554 Ok(spec)
555}
556
557#[cfg(unix)]
558async fn wait_for_shutdown_signal() {
559 use tokio::signal::unix::{SignalKind, signal};
560 let mut sigterm = match signal(SignalKind::terminate()) {
561 Ok(s) => s,
562 Err(e) => {
563 warn!("failed to install SIGTERM handler: {e}");
564 let _ = tokio::signal::ctrl_c().await;
565 return;
566 }
567 };
568 tokio::select! {
569 _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
570 _ = sigterm.recv() => info!("received SIGTERM"),
571 }
572}
573
/// Resolves once Ctrl+C is received.
///
/// If the Ctrl+C listener cannot be installed, the failure is logged and this
/// never resolves. Treating the error as a received signal would shut the
/// stack down immediately.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => info!("received Ctrl+C"),
        Err(e) => {
            warn!("failed to listen for Ctrl+C: {e}");
            std::future::pending::<()>().await;
        }
    }
}