use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::UdpSocket;
use crate::dht::krpc::{KrpcMessage, TransactionId};
use crate::error::{Error, ErrorKind};
pub struct DhtRpc {
socket: UdpSocket,
}
const RPC_TIMEOUT: Duration = Duration::from_secs(15);
impl DhtRpc {
pub async fn new(bind_addr: SocketAddr) -> Result<Self, Error> {
let socket = UdpSocket::bind(bind_addr).await?;
Ok(DhtRpc { socket })
}
pub async fn query(
&self,
addr: SocketAddr,
expected_tid: TransactionId,
data: &[u8],
) -> Result<KrpcMessage, Error> {
tracing::debug!("DHT query to {}", addr);
if let Err(e) = self.socket.send_to(data, addr).await {
return Err(Error::with_source(ErrorKind::Protocol, e));
}
let mut buf = [0u8; 2048];
let (len, _src) = tokio::time::timeout(RPC_TIMEOUT, self.socket.recv_from(&mut buf))
.await
.map_err(|_| Error::new(ErrorKind::Protocol))?
.map_err(Error::protocol)?;
let response = KrpcMessage::from_bytes(&buf[..len])?;
match &response {
KrpcMessage::Response { transaction_id, .. }
| KrpcMessage::Error { transaction_id, .. } => {
if *transaction_id != expected_tid {
return Err(Error::new(ErrorKind::Protocol));
}
}
KrpcMessage::Query { .. } => {
return Err(Error::new(ErrorKind::Protocol));
}
}
Ok(response)
}
pub async fn ping(
&self,
addr: SocketAddr,
tid: TransactionId,
node_id: &[u8; 20],
) -> Result<KrpcMessage, Error> {
let data = super::krpc::build_ping(tid, node_id);
self.query(addr, tid, &data).await
}
}