1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::sync::mpsc::{Sender, Receiver, channel};
use std::iter::Iterator;

use std::error::Error;
use error;
use frame::events::{ServerEvent as FrameServerEvent, SimpleServerEvent as FrameSimpleServerEvent,
                    SchemaChange as FrameSchemaChange};
use frame::parser::parse_frame;
use compression::Compression;
use transport::CDRSTransport;

/// Full server event which includes all details about the change that occurred.
///
/// Alias of `frame::events::ServerEvent`.
pub type ServerEvent = FrameServerEvent;

/// Simplified server event. It should be used to represent an event
/// which a consumer wants to listen to.
///
/// Alias of `frame::events::SimpleServerEvent`.
pub type SimpleServerEvent = FrameSimpleServerEvent;

/// Re-export of `frame::events::SchemaChange`.
pub type SchemaChange = FrameSchemaChange;

/// Builds a `Listener` together with the `EventStream` it feeds.
///
/// Both halves share a single freshly created channel: the `Listener` owns
/// the sending side (plus the given `transport`), the `EventStream` owns the
/// receiving side.
///
/// `Listener` exposes a single method, `start`, which blocks the thread it
/// runs on; move the listener into a dedicated thread so the main thread is
/// not blocked.
///
/// `EventStream` is an iterator which yields new events as they arrive,
/// similar to `Receiver::iter`.
pub fn new_listener<X>(transport: X) -> (Listener<X>, EventStream) {
    let (sender, receiver) = channel();
    (
        Listener {
            transport: transport,
            tx: sender,
        },
        EventStream { rx: receiver },
    )
}

/// Blocking listener for server events.
///
/// `Listener` provides only one function, `start`, which begins listening. It
/// blocks the calling thread, so it should be moved into a separate thread to
/// avoid blocking the main one.

pub struct Listener<X> {
    /// Transport that incoming frames are parsed from.
    transport: X,
    /// Sending half of the channel; events are pushed through it to the
    /// `EventStream` created alongside this listener.
    tx: Sender<ServerEvent>,
}

impl<X: CDRSTransport> Listener<X> {
    /// It starts a process of listening to new events. Locks a frame.
    pub fn start(&mut self, compressor: &Compression) -> error::Result<()> {
        loop {
            let event_opt = try!(parse_frame(&mut self.transport, compressor))
                .get_body()?
                .into_server_event();

            let event = if event_opt.is_some() {
                // unwrap is safe is we've checked that event_opt.is_some()
                event_opt.unwrap().event as ServerEvent
            } else {
                continue;
            };
            match self.tx.send(event) {
                Err(err) => return Err(error::Error::General(err.description().to_string())),
                _ => continue,
            }
        }
    }
}

/// `EventStream` is an iterator which yields new server events as they
/// arrive. It is similar to `Receiver::iter`.
pub struct EventStream {
    /// Receiving half of the channel that events are read from.
    rx: Receiver<ServerEvent>,
}

impl Iterator for EventStream {
    type Item = ServerEvent;

    fn next(&mut self) -> Option<Self::Item> {
        self.rx.recv().ok()
    }
}