Skip to main content

cdrs/
events.rs

1use std::cell::RefCell;
2use std::iter::Iterator;
3use std::sync::mpsc::{channel, Receiver, Sender};
4
5use crate::compression::Compression;
6use crate::error;
7use crate::frame::events::{
8    SchemaChange as FrameSchemaChange, ServerEvent as FrameServerEvent,
9    SimpleServerEvent as FrameSimpleServerEvent,
10};
11use crate::frame::parser::parse_frame;
12use crate::transport::CDRSTransport;
13
/// Full server event which includes all details about the change that occurred.
///
/// Alias for `crate::frame::events::ServerEvent`.
pub type ServerEvent = FrameServerEvent;

/// Simplified server event. It should be used to represent an event
/// which a consumer wants to listen to.
///
/// Alias for `crate::frame::events::SimpleServerEvent`.
pub type SimpleServerEvent = FrameSimpleServerEvent;

/// Re-export of `crate::frame::events::SchemaChange` under this module.
pub type SchemaChange = FrameSchemaChange;
23
24/// Factory function which returns a `Listener` and related `EventStream.`
25///
26/// `Listener` provides only one function `start` to start listening. It
27/// blocks a thread so should be moved into a separate one to no release
28/// main thread.
29///
30/// `EventStream` is an iterator which returns new events once they come.
31/// It is similar to `Receiver::iter`.
32pub fn new_listener<X>(transport: X) -> (Listener<X>, EventStream) {
33    let (tx, rx) = channel();
34    let listener = Listener {
35        transport: transport,
36        tx: tx,
37    };
38    let stream = EventStream { rx: rx };
39    (listener, stream)
40}
41
/// `Listener` provides only one function, `start`, to start listening. It
/// blocks the calling thread, so it should be moved into a separate thread
/// in order not to block the main one.

pub struct Listener<X> {
    // Transport handed to `parse_frame` when reading incoming frames.
    transport: X,
    // Sending half of the channel; every received server event is pushed here.
    tx: Sender<ServerEvent>,
}
50
51impl<X: CDRSTransport + 'static> Listener<RefCell<X>> {
52    /// It starts a process of listening to new events. Locks a frame.
53    pub fn start(self, compressor: &Compression) -> error::Result<()> {
54        loop {
55            let event_opt = parse_frame(&self.transport, compressor)?
56                .get_body()?
57                .into_server_event();
58
59            let event = if event_opt.is_some() {
60                // unwrap is safe as we've checked that event_opt.is_some()
61                event_opt.unwrap().event as ServerEvent
62            } else {
63                continue;
64            };
65            match self.tx.send(event) {
66                Err(err) => return Err(error::Error::General(err.to_string())),
67                _ => continue,
68            }
69        }
70    }
71}
72
/// `EventStream` is an iterator which returns new events once they arrive.
/// It is similar to `Receiver::iter`: each call to `next` blocks until an
/// event is available.
pub struct EventStream {
    // Receiving half of the channel that delivers server events.
    rx: Receiver<ServerEvent>,
}
78
79impl Iterator for EventStream {
80    type Item = ServerEvent;
81
82    fn next(&mut self) -> Option<Self::Item> {
83        self.rx.recv().ok()
84    }
85}
86
87impl Into<EventStreamNonBlocking> for EventStream {
88    fn into(self) -> EventStreamNonBlocking {
89        EventStreamNonBlocking { rx: self.rx }
90    }
91}
92
/// `EventStreamNonBlocking` is an iterator which returns events that have
/// already arrived, without waiting for new ones. It is the non-blocking
/// version of `EventStream` and is similar to `Receiver::try_iter`.
pub struct EventStreamNonBlocking {
    // Receiving half of the channel that delivers server events.
    rx: Receiver<ServerEvent>,
}
98
99impl Iterator for EventStreamNonBlocking {
100    type Item = ServerEvent;
101
102    fn next(&mut self) -> Option<Self::Item> {
103        self.rx.try_recv().ok()
104    }
105}