#![warn(unreachable_pub)]
#![forbid(unsafe_code)]
#![allow(
// The driver state machine is naturally branchy; pedantic lints fight
// the readability of an event-pump loop. We tighten these later once the
// engine has stabilised.
clippy::too_many_lines,
clippy::module_name_repetitions,
clippy::missing_errors_doc,
clippy::doc_markdown
)]
pub mod auto_cluster_failover;
mod client;
mod consumer;
pub mod crypto;
pub mod dns;
mod driver;
mod log_fields;
mod pool;
mod producer;
pub mod tls;
pub mod tls_crypto;
mod transport;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Waker;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use magnetar_proto::{Connection, ConnectionConfig};
pub use magnetar_proto::{ControlledClusterFailover, ServiceUrlProvider, StaticServiceUrlProvider};
use moonpool_core::{Providers, TimeProvider};
use parking_lot::Mutex;
use slab::Slab;
use tokio::sync::Notify;
pub use crate::client::{Client, ClientError, LookupTopicResult};
pub use crate::consumer::Consumer;
pub use crate::crypto::{EncryptError, MessageDecryptor, MessageEncryptor};
pub use crate::dns::{DnsResolveFuture, DnsResolver, StaticDnsResolver, arc_dns_resolver};
pub use crate::driver::DriverHandle;
pub use crate::producer::{Producer, SendFut};
use crate::transport::Transport;
/// Fixed wall-clock base, in milliseconds since the Unix epoch.
///
/// The value is 1 704 067 200 seconds (2024-01-01T00:00:00Z) expressed in
/// milliseconds. Every [`ConnectionShared`] built through the default
/// constructors seeds its wall clock from this constant rather than from the
/// host clock.
pub const DETERMINISTIC_SIM_EPOCH_MS: u64 = 1_704_067_200 * 1_000;

