use crate::{
crypto::StaticPublicKey,
error::{ChannelError, QueryError},
netdb::LOG_TARGET,
primitives::{LeaseSet2, RouterId},
};
use bytes::Bytes;
use futures_channel::oneshot;
use thingbuf::mpsc;
use alloc::vec::Vec;
/// Recycling policy for [`NetDbAction`] slots of the `thingbuf` channel.
///
/// The type is zero-sized; its only field is a private unit, so outside of this module it
/// can only be obtained through [`Default`] or by cloning an existing value.
#[derive(Clone, Default)]
pub struct NetDbActionRecycle(());
impl thingbuf::Recycle<NetDbAction> for NetDbActionRecycle {
    /// Produce the placeholder action used to fill a fresh channel slot.
    fn new_element(&self) -> NetDbAction {
        NetDbAction::Dummy
    }

    /// Reset a used slot back to the placeholder action.
    ///
    /// Overwriting the slot drops whatever action it previously held.
    fn recycle(&self, element: &mut NetDbAction) {
        *element = self.new_element();
    }
}
/// Action sent by [`NetDbHandle`] over the action channel.
///
/// Variants that carry a `tx` field hand the receiving side a oneshot sender through which
/// the outcome is reported back to the caller of the corresponding [`NetDbHandle`] method.
#[derive(Default)]
pub enum NetDbAction {
/// Lease set query, sent by [`NetDbHandle::query_lease_set()`].
QueryLeaseSet2 {
/// Key of the lease set query.
key: Bytes,
/// Oneshot sender for the query result: a [`LeaseSet2`] or a [`QueryError`].
tx: oneshot::Sender<Result<LeaseSet2, QueryError>>,
},
/// Router info query, sent by [`NetDbHandle::try_query_router_info()`].
QueryRouterInfo {
/// ID of the router the query is about.
router_id: RouterId,
/// Oneshot sender for the query result; success carries no payload.
tx: oneshot::Sender<Result<(), QueryError>>,
},
/// Floodfill lookup, sent by [`NetDbHandle::get_closest_floodfills()`].
GetClosestFloodfills {
/// Key of the lookup (presumably the key the returned floodfills are closest to; the
/// selection logic lives in the consumer and is not visible here).
key: Bytes,
/// Oneshot sender for the list of `(router ID, static public key)` pairs.
tx: oneshot::Sender<Vec<(RouterId, StaticPublicKey)>>,
},
/// Router info publication, sent by [`NetDbHandle::publish_router_info()`].
///
/// Carries no reply channel.
PublishRouterInfo {
/// Router ID sent along with the router info.
router_id: RouterId,
/// Router info as raw bytes (the test in this file passes the output of
/// `RouterInfo::serialize()`).
router_info: Bytes,
},
/// Readiness request, sent by [`NetDbHandle::wait_until_ready()`].
///
/// When `tx` is signaled is decided by the consumer of the action.
WaitUntilReady { tx: oneshot::Sender<()> },
/// Placeholder variant and the enum's [`Default`] value.
///
/// [`NetDbActionRecycle`] uses it for both new and recycled channel slots.
#[default]
Dummy,
}
/// Cloneable handle for submitting [`NetDbAction`]s over a `thingbuf` MPSC channel.
#[derive(Clone)]
pub struct NetDbHandle {
/// Sending half of the action channel; its slots are recycled by [`NetDbActionRecycle`].
tx: mpsc::Sender<NetDbAction, NetDbActionRecycle>,
}
impl NetDbHandle {
pub(super) fn new(tx: mpsc::Sender<NetDbAction, NetDbActionRecycle>) -> Self {
Self { tx }
}
pub fn query_lease_set(
&self,
key: Bytes,
) -> Result<oneshot::Receiver<Result<LeaseSet2, QueryError>>, ChannelError> {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(NetDbAction::QueryLeaseSet2 { key, tx })
.map(|_| rx)
.map_err(From::from)
}
pub fn try_query_router_info(
&self,
router_id: RouterId,
) -> Result<oneshot::Receiver<Result<(), QueryError>>, ChannelError> {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(NetDbAction::QueryRouterInfo { router_id, tx })
.map(|_| rx)
.map_err(From::from)
}
pub fn get_closest_floodfills(
&self,
key: Bytes,
) -> Result<oneshot::Receiver<Vec<(RouterId, StaticPublicKey)>>, ChannelError> {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(NetDbAction::GetClosestFloodfills { key, tx })
.map(|_| rx)
.map_err(From::from)
}
pub fn publish_router_info(&self, router_id: RouterId, router_info: Bytes) {
if let Err(error) = self.tx.try_send(NetDbAction::PublishRouterInfo {
router_id,
router_info,
}) {
tracing::warn!(
target: LOG_TARGET,
?error,
"failed to send router info for publication to netdb",
);
}
}
pub fn wait_until_ready(&self) -> Result<oneshot::Receiver<()>, ChannelError> {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(NetDbAction::WaitUntilReady { tx })
.map(|_| rx)
.map_err(From::from)
}
#[cfg(test)]
pub fn create() -> (Self, mpsc::Receiver<NetDbAction, NetDbActionRecycle>) {
let (tx, rx) = mpsc::with_recycle(64, NetDbActionRecycle::default());
(Self { tx }, rx)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        primitives::{RouterInfo, RouterInfoBuilder},
        runtime::mock::MockRuntime,
    };

