lash-core 0.1.0-alpha.66

Sans-IO turn machine and runtime kernel for the lash agent runtime.
Documentation
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use tokio_util::sync::CancellationToken;

use super::{Clock, RUNTIME_TURN_LEASE_TTL_MS};
use crate::store::{
    RuntimeCommit, RuntimeCommitResult, RuntimePersistence, SessionExecutionLease,
    SessionExecutionLeaseCompletion, SessionExecutionLeaseFence, StoreError,
};

pub(super) struct SessionExecutionLeaseGuard {
    store: Arc<dyn RuntimePersistence>,
    lease: Arc<StdMutex<SessionExecutionLease>>,
    released: Arc<AtomicBool>,
    lost: Arc<AtomicBool>,
    renew_task: tokio::task::JoinHandle<()>,
}

impl SessionExecutionLeaseGuard {
    pub(super) async fn try_acquire(
        store: Arc<dyn RuntimePersistence>,
        session_id: &str,
        owner_id: &str,
        clock: Arc<dyn Clock>,
        cancel: CancellationToken,
    ) -> Result<Option<Self>, StoreError> {
        let Some(lease) = store
            .try_claim_session_execution_lease(session_id, owner_id, RUNTIME_TURN_LEASE_TTL_MS)
            .await?
        else {
            tracing::debug!(
                session_id,
                owner_id,
                event = "session_execution_lease.busy",
                "session execution lease is busy"
            );
            return Ok(None);
        };
        tracing::debug!(
            session_id = %lease.session_id,
            owner_id = %lease.owner_id,
            fencing_token = lease.fencing_token,
            event = "session_execution_lease.acquired",
            "acquired session execution lease"
        );
        let lease = Arc::new(StdMutex::new(lease));
        let released = Arc::new(AtomicBool::new(false));
        let lost = Arc::new(AtomicBool::new(false));
        let renew_task = spawn_renewal_task(
            Arc::clone(&store),
            Arc::clone(&lease),
            Arc::clone(&released),
            Arc::clone(&lost),
            clock,
            cancel,
        );
        Ok(Some(Self {
            store,
            lease,
            released,
            lost,
            renew_task,
        }))
    }

    pub(super) fn fence(&self) -> SessionExecutionLeaseFence {
        self.lease.lock().expect("session lease lock").fence()
    }

    pub(super) fn completion(&self) -> SessionExecutionLeaseCompletion {
        self.lease.lock().expect("session lease lock").completion()
    }

    pub(super) fn mark_released(&self) {
        if self.released.swap(true, Ordering::AcqRel) {
            return;
        }
        self.renew_task.abort();
        let completion = self.completion();
        tracing::debug!(
            session_id = %completion.session_id,
            owner_id = %completion.owner_id,
            fencing_token = completion.fencing_token,
            event = "session_execution_lease.released",
            "released session execution lease"
        );
    }

    pub(super) fn is_lost(&self) -> bool {
        self.lost.load(Ordering::Acquire)
    }

    pub(super) async fn refresh_or_mark_lost(&self) -> Result<(), StoreError> {
        if self.is_lost() {
            let fence = self.fence();
            return Err(StoreError::SessionExecutionLeaseExpired {
                session_id: fence.session_id,
            });
        }
        let fence = self.fence();
        match self
            .store
            .renew_session_execution_lease(&fence, RUNTIME_TURN_LEASE_TTL_MS)
            .await
        {
            Ok(renewed) => {
                tracing::debug!(
                    session_id = %renewed.session_id,
                    owner_id = %renewed.owner_id,
                    fencing_token = renewed.fencing_token,
                    event = "session_execution_lease.renewed",
                    "renewed session execution lease"
                );
                *self.lease.lock().expect("session lease lock") = renewed;
                Ok(())
            }
            Err(err) => {
                self.lost.store(true, Ordering::Release);
                self.renew_task.abort();
                tracing::warn!(
                    error = %err,
                    session_id = %fence.session_id,
                    owner_id = %fence.owner_id,
                    fencing_token = fence.fencing_token,
                    event = "session_execution_lease.lost",
                    "lost session execution lease"
                );
                Err(err)
            }
        }
    }

    pub(super) async fn release_if_live(&self) -> Result<(), StoreError> {
        if self.released.swap(true, Ordering::AcqRel) {
            return Ok(());
        }
        self.renew_task.abort();
        if self.is_lost() {
            return Ok(());
        }
        let completion = self.completion();
        self.store
            .release_session_execution_lease(&completion)
            .await?;
        tracing::debug!(
            session_id = %completion.session_id,
            owner_id = %completion.owner_id,
            fencing_token = completion.fencing_token,
            event = "session_execution_lease.released",
            "released session execution lease"
        );
        Ok(())
    }
}

pub(super) async fn commit_runtime_state_with_fresh_session_execution_lease(
    store: Arc<dyn RuntimePersistence>,
    commit: RuntimeCommit,
    owner_id: &str,
    clock: Arc<dyn Clock>,
) -> Result<RuntimeCommitResult, StoreError> {
    let session_id = commit.session_id.clone();
    let Some(lease) = SessionExecutionLeaseGuard::try_acquire(
        Arc::clone(&store),
        &session_id,
        owner_id,
        clock,
        CancellationToken::new(),
    )
    .await?
    else {
        return Err(StoreError::Backend(format!(
            "session execution lease for session `{session_id}` is busy"
        )));
    };
    let commit = commit
        .with_session_execution_lease(lease.fence())
        .releasing_session_execution_lease(lease.completion());
    let result = store.commit_runtime_state(commit).await?;
    lease.mark_released();
    Ok(result)
}

impl Drop for SessionExecutionLeaseGuard {
    fn drop(&mut self) {
        self.renew_task.abort();
    }
}

fn spawn_renewal_task(
    store: Arc<dyn RuntimePersistence>,
    lease: Arc<StdMutex<SessionExecutionLease>>,
    released: Arc<AtomicBool>,
    lost: Arc<AtomicBool>,
    clock: Arc<dyn Clock>,
    cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
    let renew_every = Duration::from_millis((RUNTIME_TURN_LEASE_TTL_MS / 3).max(1));
    tokio::spawn(async move {
        loop {
            clock.sleep(renew_every).await;
            if released.load(Ordering::Acquire) {
                break;
            }
            let fence = lease.lock().expect("session lease lock").fence();
            match store
                .renew_session_execution_lease(&fence, RUNTIME_TURN_LEASE_TTL_MS)
                .await
            {
                Ok(renewed) => {
                    tracing::debug!(
                        session_id = %renewed.session_id,
                        owner_id = %renewed.owner_id,
                        fencing_token = renewed.fencing_token,
                        event = "session_execution_lease.renewed",
                        "renewed session execution lease"
                    );
                    *lease.lock().expect("session lease lock") = renewed;
                }
                Err(err) => {
                    lost.store(true, Ordering::Release);
                    tracing::warn!(
                        error = %err,
                        session_id = %fence.session_id,
                        owner_id = %fence.owner_id,
                        fencing_token = fence.fencing_token,
                        event = "session_execution_lease.lost",
                        "lost session execution lease"
                    );
                    cancel.cancel();
                    break;
                }
            }
        }
    })
}