use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use magnetar_proto::event::LookupOutcome;
use magnetar_proto::{ConnectionConfig, OpOutcome, PendingOpKey, RequestId};
use moonpool_core::Providers;
use parking_lot::Mutex;
use crate::driver::DriverHandle;
use crate::pool::ProxyConnectionPool;
use crate::{ConnectionShared, EngineError, MoonpoolEngine, TopicListChange};
/// Errors returned by [`Client`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
/// An engine-layer failure, displayed transparently. Any [`EngineError`]
/// converts into this variant through `?`.
#[error(transparent)]
Engine(#[from] EngineError),
/// A failure described by a numeric code and a message. Produced from
/// error outcomes and failed lookups, and also when the client-side lookup
/// redirect cap is exhausted (with `code: 0`).
#[error("broker error: code={code} message={message}")]
Broker {
/// Numeric error code carried by the failing outcome.
code: i32,
/// Human-readable description accompanying `code`.
message: String,
},
/// The connection is closed.
#[error("connection is closed")]
Closed,
/// The peer closed the connection. Returned when a request resolves to a
/// terminal outcome, or when a lookup cannot be re-issued after a lost
/// session.
#[error("peer closed the connection")]
PeerClosed,
/// A lookup resolved to a proxied target, but this client has no proxy
/// connection pool to route through.
#[error(
"lookup of topic '{topic}' requires proxy routing (proxy_through_service_url=true) \
but this moonpool client was built without a supervisor; rebuild with \
Client::connect_plain_supervised"
)]
ProxyUnsupportedOnUnsupervisedClient {
/// Topic whose lookup required proxy routing.
topic: String,
},
/// Catch-all carrying a free-form message, e.g. for an outcome variant the
/// calling operation does not expect.
#[error("other: {0}")]
Other(String),
}
/// Success payload of [`Client::lookup_topic`]; an alias for the protocol-level
/// [`LookupOutcome`].
pub type LookupTopicResult = LookupOutcome;
/// Handle to a broker connection, generic over the provider set `P`.
pub struct Client<P: Providers> {
/// Shared state of the bootstrap connection.
shared: Arc<ConnectionShared>,
/// Handle to the connection driver; `None` once it has been taken via
/// `take_driver` or consumed by `close`.
driver: Mutex<Option<DriverHandle>>,
/// Pool of additional connections. Only populated by
/// `connect_plain_supervised`; `None` for every other constructor.
pool: Option<Arc<ProxyConnectionPool<P>>>,
/// Marker tying the client to `P` without storing a value of that type.
_providers: std::marker::PhantomData<fn() -> P>,
}
/// Where a topic lookup says the topic should be reached.
#[derive(Debug, Clone)]
pub(crate) enum LookupTarget {
/// Reach the broker directly. When resolved, `None` reuses the connection
/// the lookup landed on; `Some(url)` resolves a connection for that URL.
Direct {
/// Broker URL advertised by the lookup, if any.
#[allow(dead_code)]
broker_url: Option<String>,
},
/// Reach the broker through the bootstrap address, which is dialed as the
/// physical endpoint while `broker_url` names the logical broker.
Proxy {
/// Broker authority derived from the advertised service URL.
#[allow(dead_code)]
broker_url: String,
},
}
// Debug output shows only the shared connection state; the remaining fields
// are elided and the output is marked non-exhaustive.
impl<P: Providers> std::fmt::Debug for Client<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Client");
        builder.field("shared", &self.shared);
        builder.finish_non_exhaustive()
    }
}
impl<P: Providers> Client<P> {
pub async fn connect_plain(
engine: &MoonpoolEngine<P>,
addr: &str,
config: ConnectionConfig,
) -> Result<Self, ClientError> {
let (shared, driver) = engine.connect_plain(addr, config).await?;
Ok(Self {
shared,
driver: Mutex::new(Some(driver)),
pool: None,
_providers: std::marker::PhantomData,
})
}
/// Opens a supervised plain connection to `addr` and equips the client with a
/// connection pool whose factory reuses the same address, configuration,
/// providers, service-URL provider and DNS resolver.
///
/// # Errors
///
/// Propagates any engine failure as [`ClientError::Engine`].
pub async fn connect_plain_supervised(
    engine: &MoonpoolEngine<P>,
    addr: &str,
    config: ConnectionConfig,
    service_url_provider: Option<Arc<dyn magnetar_proto::ServiceUrlProvider>>,
    dns_resolver: Option<Arc<dyn crate::DnsResolver>>,
) -> Result<Self, ClientError> {
    // The bootstrap connection gets clones; the originals move into the
    // factory below once the connection has been established.
    let connected = engine
        .connect_plain_supervised(
            addr,
            config.clone(),
            service_url_provider.clone(),
            dns_resolver.clone(),
        )
        .await;
    let (shared, driver_handle) = connected?;

    let pool = ProxyConnectionPool::new(crate::pool::ConnectionFactory {
        addr: addr.to_owned(),
        bootstrap_config: config,
        providers: engine.providers().clone(),
        service_url_provider,
        dns_resolver,
    });

    let client = Self {
        shared,
        driver: Mutex::new(Some(driver_handle)),
        pool: Some(pool),
        _providers: std::marker::PhantomData,
    };
    Ok(client)
}
/// Builds a pool-less client from an already established connection and the
/// handle of its driver.
#[must_use]
pub fn from_parts(shared: Arc<ConnectionShared>, driver: DriverHandle) -> Self {
    let driver = Mutex::new(Some(driver));
    Self {
        shared,
        driver,
        pool: None,
        _providers: std::marker::PhantomData,
    }
}
/// Removes and returns the driver handle, leaving `None` behind; subsequent
/// calls return `None`.
#[must_use]
pub fn take_driver(&self) -> Option<DriverHandle> {
    let mut slot = self.driver.lock();
    slot.take()
}
/// Borrows the shared state of the bootstrap connection.
#[must_use]
pub fn shared(&self) -> &Arc<ConnectionShared> {
&self.shared
}
/// Reports whether the underlying connection currently considers itself
/// connected.
#[must_use]
pub fn is_connected(&self) -> bool {
    let conn = self.shared.inner.lock();
    conn.is_connected()
}
/// Reports whether the underlying connection considers itself closed.
#[must_use]
pub fn is_closed(&self) -> bool {
    let conn = self.shared.inner.lock();
    conn.is_closed()
}
/// Closes the connection, waits for the driver to finish (if its handle is
/// still held by this client) and then closes the connection pool, if any.
pub async fn close(self)
where
    P: Send + Sync,
{
    // Mark the connection closed, then wake the driver so it observes it.
    self.shared.inner.lock().close();
    self.shared.driver_waker.notify_one();

    // Take the handle in its own statement so the mutex guard is released
    // before the await below.
    let maybe_driver = self.driver.lock().take();
    if let Some(driver) = maybe_driver {
        // The join result is intentionally ignored: closing is best-effort.
        let _ = driver.join().await;
    }

    if let Some(pool) = &self.pool {
        pool.close().await;
    }
}
/// Looks up `topic`, following broker redirects, and returns the resolved
/// [`LookupTarget`] together with the connection the final lookup ran on.
///
/// The first lookup is issued non-authoritatively on the bootstrap
/// connection. Every redirect dials the advertised broker and re-issues the
/// lookup there, carrying the `authoritative` flag and remaining hop budget
/// from the redirect response.
///
/// # Errors
///
/// * [`ClientError::Broker`] for error outcomes, failed lookups, and when the
///   redirect budget reaches zero.
/// * [`ClientError::PeerClosed`] for a terminal outcome.
/// * [`ClientError::Other`] for unexpected outcomes, or when a proxied or
///   redirected response advertises no broker URL.
pub(crate) async fn lookup_topic_target(
    &self,
    topic: &str,
) -> Result<(LookupTarget, Arc<ConnectionShared>), ClientError>
where
    P: Send + Sync,
{
    let mut current = self.shared.clone();
    // `None` until the first redirect; afterwards `(authoritative, hops_remaining)`.
    let mut next_hop: Option<(bool, u8)> = None;
    loop {
        let (authoritative, redirect_budget) = match next_hop {
            None => (false, None),
            Some((authoritative, hops)) => (authoritative, Some(hops)),
        };
        let outcome = self
            .issue_lookup_on(&current, topic, authoritative, redirect_budget)
            .await?;
        let lookup = match outcome {
            OpOutcome::LookupResponse { outcome, .. } => outcome,
            OpOutcome::Error { code, message, .. } => {
                return Err(ClientError::Broker { code, message });
            }
            OpOutcome::Terminal { .. } => return Err(ClientError::PeerClosed),
            other => {
                return Err(ClientError::Other(format!(
                    "unexpected lookup outcome: {other:?}"
                )));
            }
        };
        match lookup {
            LookupOutcome::Connect {
                broker_service_url,
                broker_service_url_tls,
                proxy_through_service_url,
            } => {
                if proxy_through_service_url {
                    // Proxy routing needs a concrete broker to name; prefer
                    // the plain URL and fall back to the TLS one.
                    let raw = broker_service_url.or(broker_service_url_tls).ok_or_else(|| {
                        ClientError::Other(format!(
                            "lookup of '{topic}' set proxy_through_service_url=true but did \
                             not advertise a broker_service_url"
                        ))
                    })?;
                    let broker_url = proxy_broker_authority(&raw);
                    return Ok((LookupTarget::Proxy { broker_url }, current));
                }
                let broker_url = broker_service_url.or(broker_service_url_tls);
                return Ok((LookupTarget::Direct { broker_url }, current));
            }
            LookupOutcome::Redirected {
                broker_service_url,
                broker_service_url_tls,
                authoritative,
                hops_remaining,
            } => {
                if hops_remaining == 0 {
                    return Err(ClientError::Broker {
                        code: 0,
                        message: format!(
                            "lookup redirect cap exceeded ({} hops)",
                            magnetar_proto::lookup::MAX_LOOKUP_REDIRECTS
                        ),
                    });
                }
                let raw = broker_service_url
                    .or(broker_service_url_tls)
                    .ok_or_else(|| {
                        ClientError::Other(format!(
                            "lookup of '{topic}' was redirected but the broker advertised no \
                             broker_service_url or broker_service_url_tls to dial"
                        ))
                    })?;
                tracing::debug!(
                    topic,
                    redirect_url = %raw,
                    hops_remaining,
                    "lookup redirected; dialing redirect target and re-issuing"
                );
                // Continue the loop against the redirect target.
                current = self.resolve_direct_broker(&raw, topic).await?;
                next_hop = Some((authoritative, hops_remaining));
            }
            LookupOutcome::Failed { code, message } => {
                return Err(ClientError::Broker { code, message });
            }
        }
    }
}
/// Turns a [`LookupTarget`] into the connection to use for `topic`.
///
/// * `Direct` without a URL reuses `landed_on`.
/// * `Direct` with a URL is handed to `resolve_direct_broker`.
/// * `Proxy` opens (or reuses) a pooled connection that dials the bootstrap
///   address while naming `broker_url` as the logical broker.
///
/// # Errors
///
/// [`ClientError::ProxyUnsupportedOnUnsupervisedClient`] when a proxied
/// target is requested on a client without a pool; otherwise whatever the
/// pool or the direct resolution reports.
pub(crate) async fn resolve_target(
    &self,
    target: &LookupTarget,
    landed_on: &Arc<ConnectionShared>,
    topic: &str,
) -> Result<Arc<ConnectionShared>, ClientError>
where
    P: Send + Sync,
{
    // Handle both direct flavours up front; only proxy targets fall through.
    let broker_url = match target {
        LookupTarget::Direct { broker_url: None } => return Ok(landed_on.clone()),
        LookupTarget::Direct {
            broker_url: Some(direct_url),
        } => return self.resolve_direct_broker(direct_url, topic).await,
        LookupTarget::Proxy { broker_url } => broker_url,
    };

    let Some(pool) = self.pool.as_ref() else {
        return Err(ClientError::ProxyUnsupportedOnUnsupervisedClient {
            topic: topic.to_owned(),
        });
    };

    let physical = pool.bootstrap_addr().to_owned();
    let proxied = crate::pool::get_or_open(
        pool.clone(),
        broker_url,
        &physical,
        Some(broker_url.clone()),
    )
    .await?;
    Ok(proxied)
}
/// Resolves the connection for a broker that should be dialed directly.
///
/// Without a pool, logs a warning and returns the bootstrap connection. With
/// a pool, returns the bootstrap connection when the broker's authority
/// equals the bootstrap address, and a pooled connection otherwise.
async fn resolve_direct_broker(
    &self,
    broker_url: &str,
    _topic: &str,
) -> Result<Arc<ConnectionShared>, ClientError>
where
    P: Send + Sync,
{
    let pool = match self.pool.as_ref() {
        Some(pool) => pool,
        None => {
            tracing::warn!(
                broker_url,
                "lookup resolved to a specific broker but moonpool client has no proxy pool \
                 (unsupervised); falling back to bootstrap connection"
            );
            return Ok(self.shared.clone());
        }
    };

    let physical = direct_broker_authority(broker_url);
    if physical != pool.bootstrap_addr() {
        let pooled = crate::pool::get_or_open(pool.clone(), broker_url, &physical, None).await?;
        return Ok(pooled);
    }
    // The broker is the bootstrap endpoint itself: reuse the existing connection.
    Ok(self.shared.clone())
}
/// Issues a single topic lookup on the bootstrap connection and returns the
/// broker's answer without following redirects.
///
/// # Errors
///
/// * [`ClientError::Broker`] for a failed lookup or an error outcome.
/// * [`ClientError::PeerClosed`] for a terminal outcome.
/// * [`ClientError::Other`] for any other outcome.
pub async fn lookup_topic(
    &self,
    topic: &str,
    authoritative: bool,
) -> Result<LookupTopicResult, ClientError> {
    let response = self
        .issue_lookup_on(&self.shared, topic, authoritative, None)
        .await?;
    match response {
        OpOutcome::LookupResponse {
            outcome: LookupOutcome::Failed { code, message },
            ..
        } => Err(ClientError::Broker { code, message }),
        OpOutcome::LookupResponse { outcome, .. } => Ok(outcome),
        OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
        OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
        unexpected => Err(ClientError::Other(format!(
            "unexpected lookup outcome: {unexpected:?}"
        ))),
    }
}
/// Sends a lookup for `topic` on `shared` and waits for its outcome,
/// re-issuing it when the session is lost and the connection reconnects.
///
/// `redirect_budget` selects the request flavour: `None` sends a fresh
/// lookup, `Some(hops)` sends a redirect follow-up carrying that hop budget.
///
/// # Errors
///
/// Fails early when `fail_if_no_driver` reports an error, and returns
/// [`ClientError::PeerClosed`] when the connection goes terminal or the
/// re-issue cap is exhausted.
async fn issue_lookup_on(
    &self,
    shared: &Arc<ConnectionShared>,
    topic: &str,
    authoritative: bool,
    redirect_budget: Option<u8>,
) -> Result<OpOutcome, ClientError> {
    shared.fail_if_no_driver()?;
    let mut reissues_remaining = magnetar_proto::lookup::MAX_LOOKUP_SESSION_REISSUES;
    loop {
        let request_id = {
            let mut conn = shared.inner.lock();
            if let Some(hops) = redirect_budget {
                conn.lookup_redirect(topic, authoritative, hops)
            } else {
                conn.lookup(topic, authoritative)
            }
        };
        shared.driver_waker.notify_one();
        let pending = RequestFut {
            shared: shared.clone(),
            request_id,
        };
        let outcome = pending.await;

        // Anything but a lost session is the final answer for this request.
        if !matches!(outcome, OpOutcome::SessionLost { .. }) {
            return Ok(outcome);
        }

        match shared.await_reconnect_or_terminal().await {
            crate::LookupReissueReadiness::Terminal => {
                return Err(ClientError::PeerClosed);
            }
            crate::LookupReissueReadiness::Reconnected if reissues_remaining == 0 => {
                tracing::warn!(
                    topic,
                    max_reissues = magnetar_proto::lookup::MAX_LOOKUP_SESSION_REISSUES,
                    "lookup session-reissue cap exceeded; surfacing PeerClosed"
                );
                return Err(ClientError::PeerClosed);
            }
            crate::LookupReissueReadiness::Reconnected => {
                reissues_remaining -= 1;
                tracing::debug!(
                    topic,
                    reissues_remaining,
                    "lookup severed by reconnect; re-issuing against fresh session"
                );
                // Fall through to the next loop iteration to re-issue.
            }
        }
    }
}
pub async fn partitioned_topic_metadata(&self, topic: &str) -> Result<u32, ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.get_partitioned_topic_metadata(topic)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::PartitionedMetadata {
partitions, error, ..
} => {
if let Some((code, message)) = error {
Err(ClientError::Broker { code, message })
} else {
Ok(partitions)
}
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected partitioned metadata outcome: {other:?}"
))),
}
}
pub async fn watch_topic_list(
&self,
namespace: &str,
pattern: &str,
) -> Result<Vec<String>, ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.watch_topic_list(namespace, pattern)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::TopicListSnapshot { topics, .. } => Ok(topics),
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected topic-list snapshot outcome: {other:?}"
))),
}
}
pub async fn next_topic_list_change(&self) -> Option<TopicListChange> {
loop {
if let Some(change) = self.shared.topic_list_changes.lock().pop_front() {
return Some(change);
}
if self.shared.inner.lock().is_closed() {
return None;
}
self.shared.topic_list_notify.notified().await;
}
}
/// Pops the oldest queued topic-list change without waiting; `None` when the
/// queue is empty.
#[must_use]
pub fn poll_topic_list_change(&self) -> Option<TopicListChange> {
    let mut queue = self.shared.topic_list_changes.lock();
    queue.pop_front()
}
/// Sends a scalable-topic lookup for `topic` and waits until the matching
/// `LookupResolved` event appears in the scalable event queue.
///
/// Only the event whose `request_id` matches this request is removed from
/// the queue; other events are left in place.
///
/// # Errors
///
/// [`ClientError::Other`] when the connection reports closed before the
/// lookup resolves.
#[cfg(feature = "scalable-topics")]
pub async fn scalable_topic_lookup(
    &self,
    topic: &str,
) -> Result<crate::ScalableLookup, ClientError> {
    let request_id = {
        let mut conn = self.shared.inner.lock();
        conn.send_scalable_topic_lookup(topic, false)
    };
    self.shared.driver_waker.notify_one();
    loop {
        // Register for the notification *before* scanning the queue, so an
        // event pushed between the scan and the await below cannot be
        // missed. This mirrors `next_replicated_subscription_marker`.
        let notified = self.shared.scalable_notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        let drained = {
            let mut buf = self.shared.scalable_events.lock();
            let pos = buf.iter().position(|ev| {
                matches!(
                    ev,
                    crate::ScalableEvent::LookupResolved { request_id: r, .. } if *r == request_id
                )
            });
            pos.and_then(|p| buf.remove(p))
        };
        if let Some(crate::ScalableEvent::LookupResolved {
            controller_broker_url,
            segments,
            lookup_token,
            ..
        }) = drained
        {
            return Ok(crate::ScalableLookup {
                controller_broker_url,
                segments,
                lookup_token,
            });
        }
        if self.shared.inner.lock().is_closed() {
            return Err(ClientError::Other(
                "connection closed before scalable lookup resolved".to_owned(),
            ));
        }
        notified.await;
    }
}
/// Opens a DAG watch for `topic` using the token and segments from a prior
/// scalable lookup, wakes the driver, and returns the value produced by the
/// connection's `open_dag_watch` (the watch session id passed to
/// `close_scalable_dag_watch`).
#[cfg(feature = "scalable-topics")]
pub fn open_scalable_dag_watch(
    &self,
    topic: &str,
    lookup_token: u64,
    segments: Vec<magnetar_proto::SegmentDescriptor>,
) -> u64 {
    let watch_session_id = self
        .shared
        .inner
        .lock()
        .open_dag_watch(topic, lookup_token, segments);
    self.shared.driver_waker.notify_one();
    watch_session_id
}
/// Closes the DAG watch identified by `watch_session_id` and wakes the
/// driver. The result of the close call is deliberately discarded.
#[cfg(feature = "scalable-topics")]
pub fn close_scalable_dag_watch(&self, watch_session_id: u64) {
    let _ = self.shared.inner.lock().close_dag_watch(watch_session_id);
    self.shared.driver_waker.notify_one();
}
/// Waits for the next scalable-topic event.
///
/// Returns `Some(event)` as soon as one is queued, or `None` once the queue
/// is empty and the connection reports closed.
#[cfg(feature = "scalable-topics")]
pub async fn next_scalable_event(&self) -> Option<crate::ScalableEvent> {
    loop {
        // Register for the notification *before* inspecting the queue, so an
        // event pushed between the check and the await below cannot be
        // missed. This mirrors `next_replicated_subscription_marker`.
        let notified = self.shared.scalable_notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        if let Some(ev) = self.shared.scalable_events.lock().pop_front() {
            return Some(ev);
        }
        if self.shared.inner.lock().is_closed() {
            return None;
        }
        notified.await;
    }
}
/// Waits for the next observed replicated-subscription marker.
///
/// Returns `Some(marker)` as soon as one is queued, or `None` once the queue
/// is empty and the connection reports closed.
pub async fn next_replicated_subscription_marker(
&self,
) -> Option<crate::ObservedReplicatedSubscriptionMarker> {
loop {
// Create and enable the notification future *before* inspecting the
// queue, so a marker pushed between the check and the await at the
// bottom of the loop still wakes this task.
let notified = self.shared.replicated_subscription_marker_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(marker) = self
.shared
.replicated_subscription_markers
.lock()
.pop_front()
{
return Some(marker);
}
// Nothing queued: stop if the connection is closed, otherwise wait.
if self.shared.inner.lock().is_closed() {
return None;
}
notified.await;
}
}
/// Pops the oldest queued replicated-subscription marker without waiting;
/// `None` when the queue is empty.
#[must_use]
pub fn poll_replicated_subscription_marker(
    &self,
) -> Option<crate::ObservedReplicatedSubscriptionMarker> {
    let mut markers = self.shared.replicated_subscription_markers.lock();
    markers.pop_front()
}
pub async fn new_txn(
&self,
timeout: std::time::Duration,
) -> Result<magnetar_proto::TxnId, ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.new_txn(timeout)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::NewTxn { result, .. } => {
result.map_err(|err| ClientError::Other(format!("new_txn: {err}")))
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected new_txn outcome: {other:?}"
))),
}
}
pub async fn add_partition_to_txn(
&self,
txn: magnetar_proto::TxnId,
topic: impl Into<String>,
) -> Result<(), ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.add_partition_to_txn(txn, topic.into())
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::AddPartitionToTxn { result, .. } => {
result.map_err(|err| ClientError::Other(format!("add_partition_to_txn: {err}")))
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected add_partition_to_txn outcome: {other:?}"
))),
}
}
pub async fn add_subscription_to_txn(
&self,
txn: magnetar_proto::TxnId,
topic: impl Into<String>,
subscription: impl Into<String>,
) -> Result<(), ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.add_subscription_to_txn(txn, subscription.into(), topic.into())
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::AddSubscriptionToTxn { result, .. } => {
result.map_err(|err| ClientError::Other(format!("add_subscription_to_txn: {err}")))
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected add_subscription_to_txn outcome: {other:?}"
))),
}
}
pub async fn end_txn(
&self,
txn: magnetar_proto::TxnId,
action: magnetar_proto::TxnAction,
) -> Result<magnetar_proto::TxnState, ClientError> {
let request_id = {
let mut conn = self.shared.inner.lock();
conn.end_txn(txn, action)
};
self.shared.driver_waker.notify_one();
let outcome = RequestFut {
shared: self.shared.clone(),
request_id,
}
.await;
match outcome {
OpOutcome::EndTxn { result, .. } => {
result.map_err(|err| ClientError::Other(format!("end_txn: {err}")))
}
OpOutcome::Error { code, message, .. } => Err(ClientError::Broker { code, message }),
OpOutcome::Terminal { .. } => Err(ClientError::PeerClosed),
other => Err(ClientError::Other(format!(
"unexpected end_txn outcome: {other:?}"
))),
}
}
}
/// Future that resolves to the [`OpOutcome`] of one in-flight request.
struct RequestFut {
/// Connection on which the request was issued and where its outcome is stored.
shared: Arc<ConnectionShared>,
/// Identifier of the request being awaited.
request_id: RequestId,
}
impl Future for RequestFut {
    type Output = OpOutcome;

