use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use crate::ZmqError;
type PipeId = usize;
#[derive(Debug)]
pub(crate) struct PipeState {
semaphore: Arc<Semaphore>,
is_closing: Arc<AtomicBool>,
}
#[derive(Debug)]
pub struct WritePipeCoordinator {
pipe_states: RwLock<HashMap<PipeId, Arc<PipeState>>>,
}
impl WritePipeCoordinator {
pub fn new() -> Self {
Self {
pipe_states: RwLock::new(HashMap::new()),
}
}
pub async fn add_pipe(&self, pipe_id: PipeId) {
let mut states_guard = self.pipe_states.write();
states_guard.entry(pipe_id).or_insert_with(|| {
Arc::new(PipeState {
semaphore: Arc::new(Semaphore::new(1)),
is_closing: Arc::new(AtomicBool::new(false)),
})
});
tracing::trace!(
pipe_id,
"WritePipeCoordinator: Added/Ensured semaphore for pipe."
);
}
pub async fn remove_pipe(&self, pipe_id: PipeId) -> Option<Arc<PipeState>> {
let mut states_guard = self.pipe_states.write();
let removed_state = states_guard.remove(&pipe_id);
if let Some(ref state) = removed_state {
state.is_closing.store(true, Ordering::SeqCst);
state.semaphore.close();
tracing::trace!(
pipe_id,
"WritePipeCoordinator: Marked pipe as closing and removed."
);
}
removed_state
}
pub async fn acquire_send_permit(
&self,
pipe_id: PipeId,
timeout_opt: Option<Duration>,
) -> Result<OwnedSemaphorePermit, ZmqError> {
let pipe_state_arc = {
let states_guard = self.pipe_states.read();
states_guard.get(&pipe_id).cloned()
};
match pipe_state_arc {
Some(state) => {
if state.is_closing.load(Ordering::Relaxed) {
return Err(ZmqError::HostUnreachable(
"Target pipe is closing or has been detached".into(),
));
}
let acquire_future = state.semaphore.clone().acquire_owned();
let permit_result = if let Some(duration) = timeout_opt {
if duration.is_zero() {
return match state.semaphore.clone().try_acquire_owned() {
Ok(permit) => Ok(permit),
Err(_) => Err(ZmqError::ResourceLimitReached),
};
}
tokio::time::timeout(duration, acquire_future).await
} else {
Ok(acquire_future.await)
};
match permit_result {
Ok(Ok(permit)) => Ok(permit),
Ok(Err(_closed_err)) => {
tracing::debug!(
pipe_id,
"Woke from acquire on a closed semaphore; peer detached."
);
Err(ZmqError::HostUnreachable(
"Target pipe was detached during send operation".into(),
))
}
Err(_timeout_elapsed) => {
tracing::debug!(
pipe_id,
?timeout_opt,
"WritePipeCoordinator: Timeout acquiring send permit."
);
Err(ZmqError::Timeout)
}
}
}
None => {
tracing::warn!(
pipe_id,
"WritePipeCoordinator: Attempted to acquire permit for unknown/detached pipe."
);
Err(ZmqError::HostUnreachable(
"Target pipe for send not available or recently detached".into(),
))
}
}
}
}