Skip to main content

s2_testcontainers/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4use std::time::Duration;
5
6use s2_sdk::{
7    S2,
8    types::{AccountEndpoint, BasinEndpoint, S2Config, S2Endpoints, S2Error, ValidationError},
9};
10use testcontainers::{
11    ContainerAsync, ContainerRequest, GenericImage, ImageExt, TestcontainersError,
12    core::IntoContainerPort, runners::AsyncRunner,
13};
14use tokio::time::{Instant, sleep, timeout};
15
16/// Image repository for the S2 Docker image.
17pub const IMAGE: &str = "ghcr.io/s2-streamstore/s2";
18/// Default S2 image tag.
19pub const DEFAULT_TAG: &str = env!("CARGO_PKG_VERSION");
20/// Port exposed by s2-lite.
21pub const PORT: u16 = 80;
22/// Default access token used by [`S2Lite::client`].
23pub const DEFAULT_ACCESS_TOKEN: &str = "ignored";
24
25const HEALTH_TIMEOUT: Duration = Duration::from_secs(30);
26const HEALTH_POLL_INTERVAL: Duration = Duration::from_millis(100);
27const HEALTH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
28
29/// Result type for this crate.
30pub type Result<T> = std::result::Result<T, Error>;
31
32/// Errors from s2-testcontainers helpers.
33#[derive(Debug, thiserror::Error)]
34pub enum Error {
35    /// Error from Testcontainers.
36    #[error("testcontainers error: {0}")]
37    Testcontainers(#[from] TestcontainersError),
38    /// Error from the S2 SDK.
39    #[error("s2 sdk error: {0}")]
40    S2(#[from] S2Error),
41    /// S2 endpoint or resource name validation error.
42    #[error("validation error: {0}")]
43    Validation(#[from] ValidationError),
44    /// s2-lite did not become healthy before the startup timeout.
45    #[error("s2-lite did not become healthy at {endpoint}")]
46    NotHealthy {
47        /// Endpoint that did not become healthy.
48        endpoint: String,
49    },
50}
51
52/// Running s2-lite Testcontainers instance.
53#[derive(Debug)]
54pub struct S2Lite {
55    container: ContainerAsync<GenericImage>,
56    endpoint: String,
57    client: S2,
58}
59
60impl S2Lite {
61    /// Start s2-lite with the default image tag.
62    pub async fn start() -> Result<Self> {
63        Self::start_with(DEFAULT_TAG).await
64    }
65
66    /// Start s2-lite with a specific image tag.
67    pub async fn start_with(tag: impl Into<String>) -> Result<Self> {
68        let container = s2_lite_image_with_tag(tag).start().await?;
69        let host = container.get_host().await?;
70        let port = container.get_host_port_ipv4(PORT).await?;
71        let endpoint = format!("http://{host}:{port}");
72
73        wait_until_healthy(&endpoint).await?;
74
75        let client = S2::new(s2_config_for_endpoint(&endpoint, DEFAULT_ACCESS_TOKEN)?)?;
76
77        Ok(Self {
78            container,
79            endpoint,
80            client,
81        })
82    }
83
84    /// Return the mapped HTTP endpoint for this s2-lite instance.
85    pub fn endpoint(&self) -> &str {
86        &self.endpoint
87    }
88
89    /// Build an [`S2Config`] for this s2-lite instance with the provided access token.
90    pub fn config(&self, access_token: impl Into<String>) -> Result<S2Config> {
91        s2_config_for_endpoint(&self.endpoint, access_token)
92    }
93
94    /// Build an [`S2`] client for this s2-lite instance.
95    pub fn client(&self) -> Result<S2> {
96        Ok(self.client.clone())
97    }
98
99    /// Return the underlying Testcontainers container.
100    pub fn container(&self) -> &ContainerAsync<GenericImage> {
101        &self.container
102    }
103}
104
105/// Return the default S2 Docker [`GenericImage`].
106pub fn s2_image() -> GenericImage {
107    s2_image_with_tag(DEFAULT_TAG)
108}
109
110/// Return an S2 Docker [`GenericImage`] with a specific tag.
111pub fn s2_image_with_tag(tag: impl Into<String>) -> GenericImage {
112    GenericImage::new(IMAGE.to_string(), tag.into())
113}
114
115/// Return the default S2 Docker [`ContainerRequest`] configured to run `s2 lite`.
116pub fn s2_lite_image() -> ContainerRequest<GenericImage> {
117    s2_lite_image_with_tag(DEFAULT_TAG)
118}
119
120/// Return an S2 Docker [`ContainerRequest`] with a specific tag configured to run `s2 lite`.
121pub fn s2_lite_image_with_tag(tag: impl Into<String>) -> ContainerRequest<GenericImage> {
122    s2_image_with_tag(tag)
123        .with_exposed_port(PORT.tcp())
124        .with_cmd(["lite"])
125}
126
127/// Build an [`S2Config`] wired to use an endpoint for both account and basin APIs.
128pub fn s2_config_for_endpoint(
129    endpoint: impl AsRef<str>,
130    access_token: impl Into<String>,
131) -> Result<S2Config> {
132    let endpoint = endpoint.as_ref();
133    let endpoints = S2Endpoints::new(
134        AccountEndpoint::new(endpoint)?,
135        BasinEndpoint::new(endpoint)?,
136    )?;
137
138    Ok(S2Config::new(access_token).with_endpoints(endpoints))
139}
140
141async fn wait_until_healthy(endpoint: &str) -> Result<()> {
142    let client = reqwest::Client::new();
143    let health_url = format!("{endpoint}/health");
144    let deadline = Instant::now() + HEALTH_TIMEOUT;
145
146    loop {
147        let now = Instant::now();
148        if now >= deadline {
149            return Err(Error::NotHealthy {
150                endpoint: endpoint.to_string(),
151            });
152        }
153
154        let request_timeout = HEALTH_REQUEST_TIMEOUT.min(deadline - now);
155        if let Ok(Ok(response)) = timeout(request_timeout, client.get(&health_url).send()).await
156            && response.status().is_success()
157        {
158            return Ok(());
159        }
160
161        let now = Instant::now();
162        if now >= deadline {
163            return Err(Error::NotHealthy {
164                endpoint: endpoint.to_string(),
165            });
166        }
167
168        sleep(HEALTH_POLL_INTERVAL.min(deadline - now)).await;
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use s2_sdk::types::{BasinName, EnsureBasinInput, EnsureStreamInput, StreamName};
175    use testcontainers::Image;
176
177    use super::*;
178
179    #[test]
180    fn s2_image_defaults_to_versioned_docker_image() {
181        let image = s2_image_with_tag("test-tag");
182
183        assert_eq!(image.name(), IMAGE);
184        assert_eq!(image.tag(), "test-tag");
185        assert!(image.expose_ports().is_empty());
186    }
187
188    #[test]
189    fn s2_lite_image_defaults_to_lite_command() {
190        let request = s2_lite_image_with_tag("test-tag");
191
192        assert_eq!(request.image().name(), IMAGE);
193        assert_eq!(request.image().tag(), "test-tag");
194        assert_eq!(request.image().expose_ports(), &[PORT.tcp()]);
195        assert_eq!(request.cmd().collect::<Vec<_>>(), ["lite"]);
196    }
197
198    #[tokio::test]
199    async fn config_uses_same_endpoint_for_account_and_basin() {
200        let config = s2_config_for_endpoint("http://localhost:8080", "ignored").unwrap();
201
202        S2::new(config).unwrap();
203    }
204
205    #[tokio::test]
206    async fn starts_s2_lite_and_ensures_resources() {
207        let s2 = S2Lite::start().await.unwrap();
208
209        let client = s2.client().unwrap();
210        let basin_name = "test-basin".parse::<BasinName>().unwrap();
211        client
212            .ensure_basin(EnsureBasinInput::new(basin_name.clone()))
213            .await
214            .unwrap();
215
216        let basin = client.basin(basin_name.clone());
217        let stream_name = "test-stream".parse::<StreamName>().unwrap();
218        basin
219            .ensure_stream(EnsureStreamInput::new(stream_name.clone()))
220            .await
221            .unwrap();
222
223        assert_eq!(basin_name.as_ref(), "test-basin");
224        assert_eq!(stream_name.as_ref(), "test-stream");
225    }
226}