1use std::cell::RefCell;
2use std::iter::Iterator;
3use std::sync::mpsc::{channel, Receiver, Sender};
4
5use crate::compression::Compression;
6use crate::error;
7use crate::frame::events::{
8 SchemaChange as FrameSchemaChange, ServerEvent as FrameServerEvent,
9 SimpleServerEvent as FrameSimpleServerEvent,
10};
11use crate::frame::parser::parse_frame;
12use crate::transport::CDRSTransport;
13
/// Alias for `crate::frame::events::ServerEvent`, exposed under this module's
/// own name. This is the item type sent by [`Listener`] and yielded by
/// [`EventStream`] / [`EventStreamNonBlocking`].
pub type ServerEvent = FrameServerEvent;
16
/// Alias for `crate::frame::events::SimpleServerEvent`, exposed under this
/// module's own name.
pub type SimpleServerEvent = FrameSimpleServerEvent;
20
/// Alias for `crate::frame::events::SchemaChange`, exposed under this
/// module's own name.
pub type SchemaChange = FrameSchemaChange;
23
24pub fn new_listener<X>(transport: X) -> (Listener<X>, EventStream) {
33 let (tx, rx) = channel();
34 let listener = Listener {
35 transport: transport,
36 tx: tx,
37 };
38 let stream = EventStream { rx: rx };
39 (listener, stream)
40}
41
/// Producer half of the pair returned by `new_listener`.
///
/// It bundles a transport with the sending end of the event channel. When the
/// transport is wrapped in a `RefCell`, `start` reads frames from it and
/// pushes the decoded events into the channel.
pub struct Listener<X> {
    /// Transport handed to `new_listener`; `start` passes it to `parse_frame`.
    transport: X,
    /// Sending end of the channel whose receiving end lives in `EventStream`.
    tx: Sender<ServerEvent>,
}
50
51impl<X: CDRSTransport + 'static> Listener<RefCell<X>> {
52 pub fn start(self, compressor: &Compression) -> error::Result<()> {
54 loop {
55 let event_opt = parse_frame(&self.transport, compressor)?
56 .get_body()?
57 .into_server_event();
58
59 let event = if event_opt.is_some() {
60 event_opt.unwrap().event as ServerEvent
62 } else {
63 continue;
64 };
65 match self.tx.send(event) {
66 Err(err) => return Err(error::Error::General(err.to_string())),
67 _ => continue,
68 }
69 }
70 }
71}
72
/// Consumer half of the pair returned by `new_listener`.
///
/// Its `Iterator` implementation uses the channel's blocking `recv`, so
/// `next` waits for an event and yields `None` only once `recv` reports an
/// error.
pub struct EventStream {
    /// Receiving end of the channel fed by `Listener`.
    rx: Receiver<ServerEvent>,
}
78
79impl Iterator for EventStream {
80 type Item = ServerEvent;
81
82 fn next(&mut self) -> Option<Self::Item> {
83 self.rx.recv().ok()
84 }
85}
86
87impl Into<EventStreamNonBlocking> for EventStream {
88 fn into(self) -> EventStreamNonBlocking {
89 EventStreamNonBlocking { rx: self.rx }
90 }
91}
92
/// Non-blocking counterpart of `EventStream`, obtained from it via `into()`.
///
/// Its `Iterator` implementation uses the channel's `try_recv`, so `next`
/// returns immediately and yields `None` whenever no event can be taken from
/// the channel at that moment.
pub struct EventStreamNonBlocking {
    /// Receiving end of the channel fed by `Listener`.
    rx: Receiver<ServerEvent>,
}
98
99impl Iterator for EventStreamNonBlocking {
100 type Item = ServerEvent;
101
102 fn next(&mut self) -> Option<Self::Item> {
103 self.rx.try_recv().ok()
104 }
105}