use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use magnetar_proto::{ConnectionEvent, ConsumerHandle, OpOutcome, PendingOpKey, ProducerHandle};
use moonpool_core::{Providers, TaskProvider, TimeProvider};
use parking_lot::Mutex;
use tokio::sync::Notify;
use crate::dns::DnsResolver;
use crate::transport::Transport;
use crate::{ConnectionShared, EngineError, ObservedReplicatedSubscriptionMarker, TopicListChange};
/// Initial capacity, in bytes, of the read buffer allocated by `driver_loop_inner` (64 KiB).
const READ_BUFFER_CAPACITY: usize = 64 * 1024;
/// How long a retry task spawned by `spawn_retry_leg` sleeps before it runs its lookup.
const TRANSIENT_RETRY_DELAY: Duration = Duration::from_secs(2);
/// A retry collected by `handle_pending_events` and later executed by `spawn_retry_leg`.
#[derive(Debug, Clone, Copy)]
enum RetryRequest {
/// Re-issue the open for the producer identified by this handle.
Producer(ProducerHandle),
/// Re-issue the subscribe for the consumer identified by this handle.
Consumer(ConsumerHandle),
}
fn handle_pending_events(
shared: &Arc<ConnectionShared>,
retries: &mut Vec<RetryRequest>,
) -> Result<(), EngineError> {
loop {
let event = shared.inner.lock().poll_event_if(|ev| {
#[cfg(feature = "scalable-topics")]
if matches!(
ev,
ConnectionEvent::ScalableTopicLookupResolved { .. }
| ConnectionEvent::SegmentDagUpdated { .. }
| ConnectionEvent::DagChangedDuringConsume { .. }
| ConnectionEvent::DagWatchClosed { .. }
) {
return true;
}
matches!(
ev,
ConnectionEvent::AuthChallenge { .. }
| ConnectionEvent::TopicListChanged { .. }
| ConnectionEvent::TopicMigrated { .. }
| ConnectionEvent::RedirectUrlRejected { .. }
| ConnectionEvent::ProducerOpenFailedTransient { .. }
| ConnectionEvent::SubscribeFailedTransient { .. }
| ConnectionEvent::ReplicatedSubscriptionMarkerObserved { .. }
| ConnectionEvent::ChecksumMismatch { .. }
| ConnectionEvent::LookupResponse {
result: magnetar_proto::LookupOutcome::Redirected { .. },
..
}
)
});
let Some(event) = event else {
return Ok(());
};
match event {
ConnectionEvent::AuthChallenge { method, challenge } => {
let Some(provider) = shared.auth_provider.clone() else {
tracing::warn!(
auth_method = method
.as_deref()
.map_or("none", crate::log_fields::truncate_broker_str),
"broker requested in-band auth refresh but no AuthProvider configured; \
the connection will be reset"
);
return Err(EngineError::Config(
"broker requested AUTH_CHALLENGE but client has no auth provider"
.to_owned(),
));
};
let bytes = challenge.unwrap_or_default();
tracing::debug!(
auth_method = %provider.method(),
"auth challenge received; refreshing credentials"
);
let refreshed = match provider.respond_to_challenge(&bytes) {
Ok(refreshed) => refreshed,
Err(err) => {
tracing::warn!(
auth_method = %provider.method(),
error_class = "auth_refresh_failed",
"in-band auth refresh failed; the connection will be reset"
);
return Err(EngineError::Config(format!("auth refresh failed: {err}")));
}
};
let method = provider.method().to_owned();
shared
.inner
.lock()
.submit_auth_response(refreshed, Some(method));
shared.driver_waker.notify_one();
}
ConnectionEvent::TopicListChanged { added, removed } => {
shared
.topic_list_changes
.lock()
.push_back(TopicListChange { added, removed });
shared.topic_list_notify.notify_waiters();
}
ConnectionEvent::ReplicatedSubscriptionMarkerObserved { handle, marker } => {
shared
.replicated_subscription_markers
.lock()
.push_back(ObservedReplicatedSubscriptionMarker { handle, marker });
shared
.replicated_subscription_marker_notify
.notify_waiters();
}
ConnectionEvent::RedirectUrlRejected {
source,
broker_service_url,
broker_service_url_tls,
} => {
tracing::warn!(
source,
rejected_url = broker_service_url
.as_deref()
.map(crate::log_fields::truncate_broker_str),
rejected_url_tls = broker_service_url_tls
.as_deref()
.map(crate::log_fields::truncate_broker_str),
"broker-advertised redirect URL rejected by redirect_url_allow_list; \
ignoring the hint (auth provider NOT replayed against the unverified host)",
);
}
ConnectionEvent::TopicMigrated {
producer,
consumer,
broker_service_url,
broker_service_url_tls,
} => {
tracing::info!(
?producer,
?consumer,
new_url = broker_service_url
.as_deref()
.map(crate::log_fields::truncate_broker_str),
new_url_tls = broker_service_url_tls
.as_deref()
.map(crate::log_fields::truncate_broker_str),
"broker requested PIP-188 topic migration; supervised reconnect will fire"
);
return Err(EngineError::Config(
"PIP-188: broker requested topic migration; resetting connection".to_owned(),
));
}
ConnectionEvent::ProducerOpenFailedTransient {
handle,
code,
message,
} => {
tracing::warn!(
?handle,
code,
message = crate::log_fields::truncate_broker_str(&message),
"producer-open transient error; scheduling lookup + retry"
);
retries.push(RetryRequest::Producer(handle));
}
ConnectionEvent::SubscribeFailedTransient {
handle,
code,
message,
} => {
tracing::warn!(
?handle,
code,
message = crate::log_fields::truncate_broker_str(&message),
"consumer-subscribe transient error; scheduling lookup + retry"
);
retries.push(RetryRequest::Consumer(handle));
}
#[cfg(feature = "scalable-topics")]
ConnectionEvent::ScalableTopicLookupResolved {
request_id,
controller_broker_url,
segments,
lookup_token,
} => {
shared
.scalable_events
.lock()
.push_back(crate::ScalableEvent::LookupResolved {
request_id,
controller_broker_url,
segments,
lookup_token,
});
shared.scalable_notify.notify_waiters();
}
#[cfg(feature = "scalable-topics")]
ConnectionEvent::SegmentDagUpdated {
watch_session_id,
delta,
} => {
shared
.scalable_events
.lock()
.push_back(crate::ScalableEvent::DagUpdated {
watch_session_id,
delta,
});
shared.scalable_notify.notify_waiters();
}
#[cfg(feature = "scalable-topics")]
ConnectionEvent::DagChangedDuringConsume {
watch_session_id,
reason,
} => {
shared.scalable_events.lock().push_back(
crate::ScalableEvent::DagChangedDuringConsume {
watch_session_id,
reason,
},
);
shared.scalable_notify.notify_waiters();
}
#[cfg(feature = "scalable-topics")]
ConnectionEvent::DagWatchClosed {
watch_session_id,
reason,
} => {
shared
.scalable_events
.lock()
.push_back(crate::ScalableEvent::DagWatchClosed {
watch_session_id,
reason,
});
shared.scalable_notify.notify_waiters();
}
ConnectionEvent::ChecksumMismatch { .. } => {}
ConnectionEvent::LookupResponse { .. } => {}
_ => {}
}
}
}
/// Spawns a detached task that retries one transiently failed producer-open or
/// consumer-subscribe.
///
/// The task sleeps for `TRANSIENT_RETRY_DELAY`, looks up the topic attached to the handle, runs
/// `lookup_then` for that topic, and only then re-issues the open/subscribe. It ends without
/// retrying when the handle has no topic or when `lookup_then` reports `false`. The driver is
/// woken only if the re-issue produced a request id.
fn spawn_retry_leg<P>(
    shared: &Arc<ConnectionShared>,
    time: &P::Time,
    task: &P::Task,
    req: RetryRequest,
) where
    P: Providers,
{
    let conn_shared = Arc::clone(shared);
    let clock = time.clone();
    let retry = async move {
        let _ = clock.sleep(TRANSIENT_RETRY_DELAY).await;

        // Copy the topic out so the connection lock is not held across the lookup await.
        let topic = {
            let conn = conn_shared.inner.lock();
            match req {
                RetryRequest::Producer(handle) => conn.producer_topic(handle).map(str::to_owned),
                RetryRequest::Consumer(handle) => conn.consumer_topic(handle).map(str::to_owned),
            }
        };

        if let Some(topic) = topic {
            if lookup_then(&conn_shared, &topic).await {
                let reissued = {
                    let mut conn = conn_shared.inner.lock();
                    match req {
                        RetryRequest::Producer(handle) => conn.retry_producer_open(handle),
                        RetryRequest::Consumer(handle) => conn.retry_consumer_subscribe(handle),
                    }
                };
                if reissued.is_some() {
                    conn_shared.driver_waker.notify_one();
                }
            }
        }
    };
    let _retry_task = task.spawn_task("magnetar-moonpool-transient-retry", retry);
}
async fn lookup_then(shared: &Arc<ConnectionShared>, topic: &str) -> bool {
let request_id = {
let mut conn = shared.inner.lock();
conn.lookup(topic, false)
};
shared.driver_waker.notify_one();
let outcome = LookupRetryFut {
shared: shared.clone(),
key: PendingOpKey::Request(request_id),
}
.await;
if matches!(
&outcome,
OpOutcome::LookupResponse { .. } | OpOutcome::Error { .. }
) {
tracing::debug!(?outcome, %topic, "retry-path lookup completed");
true
} else {
tracing::warn!(?outcome, %topic, "retry-path lookup landed unexpected outcome");
false
}
}
/// Future that resolves to the `OpOutcome` recorded for `key` on the shared connection.
struct LookupRetryFut {
/// Shared connection state whose `inner` lock is taken on every poll and on drop.
shared: Arc<ConnectionShared>,
/// Key of the pending operation this future waits on.
key: PendingOpKey,
}
impl core::future::Future for LookupRetryFut {
    type Output = OpOutcome;

