dynamo-runtime 0.7.0-post1

Dynamo Runtime Library
Documentation
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::connector::Connector;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;

/// Create an etcd lease with the given TTL, attach it to the provided cancellation token,
/// spawn a keep-alive task, and return the lease id (u64).
///
/// The keep-alive task runs on a child of `token`, so cancelling `token` stops the task. If the
/// task fails to maintain the lease, it cancels `token` itself so the owner learns that the
/// lease has been lost.
///
/// The TTL passed to the keep-alive task is the one etcd reports in the grant response, which
/// may differ from the requested `ttl`.
///
/// # Errors
///
/// Returns an error if `ttl` does not fit in etcd's signed 64-bit TTL field, if the grant
/// request fails, or if etcd reports a negative TTL for the granted lease.
///
/// Note: this function spawns a background task that maintains the lease until the token is
/// cancelled or an unrecoverable error occurs.
pub async fn create_lease(
    connector: Arc<Connector>,
    ttl: u64,
    token: CancellationToken,
) -> anyhow::Result<u64> {
    // etcd takes a signed TTL; a plain `as` cast would wrap huge values to negative ones.
    let requested_ttl = i64::try_from(ttl).map_err(|_| {
        anyhow::anyhow!("Requested lease TTL of {ttl} seconds exceeds the maximum etcd TTL")
    })?;

    let mut lease_client = connector.get_client().lease_client();
    let lease = lease_client.grant(requested_ttl, None).await?;

    let id = lease.id() as u64;
    // A negative TTL cast with `as u64` would wrap to an enormous value and make the
    // keep-alive task's deadline arithmetic (`Instant + Duration`) panic.
    let ttl = u64::try_from(lease.ttl()).map_err(|_| {
        anyhow::anyhow!(
            "etcd granted lease {id} with an invalid TTL of {} seconds",
            lease.ttl()
        )
    })?;
    let child = token.child_token();

    tokio::spawn(async move {
        match keep_alive(connector, id, ttl, child).await {
            Ok(_) => tracing::trace!("keep alive task exited successfully"),
            Err(e) => {
                tracing::error!(
                    error = %e,
                    "Unable to maintain lease. Check etcd server status"
                );
                // Propagate the failure to the owner of the parent token.
                token.cancel();
            }
        }
    });

    Ok(id)
}

/// Task to keep leases alive with reconnection support.
///
/// A heartbeat is sent on the current keep-alive stream every half TTL. Each keep-alive
/// response moves the local `deadline` to `now + TTL` using the TTL in that response. If the
/// deadline passes without a refresh, the task fails. When the stream ends or errors, the
/// outer loop re-establishes it. If establishing it fails, `Connector::reconnect` is awaited
/// with the current deadline.
///
/// Returns `Ok(())` when `token` is cancelled. If cancellation happens while the stream is
/// established, the lease is revoked on a best-effort basis (a revoke failure is only logged).
///
/// # Errors
///
/// - The deadline passes without a successful refresh.
/// - etcd reports the lease as expired or revoked (a response TTL `<= 0`).
/// - `Connector::reconnect` returns an error.
///
/// If this task returns an error, the cancellation token will be invoked on the runtime.
async fn keep_alive(
    connector: Arc<Connector>,
    lease_id: u64,
    mut ttl: u64,
    token: CancellationToken,
) -> anyhow::Result<()> {
    let mut deadline = Instant::now() + Duration::from_secs(ttl);

    loop {
        // Try to establish or re-establish the keep-alive stream
        let mut lease_client = connector.get_client().lease_client();
        let (mut heartbeat_sender, mut heartbeat_receiver) = match lease_client
            .keep_alive(lease_id as i64)
            .await
        {
            Ok((sender, receiver)) => {
                tracing::debug!(lease_id, "Established keep-alive stream");
                (sender, receiver)
            }
            Err(e) => {
                tracing::warn!(lease_id, error = %e, "Failed to establish keep-alive stream");

                // Try to reconnect with the deadline, but also check for cancellation
                tokio::select! {
                    biased;

                    reconnect_result = connector.reconnect(deadline) => {
                        match reconnect_result {
                            Err(e) => return Err(e),
                            _ => continue,
                        }
                    }

                    _ = token.cancelled() => {
                        tracing::debug!(lease_id, "Cancellation token triggered during reconnection");
                        return Ok(());
                    }
                }
            }
        };

        // Keep-alive loop with the established stream
        loop {
            if deadline < Instant::now() {
                anyhow::bail!(
                    "Unable to refresh lease - deadline exceeded. Check etcd server status"
                );
            }

            tokio::select! {
                biased;

                status = heartbeat_receiver.message() => {
                    match status {
                        Ok(Some(resp)) => {
                            tracing::trace!(lease_id, "keep alive response received: {:?}", resp);

                            // Validate before converting: a TTL <= 0 means the lease is gone,
                            // and casting a negative value to u64 would wrap to a huge number
                            // whose `Instant` addition panics, killing this task without the
                            // caller ever cancelling the runtime token.
                            if resp.ttl() <= 0 {
                                anyhow::bail!("Unable to maintain lease - expired or revoked. Check etcd server status");
                            }

                            // Update ttl and deadline from response (ttl is known positive here)
                            ttl = resp.ttl() as u64;
                            deadline = Instant::now() + Duration::from_secs(ttl);
                        }
                        Ok(None) => {
                            tracing::warn!(lease_id, "Keep-alive stream unexpectedly ended");
                            break;
                        }
                        Err(e) => {
                            tracing::warn!(lease_id, error = %e, "Keep-alive stream error");
                            break;
                        }
                    }
                }

                _ = token.cancelled() => {
                    tracing::debug!(lease_id, "cancellation token triggered; revoking lease");
                    if let Err(e) = lease_client.revoke(lease_id as i64).await {
                        tracing::warn!(
                            lease_id,
                            error = %e,
                            "Failed to revoke lease during cancellation. Cleanup may be incomplete."
                        );
                    }
                    return Ok(());
                }

                // Halve the Duration rather than the integer seconds so that ttl == 1 waits
                // 500ms instead of spinning with a zero-length sleep; ttl == 0 (set after a
                // failed heartbeat below) still yields an immediate retry.
                _ = tokio::time::sleep(Duration::from_secs(ttl) / 2) => {
                    tracing::trace!(lease_id, "sending keep alive");

                    // If we get an error issuing the heartbeat, set the ttl to 0.
                    // This lets us poll the response stream once and the cancellation
                    // token once, then immediately try to send the heartbeat again.
                    // This repeats until either the heartbeat is reestablished or the
                    // deadline is exceeded.
                    if let Err(e) = heartbeat_sender.keep_alive().await {
                        tracing::warn!(
                            lease_id,
                            error = %e,
                            "Unable to send lease heartbeat. Check etcd server status"
                        );
                        ttl = 0;
                    }
                }
            }
        }
    }
}