use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};
use tokio::{
sync::oneshot,
task::JoinHandle,
time::{sleep, timeout},
};
use crate::discovery_etcd::EtcdDiscoveryConfig;
use crate::discovery_etcd::client::SharedEtcdClient;
/// State of a lease as published in the shared status map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EtcdLeaseState {
/// The lease is considered live; recorded when its keep-alive task is spawned.
Active,
/// A keep-alive session ended with an error; carries the error message text.
Failed(String),
/// The keep-alive task observed its stop signal and exited.
Stopped,
}
/// Latest known status of one lease: which lease it is and what state it is in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdLeaseStatus {
/// Lease id the status refers to.
pub lease_id: i64,
/// Most recently recorded state for that lease.
pub state: EtcdLeaseState,
}
/// Shared, mutex-guarded map from a registration key to its latest lease status.
///
/// The map key is the `key` string passed to `spawn_keep_alive_task`
/// (presumably the etcd key the lease is attached to — confirm against callers).
pub(crate) type LeaseStatuses = Arc<Mutex<BTreeMap<String, EtcdLeaseStatus>>>;
/// Handle to a spawned lease keep-alive task, as returned by `spawn_keep_alive_task`.
pub(crate) struct LeaseRegistration {
/// Id of the lease the background task keeps alive.
pub(crate) lease_id: i64,
/// Sender half of the stop channel; taken (leaving `None`) when `stop` is called.
stop: Option<oneshot::Sender<()>>,
/// Join handle of the spawned keep-alive task; aborted when `stop` is called.
task: JoinHandle<()>,
}
impl LeaseRegistration {
    /// Signals the keep-alive task to stop, then aborts it.
    ///
    /// Consumes the registration. A failed send (the receiver is already gone) is
    /// deliberately ignored, because the task is aborted right afterwards anyway.
    ///
    /// NOTE(review): the abort is issued immediately after the signal, so the task may
    /// be cancelled before it records `EtcdLeaseState::Stopped` — confirm that callers
    /// do not depend on that status being written.
    pub(crate) fn stop(mut self) {
        if let Some(notifier) = self.stop.take() {
            notifier.send(()).ok();
        }
        self.task.abort();
    }
}
/// Reason a single keep-alive session (`run_keep_alive_once`) ended.
enum KeepAliveOutcome {
/// The stop receiver resolved while the session was waiting between keep-alives.
Stopped,
/// Opening the stream or a keep-alive round-trip failed; carries the error text.
Failed(String),
}
/// Marks `key` as active in `statuses` and launches the background keep-alive task
/// for `lease_id`.
///
/// The status entry is written before the task is spawned. The returned
/// [`LeaseRegistration`] owns the stop sender and the task's join handle.
///
/// # Panics
///
/// Panics if the status mutex is poisoned (via [`set_lease_status`]). `tokio::spawn`
/// additionally requires being called from within a Tokio runtime.
pub(crate) fn spawn_keep_alive_task(
    config: EtcdDiscoveryConfig,
    client: SharedEtcdClient,
    key: String,
    lease_id: i64,
    statuses: LeaseStatuses,
) -> LeaseRegistration {
    set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Active);

    let (stop_tx, stop_rx) = oneshot::channel();
    let worker = lease_keep_alive_task(config, client, key, lease_id, statuses, stop_rx);

    LeaseRegistration {
        lease_id,
        stop: Some(stop_tx),
        task: tokio::spawn(worker),
    }
}
/// Records `state` for `key` in the shared status map, replacing any previous entry.
///
/// # Panics
///
/// Panics with "lease status mutex poisoned" if the status mutex is poisoned.
pub(crate) fn set_lease_status(
    statuses: &LeaseStatuses,
    key: &str,
    lease_id: i64,
    state: EtcdLeaseState,
) {
    let mut table = statuses.lock().expect("lease status mutex poisoned");
    let entry = EtcdLeaseStatus { lease_id, state };
    table.insert(key.to_owned(), entry);
}
/// Drives keep-alive traffic for one lease until a stop signal arrives.
///
/// Each loop iteration runs one keep-alive session via [`run_keep_alive_once`]. When a
/// session fails, the error text is published as [`EtcdLeaseState::Failed`] and the task
/// waits `config.reconnect_interval` before opening a new session. As soon as a later
/// session completes its first successful round-trip, the status is switched back to
/// [`EtcdLeaseState::Active`], so a lease that recovered is not reported as failed
/// forever. A resolved stop receiver (signal sent, or sender dropped) publishes
/// [`EtcdLeaseState::Stopped`] and ends the task.
async fn lease_keep_alive_task(
    config: EtcdDiscoveryConfig,
    client: SharedEtcdClient,
    key: String,
    lease_id: i64,
    statuses: LeaseStatuses,
    mut stop: oneshot::Receiver<()>,
) {
    // True while the published status is `Failed`, i.e. the previous session ended in
    // an error and no later session has succeeded yet.
    let mut degraded = false;
    loop {
        let outcome = run_keep_alive_once(&config, &client, lease_id, &mut stop, || {
            // Only rewrite the status after a failure; the first session starts out
            // `Active` already (set by the spawner).
            if degraded {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Active);
            }
        })
        .await;
        match outcome {
            KeepAliveOutcome::Stopped => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Stopped);
                break;
            }
            KeepAliveOutcome::Failed(message) => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Failed(message));
                degraded = true;
            }
        }
        // Back off before reconnecting, but stay responsive to the stop signal.
        tokio::select! {
            _ = &mut stop => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Stopped);
                break;
            }
            _ = sleep(config.reconnect_interval) => {}
        }
    }
}

