atm0s_media_server_transport_sip/
server.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    net::SocketAddr,
4    time::Duration,
5};
6
7use async_std::{net::UdpSocket, stream::Interval};
8use futures::{select, FutureExt, StreamExt};
9use media_utils::{SystemTimer, Timer};
10use rsip::headers::CallId;
11
// TODO: implement a simple firewall here to block specific IP addresses
13
14use crate::{
15    sip_request::SipRequest,
16    sip_response::SipResponse,
17    virtual_socket::{VirtualSocket, VirtualSocketContext, VirtualSocketPlane},
18    GroupId, SipCore, SipMessage, SipServerEvent,
19};
20
/// Errors returned by `SipServerSocket::recv`.
#[derive(Debug)]
pub enum SipServerSocketError {
    /// A received datagram could not be parsed as a SIP request or response.
    MessageParseError,
    /// The underlying UDP socket reported an I/O error.
    NetworkError(std::io::Error),
}

impl std::fmt::Display for SipServerSocketError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MessageParseError => f.write_str("failed to parse SIP message"),
            Self::NetworkError(e) => write!(f, "network error: {e}"),
        }
    }
}

impl std::error::Error for SipServerSocketError {
    /// Exposes the wrapped I/O error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::MessageParseError => None,
            Self::NetworkError(e) => Some(e),
        }
    }
}
26
27pub enum SipServerSocketMessage {
28    Continue,
29    RegisterValidate(GroupId, String, String, String, String, String),
30    InCall(VirtualSocket<GroupId, SipMessage>, SipRequest),
31}
32
/// Registration state remembered for one remote socket address.
struct RemoteInfo {
    /// Username reported by the register-validation event.
    username: String,
    /// `true` once the registration was accepted through `accept_register`.
    accepted: bool,
}
37
38pub struct SipServerSocket {
39    main_socket: UdpSocket,
40    buf: [u8; 2048],
41    sip_core: SipCore,
42    timer: SystemTimer,
43    interval: Interval,
44    remote_users: HashMap<SocketAddr, RemoteInfo>,
45    virtual_socket_plane: VirtualSocketPlane<GroupId, SipMessage>,
46}
47
48impl SipServerSocket {
49    pub async fn new(bind_addr: SocketAddr) -> Self {
50        log::info!("Listerning on port 5060 for UDP");
51        Self {
52            main_socket: UdpSocket::bind(bind_addr).await.expect("Should bind udp socket"),
53            buf: [0u8; 2048],
54            sip_core: SipCore::new(),
55            timer: SystemTimer(),
56            interval: async_std::stream::interval(Duration::from_millis(100)),
57            remote_users: HashMap::new(),
58            virtual_socket_plane: Default::default(),
59        }
60    }
61
62    pub fn accept_register(&mut self, session: &GroupId) {
63        log::info!("Accept register {:?}", session);
64        if let Some(remote_info) = self.remote_users.get_mut(&session.addr()) {
65            remote_info.accepted = true;
66        }
67        self.sip_core.reply_register_validate(session, true);
68    }
69
70    pub fn reject_register(&mut self, session: &GroupId) {
71        log::info!("Reject register {:?}", session);
72        self.remote_users.remove(&session.addr());
73        self.sip_core.reply_register_validate(session, false);
74    }
75
76    pub fn create_call(&mut self, call_id: &CallId, dest: SocketAddr) -> VirtualSocket<GroupId, SipMessage> {
77        let group_id = GroupId::from_raw(dest, &call_id);
78        self.sip_core.open_out_call(&group_id);
79        self.virtual_socket_plane.new_socket(group_id, VirtualSocketContext { remote_addr: dest, username: None })
80    }
81
82    pub async fn recv(&mut self) -> Result<SipServerSocketMessage, SipServerSocketError> {
83        while let Some(output) = self.sip_core.pop_action() {
84            match output {
85                SipServerEvent::OnRegisterValidate(group, digest, nonce, username, realm, hashed_password) => {
86                    log::info!("Register validate {} {}", username, hashed_password);
87                    self.remote_users.insert(
88                        group.addr(),
89                        RemoteInfo {
90                            username: username.clone(),
91                            accepted: false,
92                        },
93                    );
94                    return Ok(SipServerSocketMessage::RegisterValidate(group, digest, nonce, username, realm, hashed_password));
95                }
96                SipServerEvent::OnInCallStarted(group_id, req) => {
97                    log::info!("InCall started {:?}", group_id);
98                    let ctx = VirtualSocketContext {
99                        remote_addr: group_id.addr(),
100                        username: match self.remote_users.get(&group_id.addr()) {
101                            Some(remote_info) => {
102                                if remote_info.accepted {
103                                    Some(remote_info.username.clone())
104                                } else {
105                                    None
106                                }
107                            }
108                            None => None,
109                        },
110                    };
111                    let socket = self.virtual_socket_plane.new_socket(group_id, ctx);
112                    return Ok(SipServerSocketMessage::InCall(socket, req));
113                }
114                SipServerEvent::OnInCallRequest(group_id, req) => {
115                    self.virtual_socket_plane
116                        .forward(&group_id, SipMessage::Request(req))
117                        .expect("Should forward to correct virtual socket");
118                }
119                SipServerEvent::OnInCallResponse(group_id, res) => {
120                    self.virtual_socket_plane
121                        .forward(&group_id, SipMessage::Response(res))
122                        .expect("Should forward to correct virtual socket");
123                }
124                SipServerEvent::OnOutCallRequest(group_id, req) => {
125                    self.virtual_socket_plane
126                        .forward(&group_id, SipMessage::Request(req))
127                        .expect("Should forward to correct virtual socket");
128                }
129                SipServerEvent::OnOutCallResponse(group_id, res) => {
130                    self.virtual_socket_plane
131                        .forward(&group_id, SipMessage::Response(res))
132                        .expect("Should forward to correct virtual socket");
133                }
134                SipServerEvent::SendRes(dest, res) => {
135                    let buf = res.to_bytes();
136                    log::info!("Send res to {} {}", dest, String::from_utf8_lossy(&buf));
137                    if let Err(e) = self.main_socket.send_to(&buf, dest).await {
138                        log::error!("Sending udp to {dest} error {:?}", e);
139                    }
140                }
141                SipServerEvent::SendReq(dest, req) => {
142                    let buf = req.to_bytes();
143                    log::debug!("Send req to {} {}", dest, String::from_utf8_lossy(&buf));
144                    if let Err(e) = self.main_socket.send_to(&buf, dest).await {
145                        log::error!("Sending udp to {dest} error {:?}", e);
146                    }
147                }
148            }
149        }
150
151        let mut out_msgs = VecDeque::new();
152        select! {
153            _ = self.interval.next().fuse() => {
154                self.sip_core.on_tick(self.timer.now_ms());
155            },
156            e = self.virtual_socket_plane.recv().fuse() => {
157                match e {
158                    Some((group_id, msg)) => {
159                        match msg {
160                            Some((dest, msg)) => {
161                                let dest = dest.unwrap_or(group_id.addr());
162                                log::info!("Group {:?} send to {} {}", group_id, dest, msg);
163                                out_msgs.push_back((dest, msg));
164                            },
165                            None => {
166                                log::info!("Group {:?} close socket", group_id);
167                                self.virtual_socket_plane.close_socket(&group_id);
168                                self.sip_core.close_in_call(&group_id);
169                                self.sip_core.close_out_call(&group_id);
170                            }
171                        }
172                    },
173                    None => {}
174                }
175            },
176            e = self.main_socket.recv_from(&mut self.buf).fuse() => {
177                match e {
178                    Ok((0..=4, addr)) => {
179                        log::info!("Ping from {}", addr);
180                    }
181                    Ok((len, addr)) => {
182                        log::info!("Recv from {}\n{}", addr, String::from_utf8(self.buf[..len].to_vec()).unwrap());
183                        let req = match rsip::SipMessage::try_from(&self.buf[..len]) {
184                            Ok(req) => req,
185                            Err(e) => {
186                                log::warn!("Can not parse request: {} {:?}", e, &self.buf[..len]);
187                                return Err(SipServerSocketError::MessageParseError);
188                            }
189                        };
190
191                        match req {
192                            rsip::SipMessage::Request(req) => {
193                                match SipRequest::from(req) {
194                                    Ok(req) => {
195                                        log::debug!("on req from {} {}", addr, req.method());
196                                        if let Err(e) = self.sip_core.on_req(self.timer.now_ms(), addr, req) {
197                                            log::error!("Process sip request error {:?}", e);
198                                        }
199                                    },
200                                    Err(e) => {
201                                        log::warn!("Can not parse request: {:?}", e);
202                                        return Err(SipServerSocketError::MessageParseError);
203                                    }
204                                }
205                            }
206                            rsip::SipMessage::Response(res) => {
207                                match SipResponse::from(res) {
208                                    Ok(res) => {
209                                        log::info!("on res from {} {}", addr, res.raw.status_code());
210                                        if let Err(e) = self.sip_core.on_res(self.timer.now_ms(), addr, res) {
211                                            log::error!("Process sip response error {:?}", e);
212                                        }
213                                    },
214                                    Err(e) => {
215                                        log::warn!("Can not parse response: {:?}", e);
216                                        return Err(SipServerSocketError::MessageParseError);
217                                    }
218                                }
219                            }
220                        }
221                    },
222                    Err(e) => {
223                        log::warn!("Can not recv_from: {}", e);
224                        return Err(SipServerSocketError::NetworkError(e));
225                    }
226                };
227            }
228        };
229
230        while let Some((dest, msg)) = out_msgs.pop_front() {
231            let buf = msg.to_bytes();
232            log::info!("Send to {}\n{}", dest, String::from_utf8_lossy(&buf));
233            if let Err(e) = self.main_socket.send_to(&buf, dest).await {
234                log::error!("Sending udp to {dest} error {:?}", e);
235            }
236        }
237
238        Ok(SipServerSocketMessage::Continue)
239    }
240}