use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch};
use crate::client::decrement_active_key;
use crate::state::ResumeKey;
use crate::watch::{GapReason, ResumeStart};
use crate::{ClientError, Notification};
/// Guard that pairs a shared `ResumeKey -> usize` map with one key of that map.
///
/// When the guard is dropped, its `Drop` impl calls `decrement_active_key`
/// with `active` and `key`.
pub(crate) struct ActiveKeyGuard {
/// Shared, mutex-protected map from `ResumeKey` to a `usize` value.
/// NOTE(review): presumably a per-key count of active users; confirm
/// against `decrement_active_key`.
pub(crate) active: Arc<std::sync::Mutex<std::collections::HashMap<ResumeKey, usize>>>,
/// Key handed to `decrement_active_key` when this guard is dropped.
pub(crate) key: ResumeKey,
}
impl Drop for ActiveKeyGuard {
    /// Passes this guard's map and key to `decrement_active_key` as the guard
    /// goes out of scope.
    fn drop(&mut self) {
        let registry = &self.active;
        let released = &self.key;
        decrement_active_key(registry, released);
    }
}
/// Sends `msg` on `tx`, giving up if either cancellation source completes first.
///
/// The branches are polled in declaration order (`biased`): the parent
/// cancellation watch first, then the local `cancel` receiver, and only then
/// the send itself.
///
/// # Returns
///
/// * `Ok(())` when `tx.send(msg)` completed successfully.
/// * `Err(())` when `parent_cancel.changed()` completed, when `cancel`
///   completed, or when the send returned an error. The outcome of the two
///   cancellation futures is ignored: any completion counts as cancellation.
///
/// NOTE(review): `changed()` reports only changes this receiver has not yet
/// seen, so a value already marked as seen on `parent_cancel` would not
/// trigger cancellation here; confirm callers rely on change notifications.
pub(crate) async fn send_or_cancel(
    tx: &mpsc::Sender<Result<Notification, ClientError>>,
    msg: Result<Notification, ClientError>,
    cancel: &mut oneshot::Receiver<()>,
    parent_cancel: &mut watch::Receiver<bool>,
) -> Result<(), ()> {
    tokio::select! {
        // Keep the declaration order as the polling priority.
        biased;
        _parent_signal = parent_cancel.changed() => Err(()),
        _local_signal = &mut *cancel => Err(()),
        delivery = tx.send(msg) => match delivery {
            Ok(()) => Ok(()),
            Err(_undelivered) => Err(()),
        },
    }
}
/// Tracks the next expected sequence number of a stream and checks observed
/// sequence numbers against it (see `GapGuard::observe`).
pub(crate) struct GapGuard {
/// Sequence number `observe` expects next. `None` means there is no
/// expectation, so the next observed value is accepted unconditionally.
expected: Option<u64>,
/// When `true`, a forward jump past `expected` is logged and accepted
/// instead of being returned as `GapReason::SequenceJump`.
relaxed: bool,
}
impl GapGuard {
    /// Builds a strict guard: a forward jump in sequence numbers is reported
    /// as an error by [`GapGuard::observe`].
    pub(super) fn starting_from(from: Option<&ResumeStart>) -> Self {
        let relaxed = false;
        Self::new(from, relaxed)
    }

    /// Builds a relaxed guard: a forward jump in sequence numbers is logged
    /// and accepted by [`GapGuard::observe`].
    pub(super) fn relaxed_starting_from(from: Option<&ResumeStart>) -> Self {
        let relaxed = true;
        Self::new(from, relaxed)
    }

    /// Shared constructor.
    ///
    /// For `ResumeStart::AfterSequence(n)` the first expected sequence is
    /// `n + 1` (or no expectation at all if that addition overflows). Any
    /// other start, including `None`, leaves the guard without an expectation.
    fn new(from: Option<&ResumeStart>, relaxed: bool) -> Self {
        let expected = if let Some(ResumeStart::AfterSequence(last_seen)) = from {
            last_seen.checked_add(1)
        } else {
            None
        };
        Self { expected, relaxed }
    }

    /// Checks `observed` against the expected sequence number.
    ///
    /// * No expectation, or `observed` equals it: accepted; the expectation
    ///   becomes `observed + 1` (cleared on overflow).
    /// * `observed` is ahead of the expectation: on a relaxed guard the jump
    ///   is logged at debug level and accepted, advancing the expectation; on
    ///   a strict guard it is rejected and the expectation is left untouched.
    /// * `observed` is behind the expectation: accepted without changing the
    ///   expectation.
    ///
    /// # Errors
    ///
    /// Returns `GapReason::SequenceJump { expected, observed }` when a strict
    /// guard sees a sequence number greater than the expected one.
    pub(super) fn observe(&mut self, observed: u64) -> Result<(), GapReason> {
        if let Some(expected) = self.expected {
            if observed < expected {
                // Behind the cursor: accept, but do not move the cursor.
                return Ok(());
            }
            if observed > expected {
                if !self.relaxed {
                    return Err(GapReason::SequenceJump { expected, observed });
                }
                tracing::debug!(
                    event.name = "client.watch.filtered_gap",
                    expected,
                    observed,
                    "sequence jump observed; treating as filtered-out events (filtered listener)"
                );
            }
        }
        // Reached when there was no expectation, on an exact match, or on a
        // tolerated jump: the cursor moves just past the observed value.
        self.expected = observed.checked_add(1);
        Ok(())
    }
}