use super::connector::Connector;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
pub async fn create_lease(
connector: Arc<Connector>,
ttl: u64,
token: CancellationToken,
) -> anyhow::Result<u64> {
let mut lease_client = connector.get_client().lease_client();
let lease = lease_client.grant(ttl as i64, None).await?;
let id = lease.id() as u64;
let ttl = lease.ttl() as u64;
let child = token.child_token();
tokio::spawn(async move {
match keep_alive(connector, id, ttl, child).await {
Ok(_) => tracing::trace!("keep alive task exited successfully"),
Err(e) => {
tracing::error!(
error = %e,
"Unable to maintain lease. Check etcd server status"
);
token.cancel();
}
}
});
Ok(id)
}
async fn keep_alive(
connector: Arc<Connector>,
lease_id: u64,
mut ttl: u64,
token: CancellationToken,
) -> anyhow::Result<()> {
let mut deadline = Instant::now() + Duration::from_secs(ttl);
loop {
let mut lease_client = connector.get_client().lease_client();
let (mut heartbeat_sender, mut heartbeat_receiver) = match lease_client
.keep_alive(lease_id as i64)
.await
{
Ok((sender, receiver)) => {
tracing::debug!(lease_id, "Established keep-alive stream");
(sender, receiver)
}
Err(e) => {
tracing::warn!(lease_id, error = %e, "Failed to establish keep-alive stream");
tokio::select! {
biased;
reconnect_result = connector.reconnect(deadline) => {
match reconnect_result {
Err(e) => return Err(e),
_ => continue,
}
}
_ = token.cancelled() => {
tracing::debug!(lease_id, "Cancellation token triggered during reconnection");
return Ok(());
}
}
}
};
loop {
if deadline < std::time::Instant::now() {
anyhow::bail!(
"Unable to refresh lease - deadline exceeded. Check etcd server status"
);
}
tokio::select! {
biased;
status = heartbeat_receiver.message() => {
match status {
Ok(Some(resp)) => {
tracing::trace!(lease_id, "keep alive response received: {:?}", resp);
ttl = resp.ttl() as u64;
deadline = Instant::now() + Duration::from_secs(ttl);
if resp.ttl() == 0 {
anyhow::bail!("Unable to maintain lease - expired or revoked. Check etcd server status");
}
}
Ok(None) => {
tracing::warn!(lease_id, "Keep-alive stream unexpectedly ended");
break;
}
Err(e) => {
tracing::warn!(lease_id, error = %e, "Keep-alive stream error");
break;
}
}
}
_ = token.cancelled() => {
tracing::debug!(lease_id, "cancellation token triggered; revoking lease");
if let Err(e) = lease_client.revoke(lease_id as i64).await {
tracing::warn!(
lease_id,
error = %e,
"Failed to revoke lease during cancellation. Cleanup may be incomplete."
);
}
return Ok(());
}
_ = tokio::time::sleep(Duration::from_secs(ttl / 2)) => {
tracing::trace!(lease_id, "sending keep alive");
if let Err(e) = heartbeat_sender.keep_alive().await {
tracing::warn!(
lease_id,
error = %e,
"Unable to send lease heartbeat. Check etcd server status"
);
ttl = 0;
}
}
}
}
}
}