use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use crate::errors::RtcResult;
use crate::t38::ifp::{DataField, IfpPacket, T30Indicator};
#[cfg(test)]
use crate::t38::t30::T30FaxConfig;
use crate::t38::t30::{T30Event, T30Session};
use crate::transports::udptl::{UdtlConfig, UdtlReceiveBuffer, UdtlTransport};
/// One endpoint of a T.38 fax exchange: a UDPTL transport paired with a T.30
/// session and the receive buffer the transport fills on every receive call.
pub struct FaxEndpoint {
/// Shared UDPTL transport used to send encoded IFP packets and to receive
/// raw payloads from the remote peer.
pub transport: Arc<UdtlTransport>,
/// T.30 session state, guarded by an async mutex so it can be driven
/// through `&self`.
pub session: tokio::sync::Mutex<T30Session>,
/// Receive buffer handed to `UdtlTransport::recv` on each receive call.
recv_buf: tokio::sync::Mutex<UdtlReceiveBuffer>,
// The configuration is stored for reference only; nothing in this file
// reads it back, hence the lint suppression below.
#[allow(dead_code)]
/// UDPTL configuration recorded at construction time: the caller-supplied
/// value for `bind`, `UdtlConfig::default()` for `new` and `from_socket`.
config: UdtlConfig,
}
impl FaxEndpoint {
pub fn new(transport: Arc<UdtlTransport>, session: T30Session) -> Self {
Self {
transport,
session: tokio::sync::Mutex::new(session),
recv_buf: tokio::sync::Mutex::new(UdtlReceiveBuffer::new()),
config: UdtlConfig::default(),
}
}
pub async fn bind(
local: SocketAddr,
remote: SocketAddr,
session: T30Session,
config: UdtlConfig,
) -> RtcResult<Self> {
let socket = Arc::new(
UdpSocket::bind(local)
.await
.map_err(|e| crate::errors::RtcError::Transport(format!("bind: {e}")))?,
);
let transport = Arc::new(UdtlTransport::with_config(socket, remote, config.clone()));
Ok(Self {
transport,
session: tokio::sync::Mutex::new(session),
recv_buf: tokio::sync::Mutex::new(UdtlReceiveBuffer::new()),
config,
})
}
pub fn from_socket(
socket: Arc<UdpSocket>,
remote_addr: SocketAddr,
session: T30Session,
) -> Self {
let transport = Arc::new(UdtlTransport::new(socket, remote_addr));
Self {
transport,
session: tokio::sync::Mutex::new(session),
recv_buf: tokio::sync::Mutex::new(UdtlReceiveBuffer::new()),
config: UdtlConfig::default(),
}
}
pub async fn start_calling(&self) {
self.session.lock().await.start_calling();
}
pub async fn start_called(&self) {
self.session.lock().await.start_called();
}
pub async fn send_indicator(&self, ind: T30Indicator) -> RtcResult<()> {
let packet = IfpPacket::T30Indicator(vec![ind]);
let data = packet.encode()?;
self.transport.send(&data).await
}
pub async fn send_indicator_spandsp(&self, ind: T30Indicator) -> RtcResult<()> {
let packet = IfpPacket::T30Indicator(vec![ind]);
let data = packet.encode_spandsp()?;
self.transport.send(&data).await
}
pub async fn send_data(&self, fields: Vec<DataField>) -> RtcResult<()> {
let packet = IfpPacket::T30Data(fields);
let data = packet.encode()?;
self.transport.send(&data).await
}
pub async fn recv_timeout(&self, timeout: std::time::Duration) -> Option<IfpPacket> {
tokio::time::timeout(timeout, self.recv())
.await
.ok()
.flatten()
}
pub async fn recv(&self) -> Option<IfpPacket> {
loop {
let mut buf = self.recv_buf.lock().await;
match self.transport.recv(&mut buf).await {
Ok(Some(raw)) => {
drop(buf);
match IfpPacket::decode(&raw) {
Ok(pkt) => return Some(pkt),
Err(e) => {
tracing::warn!("FaxEndpoint: IFP decode error: {e}");
continue;
}
}
}
Ok(None) => {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
continue;
}
Err(e) => {
tracing::warn!("FaxEndpoint: UDPTL recv error: {e}");
return None;
}
}
}
}
pub async fn recv_raw(&self) -> RtcResult<Option<Vec<u8>>> {
let mut buf = self.recv_buf.lock().await;
self.transport.recv(&mut buf).await
}
pub async fn drain_events(&self) -> Vec<T30Event> {
let mut session = self.session.lock().await;
let events: Vec<_> = session.events.drain(..).collect();
events
}
pub async fn reset(&self) {
self.session.lock().await.reset();
self.recv_buf.lock().await.reset(1);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::t38::ifp::DataFieldType;
    use std::time::Duration;

    /// Builds an `HdlcFcsOk` data field whose payload is `FF FF <frame_type>`
    /// followed by `data`.
    fn hdlc_field(frame_type: u8, data: &[u8]) -> DataField {
        let header = [0xFF, 0xFF, frame_type];
        DataField {
            field_type: DataFieldType::HdlcFcsOk,
            data: [&header[..], data].concat(),
        }
    }

    /// Binds a loopback UDP socket on an OS-assigned port and returns it
    /// together with its local address.
    async fn loopback_socket() -> (Arc<UdpSocket>, SocketAddr) {
        let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let addr = sock.local_addr().unwrap();
        (Arc::new(sock), addr)
    }

    /// Wraps `socket` in an endpoint aimed at `peer`, with a default session.
    fn endpoint_on(socket: Arc<UdpSocket>, peer: SocketAddr) -> FaxEndpoint {
        FaxEndpoint::from_socket(socket, peer, T30Session::new(T30FaxConfig::default()))
    }

    /// Creates two endpoints on loopback that point at each other.
    async fn endpoint_pair() -> (FaxEndpoint, FaxEndpoint) {
        let (sock_a, addr_a) = loopback_socket().await;
        let (sock_b, addr_b) = loopback_socket().await;
        (endpoint_on(sock_a, addr_b), endpoint_on(sock_b, addr_a))
    }

    #[tokio::test]
    async fn test_fax_endpoint_create_and_send_recv() {
        let (fax_a, fax_b) = endpoint_pair().await;

        fax_a.send_indicator(T30Indicator::Cng).await.unwrap();
        fax_a.session.lock().await.start_calling();

        let received = tokio::time::timeout(Duration::from_secs(1), fax_b.recv())
            .await
            .unwrap();
        assert!(
            matches!(received, Some(IfpPacket::T30Indicator(ref v)) if v.contains(&T30Indicator::Cng))
        );
    }

    #[tokio::test]
    async fn test_fax_endpoint_indicator_roundtrip() {
        let (fax_a, fax_b) = endpoint_pair().await;

        let indicators = [
            T30Indicator::Cng,
            T30Indicator::Ced,
            T30Indicator::V21Preamble,
        ];
        for ind in indicators {
            fax_a.send_indicator(ind).await.unwrap();
            let received = tokio::time::timeout(Duration::from_millis(500), fax_b.recv())
                .await
                .unwrap();
            assert!(matches!(received, Some(IfpPacket::T30Indicator(ref v)) if v.contains(&ind)));
        }
    }

    #[tokio::test]
    async fn test_fax_endpoint_data_roundtrip() {
        let (fax_a, fax_b) = endpoint_pair().await;

        let payload: [u8; 5] = [0xFF, 0x01, 0x02, 0x80, 0x20];
        let fields = vec![hdlc_field(0x01, &payload)];
        fax_a.send_data(fields).await.unwrap();

        let received = tokio::time::timeout(Duration::from_secs(1), fax_b.recv())
            .await
            .unwrap();
        assert!(matches!(received, Some(IfpPacket::T30Data(_))));
    }

    #[tokio::test]
    async fn test_fax_endpoint_recv_timeout() {
        // The peer socket stays bound for the whole test but never sends.
        let (_silent_peer, peer_addr) = loopback_socket().await;
        let (sock_b, _addr_b) = loopback_socket().await;
        let fax_b = endpoint_on(sock_b, peer_addr);

        let outcome = fax_b.recv_timeout(Duration::from_millis(50)).await;
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_fax_endpoint_bind() {
        let local: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let remote: SocketAddr = "127.0.0.1:9999".parse().unwrap();
        let session = T30Session::new(T30FaxConfig::default());

        let fax = FaxEndpoint::bind(local, remote, session, UdtlConfig::default())
            .await
            .unwrap();

        let bound_ip = fax.transport.local_addr().unwrap().ip().to_string();
        assert_eq!(bound_ip, "127.0.0.1");
    }
}