atm0s_media_server_transport_sip/
virtual_socket.rs1use std::{collections::HashMap, fmt::Debug, hash::Hash, net::SocketAddr};
2
3use async_std::channel::{bounded, Receiver, Sender};
4
5pub struct VirtualSocketContext {
6 pub remote_addr: SocketAddr,
7 pub username: Option<String>,
8}
9
10pub enum VirtualSocketError {
11 ChannelFull,
12 ChannelClosed,
13}
14
15pub struct VirtualSocket<ID: Debug + Clone, MSG> {
16 id: ID,
17 main_tx: Sender<(ID, Option<(Option<SocketAddr>, MSG)>)>,
18 rx: Receiver<MSG>,
19 closed: bool,
20 ctx: VirtualSocketContext,
21}
22
23impl<ID: Debug + Clone, MSG> VirtualSocket<ID, MSG> {
24 pub fn ctx(&self) -> &VirtualSocketContext {
25 &self.ctx
26 }
27
28 pub fn send_to(&self, dest: Option<SocketAddr>, msg: MSG) -> Result<(), VirtualSocketError> {
29 self.main_tx.try_send((self.id.clone(), Some((dest, msg)))).map_err(|_| VirtualSocketError::ChannelFull)
30 }
31
32 pub async fn recv(&self) -> Result<MSG, VirtualSocketError> {
33 self.rx.recv().await.map_err(|_| VirtualSocketError::ChannelClosed)
34 }
35
36 pub async fn close(&mut self) {
37 self.closed = true;
38 if let Err(e) = self.main_tx.send((self.id.clone(), None)).await {
39 log::error!("[VirtualSocket {:?}] close error {:?}", self.id, e);
40 }
41 }
42}
43
44impl<ID: Debug + Clone, MSG> Drop for VirtualSocket<ID, MSG> {
45 fn drop(&mut self) {
46 if !self.closed {
47 log::error!("[VirtualSocket {:?}] drop without close", self.id);
48 if let Err(e) = self.main_tx.try_send((self.id.clone(), None)) {
49 log::error!("[VirtualSocket {:?}] close error {:?}", self.id, e);
50 }
51 }
52 }
53}
54pub struct VirtualSocketPlane<ID, MSG> {
55 sockets: HashMap<ID, Sender<MSG>>,
56 main_tx: Sender<(ID, Option<(Option<SocketAddr>, MSG)>)>,
57 main_rx: Receiver<(ID, Option<(Option<SocketAddr>, MSG)>)>,
58}
59
60impl<ID, MSG> Default for VirtualSocketPlane<ID, MSG> {
61 fn default() -> Self {
62 let (main_tx, main_rx) = bounded(1000);
63 Self {
64 sockets: HashMap::new(),
65 main_tx,
66 main_rx,
67 }
68 }
69}
70
71impl<ID: Debug + Clone + Hash + Eq, MSG> VirtualSocketPlane<ID, MSG> {
72 pub fn new_socket(&mut self, id: ID, ctx: VirtualSocketContext) -> VirtualSocket<ID, MSG> {
73 log::info!("Create socket for {:?}", id);
74 let (tx, rx) = bounded(1000);
75 self.sockets.insert(id.clone(), tx);
76 VirtualSocket {
77 id,
78 main_tx: self.main_tx.clone(),
79 rx,
80 closed: false,
81 ctx,
82 }
83 }
84
85 pub fn forward(&mut self, id: &ID, msg: MSG) -> Option<()> {
86 let tx = self.sockets.get_mut(&id)?;
87 tx.try_send(msg).ok()
88 }
89
90 pub async fn recv(&mut self) -> Option<(ID, Option<(Option<SocketAddr>, MSG)>)> {
91 self.main_rx.recv().await.ok()
92 }
93
94 pub fn close_socket(&mut self, id: &ID) {
95 log::info!("Close socket {:?}", id);
96 self.sockets.remove(id);
97 }
98}