mod error;
mod lease;
mod lock;
use std::sync::Arc;
pub use crate::{error::*, lease::*, lock::*};
use etcd_client::{
Client, Compare, CompareOp, Event, PutOptions, ResponseHeader, Txn, TxnOp,
WatchOptions, WatchStream,
};
use futures::prelude::*;
use log::*;
use rand::random;
/// Selects which lease a `try_race` attempt attaches to the contested key.
///
/// The common value-type traits are derived so callers can log, copy and
/// compare the selector without writing their own `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TtlOrLeaseId {
    /// Grant a fresh lease with this time-to-live (the value is handed to
    /// `lease_grant` unchanged; the tests in this file use seconds).
    Ttl(i64),
    /// Attach the key to an already existing lease with this id.
    LeaseId(i64),
}
pub async fn try_race(
client: &mut Client,
key: &str,
ttl_or_lease_id: TtlOrLeaseId,
) -> CrateResult<bool> {
let value = random::<u128>().to_le_bytes();
let lease_id = match ttl_or_lease_id {
TtlOrLeaseId::Ttl(ttl) => client.lease_grant(ttl, None).await?.id(),
TtlOrLeaseId::LeaseId(id) => id,
};
let locked = client
.txn(
Txn::new()
.when(vec![Compare::create_revision(
key.as_bytes(),
CompareOp::Equal,
0,
)])
.and_then(vec![TxnOp::put(
key.as_bytes(),
&value[..],
Some(PutOptions::new().with_lease(lease_id)),
)]),
)
.await?
.succeeded();
Ok(locked)
}
/// Builds the transaction that claims the key `name`.
///
/// The transaction compares the key's create revision with `0` and, when
/// that comparison holds, puts `lock_value` under `name` attached to
/// `lease_id`.
fn get_lock_txn(name: &str, lock_value: &[u8], lease_id: i64) -> Txn {
    let key_is_absent = Compare::create_revision(name, CompareOp::Equal, 0);
    let leased_put = PutOptions::new().with_lease(lease_id);
    let claim_key = TxnOp::put(name, lock_value, Some(leased_put));
    Txn::new().when([key_is_absent]).and_then([claim_key])
}
/// Builds the transaction that gives the key `name` back.
///
/// The key is deleted only when both guards hold: its stored value equals
/// `lock_value` and the lease attached to it equals `lease_id`.
pub(crate) fn get_release_txn(
    name: &str,
    lock_value: &[u8],
    lease_id: i64,
) -> Txn {
    let same_value = Compare::value(name, CompareOp::Equal, lock_value);
    let same_lease = Compare::lease(name, CompareOp::Equal, lease_id);
    let remove_key = TxnOp::delete(name, None);
    Txn::new()
        .when([same_value, same_lease])
        .and_then([remove_key])
}
async fn watch_until_lock_available(
etcd_client: &mut Client,
watch_stream: &mut Option<WatchStream>,
name: &str,
last_locked_revision: Option<i64>,
) -> CrateResult<()> {
loop {
let mut etcd_client = etcd_client.clone();
let mut inner_watch_stream = match watch_stream.take() {
Some(watch_stream) => watch_stream,
None => {
let mut watch_options = WatchOptions::new();
if let Some(last_locked_revision) = last_locked_revision {
watch_options = watch_options
.with_start_revision(last_locked_revision + 1);
}
let (_, watch_stream) =
etcd_client.watch(name, Some(watch_options)).await?;
watch_stream
}
};
let watch_response = inner_watch_stream
.try_next()
.await?
.expect("Unexpected end of etcd watch stream");
*watch_stream = Some(inner_watch_stream);
let is_available = matches!(watch_response.events().last().and_then(Event::kv), Some(kv) if kv.create_revision() == 0);
trace!(
"Got update from watch stream for key: {} (avilable: {})",
name,
is_available
);
if is_available {
return Ok(());
}
}
}
impl Lease {
    /// Stops the keep-alive task behind this lease handle.
    ///
    /// Only the sole owner of the shared inner state performs the shutdown:
    /// the stop signal is sent (a failed send is ignored) and the keep-alive
    /// task is awaited. If the inner state is still shared, this is a no-op
    /// that returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if awaiting the keep-alive task handle itself fails.
    pub(crate) async fn stop_keep_alive(
        self,
    ) -> Result<(), etcd_client::Error> {
        let LeaseInner {
            stop_tx,
            keep_alive_handle,
        } = match Arc::try_unwrap(self.inner) {
            Ok(sole_owner) => sole_owner,
            Err(_) => return Ok(()),
        };
        let _ = stop_tx.send(());
        keep_alive_handle.await.unwrap()?;
        Ok(())
    }

