1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::thread;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use mongo_driver::client::{Client, ClientPool};
use mongo_driver::collection::TailOptions;
use mongo_driver::CommandAndFindOptions;
use bson::Bson;
use op;
use errors;
fn tail_the_oplog(client: Client, tx: Sender<op::Op>) -> Result<(), errors::OpLogError> {
try!(client.get_server_status(None));
let db_name = "local";
let coll_name = "oplog.rs";
let gt = Bson::TimeStamp(0);
info!("starting tail on {}.{} at {}", db_name, coll_name, gt);
let coll = client.get_collection(db_name, coll_name);
let query = doc! {
"ts" => {
"$gt" => gt
}
};
let opts = CommandAndFindOptions::default();
let tail_opts = TailOptions::default();
let cur = coll.tail(query, Some(opts), Some(tail_opts));
for res in cur {
let res = try!(res);
let op = try!(op::Op::from_doc(&res));
if let Err(_) = tx.send(op) {
info!("disconnected from tail since receiver has dropped");
break;
}
}
Ok(())
}
pub fn create_oplog_receiver(pool: Arc<ClientPool>) -> (Receiver<op::Op>, thread::JoinHandle<()>) {
let (tx, rx) = channel::<op::Op>();
let handle: thread::JoinHandle<()> = thread::Builder::new()
.name("oplog-read-thread".to_string())
.spawn(move || {
let client = pool.pop();
let result = tail_the_oplog(client, tx);
if result.is_err() {
panic!("tailing ended early: {:?}", result.err().unwrap());
}
()
})
.unwrap();
(rx, handle)
}