atm0s_media_server_transport_sip/
sip.rs

1use std::{collections::HashMap, fmt::Display, net::SocketAddr};
2
3use bytes::Bytes;
4use rsip::{
5    headers::{CallId, UntypedHeader},
6    Method,
7};
8
9use crate::processor::Processor;
10
11use self::{
12    processor::{register::RegisterProcessor, ProcessorAction, ProcessorError},
13    sip_request::SipRequest,
14    sip_response::SipResponse,
15};
16
17mod data;
18pub mod processor;
19pub mod sip_request;
20pub mod sip_response;
21mod transaction;
22mod utils;
23
24#[derive(Debug, Hash, PartialEq, Eq, Clone)]
25pub struct GroupId(SocketAddr, String);
26
27impl GroupId {
28    pub fn from_raw(from: SocketAddr, call_id: &CallId) -> Self {
29        Self(from, call_id.value().to_string())
30    }
31
32    pub fn addr(&self) -> SocketAddr {
33        self.0
34    }
35
36    pub fn call_id(&self) -> &str {
37        &self.1
38    }
39}
40
41impl Display for GroupId {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        write!(f, "{}/{}", self.0, self.1)
44    }
45}
46
47pub enum SipMessage {
48    Request(SipRequest),
49    Response(SipResponse),
50}
51
52impl SipMessage {
53    pub fn to_bytes(self) -> Bytes {
54        match self {
55            SipMessage::Request(req) => req.to_bytes(),
56            SipMessage::Response(res) => res.to_bytes(),
57        }
58    }
59}
60
61impl Display for SipMessage {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            SipMessage::Request(req) => write!(f, "Req({})", req.method()),
65            SipMessage::Response(res) => write!(f, "Res({})", res.raw.status_code()),
66        }
67    }
68}
69
70#[derive(Debug)]
71pub enum SipServerError {
72    ProcessorError(ProcessorError),
73    ProcessorNotFound,
74}
75
76#[derive(Debug)]
77pub enum SipServerEvent {
78    OnRegisterValidate(GroupId, String, String, String, String, String),
79    OnInCallStarted(GroupId, SipRequest),
80    OnInCallRequest(GroupId, SipRequest),
81    OnInCallResponse(GroupId, SipResponse),
82    OnOutCallRequest(GroupId, SipRequest),
83    OnOutCallResponse(GroupId, SipResponse),
84    SendRes(SocketAddr, SipResponse),
85    SendReq(SocketAddr, SipRequest),
86}
87
88pub struct SipCore {
89    register_processors: HashMap<GroupId, RegisterProcessor>,
90    invite_in_groups: HashMap<GroupId, ()>,
91    invite_out_groups: HashMap<GroupId, ()>,
92    actions: Vec<SipServerEvent>,
93}
94
95impl SipCore {
96    pub fn new() -> Self {
97        Self {
98            register_processors: HashMap::new(),
99            invite_in_groups: HashMap::new(),
100            invite_out_groups: HashMap::new(),
101            actions: Vec::new(),
102        }
103    }
104
105    pub fn on_tick(&mut self, _now_ms: u64) {}
106
107    pub fn reply_register_validate(&mut self, group_id: &GroupId, accept: bool) {
108        if let Some(processor) = self.register_processors.get_mut(group_id) {
109            processor.response(accept);
110            self.process_register_processor(group_id);
111        }
112    }
113
114    pub fn open_out_call(&mut self, group_id: &GroupId) {
115        log::info!("create out call {:?}", group_id);
116        self.invite_out_groups.insert(group_id.clone(), ());
117    }
118
119    pub fn close_out_call(&mut self, group_id: &GroupId) {
120        self.invite_out_groups.remove(group_id);
121    }
122
123    pub fn close_in_call(&mut self, group_id: &GroupId) {
124        self.invite_in_groups.remove(group_id);
125    }
126
127    pub fn on_req(&mut self, now_ms: u64, from: SocketAddr, req: SipRequest) -> Result<(), SipServerError> {
128        match req.method() {
129            Method::Register => {
130                let group_id = GroupId(from, req.call_id.clone().into());
131                match self.register_processors.entry(group_id.clone()) {
132                    std::collections::hash_map::Entry::Occupied(mut entry) => {
133                        entry.get_mut().on_req(now_ms, req).map_err(|e| SipServerError::ProcessorError(e))?;
134                    }
135                    std::collections::hash_map::Entry::Vacant(entry) => {
136                        let mut processor = RegisterProcessor::new(now_ms, req);
137                        processor.start(now_ms).map_err(|e| SipServerError::ProcessorError(e))?;
138                        entry.insert(processor);
139                    }
140                };
141                self.process_register_processor(&group_id);
142                Ok(())
143            }
144            Method::Invite => {
145                let group_id = GroupId(from, req.call_id.clone().into());
146                if let Some(_) = self.invite_in_groups.get(&group_id) {
147                    self.actions.push(SipServerEvent::OnInCallRequest(group_id, req));
148                    Ok(())
149                } else {
150                    self.invite_in_groups.insert(group_id.clone(), ());
151                    self.actions.push(SipServerEvent::OnInCallStarted(group_id, req));
152                    Ok(())
153                }
154            }
155            _ => {
156                let group_id = GroupId(from, req.call_id.clone().into());
157                if let Some(_) = self.invite_in_groups.get(&group_id) {
158                    self.actions.push(SipServerEvent::OnInCallRequest(group_id, req));
159                    Ok(())
160                } else if let Some(_) = self.invite_out_groups.get(&group_id) {
161                    self.actions.push(SipServerEvent::OnOutCallRequest(group_id, req));
162                    Ok(())
163                } else {
164                    log::info!("on_req not found {:?}, {:?}", group_id, self.invite_out_groups);
165                    Err(SipServerError::ProcessorNotFound)
166                }
167            }
168        }
169    }
170
171    pub fn on_res(&mut self, _now_ms: u64, from: SocketAddr, res: SipResponse) -> Result<(), SipServerError> {
172        let group_id = GroupId(from, res.call_id.clone().into());
173        if let Some(_) = self.invite_in_groups.get(&group_id) {
174            self.actions.push(SipServerEvent::OnInCallResponse(group_id, res));
175            Ok(())
176        } else if let Some(_) = self.invite_out_groups.get(&group_id) {
177            self.actions.push(SipServerEvent::OnOutCallResponse(group_id, res));
178            Ok(())
179        } else {
180            log::info!("on_res not found {:?}, {:?}", group_id, self.invite_out_groups);
181            Err(SipServerError::ProcessorNotFound)
182        }
183    }
184
185    pub fn pop_action(&mut self) -> Option<SipServerEvent> {
186        self.actions.pop()
187    }
188
189    fn process_register_processor(&mut self, group_id: &GroupId) -> Option<()> {
190        let processor = self.register_processors.get_mut(group_id)?;
191        while let Some(action) = processor.pop_action() {
192            match action {
193                ProcessorAction::Finished(_) => {
194                    self.register_processors.remove(group_id);
195                    break;
196                }
197                ProcessorAction::SendRequest(remote_addr, req) => {
198                    self.actions.push(SipServerEvent::SendReq(remote_addr.unwrap_or(group_id.0), req));
199                }
200                ProcessorAction::SendResponse(remote_addr, res) => {
201                    self.actions.push(SipServerEvent::SendRes(remote_addr.unwrap_or(group_id.0), res));
202                }
203                ProcessorAction::LogicOutput(action) => match action {
204                    processor::register::RegisterProcessorAction::Validate(digest, nonce, username, realm, hashed_password) => {
205                        self.actions.push(SipServerEvent::OnRegisterValidate(group_id.clone(), digest, nonce, username, realm, hashed_password));
206                    }
207                },
208            }
209        }
210        Some(())
211    }
212}