use std::borrow::Cow;
use std::collections::BTreeMap;
use testcontainers_modules::testcontainers::core::{
ContainerPort, ContainerState, ExecCommand, WaitFor,
};
use testcontainers_modules::testcontainers::{Image, TestcontainersError};
/// Docker image name returned by `Image::name` for `Realtime`.
const NAME: &str = "supabase/realtime";
/// Image tag used by `Realtime::default`; callers can replace it with `Realtime::with_tag`.
const TAG: &str = "v2.33.58";
/// Port written to the default `PORT` environment variable and reported by `Image::expose_ports`.
pub const REALTIME_PORT: u16 = 4000;
/// Testcontainers image description for `supabase/realtime`.
///
/// Configuration is held as a map of environment variables plus an image tag;
/// the builder methods on this type fill in or replace individual entries.
#[derive(Debug, Clone)]
pub struct Realtime {
/// Environment variables handed to the container through `Image::env_vars`.
/// Keys are unique: inserting an existing key replaces its value.
env_vars: BTreeMap<String, String>,
/// Image tag returned by `Image::tag`.
tag: String,
}
impl Realtime {
    /// Creates an image with the default environment and tag.
    ///
    /// Equivalent to [`Realtime::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an image from the defaults and then applies every entry of `envs`.
    ///
    /// An entry whose key matches a default replaces that default.
    #[must_use]
    pub fn new_with_env(envs: BTreeMap<&str, &str>) -> Self {
        let mut instance = Self::default();
        instance.env_vars.extend(
            envs.into_iter()
                .map(|(key, val)| (key.to_string(), val.to_string())),
        );
        instance
    }

    /// Sets the `DB_URL` environment variable to `connection_string`.
    #[must_use]
    pub fn with_postgres_connection(self, connection_string: impl Into<String>) -> Self {
        self.with_env("DB_URL", connection_string)
    }

    /// Sets the `DB_HOST` environment variable.
    #[must_use]
    pub fn with_db_host(self, host: impl Into<String>) -> Self {
        self.with_env("DB_HOST", host)
    }

    /// Sets the `DB_PORT` environment variable (stored in decimal form).
    #[must_use]
    pub fn with_db_port(self, port: u16) -> Self {
        self.with_env("DB_PORT", port.to_string())
    }

    /// Sets the `DB_NAME` environment variable.
    #[must_use]
    pub fn with_db_name(self, name: impl Into<String>) -> Self {
        self.with_env("DB_NAME", name)
    }

    /// Sets the `DB_USER` environment variable.
    #[must_use]
    pub fn with_db_user(self, user: impl Into<String>) -> Self {
        self.with_env("DB_USER", user)
    }

    /// Sets the `DB_PASSWORD` environment variable.
    #[must_use]
    pub fn with_db_password(self, password: impl Into<String>) -> Self {
        self.with_env("DB_PASSWORD", password)
    }

    /// Sets the `DB_SSL` environment variable to `"true"` or `"false"`.
    #[must_use]
    pub fn with_db_ssl(self, enabled: bool) -> Self {
        self.with_env("DB_SSL", enabled.to_string())
    }

    /// Sets the `DB_AFTER_CONNECT_QUERY` environment variable.
    #[must_use]
    pub fn with_db_after_connect_query(self, query: impl Into<String>) -> Self {
        self.with_env("DB_AFTER_CONNECT_QUERY", query)
    }

    /// Sets the `JWT_SECRET` environment variable.
    #[must_use]
    pub fn with_jwt_secret(self, secret: impl Into<String>) -> Self {
        self.with_env("JWT_SECRET", secret)
    }

    /// Sets the `API_JWT_SECRET` environment variable.
    #[must_use]
    pub fn with_api_jwt_secret(self, secret: impl Into<String>) -> Self {
        self.with_env("API_JWT_SECRET", secret)
    }

    /// Sets the `SECRET_KEY_BASE` environment variable.
    #[must_use]
    pub fn with_secret_key_base(self, secret: impl Into<String>) -> Self {
        self.with_env("SECRET_KEY_BASE", secret)
    }

    /// Sets the `SLOT_NAME` environment variable, replacing the default.
    #[must_use]
    pub fn with_slot_name(self, name: impl Into<String>) -> Self {
        self.with_env("SLOT_NAME", name)
    }

    /// Sets the `TEMPORARY_SLOT` environment variable to `"true"` or `"false"`.
    #[must_use]
    pub fn with_temporary_slot(self, temporary: bool) -> Self {
        self.with_env("TEMPORARY_SLOT", temporary.to_string())
    }

    /// Sets the `MAX_RECORD_BYTES` environment variable (stored in decimal form).
    #[must_use]
    pub fn with_max_record_bytes(self, bytes: u64) -> Self {
        self.with_env("MAX_RECORD_BYTES", bytes.to_string())
    }

