native_db_32bit/watch/
mod.rs1mod batch;
2mod event;
3mod filter;
4pub mod query;
5mod request;
6mod sender;
7
8pub(crate) use batch::*;
9pub use event::*;
10pub(crate) use filter::*;
11pub(crate) use request::*;
12pub(crate) use sender::*;
13
14use std::sync::{Arc, RwLock, TryLockError};
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum WatchEventError {
19 #[error("TryLockErrorPoisoned")]
20 TryLockErrorPoisoned,
22 #[error("TryLockErrorWouldBlock")]
23 TryLockErrorWouldBlock,
25 #[cfg(not(feature = "tokio"))]
26 #[error("SendError")]
27 SendError(#[from] std::sync::mpsc::SendError<Event>),
28 #[cfg(feature = "tokio")]
29 #[error("SendError")]
30 SendError(#[from] tokio::sync::mpsc::error::SendError<Event>),
31}
32
33#[cfg(not(feature = "tokio"))]
34pub type MpscSender<T> = std::sync::mpsc::Sender<T>;
35#[cfg(not(feature = "tokio"))]
36pub type MpscReceiver<T> = std::sync::mpsc::Receiver<T>;
37
38#[cfg(feature = "tokio")]
39pub type MpscSender<T> = tokio::sync::mpsc::UnboundedSender<T>;
40#[cfg(feature = "tokio")]
41pub type MpscReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
42
43pub(crate) fn push_batch(
44 senders: Arc<RwLock<Watchers>>,
45 batch: Batch,
46) -> Result<(), WatchEventError> {
47 let watchers = senders.try_read().map_err(|err| match err {
48 TryLockError::Poisoned(_) => WatchEventError::TryLockErrorPoisoned,
49 TryLockError::WouldBlock => WatchEventError::TryLockErrorWouldBlock,
50 })?;
51
52 for (watcher_request, event) in batch {
53 for sender in watchers.find_senders(&watcher_request) {
54 let sender = sender.lock().unwrap();
55 sender.send(event.clone())?;
56 }
57 }
58
59 Ok(())
60}