use crate::connection_manager::{BrokerAddress, ConnectionManager};
use crate::error::{ConnectionError, ServiceDiscoveryError};
use crate::executor::Executor;
use crate::message::proto::{
command_lookup_topic_response, command_partitioned_topic_metadata_response,
CommandLookupTopicResponse,
};
use futures::{future::try_join_all, FutureExt};
use std::sync::Arc;
use url::Url;
use crate::error::ServiceDiscoveryError::NotFound;
#[derive(Clone)]
pub struct ServiceDiscovery<Exe: Executor> {
manager: Arc<ConnectionManager<Exe>>,
}
impl<Exe: Executor> ServiceDiscovery<Exe> {
pub fn with_manager(manager: Arc<ConnectionManager<Exe>>) -> Self {
ServiceDiscovery { manager }
}
pub async fn lookup_topic<S: Into<String>>(
&self,
topic: S,
) -> Result<BrokerAddress, ServiceDiscoveryError> {
let topic = topic.into();
let mut proxied_query = false;
let mut conn = self.manager.get_base_connection().await?;
let base_url = self.manager.url.clone();
let mut is_authoritative = false;
let mut broker_address = self.manager.get_base_address();
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = self.manager.operation_retry_options.clone();
loop {
let response = match conn
.sender()
.lookup_topic(topic.to_string(), is_authoritative)
.await
{
Ok(res) => res,
Err(ConnectionError::Disconnected) => {
error!("tried to lookup a topic but connection was closed, reconnecting...");
conn = self.manager.get_connection(&broker_address).await?;
conn.sender()
.lookup_topic(topic.to_string(), is_authoritative)
.await?
}
Err(e) => return Err(e.into()),
};
if response.response.is_none()
|| response.response
== Some(command_lookup_topic_response::LookupType::Failed as i32)
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("lookup({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})", topic, operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!("lookup({}) reached max retries", topic);
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
));
}
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"lookup({}) success after {} retries over {} seconds",
topic,
current_retries + 1,
dur
);
}
let LookupResponse {
broker_url,
broker_url_tls,
proxy,
redirect,
authoritative,
} = convert_lookup_response(&response)?;
is_authoritative = authoritative;
let connection_url = if let Some(u) = &broker_url_tls {
u.clone()
} else if let Some(u) = &broker_url {
u.clone()
} else {
return Err(ServiceDiscoveryError::NotFound);
};
let url = if proxied_query || proxy {
base_url.clone()
} else {
connection_url.clone()
};
let broker_url = if let Some(u) = broker_url_tls {
format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6651))
} else if let Some(u) = broker_url {
format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6650))
} else {
return Err(ServiceDiscoveryError::NotFound);
};
broker_address = BrokerAddress {
url,
broker_url,
proxy: proxied_query || proxy,
};
if redirect {
conn = self.manager.get_connection(&broker_address).await?;
proxied_query = broker_address.proxy;
continue;
} else {
let res = self
.manager
.get_connection(&broker_address)
.await
.map(|_| broker_address)
.map_err(ServiceDiscoveryError::Connection);
break res;
}
}
}
pub async fn lookup_partitioned_topic_number<S: Into<String>>(
&self,
topic: S,
) -> Result<u32, ServiceDiscoveryError> {
let mut connection = self.manager.get_base_connection().await?;
let topic = topic.into();
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = self.manager.operation_retry_options.clone();
let response = loop {
let response = match connection.sender().lookup_partitioned_topic(&topic).await {
Ok(res) => res,
Err(ConnectionError::Disconnected) => {
error!("tried to lookup a topic but connection was closed, reconnecting...");
connection = self.manager.get_base_connection().await?;
connection.sender().lookup_partitioned_topic(&topic).await?
}
Err(e) => return Err(e.into()),
};
if response.response.is_none()
|| response.response
== Some(command_partitioned_topic_metadata_response::LookupType::Failed as i32)
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("lookup_partitioned_topic_number({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!(
"lookup_partitioned_topic_number({}) reached max retries",
topic
);
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
));
}
break response;
};
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"lookup_partitioned_topic_number({}) success after {} retries over {} seconds",
topic,
current_retries + 1,
dur
);
}
match response.partitions {
Some(partitions) => Ok(partitions),
None => Err(ServiceDiscoveryError::Query(
response.error.and_then(crate::error::server_error),
response.message,
)),
}
}
pub async fn lookup_partitioned_topic<S: Into<String>>(
&self,
topic: S,
) -> Result<Vec<(String, BrokerAddress)>, ServiceDiscoveryError> {
let topic = topic.into();
let partitions = self.lookup_partitioned_topic_number(&topic).await?;
trace!("Partitions for topic {}: {}", &topic, &partitions);
let topics = match partitions {
0 => vec![topic],
_ => (0..partitions)
.map(|n| format!("{}-partition-{}", &topic, n))
.collect(),
};
try_join_all(topics.into_iter().map(|topic| {
self.lookup_topic(topic.clone())
.map(move |address_res| match address_res {
Err(e) => Err(e),
Ok(address) => Ok((topic, address)),
})
}))
.await
}
}
struct LookupResponse {
pub broker_url: Option<Url>,
pub broker_url_tls: Option<Url>,
pub proxy: bool,
pub redirect: bool,
pub authoritative: bool,
}
fn convert_lookup_response(
response: &CommandLookupTopicResponse,
) -> Result<LookupResponse, ServiceDiscoveryError> {
let proxy = response.proxy_through_service_url.unwrap_or(false);
let authoritative = response.authoritative.unwrap_or(false);
let redirect =
response.response == Some(command_lookup_topic_response::LookupType::Redirect as i32);
let broker_url = match response.broker_service_url.as_ref() {
Some(u) => Some(Url::parse(&response.broker_service_url.clone().unwrap()).map_err(|e| {
error!("error parsing URL: {:?}", e);
ServiceDiscoveryError::NotFound
})?),
None => None,
};
let broker_url_tls = match response.broker_service_url_tls.as_ref() {
Some(u) => Some(Url::parse(u).map_err(|e| {
error!("error parsing URL: {:?}", e);
ServiceDiscoveryError::NotFound
})?),
None => None,
};
Ok(LookupResponse {
broker_url,
broker_url_tls,
proxy,
redirect,
authoritative,
})
}