atm0s_media_server_transport_sip/
virtual_socket.rs

1use std::{collections::HashMap, fmt::Debug, hash::Hash, net::SocketAddr};
2
3use async_std::channel::{bounded, Receiver, Sender};
4
5pub struct VirtualSocketContext {
6    pub remote_addr: SocketAddr,
7    pub username: Option<String>,
8}
9
10pub enum VirtualSocketError {
11    ChannelFull,
12    ChannelClosed,
13}
14
15pub struct VirtualSocket<ID: Debug + Clone, MSG> {
16    id: ID,
17    main_tx: Sender<(ID, Option<(Option<SocketAddr>, MSG)>)>,
18    rx: Receiver<MSG>,
19    closed: bool,
20    ctx: VirtualSocketContext,
21}
22
23impl<ID: Debug + Clone, MSG> VirtualSocket<ID, MSG> {
24    pub fn ctx(&self) -> &VirtualSocketContext {
25        &self.ctx
26    }
27
28    pub fn send_to(&self, dest: Option<SocketAddr>, msg: MSG) -> Result<(), VirtualSocketError> {
29        self.main_tx.try_send((self.id.clone(), Some((dest, msg)))).map_err(|_| VirtualSocketError::ChannelFull)
30    }
31
32    pub async fn recv(&self) -> Result<MSG, VirtualSocketError> {
33        self.rx.recv().await.map_err(|_| VirtualSocketError::ChannelClosed)
34    }
35
36    pub async fn close(&mut self) {
37        self.closed = true;
38        if let Err(e) = self.main_tx.send((self.id.clone(), None)).await {
39            log::error!("[VirtualSocket {:?}] close error {:?}", self.id, e);
40        }
41    }
42}
43
44impl<ID: Debug + Clone, MSG> Drop for VirtualSocket<ID, MSG> {
45    fn drop(&mut self) {
46        if !self.closed {
47            log::error!("[VirtualSocket {:?}] drop without close", self.id);
48            if let Err(e) = self.main_tx.try_send((self.id.clone(), None)) {
49                log::error!("[VirtualSocket {:?}] close error {:?}", self.id, e);
50            }
51        }
52    }
53}
54pub struct VirtualSocketPlane<ID, MSG> {
55    sockets: HashMap<ID, Sender<MSG>>,
56    main_tx: Sender<(ID, Option<(Option<SocketAddr>, MSG)>)>,
57    main_rx: Receiver<(ID, Option<(Option<SocketAddr>, MSG)>)>,
58}
59
60impl<ID, MSG> Default for VirtualSocketPlane<ID, MSG> {
61    fn default() -> Self {
62        let (main_tx, main_rx) = bounded(1000);
63        Self {
64            sockets: HashMap::new(),
65            main_tx,
66            main_rx,
67        }
68    }
69}
70
71impl<ID: Debug + Clone + Hash + Eq, MSG> VirtualSocketPlane<ID, MSG> {
72    pub fn new_socket(&mut self, id: ID, ctx: VirtualSocketContext) -> VirtualSocket<ID, MSG> {
73        log::info!("Create socket for {:?}", id);
74        let (tx, rx) = bounded(1000);
75        self.sockets.insert(id.clone(), tx);
76        VirtualSocket {
77            id,
78            main_tx: self.main_tx.clone(),
79            rx,
80            closed: false,
81            ctx,
82        }
83    }
84
85    pub fn forward(&mut self, id: &ID, msg: MSG) -> Option<()> {
86        let tx = self.sockets.get_mut(&id)?;
87        tx.try_send(msg).ok()
88    }
89
90    pub async fn recv(&mut self) -> Option<(ID, Option<(Option<SocketAddr>, MSG)>)> {
91        self.main_rx.recv().await.ok()
92    }
93
94    pub fn close_socket(&mut self, id: &ID) {
95        log::info!("Close socket {:?}", id);
96        self.sockets.remove(id);
97    }
98}