1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::sync::mpsc::{Sender, Receiver, channel};
use std::iter::Iterator;
use std::error::Error;
use error;
use frame::events::{ServerEvent as FrameServerEvent, SimpleServerEvent as FrameSimpleServerEvent,
SchemaChange as FrameSchemaChange};
use frame::parser::parse_frame;
use compression::Compression;
use transport::CDRSTransport;
pub type ServerEvent = FrameServerEvent;
pub type SimpleServerEvent = FrameSimpleServerEvent;
pub type SchemaChange = FrameSchemaChange;
pub fn new_listener<X>(transport: X) -> (Listener<X>, EventStream) {
let (tx, rx) = channel();
let listener = Listener {
transport: transport,
tx: tx,
};
let stream = EventStream { rx: rx };
(listener, stream)
}
pub struct Listener<X> {
transport: X,
tx: Sender<ServerEvent>,
}
impl<X: CDRSTransport> Listener<X> {
pub fn start(&mut self, compressor: &Compression) -> error::Result<()> {
loop {
let event_opt = try!(parse_frame(&mut self.transport, compressor))
.get_body()?
.into_server_event();
let event = if event_opt.is_some() {
event_opt.unwrap().event as ServerEvent
} else {
continue;
};
match self.tx.send(event) {
Err(err) => return Err(error::Error::General(err.description().to_string())),
_ => continue,
}
}
}
}
pub struct EventStream {
rx: Receiver<ServerEvent>,
}
impl Iterator for EventStream {
type Item = ServerEvent;
fn next(&mut self) -> Option<Self::Item> {
self.rx.recv().ok()
}
}