Skip to main content

pdk_lock_lib/
lib.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! PDK Lock Library
6//!
7//! Primitives for coordinating work among Flex Gateway workers and policies. Generated locks
8//! expire automatically if not refreshed and are released on drop.
9//!
10//! ## Primary types
11//!
12//! - [`LockBuilder`]/[`LockBuilderInstance`]: construct lock instances with
13//!   custom expiration and optional isolation ([`LockBuilderInstance::shared`] option)
14//! - [`TryLock`]: attempts to acquire a lock via [`TryLock::try_lock`]
15//! - [`Lock`]: an acquired lock that can be refreshed via [`Lock::refresh_lock`]
16//!
17
18mod builder;
19
20#[cfg(test)]
21mod test;
22
23use pdk_core::classy::{Clock, SharedData};
24use std::cell::Cell;
25use std::ops::Add;
26use std::rc::Rc;
27use std::time::{Duration, SystemTime};
28
29pub use builder::{LockBuilder, LockBuilderInstance};
30
31/// The built lock. Use this object's [`try_lock`](TryLock::try_lock) to ensure mutual exclusion.
32pub struct TryLock {
33    key: String,
34    lock_expiration: Duration,
35    clock: Rc<dyn Clock>,
36    shared_data: Rc<dyn SharedData>,
37}
38
39impl TryLock {
40    pub(crate) fn new(
41        key: String,
42        lock_expiration: Duration,
43        clock: Rc<dyn Clock>,
44        shared_data: Rc<dyn SharedData>,
45    ) -> Self {
46        Self {
47            key,
48            lock_expiration,
49            clock,
50            shared_data,
51        }
52    }
53
54    fn has_time_expired(&self, now: SystemTime, time: SystemTime) -> bool {
55        time.add(self.lock_expiration).lt(&now)
56    }
57
58    fn current_lock(&self) -> (Option<SystemTime>, u32) {
59        let (data, cas) = self.shared_data.shared_data_get(self.key.as_str());
60        let data = data.and_then(|data| bincode::deserialize(&data).ok());
61        // An empty cas means the key was not present in the shared data. Cas sequentially increases
62        // from 0, it is safe to assume that no u32::MAX store operations happened between this get
63        // and the following store.
64        let cas = cas.unwrap_or(u32::MAX);
65        (data, cas)
66    }
67
68    /// Return the cas for the lock after updating the value, it will return None if the value does
69    /// not match with the value that was saved.
70    /// This method is necessary since we have seen cases where, the cas after a store operation
71    /// increases by more than '1'. It still remains monotonically increasing.
72    fn current_cas(&self, saved: SystemTime) -> Option<u32> {
73        if let (Some(current), cas) = self.current_lock() {
74            if current.eq(&saved) {
75                return Some(cas);
76            }
77        }
78        None
79    }
80
81    fn store_lock(&self, lock: Option<SystemTime>, cas: u32) -> bool {
82        let data = match lock {
83            Some(time) => bincode::serialize(&time).unwrap_or_default(),
84            None => Vec::new(),
85        };
86
87        self.shared_data
88            .shared_data_set(self.key.as_str(), data.as_slice(), Some(cas))
89            .is_ok()
90    }
91
92    /// Acquire the lock. Returns the lock if the lock was acquired, None if the lock is currently held
93    /// by someone else or it is already owned by the current worker. The lock is released on drop.
94    pub fn try_lock(&self) -> Option<Lock> {
95        let now = self.clock.get_current_time();
96        let (current, cas) = self.current_lock();
97
98        let get_lock = current
99            .map(|time| self.has_time_expired(now, time))
100            .unwrap_or(true);
101
102        if get_lock {
103            pdk_core::logger::debug!(
104                "Lock is free or expired. Obtaining lock for key {}",
105                self.key
106            );
107            let option_lock = self
108                .store_lock(Some(now), cas)
109                .then(|| self.current_cas(now))
110                .flatten()
111                .map(|cas| Lock::new(self, cas));
112
113            if option_lock.is_some() {
114                pdk_core::logger::debug!("Obtained lock for key {}", self.key);
115            } else {
116                pdk_core::logger::debug!("Could not obtain lock for key {}", self.key);
117            }
118
119            option_lock
120        } else {
121            pdk_core::logger::debug!(
122                "Lock {} is currently held by some one else. Will expire in {:?}.",
123                self.key,
124                current
125                    .map(|time| time
126                        .add(self.lock_expiration)
127                        .duration_since(now)
128                        .unwrap_or_default())
129                    .unwrap_or_default()
130            );
131            None
132        }
133    }
134}
135
136/// This is a try lock that expires automatically if it was not refreshed before the expiration time.
137/// The lock is released on drop.
138pub struct Lock<'a> {
139    lock: &'a TryLock,
140    current_lock: Cell<u32>,
141}
142
143impl<'a> Lock<'a> {
144    fn new(lock: &'a TryLock, current_lock: u32) -> Self {
145        Self {
146            lock,
147            current_lock: Cell::new(current_lock),
148        }
149    }
150}
151
152impl Lock<'_> {
153    /// Tries to update the remaining duration of the lock. Returns true if the refresh was
154    /// successful or false if the lock was already taken by someone else
155    pub fn refresh_lock(&self) -> bool {
156        let updated = self.lock.clock.get_current_time();
157
158        let result = self
159            .lock
160            .store_lock(Some(updated), self.current_lock.get())
161            .then(|| self.lock.current_cas(updated))
162            .flatten();
163
164        match result {
165            None => false,
166            Some(cas) => {
167                self.current_lock.replace(cas);
168                true
169            }
170        }
171    }
172}
173
174impl Drop for Lock<'_> {
175    fn drop(&mut self) {
176        self.lock.store_lock(None, self.current_lock.get());
177    }
178}