    /// A lease set query arrives on the receiver with its key intact.
    #[test]
    fn send_leaseset_query() {
        let (tx, rx) = mpsc::with_recycle(5, NetDbActionRecycle(()));
        let handle = NetDbHandle::new(tx);
        let expected = Bytes::from(vec![1, 2, 3, 4]);

        assert!(handle.query_lease_set(expected.clone()).is_ok());

        if let Ok(NetDbAction::QueryLeaseSet2 { key, .. }) = rx.try_recv() {
            assert_eq!(key, expected);
        } else {
            panic!("invalid event");
        }
    }

    /// A router info query arrives on the receiver with the requested router ID.
    #[test]
    fn send_router_info_query() {
        let (tx, rx) = mpsc::with_recycle(5, NetDbActionRecycle(()));
        let handle = NetDbHandle::new(tx);
        let remote = RouterId::random();

        assert!(handle.try_query_router_info(remote.clone()).is_ok());

        if let Ok(NetDbAction::QueryRouterInfo { router_id, .. }) = rx.try_recv() {
            assert_eq!(router_id, remote);
        } else {
            panic!("invalid event");
        }
    }

    /// A published router info arrives on the receiver and can be parsed back.
    #[test]
    fn publish_router_info() {
        let (tx, rx) = mpsc::with_recycle(5, NetDbActionRecycle(()));
        let handle = NetDbHandle::new(tx);

        let (built, _, signing_key) = RouterInfoBuilder::default().build();
        let router_id = built.identity.id();
        let serialized = Bytes::from(built.serialize(&signing_key));

        handle.publish_router_info(router_id.clone(), serialized);

        if let Ok(NetDbAction::PublishRouterInfo {
            router_id: key,
            router_info,
        }) = rx.try_recv()
        {
            assert_eq!(key, router_id);
            assert!(RouterInfo::parse::<MockRuntime>(router_info).is_ok());
        } else {
            panic!("invalid event");
        }
    }

    /// Once all five slots are occupied, the next query fails with `ChannelError::Full`.
    #[test]
    fn channel_full() {
        // the receiver is kept alive but never drained so the channel fills up
        let (tx, _rx) = mpsc::with_recycle(5, NetDbActionRecycle(()));
        let handle = NetDbHandle::new(tx);
        let key = Bytes::from(vec![1, 2, 3, 4]);

        for _ in 0..5 {
            assert!(handle.query_lease_set(key.clone()).is_ok());
        }

        let error = handle.query_lease_set(key).unwrap_err();
        if !matches!(error, ChannelError::Full) {
            panic!("invalid error: {error}");
        }
    }
}