1use async_trait::async_trait;
2use bollard::container::{
3 Config, CreateContainerOptions, InspectContainerOptions, ListContainersOptions, LogOutput,
4 LogsOptions, RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
5 WaitContainerOptions,
6};
7use bollard::image::CreateImageOptions;
8use bollard::models::{
9 ContainerCreateResponse, ContainerInspectResponse, ContainerSummary, ContainerWaitResponse,
10 CreateImageInfo, Volume, VolumeListResponse,
11};
12use bollard::volume::{CreateVolumeOptions, ListVolumesOptions, RemoveVolumeOptions};
13use bollard::Docker;
14use bytes::Bytes;
15use futures::{stream::BoxStream, StreamExt};
16
17#[cfg(test)]
18use mockall::predicate::*;
19
20#[cfg(test)]
21mockall::mock! {
22 pub ContainerRuntime {}
23
24 #[async_trait]
25 impl ContainerRuntime for ContainerRuntime {
26 async fn create_container(
27 &self,
28 options: Option<CreateContainerOptions<String>>,
29 config: Config<String>,
30 ) -> Result<ContainerCreateResponse, bollard::errors::Error>;
31
32 async fn start_container(
33 &self,
34 container_name: &str,
35 options: Option<StartContainerOptions<String>>,
36 ) -> Result<(), bollard::errors::Error>;
37
38 async fn stop_container(
39 &self,
40 container_name: &str,
41 options: Option<StopContainerOptions>,
42 ) -> Result<(), bollard::errors::Error>;
43
44 async fn remove_container(
45 &self,
46 container_name: &str,
47 options: Option<RemoveContainerOptions>,
48 ) -> Result<(), bollard::errors::Error>;
49
50 fn wait_container(
51 &self,
52 container_name: &str,
53 options: Option<WaitContainerOptions<String>>,
54 ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>>;
55
56 async fn create_volume(
57 &self,
58 config: CreateVolumeOptions<String>,
59 ) -> Result<Volume, bollard::errors::Error>;
60
61 async fn remove_volume(
62 &self,
63 name: &str,
64 options: Option<RemoveVolumeOptions>,
65 ) -> Result<(), bollard::errors::Error>;
66
67 fn create_image(
68 &self,
69 options: Option<CreateImageOptions<'static, String>>,
70 root_fs: Option<Bytes>,
71 credentials: Option<bollard::auth::DockerCredentials>,
72 ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>>;
73
74 fn logs(
75 &self,
76 container_name: &str,
77 options: Option<LogsOptions<String>>,
78 ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>>;
79
80 async fn inspect_container(
81 &self,
82 container_name: &str,
83 options: Option<InspectContainerOptions>,
84 ) -> Result<ContainerInspectResponse, bollard::errors::Error>;
85
86 async fn list_containers(
87 &self,
88 options: Option<ListContainersOptions<String>>,
89 ) -> Result<Vec<ContainerSummary>, bollard::errors::Error>;
90
91 async fn list_volumes(
92 &self,
93 options: Option<ListVolumesOptions<String>>,
94 ) -> Result<VolumeListResponse, bollard::errors::Error>;
95 }
96
97 impl Clone for ContainerRuntime {
98 fn clone(&self) -> Self;
99 }
100}
101
102#[async_trait]
104pub trait ContainerRuntime: Send + Sync + Clone + 'static {
105 async fn create_container(
107 &self,
108 options: Option<CreateContainerOptions<String>>,
109 config: Config<String>,
110 ) -> Result<ContainerCreateResponse, bollard::errors::Error>;
111
112 async fn start_container(
114 &self,
115 container_name: &str,
116 options: Option<StartContainerOptions<String>>,
117 ) -> Result<(), bollard::errors::Error>;
118
119 async fn stop_container(
121 &self,
122 container_name: &str,
123 options: Option<StopContainerOptions>,
124 ) -> Result<(), bollard::errors::Error>;
125
126 async fn remove_container(
128 &self,
129 container_name: &str,
130 options: Option<RemoveContainerOptions>,
131 ) -> Result<(), bollard::errors::Error>;
132
133 fn wait_container(
135 &self,
136 container_name: &str,
137 options: Option<WaitContainerOptions<String>>,
138 ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>>;
139
140 async fn create_volume(
142 &self,
143 config: CreateVolumeOptions<String>,
144 ) -> Result<Volume, bollard::errors::Error>;
145
146 async fn remove_volume(
148 &self,
149 name: &str,
150 options: Option<RemoveVolumeOptions>,
151 ) -> Result<(), bollard::errors::Error>;
152
153 fn create_image(
155 &self,
156 options: Option<CreateImageOptions<'static, String>>,
157 root_fs: Option<Bytes>,
158 credentials: Option<bollard::auth::DockerCredentials>,
159 ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>>;
160
161 fn logs(
163 &self,
164 container_name: &str,
165 options: Option<LogsOptions<String>>,
166 ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>>;
167
168 async fn inspect_container(
170 &self,
171 container_name: &str,
172 options: Option<InspectContainerOptions>,
173 ) -> Result<ContainerInspectResponse, bollard::errors::Error>;
174
175 async fn list_containers(
177 &self,
178 options: Option<ListContainersOptions<String>>,
179 ) -> Result<Vec<ContainerSummary>, bollard::errors::Error>;
180
181 async fn list_volumes(
183 &self,
184 options: Option<ListVolumesOptions<String>>,
185 ) -> Result<VolumeListResponse, bollard::errors::Error>;
186}
187
188#[async_trait]
189impl ContainerRuntime for Docker {
190 async fn create_container(
191 &self,
192 options: Option<CreateContainerOptions<String>>,
193 config: Config<String>,
194 ) -> Result<ContainerCreateResponse, bollard::errors::Error> {
195 self.create_container(options, config).await
196 }
197
198 async fn start_container(
199 &self,
200 container_name: &str,
201 options: Option<StartContainerOptions<String>>,
202 ) -> Result<(), bollard::errors::Error> {
203 self.start_container(container_name, options).await
204 }
205
206 async fn stop_container(
207 &self,
208 container_name: &str,
209 options: Option<StopContainerOptions>,
210 ) -> Result<(), bollard::errors::Error> {
211 self.stop_container(container_name, options).await
212 }
213
214 async fn remove_container(
215 &self,
216 container_name: &str,
217 options: Option<RemoveContainerOptions>,
218 ) -> Result<(), bollard::errors::Error> {
219 self.remove_container(container_name, options).await
220 }
221
222 fn wait_container(
223 &self,
224 container_name: &str,
225 options: Option<WaitContainerOptions<String>>,
226 ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>> {
227 self.wait_container(container_name, options).boxed()
228 }
229
230 async fn create_volume(
231 &self,
232 config: CreateVolumeOptions<String>,
233 ) -> Result<Volume, bollard::errors::Error> {
234 self.create_volume(config).await
235 }
236
237 async fn remove_volume(
238 &self,
239 name: &str,
240 options: Option<RemoveVolumeOptions>,
241 ) -> Result<(), bollard::errors::Error> {
242 self.remove_volume(name, options).await
243 }
244
245 fn create_image(
246 &self,
247 options: Option<CreateImageOptions<'static, String>>,
248 root_fs: Option<Bytes>,
249 credentials: Option<bollard::auth::DockerCredentials>,
250 ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>> {
251 self.create_image(options, root_fs, credentials).boxed()
252 }
253
254 fn logs(
255 &self,
256 container_name: &str,
257 options: Option<LogsOptions<String>>,
258 ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>> {
259 self.logs(container_name, options).boxed()
260 }
261
262 async fn inspect_container(
263 &self,
264 container_name: &str,
265 options: Option<InspectContainerOptions>,
266 ) -> Result<ContainerInspectResponse, bollard::errors::Error> {
267 self.inspect_container(container_name, options).await
268 }
269
270 async fn list_containers(
271 &self,
272 options: Option<ListContainersOptions<String>>,
273 ) -> Result<Vec<ContainerSummary>, bollard::errors::Error> {
274 self.list_containers(options).await
275 }
276
277 async fn list_volumes(
278 &self,
279 options: Option<ListVolumesOptions<String>>,
280 ) -> Result<VolumeListResponse, bollard::errors::Error> {
281 self.list_volumes(options).await
282 }
283}
284
285#[cfg(test)]
286mockall::mock! {
287 pub MessageBus {}
288
289 #[async_trait]
290 impl MessageBus for MessageBus {
291 async fn publish(
292 &self,
293 subject: String,
294 payload: Bytes,
295 ) -> Result<(), async_nats::PublishError>;
296 async fn request(
297 &self,
298 subject: String,
299 payload: Bytes,
300 ) -> Result<async_nats::Message, async_nats::RequestError>;
301 }
302
303 impl Clone for MessageBus {
304 fn clone(&self) -> Self;
305 }
306}
307
308#[async_trait]
310pub trait MessageBus: Send + Sync + Clone + 'static {
311 async fn publish(
313 &self,
314 subject: String,
315 payload: Bytes,
316 ) -> Result<(), async_nats::PublishError>;
317 async fn request(
319 &self,
320 subject: String,
321 payload: Bytes,
322 ) -> Result<async_nats::Message, async_nats::RequestError>;
323 }
326
327#[async_trait]
328impl MessageBus for async_nats::Client {
329 async fn publish(
330 &self,
331 subject: String,
332 payload: Bytes,
333 ) -> Result<(), async_nats::PublishError> {
334 self.publish(subject, payload).await
335 }
336 async fn request(
337 &self,
338 subject: String,
339 payload: Bytes,
340 ) -> Result<async_nats::Message, async_nats::RequestError> {
341 self.request(subject, payload).await
342 }
343}