/// Returns the wall-clock base (ms since the Unix epoch) used by the default
/// constructors; currently always [`DETERMINISTIC_SIM_EPOCH_MS`].
fn current_wall_clock_base_ms() -> u64 {
    DETERMINISTIC_SIM_EPOCH_MS
}
/// Builds a [`ConnectionShared`] whose monotonic clock follows the time
/// provider in `providers`.
///
/// A real `Instant` is captured once as an anchor; each later reading is that
/// anchor plus the provider's current `now()` value. If the addition would
/// overflow, the anchor itself is returned. No auth provider is installed and
/// the wall clock is seeded from `current_wall_clock_base_ms()`.
pub(crate) fn make_shared_with_providers<P: Providers>(
    providers: &P,
    config: ConnectionConfig,
) -> Arc<ConnectionShared> {
    let clock = providers.time().clone();
    let anchor = Instant::now();
    let now_instant_provider: Arc<dyn Fn() -> Instant + Send + Sync> =
        Arc::new(move || anchor.checked_add(clock.now()).unwrap_or(anchor));
    let wall_clock_base_ms = current_wall_clock_base_ms();
    ConnectionShared::with_auth_wall_clock_and_instant(
        config,
        None,
        wall_clock_base_ms,
        now_instant_provider,
    )
}
/// State shared (behind an `Arc`) by everything that touches one connection.
///
/// Holds the protocol state machine, notification primitives, the memory
/// accounting counters and the clock sources used by this crate.
pub struct ConnectionShared {
/// Protocol connection state machine, guarded by a mutex.
pub inner: Mutex<Connection>,
/// Notification awaited by `await_reconnect_or_terminal`.
/// NOTE(review): presumably signalled by the driver on state changes — confirm in `driver`.
pub driver_waker: Notify,
/// Optional authentication provider; within this file only its `method()` is read, for logging.
pub auth_provider: Option<Arc<dyn magnetar_proto::AuthProvider>>,
/// FIFO buffer of topic-list deltas.
pub topic_list_changes: Mutex<std::collections::VecDeque<TopicListChange>>,
/// NOTE(review): presumably signalled when `topic_list_changes` gains an entry — confirm.
pub topic_list_notify: Notify,
/// FIFO buffer of observed replicated-subscription markers.
pub replicated_subscription_markers:
Mutex<std::collections::VecDeque<ObservedReplicatedSubscriptionMarker>>,
/// NOTE(review): presumably signalled when `replicated_subscription_markers` gains an entry — confirm.
pub replicated_subscription_marker_notify: Notify,
/// Starts as `false`; not otherwise touched in this file.
/// NOTE(review): looks like a "rebuild requested" latch for the driver — confirm.
pub pending_rebuild: AtomicBool,
/// Latched to `true` by `mark_no_driver`; read by `fail_if_no_driver` and
/// `await_reconnect_or_terminal` to decide whether a closed connection is terminal.
pub no_driver: AtomicBool,
/// Memory budget in bytes, copied from the config; `0` disables accounting.
pub memory_limit_bytes: u64,
/// Bytes currently reserved through `try_reserve_memory`.
pub memory_used: AtomicU64,
/// Memory-limit policy copied from the config; not consulted in this file.
pub memory_limit_policy: magnetar_proto::MemoryLimitPolicy,
/// Wakers registered by `try_reserve_memory_or_register`, woken on `release_memory`.
pub memory_wakers: Mutex<Slab<Waker>>,
/// Value `wall_clock_ms` was seeded with at construction (ms since the Unix epoch).
pub wall_clock_base_ms: u64,
/// Current wall-clock reading in ms since the Unix epoch; also backs the
/// wall-clock provider handed to the `Connection`.
pub wall_clock_ms: Arc<AtomicU64>,
/// Source of monotonic `Instant`s returned by `now_instant`.
pub now_instant_provider: Arc<dyn Fn() -> Instant + Send + Sync>,
/// FIFO buffer of scalable-topic events.
#[cfg(feature = "scalable-topics")]
pub scalable_events: Mutex<std::collections::VecDeque<crate::ScalableEvent>>,
/// NOTE(review): presumably signalled when `scalable_events` gains an entry — confirm.
#[cfg(feature = "scalable-topics")]
pub scalable_notify: Notify,
}
/// Compact `Debug` output: the inner connection is replaced by a placeholder
/// and the auth provider is reduced to a presence flag; all other fields are
/// elided.
impl std::fmt::Debug for ConnectionShared {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_auth_provider = self.auth_provider.is_some();
        let mut out = f.debug_struct("ConnectionShared");
        out.field("inner", &"<Connection>");
        out.field("has_auth_provider", &has_auth_provider);
        out.finish_non_exhaustive()
    }
}
/// Outcome of `ConnectionShared::await_reconnect_or_terminal`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupReissueReadiness {
/// The connection reported `is_connected()`.
Reconnected,
/// The connection reported `is_closed()` and the `no_driver` flag is set.
Terminal,
}
impl ConnectionShared {
/// Creates shared connection state from `config` with no auth provider.
///
/// Delegates to `with_auth(config, None)`.
#[must_use]
pub fn new(config: ConnectionConfig) -> Arc<Self> {
Self::with_auth(config, None)
}
/// Latches the `no_driver` flag to `true` (sequentially consistent store).
///
/// Nothing in this file clears the flag again.
pub fn mark_no_driver(&self) {
self.no_driver.store(true, Ordering::SeqCst);
}
/// Fails fast once the connection can no longer recover.
///
/// # Errors
///
/// Returns `ClientError::PeerClosed` only when BOTH the `no_driver` flag is
/// set and the connection reports `is_closed()`. A closed connection without
/// the flag is not reported as an error.
pub fn fail_if_no_driver(&self) -> Result<(), crate::client::ClientError> {
    // Read the flag first: the connection mutex is only taken when it is set.
    let driver_gone = self.no_driver.load(Ordering::SeqCst);
    if driver_gone && self.inner.lock().is_closed() {
        Err(crate::client::ClientError::PeerClosed)
    } else {
        Ok(())
    }
}
/// Waits until the connection is either connected again or terminally closed.
///
/// Returns `Reconnected` when the connection reports `is_connected()`, or
/// `Terminal` when it reports `is_closed()` while `no_driver` is set.
/// Otherwise it sleeps on `driver_waker` and re-checks after each wake-up.
pub async fn await_reconnect_or_terminal(&self) -> LookupReissueReadiness {
loop {
// Register interest BEFORE inspecting state so a notification sent
// between the check below and the `.await` is not missed.
let notified = self.driver_waker.notified();
tokio::pin!(notified);
notified.as_mut().enable();
{
// Scoped so the mutex guard is released before awaiting.
let conn = self.inner.lock();
if conn.is_connected() {
return LookupReissueReadiness::Reconnected;
}
if conn.is_closed() && self.no_driver.load(Ordering::SeqCst) {
return LookupReissueReadiness::Terminal;
}
}
notified.await;
}
}
/// Creates shared connection state with an optional auth provider, seeding
/// the wall clock from `current_wall_clock_base_ms()`.
#[must_use]
pub fn with_auth(
    config: ConnectionConfig,
    auth_provider: Option<Arc<dyn magnetar_proto::AuthProvider>>,
) -> Arc<Self> {
    let base_ms = current_wall_clock_base_ms();
    Self::with_auth_and_wall_clock_base(config, auth_provider, base_ms)
}
#[must_use]
pub fn with_auth_and_wall_clock_base(
config: ConnectionConfig,
auth_provider: Option<Arc<dyn magnetar_proto::AuthProvider>>,
wall_clock_base_ms: u64,
) -> Arc<Self> {
let memory_limit_bytes = config.memory_limit_bytes;
let memory_limit_policy = config.memory_limit_policy;
let anti_thrash_threshold = config
.supervisor
.as_ref()
.and_then(|s| s.anti_thrash_threshold);
let anti_thrash_cooldown = config.supervisor.as_ref().map_or_else(
|| std::time::Duration::from_secs(30),
|s| s.max_backoff_after_thrash,
);
let wall_clock_ms = Arc::new(AtomicU64::new(wall_clock_base_ms));
let read_handle = wall_clock_ms.clone();
let wall_clock_provider: Arc<dyn Fn() -> std::time::SystemTime + Send + Sync> =
Arc::new(move || {
std::time::UNIX_EPOCH
+ std::time::Duration::from_millis(read_handle.load(Ordering::Relaxed))
});
let now_instant_provider: Arc<dyn Fn() -> Instant + Send + Sync> = Arc::new(Instant::now);
let mut conn = Connection::new(config, wall_clock_provider);
conn.set_anti_thrash(anti_thrash_threshold, anti_thrash_cooldown);
debug_assert!(
!conn.is_connected() && !conn.is_closed(),
"freshly-constructed ConnectionShared must start in a non-connected, \
non-terminal handshake state",
);
Arc::new(Self {
inner: Mutex::new(conn),
driver_waker: Notify::new(),
auth_provider,
topic_list_changes: Mutex::new(std::collections::VecDeque::new()),
topic_list_notify: Notify::new(),
replicated_subscription_markers: Mutex::new(std::collections::VecDeque::new()),
replicated_subscription_marker_notify: Notify::new(),
pending_rebuild: AtomicBool::new(false),
no_driver: AtomicBool::new(false),
memory_limit_bytes,
memory_used: AtomicU64::new(0),
memory_limit_policy,
memory_wakers: Mutex::new(Slab::new()),
wall_clock_base_ms,
wall_clock_ms,
now_instant_provider,
#[cfg(feature = "scalable-topics")]
scalable_events: Mutex::new(std::collections::VecDeque::new()),
#[cfg(feature = "scalable-topics")]
scalable_notify: Notify::new(),
})
}
/// Returns the current monotonic instant from the installed `now_instant_provider`.
#[must_use]
pub fn now_instant(&self) -> Instant {
(self.now_instant_provider)()
}
/// Returns the current wall-clock reading in milliseconds since the Unix
/// epoch, as stored in `wall_clock_ms` (relaxed load).
#[must_use]
pub fn now_wall_clock_ms(&self) -> u64 {
self.wall_clock_ms.load(Ordering::Relaxed)
}
/// Like `with_auth_and_wall_clock_base`, but also replaces the monotonic
/// clock source with `now_instant_provider`.
///
/// The provider is swapped in through `Arc::get_mut`, which only succeeds
/// while the freshly built `Arc` is still unique. Debug builds assert that the
/// swap happened; release builds would silently keep the default provider.
#[must_use]
pub fn with_auth_wall_clock_and_instant(
    config: ConnectionConfig,
    auth_provider: Option<Arc<dyn magnetar_proto::AuthProvider>>,
    wall_clock_base_ms: u64,
    now_instant_provider: Arc<dyn Fn() -> Instant + Send + Sync>,
) -> Arc<Self> {
    let mut shared =
        Self::with_auth_and_wall_clock_base(config, auth_provider, wall_clock_base_ms);
    let installed = match Arc::get_mut(&mut shared) {
        Some(state) => {
            state.now_instant_provider = now_instant_provider;
            true
        }
        None => false,
    };
    debug_assert!(
        installed,
        "with_auth_wall_clock_and_instant must install before cloning"
    );
    shared
}
/// Attempts to reserve `bytes` against the memory budget without blocking.
///
/// A limit of `0` disables accounting and always succeeds. Otherwise the
/// reservation is applied with a compare-and-swap loop so concurrent callers
/// never push `memory_used` above the limit.
///
/// # Errors
///
/// Returns `EngineError::MemoryLimitExceeded` (carrying the usage observed,
/// the limit and the requested size) when the reservation does not fit.
pub fn try_reserve_memory(&self, bytes: u64) -> Result<(), EngineError> {
    let limit = self.memory_limit_bytes;
    if limit == 0 {
        return Ok(());
    }
    let mut current = self.memory_used.load(Ordering::Acquire);
    loop {
        let wanted = current.saturating_add(bytes);
        if wanted > limit {
            return Err(EngineError::MemoryLimitExceeded {
                current,
                limit,
                requested: bytes,
            });
        }
        match self.memory_used.compare_exchange(
            current,
            wanted,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return Ok(()),
            // Lost the race: retry against the value the exchange observed.
            Err(observed) => current = observed,
        }
    }
}
/// Returns `bytes` to the memory budget and wakes every registered waiter.
///
/// Does nothing when `bytes` is zero or accounting is disabled (limit `0`).
/// The subtraction saturates at zero.
pub fn release_memory(&self, bytes: u64) {
    let accounting_disabled = self.memory_limit_bytes == 0;
    if accounting_disabled || bytes == 0 {
        return;
    }
    // The closure always yields `Some`, so the update cannot fail.
    let _ = self
        .memory_used
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |used| {
            Some(used.saturating_sub(bytes))
        });
    self.drain_memory_wakers();
}
/// Reserves `bytes`, or registers `waker` to be woken on the next release.
///
/// Returns `Ok(())` when the reservation succeeded (no waker is left
/// registered). Returns `Err(key)` when the budget is exhausted; `key` is the
/// slab slot holding a clone of `waker`, to be passed to
/// `cancel_memory_waker` if the caller gives up.
///
/// After registering, the reservation is attempted a second time so that a
/// release racing with the registration is not missed.
pub fn try_reserve_memory_or_register(&self, bytes: u64, waker: &Waker) -> Result<(), usize> {
    if self.try_reserve_memory(bytes).is_err() {
        let slot = self.memory_wakers.lock().insert(waker.clone());
        if self.try_reserve_memory(bytes).is_err() {
            return Err(slot);
        }
        // The re-check won: the registration is no longer needed.
        self.cancel_memory_waker(slot);
    }
    Ok(())
}
/// Removes the waker stored under `slab_key`, if that slot is still occupied.
///
/// A vacant key (for example one already drained by a release) is ignored.
pub fn cancel_memory_waker(&self, slab_key: usize) {
    let mut wakers = self.memory_wakers.lock();
    if !wakers.contains(slab_key) {
        return;
    }
    wakers.remove(slab_key);
}
/// Empties the waker slab and wakes every waker that was registered.
///
/// The wakers are collected first and woken only after the slab lock has
/// been released.
fn drain_memory_wakers(&self) {
    // The temporary guard is dropped at the end of this statement.
    let pending: Vec<Waker> = self.memory_wakers.lock().drain().collect();
    pending.into_iter().for_each(Waker::wake);
}
}
/// One delta to a topic list, buffered in `ConnectionShared::topic_list_changes`.
#[derive(Debug, Clone)]
pub struct TopicListChange {
/// Topic names added by this delta.
pub added: Vec<String>,
/// Topic names removed by this delta.
pub removed: Vec<String>,
}
/// A replicated-subscription marker paired with a consumer handle, buffered in
/// `ConnectionShared::replicated_subscription_markers`.
#[derive(Debug, Clone)]
pub struct ObservedReplicatedSubscriptionMarker {
/// Consumer handle associated with the marker.
/// NOTE(review): presumably the consumer that observed it — confirm against the producer of these entries.
pub handle: magnetar_proto::ConsumerHandle,
/// The marker payload.
pub marker: magnetar_proto::ReplicatedSubscriptionMarker,
}
#[cfg(feature = "scalable-topics")]
/// Returns `true` when `topic` begins with the `topic://` scheme.
///
/// The comparison is an exact, case-sensitive prefix match; no further
/// validation of the URL is performed.
#[must_use]
pub fn is_scalable_topic_url(topic: &str) -> bool {
    const SCALABLE_SCHEME: &str = "topic://";
    topic.starts_with(SCALABLE_SCHEME)
}
#[cfg(all(test, feature = "scalable-topics"))]
mod scalable_url_tests {
    use super::is_scalable_topic_url;

