use anyhow::Result;
pub trait ZmqRegistry: Send + 'static {
fn register(&self, name: &str, address: &str) -> Result<Box<dyn ZmqHandle>>;
fn lookup(&self, name: &str) -> Result<String>;
}
pub trait ZmqHandle: Send {
fn revoke(&mut self);
}
pub struct ZmqPubRegistration(pub(crate) Option<(String, Box<dyn ZmqRegistry>)>);
impl From<()> for ZmqPubRegistration {
fn from(_: ()) -> Self {
ZmqPubRegistration(None)
}
}
impl<R: ZmqRegistry + 'static> From<(&str, R)> for ZmqPubRegistration {
fn from((name, registry): (&str, R)) -> Self {
ZmqPubRegistration(Some((name.to_string(), Box::new(registry))))
}
}
pub struct ZmqSubConfig(pub(crate) ZmqSubResolution);
pub(crate) enum ZmqSubResolution {
Direct(String),
Discover(String, Box<dyn ZmqRegistry>),
}
impl From<&str> for ZmqSubConfig {
fn from(addr: &str) -> Self {
ZmqSubConfig(ZmqSubResolution::Direct(addr.to_string()))
}
}
impl From<String> for ZmqSubConfig {
fn from(addr: String) -> Self {
ZmqSubConfig(ZmqSubResolution::Direct(addr))
}
}
impl From<&String> for ZmqSubConfig {
fn from(addr: &String) -> Self {
ZmqSubConfig(ZmqSubResolution::Direct(addr.clone()))
}
}
impl<R: ZmqRegistry + 'static> From<(&str, R)> for ZmqSubConfig {
fn from((name, registry): (&str, R)) -> Self {
ZmqSubConfig(ZmqSubResolution::Discover(
name.to_string(),
Box::new(registry),
))
}
}
#[cfg(feature = "etcd")]
pub use etcd_impl::EtcdRegistry;
#[cfg(feature = "etcd")]
mod etcd_impl {
use super::{ZmqHandle, ZmqRegistry};
use crate::adapters::etcd::EtcdConnection;
use anyhow::Result;
use etcd_client::{Client, PutOptions};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
use tokio::sync::Notify;
const LEASE_TTL_SECS: i64 = 30;
const KEEPALIVE_INTERVAL_SECS: u64 = 10;
pub struct EtcdRegistry {
conn: EtcdConnection,
}
impl EtcdRegistry {
pub fn new(conn: impl Into<EtcdConnection>) -> Self {
Self { conn: conn.into() }
}
pub fn with_endpoints(endpoints: impl IntoIterator<Item = impl Into<String>>) -> Self {
Self {
conn: EtcdConnection::with_endpoints(endpoints),
}
}
}
struct EtcdHandle {
lease_id: i64,
conn: EtcdConnection,
shutdown: Arc<Notify>,
keepalive_thread: Option<JoinHandle<()>>,
}
impl ZmqHandle for EtcdHandle {
fn revoke(&mut self) {
self.shutdown.notify_one();
if let Some(t) = self.keepalive_thread.take() {
let _ = t.join();
}
revoke_lease(self.lease_id, &self.conn);
}
}
impl ZmqRegistry for EtcdRegistry {
fn register(&self, name: &str, address: &str) -> Result<Box<dyn ZmqHandle>> {
let handle = etcd_register(name, address, &self.conn)?;
Ok(Box::new(handle))
}
fn lookup(&self, name: &str) -> Result<String> {
etcd_lookup(name, &self.conn)
}
}
fn etcd_register(name: &str, address: &str, conn: &EtcdConnection) -> Result<EtcdHandle> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let lease_id = rt.block_on(async {
let mut client = Client::connect(&conn.endpoints, None)
.await
.map_err(|e| anyhow::anyhow!("etcd connect failed: {e}"))?;
let lease_resp = client
.lease_grant(LEASE_TTL_SECS, None)
.await
.map_err(|e| anyhow::anyhow!("etcd lease_grant failed: {e}"))?;
let id = lease_resp.id();
let opts = PutOptions::new().with_lease(id);
client
.put(name, address, Some(opts))
.await
.map_err(|e| anyhow::anyhow!("etcd put failed: {e}"))?;
Ok::<i64, anyhow::Error>(id)
})?;
let shutdown = Arc::new(Notify::new());
let shutdown_clone = shutdown.clone();
let endpoints = conn.endpoints.clone();
let keepalive_thread = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("etcd keepalive runtime");
rt.block_on(async {
let mut client = match Client::connect(&endpoints, None).await {
Ok(c) => c,
Err(e) => {
log::error!("etcd keepalive connect failed: {e}");
return;
}
};
let (mut keeper, mut ka_stream) = match client.lease_keep_alive(lease_id).await {
Ok(pair) => pair,
Err(e) => {
log::error!("etcd lease_keep_alive failed: {e}");
return;
}
};
loop {
tokio::select! {
_ = shutdown_clone.notified() => break,
_ = tokio::time::sleep(Duration::from_secs(KEEPALIVE_INTERVAL_SECS)) => {}
}
if keeper.keep_alive().await.is_err() {
break;
}
match ka_stream.message().await {
Ok(Some(_)) => {}
_ => break,
}
}
});
});
Ok(EtcdHandle {
lease_id,
conn: conn.clone(),
shutdown,
keepalive_thread: Some(keepalive_thread),
})
}
fn etcd_lookup(name: &str, conn: &EtcdConnection) -> Result<String> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(async {
let mut client = Client::connect(&conn.endpoints, None)
.await
.map_err(|e| anyhow::anyhow!("etcd connect failed: {e}"))?;
let resp = client
.get(name, None)
.await
.map_err(|e| anyhow::anyhow!("etcd get failed: {e}"))?;
match resp.kvs().first() {
Some(kv) => {
let addr = kv
.value_str()
.map_err(|e| anyhow::anyhow!("etcd value is not valid UTF-8: {e}"))?
.to_string();
Ok(addr)
}
None => Err(anyhow::anyhow!(
"no publisher named {:?} found in etcd",
name
)),
}
})
}
fn revoke_lease(lease_id: i64, conn: &EtcdConnection) {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::error!("etcd revoke runtime failed: {e}");
return;
}
};
rt.block_on(async {
let mut client = match Client::connect(&conn.endpoints, None).await {
Ok(c) => c,
Err(e) => {
log::error!("etcd revoke connect failed: {e}");
return;
}
};
if let Err(e) = client.lease_revoke(lease_id).await {
log::error!("etcd lease_revoke failed: {e}");
}
});
}
}