lightshuttle_runtime/
testkit.rs1use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use futures::stream::{Stream, StreamExt};
18
19use crate::error::RuntimeError;
20use crate::runtime::{ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream};
21use crate::spec::ContainerSpec;
22
23#[derive(Clone)]
29pub struct MockRuntime {
30 state: Arc<Mutex<HashMap<String, MockContainer>>>,
31 fail_on: Arc<Mutex<Option<String>>>,
32 start_order: Arc<Mutex<Vec<String>>>,
33 stop_order: Arc<Mutex<Vec<String>>>,
34 started_specs: Arc<Mutex<Vec<ContainerSpec>>>,
35}
36
37struct MockContainer {
38 name: String,
39 status: ContainerStatus,
40 started_at: Instant,
41 healthy_after: Duration,
42}
43
44impl MockRuntime {
45 #[must_use]
47 pub fn new() -> Self {
48 Self {
49 state: Arc::new(Mutex::new(HashMap::new())),
50 fail_on: Arc::new(Mutex::new(None)),
51 start_order: Arc::new(Mutex::new(Vec::new())),
52 stop_order: Arc::new(Mutex::new(Vec::new())),
53 started_specs: Arc::new(Mutex::new(Vec::new())),
54 }
55 }
56
57 pub fn fail_on(&self, name: &str) {
60 *self.fail_on.lock().expect("fail_on mutex poisoned") = Some(name.to_owned());
61 }
62
63 #[must_use]
65 pub fn started_resources(&self) -> Vec<String> {
66 self.start_order
67 .lock()
68 .expect("start_order mutex poisoned")
69 .clone()
70 }
71
72 #[must_use]
74 pub fn stopped_resources(&self) -> Vec<String> {
75 self.stop_order
76 .lock()
77 .expect("stop_order mutex poisoned")
78 .clone()
79 }
80
81 #[must_use]
83 pub fn started_specs(&self) -> Vec<ContainerSpec> {
84 self.started_specs
85 .lock()
86 .expect("started_specs mutex poisoned")
87 .clone()
88 }
89}
90
91impl Default for MockRuntime {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl ContainerRuntime for MockRuntime {
98 async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId, RuntimeError> {
99 if self
100 .fail_on
101 .lock()
102 .expect("fail_on mutex poisoned")
103 .as_deref()
104 == Some(spec.name.as_str())
105 {
106 return Err(RuntimeError::InvalidSpec(format!(
107 "mock failure for `{}`",
108 spec.name
109 )));
110 }
111 let id = ContainerId::new(format!("mock-{}", spec.name));
112 if self
113 .state
114 .lock()
115 .expect("state mutex poisoned")
116 .contains_key(id.as_str())
117 {
118 return Err(RuntimeError::InvalidSpec(format!(
119 "container name `{}` already in use",
120 spec.name
121 )));
122 }
123 self.start_order
124 .lock()
125 .expect("start_order mutex poisoned")
126 .push(spec.name.clone());
127 self.started_specs
128 .lock()
129 .expect("started_specs mutex poisoned")
130 .push(spec.clone());
131 self.state.lock().expect("state mutex poisoned").insert(
132 id.as_str().to_owned(),
133 MockContainer {
134 name: spec.name.clone(),
135 status: ContainerStatus::Starting,
136 started_at: Instant::now(),
137 healthy_after: Duration::from_millis(30),
138 },
139 );
140 Ok(id)
141 }
142
143 async fn stop(&self, id: &ContainerId, _grace: Duration) -> Result<(), RuntimeError> {
144 let mut state = self.state.lock().expect("state mutex poisoned");
145 if let Some(c) = state.get_mut(id.as_str()) {
146 c.status = ContainerStatus::Stopped { exit_code: Some(0) };
147 self.stop_order
148 .lock()
149 .expect("stop_order mutex poisoned")
150 .push(c.name.clone());
151 }
152 Ok(())
153 }
154
155 async fn remove(&self, name: &str) -> Result<(), RuntimeError> {
156 self.state
157 .lock()
158 .expect("state mutex poisoned")
159 .remove(&format!("mock-{name}"));
160 Ok(())
161 }
162
163 async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus, RuntimeError> {
164 let state = self.state.lock().expect("state mutex poisoned");
165 let c = state
166 .get(id.as_str())
167 .ok_or_else(|| RuntimeError::NotFound(id.as_str().to_owned()))?;
168 Ok(c.status.clone())
169 }
170
171 async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<(), RuntimeError> {
172 let start = Instant::now();
173 while start.elapsed() < timeout {
174 {
175 let mut state = self.state.lock().expect("state mutex poisoned");
176 if let Some(c) = state.get_mut(id.as_str())
177 && c.started_at.elapsed() >= c.healthy_after
178 {
179 c.status = ContainerStatus::Healthy;
180 return Ok(());
181 }
182 }
183 tokio::time::sleep(Duration::from_millis(10)).await;
184 }
185 Err(RuntimeError::Timeout {
186 operation: "wait_healthy",
187 after: timeout,
188 })
189 }
190
191 async fn logs(&self, _id: &ContainerId, _follow: bool) -> Result<LogChunkStream, RuntimeError> {
192 let empty: Pin<Box<dyn Stream<Item = Result<LogChunk, RuntimeError>> + Send>> =
193 Box::pin(futures::stream::empty::<Result<LogChunk, RuntimeError>>().map(|x| x));
194 Ok(empty)
195 }
196}