use super::*;
use crate::peering::config::{DomusConfig, DomusConfigAccess};
use crate::peering::taberna::TabernaInbox;
use bytes::Bytes;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tokio::sync::{mpsc, watch, Notify};
use tokio::time::timeout;
/// Stateless transport backend stub.
///
/// Its `TransportBackend` implementation in this file lets `bind` succeed
/// and makes every `accept` / `dial` fail with `ErrorId::PeerUnavailable`.
#[derive(Default, Clone)]
struct TestBackend;
#[async_trait::async_trait]
impl TransportBackend for TestBackend {
    type Addr = DomusAddr;
    type Listener = ();
    type Stream = tokio::io::DuplexStream;

    /// Always succeeds; the listener is the unit value, so nothing is bound.
    async fn bind(&self, _local: &Self::Addr) -> Result<Self::Listener, AureliaError> {
        Ok(())
    }

    /// Never yields a connection: always returns `ErrorId::PeerUnavailable`.
    async fn accept(
        &self,
        _listener: &mut Self::Listener,
    ) -> Result<super::backend::AuthenticatedStream<Self::Stream, Self::Addr>, AureliaError> {
        let unavailable = AureliaError::new(ErrorId::PeerUnavailable);
        Err(unavailable)
    }

    /// Never connects: always returns `ErrorId::PeerUnavailable`.
    async fn dial(
        &self,
        _peer: &Self::Addr,
    ) -> Result<super::backend::AuthenticatedStream<Self::Stream, Self::Addr>, AureliaError> {
        let unavailable = AureliaError::new(ErrorId::PeerUnavailable);
        Err(unavailable)
    }
}
/// Inbox double that records everything handed to `enqueue`.
struct RecordingSink {
    /// Every `(msg_type, payload, blob_receiver)` triple enqueued so far,
    /// in arrival order; emptied by `take`.
    received: Mutex<Vec<(MessageType, Bytes, Option<crate::peering::BlobReceiver>)>>,
    /// Message types `enqueue` accepts; any other type fails its assertion.
    expected_msg_types: Vec<MessageType>,
}
impl RecordingSink {
    /// Creates a sink with an empty record that accepts only the given
    /// message types.
    fn new(expected_msg_types: Vec<MessageType>) -> Self {
        let received = Mutex::new(Vec::new());
        Self {
            received,
            expected_msg_types,
        }
    }

    /// Removes and returns every recorded message, leaving the record empty.
    async fn take(&self) -> Vec<(MessageType, Bytes, Option<crate::peering::BlobReceiver>)> {
        let mut recorded = self.received.lock().await;
        let drained = std::mem::take(&mut *recorded);
        // Release the lock before handing the messages back.
        drop(recorded);
        drained
    }
}
#[async_trait::async_trait]
impl TabernaInbox for RecordingSink {
    /// Records the message and reports immediate success.
    ///
    /// The returned receiver already holds `Ok(())`. When `notify` is
    /// provided it is signalled once after the result has been sent.
    ///
    /// # Panics
    ///
    /// Panics if `msg_type` is not one of `expected_msg_types`.
    async fn enqueue(
        &self,
        msg_type: MessageType,
        payload: Bytes,
        blob_receiver: Option<crate::peering::BlobReceiver>,
        notify: Option<Arc<Notify>>,
    ) -> Result<tokio::sync::oneshot::Receiver<Result<(), AureliaError>>, AureliaError> {
        let is_expected = self.expected_msg_types.contains(&msg_type);
        assert!(
            is_expected,
            "unexpected msg_type {msg_type}; expected one of {:?}",
            self.expected_msg_types
        );

        // Hold the lock only for the push itself.
        {
            let mut log = self.received.lock().await;
            log.push((msg_type, payload, blob_receiver));
        }

        // The result is sent before the receiver is returned, so the caller
        // finds it ready.
        let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
        let _ = ack_tx.send(Ok(()));

        if let Some(waker) = &notify {
            waker.notify_one();
        }

        Ok(ack_rx)
    }
}
// Child modules, each defined in its own file.
// NOTE(review): presumably these are per-area test suites that reuse the
// fixtures above (`TestBackend`, `RecordingSink`) via `use super::*` — confirm.
mod auth;
mod backend;
mod blob;
mod callis;
mod handshake;
mod limits;
mod peer;
mod primary;