rust_rcs_client/conference/
mod.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    ffi::CStr,
17    ptr::NonNull,
18    sync::{Arc, Mutex},
19};
20
21use futures::channel::oneshot;
22use tokio::{runtime::Runtime, sync::mpsc};
23
24use uuid::Uuid;
25
26use rust_rcs_core::{
27    internet::{
28        body::{message_body::MessageBody, multipart_body::MultipartBody},
29        header,
30        headers::AsContentType,
31        AsHeaderField, Body, Header,
32    },
33    io::Serializable,
34    sip::{
35        sip_core::SipDialogCache,
36        sip_session::{SipSession, SipSessionEvent, SipSessionEventReceiver},
37        sip_transaction::{client_transaction::ClientTransactionNilCallbacks, server_transaction},
38        ClientTransactionCallbacks, ServerTransaction, ServerTransactionEvent, SipCore, SipDialog,
39        SipDialogEventCallbacks, SipMessage, SipTransactionManager, SipTransport,
40        TransactionHandler, INVITE,
41    },
42    util::rand::create_raw_alpha_numeric_string,
43};
44
45use crate::messaging::cpm::session::UpdateMessageCallbacks;
46
47use self::{
48    ffi::{
49        MultiConferenceEventListener, MultiConferenceEventListenerContextWrapper,
50        MultiConferenceV1InviteResponse,
51    },
52    sip::AsConferenceV1Contact,
53    subscription::MultiConferenceEvent,
54};
55
56pub mod ffi;
57pub mod sip;
58pub mod subscription;
59
60struct MultiConferenceV1Session {
61    conference_id: String,
62    // sdp: String,
63    tx: mpsc::Sender<MultiConferenceEvent>,
64    // event_listener: Arc<
65    //     Mutex<
66    //         Option<(
67    //             MultiConferenceEventCallback,
68    //             MultiConferenceEventCallbackContextWrapper,
69    //         )>,
70    //     >,
71    // >,
72}
73
74impl MultiConferenceV1Session {
75    pub fn get_conference_id(&self) -> String {
76        self.conference_id.clone()
77    }
78    // pub fn get_sdp(&self) -> String {
79    //     self.sdp.clone()
80    // }
81
82    pub fn start(
83        &self,
84        mut rx: mpsc::Receiver<MultiConferenceEvent>,
85        event_cb: MultiConferenceEventListener,
86        event_cb_context: Arc<Mutex<MultiConferenceEventListenerContextWrapper>>,
87        rt: &Arc<Runtime>,
88    ) {
89        rt.spawn(async move {
90            while let Some(ev) = rx.recv().await {
91                let ev_type = ev.get_event_type();
92                let ev = Some(Box::new(ev));
93                let event_cb_context = event_cb_context.lock().unwrap();
94                let cb_context = event_cb_context.0.as_ptr();
95                event_cb(ev_type, ev, cb_context);
96            }
97        });
98    }
99}
100
101struct MultiConferenceV1SessionDialogEventReceiver {
102    sip_session: Arc<SipSession<MultiConferenceV1Session>>,
103    event_listener: Arc<
104        Mutex<
105            Option<(
106                mpsc::Receiver<MultiConferenceEvent>,
107                MultiConferenceEventListener,
108                MultiConferenceEventListenerContextWrapper,
109            )>,
110        >,
111    >,
112    rt: Arc<Runtime>,
113}
114
115// MultiConferenceV1 server does not actively initiate requests
116impl SipDialogEventCallbacks for MultiConferenceV1SessionDialogEventReceiver {
117    fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
118        let mut guard = self.event_listener.lock().unwrap();
119        if let Some((rx, event_cb, event_cb_context)) = guard.take() {
120            let session = self.sip_session.get_inner();
121
122            let event_cb_context = Arc::new(Mutex::new(event_cb_context));
123
124            session.start(rx, event_cb, event_cb_context, &self.rt);
125        }
126    }
127
128    fn on_new_request(
129        &self,
130        _transaction: Arc<ServerTransaction>,
131        _tx: mpsc::Sender<ServerTransactionEvent>,
132        _rt: &Arc<Runtime>,
133    ) -> Option<(u16, bool)> {
134        None
135    }
136
137    fn on_terminating_request(&self, _message: &SipMessage) {
138        let inner = self.sip_session.get_inner();
139
140        let tx = inner.tx.clone();
141
142        self.rt.spawn(async move {
143            match tx.send(MultiConferenceEvent::conference_end()).await {
144                Ok(()) => {}
145                Err(e) => {}
146            }
147        });
148    }
149
150    fn on_terminating_response(&self, _message: &SipMessage) {
151        let inner = self.sip_session.get_inner();
152
153        let tx = inner.tx.clone();
154
155        self.rt.spawn(async move {
156            match tx.send(MultiConferenceEvent::conference_end()).await {
157                Ok(()) => {}
158                Err(e) => {}
159            }
160        });
161    }
162}
163
164struct MultiConferenceV1Internal {
165    sip_session: Arc<SipSession<MultiConferenceV1Session>>,
166    // cb: MultiConferenceEventCallback,
167    // cb_context : MultiConferenceEventCallbackContextWrapper,
168}
169
170impl MultiConferenceV1Internal {
171    // pub fn get_sdp(&self) -> String {
172    //     let session = self.sip_session.get_inner();
173    //     session.get_sdp()
174    // }
175    pub fn keep_alive(&self, _rt: &Arc<Runtime>) {
176        self.sip_session.mark_session_active()
177    }
178}
179
180pub struct MultiConferenceV1 {
181    inner: Arc<MultiConferenceV1Internal>,
182}
183
184// impl MultiConferenceV1 {
185//     pub fn register_event_listener(
186//         &mut self,
187//         cb: MultiConferenceEventCallback,
188//         cb_context: MultiConferenceEventCallbackContextWrapper,
189//     ) {
190//         let inner = self.inner.sip_session.get_inner();
191
192//         let mut guard = inner.event_listener.lock().unwrap();
193
194//         *guard = Some((cb, cb_context));
195//     }
196// }
197
198impl MultiConferenceV1 {
199    // pub fn get_sdp(&self) -> String {
200    //     self.inner.get_sdp()
201    // }
202    pub fn keep_alive(&self, rt: &Arc<Runtime>) {
203        self.inner.keep_alive(rt)
204    }
205}
206
207/**
208 * conference v1 is set to use with go-rcs-server and rcs-mediasoup-server
209 */
210pub struct MultiConferenceServiceV1 {
211    multi_conference_invite_handler: Arc<
212        dyn Fn(MultiConferenceV1, String, MultiConferenceV1InviteResponseReceiver) + Send + Sync,
213    >,
214
215    registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
216}
217
218impl MultiConferenceServiceV1 {
219    pub fn new<MCIHF>(multi_conference_invite_handler_function: MCIHF) -> MultiConferenceServiceV1
220    where
221        MCIHF: Fn(MultiConferenceV1, String, MultiConferenceV1InviteResponseReceiver)
222            + Send
223            + Sync
224            + 'static,
225    {
226        MultiConferenceServiceV1 {
227            multi_conference_invite_handler: Arc::new(multi_conference_invite_handler_function),
228
229            registered_public_identity: Arc::new(Mutex::new(None)),
230        }
231    }
232
233    pub fn set_registered_public_identity(
234        &self,
235        registered_public_identity: String,
236        sip_instance_id: String,
237        transport: Arc<SipTransport>,
238    ) {
239        (*self.registered_public_identity.lock().unwrap()).replace((
240            transport,
241            registered_public_identity,
242            sip_instance_id,
243        ));
244    }
245
246    pub fn create_conference<F>(
247        &self,
248        recipients: &str,
249        offer_sdp: &str,
250        event_cb: Option<MultiConferenceEventListener>,
251        event_cb_context: MultiConferenceEventListenerContextWrapper,
252        core: &Arc<SipCore>,
253        rt: &Arc<Runtime>,
254        callback: F,
255    ) where
256        F: FnOnce(Option<(MultiConferenceV1, String)>) + Send + Sync + 'static,
257    {
258        let event_cb_context = Arc::new(Mutex::new(event_cb_context));
259
260        if let Some((transport, public_user_identity, instance_id)) =
261            core.get_default_public_identity()
262        {
263            let mut invite_message =
264                SipMessage::new_request(INVITE, b"sip:conference-focus@example.com");
265
266            invite_message.add_header(Header::new(
267                b"Call-ID",
268                String::from(
269                    Uuid::new_v4()
270                        .as_hyphenated()
271                        .encode_lower(&mut Uuid::encode_buffer()),
272                ),
273            ));
274
275            invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
276
277            let tag = create_raw_alpha_numeric_string(8);
278            let tag = String::from_utf8_lossy(&tag);
279
280            invite_message.add_header(Header::new(b"From", format!("<{}>;", public_user_identity)));
281
282            invite_message.add_header(Header::new(b"To", b"<sip:conference-focus@example.com>"));
283
284            let boundary = create_raw_alpha_numeric_string(16);
285            let boundary_ = String::from_utf8_lossy(&boundary);
286            let mut parts = Vec::with_capacity(2);
287
288            let mut recipients_part_headers = Vec::new();
289            recipients_part_headers.push(Header::new(b"Content-Type", b"application/json"));
290            let recipients_json_length = recipients.len();
291            recipients_part_headers.push(Header::new(
292                b"Content-Length",
293                format!("{}", recipients_json_length),
294            ));
295
296            parts.push(Arc::new(Body::Message(MessageBody {
297                headers: recipients_part_headers,
298                body: Arc::new(Body::Raw(recipients.as_bytes().to_vec())),
299            })));
300
301            let mut sdp_part_headers = Vec::new();
302            sdp_part_headers.push(Header::new(b"Content-Type", b"application/sdp"));
303            let sdp_content_length = offer_sdp.len();
304            sdp_part_headers.push(Header::new(
305                b"Content-Length",
306                format!("{}", sdp_content_length),
307            ));
308
309            parts.push(Arc::new(Body::Message(MessageBody {
310                headers: sdp_part_headers,
311                body: Arc::new(Body::Raw(offer_sdp.as_bytes().to_vec())),
312            })));
313
314            invite_message.add_header(Header::new(
315                b"Content-Type",
316                format!("multipart/mixed; boundary={}", boundary_),
317            ));
318
319            let multipart = Body::Multipart(MultipartBody { boundary, parts });
320
321            let content_length = multipart.estimated_size();
322            invite_message.add_header(Header::new(
323                b"Content-Length",
324                format!("{}", content_length),
325            ));
326
327            let multipart = Arc::new(multipart);
328            invite_message.set_body(multipart);
329
330            let req_headers = invite_message.copy_headers();
331
332            let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel(1);
333
334            let ongoing_dialogs = core.get_ongoing_dialogs();
335
336            let core_ = Arc::clone(&core);
337            let rt_ = Arc::clone(rt);
338
339            rt.spawn(async move {
340                if let Some(res) = client_transaction_rx.recv().await {
341                    match res {
342                        Ok((resp_message, conference_id, sdp)) => {
343                            let (d_tx, mut d_rx) = mpsc::channel(1);
344
345                            let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
346
347                            rt_.spawn(async move {
348                                if let Some(dialog) = d_rx.recv().await {
349                                    ongoing_dialogs_.remove_dialog(&dialog);
350                                }
351                            });
352
353                            if let Ok(dialog) =
354                                SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
355                                    match d_tx.blocking_send(d) {
356                                        Ok(()) => {}
357                                        Err(e) => {}
358                                    }
359                                })
360                            {
361                                let dialog = Arc::new(dialog);
362
363                                ongoing_dialogs.add_dialog(&dialog);
364
365                                let (conf_tx, conf_rx) = mpsc::channel(8);
366
367                                let session = MultiConferenceV1Session {
368                                    conference_id,
369                                    tx: conf_tx,
370                                };
371                                let session = Arc::new(session);
372
373                                let (sess_tx, mut sess_rx) = mpsc::channel(8);
374
375                                let sip_session = SipSession::new(
376                                    &session,
377                                    SipSessionEventReceiver {
378                                        tx: sess_tx,
379                                        rt: Arc::clone(&rt_),
380                                    },
381                                );
382
383                                let sip_session = Arc::new(sip_session);
384                                let sip_session_ = Arc::clone(&sip_session);
385
386                                let core = Arc::clone(&core_);
387                                let rt = Arc::clone(&rt_);
388
389                                rt_.spawn(async move {
390                                    while let Some(ev) = sess_rx.recv().await {
391                                        match ev {
392                                            SipSessionEvent::ShouldRefresh(dialog) => {
393                                                if let Ok(mut message) =
394                                                    dialog.make_request(b"UPDATE", None)
395                                                {
396                                                    message.add_header(Header::new(
397                                                        b"Supported",
398                                                        b"timer",
399                                                    ));
400
401                                                    if let Some((transport, _, _)) =
402                                                        core.get_default_public_identity()
403                                                    {
404                                                        core.get_transaction_manager()
405                                                            .send_request(
406                                                                message,
407                                                                &transport,
408                                                                UpdateMessageCallbacks {
409                                                                    // session_expires: None,
410                                                                    dialog,
411                                                                    sip_session: Arc::clone(
412                                                                        &sip_session_,
413                                                                    ),
414                                                                    rt: Arc::clone(&rt),
415                                                                },
416                                                                &rt,
417                                                            );
418                                                    }
419                                                }
420                                            }
421
422                                            SipSessionEvent::Expired(dialog) => {
423                                                if let Ok(message) =
424                                                    dialog.make_request(b"BYE", None)
425                                                {
426                                                    if let Some((transport, _, _)) =
427                                                        core.get_default_public_identity()
428                                                    {
429                                                        core.get_transaction_manager()
430                                                            .send_request(
431                                                                message,
432                                                                &transport,
433                                                                ClientTransactionNilCallbacks {},
434                                                                &rt,
435                                                            );
436                                                    }
437                                                }
438                                            }
439
440                                            _ => {}
441                                        }
442                                    }
443                                });
444
445                                sip_session.setup_confirmed_dialog(
446                                    &dialog,
447                                    MultiConferenceV1SessionDialogEventReceiver {
448                                        sip_session: Arc::clone(&sip_session),
449                                        event_listener: Arc::new(Mutex::new(None)),
450                                        rt: Arc::clone(&rt_),
451                                    },
452                                );
453
454                                if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
455                                    if let Some((transport, _, _)) =
456                                        core_.get_default_public_identity()
457                                    {
458                                        core_.get_transaction_manager().send_request(
459                                            ack_message,
460                                            &transport,
461                                            ClientTransactionNilCallbacks {},
462                                            &rt_,
463                                        );
464                                    }
465                                }
466
467                                // to-do: start progressing events
468
469                                if let Some(event_cb) = event_cb {
470                                    session.start(conf_rx, event_cb, event_cb_context, &rt_);
471                                }
472
473                                // rt_.spawn(async move {
474                                //     while let Some(ev) = conf_rx.recv().await {
475                                //         if let Some(cb) = event_cb {
476                                //             let ev = Some(Box::new(ev));
477                                //             let event_cb_context = event_cb_context.lock().unwrap();
478                                //             let cb_context = event_cb_context.0.as_ptr();
479                                //             cb(ev, cb_context);
480                                //         }
481                                //     }
482                                // });
483
484                                // to-do: setup subscriber
485
486                                let inner = MultiConferenceV1Internal { sip_session };
487                                let inner = Arc::new(inner);
488
489                                callback(Some((MultiConferenceV1 { inner }, sdp)));
490
491                                return;
492                            }
493
494                            callback(None)
495                        }
496
497                        Err(e) => callback(None),
498                    }
499                }
500            });
501
502            core.get_transaction_manager().send_request(
503                invite_message,
504                &transport,
505                ConferenceV1InviteCallbacks {
506                    client_transaction_tx,
507                    rt: Arc::clone(rt),
508                },
509                rt,
510            );
511
512            return;
513        }
514
515        callback(None)
516    }
517}
518
519struct ConferenceV1InviteCallbacks {
520    client_transaction_tx: mpsc::Sender<Result<(SipMessage, String, String), u16>>,
521    rt: Arc<Runtime>,
522}
523
524impl ClientTransactionCallbacks for ConferenceV1InviteCallbacks {
525    fn on_provisional_response(&self, _message: SipMessage) {}
526
527    fn on_final_response(&self, message: SipMessage) {
528        if let SipMessage::Response(l, headers, _) = &message {
529            if l.status_code >= 200 && l.status_code < 300 {
530                if let Some(headers) = headers {
531                    if let Some(contact_header) = header::search(headers, b"Contact", true) {
532                        if let Some(conference_v1_contact) = contact_header
533                            .get_value()
534                            .as_header_field()
535                            .as_conference_v1_contact()
536                        {
537                            if let Some(content_type_header) =
538                                header::search(headers, b"Content-Type", true)
539                            {
540                                if let Some(content_type) = content_type_header
541                                    .get_value()
542                                    .as_header_field()
543                                    .as_content_type()
544                                {
545                                    if content_type.major_type.eq_ignore_ascii_case(b"application")
546                                        && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
547                                    {
548                                        if let Some(body) = message.get_body() {
549                                            if let Body::Raw(bytes) = body.as_ref() {
550                                                if let Ok(sdp) = std::str::from_utf8(bytes) {
551                                                    let sdp = String::from(sdp);
552
553                                                    let tx = self.client_transaction_tx.clone();
554
555                                                    self.rt.spawn(async move {
556                                                        match tx
557                                                            .send(Ok((
558                                                                message,
559                                                                conference_v1_contact
560                                                                    .conference_uri,
561                                                                sdp,
562                                                            )))
563                                                            .await
564                                                        {
565                                                            Ok(()) => {}
566                                                            Err(e) => {}
567                                                        }
568                                                    });
569
570                                                    return;
571                                                }
572                                            }
573                                        }
574                                    }
575                                }
576                            }
577
578                            return;
579                        }
580                    }
581
582                    let tx = self.client_transaction_tx.clone();
583
584                    self.rt.spawn(async move {
585                        match tx.send(Err(400)).await {
586                            Ok(()) => {}
587                            Err(e) => {}
588                        }
589                    });
590
591                    return;
592                }
593
594                let code = l.status_code;
595
596                let tx = self.client_transaction_tx.clone();
597
598                self.rt.spawn(async move {
599                    match tx.send(Err(code)).await {
600                        Ok(()) => {}
601                        Err(e) => {}
602                    }
603                });
604
605                return;
606            }
607        }
608    }
609
610    fn on_transport_error(&self) {
611        let client_transaction_tx = self.client_transaction_tx.clone();
612        self.rt.spawn(async move {
613            match client_transaction_tx.send(Err(0)).await {
614                Ok(()) => {}
615                Err(e) => {}
616            }
617        });
618    }
619}
620
621pub struct MultiConferenceServiceV1Wrapper {
622    pub service: Arc<MultiConferenceServiceV1>,
623    pub tm: Arc<SipTransactionManager>,
624}
625
626struct MultiConferenceV1InviteResponseInner {
627    pub status_code: u16,
628    pub answer_sdp: String,
629    pub event_listener: MultiConferenceEventListener,
630    pub event_listener_context: MultiConferenceEventListenerContextWrapper,
631}
632
633pub struct MultiConferenceV1InviteResponseReceiver {
634    tx: Option<oneshot::Sender<MultiConferenceV1InviteResponseInner>>,
635}
636
637impl MultiConferenceV1InviteResponseReceiver {
638    pub fn accept_result(&mut self, resp: &MultiConferenceV1InviteResponse) {
639        if let Some(tx) = self.tx.take() {
640            if let Some(event_listener) = resp.event_listener {
641                let answer_sdp = unsafe {
642                    CStr::from_ptr(resp.answer_sdp)
643                        .to_string_lossy()
644                        .into_owned()
645                };
646
647                let event_listener_context = MultiConferenceEventListenerContextWrapper(
648                    NonNull::new(resp.event_listener_context).unwrap(),
649                );
650
651                match tx.send(MultiConferenceV1InviteResponseInner {
652                    status_code: resp.status_code,
653                    answer_sdp,
654                    event_listener,
655                    event_listener_context,
656                }) {
657                    Ok(()) => {}
658                    Err(e) => {}
659                }
660            }
661        }
662    }
663
664    pub fn drop_result(&mut self) {
665        self.tx.take();
666    }
667}
668
669impl TransactionHandler for MultiConferenceServiceV1Wrapper {
670    fn handle_transaction(
671        &self,
672        transaction: &Arc<ServerTransaction>,
673        ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
674        channels: &mut Option<(
675            mpsc::Sender<ServerTransactionEvent>,
676            mpsc::Receiver<ServerTransactionEvent>,
677        )>,
678        rt: &Arc<Runtime>,
679    ) -> bool {
680        let message = transaction.message();
681
682        if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
683            if req_line.method == INVITE {
684                let mut is_v1_multi_conference_invite = false;
685
686                if let Some(header) = header::search(req_headers, b"P-Asserted-Service", true) {
687                    if header
688                        .get_value()
689                        .eq_ignore_ascii_case(b"urn:example:conference-service")
690                    {
691                        is_v1_multi_conference_invite = true;
692                    }
693                }
694
695                if is_v1_multi_conference_invite {
696                    if let Some(header) = header::search(req_headers, b"Contact", true) {
697                        if let Some(conference_v1_contact) = header
698                            .get_value()
699                            .as_header_field()
700                            .as_conference_v1_contact()
701                        {
702                            if let Some(header) = header::search(req_headers, b"Content-Type", true)
703                            {
704                                if let Some(content_type) =
705                                    header.get_value().as_header_field().as_content_type()
706                                {
707                                    if content_type.major_type.eq_ignore_ascii_case(b"application")
708                                        && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
709                                    {
710                                        if let Body::Raw(raw) = req_body.as_ref() {
711                                            if let Ok(sdp_string) = std::str::from_utf8(raw) {
712                                                let offer_sdp = String::from(sdp_string);
713
714                                                if let Some(resp_message) =
715                                                    server_transaction::make_response(
716                                                        message,
717                                                        transaction.to_tag(),
718                                                        180,
719                                                        b"Ringing",
720                                                    )
721                                                {
722                                                    if let Some((tx, mut rx)) = channels.take() {
723                                                        let (d_tx, mut d_rx) = mpsc::channel(1);
724
725                                                        let ongoing_dialogs_ =
726                                                            Arc::clone(&ongoing_dialogs);
727
728                                                        rt.spawn(async move {
729                                                            if let Some(dialog) = d_rx.recv().await
730                                                            {
731                                                                ongoing_dialogs_
732                                                                    .remove_dialog(&dialog);
733                                                            }
734                                                        });
735
736                                                        if let Ok(dialog) =
737                                                            SipDialog::try_new_as_uas(
738                                                                message,
739                                                                &resp_message,
740                                                                move |d| match d_tx.blocking_send(d)
741                                                                {
742                                                                    Ok(()) => {}
743                                                                    Err(e) => {}
744                                                                },
745                                                            )
746                                                        {
747                                                            let dialog = Arc::new(dialog);
748
749                                                            ongoing_dialogs.add_dialog(&dialog);
750
751                                                            let (conf_tx, conf_rx) =
752                                                                mpsc::channel(8);
753
754                                                            let session =
755                                                                MultiConferenceV1Session {
756                                                                    conference_id:
757                                                                        conference_v1_contact
758                                                                            .conference_uri,
759                                                                    tx: conf_tx,
760                                                                };
761
762                                                            let session = Arc::new(session);
763
764                                                            let (sess_tx, mut sess_rx) =
765                                                                mpsc::channel(8);
766
767                                                            let sip_session = SipSession::new(
768                                                                &session,
769                                                                SipSessionEventReceiver {
770                                                                    tx: sess_tx,
771                                                                    rt: Arc::clone(&rt),
772                                                                },
773                                                            );
774
775                                                            let sip_session = Arc::new(sip_session);
776                                                            let sip_session_ =
777                                                                Arc::clone(&sip_session);
778
779                                                            let registered_public_identity =
780                                                                Arc::clone(
781                                                                    &self
782                                                                        .service
783                                                                        .registered_public_identity,
784                                                                );
785
786                                                            let tm_ = Arc::clone(&self.tm);
787                                                            let rt_ = Arc::clone(&rt);
788
789                                                            rt.spawn(async move {
790                                                                while let Some(ev) =
791                                                                    sess_rx.recv().await
792                                                                {
793                                                                    match ev {
794                                                                        SipSessionEvent::ShouldRefresh(dialog) => {
795                                                                            if let Ok(mut message) =
796                                                                                dialog.make_request(b"UPDATE", None)
797                                                                            {
798                                                                                message.add_header(Header::new(
799                                                                                    b"Supported",
800                                                                                    b"timer",
801                                                                                ));
802
803                                                                                let guard = registered_public_identity.lock().unwrap();
804
805                                                                                if let Some((transport, _, _)) = &*guard
806                                                                                {
807                                                                                    tm_.send_request(
808                                                                                            message,
809                                                                                            transport,
810                                                                                            UpdateMessageCallbacks {
811                                                                                                // session_expires: None,
812                                                                                                dialog,
813                                                                                                sip_session: Arc::clone(
814                                                                                                    &sip_session_,
815                                                                                                ),
816                                                                                                rt: Arc::clone(&rt_),
817                                                                                            },
818                                                                                            &rt_,
819                                                                                        );
820                                                                                }
821                                                                            }
822                                                                        }
823
824                                                                        SipSessionEvent::Expired(dialog) => {
825                                                                            if let Ok(message) =
826                                                                                dialog.make_request(b"BYE", None)
827                                                                            {
828                                                                                let guard = registered_public_identity.lock().unwrap();
829
830                                                                                if let Some((transport, _, _)) = &*guard
831                                                                                {
832                                                                                    tm_.send_request(
833                                                                                            message,
834                                                                                            transport,
835                                                                                            ClientTransactionNilCallbacks {},
836                                                                                            &rt_,
837                                                                                        );
838                                                                                }
839                                                                            }
840                                                                        }
841
842                                                                        _ => {}
843                                                                    }
844                                                                }
845                                                            });
846
847                                                            let event_listener =
848                                                                Arc::new(Mutex::new(None));
849                                                            let event_listener_ =
850                                                                Arc::clone(&event_listener);
851
852                                                            sip_session.setup_early_dialog(&dialog, MultiConferenceV1SessionDialogEventReceiver {
853                                                                sip_session: Arc::clone(&sip_session),
854                                                                event_listener,
855                                                                rt: Arc::clone(&rt),
856                                                            });
857
858                                                            let inner = MultiConferenceV1Internal {
859                                                                sip_session,
860                                                            };
861                                                            let inner = Arc::new(inner);
862
863                                                            let conference_v1 =
864                                                                MultiConferenceV1 { inner };
865
866                                                            let (handler_tx, handler_rx) =
867                                                                oneshot::channel();
868
869                                                            let response_receiver = MultiConferenceV1InviteResponseReceiver{
870                                                                tx: Some(handler_tx),
871                                                            };
872
873                                                            (self
874                                                                .service
875                                                                .multi_conference_invite_handler)(
876                                                                conference_v1,
877                                                                offer_sdp,
878                                                                response_receiver,
879                                                            );
880
881                                                            let (tx_, mut rx_) = mpsc::channel::<
882                                                                ServerTransactionEvent,
883                                                            >(
884                                                                8
885                                                            );
886
887                                                            rt.spawn(async move {
888                                                                while let Some(ev) = rx.recv().await {
889                                                                    match ev {
890                                                                        ServerTransactionEvent::Cancelled => {
891                                                                            todo!()
892                                                                        }
893                                                                        _ => {}
894                                                                    }
895
896                                                                    match tx_.send(ev).await {
897                                                                        Ok(()) => {}
898                                                                        Err(e) => {}
899                                                                    }
900                                                                }
901                                                            });
902
903                                                            let registered_public_identity =
904                                                                Arc::clone(
905                                                                    &self
906                                                                        .service
907                                                                        .registered_public_identity,
908                                                                );
909
910                                                            let transaction_ =
911                                                                Arc::clone(transaction);
912
913                                                            let rt_ = Arc::clone(&rt);
914
915                                                            rt.spawn(async move {
916
917                                                                let message = transaction_.message();
918
919                                                                match handler_rx.await {
920                                                                    Ok(resp) => {
921
922                                                                        if resp.status_code >= 200 && resp.status_code < 300 {
923
924                                                                            if let Some(mut resp_message) = server_transaction::make_response(message, transaction_.to_tag(), 200, b"OK") {
925
926                                                                                {
927                                                                                    let guard = registered_public_identity.lock().unwrap();
928
929                                                                                    if let Some((
930                                                                                        transport,
931                                                                                        contact_identity,
932                                                                                        instance_id,
933                                                                                    )) = &*guard {
934                                                                                        let transport_ = transaction_.transport();
935                                                                                        if Arc::ptr_eq(transport, transport_) {
936                                                                                            resp_message.add_header(Header::new(
937                                                                                                b"Contact",
938                                                                                                format!(
939                                                                                                    "<{}>;+sip.instance=\"{}\"",
940                                                                                                    contact_identity, instance_id
941                                                                                                ),
942                                                                                            ));
943                                                                                        } // to-do: treat as error if transport has changed somehow
944                                                                                    }
945                                                                                }
946
947                                                                                let sdp_body = resp.answer_sdp.as_bytes().to_vec();
948                                                                                let sdp_body = Body::Raw(sdp_body);
949                                                                                let sdp_body = Arc::new(sdp_body);
950                                                                                let sdp_body_len = resp.answer_sdp.len();
951
952                                                                                resp_message.add_header(Header::new(b"Content-Type", b"application/json"));
953
954                                                                                resp_message.add_header(Header::new(b"Content-Length", format!("{}", sdp_body_len)));
955
956                                                                                resp_message.set_body(sdp_body);
957
958                                                                                let mut guard = event_listener_.lock().unwrap();
959
960                                                                                *guard = Some((conf_rx, resp.event_listener, resp.event_listener_context));
961
962                                                                                rt_.spawn(async move {
963
964                                                                                    while let Some(ev) = rx_.recv().await {
965                                                                                        match ev {
966                                                                                            ServerTransactionEvent::Cancelled => {
967                                                                                                todo!()
968                                                                                            },
969                                                                                            _ => {}
970                                                                                        }
971                                                                                    }
972                                                                                });
973
974                                                                                dialog.confirm();
975
976                                                                                server_transaction::send_response(
977                                                                                    transaction_,
978                                                                                    resp_message,
979                                                                                    tx,
980                                                                                    // &timer,
981                                                                                    &rt_,
982                                                                                );
983
984                                                                                return ;
985                                                                            }
986                                                                        }
987                                                                    }
988
989                                                                    Err(e) => {
990
991                                                                    }
992                                                                }
993
994                                                                if let Some(resp_message) = server_transaction::make_response(message, transaction_.to_tag(), 486, b"Busy Here") {
995                                                                    server_transaction::send_response(
996                                                                        transaction_,
997                                                                        resp_message,
998                                                                        tx,
999                                                                        // &timer,
1000                                                                        &rt_,
1001                                                                    );
1002                                                                }
1003                                                            });
1004
1005                                                            return true;
1006                                                        }
1007
1008                                                        server_transaction::send_response(
1009                                                            Arc::clone(transaction),
1010                                                            resp_message,
1011                                                            tx,
1012                                                            // &timer,
1013                                                            rt,
1014                                                        );
1015                                                    }
1016                                                }
1017
1018                                                return true;
1019                                            }
1020                                        }
1021                                    }
1022                                }
1023                            }
1024
1025                            if let Some(resp_message) = server_transaction::make_response(
1026                                message,
1027                                transaction.to_tag(),
1028                                400,
1029                                b"Bad Request",
1030                            ) {
1031                                if let Some((tx, mut rx)) = channels.take() {
1032                                    server_transaction::send_response(
1033                                        Arc::clone(transaction),
1034                                        resp_message,
1035                                        tx,
1036                                        // &timer,
1037                                        rt,
1038                                    );
1039                                }
1040                            }
1041
1042                            return true;
1043                        }
1044                    }
1045                }
1046            }
1047        }
1048
1049        false
1050    }
1051}