use crate::error::SmolmixError;
use futures::channel::mpsc;
use futures::StreamExt;
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
use nym_sdk::ipr_wrapper::IpMixStream;
use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn};
/// Bridge that moves raw packets between a pair of in-process channels and an
/// [`IpMixStream`].
///
/// Packets read from `outgoing_rx` are bundled and sent through the stream;
/// packets produced by the stream are pushed into `incoming_tx`. The work is
/// done by [`NymIprBridge::run`].
pub(crate) struct NymIprBridge {
/// Mixnet-facing stream used for both sending and receiving packets.
stream: IpMixStream,
/// Packets queued for transmission through the stream.
outgoing_rx: mpsc::UnboundedReceiver<Vec<u8>>,
/// Sink for packets received from the stream (logged as the "device" channel).
incoming_tx: mpsc::UnboundedSender<Vec<u8>>,
/// Completes when the paired [`BridgeShutdownHandle`] signals shutdown.
shutdown_rx: oneshot::Receiver<()>,
}
/// Handle used to stop a running [`NymIprBridge`].
///
/// The shutdown signal is sent either explicitly through `shutdown` or
/// implicitly when the handle is dropped.
pub(crate) struct BridgeShutdownHandle {
/// Sender half of the shutdown channel; `None` once it has been taken for sending.
tx: Option<oneshot::Sender<()>>,
}
impl BridgeShutdownHandle {
    /// Consumes the handle and sends the shutdown signal to the bridge.
    ///
    /// Does nothing if the sender has already been taken. The outcome of the
    /// send is deliberately discarded.
    pub(crate) fn shutdown(mut self) {
        let sender = match self.tx.take() {
            Some(sender) => sender,
            // The signal was already sent; nothing left to do.
            None => return,
        };
        let _ = sender.send(());
    }
}
impl Drop for BridgeShutdownHandle {
    /// Sends the shutdown signal on drop unless the sender was already taken
    /// (for example by an earlier explicit `shutdown` call).
    fn drop(&mut self) {
        let pending = self.tx.take();
        if let Some(sender) = pending {
            // The send result is intentionally ignored.
            let _ = sender.send(());
        }
    }
}
impl NymIprBridge {
pub(crate) fn new(
stream: IpMixStream,
outgoing_rx: mpsc::UnboundedReceiver<Vec<u8>>,
incoming_tx: mpsc::UnboundedSender<Vec<u8>>,
) -> (Self, BridgeShutdownHandle) {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
(
Self {
stream,
outgoing_rx,
incoming_tx,
shutdown_rx,
},
BridgeShutdownHandle {
tx: Some(shutdown_tx),
},
)
}
pub(crate) async fn run(mut self) -> Result<(), SmolmixError> {
info!("Starting bridge");
let mut packets_sent: u64 = 0;
let mut packets_received: u64 = 0;
loop {
tokio::select! {
_ = &mut self.shutdown_rx => {
info!(packets_sent, packets_received, "Bridge received shutdown signal");
break;
}
Some(packet) = self.outgoing_rx.next() => {
trace!(len = packet.len(), "Sending packet to mixnet");
let bundled = MultiIpPacketCodec::bundle_one_packet(packet.into());
if let Err(e) = self.stream.send_ip_packet(&bundled).await {
error!("Failed to send packet through mixnet: {e}");
} else {
packets_sent += 1;
debug!(packets_sent, "Packet sent");
}
}
result = self.stream.handle_incoming() => {
match result {
Ok(packets) if !packets.is_empty() => {
trace!(count = packets.len(), "Received packets from mixnet");
for packet in packets {
if self.incoming_tx.unbounded_send(packet.to_vec()).is_err() {
error!("Device channel closed");
return Err(SmolmixError::ChannelClosed);
}
packets_received += 1;
}
debug!(packets_received, "Packets received");
}
Ok(_) => {} Err(e) => {
warn!("Mixnet receive error: {e}");
}
}
}
else => {
info!(packets_sent, packets_received, "All channels closed, shutting down");
break;
}
}
}
info!("Disconnecting from mixnet...");
self.stream.disconnect().await;
info!("Disconnected");
Ok(())
}
}