use crate::{
crypto::{chachapoly::ChaChaPoly, EphemeralPrivateKey, StaticPublicKey},
error::QueryError,
i2np::{
database::lookup::{DatabaseLookupBuilder, LookupType, ReplyType},
garlic::{DeliveryInstructions, GarlicMessageBuilder, GARLIC_MESSAGE_OVERHEAD},
MessageBuilder, MessageType, I2NP_MESSAGE_EXPIRATION,
},
netdb::Dht,
primitives::{Lease, LeaseSet2, MessageId, RouterId, TunnelId},
profile::ProfileStorage,
router::context::RouterContext,
runtime::{Instant, Runtime},
};
use bytes::{BufMut, Bytes, BytesMut};
use futures_channel::oneshot;
use hashbrown::HashSet;
use rand::Rng;
use alloc::{vec, vec::Vec};
use core::{fmt, time::Duration};
const QUERY_TOTAL_TIMEOUT: Duration = Duration::from_secs(20);
pub struct TunnelSelector<T: Clone> {
iterator: usize,
tunnels: Vec<T>,
}
impl<T: Clone> TunnelSelector<T> {
pub fn new() -> Self {
Self {
iterator: 0usize,
tunnels: Vec::new(),
}
}
pub fn add_tunnel(&mut self, tunnel: T) {
self.tunnels.push(tunnel);
}
pub fn len(&self) -> usize {
self.tunnels.len()
}
pub fn remove_tunnel(&mut self, predicate: impl Fn(&T) -> bool) {
self.tunnels.retain(|tunnel| predicate(tunnel))
}
pub fn next_tunnel(&mut self) -> Option<T> {
if self.tunnels.is_empty() {
return None;
}
let index = {
let index = self.iterator;
self.iterator = self.iterator.wrapping_add(1usize);
index
};
Some(self.tunnels[index % self.tunnels.len()].clone())
}
}
pub struct Query<R: Runtime, T> {
pub key: Bytes,
pub pending: HashSet<RouterId>,
pub queried: HashSet<RouterId>,
pub queryable: HashSet<RouterId>,
pub selected: Option<RouterId>,
pub started: R::Instant,
pub subscribers: Vec<oneshot::Sender<Result<T, QueryError>>>,
}
impl<R: Runtime, T: Clone> Query<R, T> {
pub fn new(key: Bytes, tx: oneshot::Sender<Result<T, QueryError>>, selected: RouterId) -> Self {
Self {
key,
pending: HashSet::new(),
queried: HashSet::from_iter([selected.clone()]),
queryable: HashSet::new(),
selected: Some(selected),
started: R::now(),
subscribers: vec![tx],
}
}
pub fn handle_search_reply(
&mut self,
routers: &[RouterId],
profile_storage: &ProfileStorage<R>,
) -> Vec<RouterId> {
self.selected = None;
routers
.iter()
.filter_map(|router_id| {
if self.queried.contains(router_id) {
return None;
}
if profile_storage.contains(router_id) {
self.queryable.insert(router_id.clone());
return None;
}
self.pending.insert(router_id.clone());
Some(router_id.clone())
})
.collect()
}
pub fn handle_timeout(
&mut self,
dht: &Dht<R>,
profile_storage: &ProfileStorage<R>,
) -> Result<RouterId, QueryError> {
if self.started.elapsed() >= QUERY_TOTAL_TIMEOUT {
return Err(QueryError::Timeout);
}
self.pending.retain(|router_id| {
if profile_storage.contains(router_id) {
self.queryable.insert(router_id.clone());
return false;
}
true
});
match Dht::<R>::get_closest(&self.key, &self.queryable) {
Some(floodfill) => {
self.queryable.remove(&floodfill);
Ok(floodfill)
}
None => dht
.closest_with_ignore(&self.key, 1usize, &self.queried)
.next()
.ok_or(QueryError::NoFloodfills),
}
}
pub fn add_subscriber(&mut self, tx: oneshot::Sender<Result<T, QueryError>>) {
self.subscribers.push(tx);
}
pub fn complete(self, value: Result<T, QueryError>) {
self.subscribers.into_iter().for_each(|tx| {
let _ = tx.send(value.clone());
});
}
}
pub enum QueryKind<R: Runtime> {
LeaseSet {
query: Query<R, LeaseSet2>,
},
RouterInfo {
query: Query<R, ()>,
},
Exploration,
Router,
}
impl<R: Runtime> fmt::Debug for QueryKind<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::LeaseSet { .. } => f.debug_struct("QueryKind::LeaseSet").finish_non_exhaustive(),
Self::RouterInfo { .. } =>
f.debug_struct("QueryKind::RouterInfo").finish_non_exhaustive(),
Self::Exploration => f.debug_struct("QueryKind::Exploration").finish(),
Self::Router => f.debug_struct("QueryKind::Router").finish(),
}
}
}
pub struct NetDbMessageBuilder<R: Runtime> {
pub outbound_tunnels: TunnelSelector<TunnelId>,
pub inbound_tunnels: TunnelSelector<Lease>,
router_ctx: RouterContext<R>,
}
impl<R: Runtime> NetDbMessageBuilder<R> {
pub fn new(router_ctx: RouterContext<R>) -> Self {
Self {
outbound_tunnels: TunnelSelector::new(),
inbound_tunnels: TunnelSelector::new(),
router_ctx,
}
}
pub fn create_lease_set_query(
&mut self,
key: Bytes,
static_key: StaticPublicKey,
) -> Result<(Vec<u8>, TunnelId), QueryError> {
let outbound_tunnel = self.outbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
let Lease {
router_id,
tunnel_id,
..
} = self.inbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
let message = DatabaseLookupBuilder::new(key.clone(), LookupType::LeaseSet)
.with_reply_type(ReplyType::Tunnel {
tunnel_id,
router_id,
})
.build();
let mut message = GarlicMessageBuilder::default()
.with_date_time(R::time_since_epoch().as_secs() as u32)
.with_garlic_clove(
MessageType::DatabaseLookup,
MessageId::from(R::rng().next_u32()),
R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION,
DeliveryInstructions::Local,
&message,
)
.build();
let ephemeral_secret = EphemeralPrivateKey::random(R::rng());
let ephemeral_public = ephemeral_secret.public();
let (garlic_key, garlic_tag) =
self.router_ctx.noise().derive_outbound_garlic_key(static_key, ephemeral_secret);
let mut out = BytesMut::with_capacity(message.len() + GARLIC_MESSAGE_OVERHEAD);
ChaChaPoly::new(&garlic_key)
.encrypt_with_ad_new(&garlic_tag, &mut message)
.expect("to succeed");
out.put_u32(message.len() as u32 + 32);
out.put_slice(&ephemeral_public.to_vec());
out.put_slice(&message);
Ok((
MessageBuilder::standard()
.with_expiration(R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION)
.with_message_type(MessageType::Garlic)
.with_message_id(R::rng().next_u32())
.with_payload(&out)
.build(),
outbound_tunnel,
))
}
pub fn create_router_info_query(
&mut self,
key: Bytes,
) -> Result<(Vec<u8>, TunnelId), QueryError> {
let outbound_tunnel = self.outbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
let Lease {
router_id,
tunnel_id,
..
} = self.inbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
Ok((
MessageBuilder::standard()
.with_expiration(R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION)
.with_message_type(MessageType::DatabaseLookup)
.with_message_id(R::rng().next_u32())
.with_payload(
&DatabaseLookupBuilder::new(key, LookupType::Router)
.with_reply_type(ReplyType::Tunnel {
tunnel_id,
router_id,
})
.build(),
)
.build(),
outbound_tunnel,
))
}
pub fn create_router_exploration(
&mut self,
key: Bytes,
) -> Result<(Vec<u8>, TunnelId), QueryError> {
let outbound_tunnel = self.outbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
let Lease {
router_id,
tunnel_id,
..
} = self.inbound_tunnels.next_tunnel().ok_or(QueryError::NoTunnel)?;
Ok((
MessageBuilder::standard()
.with_expiration(R::time_since_epoch() + I2NP_MESSAGE_EXPIRATION)
.with_message_type(MessageType::DatabaseLookup)
.with_message_id(R::rng().next_u32())
.with_payload(
&DatabaseLookupBuilder::new(key, LookupType::Exploration)
.with_reply_type(ReplyType::Tunnel {
tunnel_id,
router_id,
})
.build(),
)
.build(),
outbound_tunnel,
))
}
}