use std::time::Duration;
use tracing::{span, Level};
use kad::prelude::DhtEntry;
use dsf_core::net;
use dsf_core::prelude::*;
use dsf_rpc::{ConnectInfo, ConnectOptions};
use crate::daemon::dht::Ctx;
use crate::daemon::Dsf;
use crate::io;
use crate::core::peers::PeerAddress;
use crate::error::Error as DsfError;
use crate::daemon::dht::TryAdapt;
impl<C> Dsf<C>
where
C: io::Connector + Clone + Sync + Send + 'static,
{
pub async fn connect(&mut self, options: ConnectOptions) -> Result<ConnectInfo, DsfError> {
let span = span!(Level::DEBUG, "connect");
let _enter = span.enter();
info!("Connect: {:?}", options.address);
let req_id = rand::random::<u16>();
let flag = Flags::ADDRESS_REQUEST | Flags::PUB_KEY_REQUEST;
let mut req = net::Request::new(
self.id(),
req_id,
net::RequestKind::FindNode(self.id()),
flag,
);
let service = self.service();
req.common.public_key = Some(service.public_key());
let our_id = self.id();
let _peers = self.peers();
let _dht = self.dht();
let address = options.address.clone();
trace!("Sending request");
let d = options.timeout.or(Some(Duration::from_secs(2))).unwrap();
let res = self.request(address.into(), req, d).await;
let resp = match res {
Ok(r) => r,
Err(e) => {
warn!("Error connecting to {:?}: {:?}", address, e);
return Err(e);
}
};
trace!("response from peer: {:?}", &resp.from);
let peer = self.peers().find_or_create(
resp.from.clone(),
PeerAddress::Implicit(address.into()),
None,
);
let mut info = ConnectInfo {
id: resp.from.clone(),
peers: 0,
};
trace!("Starting DHT connect");
let mut data = resp.data.try_to((our_id.clone(), self.peers())).unwrap();
if let kad::common::Response::NodesFound(_id, nodes) = &mut data {
nodes.retain(|n| n.id() != &our_id);
}
let ctx = Ctx::INCLUDE_PUBLIC_KEY | Ctx::PUB_KEY_REQUEST | Ctx::ADDRESS_REQUEST;
match self
.dht()
.handle_connect_response(DhtEntry::new(resp.from.clone(), peer), data, ctx)
.await
{
Ok(nodes) => {
info.peers = nodes.len();
}
Err(e) => {
error!("connect response error: {:?}", e);
return Err(DsfError::Unknown);
}
}
info!("Connect complete! {:?}", info);
Ok(info)
}
}