atm0s_media_server_transport_sip/
server.rs1use std::{
2 collections::{HashMap, VecDeque},
3 net::SocketAddr,
4 time::Duration,
5};
6
7use async_std::{net::UdpSocket, stream::Interval};
8use futures::{select, FutureExt, StreamExt};
9use media_utils::{SystemTimer, Timer};
10use rsip::headers::CallId;
11
12use crate::{
15 sip_request::SipRequest,
16 sip_response::SipResponse,
17 virtual_socket::{VirtualSocket, VirtualSocketContext, VirtualSocketPlane},
18 GroupId, SipCore, SipMessage, SipServerEvent,
19};
20
/// Errors returned by [`SipServerSocket::recv`].
#[derive(Debug)]
pub enum SipServerSocketError {
    /// A received datagram could not be parsed into a SIP request or response.
    MessageParseError,
    /// The underlying UDP socket reported an I/O error.
    NetworkError(std::io::Error),
}

impl std::fmt::Display for SipServerSocketError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MessageParseError => f.write_str("failed to parse SIP message"),
            Self::NetworkError(err) => write!(f, "network error: {err}"),
        }
    }
}

impl std::error::Error for SipServerSocketError {
    /// Exposes the wrapped I/O error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::MessageParseError => None,
            Self::NetworkError(err) => Some(err),
        }
    }
}
26
27pub enum SipServerSocketMessage {
28 Continue,
29 RegisterValidate(GroupId, String, String, String, String, String),
30 InCall(VirtualSocket<GroupId, SipMessage>, SipRequest),
31}
32
/// Registration state remembered for one remote address.
struct RemoteInfo {
    /// Username supplied with the register-validate event.
    username: String,
    /// `false` when the entry is created; set to `true` by `accept_register`.
    accepted: bool,
}
37
38pub struct SipServerSocket {
39 main_socket: UdpSocket,
40 buf: [u8; 2048],
41 sip_core: SipCore,
42 timer: SystemTimer,
43 interval: Interval,
44 remote_users: HashMap<SocketAddr, RemoteInfo>,
45 virtual_socket_plane: VirtualSocketPlane<GroupId, SipMessage>,
46}
47
48impl SipServerSocket {
49 pub async fn new(bind_addr: SocketAddr) -> Self {
50 log::info!("Listerning on port 5060 for UDP");
51 Self {
52 main_socket: UdpSocket::bind(bind_addr).await.expect("Should bind udp socket"),
53 buf: [0u8; 2048],
54 sip_core: SipCore::new(),
55 timer: SystemTimer(),
56 interval: async_std::stream::interval(Duration::from_millis(100)),
57 remote_users: HashMap::new(),
58 virtual_socket_plane: Default::default(),
59 }
60 }
61
62 pub fn accept_register(&mut self, session: &GroupId) {
63 log::info!("Accept register {:?}", session);
64 if let Some(remote_info) = self.remote_users.get_mut(&session.addr()) {
65 remote_info.accepted = true;
66 }
67 self.sip_core.reply_register_validate(session, true);
68 }
69
70 pub fn reject_register(&mut self, session: &GroupId) {
71 log::info!("Reject register {:?}", session);
72 self.remote_users.remove(&session.addr());
73 self.sip_core.reply_register_validate(session, false);
74 }
75
76 pub fn create_call(&mut self, call_id: &CallId, dest: SocketAddr) -> VirtualSocket<GroupId, SipMessage> {
77 let group_id = GroupId::from_raw(dest, &call_id);
78 self.sip_core.open_out_call(&group_id);
79 self.virtual_socket_plane.new_socket(group_id, VirtualSocketContext { remote_addr: dest, username: None })
80 }
81
82 pub async fn recv(&mut self) -> Result<SipServerSocketMessage, SipServerSocketError> {
83 while let Some(output) = self.sip_core.pop_action() {
84 match output {
85 SipServerEvent::OnRegisterValidate(group, digest, nonce, username, realm, hashed_password) => {
86 log::info!("Register validate {} {}", username, hashed_password);
87 self.remote_users.insert(
88 group.addr(),
89 RemoteInfo {
90 username: username.clone(),
91 accepted: false,
92 },
93 );
94 return Ok(SipServerSocketMessage::RegisterValidate(group, digest, nonce, username, realm, hashed_password));
95 }
96 SipServerEvent::OnInCallStarted(group_id, req) => {
97 log::info!("InCall started {:?}", group_id);
98 let ctx = VirtualSocketContext {
99 remote_addr: group_id.addr(),
100 username: match self.remote_users.get(&group_id.addr()) {
101 Some(remote_info) => {
102 if remote_info.accepted {
103 Some(remote_info.username.clone())
104 } else {
105 None
106 }
107 }
108 None => None,
109 },
110 };
111 let socket = self.virtual_socket_plane.new_socket(group_id, ctx);
112 return Ok(SipServerSocketMessage::InCall(socket, req));
113 }
114 SipServerEvent::OnInCallRequest(group_id, req) => {
115 self.virtual_socket_plane
116 .forward(&group_id, SipMessage::Request(req))
117 .expect("Should forward to correct virtual socket");
118 }
119 SipServerEvent::OnInCallResponse(group_id, res) => {
120 self.virtual_socket_plane
121 .forward(&group_id, SipMessage::Response(res))
122 .expect("Should forward to correct virtual socket");
123 }
124 SipServerEvent::OnOutCallRequest(group_id, req) => {
125 self.virtual_socket_plane
126 .forward(&group_id, SipMessage::Request(req))
127 .expect("Should forward to correct virtual socket");
128 }
129 SipServerEvent::OnOutCallResponse(group_id, res) => {
130 self.virtual_socket_plane
131 .forward(&group_id, SipMessage::Response(res))
132 .expect("Should forward to correct virtual socket");
133 }
134 SipServerEvent::SendRes(dest, res) => {
135 let buf = res.to_bytes();
136 log::info!("Send res to {} {}", dest, String::from_utf8_lossy(&buf));
137 if let Err(e) = self.main_socket.send_to(&buf, dest).await {
138 log::error!("Sending udp to {dest} error {:?}", e);
139 }
140 }
141 SipServerEvent::SendReq(dest, req) => {
142 let buf = req.to_bytes();
143 log::debug!("Send req to {} {}", dest, String::from_utf8_lossy(&buf));
144 if let Err(e) = self.main_socket.send_to(&buf, dest).await {
145 log::error!("Sending udp to {dest} error {:?}", e);
146 }
147 }
148 }
149 }
150
151 let mut out_msgs = VecDeque::new();
152 select! {
153 _ = self.interval.next().fuse() => {
154 self.sip_core.on_tick(self.timer.now_ms());
155 },
156 e = self.virtual_socket_plane.recv().fuse() => {
157 match e {
158 Some((group_id, msg)) => {
159 match msg {
160 Some((dest, msg)) => {
161 let dest = dest.unwrap_or(group_id.addr());
162 log::info!("Group {:?} send to {} {}", group_id, dest, msg);
163 out_msgs.push_back((dest, msg));
164 },
165 None => {
166 log::info!("Group {:?} close socket", group_id);
167 self.virtual_socket_plane.close_socket(&group_id);
168 self.sip_core.close_in_call(&group_id);
169 self.sip_core.close_out_call(&group_id);
170 }
171 }
172 },
173 None => {}
174 }
175 },
176 e = self.main_socket.recv_from(&mut self.buf).fuse() => {
177 match e {
178 Ok((0..=4, addr)) => {
179 log::info!("Ping from {}", addr);
180 }
181 Ok((len, addr)) => {
182 log::info!("Recv from {}\n{}", addr, String::from_utf8(self.buf[..len].to_vec()).unwrap());
183 let req = match rsip::SipMessage::try_from(&self.buf[..len]) {
184 Ok(req) => req,
185 Err(e) => {
186 log::warn!("Can not parse request: {} {:?}", e, &self.buf[..len]);
187 return Err(SipServerSocketError::MessageParseError);
188 }
189 };
190
191 match req {
192 rsip::SipMessage::Request(req) => {
193 match SipRequest::from(req) {
194 Ok(req) => {
195 log::debug!("on req from {} {}", addr, req.method());
196 if let Err(e) = self.sip_core.on_req(self.timer.now_ms(), addr, req) {
197 log::error!("Process sip request error {:?}", e);
198 }
199 },
200 Err(e) => {
201 log::warn!("Can not parse request: {:?}", e);
202 return Err(SipServerSocketError::MessageParseError);
203 }
204 }
205 }
206 rsip::SipMessage::Response(res) => {
207 match SipResponse::from(res) {
208 Ok(res) => {
209 log::info!("on res from {} {}", addr, res.raw.status_code());
210 if let Err(e) = self.sip_core.on_res(self.timer.now_ms(), addr, res) {
211 log::error!("Process sip response error {:?}", e);
212 }
213 },
214 Err(e) => {
215 log::warn!("Can not parse response: {:?}", e);
216 return Err(SipServerSocketError::MessageParseError);
217 }
218 }
219 }
220 }
221 },
222 Err(e) => {
223 log::warn!("Can not recv_from: {}", e);
224 return Err(SipServerSocketError::NetworkError(e));
225 }
226 };
227 }
228 };
229
230 while let Some((dest, msg)) = out_msgs.pop_front() {
231 let buf = msg.to_bytes();
232 log::info!("Send to {}\n{}", dest, String::from_utf8_lossy(&buf));
233 if let Err(e) = self.main_socket.send_to(&buf, dest).await {
234 log::error!("Sending udp to {dest} error {:?}", e);
235 }
236 }
237
238 Ok(SipServerSocketMessage::Continue)
239 }
240}