// pdk-lock-lib 1.7.0
//
// PDK Lock Library
// Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

//! PDK Lock Library
//!
//! Primitives for coordinating work among Flex Gateway workers and policies. Generated locks
//! expire automatically if not refreshed and are released on drop.
//!
//! ## Primary types
//!
//! - [`LockBuilder`]/[`LockBuilderInstance`]: construct lock instances with
//!   custom expiration and optional isolation ([`LockBuilderInstance::shared`] option)
//! - [`TryLock`]: attempts to acquire a lock via [`TryLock::try_lock`]
//! - [`Lock`]: an acquired lock that can be refreshed via [`Lock::refresh_lock`]
//!

mod builder;

#[cfg(test)]
mod test;

use pdk_core::classy::{Clock, SharedData};
use std::cell::Cell;
use std::ops::Add;
use std::rc::Rc;
use std::time::{Duration, SystemTime};

pub use builder::{LockBuilder, LockBuilderInstance};

/// The built lock. Use this object's [`try_lock`](TryLock::try_lock) to ensure mutual exclusion.
pub struct TryLock {
    /// Key of the shared data entry that holds the lock state.
    key: String,
    /// How long a stored lock timestamp stays valid before the lock counts as expired.
    lock_expiration: Duration,
    /// Source of the current time used to stamp the lock and to check its expiration.
    clock: Rc<dyn Clock>,
    /// Shared data store the lock entry is read from and written to.
    shared_data: Rc<dyn SharedData>,
}

impl TryLock {
    /// Creates a lock stored under `key` in `shared_data`.
    ///
    /// `lock_expiration` is how long a stored timestamp keeps the lock held and `clock` supplies
    /// the current time used to stamp and expire the lock.
    pub(crate) fn new(
        key: String,
        lock_expiration: Duration,
        clock: Rc<dyn Clock>,
        shared_data: Rc<dyn SharedData>,
    ) -> Self {
        Self {
            key,
            lock_expiration,
            clock,
            shared_data,
        }
    }

    /// Returns the instant at which a lock stamped at `time` expires, or `None` when that instant
    /// cannot be represented by [`SystemTime`].
    ///
    /// `SystemTime + Duration` panics on overflow, so the checked variant is used: the stored
    /// timestamp and the configured expiration are not guaranteed to add up to a valid time.
    fn expiration_of(&self, time: SystemTime) -> Option<SystemTime> {
        time.checked_add(self.lock_expiration)
    }

    /// Returns `true` when a lock stamped at `time` expired strictly before `now`.
    ///
    /// A lock whose expiration instant is not representable is treated as never expiring.
    fn has_time_expired(&self, now: SystemTime, time: SystemTime) -> bool {
        matches!(self.expiration_of(time), Some(expiry) if expiry < now)
    }

    /// Returns how long a lock stamped at `time` remains valid as seen from `now`.
    ///
    /// Yields a zero duration when the lock has already expired and [`Duration::MAX`] when the
    /// expiration instant is not representable.
    fn remaining_time(&self, now: SystemTime, time: SystemTime) -> Duration {
        match self.expiration_of(time) {
            Some(expiry) => expiry.duration_since(now).unwrap_or_default(),
            None => Duration::MAX,
        }
    }

    /// Reads the lock entry, returning the stored timestamp (if any) and the cas to use for the
    /// next store.
    ///
    /// The timestamp is `None` when the entry is absent or its payload cannot be deserialized.
    fn current_lock(&self) -> (Option<SystemTime>, u32) {
        let (data, cas) = self.shared_data.shared_data_get(self.key.as_str());
        let data = data.and_then(|data| bincode::deserialize(&data).ok());
        // An empty cas means the key was not present in the shared data. Cas sequentially increases
        // from 0, it is safe to assume that no u32::MAX store operations happened between this get
        // and the following store.
        let cas = cas.unwrap_or(u32::MAX);
        (data, cas)
    }