    /// Takes the stored outcome when there is one; otherwise registers the
    /// task's waker under the request's key and stays pending. Both steps
    /// happen under a single lock acquisition.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let key = PendingOpKey::Request(self.request_id);
        let mut conn = self.shared.inner.lock();
        match conn.take_outcome(key) {
            Some(outcome) => Poll::Ready(outcome),
            None => {
                conn.register_waker(key, cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
impl Drop for RequestFut {
    /// Removes the waker registered for this request when the future is dropped.
    fn drop(&mut self) {
        let mut conn = self.shared.inner.lock();
        conn.unregister_waker(PendingOpKey::Request(self.request_id));
    }
}
/// Reduces a broker service URL to its `host:port` authority.
///
/// * A leading `pulsar://` or `pulsar+ssl://` scheme is stripped.
/// * Everything from the first `/` onwards is discarded.
/// * When a scheme was present and the authority carries no port, the
///   scheme's default port is appended (`6650` for `pulsar`, `6651` for
///   `pulsar+ssl`).
/// * Input without a recognised scheme is returned as-is, apart from the
///   path trimming.
///
/// Bracketed IPv6 literals such as `[::1]` are recognised: only a `:` that
/// follows the closing `]` counts as a port separator, so `pulsar://[::1]`
/// becomes `[::1]:6650`.
fn proxy_broker_authority(input: &str) -> String {
    let (rest, default_port) = if let Some(rest) = input.strip_prefix("pulsar+ssl://") {
        (rest, Some(6651u16))
    } else if let Some(rest) = input.strip_prefix("pulsar://") {
        (rest, Some(6650u16))
    } else {
        (input, None)
    };
    let host_port = rest.split('/').next().unwrap_or(rest);
    // The colons inside a bracketed IPv6 literal are part of the address, so
    // for `[addr]` only inspect what follows the closing bracket. Anything not
    // well-formed as `[...]` keeps the plain "contains a colon" rule.
    let has_port = match host_port
        .strip_prefix('[')
        .and_then(|bracketed| bracketed.split_once(']'))
    {
        Some((_, after_bracket)) => after_bracket.starts_with(':'),
        None => host_port.contains(':'),
    };
    match default_port {
        Some(port) if !has_port => format!("{host_port}:{port}"),
        _ => host_port.to_owned(),
    }
}
/// Computes the authority to dial for a directly reachable broker. Delegates
/// to `proxy_broker_authority`, so both paths normalise URLs identically.
fn direct_broker_authority(input: &str) -> String {
proxy_broker_authority(input)
}
// Unit tests for the client module. Several of them are compile-time or
// state-level smoke checks that never open a network connection.
#[cfg(test)]
mod tests {
use std::time::Duration;
use magnetar_proto::ConnectionConfig;
use moonpool_core::TokioProviders;
use super::{Client, ClientError, LookupTopicResult, proxy_broker_authority};
use crate::{ConnectionShared, MoonpoolEngine, TopicListChange};
// Type-checks `Client::connect_plain` against `TokioProviders`; the future
// is created but never polled.
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn connect_plain_compiles_against_tokio_providers() {
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let _fut = Client::connect_plain(&engine, "127.0.0.1:6650", ConnectionConfig::default());
}
// The alias exposes the `Connect` variant of the underlying outcome type.
#[test]
fn lookup_topic_result_alias_constructs() {
let _: LookupTopicResult = LookupTopicResult::Connect {
broker_service_url: Some("pulsar://broker:6650".to_owned()),
broker_service_url_tls: None,
proxy_through_service_url: false,
};
}
// An engine error converts into `ClientError::Engine` and keeps its
// display text.
#[test]
fn client_error_from_engine_error() {
let io_err = std::io::Error::other("dialled into the void");
let engine: crate::EngineError = io_err.into();
let client: ClientError = engine.into();
assert!(matches!(client, ClientError::Engine(_)));
let s = format!("{client}");
assert!(s.contains("io error"), "got {s:?}");
}
// Exercises the shared change queue directly rather than the async
// `Client::next_topic_list_change` method.
#[test]
fn next_topic_list_change_drains_queue() {
let shared = ConnectionShared::new(ConnectionConfig::default());
shared.topic_list_changes.lock().push_back(TopicListChange {
added: vec!["persistent://t/n/foo".to_owned()],
removed: vec![],
});
let popped = shared.topic_list_changes.lock().pop_front();
assert!(popped.is_some());
let popped = popped.unwrap();
assert_eq!(popped.added, vec!["persistent://t/n/foo".to_owned()]);
}
// A freshly created shared state has an empty change queue.
#[test]
fn poll_topic_list_change_empty_yields_none() {
let shared = ConnectionShared::new(ConnectionConfig::default());
assert!(shared.topic_list_changes.lock().pop_front().is_none());
}
// A freshly created connection is neither connected nor closed.
#[test]
fn is_connected_and_is_closed_default_false() {
let shared = ConnectionShared::new(ConnectionConfig::default());
let conn = shared.inner.lock();
assert!(!conn.is_connected());
assert!(!conn.is_closed());
}
// Type-checks `Client::connect_plain_supervised` with a service-URL
// provider; the future is created but never polled.
#[test]
#[allow(clippy::let_underscore_future, clippy::no_effect_underscore_binding)]
fn connect_supervised_with_controlled_failover_compiles() {
use std::sync::Arc;
use magnetar_proto::{ControlledClusterFailover, ServiceUrlProvider};
let providers = TokioProviders::new();
let engine = MoonpoolEngine::new(providers);
let failover = ControlledClusterFailover::new("pulsar://primary:6650");
let provider: Arc<dyn ServiceUrlProvider> = Arc::new(failover);
let _fut = Client::connect_plain_supervised(
&engine,
"127.0.0.1:6650",
ConnectionConfig::default(),
Some(provider),
None,
);
}
// `set_url` is visible through the `ServiceUrlProvider` accessor.
#[test]
fn controlled_failover_set_url_observed_by_provider() {
use magnetar_proto::{ControlledClusterFailover, ServiceUrlProvider};
let failover = ControlledClusterFailover::new("pulsar://primary:6650");
assert_eq!(failover.get_service_url(), "pulsar://primary:6650");
failover.set_url("pulsar://secondary:6650");
assert_eq!(failover.get_service_url(), "pulsar://secondary:6650");
}
// NOTE(review): appears to exist only to keep the `Duration` import in
// use; confirm before removing either.
#[test]
fn duration_marker() {
let _ = Duration::from_millis(1);
}
// The remaining tests pin the URL normalisation in `proxy_broker_authority`.
#[test]
fn proxy_broker_authority_strips_pulsar_ssl_scheme() {
assert_eq!(
proxy_broker_authority("pulsar+ssl://b-c3-n12:6651"),
"b-c3-n12:6651"
);
}
#[test]
fn proxy_broker_authority_strips_pulsar_scheme() {
assert_eq!(
proxy_broker_authority("pulsar://b-c3-n12:6650"),
"b-c3-n12:6650"
);
}
#[test]
fn proxy_broker_authority_appends_default_port_for_pulsar_scheme() {
assert_eq!(proxy_broker_authority("pulsar://b-c3-n12"), "b-c3-n12:6650");
}
#[test]
fn proxy_broker_authority_appends_default_port_for_pulsar_ssl_scheme() {
assert_eq!(
proxy_broker_authority("pulsar+ssl://b-c3-n12"),
"b-c3-n12:6651"
);
}
// Without a recognised scheme the input is passed through unchanged.
#[test]
fn proxy_broker_authority_passes_through_bare_host_port() {
assert_eq!(proxy_broker_authority("b-c3-n12:6650"), "b-c3-n12:6650");
}
#[test]
fn proxy_broker_authority_trims_trailing_path_segments() {
assert_eq!(
proxy_broker_authority("pulsar://b-c3-n12:6650/extra/path"),
"b-c3-n12:6650"
);
}
}