pdk_test/
composite.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Test composite orchestration
6//!
7//! This module provides the main test environment orchestration functionality.
8//! It manages Docker containers, networks, and services in a coordinated way
9//! for integration testing scenarios.
10//!
11//! ## Primary types
12//!
13//! - [`TestComposite`]: orchestrates test environment lifecycle
14//! - [`TestCompositeBuilder`]: configures and builds test environments
15//!
16
17use std::any::{type_name, Any, TypeId};
18use std::collections::hash_map::Entry;
19use std::collections::HashMap;
20use std::rc::Rc;
21use std::time::Duration;
22
23use crate::cleanup::Cleanup;
24use bollard::Docker;
25use futures::future::{join_all, try_join_all};
26use tokio::runtime::{Handle, RuntimeFlavor};
27
28use crate::config::Config;
29use crate::container::Container;
30use crate::error::TestError;
31use crate::host::Host;
32use crate::network::Network;
33use crate::runner::Test;
34use crate::service::Service;
35use crate::services::httpmock::HttpMockConfig;
36
37struct UntypedConfig {
38    erased: Rc<dyn Any>,
39    source: Rc<dyn Config>,
40}
41
42impl UntypedConfig {
43    fn new<T: Config + 'static>(config: T) -> Self {
44        let erased: Rc<dyn Any> = Rc::new(config);
45        let source: Rc<dyn Config> = erased.clone().downcast::<T>().unwrap();
46        Self { erased, source }
47    }
48
49    fn upcast(&self) -> &dyn Config {
50        self.source.as_ref()
51    }
52
53    fn downcast<T: Config + 'static>(&self) -> &T {
54        self.erased.downcast_ref().unwrap()
55    }
56}
57
58struct Inner {
59    configs: HashMap<TypeId, HashMap<String, UntypedConfig>>,
60    containers: HashMap<String, Container>,
61    network: Network,
62    test: Rc<Test>,
63}
64
65impl Inner {
66    fn configs<T: Service + 'static>(&self) -> Result<&HashMap<String, UntypedConfig>, TestError> {
67        self.configs
68            .get(&TypeId::of::<T::Config>())
69            .ok_or(TestError::UnknownService(type_name::<T>()))
70    }
71
72    fn service<T: Service + 'static>(&self) -> Result<T, TestError> {
73        let config = self.configs::<T>()?.values().next().unwrap();
74        let container = self.containers.get(config.upcast().hostname()).unwrap();
75        Ok(T::new(config.downcast(), container))
76    }
77
78    fn service_by_hostname<T: Service + 'static>(&self, hostname: &str) -> Result<T, TestError> {
79        let config = self
80            .configs::<T>()?
81            .get(hostname)
82            .ok_or_else(|| TestError::UnknownServiceHostname(hostname.to_string()))?;
83        let container = self.containers.get(config.upcast().hostname()).unwrap();
84        Ok(T::new(config.downcast(), container))
85    }
86}
87
88/// Main test environment orchestrator for integration tests.
89pub struct TestComposite {
90    inner: Option<Inner>,
91}
92
93impl TestComposite {
94    /// Creates a new builder for configuring test environments.
95    pub fn builder() -> TestCompositeBuilder {
96        TestCompositeBuilder::new()
97    }
98
99    /// Gets a service instance by its type.
100    /// Returns the first configured service of the specified type.
101    pub fn service<T: Service + 'static>(&self) -> Result<T, TestError> {
102        self.inner().service()
103    }
104
105    /// Gets a service instance by its hostname.
106    /// Returns the service with the specified hostname and type.
107    pub fn service_by_hostname<T: Service + 'static>(&self, name: &str) -> Result<T, TestError> {
108        self.inner().service_by_hostname(name)
109    }
110
111    fn inner(&self) -> &Inner {
112        self.inner.as_ref().unwrap()
113    }
114}
115
116fn check_runtime() -> Result<(), TestError> {
117    let handle = Handle::try_current()?;
118    if !matches!(handle.runtime_flavor(), RuntimeFlavor::MultiThread) {
119        return Err(TestError::UnavailableMultiThread);
120    }
121    Ok(())
122}
123
124async fn check_docker(docker: &Docker) -> Result<(), TestError> {
125    match docker.ping().await {
126        Ok(_) => Ok(()),
127        Err(err) => Err(TestError::UnavailableDocker(err.into())),
128    }
129}
130
131/// Builder for configuring and creating test environments.
132///
133/// This builder is used to configure the test environment with the services
134/// that will be used in the test. It provides an API for adding services
135/// and configuring them.
136pub struct TestCompositeBuilder {
137    configs: HashMap<TypeId, HashMap<String, UntypedConfig>>,
138}
139
140impl TestCompositeBuilder {
141    fn new() -> Self {
142        Self {
143            configs: HashMap::new(),
144        }
145    }
146
147    /// Configures a service with the provided configuration. The service will be
148    /// started when the test environment is built. Each service must have a unique hostname.
149    pub fn with_service<C: Config + 'static>(mut self, config: C) -> Self {
150        let entry = self
151            .configs
152            .entry(TypeId::of::<C>())
153            .or_default()
154            .entry(config.hostname().to_string());
155
156        match entry {
157            Entry::Occupied(_) => panic!("Name {} configured twice", config.hostname()),
158            Entry::Vacant(e) => e.insert(UntypedConfig::new(config)),
159        };
160
161        self
162    }
163
164    /// Builds the test environment with all configured services.
165    /// Starts Docker containers, creates networks and initializes all services.
166    /// Returns a `TestComposite` that can be used to access the services.
167    pub async fn build(self) -> Result<TestComposite, TestError> {
168        check_runtime()?;
169
170        let httpmock_configs_len = self
171            .configs
172            .get(&TypeId::of::<HttpMockConfig>())
173            .map(|f| f.len())
174            .unwrap_or(0);
175
176        if httpmock_configs_len > 1 {
177            return Err(TestError::NotSupportedConfig(
178                "Only 1 HttpMock can be defined per test".to_string(),
179            ));
180        }
181
182        let test = Test::current()?;
183
184        let docker = Docker::connect_with_local_defaults()?;
185        check_docker(&docker).await?;
186
187        let host = Host::current(&docker).await?;
188
189        Cleanup::new(docker.clone()).purge().await?;
190
191        let mut network = Network::new(docker.clone()).await?;
192
193        if let Some(host_container) = host.container() {
194            log::info!("Creating testing environment in containerized mode.");
195
196            // Containerized mode connects the current container into the network.
197            network.connect(host_container.id()).await?;
198        } else {
199            log::info!("Creating testing environment in standalone mode.");
200        }
201
202        let starts = self.configs.iter().flat_map(|(_, configs)| {
203            configs.values().map(|config| {
204                Container::initialized(
205                    docker.clone(),
206                    test.clone(),
207                    host.mode(),
208                    &network,
209                    config.upcast(),
210                )
211            })
212        });
213
214        let containers = try_join_all(starts)
215            .await?
216            .into_iter()
217            .map(|c| (c.config().hostname().to_string(), c));
218
219        Ok(TestComposite {
220            inner: Some(Inner {
221                configs: self.configs,
222                containers: containers.collect(),
223                network,
224                test: test.clone(),
225            }),
226        })
227    }
228}
229
230impl Drop for TestComposite {
231    fn drop(&mut self) {
232        let Inner {
233            mut network,
234            containers,
235            test,
236            ..
237        } = self.inner.take().unwrap();
238        tokio::task::block_in_place(|| {
239            log::info!("Dropping testing environment.");
240
241            Handle::current().block_on(async {
242                if !test.is_success() {
243                    tokio::time::sleep(Duration::from_secs(1)).await;
244                }
245                join_all(containers.into_values().map(|mut container| async move {
246                    container.dispose().await;
247                }))
248                .await;
249                network.remove().await;
250            })
251        });
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use std::collections::HashMap;
258
259    use bollard::container::{CreateContainerOptions, NetworkingConfig};
260    use bollard::errors::Error as BollardError;
261    use bollard::network::CreateNetworkOptions;
262    use bollard::secret::EndpointSettings;
263    use bollard::Docker;
264
265    use crate::constants::NETWORK_NAME;
266    use crate::error::TestError;
267    use crate::image::Image;
268    use crate::runner::Test;
269    use crate::services::httpbin::HttpBinConfig;
270
271    use super::TestComposite;
272
273    #[tokio::test]
274    async fn multi_thread_required_error() {
275        let result = TestComposite::builder().build().await;
276        assert!(matches!(result, Err(TestError::UnavailableMultiThread)));
277    }
278
279    #[test]
280    fn runtime_required() {
281        let result = futures::executor::block_on(TestComposite::builder().build());
282        assert!(matches!(result, Err(TestError::UnavailableRuntime(_))));
283    }
284
285    #[test]
286    fn create_container_logs() {
287        let test = Test::builder().module("foo").name("bar").build();
288
289        let target_dir = test.target_dir().to_owned();
290        let _ = test.run(async {
291            let s1 = HttpBinConfig::builder().hostname("service-1").build();
292            let s2 = HttpBinConfig::builder().hostname("service-2").build();
293
294            let _ = TestComposite::builder()
295                .with_service(s1)
296                .with_service(s2)
297                .build()
298                .await?;
299
300            assert!(target_dir.join("service-1.log").exists());
301            assert!(target_dir.join("service-2.log").exists());
302
303            Ok::<_, TestError>(())
304        });
305
306        assert!(!target_dir.join("service-1.log").exists());
307        assert!(!target_dir.join("service-2.log").exists());
308    }
309
310    #[test]
311    fn drop_network() {
312        let docker = Docker::connect_with_local_defaults().unwrap();
313        let test = Test::builder().module("foo").name("bar").build();
314
315        let _ = test.run(async {
316            let s1 = HttpBinConfig::builder().hostname("service-1").build();
317            let s2 = HttpBinConfig::builder().hostname("service-2").build();
318
319            let _tc = TestComposite::builder()
320                .with_service(s1)
321                .with_service(s2)
322                .build()
323                .await?;
324
325            // Check created network
326            let result = docker.inspect_network::<String>(NETWORK_NAME, None).await;
327            assert!(result.is_ok());
328
329            Ok::<_, TestError>(())
330        });
331
332        // Check network deletion
333        let runtime = tokio::runtime::Runtime::new().unwrap();
334        let result = runtime.block_on(docker.inspect_network::<String>(NETWORK_NAME, None));
335
336        assert!(matches!(
337            result,
338            Err(BollardError::DockerResponseServerError {
339                status_code: 404,
340                ..
341            })
342        ));
343    }
344
345    #[test]
346    fn purge_test_assets() -> Result<(), TestError> {
347        let test = Test::builder().module("foo").name("bar").build();
348
349        test.run(async {
350            let docker = bollard::Docker::connect_with_local_defaults()?;
351
352            // Ensure hello-world image
353            let hello_world_image = Image::from_repository("hello-world").with_version("linux");
354            hello_world_image.pull(&docker).await?;
355
356            // Create a network that shares the name and is properly labeled.
357            let network = docker
358                .create_network(CreateNetworkOptions {
359                    name: "pdk-test-network",
360                    driver: "bridge",
361                    labels: HashMap::from([("CreatedBy", "pdk-test")]),
362                    ..Default::default()
363                })
364                .await?;
365
366            let net_id = network.id;
367
368            let hello_world_locator = hello_world_image.locator();
369            let hello_world_name = "hello-world";
370
371            // Create a container that uses the network and is properly labeled.
372            let container = docker
373                .create_container(
374                    Some(CreateContainerOptions {
375                        name: hello_world_name,
376                        platform: None,
377                    }),
378                    bollard::container::Config {
379                        image: Some(hello_world_locator.as_str()),
380                        hostname: Some("helloWorld"),
381                        network_disabled: Some(false),
382                        networking_config: Some(NetworkingConfig {
383                            endpoints_config: HashMap::from([(
384                                net_id.as_str(),
385                                EndpointSettings {
386                                    ..Default::default()
387                                },
388                            )]),
389                        }),
390                        labels: Some(HashMap::from([("CreatedBy", "pdk-test")])),
391                        ..Default::default()
392                    },
393                )
394                .await?;
395
396            // start the container to connect it to the network
397            docker.start_container::<&str>(&container.id, None).await?;
398
399            let hello_world_inspect = docker.inspect_container(hello_world_name, None).await;
400
401            // Assert that hello-world container exists
402            assert!(hello_world_inspect.is_ok());
403
404            let httpbin_config = HttpBinConfig::builder().hostname("httpbin").build();
405
406            let _composite = TestComposite::builder()
407                .with_service(httpbin_config)
408                .build()
409                .await?;
410
411            let hello_world_inspect = docker.inspect_container(hello_world_name, None).await;
412
413            // Assert that hello-world container no longer exists
414            assert!(matches!(
415                hello_world_inspect,
416                Err(BollardError::DockerResponseServerError {
417                    status_code: 404,
418                    ..
419                })
420            ));
421
422            Ok(())
423        })
424    }
425}