1use std::{
8 collections::{HashMap, HashSet},
9 sync::{
10 atomic::{AtomicBool, AtomicUsize, Ordering},
11 mpsc::{self, SyncSender, TrySendError},
12 Arc, RwLock,
13 },
14 thread,
15};
16
17use crate::{
18 event::{entry::EventEntry, filter::Filter, intermediary::IntermediaryEvent, Event, Id, Msg},
19 subscription::{Subscription, SubscriptionError, SubscriptionSender},
20 this_origin,
21};
22
23pub trait CaptureControl {
27 fn start(id: &Self) -> bool;
37
38 fn start_id() -> Self;
42
43 fn stop(id: &Self) -> bool;
53
54 fn stop_id() -> Self;
58}
59
60pub fn is_control_id(id: &impl CaptureControl) -> bool {
64 CaptureControl::stop(id) || CaptureControl::start(id)
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum CaptureMode {
70 Blocking,
72 NonBlocking,
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum EventTimestampKind {
80 Captured,
86 Created,
90}
91
92type Subscriber<K, M, T> = HashMap<crate::uuid::Uuid, SubscriptionSender<K, M, T>>;
95type IdSubscriber<K, M, T> = HashMap<K, Subscriber<K, M, T>>;
96type Capturer<K, M, T> = SyncSender<Event<K, M, T>>;
97
98pub struct EvidentPublisher<K, M, T, F>
102where
103 K: Id + CaptureControl,
104 M: Msg,
105 T: EventEntry<K, M>,
106 F: Filter<K, M>,
107{
108 pub(crate) subscriptions: Arc<RwLock<IdSubscriber<K, M, T>>>,
112
113 pub(crate) any_event: Arc<RwLock<Subscriber<K, M, T>>>,
117
118 pub(crate) capturer: Capturer<K, M, T>,
122
123 filter: Option<F>,
127
128 capturing: Arc<AtomicBool>,
132
133 capture_blocking: Arc<AtomicBool>,
135
136 capture_channel_bound: usize,
140
141 subscription_channel_bound: usize,
145
146 missed_captures: Arc<AtomicUsize>,
148
149 timestamp_kind: EventTimestampKind,
151}
152
153impl<K, M, T, F> EvidentPublisher<K, M, T, F>
154where
155 K: Id + CaptureControl,
156 M: Msg,
157 T: EventEntry<K, M>,
158 F: Filter<K, M>,
159{
160 fn create(
166 mut on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
167 filter: Option<F>,
168 capture_mode: CaptureMode,
169 capture_channel_bound: usize,
170 subscription_channel_bound: usize,
171 timestamp_kind: EventTimestampKind,
172 ) -> Self {
173 let (send, recv): (SyncSender<Event<K, M, T>>, _) =
174 mpsc::sync_channel(capture_channel_bound);
175
176 thread::spawn(move || {
178 while let Ok(mut event) = recv.recv() {
179 if timestamp_kind == EventTimestampKind::Captured {
180 event.timestamp = Some(std::time::SystemTime::now());
181 }
182
183 on_event(event);
184 }
185 });
186
187 let mode = match capture_mode {
188 CaptureMode::Blocking => Arc::new(AtomicBool::new(true)),
189 CaptureMode::NonBlocking => Arc::new(AtomicBool::new(false)),
190 };
191
192 EvidentPublisher {
193 subscriptions: Arc::new(RwLock::new(HashMap::new())),
194 any_event: Arc::new(RwLock::new(HashMap::new())),
195 capturer: send,
196 filter,
197 capturing: Arc::new(AtomicBool::new(true)),
199 capture_blocking: mode,
200 capture_channel_bound,
201 subscription_channel_bound,
202 missed_captures: Arc::new(AtomicUsize::new(0)),
203 timestamp_kind,
204 }
205 }
206
207 pub fn new(
213 on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
214 capture_mode: CaptureMode,
215 capture_channel_bound: usize,
216 subscription_channel_bound: usize,
217 time_stamp_kind: EventTimestampKind,
218 ) -> Self {
219 Self::create(
220 on_event,
221 None,
222 capture_mode,
223 capture_channel_bound,
224 subscription_channel_bound,
225 time_stamp_kind,
226 )
227 }
228
229 pub fn with(
235 on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
236 filter: F,
237 capture_mode: CaptureMode,
238 capture_channel_bound: usize,
239 subscription_channel_bound: usize,
240 timestamp_kind: EventTimestampKind,
241 ) -> Self {
242 Self::create(
243 on_event,
244 Some(filter),
245 capture_mode,
246 capture_channel_bound,
247 subscription_channel_bound,
248 timestamp_kind,
249 )
250 }
251
252 pub fn get_filter(&self) -> &Option<F> {
256 &self.filter
257 }
258
259 pub fn entry_allowed(&self, entry: &impl EventEntry<K, M>) -> bool {
263 if !is_control_id(entry.get_event_id()) {
264 if !self.capturing.load(Ordering::Acquire) {
265 return false;
266 }
267
268 if let Some(filter) = &self.filter {
269 if !filter.allow_entry(entry) {
270 return false;
271 }
272 }
273 }
274
275 true
276 }
277
278 #[doc(hidden)]
284 pub fn _capture<I: IntermediaryEvent<K, M, T>>(&self, interm_event: &mut I) {
285 let entry = interm_event.take_entry();
286
287 if !self.entry_allowed(&entry) {
289 return;
290 }
291
292 let mut event = Event::new(entry);
293 if self.timestamp_kind == EventTimestampKind::Created {
294 event.timestamp = Some(std::time::SystemTime::now());
295 }
296
297 if self.capture_blocking.load(Ordering::Acquire) {
298 let _ = self.capturer.send(event);
299 } else {
300 let res = self.capturer.try_send(event);
301
302 if let Err(TrySendError::Full(_)) = res {
303 let missed_captures = self.missed_captures.load(Ordering::Relaxed);
308 if missed_captures < usize::MAX {
309 self.missed_captures
310 .store(missed_captures + 1, Ordering::Relaxed);
311 }
312 }
313 }
314 }
315
316 pub fn get_capture_mode(&self) -> CaptureMode {
318 if self.capture_blocking.load(Ordering::Acquire) {
319 CaptureMode::Blocking
320 } else {
321 CaptureMode::NonBlocking
322 }
323 }
324
325 pub fn set_capture_mode(&self, mode: CaptureMode) {
327 match mode {
328 CaptureMode::Blocking => self.capture_blocking.store(true, Ordering::Release),
329 CaptureMode::NonBlocking => self.capture_blocking.store(false, Ordering::Release),
330 }
331 }
332
333 pub fn get_missed_captures(&self) -> usize {
335 self.missed_captures.load(Ordering::Relaxed)
336 }
337
338 pub fn reset_missed_captures(&self) {
340 self.missed_captures.store(0, Ordering::Relaxed);
341 }
342
343 pub fn subscribe(&self, id: K) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
348 self.subscribe_to_many(vec![id])
349 }
350
351 pub fn subscribe_to_many(
356 &self,
357 ids: Vec<K>,
358 ) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
359 let (sender, receiver) = mpsc::sync_channel(ids.len() + self.subscription_channel_bound);
362 let channel_id = crate::uuid::Uuid::new_v4();
363 let subscription_sender = SubscriptionSender { channel_id, sender };
364
365 match self.subscriptions.write().ok() {
366 Some(mut locked_subs) => {
367 for id in ids.clone() {
368 let entry = locked_subs.entry(id.clone());
369 entry
370 .and_modify(|v| {
371 v.insert(subscription_sender.channel_id, subscription_sender.clone());
372 })
373 .or_insert({
374 let mut h = HashMap::new();
375 h.insert(subscription_sender.channel_id, subscription_sender.clone());
376 h
377 });
378 }
379 }
380 None => {
381 return Err(SubscriptionError::CouldNotAccessPublisher);
382 }
383 }
384
385 Ok(Subscription {
386 channel_id,
387 receiver,
388 sub_to_all: false,
389 subscriptions: Some(HashSet::from_iter(ids)),
390 publisher: self,
391 })
392 }
393
394 pub fn subscribe_to_all_events(
399 &self,
400 ) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
401 let (sender, receiver) = mpsc::sync_channel(self.capture_channel_bound);
402 let channel_id = crate::uuid::Uuid::new_v4();
403
404 match self.any_event.write().ok() {
405 Some(mut locked_vec) => {
406 locked_vec.insert(channel_id, SubscriptionSender { channel_id, sender });
407 }
408 None => {
409 return Err(SubscriptionError::CouldNotAccessPublisher);
410 }
411 }
412
413 Ok(Subscription {
414 channel_id,
415 receiver,
416 sub_to_all: true,
417 subscriptions: None,
418 publisher: self,
419 })
420 }
421
422 pub fn is_capturing(&self) -> bool {
426 self.capturing.load(Ordering::Acquire)
427 }
428
429 pub fn start(&self) {
435 let empty_msg: Option<M> = None;
436 let start_event = Event::new(EventEntry::new(K::start_id(), empty_msg, this_origin!()));
437
438 let _ = self.capturer.send(start_event);
439
440 self.capturing.store(true, Ordering::Release);
441 }
442
443 pub fn stop(&self) {
447 let empty_msg: Option<M> = None;
448 let stop_event = Event::new(EventEntry::new(K::stop_id(), empty_msg, this_origin!()));
449
450 let _ = self.capturer.send(stop_event);
451
452 self.capturing.store(false, Ordering::Release);
453 }
454
455 #[doc(hidden)]
461 pub fn on_event(&self, event: Event<K, M, T>) {
462 let arc_event = Arc::new(event);
463 let key = arc_event.entry.get_event_id();
464
465 let mut bad_subs: Vec<crate::uuid::Uuid> = Vec::new();
466 let mut bad_any_event: Vec<crate::uuid::Uuid> = Vec::new();
467
468 if let Ok(locked_subscriptions) = self.subscriptions.read() {
469 if let Some(sub_senders) = locked_subscriptions.get(key) {
470 for (channel_id, sub_sender) in sub_senders.iter() {
471 let bad_channel = if self.capture_blocking.load(Ordering::Acquire) {
472 sub_sender.sender.send(arc_event.clone()).is_err()
473 } else {
474 matches!(
475 sub_sender.sender.try_send(arc_event.clone()),
476 Err(TrySendError::Disconnected(_))
477 )
478 };
479
480 if bad_channel {
481 bad_subs.push(*channel_id);
482 }
483 }
484 }
485 }
486
487 if let Ok(locked_vec) = self.any_event.read() {
488 for (channel_id, any_event_sender) in locked_vec.iter() {
489 let bad_channel = if self.capture_blocking.load(Ordering::Acquire) {
490 any_event_sender.sender.send(arc_event.clone()).is_err()
491 } else {
492 matches!(
493 any_event_sender.sender.try_send(arc_event.clone()),
494 Err(TrySendError::Disconnected(_))
495 )
496 };
497
498 if bad_channel {
499 bad_any_event.push(*channel_id);
500 }
501 }
502 }
503
504 if !bad_subs.is_empty() {
506 if let Ok(mut locked_subscriptions) = self.subscriptions.write() {
507 let mut entry = locked_subscriptions.entry(key.clone());
508 for i in bad_subs {
509 entry = entry.and_modify(|v| {
510 v.remove(&i);
511 });
512 }
513 }
514 }
515
516 if !bad_any_event.is_empty() {
517 if let Ok(mut locked_vec) = self.any_event.write() {
518 for i in bad_any_event {
519 locked_vec.remove(&i);
520 }
521 }
522 }
523 }
524}