/// Runs a single keep-alive session: opens the stream, sends one keep-alive
/// immediately, then one every `config.keep_alive_interval`.
///
/// `on_established` is invoked exactly once, right after the first keep-alive
/// round-trip of this session succeeds; it is never invoked if the session fails
/// before that point.
///
/// Returns [`KeepAliveOutcome::Stopped`] when `stop` resolves while waiting between
/// keep-alives, or [`KeepAliveOutcome::Failed`] with the error text when opening the
/// stream or any round-trip fails.
async fn run_keep_alive_once(
    config: &EtcdDiscoveryConfig,
    client: &SharedEtcdClient,
    lease_id: i64,
    stop: &mut oneshot::Receiver<()>,
    on_established: impl FnOnce(),
) -> KeepAliveOutcome {
    let (mut keeper, mut stream) = match open_keep_alive(config, client, lease_id).await {
        Ok(pair) => pair,
        Err(error) => return KeepAliveOutcome::Failed(error),
    };
    // Refresh right away so a reconnect renews the lease without waiting a full interval.
    if let Err(error) = send_keep_alive(&mut keeper, &mut stream, config.keep_alive_timeout).await {
        return KeepAliveOutcome::Failed(error);
    }
    // The server confirmed a positive TTL: the lease is healthy again.
    on_established();
    loop {
        tokio::select! {
            _ = &mut *stop => return KeepAliveOutcome::Stopped,
            _ = sleep(config.keep_alive_interval) => {
                if let Err(error) =
                    send_keep_alive(&mut keeper, &mut stream, config.keep_alive_timeout).await
                {
                    return KeepAliveOutcome::Failed(error);
                }
            }
        }
    }
}
/// Performs one keep-alive round-trip: sends a request through `keeper` and waits up
/// to `response_timeout` for the matching message on `stream`.
///
/// # Errors
///
/// Returns the error text when the request cannot be sent, the stream yields an
/// error, the stream is closed, the wait times out, or the reply carries a TTL that
/// is not positive.
async fn send_keep_alive(
    keeper: &mut etcd_client::LeaseKeeper,
    stream: &mut etcd_client::LeaseKeepAliveStream,
    response_timeout: std::time::Duration,
) -> Result<(), String> {
    if let Err(send_error) = keeper.keep_alive().await {
        return Err(send_error.to_string());
    }

    // Peel the layers in order: timeout, transport error, closed stream.
    let reply = timeout(response_timeout, stream.message())
        .await
        .map_err(|_| "lease keep-alive timed out".to_string())?
        .map_err(|recv_error| recv_error.to_string())?
        .ok_or_else(|| "lease keep-alive stream closed".to_string())?;

    if reply.ttl() > 0 {
        Ok(())
    } else {
        Err("lease keep-alive returned non-positive ttl".to_string())
    }
}
/// Opens a keep-alive keeper/stream pair for `lease_id` on the shared client.
///
/// The client lock is acquired first; the `lease_keep_alive` call itself is then
/// bounded by `config.operation_timeout`.
///
/// # Errors
///
/// Returns "lease_keep_alive timed out" when the call exceeds the timeout, otherwise
/// the client error rendered as text.
async fn open_keep_alive(
    config: &EtcdDiscoveryConfig,
    client: &SharedEtcdClient,
    lease_id: i64,
) -> Result<(etcd_client::LeaseKeeper, etcd_client::LeaseKeepAliveStream), String> {
    let mut guard = client.lock().await;
    let attempt = timeout(config.operation_timeout, guard.lease_keep_alive(lease_id)).await;
    match attempt {
        Ok(Ok(pair)) => Ok(pair),
        Ok(Err(error)) => Err(error.to_string()),
        Err(_) => Err("lease_keep_alive timed out".to_string()),
    }
}