    /// Sets the `SECURE_CHANNELS` environment variable to `"true"` or `"false"`.
    #[must_use]
    pub fn with_secure_channels(self, secure: bool) -> Self {
        self.with_env("SECURE_CHANNELS", secure.to_string())
    }

    /// Sets the `REGION` environment variable, replacing the default.
    #[must_use]
    pub fn with_region(self, region: impl Into<String>) -> Self {
        self.with_env("REGION", region)
    }

    /// Sets the `TENANT_ID` environment variable, replacing the default.
    #[must_use]
    pub fn with_tenant_id(self, tenant_id: impl Into<String>) -> Self {
        self.with_env("TENANT_ID", tenant_id)
    }

    /// Sets the `ERL_AFLAGS` environment variable, replacing the default.
    #[must_use]
    pub fn with_erl_aflags(self, flags: impl Into<String>) -> Self {
        self.with_env("ERL_AFLAGS", flags)
    }

    /// Sets the `DNS_NODES` environment variable.
    #[must_use]
    pub fn with_dns_nodes(self, nodes: impl Into<String>) -> Self {
        self.with_env("DNS_NODES", nodes)
    }

    /// Sets the `ENABLE_TAILSCALE` environment variable to `"true"` or `"false"`.
    #[must_use]
    pub fn with_enable_tailscale(self, enabled: bool) -> Self {
        self.with_env("ENABLE_TAILSCALE", enabled.to_string())
    }

    /// Sets the `PORT` environment variable (stored in decimal form).
    ///
    /// Only the environment variable changes: the `Image::expose_ports`
    /// implementation for this type still reports `REALTIME_PORT`.
    #[must_use]
    pub fn with_port(self, port: u16) -> Self {
        self.with_env("PORT", port.to_string())
    }

    /// Replaces the image tag returned by `Image::tag`.
    #[must_use]
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
        self.tag = tag.into();
        self
    }

    /// Sets an arbitrary environment variable, replacing any existing value for `key`.
    ///
    /// Every typed setter above funnels through this method, so it is the
    /// single place where the environment map is mutated by the builder.
    #[must_use]
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.env_vars.insert(key.into(), value.into());
        self
    }
}
impl Default for Realtime {
fn default() -> Self {
let mut env_vars = BTreeMap::new();
env_vars.insert("PORT".to_string(), REALTIME_PORT.to_string());
env_vars.insert("APP_NAME".to_string(), "realtime".to_string());
env_vars.insert("SLOT_NAME".to_string(), "realtime_rls".to_string());
env_vars.insert("TEMPORARY_SLOT".to_string(), "true".to_string());
env_vars.insert("SECURE_CHANNELS".to_string(), "true".to_string());
env_vars.insert("REGION".to_string(), "local".to_string());
env_vars.insert("TENANT_ID".to_string(), "realtime-dev".to_string());
env_vars.insert("ERL_AFLAGS".to_string(), "-proto_dist inet_tcp".to_string());
env_vars.insert("ENABLE_TAILSCALE".to_string(), "false".to_string());
env_vars.insert("DB_PORT".to_string(), "5432".to_string());
env_vars.insert("DB_SSL".to_string(), "false".to_string());
env_vars.insert("RLIMIT_NOFILE".to_string(), "10000".to_string());
Self {
env_vars,
tag: TAG.to_string(),
}
}
}
impl Image for Realtime {
    /// Returns the image name, `NAME`.
    fn name(&self) -> &str {
        NAME
    }

    /// Returns the configured tag: `TAG` by default, or the value given to `with_tag`.
    fn tag(&self) -> &str {
        &self.tag
    }

    /// The container is considered ready once `Realtime has started` appears on stdout.
    fn ready_conditions(&self) -> Vec<WaitFor> {
        vec![WaitFor::message_on_stdout("Realtime has started")]
    }

    /// Reports `REALTIME_PORT` over TCP.
    ///
    /// The value is fixed: it does not follow a `PORT` environment variable
    /// changed through `with_port` or `with_env`.
    fn expose_ports(&self) -> &[ContainerPort] {
        &[ContainerPort::Tcp(REALTIME_PORT)]
    }

    /// Hands the stored environment map to the container as borrowed key/value pairs.
    fn env_vars(
        &self,
    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
        &self.env_vars
    }

    /// Runs no commands after start-up.
    ///
    /// The container state is intentionally ignored, hence the `_cs` name
    /// instead of a blanket `#[allow(unused_variables)]`.
    fn exec_after_start(
        &self,
        _cs: ContainerState,
    ) -> Result<Vec<ExecCommand>, TestcontainersError> {
        Ok(vec![])
    }
}
#[cfg(test)]
#[cfg(feature = "realtime")]
mod tests {
    use super::*;
    use testcontainers_modules::testcontainers::Image;

