Skip to main content

stormchaser_runner_docker/
traits.rs

1use async_trait::async_trait;
2use bollard::container::{
3    Config, CreateContainerOptions, InspectContainerOptions, ListContainersOptions, LogOutput,
4    LogsOptions, RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
5    WaitContainerOptions,
6};
7use bollard::image::CreateImageOptions;
8use bollard::models::{
9    ContainerCreateResponse, ContainerInspectResponse, ContainerSummary, ContainerWaitResponse,
10    CreateImageInfo, Volume, VolumeListResponse,
11};
12use bollard::volume::{CreateVolumeOptions, ListVolumesOptions, RemoveVolumeOptions};
13use bollard::Docker;
14use bytes::Bytes;
15use futures::{stream::BoxStream, StreamExt};
16
17#[cfg(test)]
18use mockall::predicate::*;
19
20#[cfg(test)]
21mockall::mock! {
22    pub ContainerRuntime {}
23
24    #[async_trait]
25    impl ContainerRuntime for ContainerRuntime {
26        async fn create_container(
27            &self,
28            options: Option<CreateContainerOptions<String>>,
29            config: Config<String>,
30        ) -> Result<ContainerCreateResponse, bollard::errors::Error>;
31
32        async fn start_container(
33            &self,
34            container_name: &str,
35            options: Option<StartContainerOptions<String>>,
36        ) -> Result<(), bollard::errors::Error>;
37
38        async fn stop_container(
39            &self,
40            container_name: &str,
41            options: Option<StopContainerOptions>,
42        ) -> Result<(), bollard::errors::Error>;
43
44        async fn remove_container(
45            &self,
46            container_name: &str,
47            options: Option<RemoveContainerOptions>,
48        ) -> Result<(), bollard::errors::Error>;
49
50        fn wait_container(
51            &self,
52            container_name: &str,
53            options: Option<WaitContainerOptions<String>>,
54        ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>>;
55
56        async fn create_volume(
57            &self,
58            config: CreateVolumeOptions<String>,
59        ) -> Result<Volume, bollard::errors::Error>;
60
61        async fn remove_volume(
62            &self,
63            name: &str,
64            options: Option<RemoveVolumeOptions>,
65        ) -> Result<(), bollard::errors::Error>;
66
67        fn create_image(
68            &self,
69            options: Option<CreateImageOptions<'static, String>>,
70            root_fs: Option<Bytes>,
71            credentials: Option<bollard::auth::DockerCredentials>,
72        ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>>;
73
74        fn logs(
75            &self,
76            container_name: &str,
77            options: Option<LogsOptions<String>>,
78        ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>>;
79
80        async fn inspect_container(
81            &self,
82            container_name: &str,
83            options: Option<InspectContainerOptions>,
84        ) -> Result<ContainerInspectResponse, bollard::errors::Error>;
85
86        async fn list_containers(
87            &self,
88            options: Option<ListContainersOptions<String>>,
89        ) -> Result<Vec<ContainerSummary>, bollard::errors::Error>;
90
91        async fn list_volumes(
92            &self,
93            options: Option<ListVolumesOptions<String>>,
94        ) -> Result<VolumeListResponse, bollard::errors::Error>;
95    }
96
97    impl Clone for ContainerRuntime {
98        fn clone(&self) -> Self;
99    }
100}
101
102/// A trait abstraction over Docker container runtime interactions.
103#[async_trait]
104pub trait ContainerRuntime: Send + Sync + Clone + 'static {
105    /// Creates a new container.
106    async fn create_container(
107        &self,
108        options: Option<CreateContainerOptions<String>>,
109        config: Config<String>,
110    ) -> Result<ContainerCreateResponse, bollard::errors::Error>;
111
112    /// Starts an existing container.
113    async fn start_container(
114        &self,
115        container_name: &str,
116        options: Option<StartContainerOptions<String>>,
117    ) -> Result<(), bollard::errors::Error>;
118
119    /// Stops a running container.
120    async fn stop_container(
121        &self,
122        container_name: &str,
123        options: Option<StopContainerOptions>,
124    ) -> Result<(), bollard::errors::Error>;
125
126    /// Removes a container.
127    async fn remove_container(
128        &self,
129        container_name: &str,
130        options: Option<RemoveContainerOptions>,
131    ) -> Result<(), bollard::errors::Error>;
132
133    /// Waits for a container to finish executing.
134    fn wait_container(
135        &self,
136        container_name: &str,
137        options: Option<WaitContainerOptions<String>>,
138    ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>>;
139
140    /// Creates a volume.
141    async fn create_volume(
142        &self,
143        config: CreateVolumeOptions<String>,
144    ) -> Result<Volume, bollard::errors::Error>;
145
146    /// Removes a volume.
147    async fn remove_volume(
148        &self,
149        name: &str,
150        options: Option<RemoveVolumeOptions>,
151    ) -> Result<(), bollard::errors::Error>;
152
153    /// Creates an image.
154    fn create_image(
155        &self,
156        options: Option<CreateImageOptions<'static, String>>,
157        root_fs: Option<Bytes>,
158        credentials: Option<bollard::auth::DockerCredentials>,
159    ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>>;
160
161    /// Retrieves logs for a container.
162    fn logs(
163        &self,
164        container_name: &str,
165        options: Option<LogsOptions<String>>,
166    ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>>;
167
168    /// Inspects a container's details.
169    async fn inspect_container(
170        &self,
171        container_name: &str,
172        options: Option<InspectContainerOptions>,
173    ) -> Result<ContainerInspectResponse, bollard::errors::Error>;
174
175    /// Lists containers.
176    async fn list_containers(
177        &self,
178        options: Option<ListContainersOptions<String>>,
179    ) -> Result<Vec<ContainerSummary>, bollard::errors::Error>;
180
181    /// Lists volumes.
182    async fn list_volumes(
183        &self,
184        options: Option<ListVolumesOptions<String>>,
185    ) -> Result<VolumeListResponse, bollard::errors::Error>;
186}
187
188#[async_trait]
189impl ContainerRuntime for Docker {
190    async fn create_container(
191        &self,
192        options: Option<CreateContainerOptions<String>>,
193        config: Config<String>,
194    ) -> Result<ContainerCreateResponse, bollard::errors::Error> {
195        self.create_container(options, config).await
196    }
197
198    async fn start_container(
199        &self,
200        container_name: &str,
201        options: Option<StartContainerOptions<String>>,
202    ) -> Result<(), bollard::errors::Error> {
203        self.start_container(container_name, options).await
204    }
205
206    async fn stop_container(
207        &self,
208        container_name: &str,
209        options: Option<StopContainerOptions>,
210    ) -> Result<(), bollard::errors::Error> {
211        self.stop_container(container_name, options).await
212    }
213
214    async fn remove_container(
215        &self,
216        container_name: &str,
217        options: Option<RemoveContainerOptions>,
218    ) -> Result<(), bollard::errors::Error> {
219        self.remove_container(container_name, options).await
220    }
221
222    fn wait_container(
223        &self,
224        container_name: &str,
225        options: Option<WaitContainerOptions<String>>,
226    ) -> BoxStream<'static, Result<ContainerWaitResponse, bollard::errors::Error>> {
227        self.wait_container(container_name, options).boxed()
228    }
229
230    async fn create_volume(
231        &self,
232        config: CreateVolumeOptions<String>,
233    ) -> Result<Volume, bollard::errors::Error> {
234        self.create_volume(config).await
235    }
236
237    async fn remove_volume(
238        &self,
239        name: &str,
240        options: Option<RemoveVolumeOptions>,
241    ) -> Result<(), bollard::errors::Error> {
242        self.remove_volume(name, options).await
243    }
244
245    fn create_image(
246        &self,
247        options: Option<CreateImageOptions<'static, String>>,
248        root_fs: Option<Bytes>,
249        credentials: Option<bollard::auth::DockerCredentials>,
250    ) -> BoxStream<'static, Result<CreateImageInfo, bollard::errors::Error>> {
251        self.create_image(options, root_fs, credentials).boxed()
252    }
253
254    fn logs(
255        &self,
256        container_name: &str,
257        options: Option<LogsOptions<String>>,
258    ) -> BoxStream<'static, Result<LogOutput, bollard::errors::Error>> {
259        self.logs(container_name, options).boxed()
260    }
261
262    async fn inspect_container(
263        &self,
264        container_name: &str,
265        options: Option<InspectContainerOptions>,
266    ) -> Result<ContainerInspectResponse, bollard::errors::Error> {
267        self.inspect_container(container_name, options).await
268    }
269
270    async fn list_containers(
271        &self,
272        options: Option<ListContainersOptions<String>>,
273    ) -> Result<Vec<ContainerSummary>, bollard::errors::Error> {
274        self.list_containers(options).await
275    }
276
277    async fn list_volumes(
278        &self,
279        options: Option<ListVolumesOptions<String>>,
280    ) -> Result<VolumeListResponse, bollard::errors::Error> {
281        self.list_volumes(options).await
282    }
283}
284
285#[cfg(test)]
286mockall::mock! {
287    pub MessageBus {}
288
289    #[async_trait]
290    impl MessageBus for MessageBus {
291        async fn publish(
292            &self,
293            subject: String,
294            payload: Bytes,
295        ) -> Result<(), async_nats::PublishError>;
296        async fn request(
297            &self,
298            subject: String,
299            payload: Bytes,
300        ) -> Result<async_nats::Message, async_nats::RequestError>;
301    }
302
303    impl Clone for MessageBus {
304        fn clone(&self) -> Self;
305    }
306}
307
308/// A trait abstraction over message bus (e.g. NATS) interactions.
309#[async_trait]
310pub trait MessageBus: Send + Sync + Clone + 'static {
311    /// Publishes a message to a given subject.
312    async fn publish(
313        &self,
314        subject: String,
315        payload: Bytes,
316    ) -> Result<(), async_nats::PublishError>;
317    /// Sends a request message to a given subject and waits for a response.
318    async fn request(
319        &self,
320        subject: String,
321        payload: Bytes,
322    ) -> Result<async_nats::Message, async_nats::RequestError>;
323    // subscribe returning BoxStream might be tough to mock properly without dealing with lifetimes
324    // we'll just define publish and request for now, since those are what scan_for_orphans uses.
325}
326
327#[async_trait]
328impl MessageBus for async_nats::Client {
329    async fn publish(
330        &self,
331        subject: String,
332        payload: Bytes,
333    ) -> Result<(), async_nats::PublishError> {
334        self.publish(subject, payload).await
335    }
336    async fn request(
337        &self,
338        subject: String,
339        payload: Bytes,
340    ) -> Result<async_nats::Message, async_nats::RequestError> {
341        self.request(subject, payload).await
342    }
343}