1#[macro_use]
4extern crate log;
5extern crate futures;
6extern crate hyper;
7extern crate rand;
8extern crate tokio_core;
9extern crate tokio_reactor;
10extern crate trust_dns;
11
12use futures::future;
13use futures::future::Future;
14use hyper::client::{Connect, Service};
15use hyper::Uri;
16use rand::Rng;
17use std::io;
18use std::time::Duration;
19use trust_dns::client::ClientHandle;
20
/// Selects which kind of DNS record a `DnsConnector` queries when resolving a host.
///
/// The type is a plain tag with no payload, so it is cheap to copy and can be
/// compared and hashed directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecordType {
    /// Always query `A` records for the host.
    A,
    /// Always query `SRV` records for the host.
    SRV,
    /// Query `SRV` records when the URI carries no explicit port, `A` records otherwise.
    AUTO,
}
31
32#[derive(Debug, Clone)]
34pub struct DnsConnector<C> {
35 connector: C,
36 record_type: RecordType,
37 dns_addr: std::net::SocketAddr,
38}
39
40impl<C> DnsConnector<C>
41where
42 C: Connect,
43{
44 pub fn new(dns_addr: std::net::SocketAddr, connector: C) -> DnsConnector<C> {
45 Self::new_with_resolve_type(dns_addr, connector, RecordType::AUTO)
46 }
47
48 pub fn new_with_resolve_type(
50 dns_addr: std::net::SocketAddr,
51 connector: C,
52 record_type: RecordType,
53 ) -> DnsConnector<C> {
54 DnsConnector {
55 connector: connector,
56 record_type: record_type,
57 dns_addr: dns_addr,
58 }
59 }
60}
61
62impl<C> Service for DnsConnector<C>
63where
64 C: Service<Request = Uri, Error = io::Error> + 'static,
65 C: Clone,
66{
67 type Request = C::Request;
68 type Response = C::Response;
69 type Error = C::Error;
70 type Future =
71 Box<Future<Item = <C::Future as Future>::Item, Error = <C::Future as Future>::Error>>;
72
73 fn call(&self, uri: Uri) -> Self::Future {
74 let connector = self.connector.clone();
75
76 let timeout = Duration::from_millis(30000);
79
80 let (stream, sender) =
81 trust_dns::tcp::TcpClientStream::with_timeout(self.dns_addr, timeout);
82
83 let dns_client = trust_dns::client::ClientFuture::new(stream, sender, None);
84
85 match uri.host().unwrap().to_string().parse() {
87 Ok(std::net::Ipv4Addr { .. }) => {
88 Box::new(connector.call(uri.clone()))
90 }
91 Err(_) => {
92 let port = uri.port().clone();
93 let scheme = uri.scheme().unwrap().to_string();
94 let host = uri.host().unwrap().to_string();
95
96 debug!("Trying to resolve {}://{}", scheme, &host);
97
98 let name = trust_dns::rr::Name::parse(&format!("{}.", host), None).unwrap();
100
101 let trust_record_type = match self.record_type {
102 RecordType::A => trust_dns::rr::RecordType::A,
103 RecordType::SRV => trust_dns::rr::RecordType::SRV,
104 RecordType::AUTO => {
105 if port.is_none() {
108 trust_dns::rr::RecordType::SRV
109 } else {
110 debug!("Using A record lookup for: {}", &host);
111 trust_dns::rr::RecordType::A
112 }
113 }
114 };
115
116 debug!("Sending DNS request");
117
118 let name_clone = name.clone();
119
120 let future = dns_client
121 .and_then(move |mut client| {
122 client.query(
123 name_clone.clone(),
124 trust_dns::rr::DNSClass::IN,
125 trust_record_type,
126 )
127 })
128 .or_else(|_| {
129 return future::err(
130 std::io::Error::new(
131 std::io::ErrorKind::Other,
132 "Failed to query DNS server",
133 ).into(),
134 );
135 })
136 .and_then(move |res| {
137 let answers = res.answers();
138
139 if answers.is_empty() {
140 return future::err(
141 std::io::Error::new(
142 std::io::ErrorKind::Other,
143 "No valid DNS answers",
144 ).into(),
145 );
146 }
147
148 let mut rng = rand::thread_rng();
149
150 let (target, a_records, new_port) = if let trust_dns::rr::RecordType::SRV =
152 trust_record_type
153 {
154 let answer = rng.choose(answers).expect("Sort out what to return here");
155
156 let srv = match *answer.rdata() {
157 trust_dns::rr::RData::SRV(ref srv) => srv,
158 _ => {
159 return future::err(
160 std::io::Error::new(
161 std::io::ErrorKind::Other,
162 "Unexpected DNS response",
163 ).into(),
164 )
165 }
166 };
167
168 (srv.target().clone(), res.additionals(), Some(srv.port()))
169 } else {
170 (name.clone(), answers, port)
173 };
174
175 let entry = a_records.iter().find(|record| record.name() == &target);
176
177 if let Some(entry) = entry {
178 let addr = match *entry.rdata() {
179 trust_dns::rr::RData::A(ref addr) => addr,
180 _ => {
181 return future::err(
182 std::io::Error::new(
183 std::io::ErrorKind::Other,
184 "Did not receive a valid record",
185 ).into(),
186 )
187 }
188 };
189
190 future::ok((addr.to_string(), new_port))
191 } else {
192 return future::err(
193 std::io::Error::new(
194 std::io::ErrorKind::Other,
195 "Did not receive a valid record",
196 ).into(),
197 );
198 }
199 })
200 .and_then(move |(ip, port)| {
201 let new_uri_str = if let Some(port) = port {
202 format!("{}://{}:{}", scheme, &ip, port)
203 } else {
204 format!("{}://{}", scheme, &ip)
205 };
206
207 debug!("Resolved request to {}", &new_uri_str);
208
209 let new_uri = new_uri_str.parse::<Uri>().unwrap();
210
211 connector.call(new_uri)
212 });
213
214 Box::new(future)
215 }
216 }
217 }
218}
219
#[cfg(test)]
mod tests {
    /// Placeholder test with an empty body; it makes no assertions.
    #[test]
    fn it_works() {
        // Intentionally empty.
    }
}