    /// Only the `topic://` scheme is classified as scalable; the
    /// `persistent://` and `non-persistent://` schemes are not.
    #[test]
    fn recognises_scalable_and_v4_schemes() {
        let cases = [
            ("topic://public/default/scaled", true),
            ("persistent://public/default/regular", false),
            ("non-persistent://public/default/np", false),
        ];
        for &(url, expected) in &cases {
            assert_eq!(is_scalable_topic_url(url), expected);
        }
    }
}
/// Result of resolving a scalable topic.
///
/// NOTE(review): field meanings below are inferred from names and types only;
/// nothing in this file constructs or reads this struct — confirm against the caller.
#[cfg(feature = "scalable-topics")]
#[derive(Debug, Clone)]
pub struct ScalableLookup {
/// URL of the controller broker (presumably the broker owning the topic's segment layout).
pub controller_broker_url: String,
/// Segment descriptors returned with the lookup.
pub segments: Vec<magnetar_proto::SegmentDescriptor>,
/// Opaque token associated with this lookup.
pub lookup_token: u64,
}
/// Events buffered in `ConnectionShared::scalable_events`.
///
/// NOTE(review): variant semantics are inferred from names and payload types;
/// the code that emits them is not in this file — confirm there.
#[cfg(feature = "scalable-topics")]
#[derive(Debug, Clone)]
pub enum ScalableEvent {
/// A lookup identified by `request_id` completed; carries the same payload
/// fields as `ScalableLookup`.
LookupResolved {
request_id: magnetar_proto::RequestId,
controller_broker_url: String,
segments: Vec<magnetar_proto::SegmentDescriptor>,
lookup_token: u64,
},
/// A DAG delta arrived for the given watch session.
DagUpdated {
watch_session_id: u64,
delta: magnetar_proto::DagDelta,
},
/// The DAG changed while consuming, with the stated reason.
DagChangedDuringConsume {
watch_session_id: u64,
reason: magnetar_proto::DagChangeReason,
},
/// The DAG watch session ended, optionally with a textual reason.
DagWatchClosed {
watch_session_id: u64,
reason: Option<String>,
},
}
/// Errors produced by the engine's connect, handshake and memory-accounting paths.
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
/// Underlying I/O failure. `handshake_plain` also uses this variant with
/// `ErrorKind::TimedOut` when the handshake deadline elapses, and
/// `dial_with_retry` retries only errors of this variant.
#[error("io error: {0}")]
Io(#[from] std::io::Error),
/// Protocol-level error reported by `magnetar_proto`.
#[error("protocol error: {0}")]
Protocol(#[from] magnetar_proto::ProtocolError),
/// TLS error reported by `rustls`.
#[error("tls error: {0}")]
Tls(#[from] rustls::Error),
/// The connection ended without a recorded handshake failure reason.
#[error("peer closed connection")]
PeerClosed,
/// The handshake failed; carries the reason recorded on the connection.
#[error("handshake failed: {0}")]
HandshakeFailed(String),
/// Configuration problem; `handshake_plain` uses this when `begin_handshake` fails.
#[error("config error: {0}")]
Config(String),
/// A memory reservation would exceed the configured limit (all values in bytes).
#[error("memory limit exceeded: current={current}B + requested={requested}B > limit={limit}B")]
MemoryLimitExceeded {
current: u64,
limit: u64,
requested: u64,
},
}
/// Connection engine parameterised over a `Providers` bundle.
///
/// The providers supply the time, network and task facilities used by every
/// `connect_*` method.
pub struct MoonpoolEngine<P: Providers> {
/// Provider bundle used for all timing, networking and task spawning.
providers: P,
}
/// `Debug` output that names the engine but never prints the providers, so
/// `P` does not need to implement `Debug`.
impl<P: Providers> std::fmt::Debug for MoonpoolEngine<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MoonpoolEngine");
        out.finish_non_exhaustive()
    }
}
impl<P: Providers> MoonpoolEngine<P> {
/// Creates an engine that performs all I/O through `providers`.
#[must_use]
pub fn new(providers: P) -> Self {
Self { providers }
}
/// Borrows the provider bundle this engine was built with.
#[must_use]
pub fn providers(&self) -> &P {
&self.providers
}
/// Builds the shared connection state for `config` using this engine's
/// providers (see `make_shared_with_providers`).
fn make_shared(&self, config: ConnectionConfig) -> Arc<ConnectionShared> {
make_shared_with_providers(&self.providers, config)
}
/// Connects to `addr` over plain TCP without a custom DNS resolver.
///
/// Equivalent to `connect_plain_with_resolver(addr, config, None)`; see that
/// method for the returned values and error conditions.
pub async fn connect_plain(
&self,
addr: &str,
config: ConnectionConfig,
) -> Result<(Arc<ConnectionShared>, DriverHandle), EngineError> {
self.connect_plain_with_resolver(addr, config, None).await
}
/// Connects to `addr` over plain TCP, optionally resolving it through `resolver`.
///
/// Steps: dial the transport (with retries, see `dial_with_retry`), build the
/// shared state, run the handshake bounded by `config.operation_timeout`,
/// then spawn the driver that owns the transport.
///
/// Returns the shared connection state together with the driver handle.
///
/// # Errors
///
/// Propagates any `EngineError` from dialing or from the handshake.
pub async fn connect_plain_with_resolver(
    &self,
    addr: &str,
    config: ConnectionConfig,
    resolver: Option<&dyn DnsResolver>,
) -> Result<(Arc<ConnectionShared>, DriverHandle), EngineError> {
    let time = self.providers.time();
    // Copy the timing knobs out before `config` is consumed below.
    let connect_timeout = config.connect_timeout;
    let operation_timeout = config.operation_timeout;
    let max_retries = config.connect_max_retries;

    let dial = || {
        Transport::<P>::connect_with_resolver(
            self.providers.network(),
            addr,
            resolver,
            time,
            connect_timeout,
        )
    };
    let mut transport =
        dial_with_retry::<P, _, _>(time, max_retries, operation_timeout, dial).await?;

    let shared = self.make_shared(config);
    handshake_plain::<P>(
        &shared,
        &mut transport,
        time,
        Some(operation_timeout),
        addr,
        false,
    )
    .await?;

    let driver = driver::spawn::<P>(
        Arc::clone(&shared),
        transport,
        time.clone(),
        self.providers.task(),
    );
    Ok((shared, driver))
}
/// Connects to `addr` over TLS.
///
/// * `host` — passed to the TLS transport alongside `addr`.
///   NOTE(review): presumably the server name used for certificate
///   verification — confirm in `Transport::connect_tls`.
/// * `tls_config` — rustls client configuration, cloned for each dial attempt.
/// * `resolver` — optional custom DNS resolver.
///
/// After dialing (with retries) the protocol handshake runs bounded by
/// `config.operation_timeout`, then the driver is spawned.
///
/// # Errors
///
/// Propagates any `EngineError` from dialing or from the handshake.
pub async fn connect_tls(
    &self,
    addr: &str,
    host: &str,
    tls_config: Arc<rustls::ClientConfig>,
    config: ConnectionConfig,
    resolver: Option<&dyn DnsResolver>,
) -> Result<(Arc<ConnectionShared>, DriverHandle), EngineError> {
    let time = self.providers.time();
    // Copy the timing knobs out before `config` is consumed below.
    let connect_timeout = config.connect_timeout;
    let operation_timeout = config.operation_timeout;
    let max_retries = config.connect_max_retries;

    let dial = || {
        Transport::<P>::connect_tls(
            self.providers.network(),
            addr,
            host,
            Arc::clone(&tls_config),
            resolver,
            time,
            connect_timeout,
        )
    };
    let mut transport =
        dial_with_retry::<P, _, _>(time, max_retries, operation_timeout, dial).await?;

    let shared = self.make_shared(config);
    handshake_plain::<P>(
        &shared,
        &mut transport,
        time,
        Some(operation_timeout),
        addr,
        true,
    )
    .await?;

    let driver = driver::spawn::<P>(
        Arc::clone(&shared),
        transport,
        time.clone(),
        self.providers.task(),
    );
    Ok((shared, driver))
}
/// Connects to `addr` over plain TCP and spawns a supervised driver.
///
/// The initial dial and handshake match `connect_plain_with_resolver`. The
/// driver is then started through `driver::spawn_supervised` with a
/// `ReconnectContext` carrying `addr`, the optional service-URL provider and
/// the optional DNS resolver.
///
/// # Errors
///
/// Propagates any `EngineError` from the initial dial or handshake.
pub async fn connect_plain_supervised(
    &self,
    addr: &str,
    config: ConnectionConfig,
    service_url_provider: Option<Arc<dyn magnetar_proto::ServiceUrlProvider>>,
    dns_resolver: Option<Arc<dyn DnsResolver>>,
) -> Result<(Arc<ConnectionShared>, DriverHandle), EngineError> {
    let time = self.providers.time();
    // Copy the timing knobs out before `config` is consumed below.
    let connect_timeout = config.connect_timeout;
    let operation_timeout = config.operation_timeout;
    let max_retries = config.connect_max_retries;

    let dial = || {
        Transport::<P>::connect_with_resolver(
            self.providers.network(),
            addr,
            dns_resolver.as_deref(),
            time,
            connect_timeout,
        )
    };
    let mut transport =
        dial_with_retry::<P, _, _>(time, max_retries, operation_timeout, dial).await?;

    let shared = self.make_shared(config);
    handshake_plain::<P>(
        &shared,
        &mut transport,
        time,
        Some(operation_timeout),
        addr,
        false,
    )
    .await?;

    let reconnect = driver::ReconnectContext {
        host_port: addr.to_owned(),
        service_url_provider,
        dns_resolver,
    };
    let driver = driver::spawn_supervised::<P>(
        Arc::clone(&shared),
        transport,
        reconnect,
        self.providers.clone(),
    );
    Ok((shared, driver))
}
}
/// Runs `dial` until it yields a transport or retrying is no longer allowed.
///
/// A failed attempt is retried only when ALL of the following hold:
/// the error is `EngineError::Io`, fewer than `max_retries` retries have been
/// made, and less than `operation_timeout` has elapsed (per `time`) since the
/// first attempt started. Between attempts the task sleeps for
/// `connect_backoff(attempt)`; the sleep's own result is ignored.
///
/// # Errors
///
/// Returns the error of the last attempt once retrying stops.
pub(crate) async fn dial_with_retry<P, F, Fut>(
    time: &P::Time,
    max_retries: u32,
    operation_timeout: Duration,
    mut dial: F,
) -> Result<Transport<P>, EngineError>
where
    P: Providers,
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<Transport<P>, EngineError>>,
{
    let started = time.now();
    let mut attempt = 0u32;
    loop {
        let failure = match dial().await {
            Ok(transport) => return Ok(transport),
            Err(err) => err,
        };
        let elapsed = time.now().saturating_sub(started);
        let retryable = matches!(failure, EngineError::Io(_));
        let budget_spent = attempt >= max_retries || elapsed >= operation_timeout;
        if !retryable || budget_spent {
            return Err(failure);
        }
        attempt += 1;
        let _ = time.sleep(connect_backoff(attempt)).await;
    }
}
/// Delay to wait before retry number `attempt` (1-based).
///
/// Starts at 50 ms and doubles per attempt (50, 100, 200, 400, 800 ms), capped
/// at 1 second from the sixth attempt on. `attempt == 0` is treated like `1`.
fn connect_backoff(attempt: u32) -> Duration {
    const BASE_MS: u64 = 50;
    const CAP_MS: u64 = 1_000;
    const MAX_DOUBLINGS: u32 = 5;

    let doublings = attempt.saturating_sub(1).min(MAX_DOUBLINGS);
    // At most 50 * 2^5 = 1600, so the multiplication cannot overflow.
    let uncapped_ms = BASE_MS * 2u64.pow(doublings);
    Duration::from_millis(uncapped_ms.min(CAP_MS))
}
/// Drives the protocol handshake over an already-connected `transport`.
///
/// * `shared` — connection state; its mutex is taken only in short scopes and
///   never held across an `.await`.
/// * `transport` — byte stream to the broker (plain or TLS).
/// * `time` — time provider used to build the optional deadline.
/// * `bound` — overall handshake deadline; `None` waits indefinitely.
/// * `addr` — peer address, used for logging and the timeout message.
/// * `tls` — logged only; states whether `transport` is TLS.
///
/// Each loop iteration flushes pending outbound bytes, checks the handshake
/// state, then reads more bytes and feeds them to the connection. The function
/// returns `Ok(())` as soon as the state is `Connected` or `AuthChallenging`.
///
/// # Errors
///
/// * `EngineError::Config` — `begin_handshake` failed.
/// * `EngineError::HandshakeFailed` — the connection recorded a failure reason.
/// * `EngineError::PeerClosed` — the state became `Failed`/`Closed`, or the
///   peer sent EOF, without a recorded reason.
/// * `EngineError::Io` — transport error, or `ErrorKind::TimedOut` when
///   `bound` elapsed first.
/// * Any error returned by `handle_bytes`.
pub(crate) async fn handshake_plain<P: Providers>(
    shared: &Arc<ConnectionShared>,
    transport: &mut Transport<P>,
    time: &P::Time,
    bound: Option<Duration>,
    addr: &str,
    tls: bool,
) -> Result<(), EngineError> {
    let mut read_buf = BytesMut::with_capacity(8 * 1024);
    {
        // Start the handshake only if nobody has started it yet.
        let mut conn = shared.inner.lock();
        if matches!(conn.state(), magnetar_proto::HandshakeState::Uninitialized) {
            conn.begin_handshake()
                .map_err(|err| EngineError::Config(format!("begin_handshake failed: {err}")))?;
        }
    }
    // One deadline for the whole handshake, not one per read.
    let mut deadline = bound.map(|d| Box::pin(time.sleep(d)));
    loop {
        // 1. Flush whatever the state machine wants to send.
        let write_buf = {
            let mut conn = shared.inner.lock();
            conn.poll_transmit()
        };
        if !write_buf.is_empty() {
            transport.write_all(&write_buf).await?;
            transport.flush().await?;
        }
        // 2. Inspect the state; the guard stays inside this block so it is
        //    released before the read below awaits.
        {
            let conn = shared.inner.lock();
            if matches!(
                conn.state(),
                magnetar_proto::HandshakeState::Connected
                    | magnetar_proto::HandshakeState::AuthChallenging
            ) {
                let auth_challenge_pending = matches!(
                    conn.state(),
                    magnetar_proto::HandshakeState::AuthChallenging
                );
                drop(conn);
                // Split at the LAST colon so bracketed IPv6 literals
                // (`[::1]:6650`) and scheme-prefixed addresses log the real port.
                let (host, port) = addr.rsplit_once(':').unwrap_or((addr, ""));
                let auth_method = shared
                    .auth_provider
                    .as_deref()
                    .map_or("none", |p| p.method());
                tracing::info!(
                    host = %host,
                    port = %port,
                    tls,
                    auth_method,
                    auth_challenge_pending,
                    "connection established"
                );
                return Ok(());
            }
            if matches!(
                conn.state(),
                magnetar_proto::HandshakeState::Failed | magnetar_proto::HandshakeState::Closed
            ) {
                if let Some(reason) = conn.handshake_failure_reason() {
                    return Err(EngineError::HandshakeFailed(reason.to_owned()));
                }
                return Err(EngineError::PeerClosed);
            }
        }
        // 3. Read more bytes, racing the deadline when one is set. `biased`
        //    polls the read first, so data that is already available wins
        //    over a deadline that is also ready.
        let n = if let Some(deadline) = deadline.as_mut() {
            tokio::select! {
                biased;
                r = transport.read_buf(&mut read_buf) => r?,
                _ = deadline => {
                    return Err(EngineError::Io(std::io::Error::new(
                        std::io::ErrorKind::TimedOut,
                        format!(
                            "handshake to {addr} exceeded operation_timeout: broker accepted \
                             the connection but never completed CONNECT -> CONNECTED"
                        ),
                    )));
                }
            }
        } else {
            transport.read_buf(&mut read_buf).await?
        };
        if n == 0 {
            // EOF from the peer before the handshake completed.
            let mut conn = shared.inner.lock();
            conn.mark_disconnected();
            if let Some(reason) = conn.handshake_failure_reason() {
                return Err(EngineError::HandshakeFailed(reason.to_owned()));
            }
            return Err(EngineError::PeerClosed);
        }
        // 4. Hand the received bytes to the state machine.
        let bytes = read_buf.split().freeze();
        let now = shared.now_instant();
        shared.inner.lock().handle_bytes(now, &bytes)?;
    }
}
// Unit tests for the crate-root types: construction, the `no_driver` /
// terminal-state logic, wall-clock seeding, memory accounting and `Debug` output.
#[cfg(test)]
mod tests {
use std::sync::atomic::Ordering;
use magnetar_proto::ConnectionConfig;
use moonpool_core::TokioProviders;
use super::{ConnectionShared, DETERMINISTIC_SIM_EPOCH_MS, MoonpoolEngine, TopicListChange};
// Smoke test: the engine can be built from the Tokio provider bundle.
#[test]
fn engine_can_be_constructed_with_tokio_providers() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let _ = engine.providers();
}
// Fresh shared state can be locked and starts with no buffered topic-list changes.
#[test]
fn shared_state_can_be_constructed() {
let s = ConnectionShared::new(ConnectionConfig::default());
let _g = s.inner.lock();
assert!(s.topic_list_changes.lock().is_empty());
}
// A closed connection with `no_driver` still unset must not be reported as failed.
#[test]
fn fail_if_no_driver_does_not_fire_on_recoverable_failed_window() {
let s = ConnectionShared::new(ConnectionConfig::default());
s.inner.lock().mark_disconnected();
assert!(
s.inner.lock().is_closed(),
"mark_disconnected must put the connection in a terminal handshake state",
);
assert!(
!s.no_driver.load(Ordering::SeqCst),
"no_driver must still be UNSET in the recoverable-Failed window",
);
assert!(
s.fail_if_no_driver().is_ok(),
"fail_if_no_driver must NOT fire while the connection is recoverable \
(is_closed but no_driver unset) — regressing this breaks transparent reconnect",
);
}
// Closed connection AND latched `no_driver` → `PeerClosed`.
#[test]
fn fail_if_no_driver_fires_when_closed_and_no_driver_latched() {
let s = ConnectionShared::new(ConnectionConfig::default());
s.inner.lock().mark_disconnected();
s.mark_no_driver();
assert!(s.inner.lock().is_closed());
assert!(s.no_driver.load(Ordering::SeqCst));
assert!(
matches!(
s.fail_if_no_driver(),
Err(crate::client::ClientError::PeerClosed)
),
"fail_if_no_driver must return PeerClosed once the connection is terminal \
AND no driver is left to recover it",
);
}
// A fresh, non-closed connection never trips `fail_if_no_driver`.
#[test]
fn fail_if_no_driver_does_not_fire_before_any_terminal_state() {
let s = ConnectionShared::new(ConnectionConfig::default());
assert!(
!s.inner.lock().is_closed(),
"a fresh connection is not terminal",
);
assert!(
s.fail_if_no_driver().is_ok(),
"fail_if_no_driver must not fire on a non-terminal connection",
);
}
// The default constructor seeds the wall clock from the fixed epoch constant.
#[test]
fn shared_state_seeds_wall_clock_deterministically_by_default() {
let s = ConnectionShared::new(ConnectionConfig::default());
let observed = s.wall_clock_ms.load(Ordering::Relaxed);
assert_eq!(observed, super::DETERMINISTIC_SIM_EPOCH_MS);
assert_eq!(s.wall_clock_base_ms, super::DETERMINISTIC_SIM_EPOCH_MS);
}
// An explicitly supplied base is stored in both the base field and the live clock.
#[test]
fn shared_state_pins_wall_clock_base_for_deterministic_sim() {
let s = ConnectionShared::with_auth_and_wall_clock_base(
ConnectionConfig::default(),
None,
DETERMINISTIC_SIM_EPOCH_MS,
);
assert_eq!(s.wall_clock_base_ms, DETERMINISTIC_SIM_EPOCH_MS);
assert_eq!(
s.wall_clock_ms.load(Ordering::Relaxed),
DETERMINISTIC_SIM_EPOCH_MS,
);
}
// The topic-list buffer behaves as a FIFO queue.
#[test]
fn topic_list_changes_buffer_round_trip() {
let s = ConnectionShared::new(ConnectionConfig::default());
s.topic_list_changes.lock().push_back(TopicListChange {
added: vec!["a".to_owned()],
removed: vec![],
});
s.topic_list_changes.lock().push_back(TopicListChange {
added: vec![],
removed: vec!["b".to_owned()],
});
let first = s.topic_list_changes.lock().pop_front().unwrap();
assert_eq!(first.added, vec!["a".to_owned()]);
let second = s.topic_list_changes.lock().pop_front().unwrap();
assert_eq!(second.removed, vec!["b".to_owned()]);
assert!(s.topic_list_changes.lock().is_empty());
}
// Compile-only check: the future is created but never polled, so no I/O happens.
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn connect_plain_compiles() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let _fut = engine.connect_plain("127.0.0.1:6650", ConnectionConfig::default());
}
// Compile-only check for the supervised variant; the future is never polled.
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn connect_plain_supervised_compiles() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let _fut = engine.connect_plain_supervised(
"127.0.0.1:6650",
ConnectionConfig::default(),
None,
None,
);
}
// Compile-only check for the TLS variant; builds a client config with an
// empty root store and never polls the future.
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn connect_tls_compiles() {
crate::tls_crypto::install_default_provider();
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let tls_config = std::sync::Arc::new(
rustls::ClientConfig::builder_with_provider(crate::tls_crypto::active_provider())
.with_safe_default_protocol_versions()
.expect("rustls default protocol versions are valid")
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(),
);
let _fut = engine.connect_tls(
"127.0.0.1:6651",
"broker.example.com",
tls_config,
ConnectionConfig::default(),
None,
);
}
// Only checks that `EngineError` and `Duration` are nameable here; asserts nothing.
#[test]
fn no_unbounded_compile_check() {
let _ = std::any::type_name::<super::EngineError>();
let _ = std::time::Duration::from_secs(0);
}
// With budget available the reservation succeeds immediately: usage is
// recorded, no waker is registered and the waker is never invoked.
#[test]
fn try_reserve_memory_or_register_succeeds_when_budget_available() {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::task::Wake;
struct CountingWaker(AtomicUsize);
impl Wake for CountingWaker {
fn wake(self: Arc<Self>) {
self.0.fetch_add(1, super::Ordering::SeqCst);
}
}
let cfg = ConnectionConfig {
memory_limit_bytes: 1024,
..ConnectionConfig::default()
};
let s = ConnectionShared::new(cfg);
let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
let waker = std::task::Waker::from(counter.clone());
s.try_reserve_memory_or_register(512, &waker)
.expect("should succeed with budget available");
assert_eq!(s.memory_used.load(super::Ordering::Acquire), 512);
assert_eq!(s.memory_wakers.lock().len(), 0);
assert_eq!(counter.0.load(super::Ordering::Acquire), 0);
}
// Races a releasing thread against `try_reserve_memory_or_register`. Whenever
// the reservation wins, no waker may be left registered.
#[test]
fn try_reserve_memory_or_register_wins_recheck_under_contention() {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::task::Wake;
use std::time::{Duration, Instant};
struct NoopWaker(AtomicUsize);
impl Wake for NoopWaker {
fn wake(self: Arc<Self>) {
self.0.fetch_add(1, super::Ordering::SeqCst);
}
}
let cfg = ConnectionConfig {
memory_limit_bytes: 16,
..ConnectionConfig::default()
};
// NOTE(review): `ConnectionShared::new` already returns an `Arc`, so this is
// an `Arc<Arc<ConnectionShared>>`; it works through deref but the outer `Arc`
// is redundant.
let shared = Arc::new(ConnectionShared::new(cfg));
let waker_ctr = Arc::new(NoopWaker(AtomicUsize::new(0)));
let waker = std::task::Waker::from(waker_ctr.clone());
// Bounded by both wall time (5 s) and iteration count (10 000).
let deadline = Instant::now() + Duration::from_secs(5);
let mut iters = 0usize;
while Instant::now() <= deadline && iters < 10_000 {
iters += 1;
// Fill the budget so the next reservation can only succeed after the release.
shared.try_reserve_memory(16).expect("seed budget at limit");
let releaser = {
let s = shared.clone();
std::thread::spawn(move || {
std::thread::yield_now();
s.release_memory(16);
})
};
let outcome = shared.try_reserve_memory_or_register(2, &waker);
releaser.join().expect("releaser thread");
match outcome {
Ok(()) => {
assert!(shared.memory_wakers.lock().is_empty());
shared.release_memory(2);
}
Err(key) => {
// Lost the race: drop the registration so the slab starts empty next round.
shared.cancel_memory_waker(key);
}
}
}
assert!(
iters >= 100,
"expected ≥100 race iterations within 5s, got {iters}",
);
}
// `Debug` for the engine names the type but does not print the provider type.
#[test]
fn engine_debug_does_not_leak_providers() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let s = format!("{engine:?}");
assert!(s.contains("MoonpoolEngine"));
assert!(!s.contains("TokioProviders"));
}
// `Debug` for the shared state shows the placeholder and the auth-presence flag.
#[test]
fn shared_state_debug_hides_inner_connection() {
let s = ConnectionShared::new(ConnectionConfig::default());
let dbg = format!("{s:?}");
assert!(dbg.contains("ConnectionShared"));
assert!(dbg.contains("has_auth_provider"));
assert!(dbg.contains("<Connection>"));
}
// Cloning the returned `Arc` yields a pointer to the same allocation.
#[test]
fn shared_state_arc_is_clone() {
let s = ConnectionShared::new(ConnectionConfig::default());
let cloned: std::sync::Arc<ConnectionShared> = s.clone();
assert!(std::sync::Arc::ptr_eq(&s, &cloned));
}
// `Clone` on `TopicListChange` copies both lists.
#[test]
fn topic_list_change_clone_preserves_payload() {
let original = TopicListChange {
added: vec!["a".to_owned(), "b".to_owned()],
removed: vec!["c".to_owned()],
};
let cloned = original.clone();
assert_eq!(cloned.added, original.added);
assert_eq!(cloned.removed, original.removed);
}
// Pins the user-visible `Display` text of `EngineError::PeerClosed`.
#[test]
fn engine_error_peer_closed_display_is_stable() {
let err = super::EngineError::PeerClosed;
assert_eq!(format!("{err}"), "peer closed connection");
}
}