    /// Stops the keep-alive task and then revokes the lease on the server.
    ///
    /// # Errors
    ///
    /// Returns the error from stopping the keep-alive task or from the
    /// revoke request.
    pub async fn release(
        self,
        client: &mut Client,
    ) -> Result<(), etcd_client::Error> {
        // Read the id first: `stop_keep_alive` consumes `self`.
        let id = self.lease_id;
        self.stop_keep_alive().await?;
        client.lease_revoke(id).await?;
        debug!("Released lease: {:#x}", id);
        Ok(())
    }

    /// Shared implementation of [`Lease::lock`] and [`Lease::try_lock`].
    ///
    /// Repeatedly runs the lock transaction for `name` with a random lock
    /// value. When the transaction does not succeed, `wait` decides whether
    /// to give up with `Error::Taken` or to wait on the key's watch (starting
    /// after the revision of the failed transaction) and try again.
    async fn _lock<'l, 'n>(
        &'l mut self,
        client: &mut Client,
        name: &'n str,
        wait: bool,
    ) -> CrateResult<LockGuard<'l, 'n>> {
        let lock_value = random::<u128>().to_le_bytes();
        // One watch stream is kept across retries.
        let mut watch_stream = None;
        loop {
            let attempt = client
                .txn(get_lock_txn(name, &lock_value, self.lease_id))
                .await?;
            if attempt.succeeded() {
                break;
            }
            if !wait {
                return Err(Error::Taken);
            }
            let failed_at = attempt.header().map(ResponseHeader::revision);
            watch_until_lock_available(
                client,
                &mut watch_stream,
                name,
                failed_at,
            )
            .await?;
        }
        debug!("Acquired lock: {} (lease: {:#x})", name, self.lease_id);
        Ok(LockGuard {
            lock_value,
            name,
            lease: &*self,
        })
    }

    /// Acquires the lock `name`, waiting until it becomes available.
    pub async fn lock<'l, 'n>(
        &'l mut self,
        client: &mut Client,
        name: &'n str,
    ) -> CrateResult<LockGuard<'l, 'n>> {
        let wait = true;
        self._lock(client, name, wait).await
    }

    /// Tries to acquire the lock `name` once; fails with `Error::Taken`
    /// instead of waiting when the lock is currently held.
    pub async fn try_lock<'l, 'n>(
        &'l mut self,
        client: &mut Client,
        name: &'n str,
    ) -> CrateResult<LockGuard<'l, 'n>> {
        let wait = false;
        self._lock(client, name, wait).await
    }

