Skip to main content

lightshuttle_runtime/
testkit.rs

//! Test helpers for downstream crates and integration tests.
//!
//! Provides an in-memory [`MockRuntime`] that satisfies
//! [`crate::ContainerRuntime`] without any Docker daemon. Containers
//! become healthy a short, deterministic delay after start, unless the
//! caller flags a specific resource as a failure target.
//!
//! `MockRuntime` is intentionally cheap to clone: every internal field
//! is an [`Arc<Mutex<_>>`], so a test can hold an observer clone for
//! introspection after the manager has consumed the original instance.
11
12use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use futures::stream::{Stream, StreamExt};
18
19use crate::error::RuntimeError;
20use crate::runtime::{ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream};
21use lightshuttle_spec::ContainerSpec;
22
23/// In-memory [`ContainerRuntime`] for tests.
24///
25/// Every container becomes [`ContainerStatus::Healthy`] 30 ms after
26/// `start`, unless its name is configured as a failure target via
27/// [`MockRuntime::fail_on`].
28#[derive(Clone)]
29pub struct MockRuntime {
30    state: Arc<Mutex<HashMap<String, MockContainer>>>,
31    fail_on: Arc<Mutex<Option<String>>>,
32    start_order: Arc<Mutex<Vec<String>>>,
33    stop_order: Arc<Mutex<Vec<String>>>,
34    started_specs: Arc<Mutex<Vec<ContainerSpec>>>,
35}
36
37struct MockContainer {
38    name: String,
39    status: ContainerStatus,
40    started_at: Instant,
41    healthy_after: Duration,
42}
43
44impl MockRuntime {
45    /// Build a fresh runtime with empty state.
46    #[must_use]
47    pub fn new() -> Self {
48        Self {
49            state: Arc::new(Mutex::new(HashMap::new())),
50            fail_on: Arc::new(Mutex::new(None)),
51            start_order: Arc::new(Mutex::new(Vec::new())),
52            stop_order: Arc::new(Mutex::new(Vec::new())),
53            started_specs: Arc::new(Mutex::new(Vec::new())),
54        }
55    }
56
57    /// Configure the runtime to reject `start` for the resource whose
58    /// `ContainerSpec.name` equals `name`.
59    pub fn fail_on(&self, name: &str) {
60        *self.fail_on.lock().expect("fail_on mutex poisoned") = Some(name.to_owned());
61    }
62
63    /// Snapshot of the resource names in start order.
64    #[must_use]
65    pub fn started_resources(&self) -> Vec<String> {
66        self.start_order
67            .lock()
68            .expect("start_order mutex poisoned")
69            .clone()
70    }
71
72    /// Snapshot of the resource names in stop order.
73    #[must_use]
74    pub fn stopped_resources(&self) -> Vec<String> {
75        self.stop_order
76            .lock()
77            .expect("stop_order mutex poisoned")
78            .clone()
79    }
80
81    /// Snapshot of every container spec the runtime has accepted.
82    #[must_use]
83    pub fn started_specs(&self) -> Vec<ContainerSpec> {
84        self.started_specs
85            .lock()
86            .expect("started_specs mutex poisoned")
87            .clone()
88    }
89}
90
91impl Default for MockRuntime {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl ContainerRuntime for MockRuntime {
98    async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId, RuntimeError> {
99        if self
100            .fail_on
101            .lock()
102            .expect("fail_on mutex poisoned")
103            .as_deref()
104            == Some(spec.name.as_str())
105        {
106            return Err(RuntimeError::InvalidSpec(format!(
107                "mock failure for `{}`",
108                spec.name
109            )));
110        }
111        let id = ContainerId::new(format!("mock-{}", spec.name));
112        if self
113            .state
114            .lock()
115            .expect("state mutex poisoned")
116            .contains_key(id.as_str())
117        {
118            return Err(RuntimeError::InvalidSpec(format!(
119                "container name `{}` already in use",
120                spec.name
121            )));
122        }
123        self.start_order
124            .lock()
125            .expect("start_order mutex poisoned")
126            .push(spec.name.clone());
127        self.started_specs
128            .lock()
129            .expect("started_specs mutex poisoned")
130            .push(spec.clone());
131        self.state.lock().expect("state mutex poisoned").insert(
132            id.as_str().to_owned(),
133            MockContainer {
134                name: spec.name.clone(),
135                status: ContainerStatus::Starting,
136                started_at: Instant::now(),
137                healthy_after: Duration::from_millis(30),
138            },
139        );
140        Ok(id)
141    }
142
143    async fn stop(&self, id: &ContainerId, _grace: Duration) -> Result<(), RuntimeError> {
144        let mut state = self.state.lock().expect("state mutex poisoned");
145        if let Some(c) = state.get_mut(id.as_str()) {
146            c.status = ContainerStatus::Stopped { exit_code: Some(0) };
147            self.stop_order
148                .lock()
149                .expect("stop_order mutex poisoned")
150                .push(c.name.clone());
151        }
152        Ok(())
153    }
154
155    async fn remove(&self, name: &str) -> Result<(), RuntimeError> {
156        self.state
157            .lock()
158            .expect("state mutex poisoned")
159            .remove(&format!("mock-{name}"));
160        Ok(())
161    }
162
163    async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus, RuntimeError> {
164        let state = self.state.lock().expect("state mutex poisoned");
165        let c = state
166            .get(id.as_str())
167            .ok_or_else(|| RuntimeError::NotFound(id.as_str().to_owned()))?;
168        Ok(c.status.clone())
169    }
170
171    async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<(), RuntimeError> {
172        let start = Instant::now();
173        while start.elapsed() < timeout {
174            {
175                let mut state = self.state.lock().expect("state mutex poisoned");
176                if let Some(c) = state.get_mut(id.as_str())
177                    && c.started_at.elapsed() >= c.healthy_after
178                {
179                    c.status = ContainerStatus::Healthy;
180                    return Ok(());
181                }
182            }
183            tokio::time::sleep(Duration::from_millis(10)).await;
184        }
185        Err(RuntimeError::Timeout {
186            operation: "wait_healthy",
187            after: timeout,
188        })
189    }
190
191    async fn logs(&self, _id: &ContainerId, _follow: bool) -> Result<LogChunkStream, RuntimeError> {
192        let empty: Pin<Box<dyn Stream<Item = Result<LogChunk, RuntimeError>> + Send>> =
193            Box::pin(futures::stream::empty::<Result<LogChunk, RuntimeError>>().map(|x| x));
194        Ok(empty)
195    }
196}