use std::default::Default;
use std::result::Result;
use std::time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH};
use uuid::Uuid;
use rusoto_core::reactor::{CredentialsProvider, RequestDispatcher};
use rusoto_core::{DispatchSignedRequest, ProvideAwsCredentials};
use rusoto_dynamodb::{AttributeValue, DynamoDb, DynamoDbClient, GetItemError, GetItemInput,
UpdateItemError, UpdateItemInput};
use {DistLock, DynaError, DynaErrorKind, Locking};
#[cfg(test)]
mod tests;
pub struct DynamoDbDriver<P = CredentialsProvider, D = RequestDispatcher>
where
P: ProvideAwsCredentials,
D: DispatchSignedRequest,
{
client: DynamoDbClient<P, D>,
table_name: String,
partition_key_field_name: String,
token_field_name: String,
duration_field_name: String,
ttl_field_name: String,
ttl_value: u64,
partition_key_value: String,
current_token: String,
}
impl<P, D> DynamoDbDriver<P, D>
where
P: ProvideAwsCredentials,
D: DispatchSignedRequest,
{
pub fn new(client: DynamoDbClient<P, D>, input: &DynamoDbDriverInput) -> Self {
DynamoDbDriver {
client: client,
table_name: input.table_name.clone(),
partition_key_field_name: input.partition_key_field_name.clone(),
partition_key_value: input.partition_key_value.clone(),
token_field_name: input.token_field_name.clone(),
duration_field_name: input.duration_field_name.clone(),
ttl_field_name: input.ttl_field_name.clone(),
ttl_value: input.ttl_value,
current_token: String::new(),
}
}
}
pub const DAY_SECONDS: u64 = 86400;
#[derive(Debug)]
pub struct DynamoDbDriverInput {
pub table_name: String,
pub partition_key_field_name: String,
pub partition_key_value: String,
pub token_field_name: String,
pub duration_field_name: String,
pub ttl_field_name: String,
pub ttl_value: u64,
}
impl Default for DynamoDbDriverInput {
fn default() -> Self {
DynamoDbDriverInput {
table_name: String::new(),
partition_key_field_name: String::new(),
partition_key_value: String::from("singleton"),
token_field_name: String::from("rvn"),
duration_field_name: String::from("duration"),
ttl_field_name: String::from("ttl"),
ttl_value: DAY_SECONDS * 7,
}
}
}
#[derive(Debug, Clone)]
pub struct DynamoDbLockInput {
pub timeout: Duration,
pub consistent_read: Option<bool>,
}
impl Default for DynamoDbLockInput {
fn default() -> Self {
DynamoDbLockInput {
timeout: Duration::from_secs(10),
consistent_read: Some(false),
}
}
}
mod expressions {
pub const ACQUIRE_UPDATE: &'static str =
"SET #token_field = :new_token, #duration_field = :lease, #ttl_field = :ttl";
pub const ACQUIRE_CONDITION: &'static str =
"attribute_not_exists(#token_field) OR #token_field = :cond_current_token";
pub const RELEASE_UPDATE: &'static str = "REMOVE #token_field";
pub const RELEASE_CONDITION: &'static str =
"attribute_exists(#token_field) AND #token_field = :cond_current_token";
}
impl<P, D> Locking for DistLock<DynamoDbDriver<P, D>>
where
P: ProvideAwsCredentials + 'static,
D: DispatchSignedRequest + 'static,
{
type AcquireLockInputType = DynamoDbLockInput;
type RefreshLockInputType = DynamoDbLockInput;
type ReleaseLockInputType = DynamoDbLockInput;
fn acquire_lock(&mut self, input: &Self::AcquireLockInputType) -> Result<Instant, DynaError> {
let new_token = Uuid::new_v4().hyphenated().to_string();
if self.driver.current_token.is_empty() {
self.driver.current_token = new_token.clone();
}
let ttl_secs =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() + self.driver.ttl_value;
let update_input = UpdateItemInput {
table_name: self.driver.table_name.clone(),
update_expression: Some(String::from(expressions::ACQUIRE_UPDATE)),
condition_expression: Some(String::from(expressions::ACQUIRE_CONDITION)),
expression_attribute_names: Some(hashmap! {
String::from("#token_field") => self.driver.token_field_name.clone(),
String::from("#duration_field") => self.driver.duration_field_name.clone(),
String::from("#ttl_field") => self.driver.ttl_field_name.clone(),
}),
expression_attribute_values: Some(hashmap! {
String::from(":new_token") => AttributeValue { s: Some(new_token.clone()), ..Default::default() },
String::from(":lease") => AttributeValue { n: Some(self.duration.as_secs().to_string()), ..Default::default() },
String::from(":ttl") => AttributeValue { n: Some(ttl_secs.to_string()), ..Default::default() },
String::from(":cond_current_token") => AttributeValue { s: Some(self.driver.current_token.clone()), ..Default::default() }
}),
key: hashmap! {
self.driver.partition_key_field_name.clone() => AttributeValue {
s: Some(self.driver.partition_key_value.clone()),
..Default::default()
},
},
..Default::default()
};
self.driver
.client
.update_item(&update_input)
.with_timeout(input.timeout)
.sync()?;
let start = Instant::now();
info!(
"lock '{}' acquired successfully, current token ({}) new token ({}) lease ({}s)",
self.driver.partition_key_value,
self.driver.current_token,
new_token,
self.duration.as_secs()
);
self.driver.current_token = new_token.clone();
Ok(start)
}
fn refresh_lock(&mut self, input: &Self::RefreshLockInputType) -> Result<(), DynaError> {
let get_input = GetItemInput {
consistent_read: input.consistent_read,
table_name: self.driver.table_name.clone(),
key: hashmap! {
self.driver.partition_key_field_name.clone() => AttributeValue {
s: Some(self.driver.partition_key_value.clone()),
..Default::default()
},
},
..Default::default()
};
let output = self.driver
.client
.get_item(&get_input)
.with_timeout(input.timeout)
.sync()?;
if output.item.is_some() {
let attr = output
.item
.as_ref()
.unwrap()
.get(&self.driver.token_field_name);
if attr.is_some() {
self.driver.current_token = attr.unwrap().s.as_ref().unwrap().clone();
info!(
"lock '{}' refreshed successful, found new token ({})",
self.driver.partition_key_value, self.driver.current_token
);
}
}
Ok(())
}
fn release_lock(&mut self, input: &Self::ReleaseLockInputType) -> Result<(), DynaError> {
let update_input = UpdateItemInput {
table_name: self.driver.table_name.clone(),
update_expression: Some(String::from(expressions::RELEASE_UPDATE)),
condition_expression: Some(String::from(expressions::RELEASE_CONDITION)),
expression_attribute_names: Some(hashmap! {
String::from("#token_field") => self.driver.token_field_name.clone(),
}),
expression_attribute_values: Some(hashmap! {
String::from(":cond_current_token") => AttributeValue { s: Some(self.driver.current_token.clone()), ..Default::default() }
}),
key: hashmap! {
self.driver.partition_key_field_name.clone() => AttributeValue {
s: Some(self.driver.partition_key_value.clone()),
..Default::default()
},
},
..Default::default()
};
self.driver
.client
.update_item(&update_input)
.with_timeout(input.timeout)
.sync()?;
info!(
"lock '{}' successfully released for token ({})",
self.driver.partition_key_value, self.driver.current_token
);
self.driver.current_token.clear();
Ok(())
}
fn remaining(&self, instant: Instant) -> Option<Duration> {
self.duration.checked_sub(instant.elapsed())
}
}
impl From<SystemTimeError> for DynaError {
fn from(err: SystemTimeError) -> DynaError {
error!("{}", err);
DynaError::new(DynaErrorKind::UnhandledError, Some(&err.to_string()))
}
}
impl From<GetItemError> for DynaError {
fn from(err: GetItemError) -> DynaError {
error!("{}", err);
DynaError::new(DynaErrorKind::ProviderError, Some(&err.to_string()))
}
}
impl From<UpdateItemError> for DynaError {
fn from(err: UpdateItemError) -> DynaError {
match err {
UpdateItemError::ConditionalCheckFailed(_) => {
warn!("{}", err);
DynaError::new(DynaErrorKind::LockAlreadyAcquired, None)
}
_ => {
error!("{}", err);
DynaError::new(DynaErrorKind::ProviderError, Some(&err.to_string()))
}
}
}
}