use std::{sync::Arc, time::Duration};
use crate::leased_lock::{Error, ErrorKind, lease};
use crate::state_store;
use azure_iot_operations_protocol::common::hybrid_logical_clock::HybridLogicalClock;
#[derive(Clone)]
pub struct Client {
lease_client: lease::Client,
}
impl Client {
pub fn new(
state_store: Arc<state_store::Client>,
lock_name: Vec<u8>,
lock_holder_name: Vec<u8>,
) -> Result<Self, Error> {
if lock_name.is_empty() {
return Err(Error(ErrorKind::InvalidArgument(
"lock_name is empty".to_string(),
)));
}
if lock_holder_name.is_empty() {
return Err(Error(ErrorKind::InvalidArgument(
"lock_holder_name is empty".to_string(),
)));
}
let lease_client = lease::Client::new(state_store, lock_name, lock_holder_name)?;
Ok(Self { lease_client })
}
pub async fn lock(
&self,
lock_expiration: Duration,
request_timeout: Duration,
renewal_period: Option<Duration>,
) -> Result<HybridLogicalClock, Error> {
let mut observe_response = self.lease_client.observe(request_timeout).await?;
let mut acquire_result;
loop {
acquire_result = self
.lease_client
.acquire(lock_expiration, request_timeout, renewal_period)
.await;
match acquire_result {
Ok(_) => {
break;
}
Err(ref acquire_error) => match acquire_error.kind() {
ErrorKind::LeaseAlreadyHeld => { }
_ => {
break;
}
},
}
loop {
let Some((notification, _)) = observe_response.recv_notification().await else {
observe_response = self.lease_client.observe(request_timeout).await?;
break;
};
if notification.operation == state_store::Operation::Del {
break;
}
}
}
_ = self.lease_client.unobserve(request_timeout).await?;
acquire_result
}
pub async fn unlock(&self, request_timeout: Duration) -> Result<(), Error> {
self.lease_client.release(request_timeout).await
}
#[must_use]
pub fn current_lock_fencing_token(&self) -> Option<HybridLogicalClock> {
self.lease_client.current_lease_fencing_token()
}
}