1use std::{
2 convert::TryFrom,
3 fmt,
4 net::{IpAddr, SocketAddr},
5 rc::Rc,
6 time::{Duration, Instant},
7};
8
9use failure::format_err;
10use futures::{
11 future::{self, Either},
12 Future,
13};
14use tokio::{prelude::*, timer::Delay};
15use trust_dns::{
16 client::ClientHandle,
17 op::{DnsResponse, Message, Query},
18 proto::xfer::{DnsHandle, DnsRequestOptions},
19 rr::{self, Record},
20};
21
22use crate::{
23 record::{RecordSet, RsData},
24 tsig, update_message,
25 util::{self, SocketName},
26 DnsOpen, RuntimeHandle,
27};
28
29#[derive(Debug, Clone)]
30pub struct Update {
31 pub zone: rr::Name,
32 pub server: Option<SocketName>,
33 pub operation: Operation,
34 pub tsig_key: Option<tsig::Key>,
35 pub ttl: u32,
36}
37
38impl Update {
39 pub fn get_update(&self) -> Result<Message, tsig::Error> {
40 let ttl = self.ttl;
41 let mut message = match &self.operation {
42 Operation::Create(rset) => {
43 update_message::create(rset.to_rrset(ttl), self.zone.clone())
44 }
45 Operation::Append(rset) => {
46 update_message::append(rset.to_rrset(ttl), self.zone.clone(), false)
47 }
48 Operation::Delete(rset) => {
49 if rset.is_empty() {
50 let record = rr::Record::with(rset.name().clone(), rset.record_type(), ttl);
51 update_message::delete_rrset(record, self.zone.clone())
52 } else {
53 update_message::delete_by_rdata(rset.to_rrset(ttl), self.zone.clone())
54 }
55 }
56 Operation::DeleteAll(name) => {
57 update_message::delete_all(name.clone(), self.zone.clone(), rr::DNSClass::IN)
58 }
59 };
60 if let Some(key) = &self.tsig_key {
61 tsig::add_signature(&mut message, key)?;
62 }
63 Ok(message)
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct Monitor {
69 pub zone: rr::Name,
70 pub entry: rr::Name,
71 pub interval: Duration,
72 pub timeout: Duration,
73 pub verbose: bool,
74 pub exclude: Vec<IpAddr>,
75 pub expectation: Expectation,
76}
77
78impl Monitor {
79 fn get_query(&self) -> Query {
80 Query::query(self.entry.clone(), self.expectation.record_type())
81 }
82}
83
84#[derive(Debug, Clone, Eq, PartialEq)]
85pub enum Operation {
86 Create(RecordSet),
87 Append(RecordSet),
88 Delete(RecordSet),
89 DeleteAll(rr::Name),
90}
91
92impl Operation {
93 pub fn create(name: rr::Name, data: RsData) -> Self {
94 Operation::Create(RecordSet::new(name, data))
95 }
96
97 pub fn delete(name: rr::Name, data: RsData) -> Self {
98 Operation::Delete(RecordSet::new(name, data))
99 }
100}
101
102#[derive(Debug, Clone)]
103pub enum Expectation {
104 Is(RecordSet),
105 Contains(RecordSet),
106 Empty(rr::RecordType),
107 NotAny(RecordSet),
108}
109
110impl Expectation {
111 pub fn record_type(&self) -> rr::RecordType {
112 match self {
113 Expectation::Is(rset) => rset.record_type(),
114 Expectation::Contains(rset) => rset.record_type(),
115 Expectation::NotAny(rset) => rset.record_type(),
116 Expectation::Empty(rtype) => *rtype,
117 }
118 }
119
120 pub fn satisfied_by(&self, rrs: &[rr::Record]) -> bool {
121 match self {
122 Expectation::Is(other) => {
123 let rset = match RecordSet::try_from(rrs) {
124 Err(_) => return false,
125 Ok(rs) => rs,
126 };
127 rset == *other
128 }
129 Expectation::Contains(other) => {
130 let rset = match RecordSet::try_from(rrs) {
131 Err(_) => return false,
132 Ok(rs) => rs,
133 };
134 other.is_subset(&rset)
135 }
136 Expectation::Empty(_) => rrs.is_empty(),
137 Expectation::NotAny(other) => {
138 let rset = match RecordSet::try_from(rrs) {
139 Err(_) => return false,
140 Ok(rs) => rs,
141 };
142 !other.iter_data().any(|r| rset.contains(&r))
143 }
144 }
145 }
146}
147
148impl fmt::Display for Expectation {
149 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150 match self {
151 Expectation::Is(rset) => write!(f, "expected {}", rset.data()),
152 Expectation::Contains(rset) => write!(f, "expected at least {} records", rset.data()),
153 Expectation::Empty(rtype) => write!(f, "expected no {} records", rtype),
154 Expectation::NotAny(rset) => write!(f, "expected none of {}", rset),
155 }
156 }
157}
158
159pub fn perform_update<D>(
160 runtime: RuntimeHandle,
161 mut dns: D,
162 mut resolver: D::Client,
163 options: Update,
164) -> Result<impl Future<Item = (), Error = failure::Error>, failure::Error>
165where
166 D: DnsOpen,
167{
168 let message = options.get_update()?;
169 let get_master = if let Some(sockname) = options.server {
170 Box::new(sockname.resolve(resolver, 53))
171 as Box<dyn Future<Item = SocketAddr, Error = failure::Error>>
172 } else {
173 let get_soa = resolver
174 .query(options.zone.clone(), rr::DNSClass::IN, rr::RecordType::SOA)
175 .map_err(failure::Error::from);
176 let settings = options.clone();
177 let resolver = resolver.clone();
178 let resolve = get_soa.and_then(move |response| {
179 if let Some(soa) = response
180 .answers()
181 .first()
182 .and_then(|rr| rr.rdata().as_soa())
183 {
184 Either::A(
185 util::SocketName::HostName(soa.mname().clone(), None).resolve(resolver, 53),
186 )
187 } else {
188 Either::B(future::err(format_err!(
189 "SOA record for {} not found",
190 settings.zone
191 )))
192 }
193 });
194 Box::new(resolve)
195 };
196 let update = get_master
197 .and_then(move |master| {
198 let mut server = dns.open(runtime.clone(), master);
199 server.send(message).map_err(Into::into)
200 })
201 .map(|_| ()); Ok(update)
203}
204
205pub fn monitor_update<D>(
206 runtime: RuntimeHandle,
207 dns: D,
208 resolver: D::Client,
209 options: Monitor,
210) -> impl Future<Item = (), Error = failure::Error>
211where
212 D: DnsOpen,
213{
214 let options = Rc::new(options);
215 let get_authorative =
216 util::get_ns_records(resolver.clone(), options.zone.clone()).map_err(failure::Error::from);
217 let poll_servers = {
218 let options = Rc::clone(&options);
219 get_authorative.and_then(move |authorative| {
220 let names = authorative
221 .into_iter()
222 .filter_map(|r| r.rdata().as_ns().cloned());
223 poll_for_update(runtime, dns, resolver, names, options)
224 })
225 };
226 poll_servers
227 .timeout(options.timeout)
228 .map_err(|e| {
229 e.into_inner().unwrap_or_else(move || {
230 format_err!(
231 "timeout; update not complete within {}ms",
232 options.timeout.as_millis()
233 )
234 })
235 })
236 .map(|_| ())
237}
238
239fn poll_for_update<D, I>(
240 runtime: RuntimeHandle,
241 dns: D,
242 resolver: D::Client,
243 authorative: I,
244 options: Rc<Monitor>,
245) -> impl Future<Item = (), Error = failure::Error>
246where
247 I: IntoIterator<Item = rr::Name>,
248 D: DnsOpen,
249{
250 future::join_all(authorative.into_iter().map(move |server_name| {
251 let handle = runtime.clone();
252 let server_name = server_name.clone();
253 let inner_options = Rc::clone(&options);
254 let mut dns = dns.clone();
255 let resolve = util::resolve_ip(resolver.clone(), server_name.clone()).map(move |ip| {
256 if inner_options.exclude.contains(&ip) {
257 None
258 } else {
259 Some(dns.open(handle.clone(), SocketAddr::new(ip, 53)))
260 }
261 });
262 let server_name = server_name.clone();
263 let options = Rc::clone(&options);
264 resolve.and_then(move |maybe_server| match maybe_server {
265 None => Either::A(future::ok(())),
266 Some(server) => Either::B(poll_server(server.clone(), server_name, options)),
267 })
268 }))
269 .map(|_| ())
270}
271
272fn poll_entries<F>(
273 mut server: impl ClientHandle,
274 server_name: rr::Name,
275 settings: Rc<Monitor>,
276 report: F,
277) -> impl Future<Item = (), Error = failure::Error>
278where
279 F: Fn(&rr::Name, &[Record], bool) + 'static,
280{
281 use future::Loop;
282 future::loop_fn(report, move |report| {
283 let server_name = server_name.clone();
284 let settings = Rc::clone(&settings);
285 let query = settings.get_query();
286 server
287 .lookup(query, DnsRequestOptions::default())
288 .map_err(failure::Error::from)
289 .and_then(move |response: DnsResponse| {
290 let answers = response.answers();
291 let hit = settings.expectation.satisfied_by(answers);
292 report(&server_name, answers, hit);
293 if hit {
294 Either::A(future::ok(Loop::Break(())))
295 } else {
296 let when = Instant::now() + settings.interval;
297 Either::B(
298 Delay::new(when)
299 .map_err(failure::Error::from)
300 .map(|_| Loop::Continue(report)),
301 )
302 }
303 })
304 })
305}
306
307fn poll_server(
308 server: impl ClientHandle,
309 server_name: rr::Name,
310 settings: Rc<Monitor>,
311) -> impl Future<Item = (), Error = failure::Error> {
312 poll_entries(
313 server,
314 server_name,
315 Rc::clone(&settings),
316 move |server, records, hit| {
317 if settings.verbose {
318 if hit {
319 println!("{}: match found", server);
320 } else {
321 let rset = match RecordSet::try_from(records) {
322 Ok(rs) => format!("{}", rs.data()),
323 Err(e) => format!("{}", e),
324 };
325 println!(
326 "{}: records not matching: {}, found {}",
327 server, settings.expectation, rset,
328 );
329 }
330 }
331 },
332 )
333}