1use std::io::ErrorKind;
4use std::ops::ControlFlow;
5use std::os::fd::AsFd;
6use std::os::unix::prelude::BorrowedFd;
7use std::time::Duration;
8
9use rustix::io::Errno;
10use tokio::io::Interest;
11use tokio::io::unix::AsyncFd;
12
13use crate::protocol::{
14 AiResponseHeader, IoState, IpAddrIterator, IsEmpty, IsWouldblock, interpret_data, open_socket,
15 read_data, read_header, write_request,
16};
17pub use crate::sync::DEFAULT_TIMEOUT;
18use crate::sync::Error as SyncError;
19
20#[inline]
28pub async fn lookup(
29 host: impl AsRef<[u8]>,
30 buf: &mut Vec<u8>,
31 timeout: Option<Duration>,
32) -> Result<Option<IpAddrIterator<'_>>, Error> {
33 let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT);
34 tokio::time::timeout(timeout, do_lookup(host.as_ref(), buf))
35 .await
36 .map_err(|_| Error::Sync(SyncError::Timeout(None)))?
37}
38
39async fn do_lookup<'a>(
40 host: &[u8],
41 buf: &'a mut Vec<u8>,
42) -> Result<Option<IpAddrIterator<'a>>, Error> {
43 if let Some(resp) = fill_buf(host, buf).await? {
44 let iter = interpret_data(&resp, buf).map_err(|err| Error::Sync(SyncError::Data(err)))?;
45 Ok(Some(iter))
46 } else {
47 Ok(None)
48 }
49}
50
51pub(crate) async fn fill_buf(
52 host: &[u8],
53 buf: &mut Vec<u8>,
54) -> Result<Option<AiResponseHeader>, Error> {
55 let sock = open_socket().map_err(|err| Error::Sync(SyncError::Socket(err)))?;
56 let sock = AsyncFd::new(sock.as_fd()).map_err(Error::New)?;
57
58 let mut io = IoState::default();
59 while try_io(&sock, Interest::WRITABLE, |sock: BorrowedFd<'_>| {
60 write_request(sock, &mut io, host)
61 })
62 .await?
63 .is_continue()
64 {
65 continue;
66 }
67
68 io = IoState::default();
69 let mut resp = AiResponseHeader::default();
70 let data_len = loop {
71 match try_io(&sock, Interest::READABLE, |sock: BorrowedFd<'_>| {
72 read_header(sock, &mut io, &mut resp)
73 })
74 .await?
75 {
76 ControlFlow::Continue(()) => continue,
77 ControlFlow::Break(IsEmpty::Empty) => return Ok(None),
78 ControlFlow::Break(IsEmpty::HasData(data_len)) => break data_len,
79 }
80 };
81 buf.resize(data_len, 0);
82
83 io = IoState::default();
84 while try_io(&sock, Interest::READABLE, |sock: BorrowedFd<'_>| {
85 read_data(sock, &mut io, buf)
86 })
87 .await?
88 .is_continue()
89 {
90 continue;
91 }
92
93 Ok(Some(resp))
94}
95
96async fn try_io<T, E, F>(
97 sock: &AsyncFd<BorrowedFd<'_>>,
98 interest: Interest,
99 f: F,
100) -> Result<ControlFlow<T, ()>, Error>
101where
102 F: FnOnce(BorrowedFd<'_>) -> Result<ControlFlow<T, ()>, E>,
103 E: IsWouldblock + Into<SyncError>,
104{
105 let mut guard = sock.ready(interest).await.map_err(Error::Writable)?;
106 let result = guard.try_io(|sock| {
107 let result = f(sock.as_fd());
108 match result {
109 Ok(cf) => Ok(Ok(cf)),
110 Err(err) => {
111 if err.is_wouldblock() {
112 Err(std::io::Error::new(
113 ErrorKind::WouldBlock,
114 Errno::WOULDBLOCK,
115 ))
116 } else {
117 Ok(Err(err))
118 }
119 }
120 }
121 });
122 let Ok(result) = result else {
123 return Ok(ControlFlow::Continue(()));
124 };
125 result
126 .map_err(Error::Writable)?
127 .map_err(|err| Error::Sync(err.into()))
128}
129
130#[derive(Debug, thiserror::Error)]
132pub enum Error {
133 #[error(transparent)]
135 Sync(#[from] SyncError),
136 #[error("Cannot use socket with tokio")]
138 New(#[source] std::io::Error),
139 #[error("Could not wait for socket to become writable")]
141 Writable(#[source] std::io::Error),
142 #[error("Could not wait for socket to become readable")]
144 Readable(#[source] std::io::Error),
145}