ftth_rsipstack/transport/
channel.rs

1use tokio_util::sync::CancellationToken;
2
3use super::{
4    connection::{TransportReceiver, TransportSender},
5    SipAddr, SipConnection,
6};
7use crate::Result;
8use std::sync::{Arc, Mutex};
9
10struct ChannelInner {
11    incoming: Mutex<Option<TransportReceiver>>,
12    outgoing: TransportSender,
13    addr: SipAddr,
14}
15
16#[derive(Clone)]
17pub struct ChannelConnection {
18    inner: Arc<ChannelInner>,
19    cancel_token: Option<CancellationToken>,
20}
21
22impl ChannelConnection {
23    pub async fn create_connection(
24        incoming: TransportReceiver,
25        outgoing: TransportSender,
26        addr: SipAddr,
27        cancel_token: Option<CancellationToken>,
28    ) -> Result<Self> {
29        let t = ChannelConnection {
30            inner: Arc::new(ChannelInner {
31                incoming: Mutex::new(Some(incoming)),
32                outgoing,
33                addr,
34            }),
35            cancel_token,
36        };
37        Ok(t)
38    }
39
40    pub async fn send(&self, msg: rsip::SipMessage) -> crate::Result<()> {
41        let transport = SipConnection::Channel(self.clone());
42        let source = self.get_addr().clone();
43        self.inner
44            .outgoing
45            .send(super::TransportEvent::Incoming(msg, transport, source))
46            .map_err(|e| e.into())
47    }
48
49    pub fn get_addr(&self) -> &SipAddr {
50        &self.inner.addr
51    }
52
53    pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
54        let mut incoming = match self.inner.clone().incoming.lock().unwrap().take() {
55            Some(incoming) => incoming,
56            None => {
57                return Err(crate::Error::Error(
58                    "ChannelTransport::serve_loop called twice".to_string(),
59                ));
60            }
61        };
62        while let Some(event) = incoming.recv().await {
63            sender.send(event)?;
64        }
65        Ok(())
66    }
67    pub async fn close(&self) -> Result<()> {
68        Ok(())
69    }
70    pub fn cancel_token(&self) -> Option<CancellationToken> {
71        self.cancel_token.clone()
72    }
73}
74
75impl std::fmt::Display for ChannelConnection {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "*:*")
78    }
79}
80
81impl std::fmt::Debug for ChannelConnection {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(f, "*:*")
84    }
85}