    /// Looks up `key` in the image's environment map as a plain `&str`.
    fn env<'a>(image: &'a Realtime, key: &str) -> Option<&'a str> {
        image.env_vars.get(key).map(String::as_str)
    }

    /// Asserts that every `(key, value)` pair is present in the image's environment.
    fn assert_env(image: &Realtime, expected: &[(&str, &str)]) {
        for &(key, value) in expected {
            assert_eq!(env(image, key), Some(value), "unexpected value for {key}");
        }
    }

    #[test]
    fn test_default_configuration() {
        assert_env(
            &Realtime::default(),
            &[
                ("PORT", "4000"),
                ("APP_NAME", "realtime"),
                ("SLOT_NAME", "realtime_rls"),
                ("TEMPORARY_SLOT", "true"),
                ("SECURE_CHANNELS", "true"),
                ("REGION", "local"),
                ("TENANT_ID", "realtime-dev"),
                ("ERL_AFLAGS", "-proto_dist inet_tcp"),
                ("ENABLE_TAILSCALE", "false"),
                ("DB_PORT", "5432"),
                ("DB_SSL", "false"),
                ("RLIMIT_NOFILE", "10000"),
            ],
        );
    }

    #[test]
    fn test_name_returns_correct_image() {
        assert_eq!(Realtime::default().name(), "supabase/realtime");
    }

    #[test]
    fn test_tag_returns_correct_version() {
        assert_eq!(Realtime::default().tag(), TAG);
    }

    #[test]
    fn test_realtime_port_constant() {
        assert_eq!(REALTIME_PORT, 4000);
    }

    #[test]
    fn test_with_postgres_connection() {
        let url = "postgres://user:pass@localhost:5432/db";
        let image = Realtime::default().with_postgres_connection(url);
        assert_eq!(env(&image, "DB_URL"), Some(url));
    }

    #[test]
    fn test_with_db_host() {
        let image = Realtime::default().with_db_host("myhost.example.com");
        assert_eq!(env(&image, "DB_HOST"), Some("myhost.example.com"));
    }

    #[test]
    fn test_with_db_port() {
        let image = Realtime::default().with_db_port(5433);
        assert_eq!(env(&image, "DB_PORT"), Some("5433"));
    }

    #[test]
    fn test_with_db_name() {
        let image = Realtime::default().with_db_name("mydb");
        assert_eq!(env(&image, "DB_NAME"), Some("mydb"));
    }

    #[test]
    fn test_with_db_user() {
        let image = Realtime::default().with_db_user("myuser");
        assert_eq!(env(&image, "DB_USER"), Some("myuser"));
    }

    #[test]
    fn test_with_db_password() {
        let image = Realtime::default().with_db_password("mypassword");
        assert_eq!(env(&image, "DB_PASSWORD"), Some("mypassword"));
    }

    #[test]
    fn test_with_db_ssl() {
        let image = Realtime::default().with_db_ssl(true);
        assert_eq!(env(&image, "DB_SSL"), Some("true"));
    }

    #[test]
    fn test_with_db_after_connect_query() {
        let query = "SET timezone TO 'UTC'";
        let image = Realtime::default().with_db_after_connect_query(query);
        assert_eq!(env(&image, "DB_AFTER_CONNECT_QUERY"), Some(query));
    }

    #[test]
    fn test_with_jwt_secret() {
        let secret = "super-secret-jwt-token-with-at-least-32-characters";
        let image = Realtime::default().with_jwt_secret(secret);
        assert_eq!(env(&image, "JWT_SECRET"), Some(secret));
    }

    #[test]
    fn test_with_api_jwt_secret() {
        let image = Realtime::default().with_api_jwt_secret("api-secret-key");
        assert_eq!(env(&image, "API_JWT_SECRET"), Some("api-secret-key"));
    }

    #[test]
    fn test_with_secret_key_base() {
        let image = Realtime::default().with_secret_key_base("phoenix-secret-key-base");
        assert_eq!(
            env(&image, "SECRET_KEY_BASE"),
            Some("phoenix-secret-key-base")
        );
    }

    #[test]
    fn test_with_slot_name() {
        let image = Realtime::default().with_slot_name("my_custom_slot");
        assert_eq!(env(&image, "SLOT_NAME"), Some("my_custom_slot"));
    }

    #[test]
    fn test_with_temporary_slot() {
        let image = Realtime::default().with_temporary_slot(false);
        assert_eq!(env(&image, "TEMPORARY_SLOT"), Some("false"));
    }

    #[test]
    fn test_with_max_record_bytes() {
        let image = Realtime::default().with_max_record_bytes(1048576);
        assert_eq!(env(&image, "MAX_RECORD_BYTES"), Some("1048576"));
    }

    #[test]
    fn test_with_secure_channels() {
        let image = Realtime::default().with_secure_channels(false);
        assert_eq!(env(&image, "SECURE_CHANNELS"), Some("false"));
    }

    #[test]
    fn test_with_region() {
        let image = Realtime::default().with_region("us-east-1");
        assert_eq!(env(&image, "REGION"), Some("us-east-1"));
    }

    #[test]
    fn test_with_tenant_id() {
        let image = Realtime::default().with_tenant_id("my-tenant");
        assert_eq!(env(&image, "TENANT_ID"), Some("my-tenant"));
    }

    #[test]
    fn test_with_erl_aflags() {
        let flags = "-kernel inet_dist_use_interface {0,0,0,0}";
        let image = Realtime::default().with_erl_aflags(flags);
        assert_eq!(env(&image, "ERL_AFLAGS"), Some(flags));
    }

    #[test]
    fn test_with_dns_nodes() {
        let nodes = "node1.example.com,node2.example.com";
        let image = Realtime::default().with_dns_nodes(nodes);
        assert_eq!(env(&image, "DNS_NODES"), Some(nodes));
    }

    #[test]
    fn test_with_enable_tailscale() {
        let image = Realtime::default().with_enable_tailscale(true);
        assert_eq!(env(&image, "ENABLE_TAILSCALE"), Some("true"));
    }

    #[test]
    fn test_with_port() {
        let image = Realtime::default().with_port(8080);
        assert_eq!(env(&image, "PORT"), Some("8080"));
    }

    #[test]
    fn test_with_tag_overrides_default() {
        let image = Realtime::default().with_tag("v2.0.0");
        assert_eq!(image.tag(), "v2.0.0");
    }

    #[test]
    fn test_with_env_adds_custom_variable() {
        let image = Realtime::default()
            .with_env("CUSTOM_VAR", "custom_value")
            .with_env("ANOTHER_VAR", "another_value");
        assert_env(
            &image,
            &[
                ("CUSTOM_VAR", "custom_value"),
                ("ANOTHER_VAR", "another_value"),
            ],
        );
    }

    #[test]
    fn test_builder_method_chaining() {
        let image = Realtime::default()
            .with_postgres_connection("postgres://user:pass@localhost:5432/db")
            .with_jwt_secret("my-jwt-secret")
            .with_slot_name("custom_slot")
            .with_temporary_slot(true)
            .with_secure_channels(false)
            .with_region("eu-west-1")
            .with_tenant_id("my-tenant")
            .with_tag("v3.0.0");
        assert_env(
            &image,
            &[
                ("DB_URL", "postgres://user:pass@localhost:5432/db"),
                ("JWT_SECRET", "my-jwt-secret"),
                ("SLOT_NAME", "custom_slot"),
                ("TEMPORARY_SLOT", "true"),
                ("SECURE_CHANNELS", "false"),
                ("REGION", "eu-west-1"),
                ("TENANT_ID", "my-tenant"),
            ],
        );
        assert_eq!(image.tag(), "v3.0.0");
    }

    #[test]
    fn test_new_creates_default_instance() {
        let image = Realtime::new();
        assert_eq!(image.name(), NAME);
        assert_eq!(image.tag(), TAG);
    }

    #[test]
    fn test_new_with_env() {
        let mut overrides = BTreeMap::new();
        overrides.insert("CUSTOM_KEY", "custom_value");
        overrides.insert("PORT", "5000");
        let image = Realtime::new_with_env(overrides);
        assert_env(&image, &[("CUSTOM_KEY", "custom_value"), ("PORT", "5000")]);
    }

    #[test]
    fn test_expose_ports() {
        let image = Realtime::default();
        let exposed = image.expose_ports();
        assert_eq!(exposed.len(), 1);
        assert_eq!(exposed.first(), Some(&ContainerPort::Tcp(4000)));
    }

    #[test]
    fn test_ready_conditions() {
        assert_eq!(Realtime::default().ready_conditions().len(), 1);
    }

    #[test]
    fn test_individual_db_config() {
        let image = Realtime::default()
            .with_db_host("db.example.com")
            .with_db_port(5433)
            .with_db_name("mydb")
            .with_db_user("myuser")
            .with_db_password("secret")
            .with_db_ssl(true);
        assert_env(
            &image,
            &[
                ("DB_HOST", "db.example.com"),
                ("DB_PORT", "5433"),
                ("DB_NAME", "mydb"),
                ("DB_USER", "myuser"),
                ("DB_PASSWORD", "secret"),
                ("DB_SSL", "true"),
            ],
        );
    }
}