native_db_32bit/watch/
mod.rs

1mod batch;
2mod event;
3mod filter;
4pub mod query;
5mod request;
6mod sender;
7
8pub(crate) use batch::*;
9pub use event::*;
10pub(crate) use filter::*;
11pub(crate) use request::*;
12pub(crate) use sender::*;
13
14use std::sync::{Arc, RwLock, TryLockError};
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum WatchEventError {
19    #[error("TryLockErrorPoisoned")]
20    // TryLockErrorPoisoned(Batch<'a>), // TODO: remove 'a lifetime from Batch Error
21    TryLockErrorPoisoned,
22    #[error("TryLockErrorWouldBlock")]
23    // TryLockErrorWouldBlock(Batch<'a>), // TODO: remove 'a lifetime from Batch Error
24    TryLockErrorWouldBlock,
25    #[cfg(not(feature = "tokio"))]
26    #[error("SendError")]
27    SendError(#[from] std::sync::mpsc::SendError<Event>),
28    #[cfg(feature = "tokio")]
29    #[error("SendError")]
30    SendError(#[from] tokio::sync::mpsc::error::SendError<Event>),
31}
32
/// Sending half of the channel used to deliver watch events.
///
/// Resolves to `std::sync::mpsc::Sender` unless the `tokio` feature is enabled.
#[cfg(not(feature = "tokio"))]
pub type MpscSender<T> = std::sync::mpsc::Sender<T>;

/// Sending half of the channel used to deliver watch events.
///
/// Resolves to `tokio::sync::mpsc::UnboundedSender` when the `tokio` feature
/// is enabled.
#[cfg(feature = "tokio")]
pub type MpscSender<T> = tokio::sync::mpsc::UnboundedSender<T>;

/// Receiving half of the channel used to deliver watch events.
///
/// Resolves to `std::sync::mpsc::Receiver` unless the `tokio` feature is enabled.
#[cfg(not(feature = "tokio"))]
pub type MpscReceiver<T> = std::sync::mpsc::Receiver<T>;

/// Receiving half of the channel used to deliver watch events.
///
/// Resolves to `tokio::sync::mpsc::UnboundedReceiver` when the `tokio` feature
/// is enabled.
#[cfg(feature = "tokio")]
pub type MpscReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
42
43pub(crate) fn push_batch(
44    senders: Arc<RwLock<Watchers>>,
45    batch: Batch,
46) -> Result<(), WatchEventError> {
47    let watchers = senders.try_read().map_err(|err| match err {
48        TryLockError::Poisoned(_) => WatchEventError::TryLockErrorPoisoned,
49        TryLockError::WouldBlock => WatchEventError::TryLockErrorWouldBlock,
50    })?;
51
52    for (watcher_request, event) in batch {
53        for sender in watchers.find_senders(&watcher_request) {
54            let sender = sender.lock().unwrap();
55            sender.send(event.clone())?;
56        }
57    }
58
59    Ok(())
60}