rustpbx 0.4.4

A SIP PBX implementation in Rust
Documentation
use crate::call::RouteContext;
use crate::call::app::CallController;
use crate::call::app::{AppAction, ApplicationContext, CallApp, CallAppType};
use crate::rwi::gateway::{RwiGateway, SessionId};
use crate::rwi::proto::RwiEvent;
use crate::rwi::session::OwnershipMode;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::RwLock;

pub const RWI_APP_NAME: &str = "rwi";

#[derive(Clone)]
pub struct RwiAddon {
    gateway: Arc<RwLock<RwiGateway>>,
}

impl RwiAddon {
    pub fn new(gateway: Arc<RwLock<RwiGateway>>) -> Self {
        Self { gateway }
    }
}

#[async_trait]
impl crate::call::CallAppFactory for RwiAddon {
    async fn create_app(
        &self,
        app_name: &str,
        _context: &RouteContext<'_>,
        params: &serde_json::Value,
    ) -> Option<Box<dyn CallApp>> {
        if app_name != RWI_APP_NAME {
            return None;
        }

        let context_name = params
            .get("context")
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .to_string();

        let session_id = params
            .get("session_id")
            .and_then(|v| v.as_str())
            .map(String::from);

        Some(Box::new(RwiApp::new(
            context_name,
            session_id,
            self.gateway.clone(),
        )))
    }
}

pub struct RwiApp {
    context_name: String,
    session_id: Option<SessionId>,
    gateway: Arc<RwLock<RwiGateway>>,
    owned: bool,
    /// The call_id of the call this app is handling, set in `on_enter`.
    owned_call_id: Option<String>,
    /// Track ID of the currently playing audio, if any.
    #[allow(dead_code)]
    current_track_id: Option<String>,
    /// If `true`, the next DTMF digit will interrupt the current playback.
    #[allow(dead_code)]
    interrupt_on_dtmf: bool,
}

impl RwiApp {
    pub fn new(
        context_name: String,
        session_id: Option<SessionId>,
        gateway: Arc<RwLock<RwiGateway>>,
    ) -> Self {
        Self {
            context_name,
            session_id,
            gateway,
            owned: false,
            owned_call_id: None,
            current_track_id: None,
            interrupt_on_dtmf: false,
        }
    }

    async fn send_event(&self, event: RwiEvent) {
        let gw = self.gateway.read().await;
        if let Some(session_id) = &self.session_id {
            gw.send_event_to_session(session_id, &event);
        }
        // Get call_id from event if available
        let call_id = event
            .call_id()
            .map(|s| s.to_string())
            .unwrap_or_else(|| self.context_name.clone());
        gw.fan_out_event_to_context(&self.context_name, &event, &call_id);
    }
}

#[async_trait]
impl CallApp for RwiApp {
    fn app_type(&self) -> CallAppType {
        CallAppType::Custom
    }

    fn name(&self) -> &str {
        RWI_APP_NAME
    }

    async fn on_enter(
        &mut self,
        _controller: &mut CallController,
        context: &ApplicationContext,
    ) -> anyhow::Result<AppAction> {
        let call_id = context.call_info.session_id.clone();

        // Populate CallMetaStore for event enrichment
        {
            let gateway = self.gateway.read().await;
            gateway
                .meta_store
                .insert(
                    call_id.clone(),
                    crate::rwi::proto::CallMeta {
                        caller: Some(context.call_info.caller.clone()),
                        callee: Some(context.call_info.callee.clone()),
                        direction: Some(context.call_info.direction.clone()),
                        ..Default::default()
                    },
                )
                .await;
        }

        self.send_event(RwiEvent::CallIncoming(
            crate::rwi::proto::CallIncomingData {
                call_id: call_id.clone(),
                context: self.context_name.clone(),
                caller: context.call_info.caller.clone(),
                callee: context.call_info.callee.clone(),
                dial_direction: context.call_info.direction.clone(),
                trunk: None,
                sip_headers: std::collections::HashMap::new(),
                root_call_id: None,
                ani: None,
                dnis: None,
                called_phone: None,
                app_id: None,
                routing_target: None,
                uuid: None,
                routing_path: None,
            },
        ))
        .await;

        if let Some(session_id) = &self.session_id {
            let claim_ok = {
                let mut gateway = self.gateway.write().await;
                gateway
                    .claim_call_ownership(session_id, call_id.clone(), OwnershipMode::Control)
                    .await
                    .is_ok()
            };

            if claim_ok {
                self.owned = true;
                self.owned_call_id = Some(call_id.clone());
                self.send_event(RwiEvent::CallAnswered {
                    call_id: call_id.clone(),
                    context: Default::default(),
                })
                .await;
            }
        }

        Ok(AppAction::Continue)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::rwi::auth::RwiIdentity;

    fn create_test_gateway() -> Arc<RwLock<RwiGateway>> {
        Arc::new(RwLock::new(RwiGateway::new()))
    }

    /// Verifies that when there is no session_id (anonymous context app),
    /// only CallIncoming is emitted — no CallAnswered.
    #[tokio::test]
    async fn test_on_enter_without_session_id_no_call_answered() {
        let gateway = create_test_gateway();
        let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

        {
            let mut gw = gateway.write().await;
            let identity = RwiIdentity {
                token: "sub".into(),
                scopes: vec![],
            };
            let session = gw.create_session(identity);
            let sid = session.read().await.id.clone();
            gw.set_session_event_sender(&sid, event_tx);
            gw.subscribe(&sid, vec!["ctx-anon".to_string()], None).await;
        }

        let app = RwiApp::new("ctx-anon".to_string(), None, gateway.clone());
        app.send_event(crate::rwi::proto::RwiEvent::CallIncoming(
            crate::rwi::proto::CallIncomingData {
                call_id: "c-anon".to_string(),
                context: "ctx-anon".to_string(),
                caller: "1002".to_string(),
                callee: "2001".to_string(),
                dial_direction: "inbound".to_string(),
                trunk: None,
                sip_headers: std::collections::HashMap::new(),
                root_call_id: None,
                ani: None,
                dnis: None,
                called_phone: None,
                app_id: None,
                routing_target: None,
                uuid: None,
                routing_path: None,
            },
        ))
        .await;

        let ev = event_rx.try_recv().expect("CallIncoming should arrive");
        let ev_str = serde_json::to_string(&ev).unwrap();
        assert!(
            ev_str.contains("call_incoming") || ev_str.contains("CallIncoming"),
            "expected call_incoming, got: {ev_str}"
        );
        assert!(
            event_rx.try_recv().is_err(),
            "no CallAnswered should be emitted when there is no session_id"
        );
    }
}