#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use std::time::Duration;
use s2_sdk::{
S2,
types::{AccountEndpoint, BasinEndpoint, S2Config, S2Endpoints, S2Error, ValidationError},
};
use testcontainers::{
ContainerAsync, ContainerRequest, GenericImage, ImageExt, TestcontainersError,
core::IntoContainerPort, runners::AsyncRunner,
};
use tokio::time::{Instant, sleep, timeout};
pub const IMAGE: &str = "ghcr.io/s2-streamstore/s2";
pub const DEFAULT_TAG: &str = env!("CARGO_PKG_VERSION");
pub const PORT: u16 = 80;
pub const DEFAULT_ACCESS_TOKEN: &str = "ignored";
const HEALTH_TIMEOUT: Duration = Duration::from_secs(30);
const HEALTH_POLL_INTERVAL: Duration = Duration::from_millis(100);
const HEALTH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("testcontainers error: {0}")]
Testcontainers(#[from] TestcontainersError),
#[error("s2 sdk error: {0}")]
S2(#[from] S2Error),
#[error("validation error: {0}")]
Validation(#[from] ValidationError),
#[error("s2-lite did not become healthy at {endpoint}")]
NotHealthy {
endpoint: String,
},
}
#[derive(Debug)]
pub struct S2Lite {
container: ContainerAsync<GenericImage>,
endpoint: String,
client: S2,
}
impl S2Lite {
pub async fn start() -> Result<Self> {
Self::start_with(DEFAULT_TAG).await
}
pub async fn start_with(tag: impl Into<String>) -> Result<Self> {
let container = s2_lite_image_with_tag(tag).start().await?;
let host = container.get_host().await?;
let port = container.get_host_port_ipv4(PORT).await?;
let endpoint = format!("http://{host}:{port}");
wait_until_healthy(&endpoint).await?;
let client = S2::new(s2_config_for_endpoint(&endpoint, DEFAULT_ACCESS_TOKEN)?)?;
Ok(Self {
container,
endpoint,
client,
})
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn config(&self, access_token: impl Into<String>) -> Result<S2Config> {
s2_config_for_endpoint(&self.endpoint, access_token)
}
pub fn client(&self) -> Result<S2> {
Ok(self.client.clone())
}
pub fn container(&self) -> &ContainerAsync<GenericImage> {
&self.container
}
}
pub fn s2_image() -> GenericImage {
s2_image_with_tag(DEFAULT_TAG)
}
pub fn s2_image_with_tag(tag: impl Into<String>) -> GenericImage {
GenericImage::new(IMAGE.to_string(), tag.into())
}
pub fn s2_lite_image() -> ContainerRequest<GenericImage> {
s2_lite_image_with_tag(DEFAULT_TAG)
}
pub fn s2_lite_image_with_tag(tag: impl Into<String>) -> ContainerRequest<GenericImage> {
s2_image_with_tag(tag)
.with_exposed_port(PORT.tcp())
.with_cmd(["lite"])
}
pub fn s2_config_for_endpoint(
endpoint: impl AsRef<str>,
access_token: impl Into<String>,
) -> Result<S2Config> {
let endpoint = endpoint.as_ref();
let endpoints = S2Endpoints::new(
AccountEndpoint::new(endpoint)?,
BasinEndpoint::new(endpoint)?,
)?;
Ok(S2Config::new(access_token).with_endpoints(endpoints))
}
async fn wait_until_healthy(endpoint: &str) -> Result<()> {
let client = reqwest::Client::new();
let health_url = format!("{endpoint}/health");
let deadline = Instant::now() + HEALTH_TIMEOUT;
loop {
let now = Instant::now();
if now >= deadline {
return Err(Error::NotHealthy {
endpoint: endpoint.to_string(),
});
}
let request_timeout = HEALTH_REQUEST_TIMEOUT.min(deadline - now);
if let Ok(Ok(response)) = timeout(request_timeout, client.get(&health_url).send()).await
&& response.status().is_success()
{
return Ok(());
}
let now = Instant::now();
if now >= deadline {
return Err(Error::NotHealthy {
endpoint: endpoint.to_string(),
});
}
sleep(HEALTH_POLL_INTERVAL.min(deadline - now)).await;
}
}
#[cfg(test)]
mod tests {
use s2_sdk::types::{BasinName, EnsureBasinInput, EnsureStreamInput, StreamName};
use testcontainers::Image;
use super::*;
#[test]
fn s2_image_defaults_to_versioned_docker_image() {
let image = s2_image_with_tag("test-tag");
assert_eq!(image.name(), IMAGE);
assert_eq!(image.tag(), "test-tag");
assert!(image.expose_ports().is_empty());
}
#[test]
fn s2_lite_image_defaults_to_lite_command() {
let request = s2_lite_image_with_tag("test-tag");
assert_eq!(request.image().name(), IMAGE);
assert_eq!(request.image().tag(), "test-tag");
assert_eq!(request.image().expose_ports(), &[PORT.tcp()]);
assert_eq!(request.cmd().collect::<Vec<_>>(), ["lite"]);
}
#[tokio::test]
async fn config_uses_same_endpoint_for_account_and_basin() {
let config = s2_config_for_endpoint("http://localhost:8080", "ignored").unwrap();
S2::new(config).unwrap();
}
#[tokio::test]
async fn starts_s2_lite_and_ensures_resources() {
let s2 = S2Lite::start().await.unwrap();
let client = s2.client().unwrap();
let basin_name = "test-basin".parse::<BasinName>().unwrap();
client
.ensure_basin(EnsureBasinInput::new(basin_name.clone()))
.await
.unwrap();
let basin = client.basin(basin_name.clone());
let stream_name = "test-stream".parse::<StreamName>().unwrap();
basin
.ensure_stream(EnsureStreamInput::new(stream_name.clone()))
.await
.unwrap();
assert_eq!(basin_name.as_ref(), "test-basin");
assert_eq!(stream_name.as_ref(), "test-stream");
}
}