use crate::proto::outbound_sender::{OutboundPrimaryStreamSender, UnboundedReceiver};
use crate::proto::peer::p2p_conn_handler::generic_error;
use crate::proto::state_container::StateContainerInner;
use async_trait::async_trait;
use bytes::Bytes;
use citadel_crypt::ratchets::Ratchet;
use citadel_io::tokio::sync::Mutex;
use citadel_types::crypto::SecurityLevel;
use netbeam::reliable_conn::{ConnAddr, ReliableOrderedStreamToTarget};
use std::net::SocketAddr;
use std::str::FromStr;
pub(crate) struct ReliableOrderedCompatStream<R: Ratchet> {
to_primary_stream: OutboundPrimaryStreamSender,
from_stream: Mutex<UnboundedReceiver<Bytes>>,
peer_external_addr: SocketAddr,
local_bind_addr: SocketAddr,
hr: R,
security_level: SecurityLevel,
target_cid: u64,
}
impl<R: Ratchet> ReliableOrderedCompatStream<R> {
pub(crate) fn new(
to_primary_stream: OutboundPrimaryStreamSender,
state_container: &mut StateContainerInner<R>,
target_cid: u64,
hr: R,
security_level: SecurityLevel,
) -> Self {
let (from_stream_tx, from_stream_rx) = citadel_io::tokio::sync::mpsc::unbounded_channel();
let _ = state_container
.hole_puncher_pipes
.insert(target_cid, from_stream_tx);
let peer_external_addr = SocketAddr::from_str("1.2.3.4:1234").unwrap();
let local_bind_addr = SocketAddr::from_str("0.0.0.0:1234").unwrap();
Self {
to_primary_stream,
from_stream: Mutex::new(from_stream_rx),
peer_external_addr,
local_bind_addr,
hr,
security_level,
target_cid,
}
}
}
#[async_trait]
impl<R: Ratchet> ReliableOrderedStreamToTarget for ReliableOrderedCompatStream<R> {
async fn send_to_peer(&self, input: &[u8]) -> std::io::Result<()> {
let packet = crate::proto::packet_crafter::hole_punch::generate_packet(
&self.hr,
input,
self.security_level,
self.target_cid,
);
self.to_primary_stream
.unbounded_send(packet)
.map_err(|err| generic_error(err.to_string()))
}
async fn recv(&self) -> std::io::Result<Bytes> {
self.from_stream
.lock()
.await
.recv()
.await
.ok_or_else(|| generic_error("Inbound ordered reliable stream died"))
}
}
impl<R: Ratchet> ConnAddr for ReliableOrderedCompatStream<R> {
fn local_addr(&self) -> std::io::Result<SocketAddr> {
Ok(self.local_bind_addr)
}
fn peer_addr(&self) -> std::io::Result<SocketAddr> {
Ok(self.peer_external_addr)
}
}