1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use context::*;
use dns_message::*;
use dnstap_builder::*;
use mio::*;
use std::any::Any;
use std::io;
use std::thread;
pub struct DNSTapPendingWriter {
dnstap_tx: channel::SyncSender<DNSMessage>,
context: Context,
}
impl DNSTapPendingWriter {
pub fn listen(builder: DNSTapBuilder) -> Result<DNSTapPendingWriter, &'static str> {
let (dnstap_tx, dnstap_rx) = channel::sync_channel(builder.backlog);
let mio_poll = Poll::new().unwrap();
mio_poll.register(&dnstap_rx,
NOTIFY_TOK,
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot())
.unwrap();
let mio_timers = timer::Timer::default();
mio_poll.register(&mio_timers, TIMER_TOK, Ready::readable(), PollOpt::edge()).unwrap();
assert!(builder.unix_socket_path.is_some());
let context = Context {
mio_poll: mio_poll,
mio_timers: mio_timers,
retry_timeout: None,
dnstap_rx: dnstap_rx,
unix_socket_path: builder.unix_socket_path,
unix_stream: None,
frame_stream: None,
};
Ok(DNSTapPendingWriter {
dnstap_tx: dnstap_tx,
context: context,
})
}
pub fn start(self) -> io::Result<DNSTapWriter> {
DNSTapWriter::start(self)
}
#[inline]
pub fn sender(&self) -> Sender {
Sender(self.dnstap_tx.clone())
}
}
pub struct DNSTapWriter {
dnstap_tx: channel::SyncSender<DNSMessage>,
tid: thread::JoinHandle<()>,
}
impl DNSTapWriter {
pub fn start(mut dnstap_pending_writer: DNSTapPendingWriter) -> io::Result<DNSTapWriter> {
dnstap_pending_writer.context.connect();
let mut events = Events::with_capacity(512);
let dnstap_tx = dnstap_pending_writer.dnstap_tx.clone();
let tid = try!(thread::Builder::new().name("dnstap".to_owned()).spawn(move || {
while dnstap_pending_writer.context
.mio_poll
.poll(&mut events, None)
.is_ok() {
for event in events.iter() {
match event.token() {
UNIX_SOCKET_TOK => dnstap_pending_writer.context.write_cb(event),
NOTIFY_TOK => dnstap_pending_writer.context.message_cb(),
TIMER_TOK => dnstap_pending_writer.context.connect(),
_ => unreachable!(),
}
}
}
if let Some(frame_stream) = dnstap_pending_writer.context.frame_stream {
frame_stream.finish().unwrap();
}
}));
Ok(DNSTapWriter {
dnstap_tx: dnstap_tx,
tid: tid,
})
}
pub fn join(self) -> Result<(), Box<Any + Send + 'static>> {
self.tid.join()
}
#[inline]
pub fn sender(&self) -> Sender {
Sender(self.dnstap_tx.clone())
}
}
#[derive(Clone)]
pub struct Sender(channel::SyncSender<DNSMessage>);
impl Sender {
#[inline]
pub fn send(&self, dns_message: DNSMessage) -> Result<(), channel::TrySendError<DNSMessage>> {
self.0.try_send(dns_message)
}
}