dist_lock 0.0.1

Distributed lock
Documentation
use std::cell::RefCell;
use std::fmt::Display;

use chrono::Utc;
use diesel::sql_types::BigInt;
use diesel::sql_types::VarChar;
use diesel::RunQueryDsl;
use gethostname::gethostname;

use crate::core::LockConfig;
use crate::core::LockState;
use crate::core::Lockable;
use crate::error::LockResult;

use super::help::sql_stmt::extend_lock_sql;
use super::help::sql_stmt::insert_lock_sql;
use super::help::sql_stmt::release_lock_sql;
use super::help::sql_stmt::update_lock_sql;

const LOCK_TABLE: &str = "dist_lock";

#[derive(Debug)]
pub struct DieselDriver<T> {
	name: String,
	table: String,
	transport: RefCell<T>,
}

impl<T> DieselDriver<T> {
	pub fn new<P>(lock_name: &String, table_prefix: Option<P>, transport: T) -> Self
	where
		P: Display,
	{
		DieselDriver {
			name: lock_name.to_owned(),
			table: match table_prefix {
				Some(prefix) => format!("{}_{}", prefix, LOCK_TABLE),
				None => LOCK_TABLE.to_owned(),
			},
			transport: RefCell::new(transport),
		}
	}

	pub fn name(&self) -> &String {
		&self.name
	}

	pub fn table(&self) -> &String {
		&self.table
	}

	pub fn transport(&self) -> &RefCell<T> {
		&self.transport
	}
}

macro_rules! impl_lockable_diesel {
	(
		$client: ty,
		$self: ident,
		$conn: expr
	) => {
		impl Lockable for DieselDriver<$client> {
			fn acquire_lock(&$self, config: &LockConfig) -> LockResult<LockState> {
				let now = Utc::now();
				let until = now + config.max_lock;

				let mut locked = match diesel::sql_query(insert_lock_sql(&$self.table))
				.bind::<VarChar, _>(&config.name)
				.bind::<BigInt, _>(until.timestamp_millis())
				.bind::<BigInt, _>(now.timestamp_millis())
				.bind::<VarChar, _>(gethostname().to_string_lossy())
				.execute($conn)
				{
					Ok(count)  => count > 0,
					Err(_) => false
				};

				if !locked {
					locked = diesel::sql_query(update_lock_sql(&$self.table))
						.bind::<BigInt, _>(until.timestamp_millis())
						.bind::<BigInt, _>(now.timestamp_millis())
						.bind::<VarChar, _>(gethostname().to_string_lossy())
						.bind::<VarChar, _>(&$self.name)
						.bind::<BigInt, _>(until.timestamp_millis())
						.execute($conn)? > 0;
				}

				Ok(LockState::new(locked, Utc::now()))
			}

			fn release_lock(&$self, config: &LockConfig, state: &LockState) -> LockResult<LockState> {
				let lock_until = config.lock_at_least_until(state.locked_at);
				diesel::sql_query(release_lock_sql(&$self.table))
					.bind::<BigInt, _>(lock_until.timestamp_millis())
					.bind::<VarChar, _>(&$self.name)
					.execute($conn)?;
				Ok(LockState::unlock())
			}

			fn extend_lock(&$self, config: &LockConfig) -> LockResult<LockState> {
				let now = Utc::now();
				let until = now + config.max_lock;
				let count = diesel::sql_query(extend_lock_sql(&$self.table))
				.bind::<BigInt, _>(until.timestamp_millis())
				.bind::<VarChar, _>(&$self.name)
				.bind::<VarChar, _>(gethostname().to_string_lossy())
				.bind::<BigInt, _>(now.timestamp_millis())
				.execute($conn)?;
				Ok(LockState::new(count > 0, Utc::now()))
			}
		}
	};
}

#[cfg(feature = "diesel_sqlite")]
impl_lockable_diesel!(::diesel::SqliteConnection, self, &mut *self.transport.borrow_mut());
#[cfg(feature = "diesel_postgres")]
impl_lockable_diesel!(::diesel::PgConnection, self, &mut *self.transport.borrow_mut());
#[cfg(feature = "diesel_mysql")]
impl_lockable_diesel!(::diesel::MysqlConnection, self, &mut *self.transport.borrow_mut());
#[cfg(feature = "diesel_sqlite_r2d2")]
impl_lockable_diesel!(
	::r2d2::Pool<::diesel::r2d2::ConnectionManager<::diesel::SqliteConnection>>,
	self,
	&mut self.transport.borrow().get()?
);
#[cfg(feature = "diesel_postgres_r2d2")]
impl_lockable_diesel!(
	::r2d2::Pool<::diesel::r2d2::ConnectionManager<::diesel::PgConnection>>,
	self,
	&mut self.transport.borrow().get()?
);
#[cfg(feature = "diesel_mysql_r2d2")]
impl_lockable_diesel!(
	::r2d2::Pool<::diesel::r2d2::ConnectionManager<::diesel::MysqlConnection>>,
	self,
	&mut self.transport.borrow().get()?
);