use crate::error::Error;
use crate::hosts::solr_host::SolrHost;
use async_trait::async_trait;
use log::debug;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
use zookeeper_async::{WatchedEvent, Watcher, ZkResult, ZooKeeper};
/// Connection settings for a ZooKeeper ensemble.
///
/// Holds the values that [`ZookeeperEnsembleHostConnector::connect`] forwards to
/// `ZookeeperEnsembleHost::new` in order to open the actual connection.
#[derive(Clone)]
pub struct ZookeeperEnsembleHostConnector {
/// Addresses of the ensemble members. They are joined with `,` into a single
/// connect string when connecting (presumably `host:port` entries — confirm with callers).
pub hosts: Vec<String>,
/// Timeout handed to `ZooKeeper::connect` when the connection is established.
pub timeout: Duration,
}
impl ZookeeperEnsembleHostConnector {
    /// Builds a connector from any iterable of host addresses and a timeout.
    ///
    /// Every item of `hosts` is converted into an owned `String`; no connection
    /// is attempted until [`connect`](Self::connect) is called.
    pub fn new<S: Into<String>, V: IntoIterator<Item = S>>(
        hosts: V,
        timeout: Duration,
    ) -> ZookeeperEnsembleHostConnector {
        let hosts: Vec<String> = hosts.into_iter().map(Into::into).collect();
        Self { hosts, timeout }
    }

    /// Consumes the connector and connects to the ensemble.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `ZookeeperEnsembleHost::new` returns.
    pub async fn connect(self) -> Result<ZookeeperEnsembleHost, Error> {
        let Self { hosts, timeout } = self;
        ZookeeperEnsembleHost::new(hosts.as_slice(), timeout).await
    }
}
#[cfg(feature = "blocking")]
use crate::runtime::RUNTIME;
#[cfg(feature = "blocking")]
impl ZookeeperEnsembleHostConnector {
    /// Synchronous counterpart of `connect`, available with the `blocking` feature.
    ///
    /// Runs the `connect` future to completion through `RUNTIME.block_on` and
    /// returns its result unchanged.
    pub fn connect_blocking(self) -> Result<ZookeeperEnsembleHost, Error> {
        let connecting = self.connect();
        RUNTIME.block_on(connecting)
    }
}
/// A connected ZooKeeper ensemble, usable as a `SolrHost` that picks Solr nodes
/// from the children of the `/live_nodes` znode.
#[derive(Clone)]
pub struct ZookeeperEnsembleHost {
/// ZooKeeper client behind an `Arc`, so cloning this host shares the one client
/// instead of opening another connection.
client: Arc<ZooKeeper>,
}
impl ZookeeperEnsembleHost {
    /// Connects to the ZooKeeper ensemble described by `hosts`.
    ///
    /// The host entries are converted to strings and joined with `,` to form the
    /// connect string given to `ZooKeeper::connect`, together with `timeout` and
    /// a [`LoggingWatcher`] as the watcher.
    ///
    /// # Errors
    ///
    /// Returns the connection error converted into the crate's `Error`.
    pub(crate) async fn new<S: Into<String>, V: IntoIterator<Item = S>>(
        hosts: V,
        timeout: Duration,
    ) -> Result<ZookeeperEnsembleHost, Error> {
        let connect_string = hosts
            .into_iter()
            .map(Into::into)
            .collect::<Vec<String>>()
            .join(",");
        let zk = ZooKeeper::connect(connect_string.as_str(), timeout, LoggingWatcher).await?;
        Ok(ZookeeperEnsembleHost {
            client: Arc::new(zk),
        })
    }
}
#[async_trait]
impl SolrHost for ZookeeperEnsembleHost {
async fn get_solr_node<'a>(&'a self) -> Result<Cow<'a, str>, Error> {
let hosts = get_hosts_from_zookeeper(&self.client).await?;
match hosts.get(fastrand::usize(0..hosts.len())) {
None => Err(Error::SolrSetupError(
"No ready Solr nodes from Zookeeper".to_string(),
)),
Some(r) => Ok(Cow::Owned(format!(
"http://{}",
r.strip_suffix("_solr").unwrap_or(r)
))),
}
}
}
/// ZooKeeper watcher that does nothing but log each event it receives at debug level.
pub struct LoggingWatcher;
impl Watcher for LoggingWatcher {
    /// Writes the `Debug` representation of the received event to the debug log.
    fn handle(&self, event: WatchedEvent) {
        debug!("{:?}", event);
    }
}
/// Lists the children of the `/live_nodes` znode, requesting a watch on it.
///
/// The child names are returned exactly as ZooKeeper reports them.
///
/// # Errors
///
/// Returns the `ZkError` produced by the underlying `get_children` call.
pub(crate) async fn get_hosts_from_zookeeper(client: &ZooKeeper) -> ZkResult<Vec<String>> {
    let (path, watch) = ("/live_nodes", true);
    client.get_children(path, watch).await
}