    /// Returns the cas of the lock entry after a store, or `None` if the stored timestamp does not
    /// match `saved`.
    ///
    /// The cas has to be read back because it has been seen to increase by more than 1 after a
    /// store operation; it still remains monotonically increasing.
    fn current_cas(&self, saved: SystemTime) -> Option<u32> {
        match self.current_lock() {
            (Some(current), cas) if current == saved => Some(cas),
            _ => None,
        }
    }

    /// Writes the lock entry guarded by `cas`, returning `true` when the store succeeded.
    ///
    /// `Some(time)` stores the serialized timestamp; `None` stores an empty payload, which is how
    /// a released lock is represented.
    fn store_lock(&self, lock: Option<SystemTime>, cas: u32) -> bool {
        let data = match lock {
            Some(time) => match bincode::serialize(&time) {
                Ok(bytes) => bytes,
                // Falling back to an empty payload would write the "released" representation
                // and silently drop the lock, so report the store as failed instead.
                Err(_) => return false,
            },
            None => Vec::new(),
        };

        self.shared_data
            .shared_data_set(self.key.as_str(), data.as_slice(), Some(cas))
            .is_ok()
    }

    /// Acquire the lock. Returns the lock if the lock was acquired, None if the lock is currently held
    /// by someone else or it is already owned by the current worker. The lock is released on drop.
    ///
    /// The lock is considered free when no timestamp is stored or the stored timestamp has
    /// expired. Acquisition writes the current time guarded by the cas that was read, then reads
    /// the entry back to confirm the write and to obtain the cas used for refresh and release.
    pub fn try_lock(&self) -> Option<Lock> {
        let now = self.clock.get_current_time();
        let (current, cas) = self.current_lock();

        // Only a stored timestamp that has not expired yet blocks acquisition.
        if let Some(held_since) = current.filter(|time| !self.has_time_expired(now, *time)) {
            pdk_core::logger::debug!(
                "Lock {} is currently held by some one else. Will expire in {:?}.",
                self.key,
                self.remaining_time(now, held_since)
            );
            return None;
        }

        pdk_core::logger::debug!(
            "Lock is free or expired. Obtaining lock for key {}",
            self.key
        );
        let option_lock = self
            .store_lock(Some(now), cas)
            .then(|| self.current_cas(now))
            .flatten()
            .map(|cas| Lock::new(self, cas));

        if option_lock.is_some() {
            pdk_core::logger::debug!("Obtained lock for key {}", self.key);
        } else {
            pdk_core::logger::debug!("Could not obtain lock for key {}", self.key);
        }

        option_lock
    }
}

/// An acquired lock. It expires automatically if it is not refreshed before the expiration time.
/// The lock is released on drop.
pub struct Lock<'a> {
    /// The [`TryLock`] this lock was acquired from.
    lock: &'a TryLock,
    /// Cas read back after this lock's latest successful write; it guards the next refresh and
    /// the release on drop.
    current_lock: Cell<u32>,
}

impl<'a> Lock<'a> {
    fn new(lock: &'a TryLock, current_lock: u32) -> Self {
        Self {
            lock,
            current_lock: Cell::new(current_lock),
        }
    }
}

impl Lock<'_> {
    /// Tries to extend the lock by stamping it with the current time. Returns `true` when the
    /// refresh succeeded, or `false` when the write was rejected or the stored value no longer
    /// matches (for example because someone else took the lock).
    pub fn refresh_lock(&self) -> bool {
        let owner = self.lock;
        let refreshed_at = owner.clock.get_current_time();

        // The write is guarded by the cas recorded at acquisition or at the previous refresh.
        if !owner.store_lock(Some(refreshed_at), self.current_lock.get()) {
            return false;
        }

        // Read the entry back to confirm our timestamp is stored and to pick up the new cas.
        match owner.current_cas(refreshed_at) {
            Some(cas) => {
                self.current_lock.set(cas);
                true
            }
            None => false,
        }
    }
}

impl Drop for Lock<'_> {
    /// Releases the lock by storing an empty entry guarded by the last known cas.
    fn drop(&mut self) {
        let cas = self.current_lock.get();
        // Best effort: the outcome of the store is intentionally not inspected.
        self.lock.store_lock(None, cas);
    }
}