rust_rcs_core/sip/
sip_core.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use tokio::runtime::Runtime;
20use tokio::sync::mpsc;
21use tokio::time::Instant;
22use uuid::Uuid;
23
24use crate::ffi::log::platform_log;
25use crate::internet::header::{search, Header};
26use crate::internet::{header, AsHeaderField};
27
28use crate::sip::sip_dialog::GetDialogHeaderInfo;
29use crate::sip::sip_dialog::GetDialogHeaders;
30use crate::sip::sip_dialog::GetDialogIdentifier;
31use crate::sip::sip_dialog::SipDialog;
32use crate::sip::sip_headers::cseq;
33use crate::sip::sip_message::SipMessage;
34use crate::sip::sip_message::ACK;
35use crate::sip::sip_message::BYE;
36use crate::sip::sip_message::NOTIFY;
37use crate::sip::sip_transaction::server_transaction;
38use crate::sip::sip_transaction::server_transaction::ServerTransaction;
39use crate::sip::sip_transaction::server_transaction::ServerTransactionEvent;
40use crate::sip::sip_transaction::SipTransactionManager;
41use crate::sip::sip_transaction::SipTransactionManagerEventInterface;
42
43use crate::util::raw_string::StrEq;
44
45use super::sip_headers::subscription_state::AsSubscriptionState;
46use super::sip_subscription::subscriber::SubscriberEvent;
47use super::sip_subscription::subscription_identifier::{
48    get_identifier_from_sip_notify, SubscriptionIdentifier,
49};
50use super::sip_subscription::{
51    schedule_refresh, Subscription, SubscriptionDialogListener, SubscriptionManager,
52    SERVER_SUPPORT_RFC_6665,
53};
54use super::{SipDialogEventCallbacks, SipTransport};
55
56const LOG_TAG: &str = "sip";
57
58pub struct SipCore {
59    subscription_manager: Arc<SubscriptionManager>,
60    transaction_manager: Arc<SipTransactionManager>,
61    default_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>, // fix-me: SipCore should not be bound to a single identity, the mapping should be managed by client transport
62    ongoing_dialogs: Arc<Mutex<Vec<Arc<SipDialog>>>>,
63}
64
65impl SipCore {
66    pub fn new(
67        sm: &Arc<SubscriptionManager>,
68        tm: &Arc<SipTransactionManager>,
69        mut tm_event_itf: SipTransactionManagerEventInterface,
70        allowed_methods: Vec<&'static [u8]>,
71        transaction_handlers: Vec<Box<dyn TransactionHandler + Send + Sync>>,
72        rt: &Arc<Runtime>,
73    ) -> SipCore {
74        let default_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>> =
75            Arc::new(Mutex::new(None));
76        let default_public_identity_ = Arc::clone(&default_public_identity);
77        let ongoing_dialogs: Arc<Mutex<Vec<Arc<SipDialog>>>> = Arc::new(Mutex::new(Vec::new()));
78        let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
79        // let timer = Arc::new(Timer::new());
80        // let timer_ = Arc::clone(&timer);
81
82        let sm_ = Arc::clone(sm);
83        let tm_ = Arc::clone(tm);
84        let rt_ = Arc::clone(&rt);
85
86        rt.spawn(async move {
87            let default_public_identity = default_public_identity_;
88            let ongoing_dialogs = ongoing_dialogs_;
89            let allowed_methods = &allowed_methods;
90            let transaction_handlers = &transaction_handlers;
91            // let timer = timer_;
92
93            'thread: loop {
94                let sm = Arc::clone(&sm_);
95                let tm = Arc::clone(&tm_);
96                let rt = Arc::clone(&rt_);
97                if let Some((transaction, tx, rx)) = tm_event_itf.event_rx.recv().await {
98                    let message = transaction.message();
99                    if let SipMessage::Request(req_line, _, _) = message {
100                        if !allowed_methods
101                            .iter()
102                            .any(|item| *item == &req_line.method[..])
103                        {
104                            if let Some(mut resp_message) = server_transaction::make_response(
105                                message,
106                                transaction.to_tag(),
107                                405,
108                                b"Method Not Allowed",
109                            ) {
110                                let mut allowed = Vec::new();
111                                let mut first = true;
112                                for method in allowed_methods {
113                                    if first {
114                                        first = false;
115                                    } else {
116                                        allowed.extend(b", ");
117                                    }
118                                    allowed.extend_from_slice(method);
119                                }
120
121                                resp_message.add_header(Header::new(b"Allow", allowed));
122
123                                server_transaction::send_response(
124                                    transaction,
125                                    resp_message,
126                                    tx,
127                                    // &timer,
128                                    &rt,
129                                );
130                            }
131
132                            continue 'thread;
133                        }
134
135                        if let Some((call_id, from, to)) = message.get_dialog_headers() {
136                            let from_to = (from, to);
137                            let (from, to) = from_to.get_dialog_header_info();
138                            if let Some(lh_dialog_identifier) =
139                                (call_id, from, to, false).get_dialog_identifier()
140                            {
141                                if !SERVER_SUPPORT_RFC_6665 {
142                                    if req_line.method == NOTIFY {
143                                        process_out_of_dialog_server_transaction(
144                                            sm,
145                                            tm,
146                                            transaction_handlers,
147                                            transaction,
148                                            &default_public_identity,
149                                            &ongoing_dialogs,
150                                            tx,
151                                            rx,
152                                            // &timer,
153                                            &rt,
154                                        );
155
156                                        continue 'thread;
157                                    }
158                                }
159
160                                let guard = ongoing_dialogs.lock().unwrap();
161                                for dialog in &*guard {
162                                    let rh_dialog_identifier = dialog.dialog_identifier();
163                                    if lh_dialog_identifier == rh_dialog_identifier {
164                                        if req_line.method == ACK {
165                                            dialog.on_ack(&transaction);
166                                            continue 'thread;
167                                        }
168
169                                        let lh_seq = cseq::get_message_seq(message);
170
171                                        match lh_seq {
172                                            Ok(lh_seq) => {
173                                                let seq = dialog.remote_seq();
174
175                                                let mut seq_guard = seq.lock().unwrap();
176
177                                                if let Some(rh_seq) = *seq_guard {
178                                                    if rh_seq >= lh_seq {
179                                                        if let Some(resp_message) =
180                                                            server_transaction::make_response(
181                                                                message,
182                                                                transaction.to_tag(),
183                                                                500,
184                                                                b"Server Internal Error",
185                                                            )
186                                                        {
187                                                            server_transaction::send_response(
188                                                                transaction,
189                                                                resp_message,
190                                                                tx,
191                                                                // &timer,
192                                                                &rt,
193                                                            );
194                                                        }
195
196                                                        continue 'thread;
197                                                    }
198                                                }
199
200                                                if req_line.method == BYE {
201                                                    dialog.on_terminating_request(message, &rt);
202
203                                                    if let Some(resp_message) =
204                                                        server_transaction::make_response(
205                                                            message,
206                                                            transaction.to_tag(),
207                                                            200,
208                                                            b"OK",
209                                                        )
210                                                    {
211                                                        server_transaction::send_response(
212                                                            transaction,
213                                                            resp_message,
214                                                            tx,
215                                                            // &timer,
216                                                            &rt,
217                                                        );
218                                                    }
219
220                                                    continue 'thread;
221                                                }
222
223                                                // 	if !subscription.SERVER_SUPPORT_RFC_6665 {
224
225                                                // 		if r.Method == "NOTIFY" {
226
227                                                // 			// since we created dialog on 200 OK response, we need to add usages here
228
229                                                // 			sm := subscription.GetSubscriptionManager()
230
231                                                // 			sm.CheckSubscriptionMissing(message, d, func(missing bool, s *subscription.Subscription) {
232
233                                                // 				if missing {
234
235                                                // 					u := &dialog.DialogUser{
236                                                // 						D:   d,
237                                                // 						Itf: s,
238                                                // 					}
239
240                                                // 					d.AddUser(u)
241
242                                                // 					s.SetDialogUser(u)
243                                                // 				}
244
245                                                // 				d.ProcessMidDialogRequest(st)
246                                                // 			})
247
248                                                // 			return
249                                                // 		}
250                                                // 	}
251
252                                                dialog.on_request(
253                                                    &transaction,
254                                                    tx,
255                                                    &rt,
256                                                    &mut seq_guard,
257                                                    lh_seq,
258                                                );
259
260                                                continue 'thread;
261                                            }
262
263                                            Err(_) => {
264                                                if let Some(resp_message) =
265                                                    server_transaction::make_response(
266                                                        message,
267                                                        transaction.to_tag(),
268                                                        400,
269                                                        b"Bad Request",
270                                                    )
271                                                {
272                                                    server_transaction::send_response(
273                                                        transaction,
274                                                        resp_message,
275                                                        tx,
276                                                        // &timer,
277                                                        &rt,
278                                                    );
279                                                }
280
281                                                continue 'thread;
282                                            }
283                                        }
284                                    }
285                                }
286
287                                if let Some(resp_message) = server_transaction::make_response(
288                                    message,
289                                    transaction.to_tag(),
290                                    481,
291                                    b"Call Does Not Exist",
292                                ) {
293                                    server_transaction::send_response(
294                                        transaction,
295                                        resp_message,
296                                        tx,
297                                        // &timer
298                                        &rt,
299                                    );
300                                }
301
302                                continue 'thread;
303                            }
304                        }
305
306                        process_out_of_dialog_server_transaction(
307                            sm,
308                            tm,
309                            transaction_handlers,
310                            transaction,
311                            &default_public_identity,
312                            &ongoing_dialogs,
313                            tx,
314                            rx,
315                            // &timer,
316                            &rt,
317                        );
318                    } else {
319                        panic! {"impossible condition"};
320                    }
321                } else {
322                    return ();
323                }
324            }
325        });
326
327        SipCore {
328            subscription_manager: Arc::clone(sm),
329            transaction_manager: Arc::clone(tm),
330            default_public_identity,
331            ongoing_dialogs,
332            // timer,
333        }
334    }
335
336    pub fn get_subscription_manager(&self) -> Arc<SubscriptionManager> {
337        Arc::clone(&self.subscription_manager)
338    }
339
340    pub fn get_transaction_manager(&self) -> Arc<SipTransactionManager> {
341        Arc::clone(&self.transaction_manager)
342    }
343
344    pub fn set_default_public_identity(
345        &self,
346        default_public_identity: String,
347        sip_instance_id: String,
348        transport: Arc<SipTransport>,
349    ) {
350        (*self.default_public_identity.lock().unwrap()).replace((
351            transport,
352            default_public_identity,
353            sip_instance_id,
354        ));
355    }
356
357    pub fn get_default_public_identity(&self) -> Option<(Arc<SipTransport>, String, String)> {
358        self.default_public_identity.lock().unwrap().clone()
359    }
360
361    pub fn get_ongoing_dialogs(&self) -> Arc<Mutex<Vec<Arc<SipDialog>>>> {
362        Arc::clone(&self.ongoing_dialogs)
363    }
364
365    // pub fn get_timer(&self) -> Arc<Timer> {
366    //     Arc::clone(&self.timer)
367    // }
368}
369
370pub trait SipDialogCache {
371    fn add_dialog(&self, dialog: &Arc<SipDialog>);
372    fn add_dialog_if_not_duplicate(&self, dialog: &Arc<SipDialog>) -> Arc<SipDialog>;
373    fn remove_dialog(&self, dialog: &Arc<SipDialog>);
374}
375
376impl SipDialogCache for Arc<Mutex<Vec<Arc<SipDialog>>>> {
377    fn add_dialog(&self, dialog: &Arc<SipDialog>) {
378        self.lock().unwrap().push(Arc::clone(dialog));
379    }
380
381    fn add_dialog_if_not_duplicate(&self, dialog: &Arc<SipDialog>) -> Arc<SipDialog> {
382        let mut guard = self.lock().unwrap();
383        for d in &*guard {
384            if d.dialog_identifier() == dialog.dialog_identifier() {
385                return Arc::clone(d);
386            }
387        }
388        guard.push(Arc::clone(dialog));
389        Arc::clone(dialog)
390    }
391
392    fn remove_dialog(&self, dialog: &Arc<SipDialog>) {
393        let mut guard = self.lock().unwrap();
394        if let Some(idx) = guard.iter().position(|d| Arc::ptr_eq(d, dialog)) {
395            guard.swap_remove(idx);
396        }
397    }
398}
399
400fn process_out_of_dialog_notify_request(
401    sm: Arc<SubscriptionManager>,
402    tm: Arc<SipTransactionManager>,
403    subscription_identifier: &SubscriptionIdentifier,
404    transaction: Arc<ServerTransaction>,
405    default_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
406    ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
407    tx: mpsc::Sender<ServerTransactionEvent>,
408    rx: mpsc::Receiver<ServerTransactionEvent>,
409    rt: &Arc<Runtime>,
410) {
411    platform_log(LOG_TAG, "calling process_out_of_dialog_notify_request()");
412
413    let mut subscribe_request_can_fork = true;
414
415    if subscription_identifier
416        .event_type
417        .equals_bytes(b"reg", false)
418    {
419        subscribe_request_can_fork = false;
420    }
421
422    let message = transaction.message();
423
424    if let Some(headers) = message.headers() {
425        if let Some(subscription_state_header) = search(headers, b"Subscription-State", true) {
426            let subscription_state_header_field =
427                subscription_state_header.get_value().as_header_field();
428            if let Some(subscription_state) =
429                subscription_state_header_field.as_subscription_state()
430            {
431                platform_log(
432                    LOG_TAG,
433                    format!("with subscription state of {:?}", subscription_state),
434                );
435
436                if let Some(subscribe_request) =
437                    sm.get_registered_request(&subscription_identifier, !subscribe_request_can_fork)
438                {
439                    platform_log(LOG_TAG, "found corresponding subscribe request");
440                    subscribe_request.on_event();
441                    if subscription_state.state.equals_bytes(b"terminated", false) {
442                        platform_log(LOG_TAG, "process terminated subscription state");
443                        subscribe_request.on_terminating_event(subscription_state, message);
444                    } else {
445                        if let Some(mut resp_message) = server_transaction::make_response(
446                            message,
447                            transaction.to_tag(),
448                            200,
449                            b"OK",
450                        ) {
451                            let guard = default_public_identity.lock().unwrap();
452
453                            if let Some((transport, contact_identity, instance_id)) = &*guard {
454                                let transport_ = transaction.transport();
455                                if Arc::ptr_eq(transport, transport_) {
456                                    resp_message.add_header(Header::new(
457                                        b"Contact",
458                                        format!(
459                                            "<{}>;+sip.instance=\"{}\"",
460                                            contact_identity, instance_id
461                                        ), // to-do: is transport_address neccessary? also check reg-flow SUBSCRIBE
462                                    ));
463                                }
464                            }
465
466                            let (d_tx, mut d_rx) = tokio::sync::mpsc::channel(1);
467
468                            let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
469
470                            rt.spawn(async move {
471                                if let Some(dialog) = d_rx.recv().await {
472                                    ongoing_dialogs_.remove_dialog(&dialog);
473                                }
474                            });
475
476                            if let Ok(dialog) =
477                                SipDialog::try_new_as_uas(message, &resp_message, move |d| {
478                                    match d_tx.blocking_send(d) {
479                                        Ok(()) => {}
480                                        Err(e) => {}
481                                    }
482                                })
483                            {
484                                let mut dialog = Arc::new(dialog);
485
486                                if SERVER_SUPPORT_RFC_6665 {
487                                    ongoing_dialogs.add_dialog(&dialog);
488                                } else {
489                                    dialog = ongoing_dialogs.add_dialog_if_not_duplicate(&dialog);
490                                }
491
492                                server_transaction::send_response(
493                                    Arc::clone(&transaction),
494                                    resp_message,
495                                    tx,
496                                    // timer,
497                                    rt,
498                                );
499
500                                let subscriber = subscribe_request.get_subscriber();
501
502                                let identifier = subscription_identifier.clone();
503
504                                let transport = transaction.transport();
505
506                                let subscription = Subscription::new(
507                                    identifier,
508                                    Arc::clone(transport),
509                                    &dialog,
510                                    subscriber,
511                                );
512
513                                let subscription = Arc::new(subscription);
514
515                                let subscription_key = Uuid::new_v4();
516
517                                let dialog_user_key: Arc<
518                                    dyn SipDialogEventCallbacks + Send + Sync,
519                                > = dialog.register_user(SubscriptionDialogListener::new(
520                                    &subscription,
521                                    subscription_key,
522                                ));
523
524                                let dialog_user_key_ = Arc::clone(&dialog_user_key);
525                                if subscription_state.state.equals_bytes(b"pending", false) {
526                                    platform_log(LOG_TAG, "process pending subscription state");
527                                    subscriber.attach_subscription(
528                                        subscription_key,
529                                        dialog_user_key_,
530                                        &subscription,
531                                    );
532
533                                    let expiration_value = subscription_state.expires;
534
535                                    let scheduled_expiration = Instant::now().add(
536                                        Duration::from_secs(subscription_state.expires as u64),
537                                    );
538
539                                    let scheduled_refresh_point =
540                                        if subscription_state.expires > 300 {
541                                            Instant::now().add(Duration::from_secs(
542                                                subscription_state.expires as u64 - 300,
543                                            ))
544                                        } else {
545                                            Instant::now()
546                                        };
547
548                                    schedule_refresh(
549                                        &subscription_key,
550                                        &dialog_user_key,
551                                        subscription,
552                                        expiration_value,
553                                        scheduled_expiration,
554                                        scheduled_refresh_point,
555                                        &sm,
556                                        &tm,
557                                        // timer,
558                                        rt,
559                                    )
560                                } else if subscription_state.state.equals_bytes(b"active", false) {
561                                    platform_log(LOG_TAG, "process active subscription state");
562
563                                    subscriber.attach_subscription(
564                                        subscription_key,
565                                        dialog_user_key,
566                                        &subscription,
567                                    );
568
569                                    // subscription_update_expire_timer(s, subscription_state->expires);
570
571                                    if let Some(headers) = message.headers() {
572                                        if let Some(content_type_header) =
573                                            header::search(headers, b"Content-Type", true)
574                                        {
575                                            platform_log(LOG_TAG, "got Content-Type header");
576                                            if let Some(body) = message.get_body() {
577                                                platform_log(LOG_TAG, "got message body");
578                                                subscriber.on_event(
579                                                    SubscriberEvent::ReceivedNotify(
580                                                        Some(subscription_key),
581                                                        content_type_header.get_value().to_vec(),
582                                                        body,
583                                                    ),
584                                                );
585                                            }
586                                        }
587                                    }
588                                }
589                            } else {
590                                platform_log(LOG_TAG, "error creating dialog");
591
592                                server_transaction::send_response(
593                                    transaction,
594                                    resp_message,
595                                    tx,
596                                    // timer,
597                                    rt,
598                                );
599                            }
600                        }
601                    }
602                } else {
603                    platform_log(LOG_TAG, "missing corresponding subscribe request");
604                    if let Some(resp_message) = server_transaction::make_response(
605                        message,
606                        transaction.to_tag(),
607                        481,
608                        b"Call Does Not Exist",
609                    ) {
610                        server_transaction::send_response(transaction, resp_message, tx, rt);
611                    }
612                }
613
614                return;
615            }
616        }
617
618        if let Some(resp_message) =
619            server_transaction::make_response(message, transaction.to_tag(), 400, b"Bad Request")
620        {
621            server_transaction::send_response(
622                transaction,
623                resp_message,
624                tx,
625                // timer,
626                rt,
627            );
628        }
629    }
630}
631
632fn process_out_of_dialog_server_transaction(
633    sm: Arc<SubscriptionManager>,
634    tm: Arc<SipTransactionManager>,
635    transaction_handlers: &Vec<Box<dyn TransactionHandler + Send + Sync>>,
636    transaction: Arc<ServerTransaction>,
637    default_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
638    ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
639    tx: mpsc::Sender<ServerTransactionEvent>,
640    rx: mpsc::Receiver<ServerTransactionEvent>,
641    // timer: &Arc<Timer>,
642    rt: &Arc<Runtime>,
643) {
644    platform_log(
645        LOG_TAG,
646        "calling process_out_of_dialog_server_transaction()",
647    );
648
649    let message = transaction.message();
650
651    if let SipMessage::Request(req_line, _, _) = message {
652        if req_line.method == ACK || req_line.method == BYE {
653            if let Some(resp_message) = server_transaction::make_response(
654                message,
655                transaction.to_tag(),
656                481,
657                b"Call Does Not Exist",
658            ) {
659                server_transaction::send_response(transaction, resp_message, tx, rt);
660            }
661
662            return;
663        }
664
665        if req_line.method == NOTIFY {
666            if let Ok(subscription_identifier) = get_identifier_from_sip_notify(message) {
667                platform_log(
668                    LOG_TAG,
669                    format!(
670                        "receiving NOTIFY with identifier {:?}",
671                        &subscription_identifier
672                    ),
673                );
674
675                process_out_of_dialog_notify_request(
676                    sm,
677                    tm,
678                    &subscription_identifier,
679                    transaction,
680                    default_public_identity,
681                    ongoing_dialogs,
682                    tx,
683                    rx,
684                    // timer,
685                    rt,
686                );
687                return;
688            }
689
690            if let Some(resp_message) = server_transaction::make_response(
691                message,
692                transaction.to_tag(),
693                400,
694                b"Bad Request",
695            ) {
696                server_transaction::send_response(transaction, resp_message, tx, rt);
697            }
698            return;
699        }
700
701        let mut channels = Some((tx, rx));
702
703        for handler in transaction_handlers {
704            if handler.handle_transaction(
705                &transaction,
706                ongoing_dialogs,
707                &mut channels,
708                //  timer,
709                rt,
710            ) {
711                platform_log(
712                    LOG_TAG,
713                    "out of dialog server transaction successfully handled",
714                );
715
716                return;
717            }
718        }
719
720        if let Some((tx, _)) = channels {
721            if let Some(resp_message) = server_transaction::make_response(
722                message,
723                transaction.to_tag(),
724                500,
725                b"Server Internal Error",
726            ) {
727                server_transaction::send_response(
728                    transaction,
729                    resp_message,
730                    tx,
731                    // timer,
732                    rt,
733                );
734            }
735        }
736    }
737}
738
739pub trait TransactionHandler {
740    fn handle_transaction(
741        &self,
742        transaction: &Arc<ServerTransaction>,
743        ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
744        channels: &mut Option<(
745            mpsc::Sender<ServerTransactionEvent>,
746            mpsc::Receiver<ServerTransactionEvent>,
747        )>,
748        // timer: &Timer,
749        rt: &Arc<Runtime>,
750    ) -> bool;
751}