1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use std::cell::RefCell;
use std::iter::Iterator;
use std::sync::mpsc::{channel, Receiver, Sender};
use compression::Compression;
use error;
use frame::events::{
SchemaChange as FrameSchemaChange, ServerEvent as FrameServerEvent,
SimpleServerEvent as FrameSimpleServerEvent,
};
use frame::parser::parse_frame;
use std::error::Error;
use transport::CDRSTransport;
pub type ServerEvent = FrameServerEvent;
pub type SimpleServerEvent = FrameSimpleServerEvent;
pub type SchemaChange = FrameSchemaChange;
pub fn new_listener<X>(transport: X) -> (Listener<X>, EventStream) {
let (tx, rx) = channel();
let listener = Listener {
transport: transport,
tx: tx,
};
let stream = EventStream { rx: rx };
(listener, stream)
}
pub struct Listener<X> {
transport: X,
tx: Sender<ServerEvent>,
}
impl<X: CDRSTransport + 'static> Listener<RefCell<X>> {
pub fn start(self, compressor: &Compression) -> error::Result<()> {
loop {
let event_opt = try!(parse_frame(&self.transport, compressor))
.get_body()?
.into_server_event();
let event = if event_opt.is_some() {
event_opt.unwrap().event as ServerEvent
} else {
continue;
};
match self.tx.send(event) {
Err(err) => return Err(error::Error::General(err.description().to_string())),
_ => continue,
}
}
}
}
pub struct EventStream {
rx: Receiver<ServerEvent>,
}
impl Iterator for EventStream {
type Item = ServerEvent;
fn next(&mut self) -> Option<Self::Item> {
self.rx.recv().ok()
}
}