1use std::io::Read;
16use std::sync::{Arc, Mutex};
17
18use tokio::runtime::Runtime;
19use tokio::sync::mpsc;
20
21use crate::ffi::log::platform_log;
22use crate::internet::header::Header;
23use crate::internet::header::{self, HeaderSearch};
24use crate::internet::header_field::AsHeaderField;
25use crate::internet::header_field::HeaderField;
26use crate::internet::name_addr::AsNameAddr;
27use crate::internet::uri::AsURI;
28
29use crate::io::{DynamicChain, Serializable};
30
31use crate::sip::sip_headers::cseq::AsCSeq;
32use crate::sip::sip_headers::cseq::CSeq;
33use crate::sip::sip_headers::from_to::AsFromTo;
34use crate::sip::sip_headers::from_to::FromTo;
35use crate::sip::sip_message::SipMessage;
36use crate::sip::sip_message::ACK;
37use crate::sip::sip_message::CANCEL;
38use crate::sip::sip_transaction::server_transaction;
39use crate::sip::sip_transaction::server_transaction::ServerTransaction;
40use crate::sip::sip_transaction::server_transaction::ServerTransactionEvent;
41
42const LOG_TAG: &str = "sip";
45
46pub enum SipDialogEvent {
47 Ack(Arc<ServerTransaction>),
48 MidDialogRequest(),
49 Terminate(),
50}
51
52enum State {
53 Early(
54 Vec<Arc<dyn SipDialogEventCallbacks + Send + Sync>>,
55 Option<Box<dyn FnOnce(Arc<SipDialog>) + Send + Sync>>,
56 ),
57 Confirmed(
58 Vec<Arc<dyn SipDialogEventCallbacks + Send + Sync>>,
59 Option<Box<dyn FnOnce(Arc<SipDialog>) + Send + Sync>>,
60 ),
61 Completed, Terminated,
63}
64
65pub trait SipDialogEventCallbacks {
68 fn on_ack(&self, transaction: &Arc<ServerTransaction>);
69 fn on_new_request(
70 &self,
71 transaction: Arc<ServerTransaction>,
72 tx: mpsc::Sender<ServerTransactionEvent>,
73 rt: &Arc<Runtime>,
75 ) -> Option<(u16, bool)>;
76 fn on_terminating_request(&self, message: &SipMessage); fn on_terminating_response(&self, message: &SipMessage); }
79
80#[derive(PartialEq)]
81pub struct SipDialogIdentifier<'a> {
82 pub call_id: &'a [u8],
83 pub local_tag: &'a [u8],
84 pub remote_tag: &'a [u8],
85}
86
87pub struct SipDialog {
88 state: Arc<Mutex<State>>,
89
90 call_id: Vec<u8>,
91
92 local_seq: Arc<Mutex<Option<u32>>>,
93 local_uri: Vec<u8>,
94 local_tag: Vec<u8>,
95
96 remote_seq: Arc<Mutex<Option<u32>>>,
97 remote_uri: Vec<u8>,
98 remote_tag: Vec<u8>,
99 remote_target: Arc<Mutex<Vec<u8>>>,
100
101 route_set: Arc<Mutex<Vec<Vec<u8>>>>,
102
103 ongoing_transactions:
121 Arc<Mutex<Vec<(Arc<ServerTransaction>, mpsc::Sender<ServerTransactionEvent>)>>>, }
123
124impl SipDialog {
125 fn try_new<T>(
126 req_headers: &Vec<Header>,
127 resp_message: &SipMessage,
128 as_uac: bool,
129 on_last_user_removed: T,
130 ) -> Result<SipDialog, &'static str>
131 where
132 T: FnOnce(Arc<SipDialog>) + Send + Sync + 'static,
133 {
134 if let SipMessage::Response(resp_line, Some(resp_headers), _) = resp_message {
135 let req_cseq_header = header::search(req_headers, b"CSeq", true);
136 let req_call_id_header = header::search(req_headers, b"Call-ID", true);
137 let req_from_header = header::search(req_headers, b"From", true);
138
139 let resp_to_header = header::search(resp_headers, b"To", true);
140
141 let contact_header;
142 if as_uac {
143 contact_header = header::search(resp_headers, b"Contact", true);
144 } else {
145 contact_header = header::search(req_headers, b"Contact", true);
146 }
147
148 if let (
149 Some(req_cseq_header),
150 Some(req_call_id_header),
151 Some(req_from_header),
152 Some(resp_to_header),
153 Some(contact_header),
154 ) = (
155 req_cseq_header,
156 req_call_id_header,
157 req_from_header,
158 resp_to_header,
159 contact_header,
160 ) {
161 let req_cseq_header_field = req_cseq_header.get_value().as_header_field();
162 let req_cseq = req_cseq_header_field.as_cseq();
163 let req_from_header_field = req_from_header.get_value().as_header_field();
164 let req_from = req_from_header_field.as_from_to();
165 let resp_to_header_field = resp_to_header.get_value().as_header_field();
166 let resp_to = resp_to_header_field.as_from_to();
167 let contact_addresses = contact_header.get_value().as_name_addresses();
168
169 if let (Some(req_cseq), Some(req_from_tag), Some(resp_to_tag)) =
170 (req_cseq, req_from.tag, resp_to.tag)
171 {
172 let req_from_address = req_from.addresses.first();
173 let resp_to_address = resp_to.addresses.first();
174 let contact_address = contact_addresses.first();
175
176 if let (Some(req_from_address), Some(resp_to_address), Some(contact_address)) =
177 (req_from_address, resp_to_address, contact_address)
178 {
179 if let (Some(req_from_uri_part), Some(resp_to_uri_part)) =
180 (&req_from_address.uri_part, &resp_to_address.uri_part)
181 {
182 if let Some(uri_part) = &contact_address.uri_part {
183 let data_size = uri_part.estimated_size();
184 let mut data = Vec::with_capacity(data_size);
185 {
186 let mut readers = Vec::new();
187 uri_part.get_readers(&mut readers);
188 match DynamicChain::new(readers).read_to_end(&mut data) {
189 Ok(_) => {}
190 Err(_) => {} }
192 }
193
194 let remote_target = data;
195
196 let state = if resp_line.status_code >= 100
197 && resp_line.status_code < 200
198 {
199 State::Early(Vec::new(), Some(Box::new(on_last_user_removed)))
200 } else {
201 State::Confirmed(
202 Vec::new(),
203 Some(Box::new(on_last_user_removed)),
204 )
205 };
206
207 let mut route_set = Vec::new();
208
209 if as_uac {
210 let mut iter = resp_headers.iter();
211 while let Some(position) =
212 iter.position(|h| h.get_name() == b"Record-Route")
213 {
214 let record_route_header = &resp_headers[position];
215 route_set.push(record_route_header.get_value().to_vec());
216 }
217 } else {
218 let mut iter = req_headers.iter();
219 while let Some(position) =
220 iter.position(|h| h.get_name() == b"Record-Route")
221 {
222 let record_route_header = &req_headers[position];
223 route_set.push(record_route_header.get_value().to_vec());
224 }
225 }
226
227 return Ok(SipDialog {
231 state: Arc::new(Mutex::new(state)),
232
233 call_id: req_call_id_header.get_value().to_vec(),
234
235 local_seq: if as_uac {
236 Arc::new(Mutex::new(Some(req_cseq.seq)))
237 } else {
238 Arc::new(Mutex::new(None))
239 },
240
241 local_uri: req_from_uri_part.uri.to_vec(),
242 local_tag: req_from_tag.to_vec(),
243
244 remote_seq: if as_uac {
245 Arc::new(Mutex::new(None))
246 } else {
247 Arc::new(Mutex::new(Some(req_cseq.seq)))
248 },
249
250 remote_uri: resp_to_uri_part.uri.to_vec(),
251 remote_tag: resp_to_tag.to_vec(),
252
253 remote_target: Arc::new(Mutex::new(remote_target)),
254
255 route_set: Arc::new(Mutex::new(route_set)),
256
257 ongoing_transactions: Arc::new(Mutex::new(Vec::new())),
263 });
264 }
265 }
266 }
267 }
268 }
269 }
270
271 Err("Missing header information")
272 }
273
274 pub fn try_new_as_uac<T>(
275 req_headers: &Vec<Header>,
276 resp_message: &SipMessage,
277 on_last_user_removed: T, ) -> Result<SipDialog, &'static str>
279 where
280 T: Fn(Arc<SipDialog>) + Send + Sync + 'static,
281 {
282 Self::try_new(req_headers, resp_message, true, on_last_user_removed)
283 }
284
285 pub fn try_new_as_uas<T>(
286 req_message: &SipMessage,
287 resp_message: &SipMessage,
288 on_last_user_removed: T, ) -> Result<SipDialog, &'static str>
290 where
291 T: Fn(Arc<SipDialog>) + Send + Sync + 'static,
292 {
293 if let SipMessage::Request(_, Some(req_headers), _) = req_message {
294 Self::try_new(req_headers, resp_message, false, on_last_user_removed)
295 } else {
296 Err("Missing header information")
297 }
298 }
299
300 pub fn dialog_identifier(&self) -> SipDialogIdentifier {
301 SipDialogIdentifier {
302 call_id: &self.call_id,
303 local_tag: &self.local_tag,
304 remote_tag: &self.remote_tag,
305 }
306 }
307
308 pub fn confirm(&self) {
309 let mut guard = self.state.lock().unwrap();
310 if let State::Early(dialog_users, on_dispose) = &mut *guard {
311 let mut dialog_users_ = Vec::new();
312 dialog_users_.append(dialog_users);
313 let on_dispose_ = on_dispose.take();
314 *guard = State::Confirmed(dialog_users_, on_dispose_);
315 }
316 }
317
318 pub fn register_user<T>(&self, callbacks: T) -> Arc<T>
330 where
331 T: SipDialogEventCallbacks + Send + Sync + 'static,
332 {
333 let callbacks = Arc::new(callbacks);
334 let callbacks_ = Arc::clone(&callbacks);
335 let mut guard = self.state.lock().unwrap();
336 match &mut *guard {
337 State::Early(dialog_users, _) | State::Confirmed(dialog_users, _) => {
338 dialog_users.push(callbacks_);
339 }
340 _ => {}
341 }
342 callbacks
343 }
344
345 pub fn unregister_user(
375 &self,
376 callbacks: &Arc<dyn SipDialogEventCallbacks + Send + Sync>,
377 ) -> Option<Box<dyn FnOnce(Arc<SipDialog>) + Send + Sync>> {
378 let mut guard = self.state.lock().unwrap();
379 match &mut *guard {
380 State::Early(dialog_users, on_dispose) | State::Confirmed(dialog_users, on_dispose) => {
381 if let Some(idx) = dialog_users
382 .iter()
383 .position(|callback| Arc::ptr_eq(callback, callbacks))
384 {
385 dialog_users.swap_remove(idx);
386 if dialog_users.is_empty() {
387 if let Some(on_dispose) = on_dispose.take() {
388 *guard = State::Completed;
389 return Some(on_dispose);
390 }
391 }
392 }
393 }
394 _ => {}
395 }
396
397 None
398 }
399
400 pub fn register_transaction(
405 &self,
406 transaction: (
407 Arc<ServerTransaction>,
408 mpsc::Sender<ServerTransactionEvent>,
409 mpsc::Receiver<ServerTransactionEvent>,
410 ),
411 ) {
412 }
413
414 pub fn remote_seq(&self) -> &Arc<Mutex<Option<u32>>> {
415 &self.remote_seq
416 }
417
418 pub fn on_ack(&self, transaction: &Arc<ServerTransaction>) {
419 let guard = self.state.lock().unwrap();
420 match &*guard {
421 State::Early(dialog_users, _) | State::Confirmed(dialog_users, _) => {
422 for callback in dialog_users {
423 callback.on_ack(transaction);
424 }
425 }
426 _ => {}
427 }
428 }
429
430 fn terminate_transactions(&self, rt: &Arc<Runtime>) {
431 let mut guard = self.ongoing_transactions.lock().unwrap();
432 for (transaction, tx) in &*guard {
433 let message = transaction.message();
434
435 if let Some(resp_message) = server_transaction::make_response(
436 message,
437 transaction.to_tag(),
438 487,
439 b"Request Terminated",
440 ) {
441 server_transaction::send_response(
442 Arc::clone(transaction),
443 resp_message,
444 tx.clone(),
445 rt,
447 );
448 }
449 }
450
451 (*guard).clear();
452 }
453
454 pub fn on_request(
455 &self,
456 transaction: &Arc<ServerTransaction>,
457 tx: mpsc::Sender<ServerTransactionEvent>,
458 rt: &Arc<Runtime>,
460 seq_guard: &mut std::sync::MutexGuard<Option<u32>>,
461 message_seq: u32,
462 ) -> Option<Box<dyn FnOnce(Arc<SipDialog>) + Send + Sync>> {
463 platform_log(LOG_TAG, "sip dialog on request");
464
465 let mut handled = false;
466
467 let mut res = None;
468
469 let mut guard = self.state.lock().unwrap();
471
472 match &mut *guard {
473 State::Early(dialog_users, on_dispose) | State::Confirmed(dialog_users, on_dispose) => {
474 let mut i = 0;
475 for callback in &mut *dialog_users {
476 if let Some((status_code, terminated)) =
477 callback.on_new_request(Arc::clone(transaction), tx.clone(), rt)
478 {
479 handled = true;
480
481 if status_code >= 100 && status_code < 200 {
482 (*self.ongoing_transactions.lock().unwrap())
483 .push((Arc::clone(transaction), tx.clone()));
484 } else {
485 **seq_guard = Some(message_seq);
486 let message = transaction.message();
487 if let Some(headers) = message.headers() {
488 if let Some(contact_header) =
489 header::search(headers, b"Contact", true)
490 {
491 let contact_addresses =
492 contact_header.get_value().as_name_addresses();
493 let contact_address = contact_addresses.first();
494 if let Some(contact_address) = contact_address {
495 if let Some(uri_part) = &contact_address.uri_part {
496 let data_size = uri_part.estimated_size();
497 let mut data = Vec::with_capacity(data_size);
498 {
499 let mut readers = Vec::new();
500 uri_part.get_readers(&mut readers);
501 match DynamicChain::new(readers)
502 .read_to_end(&mut data)
503 {
504 Ok(_) => {}
505 Err(_) => {} }
507 }
508
509 *self.remote_target.lock().unwrap() = data;
510 }
511 }
512 }
513 }
514 }
515
516 if terminated {
517 dialog_users.swap_remove(i);
518
519 if dialog_users.is_empty() {
520 res = on_dispose.take();
521 *guard = State::Terminated;
522
523 self.terminate_transactions(rt);
524 }
525 }
526
527 break;
528 }
529
530 i += 1;
531 }
532 }
533 State::Completed => {
534 todo!()
535 }
536 _ => {}
537 }
538
539 if !handled {
600 let message = transaction.message();
601
602 if let Some(resp_message) = server_transaction::make_response(
603 message,
604 transaction.to_tag(),
605 488,
606 b"Not Acceptable Here",
607 ) {
608 server_transaction::send_response(
609 Arc::clone(transaction),
610 resp_message,
611 tx,
612 &rt,
614 );
615 }
616 }
617
618 res
619 }
620
621 pub fn on_terminating_request(&self, message: &SipMessage, rt: &Arc<Runtime>) {
640 let mut guard = self.state.lock().unwrap();
641 match &mut *guard {
642 State::Early(dialog_users, on_dispose) | State::Confirmed(dialog_users, on_dispose) => {
643 for callback in dialog_users {
644 callback.on_terminating_request(message);
645 }
646 *guard = State::Terminated;
647 }
648 State::Completed => {
649 *guard = State::Terminated;
650 }
651 _ => {}
652 }
653 }
654
655 pub fn on_terminating_response(&self, message: &SipMessage, rt: &Arc<Runtime>) {
674 let mut guard = self.state.lock().unwrap();
675 match &mut *guard {
676 State::Early(dialog_users, on_dispose) | State::Confirmed(dialog_users, on_dispose) => {
677 for callback in dialog_users {
678 callback.on_terminating_response(message);
679 }
680 *guard = State::Terminated;
681 }
682 State::Completed => {
683 *guard = State::Terminated;
684 }
685 _ => {}
686 }
687 }
688
689 pub fn make_request(
690 &self,
691 method: &[u8],
692 seq: Option<u32>,
693 ) -> Result<SipMessage, &'static str> {
694 let mut remote_target = (&*self.remote_target.lock().unwrap()).to_vec();
695
696 let guard = self.route_set.lock().unwrap();
697
698 let mut final_route_set = (*guard).clone();
699
700 let mut contains_lr = false;
701 let mut new_remote_target = Vec::new();
702
703 if let Some(route) = final_route_set.first() {
704 if let Some(addr) = route.as_name_addresses().first() {
705 if let Some(uri_part) = &addr.uri_part {
706 for p in uri_part.get_parameter_iterator() {
709 if p.name.eq_ignore_ascii_case(b"lr") {
710 contains_lr = true;
711
712 let data_size = uri_part.estimated_size();
713 new_remote_target.reserve(data_size);
714 {
715 let mut readers = Vec::new();
716 uri_part.get_readers(&mut readers);
717 match DynamicChain::new(readers).read_to_end(&mut new_remote_target)
718 {
719 Ok(_) => {}
720 Err(_) => {} }
722 }
723
724 break;
725 }
726 }
727
728 }
741 } else {
742 return Err("Abnormal Route-Set");
743 }
744 }
745
746 if contains_lr {
747 final_route_set = final_route_set.split_off(1);
748 let mut r = b"<".to_vec();
749 r.extend(remote_target);
750 r.extend(b">");
751 final_route_set.push(r);
752
753 remote_target = new_remote_target;
754 }
755
756 if let Some(_) = remote_target.as_standard_uri() {
757 let mut message = SipMessage::new_request(method, &remote_target);
758
759 let from_addresses = self.local_uri.as_name_addresses();
760 let to_addresses = self.remote_uri.as_name_addresses();
761
762 if from_addresses.len() > 0 && to_addresses.len() > 0 {
763 let from = FromTo {
764 addresses: from_addresses,
765 tag: Some(&self.local_tag),
766 };
767
768 let from_data_size = from.estimated_size();
769 let mut from_data = Vec::with_capacity(from_data_size);
770 {
771 let mut readers = Vec::new();
772 from.get_readers(&mut readers);
773 match DynamicChain::new(readers).read_to_end(&mut from_data) {
774 Ok(_) => {}
775 Err(_) => {} }
777 }
778
779 message.add_header(Header::new(b"From", from_data));
780
781 let to = FromTo {
782 addresses: to_addresses,
783 tag: Some(&self.remote_tag),
784 };
785
786 let to_data_size = to.estimated_size();
787 let mut to_data = Vec::with_capacity(to_data_size);
788 {
789 let mut readers = Vec::new();
790 to.get_readers(&mut readers);
791 match DynamicChain::new(readers).read_to_end(&mut to_data) {
792 Ok(_) => {}
793 Err(_) => {} }
795 }
796
797 message.add_header(Header::new(b"To", to_data));
798
799 message.add_header(Header::new(b"Call-ID", self.call_id.to_vec()));
800
801 for route in final_route_set {
802 message.add_header(Header::new(b"Route", route));
803 }
804
805 let cseq;
806
807 if method == ACK || method == CANCEL {
808 if let Some(seq) = seq {
809 cseq = CSeq { seq, method }
810 } else {
811 return Err("Seq not provided");
812 }
813 } else {
814 let seq;
815 let mut guard = self.local_seq.lock().unwrap();
816 match *guard {
817 Some(v) => {
818 seq = v + 1;
819 *guard = Some(seq);
820 }
821 None => {
822 seq = 100;
823 *guard = Some(seq);
824 }
825 }
826 cseq = CSeq { seq, method }
827 }
828
829 let cseq_data_size = to.estimated_size();
830 let mut cseq_data = Vec::with_capacity(cseq_data_size);
831 match cseq.reader().read_to_end(&mut cseq_data) {
832 Ok(_) => {}
833 Err(_) => {} }
835
836 message.add_header(Header::new(b"CSeq", cseq_data));
837
838 return Ok(message);
839 }
840 }
841
842 Err("Error building message")
843 }
844
845 pub fn cmcc_patch_route_set_on_subscriber_2xx_response(&self, resp_message: &SipMessage) {
846 let mut new_route_set = Vec::new();
847 if let Some(headers) = resp_message.headers() {
848 for record_route_header in HeaderSearch::new(headers, b"Record-Route", true) {
849 let route = record_route_header
850 .get_value()
851 .as_header_field()
852 .value
853 .to_vec();
854 new_route_set.push(route);
855 }
856 }
857 *self.route_set.lock().unwrap() = new_route_set;
858 }
859}
860
861pub trait GetDialogHeaders {
862 fn get_dialog_headers<'a>(&'a self) -> Option<(&'a Header, HeaderField<'a>, HeaderField<'a>)>;
863}
864
865pub trait GetDialogHeaderInfo {
866 fn get_dialog_header_info<'a>(&'a self) -> (FromTo<'a>, FromTo<'a>);
867}
868
869pub trait GetDialogIdentifier {
870 fn get_dialog_identifier<'a>(&'a self) -> Option<SipDialogIdentifier<'a>>;
871}
872
873impl GetDialogHeaderInfo for (HeaderField<'_>, HeaderField<'_>) {
874 fn get_dialog_header_info<'a>(&'a self) -> (FromTo<'a>, FromTo<'a>) {
875 (self.0.as_from_to(), self.1.as_from_to())
876 }
877}
878
879impl GetDialogIdentifier for (&'_ Header, FromTo<'_>, FromTo<'_>, bool) {
880 fn get_dialog_identifier<'a>(&'a self) -> Option<SipDialogIdentifier<'a>> {
881 if let (Some(from_tag), Some(to_tag)) = (self.1.tag, self.2.tag) {
882 return Some(SipDialogIdentifier {
883 call_id: &self.0.get_value(),
884 local_tag: if self.3 { from_tag } else { to_tag },
885 remote_tag: if self.3 { to_tag } else { from_tag },
886 });
887 }
888
889 None
890 }
891}