atm0s_media_server_transport_sip/
sip.rs1use std::{collections::HashMap, fmt::Display, net::SocketAddr};
2
3use bytes::Bytes;
4use rsip::{
5 headers::{CallId, UntypedHeader},
6 Method,
7};
8
9use crate::processor::Processor;
10
11use self::{
12 processor::{register::RegisterProcessor, ProcessorAction, ProcessorError},
13 sip_request::SipRequest,
14 sip_response::SipResponse,
15};
16
17mod data;
18pub mod processor;
19pub mod sip_request;
20pub mod sip_response;
21mod transaction;
22mod utils;
23
24#[derive(Debug, Hash, PartialEq, Eq, Clone)]
25pub struct GroupId(SocketAddr, String);
26
27impl GroupId {
28 pub fn from_raw(from: SocketAddr, call_id: &CallId) -> Self {
29 Self(from, call_id.value().to_string())
30 }
31
32 pub fn addr(&self) -> SocketAddr {
33 self.0
34 }
35
36 pub fn call_id(&self) -> &str {
37 &self.1
38 }
39}
40
41impl Display for GroupId {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 write!(f, "{}/{}", self.0, self.1)
44 }
45}
46
47pub enum SipMessage {
48 Request(SipRequest),
49 Response(SipResponse),
50}
51
52impl SipMessage {
53 pub fn to_bytes(self) -> Bytes {
54 match self {
55 SipMessage::Request(req) => req.to_bytes(),
56 SipMessage::Response(res) => res.to_bytes(),
57 }
58 }
59}
60
61impl Display for SipMessage {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 SipMessage::Request(req) => write!(f, "Req({})", req.method()),
65 SipMessage::Response(res) => write!(f, "Res({})", res.raw.status_code()),
66 }
67 }
68}
69
70#[derive(Debug)]
71pub enum SipServerError {
72 ProcessorError(ProcessorError),
73 ProcessorNotFound,
74}
75
76#[derive(Debug)]
77pub enum SipServerEvent {
78 OnRegisterValidate(GroupId, String, String, String, String, String),
79 OnInCallStarted(GroupId, SipRequest),
80 OnInCallRequest(GroupId, SipRequest),
81 OnInCallResponse(GroupId, SipResponse),
82 OnOutCallRequest(GroupId, SipRequest),
83 OnOutCallResponse(GroupId, SipResponse),
84 SendRes(SocketAddr, SipResponse),
85 SendReq(SocketAddr, SipRequest),
86}
87
88pub struct SipCore {
89 register_processors: HashMap<GroupId, RegisterProcessor>,
90 invite_in_groups: HashMap<GroupId, ()>,
91 invite_out_groups: HashMap<GroupId, ()>,
92 actions: Vec<SipServerEvent>,
93}
94
95impl SipCore {
96 pub fn new() -> Self {
97 Self {
98 register_processors: HashMap::new(),
99 invite_in_groups: HashMap::new(),
100 invite_out_groups: HashMap::new(),
101 actions: Vec::new(),
102 }
103 }
104
105 pub fn on_tick(&mut self, _now_ms: u64) {}
106
107 pub fn reply_register_validate(&mut self, group_id: &GroupId, accept: bool) {
108 if let Some(processor) = self.register_processors.get_mut(group_id) {
109 processor.response(accept);
110 self.process_register_processor(group_id);
111 }
112 }
113
114 pub fn open_out_call(&mut self, group_id: &GroupId) {
115 log::info!("create out call {:?}", group_id);
116 self.invite_out_groups.insert(group_id.clone(), ());
117 }
118
119 pub fn close_out_call(&mut self, group_id: &GroupId) {
120 self.invite_out_groups.remove(group_id);
121 }
122
123 pub fn close_in_call(&mut self, group_id: &GroupId) {
124 self.invite_in_groups.remove(group_id);
125 }
126
127 pub fn on_req(&mut self, now_ms: u64, from: SocketAddr, req: SipRequest) -> Result<(), SipServerError> {
128 match req.method() {
129 Method::Register => {
130 let group_id = GroupId(from, req.call_id.clone().into());
131 match self.register_processors.entry(group_id.clone()) {
132 std::collections::hash_map::Entry::Occupied(mut entry) => {
133 entry.get_mut().on_req(now_ms, req).map_err(|e| SipServerError::ProcessorError(e))?;
134 }
135 std::collections::hash_map::Entry::Vacant(entry) => {
136 let mut processor = RegisterProcessor::new(now_ms, req);
137 processor.start(now_ms).map_err(|e| SipServerError::ProcessorError(e))?;
138 entry.insert(processor);
139 }
140 };
141 self.process_register_processor(&group_id);
142 Ok(())
143 }
144 Method::Invite => {
145 let group_id = GroupId(from, req.call_id.clone().into());
146 if let Some(_) = self.invite_in_groups.get(&group_id) {
147 self.actions.push(SipServerEvent::OnInCallRequest(group_id, req));
148 Ok(())
149 } else {
150 self.invite_in_groups.insert(group_id.clone(), ());
151 self.actions.push(SipServerEvent::OnInCallStarted(group_id, req));
152 Ok(())
153 }
154 }
155 _ => {
156 let group_id = GroupId(from, req.call_id.clone().into());
157 if let Some(_) = self.invite_in_groups.get(&group_id) {
158 self.actions.push(SipServerEvent::OnInCallRequest(group_id, req));
159 Ok(())
160 } else if let Some(_) = self.invite_out_groups.get(&group_id) {
161 self.actions.push(SipServerEvent::OnOutCallRequest(group_id, req));
162 Ok(())
163 } else {
164 log::info!("on_req not found {:?}, {:?}", group_id, self.invite_out_groups);
165 Err(SipServerError::ProcessorNotFound)
166 }
167 }
168 }
169 }
170
171 pub fn on_res(&mut self, _now_ms: u64, from: SocketAddr, res: SipResponse) -> Result<(), SipServerError> {
172 let group_id = GroupId(from, res.call_id.clone().into());
173 if let Some(_) = self.invite_in_groups.get(&group_id) {
174 self.actions.push(SipServerEvent::OnInCallResponse(group_id, res));
175 Ok(())
176 } else if let Some(_) = self.invite_out_groups.get(&group_id) {
177 self.actions.push(SipServerEvent::OnOutCallResponse(group_id, res));
178 Ok(())
179 } else {
180 log::info!("on_res not found {:?}, {:?}", group_id, self.invite_out_groups);
181 Err(SipServerError::ProcessorNotFound)
182 }
183 }
184
185 pub fn pop_action(&mut self) -> Option<SipServerEvent> {
186 self.actions.pop()
187 }
188
189 fn process_register_processor(&mut self, group_id: &GroupId) -> Option<()> {
190 let processor = self.register_processors.get_mut(group_id)?;
191 while let Some(action) = processor.pop_action() {
192 match action {
193 ProcessorAction::Finished(_) => {
194 self.register_processors.remove(group_id);
195 break;
196 }
197 ProcessorAction::SendRequest(remote_addr, req) => {
198 self.actions.push(SipServerEvent::SendReq(remote_addr.unwrap_or(group_id.0), req));
199 }
200 ProcessorAction::SendResponse(remote_addr, res) => {
201 self.actions.push(SipServerEvent::SendRes(remote_addr.unwrap_or(group_id.0), res));
202 }
203 ProcessorAction::LogicOutput(action) => match action {
204 processor::register::RegisterProcessorAction::Validate(digest, nonce, username, realm, hashed_password) => {
205 self.actions.push(SipServerEvent::OnRegisterValidate(group_id.clone(), digest, nonce, username, realm, hashed_password));
206 }
207 },
208 }
209 }
210 Some(())
211 }
212}