    /// Runs the future produced by `f` while holding the lock `name`.
    ///
    /// The lock is acquired (waiting if necessary), `f` is called and its
    /// future awaited, then the guard is released. The future's output is
    /// returned only if the release succeeds; a release error takes
    /// precedence and the output is dropped.
    pub async fn with_lock<F, Fut, T>(
        &mut self,
        client: &mut Client,
        name: &str,
        f: F,
    ) -> CrateResult<T>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = T>,
    {
        let held = self.lock(client, name).await?;
        let output = f().await;
        held.release(client).await?;
        Ok(output)
    }
}
// Integration tests for the lock helpers. Every test obtains its client from
// `create_client`, so an etcd server must be reachable at the address used
// there.
#[cfg(test)]
mod tests {
use super::*;
use rand::{random, Rng};
use std::{
ops::AddAssign,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
sync::{Barrier, Mutex},
time::sleep,
};
/// Connects to the etcd endpoint `http://localhost:2379`.
async fn create_client() -> CrateResult<Client> {
Ok(Client::connect(vec!["http://localhost:2379"], None).await?)
}
/// Returns a random 128-bit value formatted as hex, used as a fresh lock
/// key for each test.
fn create_lock_name() -> String {
format!("{:x}", random::<u128>())
}
/// Ten concurrent workers, each with its own lease, run a critical section
/// under `with_lock` on the same key. The critical section takes an
/// in-process mutex with `try_lock`, so it fails if another worker is
/// inside the section at the same time.
#[tokio::test]
async fn test_mutex() -> anyhow::Result<()> {
const WORKERS: usize = 10;
type Result<T> = anyhow::Result<T, anyhow::Error>;
let lock_name = create_lock_name();
let client = create_client().await?;
let locked_count = Arc::new(Mutex::new(0_usize));
stream::iter(0..WORKERS)
.map(Result::Ok)
.try_for_each_concurrent(None, |_| {
let mut client = client.clone();
let lock_name = lock_name.clone();
let locked_count = locked_count.clone();
let mut rng = rand::thread_rng();
async move {
let mut lease = acquire_lease(&mut client, 10).await?;
lease
.with_lock(&mut client, &lock_name, || async move {
{
// `try_lock` (not `lock`): contention on the local mutex
// is turned into an error rather than being waited out.
let mut locked_count =
locked_count.try_lock()?;
sleep(Duration::from_millis(
rng.gen_range(0..2_000),
))
.await;
locked_count.add_assign(1);
}
Result::Ok(())
})
// First `?` unwraps the result of `with_lock`, the second one the
// result returned by the closure.
.await??;
Result::Ok(())
}
})
.await?;
// Every worker must have completed its critical section exactly once.
assert_eq!(*locked_count.try_lock().unwrap(), WORKERS);
Ok(())
}
/// One side writes the lock key directly with `lease1` and later stops that
/// lease's keep-alive; the other side checks that `try_lock` reports
/// `Error::Taken` while the key is held and then waits in `with_lock`.
/// Presumably the wait ends once `lease1` expires on the server — confirm.
#[tokio::test]
async fn test_lease_expire() -> CrateResult<()> {
let lock_name = create_lock_name();
let mut client = create_client().await?;
let lease1 = acquire_lease(&mut client, 10).await?;
let mut lease2 = acquire_lease(&mut client, 10).await?;
// barrier1: the key has been written. barrier2: the `try_lock` check is done.
let barrier1 = Arc::new(Barrier::new(2));
let barrier2 = Arc::new(Barrier::new(2));
future::try_join(
{
let mut client = client.clone();
let lock_name = lock_name.clone();
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
async move {
// Take the key with the raw transaction (empty lock value) instead of
// going through `Lease::lock`.
client
.txn(get_lock_txn(&lock_name, b"", lease1.lease_id))
.await?;
barrier1.wait().await;
barrier2.wait().await;
lease1.stop_keep_alive().await?;
Ok(())
}
},
{
let mut client = client.clone();
async move {
barrier1.wait().await;
assert!(matches!(
lease2
.try_lock(&mut client, &lock_name)
.await
.unwrap_err(),
Error::Taken
));
barrier2.wait().await;
lease2
.with_lock(&mut client, &lock_name, || async {
info!("Inside lock.");
})
.await
}
},
)
.await?;
Ok(())
}
/// Revokes the lease from inside the critical section and expects
/// `with_lock` to report `Error::Lost`.
#[tokio::test]
async fn test_lock_lost() -> CrateResult<()> {
let lock_name = create_lock_name();
let mut client = create_client().await?;
let mut lease = acquire_lease(&mut client, 10).await?;
let lease_id = lease.lease_id;
let res = lease
// `with_lock` gets a clone so the closure can borrow `client` itself.
.with_lock(&mut client.clone(), &lock_name, || async {
client.lease_revoke(lease_id).await.unwrap();
})
.await;
assert!(matches!(res, Err(Error::Lost)));
Ok(())
}
/// Twenty contenders call `try_race` on the same key after random delays;
/// exactly one must win. After `wait_for_ttl` completes, a new race on the
/// same key must succeed.
#[tokio::test]
async fn test_try_race() -> CrateResult<()> {
const TTL: u64 = 5;
let lock_name = create_lock_name();
let mut client = create_client().await?;
let locked_count = Arc::new(AtomicUsize::new(0));
// NOTE(review): this sleep is created before the races start but awaited
// after them; if its deadline is fixed at creation, a winner that raced
// late may still hold an unexpired lease when the final assert runs —
// confirm against the tokio `sleep` documentation.
let wait_for_ttl = sleep(Duration::from_secs(TTL + 1));
stream::iter(0..20)
.map(|_| {
let mut client = client.clone();
let lock_name = lock_name.clone();
let locked_count = locked_count.clone();
let mut rng = rand::thread_rng();
async move {
sleep(Duration::from_secs(rng.gen_range(0..TTL - 1))).await;
if try_race(
&mut client,
&lock_name,
TtlOrLeaseId::Ttl(TTL as i64),
)
.await?
{
locked_count.fetch_add(1, Ordering::SeqCst);
}
CrateResult::Ok(())
}
})
// All twenty attempts are allowed to be in flight at once.
.buffer_unordered(20)
.try_collect::<Vec<_>>()
.await?;
assert_eq!(locked_count.load(Ordering::SeqCst), 1);
wait_for_ttl.await;
assert!(
try_race(&mut client, &lock_name, TtlOrLeaseId::Ttl(10)).await?
);
Ok(())
}
}