dockertest/static_container/
internal.rs1use crate::docker::Docker;
2use crate::{
3 composition::Composition, DockerTestError, Network, OperationalContainer, PendingContainer,
4};
5use std::{
6 collections::{HashMap, HashSet},
7 sync::Arc,
8};
9use tokio::sync::RwLock;
10
11#[derive(Default)]
12pub struct InternalContainers {
13 inner: Arc<RwLock<HashMap<String, InternalContainer>>>,
14}
15
16struct InternalContainer {
18 status: InternalStatus,
20
21 completion_counter: u8,
27}
28
29#[allow(clippy::large_enum_variant)]
32enum InternalStatus {
33 Operational(OperationalContainer, PendingContainer),
39 Pending(PendingContainer),
40 Cleaned,
48 Failed(DockerTestError, Option<String>),
55}
56
57impl InternalContainers {
58 pub async fn create(
59 &self,
60 composition: Composition,
61 client: &Docker,
62 network: Option<&str>,
63 network_setting: &Network,
64 ) -> Result<PendingContainer, DockerTestError> {
65 let container = self
66 .create_internal_container_inner(composition, client, network, network_setting)
67 .await?;
68
69 Ok(container)
70 }
71
72 pub async fn start(
73 &self,
74 container: &PendingContainer,
75 ) -> Result<OperationalContainer, DockerTestError> {
76 let mut map = self.inner.write().await;
77
78 if let Some(c) = map.get_mut(&container.name) {
83 match &c.status {
84 InternalStatus::Failed(e, _) => Err(e.clone()),
85 InternalStatus::Operational(r, _) => Ok(r.clone()),
86 InternalStatus::Pending(p) => {
87 let cloned = p.clone();
88 let running = cloned.start_inner().await;
89 match running {
90 Ok(r) => {
91 c.status = InternalStatus::Operational(r.clone(), p.clone());
92 Ok(r)
93 }
94 Err(e) => {
95 c.status =
96 InternalStatus::Failed(e.clone(), Some(container.id.clone()));
97 Err(e)
98 }
99 }
100 }
101 InternalStatus::Cleaned => Err(DockerTestError::Startup(
106 "encountered a cleaned container during startup".to_string(),
107 )),
108 }
109 } else {
110 Err(DockerTestError::Startup(
111 "tried to start a non-existing internal container".to_string(),
112 ))
113 }
114 }
115
116 pub async fn cleanup(&self, client: &Docker, network: &str, to_cleanup: &HashSet<&str>) {
117 self.disconnect(client, network, to_cleanup).await;
118 let to_remove = self.decrement_completion_counters(to_cleanup).await;
119 for to_cleanup in to_remove {
120 client.remove_container(&to_cleanup).await;
121 }
122 }
123
124 async fn create_internal_container_inner(
125 &self,
126 composition: Composition,
127 client: &Docker,
128 network: Option<&str>,
129 network_setting: &Network,
130 ) -> Result<PendingContainer, DockerTestError> {
131 let mut map = self.inner.write().await;
132
133 if let Some(c) = map.get_mut(&composition.container_name) {
142 match &c.status {
143 InternalStatus::Pending(p) | InternalStatus::Operational(_, p) => {
144 match (network, network_setting) {
147 (Some(n), Network::Isolated) => {
148 client.add_container_to_network(&p.id, n).await
149 }
150 _ => Ok(()),
151 }?;
152
153 c.completion_counter += 1;
154
155 Ok(p.clone())
156 }
157 InternalStatus::Failed(e, _) => {
158 c.completion_counter += 1;
159 Err(e.clone())
160 }
161 InternalStatus::Cleaned => {
162 let container = self
163 .create_internal_container_impl(&mut map, composition, client, network)
164 .await?;
165
166 if let Some(n) = network {
168 client.add_container_to_network(&container.id, n).await?;
169 }
170
171 Ok(container)
172 }
173 }
174 } else {
175 let container = self
176 .create_internal_container_impl(&mut map, composition, client, network)
177 .await?;
178
179 if let Some(n) = network {
182 client.add_container_to_network(&container.id, n).await?;
183 }
184
185 Ok(container)
186 }
187 }
188
189 async fn create_internal_container_impl(
190 &self,
191 containers: &mut HashMap<String, InternalContainer>,
192 composition: Composition,
193 client: &Docker,
194 network: Option<&str>,
195 ) -> Result<PendingContainer, DockerTestError> {
196 let container_name = composition.container_name.clone();
197 let pending = client.create_container_inner(composition, network).await;
198 match pending {
199 Ok(p) => {
200 let c = InternalContainer {
201 status: InternalStatus::Pending(p.clone()),
202 completion_counter: 1,
203 };
204 containers.insert(container_name, c);
205 Ok(p)
206 }
207 Err(e) => {
208 let c = InternalContainer {
209 status: InternalStatus::Failed(e.clone(), None),
210 completion_counter: 1,
211 };
212 containers.insert(container_name, c);
213 Err(e)
214 }
215 }
216 }
217
218 async fn disconnect(&self, client: &Docker, network: &str, to_cleanup: &HashSet<&str>) {
219 let map = self.inner.read().await;
220 for (_, container) in map.iter() {
221 if let InternalStatus::Operational(r, _) = &container.status {
222 if to_cleanup.contains(r.id()) {
223 client.disconnect_container(r.id(), network).await;
224 }
225 }
226 }
227 }
228
229 async fn decrement_completion_counters(&self, to_cleanup: &HashSet<&str>) -> Vec<String> {
230 let mut containers = self.inner.write().await;
231
232 let mut responsible_to_remove = Vec::new();
233
234 for (_, container) in containers.iter_mut() {
238 if let Some(container_id) = container.status.container_id() {
239 if to_cleanup.contains(container_id) {
240 container.completion_counter -= 1;
241 if container.completion_counter == 0 {
242 responsible_to_remove.push(container_id.to_string());
243 container.status = InternalStatus::Cleaned;
244 }
245 }
246 }
247 }
248 responsible_to_remove
249 }
250}
251
252impl InternalStatus {
253 fn container_id(&self) -> Option<&str> {
254 match &self {
255 InternalStatus::Operational(_, r) => Some(r.id.as_str()),
256 InternalStatus::Pending(p) => Some(p.id.as_str()),
257 InternalStatus::Failed(_, container_id) => container_id.as_ref().map(|id| id.as_str()),
258 InternalStatus::Cleaned => None,
259 }
260 }
261}