rust_rcs_core/sip/sip_core.rs
1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use tokio::runtime::Runtime;
20use tokio::sync::mpsc;
21use tokio::time::Instant;
22use uuid::Uuid;
23
24use crate::ffi::log::platform_log;
25use crate::internet::header::{search, Header};
26use crate::internet::{header, AsHeaderField};
27
28use crate::sip::sip_dialog::GetDialogHeaderInfo;
29use crate::sip::sip_dialog::GetDialogHeaders;
30use crate::sip::sip_dialog::GetDialogIdentifier;
31use crate::sip::sip_dialog::SipDialog;
32use crate::sip::sip_headers::cseq;
33use crate::sip::sip_message::SipMessage;
34use crate::sip::sip_message::ACK;
35use crate::sip::sip_message::BYE;
36use crate::sip::sip_message::NOTIFY;
37use crate::sip::sip_transaction::server_transaction;
38use crate::sip::sip_transaction::server_transaction::ServerTransaction;
39use crate::sip::sip_transaction::server_transaction::ServerTransactionEvent;
40use crate::sip::sip_transaction::SipTransactionManager;
41use crate::sip::sip_transaction::SipTransactionManagerEventInterface;
42
43use crate::util::raw_string::StrEq;
44
45use super::sip_headers::subscription_state::AsSubscriptionState;
46use super::sip_subscription::subscriber::SubscriberEvent;
47use super::sip_subscription::subscription_identifier::{
48 get_identifier_from_sip_notify, SubscriptionIdentifier,
49};
50use super::sip_subscription::{
51 schedule_refresh, Subscription, SubscriptionDialogListener, SubscriptionManager,
52 SERVER_SUPPORT_RFC_6665,
53};
54use super::{SipDialogEventCallbacks, SipTransport};
55
/// Tag passed to `platform_log` with every message logged from this file.
const LOG_TAG: &str = "sip";
57
/// Central SIP state: handles to the subscription and transaction managers,
/// the default public identity and the list of ongoing dialogs.
///
/// The identity and the dialog list are shared with the dispatcher task that
/// `SipCore::new` spawns.
pub struct SipCore {
    /// Subscription manager handed to `SipCore::new`.
    subscription_manager: Arc<SubscriptionManager>,
    /// Transaction manager handed to `SipCore::new`.
    transaction_manager: Arc<SipTransactionManager>,
    /// `(transport, public identity, sip instance id)` as stored by
    /// `set_default_public_identity`; `None` until it is first set.
    default_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>, // fix-me: SipCore should not be bound to a single identity, the mapping should be managed by client transport
    /// Dialogs against which incoming in-dialog requests are matched.
    ongoing_dialogs: Arc<Mutex<Vec<Arc<SipDialog>>>>,
}
64
65impl SipCore {
66 pub fn new(
67 sm: &Arc<SubscriptionManager>,
68 tm: &Arc<SipTransactionManager>,
69 mut tm_event_itf: SipTransactionManagerEventInterface,
70 allowed_methods: Vec<&'static [u8]>,
71 transaction_handlers: Vec<Box<dyn TransactionHandler + Send + Sync>>,
72 rt: &Arc<Runtime>,
73 ) -> SipCore {
74 let default_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>> =
75 Arc::new(Mutex::new(None));
76 let default_public_identity_ = Arc::clone(&default_public_identity);
77 let ongoing_dialogs: Arc<Mutex<Vec<Arc<SipDialog>>>> = Arc::new(Mutex::new(Vec::new()));
78 let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
79 // let timer = Arc::new(Timer::new());
80 // let timer_ = Arc::clone(&timer);
81
82 let sm_ = Arc::clone(sm);
83 let tm_ = Arc::clone(tm);
84 let rt_ = Arc::clone(&rt);
85
86 rt.spawn(async move {
87 let default_public_identity = default_public_identity_;
88 let ongoing_dialogs = ongoing_dialogs_;
89 let allowed_methods = &allowed_methods;
90 let transaction_handlers = &transaction_handlers;
91 // let timer = timer_;
92
93 'thread: loop {
94 let sm = Arc::clone(&sm_);
95 let tm = Arc::clone(&tm_);
96 let rt = Arc::clone(&rt_);
97 if let Some((transaction, tx, rx)) = tm_event_itf.event_rx.recv().await {
98 let message = transaction.message();
99 if let SipMessage::Request(req_line, _, _) = message {
100 if !allowed_methods
101 .iter()
102 .any(|item| *item == &req_line.method[..])
103 {
104 if let Some(mut resp_message) = server_transaction::make_response(
105 message,
106 transaction.to_tag(),
107 405,
108 b"Method Not Allowed",
109 ) {
110 let mut allowed = Vec::new();
111 let mut first = true;
112 for method in allowed_methods {
113 if first {
114 first = false;
115 } else {
116 allowed.extend(b", ");
117 }
118 allowed.extend_from_slice(method);
119 }
120
121 resp_message.add_header(Header::new(b"Allow", allowed));
122
123 server_transaction::send_response(
124 transaction,
125 resp_message,
126 tx,
127 // &timer,
128 &rt,
129 );
130 }
131
132 continue 'thread;
133 }
134
135 if let Some((call_id, from, to)) = message.get_dialog_headers() {
136 let from_to = (from, to);
137 let (from, to) = from_to.get_dialog_header_info();
138 if let Some(lh_dialog_identifier) =
139 (call_id, from, to, false).get_dialog_identifier()
140 {
141 if !SERVER_SUPPORT_RFC_6665 {
142 if req_line.method == NOTIFY {
143 process_out_of_dialog_server_transaction(
144 sm,
145 tm,
146 transaction_handlers,
147 transaction,
148 &default_public_identity,
149 &ongoing_dialogs,
150 tx,
151 rx,
152 // &timer,
153 &rt,
154 );
155
156 continue 'thread;
157 }
158 }
159
160 let guard = ongoing_dialogs.lock().unwrap();
161 for dialog in &*guard {
162 let rh_dialog_identifier = dialog.dialog_identifier();
163 if lh_dialog_identifier == rh_dialog_identifier {
164 if req_line.method == ACK {
165 dialog.on_ack(&transaction);
166 continue 'thread;
167 }
168
169 let lh_seq = cseq::get_message_seq(message);
170
171 match lh_seq {
172 Ok(lh_seq) => {
173 let seq = dialog.remote_seq();
174
175 let mut seq_guard = seq.lock().unwrap();
176
177 if let Some(rh_seq) = *seq_guard {
178 if rh_seq >= lh_seq {
179 if let Some(resp_message) =
180 server_transaction::make_response(
181 message,
182 transaction.to_tag(),
183 500,
184 b"Server Internal Error",
185 )
186 {
187 server_transaction::send_response(
188 transaction,
189 resp_message,
190 tx,
191 // &timer,
192 &rt,
193 );
194 }
195
196 continue 'thread;
197 }
198 }
199
200 if req_line.method == BYE {
201 dialog.on_terminating_request(message, &rt);
202
203 if let Some(resp_message) =
204 server_transaction::make_response(
205 message,
206 transaction.to_tag(),
207 200,
208 b"OK",
209 )
210 {
211 server_transaction::send_response(
212 transaction,
213 resp_message,
214 tx,
215 // &timer,
216 &rt,
217 );
218 }
219
220 continue 'thread;
221 }
222
223 // if !subscription.SERVER_SUPPORT_RFC_6665 {
224
225 // if r.Method == "NOTIFY" {
226
227 // // since we created dialog on 200 OK response, we need to add usages here
228
229 // sm := subscription.GetSubscriptionManager()
230
231 // sm.CheckSubscriptionMissing(message, d, func(missing bool, s *subscription.Subscription) {
232
233 // if missing {
234
235 // u := &dialog.DialogUser{
236 // D: d,
237 // Itf: s,
238 // }
239
240 // d.AddUser(u)
241
242 // s.SetDialogUser(u)
243 // }
244
245 // d.ProcessMidDialogRequest(st)
246 // })
247
248 // return
249 // }
250 // }
251
252 dialog.on_request(
253 &transaction,
254 tx,
255 &rt,
256 &mut seq_guard,
257 lh_seq,
258 );
259
260 continue 'thread;
261 }
262
263 Err(_) => {
264 if let Some(resp_message) =
265 server_transaction::make_response(
266 message,
267 transaction.to_tag(),
268 400,
269 b"Bad Request",
270 )
271 {
272 server_transaction::send_response(
273 transaction,
274 resp_message,
275 tx,
276 // &timer,
277 &rt,
278 );
279 }
280
281 continue 'thread;
282 }
283 }
284 }
285 }
286
287 if let Some(resp_message) = server_transaction::make_response(
288 message,
289 transaction.to_tag(),
290 481,
291 b"Call Does Not Exist",
292 ) {
293 server_transaction::send_response(
294 transaction,
295 resp_message,
296 tx,
297 // &timer
298 &rt,
299 );
300 }
301
302 continue 'thread;
303 }
304 }
305
306 process_out_of_dialog_server_transaction(
307 sm,
308 tm,
309 transaction_handlers,
310 transaction,
311 &default_public_identity,
312 &ongoing_dialogs,
313 tx,
314 rx,
315 // &timer,
316 &rt,
317 );
318 } else {
319 panic! {"impossible condition"};
320 }
321 } else {
322 return ();
323 }
324 }
325 });
326
327 SipCore {
328 subscription_manager: Arc::clone(sm),
329 transaction_manager: Arc::clone(tm),
330 default_public_identity,
331 ongoing_dialogs,
332 // timer,
333 }
334 }
335
336 pub fn get_subscription_manager(&self) -> Arc<SubscriptionManager> {
337 Arc::clone(&self.subscription_manager)
338 }
339
340 pub fn get_transaction_manager(&self) -> Arc<SipTransactionManager> {
341 Arc::clone(&self.transaction_manager)
342 }
343
344 pub fn set_default_public_identity(
345 &self,
346 default_public_identity: String,
347 sip_instance_id: String,
348 transport: Arc<SipTransport>,
349 ) {
350 (*self.default_public_identity.lock().unwrap()).replace((
351 transport,
352 default_public_identity,
353 sip_instance_id,
354 ));
355 }
356
357 pub fn get_default_public_identity(&self) -> Option<(Arc<SipTransport>, String, String)> {
358 self.default_public_identity.lock().unwrap().clone()
359 }
360
361 pub fn get_ongoing_dialogs(&self) -> Arc<Mutex<Vec<Arc<SipDialog>>>> {
362 Arc::clone(&self.ongoing_dialogs)
363 }
364
365 // pub fn get_timer(&self) -> Arc<Timer> {
366 // Arc::clone(&self.timer)
367 // }
368}
369
/// Operations on a shared collection of `SipDialog`s.
///
/// The descriptions below follow the implementation for
/// `Arc<Mutex<Vec<Arc<SipDialog>>>>` in this file.
pub trait SipDialogCache {
    /// Adds `dialog` to the collection without checking for duplicates.
    fn add_dialog(&self, dialog: &Arc<SipDialog>);
    /// Returns the already stored dialog whose identifier equals that of
    /// `dialog`; when there is none, stores `dialog` and returns it.
    fn add_dialog_if_not_duplicate(&self, dialog: &Arc<SipDialog>) -> Arc<SipDialog>;
    /// Removes the entry that points to the same allocation as `dialog`, if any.
    fn remove_dialog(&self, dialog: &Arc<SipDialog>);
}
375
376impl SipDialogCache for Arc<Mutex<Vec<Arc<SipDialog>>>> {
377 fn add_dialog(&self, dialog: &Arc<SipDialog>) {
378 self.lock().unwrap().push(Arc::clone(dialog));
379 }
380
381 fn add_dialog_if_not_duplicate(&self, dialog: &Arc<SipDialog>) -> Arc<SipDialog> {
382 let mut guard = self.lock().unwrap();
383 for d in &*guard {
384 if d.dialog_identifier() == dialog.dialog_identifier() {
385 return Arc::clone(d);
386 }
387 }
388 guard.push(Arc::clone(dialog));
389 Arc::clone(dialog)
390 }
391
392 fn remove_dialog(&self, dialog: &Arc<SipDialog>) {
393 let mut guard = self.lock().unwrap();
394 if let Some(idx) = guard.iter().position(|d| Arc::ptr_eq(d, dialog)) {
395 guard.swap_remove(idx);
396 }
397 }
398}
399
400fn process_out_of_dialog_notify_request(
401 sm: Arc<SubscriptionManager>,
402 tm: Arc<SipTransactionManager>,
403 subscription_identifier: &SubscriptionIdentifier,
404 transaction: Arc<ServerTransaction>,
405 default_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
406 ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
407 tx: mpsc::Sender<ServerTransactionEvent>,
408 rx: mpsc::Receiver<ServerTransactionEvent>,
409 rt: &Arc<Runtime>,
410) {
411 platform_log(LOG_TAG, "calling process_out_of_dialog_notify_request()");
412
413 let mut subscribe_request_can_fork = true;
414
415 if subscription_identifier
416 .event_type
417 .equals_bytes(b"reg", false)
418 {
419 subscribe_request_can_fork = false;
420 }
421
422 let message = transaction.message();
423
424 if let Some(headers) = message.headers() {
425 if let Some(subscription_state_header) = search(headers, b"Subscription-State", true) {
426 let subscription_state_header_field =
427 subscription_state_header.get_value().as_header_field();
428 if let Some(subscription_state) =
429 subscription_state_header_field.as_subscription_state()
430 {
431 platform_log(
432 LOG_TAG,
433 format!("with subscription state of {:?}", subscription_state),
434 );
435
436 if let Some(subscribe_request) =
437 sm.get_registered_request(&subscription_identifier, !subscribe_request_can_fork)
438 {
439 platform_log(LOG_TAG, "found corresponding subscribe request");
440 subscribe_request.on_event();
441 if subscription_state.state.equals_bytes(b"terminated", false) {
442 platform_log(LOG_TAG, "process terminated subscription state");
443 subscribe_request.on_terminating_event(subscription_state, message);
444 } else {
445 if let Some(mut resp_message) = server_transaction::make_response(
446 message,
447 transaction.to_tag(),
448 200,
449 b"OK",
450 ) {
451 let guard = default_public_identity.lock().unwrap();
452
453 if let Some((transport, contact_identity, instance_id)) = &*guard {
454 let transport_ = transaction.transport();
455 if Arc::ptr_eq(transport, transport_) {
456 resp_message.add_header(Header::new(
457 b"Contact",
458 format!(
459 "<{}>;+sip.instance=\"{}\"",
460 contact_identity, instance_id
461 ), // to-do: is transport_address neccessary? also check reg-flow SUBSCRIBE
462 ));
463 }
464 }
465
466 let (d_tx, mut d_rx) = tokio::sync::mpsc::channel(1);
467
468 let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
469
470 rt.spawn(async move {
471 if let Some(dialog) = d_rx.recv().await {
472 ongoing_dialogs_.remove_dialog(&dialog);
473 }
474 });
475
476 if let Ok(dialog) =
477 SipDialog::try_new_as_uas(message, &resp_message, move |d| {
478 match d_tx.blocking_send(d) {
479 Ok(()) => {}
480 Err(e) => {}
481 }
482 })
483 {
484 let mut dialog = Arc::new(dialog);
485
486 if SERVER_SUPPORT_RFC_6665 {
487 ongoing_dialogs.add_dialog(&dialog);
488 } else {
489 dialog = ongoing_dialogs.add_dialog_if_not_duplicate(&dialog);
490 }
491
492 server_transaction::send_response(
493 Arc::clone(&transaction),
494 resp_message,
495 tx,
496 // timer,
497 rt,
498 );
499
500 let subscriber = subscribe_request.get_subscriber();
501
502 let identifier = subscription_identifier.clone();
503
504 let transport = transaction.transport();
505
506 let subscription = Subscription::new(
507 identifier,
508 Arc::clone(transport),
509 &dialog,
510 subscriber,
511 );
512
513 let subscription = Arc::new(subscription);
514
515 let subscription_key = Uuid::new_v4();
516
517 let dialog_user_key: Arc<
518 dyn SipDialogEventCallbacks + Send + Sync,
519 > = dialog.register_user(SubscriptionDialogListener::new(
520 &subscription,
521 subscription_key,
522 ));
523
524 let dialog_user_key_ = Arc::clone(&dialog_user_key);
525 if subscription_state.state.equals_bytes(b"pending", false) {
526 platform_log(LOG_TAG, "process pending subscription state");
527 subscriber.attach_subscription(
528 subscription_key,
529 dialog_user_key_,
530 &subscription,
531 );
532
533 let expiration_value = subscription_state.expires;
534
535 let scheduled_expiration = Instant::now().add(
536 Duration::from_secs(subscription_state.expires as u64),
537 );
538
539 let scheduled_refresh_point =
540 if subscription_state.expires > 300 {
541 Instant::now().add(Duration::from_secs(
542 subscription_state.expires as u64 - 300,
543 ))
544 } else {
545 Instant::now()
546 };
547
548 schedule_refresh(
549 &subscription_key,
550 &dialog_user_key,
551 subscription,
552 expiration_value,
553 scheduled_expiration,
554 scheduled_refresh_point,
555 &sm,
556 &tm,
557 // timer,
558 rt,
559 )
560 } else if subscription_state.state.equals_bytes(b"active", false) {
561 platform_log(LOG_TAG, "process active subscription state");
562
563 subscriber.attach_subscription(
564 subscription_key,
565 dialog_user_key,
566 &subscription,
567 );
568
569 // subscription_update_expire_timer(s, subscription_state->expires);
570
571 if let Some(headers) = message.headers() {
572 if let Some(content_type_header) =
573 header::search(headers, b"Content-Type", true)
574 {
575 platform_log(LOG_TAG, "got Content-Type header");
576 if let Some(body) = message.get_body() {
577 platform_log(LOG_TAG, "got message body");
578 subscriber.on_event(
579 SubscriberEvent::ReceivedNotify(
580 Some(subscription_key),
581 content_type_header.get_value().to_vec(),
582 body,
583 ),
584 );
585 }
586 }
587 }
588 }
589 } else {
590 platform_log(LOG_TAG, "error creating dialog");
591
592 server_transaction::send_response(
593 transaction,
594 resp_message,
595 tx,
596 // timer,
597 rt,
598 );
599 }
600 }
601 }
602 } else {
603 platform_log(LOG_TAG, "missing corresponding subscribe request");
604 if let Some(resp_message) = server_transaction::make_response(
605 message,
606 transaction.to_tag(),
607 481,
608 b"Call Does Not Exist",
609 ) {
610 server_transaction::send_response(transaction, resp_message, tx, rt);
611 }
612 }
613
614 return;
615 }
616 }
617
618 if let Some(resp_message) =
619 server_transaction::make_response(message, transaction.to_tag(), 400, b"Bad Request")
620 {
621 server_transaction::send_response(
622 transaction,
623 resp_message,
624 tx,
625 // timer,
626 rt,
627 );
628 }
629 }
630}
631
632fn process_out_of_dialog_server_transaction(
633 sm: Arc<SubscriptionManager>,
634 tm: Arc<SipTransactionManager>,
635 transaction_handlers: &Vec<Box<dyn TransactionHandler + Send + Sync>>,
636 transaction: Arc<ServerTransaction>,
637 default_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
638 ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
639 tx: mpsc::Sender<ServerTransactionEvent>,
640 rx: mpsc::Receiver<ServerTransactionEvent>,
641 // timer: &Arc<Timer>,
642 rt: &Arc<Runtime>,
643) {
644 platform_log(
645 LOG_TAG,
646 "calling process_out_of_dialog_server_transaction()",
647 );
648
649 let message = transaction.message();
650
651 if let SipMessage::Request(req_line, _, _) = message {
652 if req_line.method == ACK || req_line.method == BYE {
653 if let Some(resp_message) = server_transaction::make_response(
654 message,
655 transaction.to_tag(),
656 481,
657 b"Call Does Not Exist",
658 ) {
659 server_transaction::send_response(transaction, resp_message, tx, rt);
660 }
661
662 return;
663 }
664
665 if req_line.method == NOTIFY {
666 if let Ok(subscription_identifier) = get_identifier_from_sip_notify(message) {
667 platform_log(
668 LOG_TAG,
669 format!(
670 "receiving NOTIFY with identifier {:?}",
671 &subscription_identifier
672 ),
673 );
674
675 process_out_of_dialog_notify_request(
676 sm,
677 tm,
678 &subscription_identifier,
679 transaction,
680 default_public_identity,
681 ongoing_dialogs,
682 tx,
683 rx,
684 // timer,
685 rt,
686 );
687 return;
688 }
689
690 if let Some(resp_message) = server_transaction::make_response(
691 message,
692 transaction.to_tag(),
693 400,
694 b"Bad Request",
695 ) {
696 server_transaction::send_response(transaction, resp_message, tx, rt);
697 }
698 return;
699 }
700
701 let mut channels = Some((tx, rx));
702
703 for handler in transaction_handlers {
704 if handler.handle_transaction(
705 &transaction,
706 ongoing_dialogs,
707 &mut channels,
708 // timer,
709 rt,
710 ) {
711 platform_log(
712 LOG_TAG,
713 "out of dialog server transaction successfully handled",
714 );
715
716 return;
717 }
718 }
719
720 if let Some((tx, _)) = channels {
721 if let Some(resp_message) = server_transaction::make_response(
722 message,
723 transaction.to_tag(),
724 500,
725 b"Server Internal Error",
726 ) {
727 server_transaction::send_response(
728 transaction,
729 resp_message,
730 tx,
731 // timer,
732 rt,
733 );
734 }
735 }
736 }
737}
738
/// Extension point for server transactions that are processed outside of a
/// dialog and are not ACK, BYE or NOTIFY.
///
/// `process_out_of_dialog_server_transaction` offers each such transaction to
/// the registered handlers in order and stops at the first one that returns
/// `true`.
pub trait TransactionHandler {
    /// Tries to handle `transaction`.
    ///
    /// * `transaction` - the server transaction carrying the request.
    /// * `ongoing_dialogs` - the shared list of ongoing dialogs.
    /// * `channels` - the transaction's event sender/receiver pair. If no
    ///   handler returns `true` and this is still `Some`, the caller uses the
    ///   sender to answer `500 Server Internal Error`. Presumably a handler
    ///   takes the pair out of the option when it answers the transaction
    ///   itself — confirm against the implementations.
    /// * `rt` - the runtime handed down by the caller.
    ///
    /// Returns `true` when the transaction has been handled and no further
    /// handler should be consulted.
    fn handle_transaction(
        &self,
        transaction: &Arc<ServerTransaction>,
        ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
        channels: &mut Option<(
            mpsc::Sender<ServerTransactionEvent>,
            mpsc::Receiver<ServerTransactionEvent>,
        )>,
        rt: &Arc<Runtime>,
    ) -> bool;
}