    /// Completes with the stored outcome for `key` if there is one; otherwise registers the
    /// task's waker for `key` and stays pending. Both steps run under one lock acquisition.
    fn poll(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output> {
        use core::task::Poll;

        let key = self.key;
        let mut conn = self.shared.inner.lock();
        match conn.take_outcome(key) {
            Some(outcome) => Poll::Ready(outcome),
            None => {
                conn.register_waker(key, cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
/// Unregisters this future's waker for `key` when the future is dropped, whether or not it
/// ever completed.
impl Drop for LookupRetryFut {
fn drop(&mut self) {
self.shared.inner.lock().unregister_waker(self.key);
}
}
/// Slot through which a driver task hands its final result to `DriverHandle::join`.
struct DriverResult {
/// `None` until the driver task stores its result; `join` takes the value out.
result: Mutex<Option<Result<(), EngineError>>>,
/// Notified (`notify_one`) by the driver task right after `result` is filled.
done: Notify,
}
/// Handle to a driver task started by `spawn` or `spawn_supervised`.
pub struct DriverHandle {
/// Type-erased value returned by `spawn_task`; stored but never read in the code shown here.
_join: Box<dyn core::any::Any + Send>,
/// Result slot shared with the driver task.
result: Arc<DriverResult>,
/// Shared connection state, used by `abort` to close the connection and wake the driver.
shared: Arc<ConnectionShared>,
}
impl std::fmt::Debug for DriverHandle {
    /// Prints only the type name; all fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("DriverHandle");
        repr.finish_non_exhaustive()
    }
}
impl DriverHandle {
    /// Waits until the driver task has stored its final result and returns it.
    ///
    /// # Errors
    ///
    /// Returns whatever error the driver task finished with.
    pub async fn join(self) -> Result<(), EngineError> {
        loop {
            // Take the slot in its own statement so the mutex guard is released before the
            // await in the `None` arm.
            let finished = self.result.result.lock().take();
            match finished {
                Some(outcome) => return outcome,
                None => self.result.done.notified().await,
            }
        }
    }

    /// Closes the connection state machine and wakes the driver loop.
    pub fn abort(&self) {
        self.shared.inner.lock().close();
        self.shared.driver_waker.notify_one();
    }
}
/// Inputs `supervised_driver_loop` needs to pick a dial target for each reconnect attempt.
#[derive(Clone)]
pub(crate) struct ReconnectContext {
/// Cached `host:port`; used when no service-URL provider is set or its URL cannot be parsed.
pub(crate) host_port: String,
/// Optional provider queried for the service URL on every reconnect attempt.
pub(crate) service_url_provider: Option<Arc<dyn magnetar_proto::ServiceUrlProvider>>,
/// Optional resolver handed to `Transport::connect_with_resolver`.
pub(crate) dns_resolver: Option<Arc<dyn DnsResolver>>,
}
impl std::fmt::Debug for ReconnectContext {
    /// Prints `host_port` verbatim and, for the two trait-object fields, only whether each is
    /// present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_provider = self.service_url_provider.is_some();
        let has_resolver = self.dns_resolver.is_some();

        let mut repr = f.debug_struct("ReconnectContext");
        repr.field("host_port", &self.host_port);
        repr.field("has_service_url_provider", &has_provider);
        repr.field("has_dns_resolver", &has_resolver);
        repr.finish()
    }
}
/// Starts an unsupervised driver task for `transport` and returns a handle to it.
///
/// The task runs `driver_loop_inner` once. When that returns it fails all pending operations
/// (with the error text, or `"connection closed"` on success), marks the shared state as
/// having no driver, notifies `driver_waker` waiters, and finally publishes the result for
/// `DriverHandle::join`.
pub(crate) fn spawn<P>(
    shared: Arc<ConnectionShared>,
    transport: Transport<P>,
    time: P::Time,
    task: &P::Task,
) -> DriverHandle
where
    P: Providers,
{
    let slot = Arc::new(DriverResult {
        result: Mutex::new(None),
        done: Notify::new(),
    });
    let handle_shared = Arc::clone(&shared);

    let driver = {
        let slot = Arc::clone(&slot);
        let loop_task = task.clone();
        async move {
            let teardown = Arc::clone(&shared);
            let outcome = driver_loop_inner::<P>(shared, transport, time, loop_task).await;

            let reason = outcome
                .as_ref()
                .err()
                .map_or_else(|| "connection closed".to_owned(), |err| err.to_string());
            teardown.inner.lock().fail_all_pending(&reason);

            teardown.mark_no_driver();
            teardown.driver_waker.notify_waiters();

            // Publish the result before signalling so `join` finds it after waking.
            *slot.result.lock() = Some(outcome);
            slot.done.notify_one();
        }
    };

    let join = task.spawn_task("magnetar-moonpool-driver", driver);
    DriverHandle {
        _join: Box::new(join),
        result: slot,
        shared: handle_shared,
    }
}
/// Starts a supervised driver task for `transport` and returns a handle to it.
///
/// The task runs `supervised_driver_loop`. When that returns it fails all pending operations
/// (with the error text, or `"connection closed"` on success), marks the shared state as
/// having no driver, notifies `driver_waker` waiters, and finally publishes the result for
/// `DriverHandle::join`.
pub(crate) fn spawn_supervised<P>(
    shared: Arc<ConnectionShared>,
    transport: Transport<P>,
    reconnect_ctx: ReconnectContext,
    providers: P,
) -> DriverHandle
where
    P: Providers,
{
    let slot = Arc::new(DriverResult {
        result: Mutex::new(None),
        done: Notify::new(),
    });
    let handle_shared = Arc::clone(&shared);
    let clock = providers.time().clone();
    let spawner = providers.task().clone();

    let supervised = {
        let slot = Arc::clone(&slot);
        async move {
            let teardown = Arc::clone(&shared);
            let outcome =
                supervised_driver_loop::<P>(shared, transport, reconnect_ctx, providers, clock)
                    .await;

            let reason = outcome
                .as_ref()
                .err()
                .map_or_else(|| "connection closed".to_owned(), |err| err.to_string());
            teardown.inner.lock().fail_all_pending(&reason);

            teardown.mark_no_driver();
            teardown.driver_waker.notify_waiters();

            // Publish the result before signalling so `join` finds it after waking.
            *slot.result.lock() = Some(outcome);
            slot.done.notify_one();
        }
    };

    let join = spawner.spawn_task("magnetar-moonpool-driver-supervised", supervised);
    DriverHandle {
        _join: Box::new(join),
        result: slot,
        shared: handle_shared,
    }
}
/// Supervises a connection: runs `driver_loop_inner` and, each time it returns, redials and
/// re-handshakes, then runs it again on the new transport.
///
/// The most recent `driver_loop_inner` result is returned unchanged when the connection is
/// user-closed, when no supervisor config is set, or when the reconnect attempt budget is
/// exhausted.
///
/// # Errors
///
/// Besides the propagated inner result, returns `EngineError::Protocol` when `begin_handshake`
/// fails on a freshly dialed transport.
async fn supervised_driver_loop<P>(
shared: Arc<ConnectionShared>,
mut transport: Transport<P>,
reconnect_ctx: ReconnectContext,
providers: P,
time: P::Time,
) -> Result<(), EngineError>
where
P: Providers,
{
// Backoff seed taken from the address of the shared state.
// NOTE(review): an address is not stable across runs — confirm that is acceptable wherever
// reproducible backoff timing matters.
let seed: u64 = Arc::as_ptr(&shared) as usize as u64;
// Built lazily on the first reconnect and then reused across reconnect cycles.
let mut backoff: Option<magnetar_proto::Backoff> = None;
let mut give_up_attempts: u32 = 0;
let task = providers.task().clone();
let mut socket_alive_since = shared.now_instant();
// The first run uses the transport handed in by the caller.
let mut last_inner_result =
driver_loop_inner::<P>(shared.clone(), transport, time.clone(), task.clone()).await;
loop {
if shared.inner.lock().is_user_closed() {
return last_inner_result;
}
let supervisor_cfg = shared.inner.lock().supervisor_config().cloned();
let connect_timeout = shared.inner.lock().connect_timeout();
// Without a supervisor config there is nothing to supervise.
let Some(cfg) = supervisor_cfg else {
return last_inner_result;
};
// When anti-thrash is configured and the previous re-attach happened no longer than
// `drop_grace` ago, record this drop as `TcpDropAfterReAttach`.
if cfg.anti_thrash_threshold.is_some() {
let now = shared.now_instant();
let should_record = {
let conn = shared.inner.lock();
conn.anti_thrash_state()
.last_reattach_at()
.is_some_and(|t| now.saturating_duration_since(t) <= cfg.drop_grace)
};
if should_record {
// NOTE(review): the handle passed here is a fixed `ProducerHandle(0)` — presumably a
// placeholder because a TCP drop is not tied to one handle; confirm.
shared.inner.lock().record_reattach_outcome(
now,
magnetar_proto::ReAttachHandle::Producer(magnetar_proto::ProducerHandle(0)),
magnetar_proto::ReAttachOutcomeKind::TcpDropAfterReAttach,
);
}
}
let cooldown_until = {
let conn = shared.inner.lock();
match conn.anti_thrash_tick(shared.now_instant()) {
magnetar_proto::AntiThrashDisposition::Cooldown { until } => Some(until),
magnetar_proto::AntiThrashDisposition::Normal => None,
}
};
// Sleep out any remaining cooldown, then clear it whether or not a sleep was needed.
if let Some(until) = cooldown_until {
let now = shared.now_instant();
if until > now {
let dur = until.saturating_duration_since(now);
tracing::warn!(
cooldown_ms = u64::try_from(dur.as_millis()).unwrap_or(u64::MAX),
"supervisor: anti-thrash cooldown engaged; sleeping before next redial"
);
let _ = time.sleep(dur).await;
}
shared.inner.lock().anti_thrash_state_mut().clear_cooldown();
}
let backoff = backoff.get_or_insert_with(|| cfg.build_backoff(seed));
// How long the previous socket lasted decides whether the backoff and the attempt counter
// start over.
let socket_lifetime = shared
.now_instant()
.saturating_duration_since(socket_alive_since);
if cfg.should_reset_backoff(socket_lifetime) {
backoff.reset();
give_up_attempts = 0;
}
// Redial loop: breaks with the first transport that connects.
let new_transport = loop {
// NOTE(review): the backoff delay is slept before the give-up check, so the final,
// budget-exhausting iteration still waits one full delay before returning — confirm this
// ordering is intended.
let delay = backoff.next();
let _ = time.sleep(delay).await;
give_up_attempts = give_up_attempts.saturating_add(1);
if cfg.should_give_up(give_up_attempts) {
tracing::warn!(
attempt = give_up_attempts,
max_attempts = cfg.max_attempts.unwrap_or(0),
"supervisor: gave up; reconnect attempt budget exhausted"
);
return last_inner_result;
}
let attempt = give_up_attempts;
// The user may have closed the connection while this task was sleeping.
if shared.inner.lock().is_user_closed() {
return last_inner_result;
}
// Prefer the provider's current service URL; fall back to the cached host:port when
// there is no provider or its URL cannot be parsed.
let target_host_port: String =
if let Some(provider) = reconnect_ctx.service_url_provider.as_ref() {
strip_url_to_host_port(&provider.get_service_url()).unwrap_or_else(|| {
tracing::warn!(
attempt,
"supervisor: service-url provider returned an unparseable URL; \
falling back to the cached host:port"
);
reconnect_ctx.host_port.clone()
})
} else {
reconnect_ctx.host_port.clone()
};
let resolver = reconnect_ctx.dns_resolver.as_deref();
match Transport::<P>::connect_with_resolver(
providers.network(),
&target_host_port,
resolver,
providers.time(),
connect_timeout,
)
.await
{
Ok(t) => {
tracing::info!(
attempt,
target = %target_host_port,
"supervisor: TCP connected; handshaking"
);
break t;
}
Err(err) => {
tracing::warn!(
attempt,
target = %target_host_port,
error = %err,
"supervisor: reconnect attempt failed; will retry"
);
}
}
};
// Reset the state machine and queue a fresh handshake for the new transport.
{
let mut conn = shared.inner.lock();
conn.reset();
if let Err(err) = conn.begin_handshake() {
tracing::error!(error = %err, "supervisor: begin_handshake after reset failed");
return Err(EngineError::Protocol(err));
}
}
// `driver_loop_inner` reads this flag and replays producer/consumer state once the
// connection reports connected.
shared
.pending_rebuild
.store(true, std::sync::atomic::Ordering::SeqCst);
shared.driver_waker.notify_one();
transport = new_transport;
socket_alive_since = shared.now_instant();
last_inner_result =
driver_loop_inner::<P>(shared.clone(), transport, time.clone(), task.clone()).await;
}
}
/// Reduces a `pulsar://` or `pulsar+ssl://` service URL to a `host:port` string.
///
/// Everything from the first `/`, `?`, or `#` after the scheme is discarded. When the remaining
/// authority carries no port, the scheme's default is appended: `6650` for `pulsar://`, `6651`
/// for `pulsar+ssl://`. A bracketed IPv6 literal with nothing after the closing `]` (for
/// example `[::1]`) counts as having no port, even though it contains colons.
///
/// Returns `None` for any other scheme and for an empty authority.
fn strip_url_to_host_port(raw: &str) -> Option<String> {
    let (rest, default_port) = if let Some(rest) = raw.strip_prefix("pulsar://") {
        (rest, 6650)
    } else if let Some(rest) = raw.strip_prefix("pulsar+ssl://") {
        (rest, 6651)
    } else {
        return None;
    };

    let authority = rest.split(['/', '?', '#']).next().unwrap_or(rest);
    if authority.is_empty() {
        return None;
    }

    // The colons inside `[...]` belong to the address, not to a port separator. Only the exact
    // "bracketed literal, nothing after `]`" shape is special-cased; every other input keeps
    // the plain "contains a colon" rule.
    let bare_ipv6_literal = authority
        .strip_prefix('[')
        .and_then(|inner| inner.split_once(']'))
        .is_some_and(|(_, tail)| tail.is_empty());

    if !bare_ipv6_literal && authority.contains(':') {
        Some(authority.to_owned())
    } else {
        Some(format!("{authority}:{default_port}"))
    }
}
/// Runs one connection's I/O loop over `transport` until the connection closes or fails.
///
/// Each iteration refreshes the shared wall clock, writes and flushes any queued outbound
/// bytes, returns `Ok(())` after shutting the transport down if the state machine is closing,
/// closed, or failed, and otherwise waits for whichever comes first: a `driver_waker`
/// notification, inbound bytes, or the state machine's next deadline.
///
/// # Errors
///
/// - Write, flush, and read failures, converted into `EngineError`.
/// - `EngineError::PeerClosed` when a read returns zero bytes.
/// - Errors from `handle_bytes_owned`, converted into `EngineError`.
/// - Errors returned by `handle_pending_events`.
///
/// Every error exit calls `mark_disconnected()` on the state machine before returning.
pub(crate) async fn driver_loop_inner<P>(
    shared: Arc<ConnectionShared>,
    mut transport: Transport<P>,
    time: P::Time,
    task: P::Task,
) -> Result<(), EngineError>
where
    P: Providers,
{
    let mut read_buf = BytesMut::with_capacity(READ_BUFFER_CAPACITY);
    loop {
        // Publish "base + elapsed" as the current wall-clock milliseconds.
        {
            let elapsed_ms = time.now().as_millis();
            let now_ms = shared
                .wall_clock_base_ms
                .saturating_add(u64::try_from(elapsed_ms).unwrap_or(u64::MAX));
            shared
                .wall_clock_ms
                .store(now_ms, std::sync::atomic::Ordering::Relaxed);
        }

        // Snapshot everything needed from the state machine under a single lock acquisition;
        // the guard is dropped before any await below.
        let (out, deadline, should_close) = {
            let mut conn = shared.inner.lock();
            let out = conn.poll_transmit_owned();
            let dl = conn.poll_timeout();
            let closing = matches!(
                conn.state(),
                magnetar_proto::HandshakeState::Closing
                    | magnetar_proto::HandshakeState::Closed
                    | magnetar_proto::HandshakeState::Failed
            );
            (out, dl, closing)
        };

        if !out.is_empty() {
            let write_result = match &out {
                magnetar_proto::TransmitOwned::Contiguous(buf) => transport.write_all(buf).await,
                magnetar_proto::TransmitOwned::Vectored(segs) => {
                    transport.write_all_vectored(segs).await
                }
            };
            if let Err(err) = write_result {
                shared.inner.lock().mark_disconnected();
                return Err(err.into());
            }
            if let Err(err) = transport.flush().await {
                shared.inner.lock().mark_disconnected();
                return Err(err.into());
            }
        }

        // Pending output was flushed above, so a closing connection can now shut down cleanly.
        if should_close {
            let _ = transport.shutdown().await;
            return Ok(());
        }

        let sleep_dur = deadline.map(|t| t.saturating_duration_since(shared.now_instant()));
        tokio::select! {
            biased;
            () = shared.driver_waker.notified() => {
                // Nothing to do here: looping around re-polls the state machine for output.
            }
            r = transport.read_buf(&mut read_buf) => {
                let n = match r {
                    Ok(n) => n,
                    Err(err) => {
                        shared.inner.lock().mark_disconnected();
                        return Err(err.into());
                    }
                };
                // A zero-byte read means the peer closed the stream.
                if n == 0 {
                    {
                        let mut conn = shared.inner.lock();
                        conn.mark_disconnected();
                        debug_assert!(
                            !conn.is_connected(),
                            "mark_disconnected() must clear is_connected() (ADR-0038)"
                        );
                    }
                    return Err(EngineError::PeerClosed);
                }
                let chunk = read_buf.split();
                debug_assert_eq!(
                    chunk.len(),
                    n,
                    "read chunk length must equal the byte count just read"
                );
                let now = shared.now_instant();
                // Bind the result first so the lock guard is released before the error branch
                // locks again.
                let handle_result = shared.inner.lock().handle_bytes_owned(now, chunk);
                if let Err(err) = handle_result {
                    shared.inner.lock().mark_disconnected();
                    return Err(err.into());
                }

                // After a supervised reconnect, replay producer/consumer state exactly once,
                // and only once the connection reports connected.
                if shared
                    .pending_rebuild
                    .load(std::sync::atomic::Ordering::SeqCst)
                {
                    let connected = shared.inner.lock().is_connected();
                    if connected
                        && shared
                            .pending_rebuild
                            .compare_exchange(
                                true,
                                false,
                                std::sync::atomic::Ordering::SeqCst,
                                std::sync::atomic::Ordering::SeqCst,
                            )
                            .is_ok()
                    {
                        let (n_p, n_c) = {
                            let mut conn = shared.inner.lock();
                            let producers = conn.rebuild_producers();
                            let consumers = conn.rebuild_consumers();
                            (producers.len(), consumers.len())
                        };
                        tracing::info!(
                            producers = n_p,
                            consumers = n_c,
                            "supervisor: reconnected to broker; handshake complete, replayed \
                             producer + consumer state"
                        );
                        shared.driver_waker.notify_one();
                    }
                }

                let mut retries: Vec<RetryRequest> = Vec::new();
                // An event-handler error ends the loop and drops the transport, so the state
                // machine is marked disconnected here just like on every other error exit.
                if let Err(err) = handle_pending_events(&shared, &mut retries) {
                    shared.inner.lock().mark_disconnected();
                    return Err(err);
                }
                for req in retries {
                    spawn_retry_leg::<P>(&shared, &time, &task, req);
                }
                shared.driver_waker.notify_waiters();
            }
            () = sleep_or_pending::<P>(&time, sleep_dur) => {
                shared.inner.lock().handle_timeout(shared.now_instant());
            }
        }
    }
}
/// Sleeps for `dur` when one is given; with `None` it never completes.
///
/// This lets a `select!` branch stand for "the next deadline, if any".
async fn sleep_or_pending<P: Providers>(time: &P::Time, dur: Option<Duration>) {
    if let Some(wait) = dur {
        let _ = time.sleep(wait).await;
    } else {
        std::future::pending::<()>().await;
    }
}
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use bytes::BytesMut;
    use magnetar_proto::{ConnectionConfig, ConnectionEvent, encode_command, pb};

    use super::{handle_pending_events, strip_url_to_host_port};
    use crate::{ConnectionShared, EngineError};

    /// Encodes a `CommandConnected` frame, used to complete the handshake in tests.
    fn handshake_response_bytes() -> BytesMut {
        let cmd = pb::BaseCommand {
            r#type: pb::base_command::Type::Connected as i32,
            connected: Some(pb::CommandConnected {
                server_version: "magnetar-test".to_owned(),
                protocol_version: Some(21),
                max_message_size: Some(5 * 1024 * 1024),
                feature_flags: Some(pb::FeatureFlags::default()),
            }),
            ..Default::default()
        };
        let mut buf = BytesMut::new();
        encode_command(&mut buf, &cmd).expect("encode CommandConnected");
        buf
    }

    /// A `CommandTopicMigrated` frame must surface from `handle_pending_events` as a
    /// `Config` error mentioning PIP-188, without scheduling any transient retry.
    #[test]
    fn topic_migrated_triggers_recoverable_error() {
        let shared = ConnectionShared::new(ConnectionConfig::default());
        {
            let mut conn = shared.inner.lock();
            conn.begin_handshake().expect("handshake");
            let frame = handshake_response_bytes();
            conn.handle_bytes(Instant::now(), &frame)
                .expect("connected");
            match conn.poll_event() {
                Some(ConnectionEvent::Connected { .. }) => {}
                other => panic!("expected Connected, got {other:?}"),
            }
            let migrated = pb::BaseCommand {
                r#type: pb::base_command::Type::TopicMigrated as i32,
                topic_migrated: Some(pb::CommandTopicMigrated {
                    resource_id: 42,
                    resource_type: pb::command_topic_migrated::ResourceType::Producer as i32,
                    broker_service_url: Some("pulsar://new-broker:6650".to_owned()),
                    broker_service_url_tls: None,
                }),
                ..Default::default()
            };
            let mut buf = BytesMut::new();
            encode_command(&mut buf, &migrated).expect("encode CommandTopicMigrated");
            conn.handle_bytes(Instant::now(), &buf)
                .expect("handle migration");
        }
        let mut retries = Vec::new();
        let err = handle_pending_events(&shared, &mut retries).expect_err("migration must error");
        assert!(
            retries.is_empty(),
            "a topic-migration event must not enqueue a transient retry"
        );
        let msg = format!("{err}");
        assert!(
            matches!(err, EngineError::Config(_)) && msg.contains("PIP-188"),
            "expected PIP-188 config error, got {err:?}"
        );
    }

    #[test]
    fn strip_url_to_host_port_handles_plain() {
        assert_eq!(
            strip_url_to_host_port("pulsar://broker:6650").as_deref(),
            Some("broker:6650")
        );
    }

    #[test]
    fn strip_url_to_host_port_handles_tls() {
        assert_eq!(
            strip_url_to_host_port("pulsar+ssl://broker.example.com:6651").as_deref(),
            Some("broker.example.com:6651")
        );
    }

    #[test]
    fn strip_url_to_host_port_defaults_plain_port() {
        assert_eq!(
            strip_url_to_host_port("pulsar://broker").as_deref(),
            Some("broker:6650")
        );
    }

    #[test]
    fn strip_url_to_host_port_defaults_tls_port() {
        assert_eq!(
            strip_url_to_host_port("pulsar+ssl://broker").as_deref(),
            Some("broker:6651")
        );
    }

    #[test]
    fn strip_url_to_host_port_strips_path() {
        assert_eq!(
            strip_url_to_host_port("pulsar://broker:6650/admin").as_deref(),
            Some("broker:6650")
        );
    }

    #[test]
    fn strip_url_to_host_port_strips_query_and_fragment() {
        assert_eq!(
            strip_url_to_host_port("pulsar://broker?tls=false").as_deref(),
            Some("broker:6650")
        );
        assert_eq!(
            strip_url_to_host_port("pulsar+ssl://broker:6651#primary").as_deref(),
            Some("broker:6651")
        );
    }

    #[test]
    fn strip_url_to_host_port_rejects_empty_authority() {
        assert!(strip_url_to_host_port("pulsar://").is_none());
        assert!(strip_url_to_host_port("pulsar+ssl:///admin").is_none());
    }

    #[test]
    fn strip_url_to_host_port_rejects_unknown_scheme() {
        assert!(strip_url_to_host_port("http://broker:6650").is_none());
    }
}