1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4use std::time::Duration;
5
6use s2_sdk::{
7 S2,
8 types::{AccountEndpoint, BasinEndpoint, S2Config, S2Endpoints, S2Error, ValidationError},
9};
10use testcontainers::{
11 ContainerAsync, ContainerRequest, GenericImage, ImageExt, TestcontainersError,
12 core::IntoContainerPort, runners::AsyncRunner,
13};
14use tokio::time::{Instant, sleep, timeout};
15
16pub const IMAGE: &str = "ghcr.io/s2-streamstore/s2";
18pub const DEFAULT_TAG: &str = env!("CARGO_PKG_VERSION");
20pub const PORT: u16 = 80;
22pub const DEFAULT_ACCESS_TOKEN: &str = "ignored";
24
/// Overall budget for the container to report healthy before startup fails.
const HEALTH_TIMEOUT: Duration = Duration::from_secs(30);

/// Pause between two consecutive health probes.
const HEALTH_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the time a single health request may take.
const HEALTH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
28
29pub type Result<T> = std::result::Result<T, Error>;
31
32#[derive(Debug, thiserror::Error)]
34pub enum Error {
35 #[error("testcontainers error: {0}")]
37 Testcontainers(#[from] TestcontainersError),
38 #[error("s2 sdk error: {0}")]
40 S2(#[from] S2Error),
41 #[error("validation error: {0}")]
43 Validation(#[from] ValidationError),
44 #[error("s2-lite did not become healthy at {endpoint}")]
46 NotHealthy {
47 endpoint: String,
49 },
50}
51
52#[derive(Debug)]
54pub struct S2Lite {
55 container: ContainerAsync<GenericImage>,
56 endpoint: String,
57 client: S2,
58}
59
60impl S2Lite {
61 pub async fn start() -> Result<Self> {
63 Self::start_with(DEFAULT_TAG).await
64 }
65
66 pub async fn start_with(tag: impl Into<String>) -> Result<Self> {
68 let container = s2_lite_image_with_tag(tag).start().await?;
69 let host = container.get_host().await?;
70 let port = container.get_host_port_ipv4(PORT).await?;
71 let endpoint = format!("http://{host}:{port}");
72
73 wait_until_healthy(&endpoint).await?;
74
75 let client = S2::new(s2_config_for_endpoint(&endpoint, DEFAULT_ACCESS_TOKEN)?)?;
76
77 Ok(Self {
78 container,
79 endpoint,
80 client,
81 })
82 }
83
84 pub fn endpoint(&self) -> &str {
86 &self.endpoint
87 }
88
89 pub fn config(&self, access_token: impl Into<String>) -> Result<S2Config> {
91 s2_config_for_endpoint(&self.endpoint, access_token)
92 }
93
94 pub fn client(&self) -> Result<S2> {
96 Ok(self.client.clone())
97 }
98
99 pub fn container(&self) -> &ContainerAsync<GenericImage> {
101 &self.container
102 }
103}
104
105pub fn s2_image() -> GenericImage {
107 s2_image_with_tag(DEFAULT_TAG)
108}
109
110pub fn s2_image_with_tag(tag: impl Into<String>) -> GenericImage {
112 GenericImage::new(IMAGE.to_string(), tag.into())
113}
114
115pub fn s2_lite_image() -> ContainerRequest<GenericImage> {
117 s2_lite_image_with_tag(DEFAULT_TAG)
118}
119
120pub fn s2_lite_image_with_tag(tag: impl Into<String>) -> ContainerRequest<GenericImage> {
122 s2_image_with_tag(tag)
123 .with_exposed_port(PORT.tcp())
124 .with_cmd(["lite"])
125}
126
127pub fn s2_config_for_endpoint(
129 endpoint: impl AsRef<str>,
130 access_token: impl Into<String>,
131) -> Result<S2Config> {
132 let endpoint = endpoint.as_ref();
133 let endpoints = S2Endpoints::new(
134 AccountEndpoint::new(endpoint)?,
135 BasinEndpoint::new(endpoint)?,
136 )?;
137
138 Ok(S2Config::new(access_token).with_endpoints(endpoints))
139}
140
141async fn wait_until_healthy(endpoint: &str) -> Result<()> {
142 let client = reqwest::Client::new();
143 let health_url = format!("{endpoint}/health");
144 let deadline = Instant::now() + HEALTH_TIMEOUT;
145
146 loop {
147 let now = Instant::now();
148 if now >= deadline {
149 return Err(Error::NotHealthy {
150 endpoint: endpoint.to_string(),
151 });
152 }
153
154 let request_timeout = HEALTH_REQUEST_TIMEOUT.min(deadline - now);
155 if let Ok(Ok(response)) = timeout(request_timeout, client.get(&health_url).send()).await
156 && response.status().is_success()
157 {
158 return Ok(());
159 }
160
161 let now = Instant::now();
162 if now >= deadline {
163 return Err(Error::NotHealthy {
164 endpoint: endpoint.to_string(),
165 });
166 }
167
168 sleep(HEALTH_POLL_INTERVAL.min(deadline - now)).await;
169 }
170}
171
#[cfg(test)]
mod tests {
    use s2_sdk::types::{BasinName, EnsureBasinInput, EnsureStreamInput, StreamName};
    use testcontainers::Image;

    use super::*;

    /// The plain image uses the crate-versioned tag by default, honors an
    /// explicit tag, and exposes no ports.
    #[test]
    fn s2_image_defaults_to_versioned_docker_image() {
        // Exercise the default constructor so the `DEFAULT_TAG` wiring is covered.
        let default_image = s2_image();
        assert_eq!(default_image.name(), IMAGE);
        assert_eq!(default_image.tag(), DEFAULT_TAG);

        let image = s2_image_with_tag("test-tag");

        assert_eq!(image.name(), IMAGE);
        assert_eq!(image.tag(), "test-tag");
        assert!(image.expose_ports().is_empty());
    }

    /// The lite request uses the crate-versioned tag by default, exposes
    /// `PORT`, and runs the `lite` command.
    #[test]
    fn s2_lite_image_defaults_to_lite_command() {
        // Exercise the default constructor so the `DEFAULT_TAG` wiring is covered.
        let default_request = s2_lite_image();
        assert_eq!(default_request.image().name(), IMAGE);
        assert_eq!(default_request.image().tag(), DEFAULT_TAG);

        let request = s2_lite_image_with_tag("test-tag");

        assert_eq!(request.image().name(), IMAGE);
        assert_eq!(request.image().tag(), "test-tag");
        assert_eq!(request.image().expose_ports(), &[PORT.tcp()]);
        assert_eq!(request.cmd().collect::<Vec<_>>(), ["lite"]);
    }

    /// A config built from a single URL is accepted by the SDK client.
    #[tokio::test]
    async fn config_uses_same_endpoint_for_account_and_basin() {
        let config = s2_config_for_endpoint("http://localhost:8080", "ignored").unwrap();

        S2::new(config).unwrap();
    }

    /// End-to-end check: starts a real container (a container runtime must be
    /// available) and creates a basin and a stream through the bundled client.
    #[tokio::test]
    async fn starts_s2_lite_and_ensures_resources() {
        let s2 = S2Lite::start().await.unwrap();

        let client = s2.client().unwrap();
        let basin_name = "test-basin".parse::<BasinName>().unwrap();
        client
            .ensure_basin(EnsureBasinInput::new(basin_name.clone()))
            .await
            .unwrap();

        let basin = client.basin(basin_name.clone());
        let stream_name = "test-stream".parse::<StreamName>().unwrap();
        basin
            .ensure_stream(EnsureStreamInput::new(stream_name.clone()))
            .await
            .unwrap();

        assert_eq!(basin_name.as_ref(), "test-basin");
        assert_eq!(stream_name.as_ref(), "test-stream");
    }
}