rustpbx 0.4.6

A SIP PBX implementation in Rust
Documentation
use crate::config::{MediaProxyMode, ProxyConfig};
use crate::proxy::routing::TrunkConfig;
use crate::proxy::tests::e2e_test_server::E2eTestServer;
use anyhow::Result;
use rsipstack::sip::{
    Method, SipMessage, Version,
    headers::CallId,
    headers::typed::CSeq,
    typed::{From as FromHeader, To as ToHeader, Via},
    uri::Tag,
    Param,
};
use rsipstack::transaction::key::{TransactionKey, TransactionRole};
use rsipstack::transaction::transaction::Transaction;
use rsipstack::transport::TransportLayer;
use rsipstack::transport::udp::UdpConnection;
use rsipstack::EndpointBuilder;
use std::collections::HashMap;
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::info;

fn trunk_options_proxy_config() -> ProxyConfig {
    let mut trunks = HashMap::new();
    trunks.insert(
        "carrier_a".to_string(),
        TrunkConfig {
            dest: "sip:10.68.144.74:5060".to_string(),
            inbound_hosts: vec!["10.68.144.74".to_string()],
            ..Default::default()
        },
    );
    ProxyConfig {
        media_proxy: MediaProxyMode::All,
        trunks,
        ..Default::default()
    }
}

async fn send_options(
    server_addr: &str,
    local_port: u16,
) -> Result<Option<u16>> {
    let cancel = CancellationToken::new();
    let tl = TransportLayer::new(cancel.child_token());
    let udp = UdpConnection::create_connection(
        format!("127.0.0.1:{local_port}").parse()?,
        None,
        Some(cancel.child_token()),
    )
    .await?;
    tl.add_transport(udp.into());

    let mut builder = EndpointBuilder::new();
    builder.with_user_agent("test-options-client/1.0");
    builder.with_transport_layer(tl);
    builder.with_cancel_token(cancel.child_token());
    builder.with_timer_interval(Duration::from_millis(50));
    let endpoint = builder.build();

    let ep_inner = endpoint.inner.clone();
    let ct = cancel.clone();
    crate::utils::spawn(async move {
        tokio::select! {
            _ = ct.cancelled() => {}
            r = ep_inner.serve() => { if let Err(e) = r { tracing::warn!("client serve: {e}"); } }
        }
    });

    sleep(Duration::from_millis(100)).await;

    let local_addr = format!("127.0.0.1:{local_port}");
    let branch = format!(
        "z9hG4bK-test{}",
        uuid::Uuid::new_v4().to_string().replace('-', "")
    );
    let tag = uuid::Uuid::new_v4()
        .to_string()
        .split('-')
        .next()
        .unwrap_or("tag")
        .to_string();

    let via = Via::parse(&format!("SIP/2.0/UDP {local_addr};branch={branch}"))?;
    let from = FromHeader {
        display_name: None,
        uri: format!("sip:health-check@{local_addr}").parse()?,
        params: vec![Param::Tag(Tag::new(&tag))],
    };
    let to = ToHeader {
        display_name: None,
        uri: format!("sip:{server_addr}").parse()?,
        params: vec![],
    };
    let call_id = CallId::new(format!(
        "test-{}",
        uuid::Uuid::new_v4().to_string().replace('-', "")
    ));

    let request = rsipstack::sip::Request {
        method: Method::Options,
        uri: format!("sip:{server_addr}").parse()?,
        version: Version::V2,
        headers: vec![
            via.into(),
            from.into(),
            to.into(),
            call_id.into(),
            CSeq {
                seq: 1,
                method: Method::Options,
            }
            .into(),
        ]
        .into(),
        body: vec![],
    };

    let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
    let mut tx = Transaction::new_client(key, request, endpoint.inner.clone(), None);
    tx.send().await?;

    let result = tokio::time::timeout(Duration::from_secs(3), tx.receive()).await;
    cancel.cancel();

    match result {
        Ok(Some(SipMessage::Response(resp))) => Ok(Some(u16::from(resp.status_code))),
        Ok(Some(_)) => Ok(None),
        Ok(None) => Ok(None),
        Err(_) => Ok(None),
    }
}

#[tokio::test]
async fn test_options_from_trunk_ip_gets_200_ok() -> Result<()> {
    let _ = tracing_subscriber::fmt::try_init();

    let mut config = trunk_options_proxy_config();
    config.trunks.insert(
        "local_test".to_string(),
        TrunkConfig {
            dest: "sip:carrier.example.com:5060".to_string(),
            inbound_hosts: vec!["127.0.0.1".to_string()],
            ..Default::default()
        },
    );

    let server = E2eTestServer::start_with_config(config).await?;
    sleep(Duration::from_millis(200)).await;

    let client_port = portpicker::pick_unused_port().unwrap();
    let server_addr = format!("127.0.0.1:{}", server.port);
    let status = send_options(&server_addr, client_port).await?;

    assert_eq!(status, Some(200), "OPTIONS from trunk IP should get 200 OK");

    server.stop();
    info!("test_options_from_trunk_ip_gets_200_ok PASSED");
    Ok(())
}

#[tokio::test]
async fn test_options_from_unknown_ip_gets_no_response() -> Result<()> {
    let _ = tracing_subscriber::fmt::try_init();

    let mut config = trunk_options_proxy_config();
    config.trunks.insert(
        "carrier_only".to_string(),
        TrunkConfig {
            dest: "sip:10.68.144.74:5060".to_string(),
            inbound_hosts: vec!["10.68.144.74".to_string()],
            ..Default::default()
        },
    );

    let server = E2eTestServer::start_with_config(config).await?;
    sleep(Duration::from_millis(200)).await;

    let client_port = portpicker::pick_unused_port().unwrap();
    let server_addr = format!("127.0.0.1:{}", server.port);
    let status = send_options(&server_addr, client_port).await?;

    assert_eq!(
        status, None,
        "OPTIONS from non-trunk IP should get no response (dropped)"
    );

    server.stop();
    info!("test_options_from_unknown_ip_gets_no_response PASSED");
    Ok(())
}