// lightshuttle_runtime/lifecycle/manager.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;

use lightshuttle_manifest::{InterpolationContext, Interpolator};
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};

use crate::error::RuntimeError;
use crate::lifecycle::error::LifecycleError;
use crate::lifecycle::plan::LifecyclePlan;
use crate::lifecycle::status::{LifecycleEvent, NodeStatus};
use crate::runtime::{ContainerId, ContainerRuntime};
use crate::spec::{ContainerSpec, ResourceOutputs};
18
/// Upper bound `start_one` passes to `ContainerRuntime::wait_healthy`; exceeding it
/// fails the node with `LifecycleError::HealthcheckTimeout`.
const DEFAULT_HEALTHCHECK_TIMEOUT: Duration = Duration::new(60, 0);
22
23#[derive(Clone)]
25struct NodeHandle {
26 status_tx: Arc<watch::Sender<NodeStatus>>,
27 status_rx: watch::Receiver<NodeStatus>,
28 outputs_tx: Arc<watch::Sender<Option<ResourceOutputs>>>,
29 outputs_rx: watch::Receiver<Option<ResourceOutputs>>,
30 container_id: Arc<Mutex<Option<ContainerId>>>,
31}
32
33pub struct LifecycleManager<R: ContainerRuntime + 'static> {
36 plan: Arc<LifecyclePlan>,
37 runtime: Arc<R>,
38 nodes: HashMap<String, NodeHandle>,
39 event_tx: mpsc::UnboundedSender<LifecycleEvent>,
40}
41
42impl<R: ContainerRuntime + 'static> LifecycleManager<R> {
43 #[must_use]
46 pub fn new(plan: LifecyclePlan, runtime: R) -> (Self, mpsc::UnboundedReceiver<LifecycleEvent>) {
47 let (event_tx, event_rx) = mpsc::unbounded_channel();
48 let mut nodes: HashMap<String, NodeHandle> = HashMap::new();
49 for node in plan.nodes() {
50 let (status_tx, status_rx) = watch::channel(NodeStatus::Pending);
51 let (outputs_tx, outputs_rx) = watch::channel(None);
52 nodes.insert(
53 node.name.clone(),
54 NodeHandle {
55 status_tx: Arc::new(status_tx),
56 status_rx,
57 outputs_tx: Arc::new(outputs_tx),
58 outputs_rx,
59 container_id: Arc::new(Mutex::new(None)),
60 },
61 );
62 }
63 let manager = Self {
64 plan: Arc::new(plan),
65 runtime: Arc::new(runtime),
66 nodes,
67 event_tx,
68 };
69 (manager, event_rx)
70 }
71
72 pub async fn start_all(&self) -> Result<(), LifecycleError> {
77 let mut handles: Vec<tokio::task::JoinHandle<Result<(), LifecycleError>>> =
78 Vec::with_capacity(self.plan.nodes().len());
79
80 for node in self.plan.nodes() {
81 let mut dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>> = HashMap::new();
82 let mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>> =
83 HashMap::new();
84 for dep in &node.depends_on {
85 let handle = self
86 .nodes
87 .get(dep)
88 .ok_or_else(|| LifecycleError::UnknownResource(dep.clone()))?;
89 dep_status_rxs.insert(dep.clone(), handle.status_rx.clone());
90 dep_outputs_rxs.insert(dep.clone(), handle.outputs_rx.clone());
91 }
92
93 let node_handle = self.nodes[&node.name].clone();
94 let spec = node.spec.clone();
95 let own_outputs = node.outputs.clone();
96 let name = node.name.clone();
97 let runtime = Arc::clone(&self.runtime);
98 let event_tx = self.event_tx.clone();
99
100 let task = tokio::spawn(async move {
101 start_one(
102 name,
103 spec,
104 own_outputs,
105 runtime,
106 node_handle,
107 dep_status_rxs,
108 dep_outputs_rxs,
109 event_tx,
110 )
111 .await
112 });
113 handles.push(task);
114 }
115
116 let mut first_error: Option<LifecycleError> = None;
117 for handle in handles {
118 match handle.await {
119 Ok(Ok(())) => {}
120 Ok(Err(err)) => {
121 if first_error.is_none() {
122 first_error = Some(err);
123 }
124 }
125 Err(join_err) => {
126 if first_error.is_none() {
127 first_error = Some(LifecycleError::Start {
128 resource: "<panicked task>".to_owned(),
129 source: RuntimeError::InvalidSpec(join_err.to_string()),
130 });
131 }
132 }
133 }
134 }
135
136 if let Some(err) = first_error {
137 warn!(error = %err, "start_all failed; rolling back");
138 let _ = self.stop_all(Duration::from_secs(10)).await;
139 return Err(err);
140 }
141
142 let _ = self.event_tx.send(LifecycleEvent::StackStarted);
143 info!(
144 "stack started: {} resource(s) healthy",
145 self.plan.nodes().len()
146 );
147 Ok(())
148 }
149
150 pub async fn stop_all(&self, grace: Duration) -> Result<(), LifecycleError> {
153 let _ = self.event_tx.send(LifecycleEvent::StackStopping);
154
155 let mut errors: Vec<(String, RuntimeError)> = Vec::new();
156 for node in self.plan.nodes().iter().rev() {
157 let Some(handle) = self.nodes.get(&node.name) else {
158 continue;
159 };
160 let id = {
161 let guard = handle
162 .container_id
163 .lock()
164 .expect("container_id mutex poisoned");
165 guard.clone()
166 };
167 let Some(id) = id else { continue };
168 match self.runtime.stop(&id, grace).await {
169 Ok(()) => {
170 let _ = handle.status_tx.send(NodeStatus::Stopped);
171 let _ = self.event_tx.send(LifecycleEvent::ResourceStopped {
172 name: node.name.clone(),
173 });
174 }
175 Err(e) => errors.push((node.name.clone(), e)),
176 }
177 }
178
179 let _ = self.event_tx.send(LifecycleEvent::StackStopped);
180
181 if let Some((resource, source)) = errors.into_iter().next() {
182 return Err(LifecycleError::Stop { resource, source });
183 }
184 Ok(())
185 }
186
187 pub async fn run_until_signal(&self, grace: Duration) -> Result<(), LifecycleError> {
191 self.start_all().await?;
192 wait_for_shutdown_signal().await;
193 self.stop_all(grace).await
194 }
195}
196
197#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
198async fn start_one<R: ContainerRuntime + 'static>(
199 name: String,
200 spec: ContainerSpec,
201 own_outputs: ResourceOutputs,
202 runtime: Arc<R>,
203 handle: NodeHandle,
204 dep_status_rxs: HashMap<String, watch::Receiver<NodeStatus>>,
205 mut dep_outputs_rxs: HashMap<String, watch::Receiver<Option<ResourceOutputs>>>,
206 event_tx: mpsc::UnboundedSender<LifecycleEvent>,
207) -> Result<(), LifecycleError> {
208 for (dep_name, mut rx) in dep_status_rxs {
210 loop {
211 let status = rx.borrow_and_update().clone();
212 if status.is_ready() {
213 debug!(node = %name, dep = %dep_name, "dependency ready");
214 break;
215 }
216 if let NodeStatus::Failed { reason } = status {
217 let _ = handle.status_tx.send(NodeStatus::Failed {
218 reason: format!("dependency `{dep_name}` failed: {reason}"),
219 });
220 return Err(LifecycleError::DependencyFailed {
221 resource: name,
222 dependency: dep_name,
223 reason,
224 });
225 }
226 if rx.changed().await.is_err() {
227 let reason = format!("dependency `{dep_name}` watch channel closed");
228 let _ = handle.status_tx.send(NodeStatus::Failed {
229 reason: reason.clone(),
230 });
231 return Err(LifecycleError::DependencyFailed {
232 resource: name,
233 dependency: dep_name,
234 reason,
235 });
236 }
237 }
238 }
239
240 let mut dep_outputs: HashMap<String, ResourceOutputs> = HashMap::new();
242 for (dep_name, rx) in &mut dep_outputs_rxs {
243 loop {
244 if let Some(out) = rx.borrow_and_update().clone() {
245 dep_outputs.insert(dep_name.clone(), out);
246 break;
247 }
248 if rx.changed().await.is_err() {
249 let reason = format!("dependency `{dep_name}` outputs channel closed");
250 let _ = handle.status_tx.send(NodeStatus::Failed {
251 reason: reason.clone(),
252 });
253 return Err(LifecycleError::DependencyFailed {
254 resource: name,
255 dependency: dep_name.clone(),
256 reason,
257 });
258 }
259 }
260 }
261
262 let resolved_spec = match interpolate_and_inject(spec, &dep_outputs) {
264 Ok(s) => s,
265 Err(reason) => {
266 let _ = handle.status_tx.send(NodeStatus::Failed {
267 reason: reason.clone(),
268 });
269 return Err(LifecycleError::Start {
270 resource: name,
271 source: RuntimeError::InvalidSpec(reason),
272 });
273 }
274 };
275
276 let _ = handle.status_tx.send(NodeStatus::Starting);
278 let id = match runtime.start(&resolved_spec).await {
279 Ok(id) => id,
280 Err(source) => {
281 let _ = handle.status_tx.send(NodeStatus::Failed {
282 reason: source.to_string(),
283 });
284 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
285 name: name.clone(),
286 error: source.to_string(),
287 });
288 return Err(LifecycleError::Start {
289 resource: name,
290 source,
291 });
292 }
293 };
294
295 {
296 let mut guard = handle
297 .container_id
298 .lock()
299 .expect("container_id mutex poisoned");
300 *guard = Some(id.clone());
301 }
302 let _ = handle.status_tx.send(NodeStatus::Running);
303 let _ = event_tx.send(LifecycleEvent::ResourceStarted {
304 name: name.clone(),
305 container_id: id.to_string(),
306 });
307
308 match runtime.wait_healthy(&id, DEFAULT_HEALTHCHECK_TIMEOUT).await {
310 Ok(()) => {
311 let _ = handle.outputs_tx.send(Some(own_outputs));
312 let _ = handle.status_tx.send(NodeStatus::Healthy);
313 let _ = event_tx.send(LifecycleEvent::ResourceHealthy { name: name.clone() });
314 Ok(())
315 }
316 Err(RuntimeError::Timeout { .. }) => {
317 let reason = format!("healthcheck timed out after {DEFAULT_HEALTHCHECK_TIMEOUT:?}");
318 let _ = handle.status_tx.send(NodeStatus::Failed {
319 reason: reason.clone(),
320 });
321 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
322 name: name.clone(),
323 error: reason,
324 });
325 Err(LifecycleError::HealthcheckTimeout {
326 resource: name,
327 timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
328 })
329 }
330 Err(source) => {
331 let _ = handle.status_tx.send(NodeStatus::Failed {
332 reason: source.to_string(),
333 });
334 let _ = event_tx.send(LifecycleEvent::ResourceFailed {
335 name: name.clone(),
336 error: source.to_string(),
337 });
338 Err(LifecycleError::Start {
339 resource: name,
340 source,
341 })
342 }
343 }
344}
345
346fn interpolate_and_inject(
353 mut spec: ContainerSpec,
354 dep_outputs: &HashMap<String, ResourceOutputs>,
355) -> std::result::Result<ContainerSpec, String> {
356 let mut ctx = InterpolationContext::from_env();
357 for (name, outputs) in dep_outputs {
358 ctx = ctx.with_resource(name.clone(), outputs.clone());
359 }
360 let interpolator = Interpolator::new(&ctx);
361
362 let mut resolved_env = std::collections::HashMap::with_capacity(spec.env.len());
364 for (k, v) in spec.env.drain() {
365 let resolved = interpolator.resolve(&v).map_err(|e| e.to_string())?;
366 resolved_env.insert(k, resolved);
367 }
368
369 for (dep_name, outputs) in dep_outputs {
371 let dep_upper = dep_name.to_uppercase().replace('-', "_");
372 for (prop, value) in outputs {
373 let prop_upper = prop.to_uppercase().replace('-', "_");
374 let key = format!("LSH_{dep_upper}_{prop_upper}");
375 resolved_env.entry(key).or_insert_with(|| value.clone());
376 }
377 }
378 spec.env = resolved_env;
379
380 if let Some(args) = spec.command.as_mut() {
382 for arg in args.iter_mut() {
383 *arg = interpolator.resolve(arg).map_err(|e| e.to_string())?;
384 }
385 }
386
387 Ok(spec)
388}
389
390#[cfg(unix)]
391async fn wait_for_shutdown_signal() {
392 use tokio::signal::unix::{SignalKind, signal};
393 let mut sigterm = match signal(SignalKind::terminate()) {
394 Ok(s) => s,
395 Err(e) => {
396 warn!("failed to install SIGTERM handler: {e}");
397 let _ = tokio::signal::ctrl_c().await;
398 return;
399 }
400 };
401 tokio::select! {
402 _ = tokio::signal::ctrl_c() => info!("received SIGINT"),
403 _ = sigterm.recv() => info!("received SIGTERM"),
404 }
405}
406
/// Resolves once the process receives Ctrl+C.
///
/// If the Ctrl+C listener cannot be installed, the error is logged and the function
/// returns immediately, so the caller proceeds with shutdown.
#[cfg(windows)]
async fn wait_for_shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) => info!("received Ctrl+C"),
        Err(e) => warn!("failed to listen for Ctrl+C: {e}; shutting down"),
    }
}