use std::collections::HashMap;
use std::fmt::Debug;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use maplit::hashmap;
pub use rusoto_core::Region;
use rusoto_core::RusotoError;
#[cfg(feature = "sts")]
use rusoto_credential::{AutoRefreshingProvider, CredentialsError};
use rusoto_dynamodb::*;
#[cfg(feature = "sts")]
use rusoto_sts::WebIdentityProvider;
use uuid::Uuid;
/// A snapshot of the lock record, either as read from the lock table or as just written to it.
#[derive(Clone, Debug)]
pub struct LockItem {
/// Name of the lock owner (the `ownerName` attribute of the record).
pub owner_name: String,
/// Record version number (the `recordVersionNumber` attribute); a fresh UUID is generated
/// every time this client writes the record.
pub record_version_number: String,
/// Lease duration, if the record carries one. `is_expired` treats values below one year
/// (`365 * 86400`) as a number of seconds counted from `lookup_time`.
pub lease_duration: Option<u64>,
/// Whether the record is marked as released (the `isReleased` attribute is present).
pub is_released: bool,
/// Optional payload stored alongside the lock (the `data` attribute).
pub data: Option<String>,
/// Time at which this snapshot was created, in milliseconds since the UNIX epoch.
pub lookup_time: u128,
/// `true` when this lock was obtained by taking over an expired lock.
pub acquired_expired_lock: bool,
/// Whether the record is marked as non-acquirable (the `isNonAcquirable` attribute is present).
pub is_non_acquirable: bool,
}
/// Abstraction over a lock backend; implementors must be `Send + Sync + Debug`.
#[async_trait::async_trait]
pub trait LockClient: Send + Sync + Debug {
/// Attempts to acquire the lock, storing `data` with it.
/// `Ok(None)` means the lock could not be acquired without this being reported as an error.
async fn try_acquire_lock(&self, data: &str) -> Result<Option<LockItem>, DynamoError>;
/// Returns the current lock record, or `None` if there is none.
async fn get_lock(&self) -> Result<Option<LockItem>, DynamoError>;
/// Writes `lock.data` to the lock record and returns the updated lock.
async fn update_data(&self, lock: &LockItem) -> Result<LockItem, DynamoError>;
/// Releases `lock`; the boolean reports whether the release actually happened.
async fn release_lock(&self, lock: &LockItem) -> Result<bool, DynamoError>;
}
/// Option names understood by `DynamoDbOptions::from_map`.
/// Each name is used both as a key of the options map and as the name of the environment
/// variable consulted when the map has no usable entry.
pub mod dynamo_lock_options {
/// Value of the partition key identifying the lock record.
pub const DYNAMO_LOCK_PARTITION_KEY_VALUE: &str = "DYNAMO_LOCK_PARTITION_KEY_VALUE";
/// Name of the DynamoDB table holding the lock record.
pub const DYNAMO_LOCK_TABLE_NAME: &str = "DYNAMO_LOCK_TABLE_NAME";
/// Owner name written into the lock record.
pub const DYNAMO_LOCK_OWNER_NAME: &str = "DYNAMO_LOCK_OWNER_NAME";
/// Lease duration, parsed as an unsigned integer.
pub const DYNAMO_LOCK_LEASE_DURATION: &str = "DYNAMO_LOCK_LEASE_DURATION";
/// Pause between two acquisition attempts, in milliseconds.
pub const DYNAMO_LOCK_REFRESH_PERIOD_MILLIS: &str = "DYNAMO_LOCK_REFRESH_PERIOD_MILLIS";
/// Additional time to wait for the lock before giving up, in milliseconds.
pub const DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS: &str =
"DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS";
}
/// Configuration of a `DynamoDbLockClient`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DynamoDbOptions {
/// Partition key value of the lock record (`from_map` default: `"delta-rs"`).
pub partition_key_value: String,
/// Name of the lock table (`from_map` default: `"delta_rs_lock_table"`).
pub table_name: String,
/// Owner name written into the lock record (`from_map` default: a random UUID v4).
pub owner_name: String,
/// Lease duration in seconds (`from_map` default: 20).
pub lease_duration: u64,
/// Pause between two acquisition attempts (`from_map` default: 10 000 ms).
pub refresh_period: Duration,
/// Base time to keep waiting for a lock held by someone else (`from_map` default: 10 000 ms).
pub additional_time_to_wait_for_lock: Duration,
}
impl Default for DynamoDbOptions {
fn default() -> Self {
Self::from_map(HashMap::new())
}
}
impl DynamoDbOptions {
/// Builds the options from `options`.
///
/// For every setting the lookup order is: entry in `options`, then the environment
/// variable of the same name, then the built-in default (`"delta-rs"` partition key,
/// `"delta_rs_lock_table"` table, random UUID owner, lease of 20, and 10 000 ms for
/// both the refresh period and the additional wait time).
pub fn from_map(options: HashMap<String, String>) -> Self {
    let overrides = &options;

    let refresh_millis = Self::u64_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_REFRESH_PERIOD_MILLIS,
        10_000,
    );
    let wait_millis = Self::u64_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS,
        10_000,
    );
    let partition_key_value = Self::str_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE,
        "delta-rs".to_string(),
    );
    let table_name = Self::str_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_TABLE_NAME,
        "delta_rs_lock_table".to_string(),
    );
    // The fallback owner name is a fresh random UUID.
    let owner_name = Self::str_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_OWNER_NAME,
        Uuid::new_v4().to_string(),
    );
    let lease_duration = Self::u64_opt(
        overrides,
        dynamo_lock_options::DYNAMO_LOCK_LEASE_DURATION,
        20,
    );

    Self {
        partition_key_value,
        table_name,
        owner_name,
        lease_duration,
        refresh_period: Duration::from_millis(refresh_millis),
        additional_time_to_wait_for_lock: Duration::from_millis(wait_millis),
    }
}
/// Resolves a string setting: the entry of `map` under `key` if present, otherwise the
/// environment variable named `key`, otherwise `default`.
fn str_opt(map: &HashMap<String, String>, key: &str, default: String) -> String {
    match map.get(key) {
        Some(value) => value.clone(),
        None => std::env::var(key).unwrap_or(default),
    }
}
/// Resolves a numeric setting: the entry of `map` under `key` if it parses as `u64`,
/// otherwise the environment variable named `key` if it parses, otherwise `default`.
/// An unparsable map entry is ignored rather than reported.
fn u64_opt(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
    if let Some(parsed) = map.get(key).and_then(|raw| raw.parse::<u64>().ok()) {
        return parsed;
    }
    match std::env::var(key) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
}
impl LockItem {
    /// Reports whether this lock snapshot should be considered expired.
    ///
    /// * A released lock is always expired.
    /// * A lock without a lease duration never expires.
    /// * A lease duration below one year is treated as a number of seconds counted from
    ///   `lookup_time`.
    /// * Anything larger is compared against `lookup_time` directly (see the note below).
    fn is_expired(&self) -> bool {
        if self.is_released {
            return true;
        }
        match self.lease_duration {
            None => false,
            Some(lease_duration) => {
                let lease_duration = u128::from(lease_duration);
                if lease_duration < (365 * 86400) {
                    // Saturate instead of underflowing when the wall clock moved backwards
                    // after `lookup_time` was taken: such a lock has simply not aged yet.
                    let elapsed_millis = now_millis().saturating_sub(self.lookup_time);
                    elapsed_millis > lease_duration * 1000
                } else {
                    // NOTE(review): values this large are epoch-based *seconds* (that is what
                    // `lease_duration_after` produces), while `lookup_time` is in
                    // *milliseconds*, so for present-day timestamps this comparison is always
                    // true. Left unchanged because the tests in this file assert exactly this
                    // outcome — confirm the intended semantics before changing it.
                    lease_duration < self.lookup_time
                }
            }
        }
    }
}
/// Errors produced by the lock client.
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum DynamoError {
/// Credentials could not be obtained (only with the `sts` feature).
#[cfg(feature = "sts")]
#[error("Failed to authenticate: {0}")]
AuthenticationError(CredentialsError),
/// A get-item request reported that the resource does not exist.
#[error("Dynamo table not found")]
TableNotFound,
/// A conditional write was rejected; also used internally to signal "retry later"
/// while waiting for a lock.
#[error("Conditional check failed")]
ConditionalCheckFailed,
/// The lock record lacks a required string attribute or has an unparsable lease duration.
#[error("DynamoDB item has invalid schema")]
InvalidItemSchema,
/// The lock could not be acquired in time; carries the elapsed whole seconds.
#[error("Could not acquire lock for {0} sec")]
TimedOut(u64),
/// The existing lock record is marked as non-acquirable.
#[error("The existing lock in dynamodb is non-acquirable")]
NonAcquirableLock,
/// DynamoDB reported that the provisioned throughput was exceeded.
#[error("Maximum allowed provisioned throughput for the table exceeded")]
ProvisionedThroughputExceeded,
/// Any other put-item failure.
#[error("Put item error: {0}")]
PutItemError(RusotoError<PutItemError>),
/// Any delete-item failure other than a rejected condition.
#[error("Delete item error: {0}")]
DeleteItemError(#[from] RusotoError<DeleteItemError>),
/// Any other get-item failure.
#[error("Get item error: {0}")]
GetItemError(RusotoError<GetItemError>),
}
impl From<RusotoError<PutItemError>> for DynamoError {
fn from(error: RusotoError<PutItemError>) -> Self {
match error {
RusotoError::Service(PutItemError::ConditionalCheckFailed(_)) => {
DynamoError::ConditionalCheckFailed
}
RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_)) => {
DynamoError::ProvisionedThroughputExceeded
}
_ => DynamoError::PutItemError(error),
}
}
}
#[cfg(feature = "sts")]
impl From<CredentialsError> for DynamoError {
    /// Wraps a credentials failure in `AuthenticationError`.
    fn from(error: CredentialsError) -> Self {
        Self::AuthenticationError(error)
    }
}
impl From<RusotoError<GetItemError>> for DynamoError {
fn from(error: RusotoError<GetItemError>) -> Self {
match error {
RusotoError::Service(GetItemError::ResourceNotFound(_)) => DynamoError::TableNotFound,
RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_)) => {
DynamoError::ProvisionedThroughputExceeded
}
_ => DynamoError::GetItemError(error),
}
}
}
/// Attribute name of the partition key of the lock record.
pub const PARTITION_KEY_NAME: &str = "key";
/// Attribute name holding the lock owner.
pub const OWNER_NAME: &str = "ownerName";
/// Attribute name holding the record version number.
pub const RECORD_VERSION_NUMBER: &str = "recordVersionNumber";
/// Attribute name whose presence marks the lock as released.
pub const IS_RELEASED: &str = "isReleased";
/// Attribute name holding the lease duration.
pub const LEASE_DURATION: &str = "leaseDuration";
/// Attribute name whose presence marks the lock as non-acquirable.
pub const IS_NON_ACQUIRABLE: &str = "isNonAcquirable";
/// Attribute name holding the optional data payload.
pub const DATA: &str = "data";
/// Not referenced elsewhere in this file; presumably a key inside the data payload — TODO confirm.
pub const DATA_SOURCE: &str = "src";
/// Not referenced elsewhere in this file; presumably a key inside the data payload — TODO confirm.
pub const DATA_DESTINATION: &str = "dst";
/// Condition expressions attached to conditional writes and deletes.
/// The `#…`/`:…` placeholders are the ones declared in the `vars` module.
mod expressions {
/// The lock record must not exist yet.
pub const ACQUIRE_LOCK_THAT_DOESNT_EXIST: &str = "attribute_not_exists(#pk)";
/// The lock record must exist and carry the given released marker.
pub const PK_EXISTS_AND_IS_RELEASED: &str = "attribute_exists(#pk) AND #ir = :ir";
/// The lock record must exist with the given record version number.
pub const PK_EXISTS_AND_RVN_MATCHES: &str = "attribute_exists(#pk) AND #rvn = :rvn";
/// The lock record must exist with the given record version number and owner.
pub const PK_EXISTS_AND_OWNER_RVN_MATCHES: &str =
"attribute_exists(#pk) AND #rvn = :rvn AND #on = :on";
}
/// Placeholder names used in the condition expressions: `#…` entries stand for attribute
/// names, `:…` entries for attribute values.
mod vars {
/// Placeholder for the partition key attribute name.
pub const PK_PATH: &str = "#pk";
/// Placeholder for the record version number attribute name.
pub const RVN_PATH: &str = "#rvn";
/// Placeholder for the expected record version number value.
pub const RVN_VALUE: &str = ":rvn";
/// Placeholder for the released-marker attribute name.
pub const IS_RELEASED_PATH: &str = "#ir";
/// Placeholder for the expected released-marker value.
pub const IS_RELEASED_VALUE: &str = ":ir";
/// Placeholder for the owner attribute name.
pub const OWNER_NAME_PATH: &str = "#on";
/// Placeholder for the expected owner value.
pub const OWNER_NAME_VALUE: &str = ":on";
}
/// Lock client backed by a single record in a DynamoDB table.
pub struct DynamoDbLockClient {
// DynamoDB client used for all get/put/delete requests.
client: DynamoDbClient,
// Table, key, owner and timing configuration.
opts: DynamoDbOptions,
}
impl std::fmt::Debug for DynamoDbLockClient {
    /// Prints only the type name; neither the client nor the options are shown.
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.write_str("DynamoDbLockClient")
    }
}
impl Default for DynamoDbLockClient {
fn default() -> Self {
Self::for_region(Region::UsEast1)
}
}
// Every method forwards to the inherent method of the same name on `DynamoDbLockClient`;
// inherent methods take precedence in method resolution, so these calls do not recurse.
#[async_trait::async_trait]
impl LockClient for DynamoDbLockClient {
    /// Forwards to the inherent `try_acquire_lock`, always supplying `data`.
    async fn try_acquire_lock(&self, data: &str) -> Result<Option<LockItem>, DynamoError> {
        let payload = Some(data);
        self.try_acquire_lock(payload).await
    }

    /// Forwards to the inherent `get_lock`.
    async fn get_lock(&self) -> Result<Option<LockItem>, DynamoError> {
        self.get_lock().await
    }

    /// Forwards to the inherent `update_data`.
    async fn update_data(&self, lock: &LockItem) -> Result<LockItem, DynamoError> {
        self.update_data(lock).await
    }

    /// Forwards to the inherent `release_lock`.
    async fn release_lock(&self, lock: &LockItem) -> Result<bool, DynamoError> {
        self.release_lock(lock).await
    }
}
impl DynamoDbLockClient {
/// Creates a lock client talking to DynamoDB in `region`, configured with
/// `DynamoDbOptions::default()`.
pub fn for_region(region: Region) -> Self {
    let client = DynamoDbClient::new(region);
    let opts = DynamoDbOptions::default();
    Self::new(client, opts)
}
pub fn with_client(mut self, client: DynamoDbClient) -> Self {
self.client = client;
self
}
pub fn with_options(mut self, options: DynamoDbOptions) -> Self {
self.opts = options;
self
}
fn new(client: DynamoDbClient, opts: DynamoDbOptions) -> Self {
Self { client, opts }
}
/// Like `acquire_lock`, but reports a timeout or exceeded provisioned throughput as
/// `Ok(None)` instead of an error. All other errors are passed through.
pub async fn try_acquire_lock(
    &self,
    data: Option<&str>,
) -> Result<Option<LockItem>, DynamoError> {
    let outcome = self.acquire_lock(data).await;
    match outcome {
        Err(DynamoError::TimedOut(_)) | Err(DynamoError::ProvisionedThroughputExceeded) => {
            Ok(None)
        }
        other => other.map(Some),
    }
}
/// Repeatedly tries to acquire the lock until it succeeds, fails with a hard error, or
/// the wait times out.
///
/// A `ConditionalCheckFailed` from an attempt means "not yet": unless the wait has timed
/// out (which yields `TimedOut` with the elapsed whole seconds), the loop sleeps for the
/// configured refresh period and tries again. Any other result is returned as is.
pub async fn acquire_lock(&self, data: Option<&str>) -> Result<LockItem, DynamoError> {
    let mut state = AcquireLockState {
        client: self,
        cached_lock: None,
        started: Instant::now(),
        timeout_in: self.opts.additional_time_to_wait_for_lock,
    };
    loop {
        let attempt = state.try_acquire_lock(data).await;
        if !matches!(attempt, Err(DynamoError::ConditionalCheckFailed)) {
            // Either the lock was acquired or a non-retryable error occurred.
            return attempt;
        }
        if state.has_timed_out() {
            return Err(DynamoError::TimedOut(state.started.elapsed().as_secs()));
        }
        tokio::time::sleep(self.opts.refresh_period).await;
    }
}
/// Reads the lock record with a consistent read.
///
/// Returns `Ok(None)` when the record does not exist. Fails with `InvalidItemSchema`
/// when the owner name or record version number is not a string attribute, or when the
/// lease duration string does not parse as `u64`.
pub async fn get_lock(&self) -> Result<Option<LockItem>, DynamoError> {
    let key = hashmap! {
        PARTITION_KEY_NAME.to_string() => attr(self.opts.partition_key_value.clone())
    };
    let request = GetItemInput {
        consistent_read: Some(true),
        table_name: self.opts.table_name.clone(),
        key,
        ..Default::default()
    };
    let item = match self.client.get_item(request).await?.item {
        Some(item) => item,
        None => return Ok(None),
    };

    // NOTE(review): only the string (`s`) field of `leaseDuration` is consulted here,
    // while `upsert_item` in this file stores the value through `num_attr` (the `n`
    // field); for such records `lease_duration` comes back as `None`. Confirm whether
    // that is intended before changing it, since expiry handling depends on it.
    let lease_duration = match item.get(LEASE_DURATION).and_then(|v| v.s.as_deref()) {
        Some(raw) => Some(
            raw.parse::<u64>()
                .map_err(|_| DynamoError::InvalidItemSchema)?,
        ),
        None => None,
    };
    let data = item.get(DATA).and_then(|r| r.s.clone());

    Ok(Some(LockItem {
        owner_name: get_string(item.get(OWNER_NAME))?,
        record_version_number: get_string(item.get(RECORD_VERSION_NUMBER))?,
        lease_duration,
        // The markers are presence flags: their values are not inspected.
        is_released: item.contains_key(IS_RELEASED),
        data,
        lookup_time: now_millis(),
        acquired_expired_lock: false,
        is_non_acquirable: item.contains_key(IS_NON_ACQUIRABLE),
    }))
}
/// Rewrites the lock record with `lock.data`, provided the stored record still carries
/// the owner name and record version number of `lock`.
///
/// The record is written with this client's owner name, a new record version number and
/// a new lease; a rejected condition surfaces as `ConditionalCheckFailed`.
pub async fn update_data(&self, lock: &LockItem) -> Result<LockItem, DynamoError> {
    let condition = expressions::PK_EXISTS_AND_OWNER_RVN_MATCHES.to_string();
    let names = hashmap! {
        vars::PK_PATH.to_string() => PARTITION_KEY_NAME.to_string(),
        vars::RVN_PATH.to_string() => RECORD_VERSION_NUMBER.to_string(),
        vars::OWNER_NAME_PATH.to_string() => OWNER_NAME.to_string(),
    };
    let values = hashmap! {
        vars::RVN_VALUE.to_string() => attr(&lock.record_version_number),
        vars::OWNER_NAME_VALUE.to_string() => attr(&lock.owner_name),
    };
    let payload = lock.data.as_deref();
    self.upsert_item(payload, false, Some(condition), Some(names), Some(values))
        .await
}
/// Releases `lock` by deleting its record, but only if `lock` belongs to this client's
/// owner name; a lock owned by someone else yields `Ok(false)` without any request.
pub async fn release_lock(&self, lock: &LockItem) -> Result<bool, DynamoError> {
    let owned_by_this_client = lock.owner_name == self.opts.owner_name;
    if owned_by_this_client {
        self.delete_lock(lock).await
    } else {
        Ok(false)
    }
}
pub async fn delete_lock(&self, lock: &LockItem) -> Result<bool, DynamoError> {
self.delete_item(&lock.record_version_number, &lock.owner_name)
.await
}
async fn upsert_item(
&self,
data: Option<&str>,
acquired_expired_lock: bool,
condition_expression: Option<String>,
expression_attribute_names: Option<HashMap<String, String>>,
expression_attribute_values: Option<HashMap<String, AttributeValue>>,
) -> Result<LockItem, DynamoError> {
let rvn = Uuid::new_v4().to_string();
let mut item = hashmap! {
PARTITION_KEY_NAME.to_string() => attr(self.opts.partition_key_value.clone()),
OWNER_NAME.to_string() => attr(&self.opts.owner_name),
RECORD_VERSION_NUMBER.to_string() => attr(&rvn),
LEASE_DURATION.to_string() => num_attr(lease_duration_after(self.opts.lease_duration)),
};
if let Some(d) = data {
item.insert(DATA.to_string(), attr(d));
}
self.client
.put_item(PutItemInput {
table_name: self.opts.table_name.clone(),
item,
condition_expression,
expression_attribute_names,
expression_attribute_values,
..Default::default()
})
.await?;
Ok(LockItem {
owner_name: self.opts.owner_name.clone(),
record_version_number: rvn,
lease_duration: Some(self.opts.lease_duration),
is_released: false,
data: data.map(String::from),
lookup_time: now_millis(),
acquired_expired_lock,
is_non_acquirable: false,
})
}
/// Deletes the lock record if it exists with record version number `rvn` and owner
/// `owner`.
///
/// Returns `Ok(true)` when the record was deleted, `Ok(false)` when the condition was
/// rejected, and `DeleteItemError` for any other failure.
async fn delete_item(&self, rvn: &str, owner: &str) -> Result<bool, DynamoError> {
    let key = hashmap! {
        PARTITION_KEY_NAME.to_string() => attr(self.opts.partition_key_value.clone())
    };
    let names = hashmap! {
        vars::PK_PATH.to_string() => PARTITION_KEY_NAME.to_string(),
        vars::RVN_PATH.to_string() => RECORD_VERSION_NUMBER.to_string(),
        vars::OWNER_NAME_PATH.to_string() => OWNER_NAME.to_string(),
    };
    let values = hashmap! {
        vars::RVN_VALUE.to_string() => attr(rvn),
        vars::OWNER_NAME_VALUE.to_string() => attr(owner),
    };
    let request = DeleteItemInput {
        table_name: self.opts.table_name.clone(),
        key,
        condition_expression: Some(expressions::PK_EXISTS_AND_OWNER_RVN_MATCHES.to_string()),
        expression_attribute_names: Some(names),
        expression_attribute_values: Some(values),
        ..Default::default()
    };

    match self.client.delete_item(request).await {
        Ok(_) => Ok(true),
        Err(RusotoError::Service(DeleteItemError::ConditionalCheckFailed(_))) => Ok(false),
        Err(other) => Err(DynamoError::DeleteItemError(other)),
    }
}
}
/// Returns the point in time `after` seconds from now, as whole seconds since the UNIX
/// epoch. The addition saturates at `u64::MAX` instead of overflowing.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the UNIX epoch.
fn lease_duration_after(after: u64) -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set to a time before the UNIX epoch")
        .as_secs()
        .saturating_add(after)
}
/// Current wall-clock time in milliseconds since the UNIX epoch.
/// Panics if the system clock is set to a time before the epoch.
fn now_millis() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis()
}
fn attr<T: ToString>(s: T) -> AttributeValue {
AttributeValue {
s: Some(s.to_string()),
..Default::default()
}
}
fn num_attr<T: ToString>(s: T) -> AttributeValue {
AttributeValue {
n: Some(s.to_string()),
..Default::default()
}
}
/// Extracts the string payload of an attribute.
/// Fails with `InvalidItemSchema` when the attribute is missing or has no string value.
fn get_string(attr: Option<&AttributeValue>) -> Result<String, DynamoError> {
    match attr.and_then(|value| value.s.as_ref()) {
        Some(text) => Ok(text.clone()),
        None => Err(DynamoError::InvalidItemSchema),
    }
}
// Mutable state of one `acquire_lock` call, kept across its retry attempts.
struct AcquireLockState<'a> {
// The lock client performing the requests.
client: &'a DynamoDbLockClient,
// The foreign lock record seen on an earlier attempt, if any.
cached_lock: Option<LockItem>,
// When the acquisition started.
started: Instant,
// How long to keep waiting, measured from `started`.
timeout_in: Duration,
}
impl<'a> AcquireLockState<'a> {
fn has_timed_out(&self) -> bool {
self.started.elapsed() > self.timeout_in && {
let non_expirable = if let Some(ref cached_lock) = self.cached_lock {
cached_lock.lease_duration.is_none()
} else {
false
};
!non_expirable
}
}
/// Performs a single acquisition attempt.
///
/// * No lock record: create one.
/// * Record marked non-acquirable: fail with `NonAcquirableLock`.
/// * Record marked released: take it over.
/// * Record held by someone: on the first sighting, remember it and extend the timeout
///   by its lease duration. On later attempts, take the lock over only if the record
///   version number is unchanged and the remembered lock has expired.
///
/// `ConditionalCheckFailed` tells the caller to retry later.
async fn try_acquire_lock(&mut self, data: Option<&str>) -> Result<LockItem, DynamoError> {
    match self.client.get_lock().await? {
        None => self.upsert_new_lock(data).await,
        Some(existing) if existing.is_non_acquirable => Err(DynamoError::NonAcquirableLock),
        Some(existing) if existing.is_released => self.upsert_released_lock(data).await,
        Some(existing) => {
            let cached = match self.cached_lock.as_ref() {
                None => {
                    let lease_duration = existing
                        .lease_duration
                        .unwrap_or(self.client.opts.lease_duration);
                    // Extend the wait by the holder's lease. Adding `Duration`s keeps
                    // the sub-second part of the configured wait (going through
                    // `as_secs()` would drop it) and saturates rather than overflowing.
                    self.timeout_in = self
                        .timeout_in
                        .saturating_add(Duration::from_secs(lease_duration));
                    self.cached_lock = Some(existing);
                    return Err(DynamoError::ConditionalCheckFailed);
                }
                Some(cached) => cached,
            };
            let cached_rvn = &cached.record_version_number;
            if cached_rvn == &existing.record_version_number {
                if cached.is_expired() {
                    // The data of the expired lock is carried over to the new record;
                    // the caller's `data` is not used on this path.
                    self.upsert_expired_lock(cached_rvn, existing.data.as_deref())
                        .await
                } else {
                    Err(DynamoError::ConditionalCheckFailed)
                }
            } else {
                // The record changed since the last sighting: start observing the new one.
                self.cached_lock = Some(existing);
                Err(DynamoError::ConditionalCheckFailed)
            }
        }
    }
}
/// Creates the lock record, on the condition that no record with the partition key
/// exists yet.
async fn upsert_new_lock(&self, data: Option<&str>) -> Result<LockItem, DynamoError> {
    let condition = expressions::ACQUIRE_LOCK_THAT_DOESNT_EXIST.to_string();
    let names = hashmap! {
        vars::PK_PATH.to_string() => PARTITION_KEY_NAME.to_string(),
    };
    self.client
        .upsert_item(data, false, Some(condition), Some(names), None)
        .await
}
/// Takes over a released lock, on the condition that the record exists and its
/// released marker equals the string `"1"`.
async fn upsert_released_lock(&self, data: Option<&str>) -> Result<LockItem, DynamoError> {
    let condition = expressions::PK_EXISTS_AND_IS_RELEASED.to_string();
    let names = hashmap! {
        vars::PK_PATH.to_string() => PARTITION_KEY_NAME.to_string(),
        vars::IS_RELEASED_PATH.to_string() => IS_RELEASED.to_string(),
    };
    let values = hashmap! {
        vars::IS_RELEASED_VALUE.to_string() => attr("1")
    };
    self.client
        .upsert_item(data, false, Some(condition), Some(names), Some(values))
        .await
}
/// Takes over an expired lock, on the condition that the record still exists with
/// record version number `existing_rvn`. The returned item has `acquired_expired_lock`
/// set.
async fn upsert_expired_lock(
    &self,
    existing_rvn: &str,
    data: Option<&str>,
) -> Result<LockItem, DynamoError> {
    let condition = expressions::PK_EXISTS_AND_RVN_MATCHES.to_string();
    let names = hashmap! {
        vars::PK_PATH.to_string() => PARTITION_KEY_NAME.to_string(),
        vars::RVN_PATH.to_string() => RECORD_VERSION_NUMBER.to_string(),
    };
    let values = hashmap! {
        vars::RVN_VALUE.to_string() => attr(existing_rvn)
    };
    self.client
        .upsert_item(data, true, Some(condition), Some(names), Some(values))
        .await
}
}
/// Wraps `WebIdentityProvider::from_k8s_env()` in an auto-refreshing provider,
/// converting a credentials failure into `DynamoError`.
#[cfg(feature = "sts")]
fn get_web_identity_provider() -> Result<AutoRefreshingProvider<WebIdentityProvider>, DynamoError> {
    let k8s_provider = WebIdentityProvider::from_k8s_env();
    AutoRefreshingProvider::new(k8s_provider).map_err(DynamoError::from)
}
// Unit tests for option resolution, the lease helpers and `LockItem::is_expired`.
// NOTE(review): two tests below set process-wide environment variables, which stay set
// for the rest of the test process.
#[cfg(test)]
mod tests {
use super::*;
use maplit::hashmap;
// With an empty options map, every setting is taken from the environment.
#[test]
fn lock_options_default_test() {
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_TABLE_NAME, "some_table");
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_OWNER_NAME, "some_owner");
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE,
"some_pk",
);
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_LEASE_DURATION, "40");
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_REFRESH_PERIOD_MILLIS,
"2000",
);
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS,
"3000",
);
let options = DynamoDbOptions::default();
assert_eq!(
DynamoDbOptions {
partition_key_value: "some_pk".to_string(),
table_name: "some_table".to_string(),
owner_name: "some_owner".to_string(),
lease_duration: 40,
refresh_period: Duration::from_millis(2000),
additional_time_to_wait_for_lock: Duration::from_millis(3000),
},
options
);
}
// When the map provides every setting, the map values are used.
#[test]
fn lock_options_from_map_test() {
let options = DynamoDbOptions::from_map(hashmap! {
dynamo_lock_options::DYNAMO_LOCK_TABLE_NAME.to_string() => "a_table".to_string(),
dynamo_lock_options::DYNAMO_LOCK_OWNER_NAME.to_string() => "an_owner".to_string(),
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => "a_pk".to_string(),
dynamo_lock_options::DYNAMO_LOCK_LEASE_DURATION.to_string() => "60".to_string(),
dynamo_lock_options::DYNAMO_LOCK_REFRESH_PERIOD_MILLIS.to_string() => "4000".to_string(),
dynamo_lock_options::DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS.to_string() => "5000".to_string(),
});
assert_eq!(
DynamoDbOptions {
partition_key_value: "a_pk".to_string(),
table_name: "a_table".to_string(),
owner_name: "an_owner".to_string(),
lease_duration: 60,
refresh_period: Duration::from_millis(4000),
additional_time_to_wait_for_lock: Duration::from_millis(5000),
},
options
);
}
// A map entry overrides the environment; settings missing from the map still come
// from the environment.
#[test]
fn lock_options_mixed_test() {
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_TABLE_NAME, "some_table");
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_OWNER_NAME, "some_owner");
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE,
"some_pk",
);
std::env::set_var(dynamo_lock_options::DYNAMO_LOCK_LEASE_DURATION, "40");
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_REFRESH_PERIOD_MILLIS,
"2000",
);
std::env::set_var(
dynamo_lock_options::DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS,
"3000",
);
let options = DynamoDbOptions::from_map(hashmap! {
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => "overridden_key".to_string()
});
assert_eq!(
DynamoDbOptions {
partition_key_value: "overridden_key".to_string(),
table_name: "some_table".to_string(),
owner_name: "some_owner".to_string(),
lease_duration: 40,
refresh_period: Duration::from_millis(2000),
additional_time_to_wait_for_lock: Duration::from_millis(3000),
},
options
);
}
// `lease_duration_after(60)` lies about 60 seconds after the current epoch time
// (a 10-second slack is allowed on the upper bound).
#[test]
fn test_lease_duration_after() {
use std::time::SystemTime;
let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
};
let duration: u64 = lease_duration_after(60);
assert!(duration > now);
assert!(duration >= (now + 60));
assert!(duration <= (now + 70));
}
// `num_attr` stores the stringified number in the `n` field.
#[test]
fn test_lease_duration_attr() {
let n = num_attr(1);
assert!(n.n.is_some());
if let Some(num) = n.n {
assert_eq!(1, num.parse::<u64>().unwrap());
} else {
println!("attr {n:?}");
assert!(false);
}
}
// A lock that is not released and has no lease duration is not expired.
#[test]
fn test_lockitem_normal_lease() {
let item = LockItem {
owner_name: "test".into(),
record_version_number: "1".into(),
lease_duration: None,
is_released: false,
data: Some("test-data".into()),
lookup_time: now_millis(),
acquired_expired_lock: false,
is_non_acquirable: true,
};
assert_eq!(false, item.is_expired());
}
// A released lock is always expired.
#[test]
fn test_lockitem_is_released() {
let item = LockItem {
owner_name: "test".into(),
record_version_number: "1".into(),
lease_duration: None,
is_released: true,
data: Some("test-data".into()),
lookup_time: now_millis(),
acquired_expired_lock: false,
is_non_acquirable: true,
};
assert_eq!(true, item.is_expired());
}
// A relative 60-second lease looked up two minutes ago is expired.
#[test]
fn test_lockitem_expired_oldstyle_lease_duration() {
let item = LockItem {
owner_name: "test".into(),
record_version_number: "1".into(),
lease_duration: Some(60),
is_released: false,
data: Some("test-data".into()),
lookup_time: (now_millis() - (2 * 60000)),
acquired_expired_lock: false,
is_non_acquirable: true,
};
assert_eq!(true, item.is_expired());
}
// An epoch-based lease value (as produced by `lease_duration_after`) is reported as
// expired. NOTE(review): the lease value here lies 60 seconds in the future, yet the
// test expects "expired" — confirm this is the intended semantics.
#[test]
fn test_lockitem_expired_newstyle_lease_duration() {
let item = LockItem {
owner_name: "test".into(),
record_version_number: "1".into(),
lease_duration: Some(lease_duration_after(60)),
is_released: false,
data: Some("test-data".into()),
lookup_time: (now_millis() - (2 * 60000)),
acquired_expired_lock: false,
is_non_acquirable: true,
};
assert_eq!(true, item.is_expired());
}
}