rust_rcs_client/conference/mod.rs
1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 ffi::CStr,
17 ptr::NonNull,
18 sync::{Arc, Mutex},
19};
20
21use futures::channel::oneshot;
22use tokio::{runtime::Runtime, sync::mpsc};
23
24use uuid::Uuid;
25
26use rust_rcs_core::{
27 internet::{
28 body::{message_body::MessageBody, multipart_body::MultipartBody},
29 header,
30 headers::AsContentType,
31 AsHeaderField, Body, Header,
32 },
33 io::Serializable,
34 sip::{
35 sip_core::SipDialogCache,
36 sip_session::{SipSession, SipSessionEvent, SipSessionEventReceiver},
37 sip_transaction::{client_transaction::ClientTransactionNilCallbacks, server_transaction},
38 ClientTransactionCallbacks, ServerTransaction, ServerTransactionEvent, SipCore, SipDialog,
39 SipDialogEventCallbacks, SipMessage, SipTransactionManager, SipTransport,
40 TransactionHandler, INVITE,
41 },
42 util::rand::create_raw_alpha_numeric_string,
43};
44
45use crate::messaging::cpm::session::UpdateMessageCallbacks;
46
47use self::{
48 ffi::{
49 MultiConferenceEventListener, MultiConferenceEventListenerContextWrapper,
50 MultiConferenceV1InviteResponse,
51 },
52 sip::AsConferenceV1Contact,
53 subscription::MultiConferenceEvent,
54};
55
56pub mod ffi;
57pub mod sip;
58pub mod subscription;
59
60struct MultiConferenceV1Session {
61 conference_id: String,
62 // sdp: String,
63 tx: mpsc::Sender<MultiConferenceEvent>,
64 // event_listener: Arc<
65 // Mutex<
66 // Option<(
67 // MultiConferenceEventCallback,
68 // MultiConferenceEventCallbackContextWrapper,
69 // )>,
70 // >,
71 // >,
72}
73
74impl MultiConferenceV1Session {
75 pub fn get_conference_id(&self) -> String {
76 self.conference_id.clone()
77 }
78 // pub fn get_sdp(&self) -> String {
79 // self.sdp.clone()
80 // }
81
82 pub fn start(
83 &self,
84 mut rx: mpsc::Receiver<MultiConferenceEvent>,
85 event_cb: MultiConferenceEventListener,
86 event_cb_context: Arc<Mutex<MultiConferenceEventListenerContextWrapper>>,
87 rt: &Arc<Runtime>,
88 ) {
89 rt.spawn(async move {
90 while let Some(ev) = rx.recv().await {
91 let ev_type = ev.get_event_type();
92 let ev = Some(Box::new(ev));
93 let event_cb_context = event_cb_context.lock().unwrap();
94 let cb_context = event_cb_context.0.as_ptr();
95 event_cb(ev_type, ev, cb_context);
96 }
97 });
98 }
99}
100
101struct MultiConferenceV1SessionDialogEventReceiver {
102 sip_session: Arc<SipSession<MultiConferenceV1Session>>,
103 event_listener: Arc<
104 Mutex<
105 Option<(
106 mpsc::Receiver<MultiConferenceEvent>,
107 MultiConferenceEventListener,
108 MultiConferenceEventListenerContextWrapper,
109 )>,
110 >,
111 >,
112 rt: Arc<Runtime>,
113}
114
115// MultiConferenceV1 server does not actively initiate requests
116impl SipDialogEventCallbacks for MultiConferenceV1SessionDialogEventReceiver {
117 fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
118 let mut guard = self.event_listener.lock().unwrap();
119 if let Some((rx, event_cb, event_cb_context)) = guard.take() {
120 let session = self.sip_session.get_inner();
121
122 let event_cb_context = Arc::new(Mutex::new(event_cb_context));
123
124 session.start(rx, event_cb, event_cb_context, &self.rt);
125 }
126 }
127
128 fn on_new_request(
129 &self,
130 _transaction: Arc<ServerTransaction>,
131 _tx: mpsc::Sender<ServerTransactionEvent>,
132 _rt: &Arc<Runtime>,
133 ) -> Option<(u16, bool)> {
134 None
135 }
136
137 fn on_terminating_request(&self, _message: &SipMessage) {
138 let inner = self.sip_session.get_inner();
139
140 let tx = inner.tx.clone();
141
142 self.rt.spawn(async move {
143 match tx.send(MultiConferenceEvent::conference_end()).await {
144 Ok(()) => {}
145 Err(e) => {}
146 }
147 });
148 }
149
150 fn on_terminating_response(&self, _message: &SipMessage) {
151 let inner = self.sip_session.get_inner();
152
153 let tx = inner.tx.clone();
154
155 self.rt.spawn(async move {
156 match tx.send(MultiConferenceEvent::conference_end()).await {
157 Ok(()) => {}
158 Err(e) => {}
159 }
160 });
161 }
162}
163
164struct MultiConferenceV1Internal {
165 sip_session: Arc<SipSession<MultiConferenceV1Session>>,
166 // cb: MultiConferenceEventCallback,
167 // cb_context : MultiConferenceEventCallbackContextWrapper,
168}
169
170impl MultiConferenceV1Internal {
171 // pub fn get_sdp(&self) -> String {
172 // let session = self.sip_session.get_inner();
173 // session.get_sdp()
174 // }
175 pub fn keep_alive(&self, _rt: &Arc<Runtime>) {
176 self.sip_session.mark_session_active()
177 }
178}
179
180pub struct MultiConferenceV1 {
181 inner: Arc<MultiConferenceV1Internal>,
182}
183
184// impl MultiConferenceV1 {
185// pub fn register_event_listener(
186// &mut self,
187// cb: MultiConferenceEventCallback,
188// cb_context: MultiConferenceEventCallbackContextWrapper,
189// ) {
190// let inner = self.inner.sip_session.get_inner();
191
192// let mut guard = inner.event_listener.lock().unwrap();
193
194// *guard = Some((cb, cb_context));
195// }
196// }
197
198impl MultiConferenceV1 {
199 // pub fn get_sdp(&self) -> String {
200 // self.inner.get_sdp()
201 // }
202 pub fn keep_alive(&self, rt: &Arc<Runtime>) {
203 self.inner.keep_alive(rt)
204 }
205}
206
207/**
208 * conference v1 is set to use with go-rcs-server and rcs-mediasoup-server
209 */
210pub struct MultiConferenceServiceV1 {
211 multi_conference_invite_handler: Arc<
212 dyn Fn(MultiConferenceV1, String, MultiConferenceV1InviteResponseReceiver) + Send + Sync,
213 >,
214
215 registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
216}
217
218impl MultiConferenceServiceV1 {
219 pub fn new<MCIHF>(multi_conference_invite_handler_function: MCIHF) -> MultiConferenceServiceV1
220 where
221 MCIHF: Fn(MultiConferenceV1, String, MultiConferenceV1InviteResponseReceiver)
222 + Send
223 + Sync
224 + 'static,
225 {
226 MultiConferenceServiceV1 {
227 multi_conference_invite_handler: Arc::new(multi_conference_invite_handler_function),
228
229 registered_public_identity: Arc::new(Mutex::new(None)),
230 }
231 }
232
233 pub fn set_registered_public_identity(
234 &self,
235 registered_public_identity: String,
236 sip_instance_id: String,
237 transport: Arc<SipTransport>,
238 ) {
239 (*self.registered_public_identity.lock().unwrap()).replace((
240 transport,
241 registered_public_identity,
242 sip_instance_id,
243 ));
244 }
245
246 pub fn create_conference<F>(
247 &self,
248 recipients: &str,
249 offer_sdp: &str,
250 event_cb: Option<MultiConferenceEventListener>,
251 event_cb_context: MultiConferenceEventListenerContextWrapper,
252 core: &Arc<SipCore>,
253 rt: &Arc<Runtime>,
254 callback: F,
255 ) where
256 F: FnOnce(Option<(MultiConferenceV1, String)>) + Send + Sync + 'static,
257 {
258 let event_cb_context = Arc::new(Mutex::new(event_cb_context));
259
260 if let Some((transport, public_user_identity, instance_id)) =
261 core.get_default_public_identity()
262 {
263 let mut invite_message =
264 SipMessage::new_request(INVITE, b"sip:conference-focus@example.com");
265
266 invite_message.add_header(Header::new(
267 b"Call-ID",
268 String::from(
269 Uuid::new_v4()
270 .as_hyphenated()
271 .encode_lower(&mut Uuid::encode_buffer()),
272 ),
273 ));
274
275 invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
276
277 let tag = create_raw_alpha_numeric_string(8);
278 let tag = String::from_utf8_lossy(&tag);
279
280 invite_message.add_header(Header::new(b"From", format!("<{}>;", public_user_identity)));
281
282 invite_message.add_header(Header::new(b"To", b"<sip:conference-focus@example.com>"));
283
284 let boundary = create_raw_alpha_numeric_string(16);
285 let boundary_ = String::from_utf8_lossy(&boundary);
286 let mut parts = Vec::with_capacity(2);
287
288 let mut recipients_part_headers = Vec::new();
289 recipients_part_headers.push(Header::new(b"Content-Type", b"application/json"));
290 let recipients_json_length = recipients.len();
291 recipients_part_headers.push(Header::new(
292 b"Content-Length",
293 format!("{}", recipients_json_length),
294 ));
295
296 parts.push(Arc::new(Body::Message(MessageBody {
297 headers: recipients_part_headers,
298 body: Arc::new(Body::Raw(recipients.as_bytes().to_vec())),
299 })));
300
301 let mut sdp_part_headers = Vec::new();
302 sdp_part_headers.push(Header::new(b"Content-Type", b"application/sdp"));
303 let sdp_content_length = offer_sdp.len();
304 sdp_part_headers.push(Header::new(
305 b"Content-Length",
306 format!("{}", sdp_content_length),
307 ));
308
309 parts.push(Arc::new(Body::Message(MessageBody {
310 headers: sdp_part_headers,
311 body: Arc::new(Body::Raw(offer_sdp.as_bytes().to_vec())),
312 })));
313
314 invite_message.add_header(Header::new(
315 b"Content-Type",
316 format!("multipart/mixed; boundary={}", boundary_),
317 ));
318
319 let multipart = Body::Multipart(MultipartBody { boundary, parts });
320
321 let content_length = multipart.estimated_size();
322 invite_message.add_header(Header::new(
323 b"Content-Length",
324 format!("{}", content_length),
325 ));
326
327 let multipart = Arc::new(multipart);
328 invite_message.set_body(multipart);
329
330 let req_headers = invite_message.copy_headers();
331
332 let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel(1);
333
334 let ongoing_dialogs = core.get_ongoing_dialogs();
335
336 let core_ = Arc::clone(&core);
337 let rt_ = Arc::clone(rt);
338
339 rt.spawn(async move {
340 if let Some(res) = client_transaction_rx.recv().await {
341 match res {
342 Ok((resp_message, conference_id, sdp)) => {
343 let (d_tx, mut d_rx) = mpsc::channel(1);
344
345 let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
346
347 rt_.spawn(async move {
348 if let Some(dialog) = d_rx.recv().await {
349 ongoing_dialogs_.remove_dialog(&dialog);
350 }
351 });
352
353 if let Ok(dialog) =
354 SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
355 match d_tx.blocking_send(d) {
356 Ok(()) => {}
357 Err(e) => {}
358 }
359 })
360 {
361 let dialog = Arc::new(dialog);
362
363 ongoing_dialogs.add_dialog(&dialog);
364
365 let (conf_tx, conf_rx) = mpsc::channel(8);
366
367 let session = MultiConferenceV1Session {
368 conference_id,
369 tx: conf_tx,
370 };
371 let session = Arc::new(session);
372
373 let (sess_tx, mut sess_rx) = mpsc::channel(8);
374
375 let sip_session = SipSession::new(
376 &session,
377 SipSessionEventReceiver {
378 tx: sess_tx,
379 rt: Arc::clone(&rt_),
380 },
381 );
382
383 let sip_session = Arc::new(sip_session);
384 let sip_session_ = Arc::clone(&sip_session);
385
386 let core = Arc::clone(&core_);
387 let rt = Arc::clone(&rt_);
388
389 rt_.spawn(async move {
390 while let Some(ev) = sess_rx.recv().await {
391 match ev {
392 SipSessionEvent::ShouldRefresh(dialog) => {
393 if let Ok(mut message) =
394 dialog.make_request(b"UPDATE", None)
395 {
396 message.add_header(Header::new(
397 b"Supported",
398 b"timer",
399 ));
400
401 if let Some((transport, _, _)) =
402 core.get_default_public_identity()
403 {
404 core.get_transaction_manager()
405 .send_request(
406 message,
407 &transport,
408 UpdateMessageCallbacks {
409 // session_expires: None,
410 dialog,
411 sip_session: Arc::clone(
412 &sip_session_,
413 ),
414 rt: Arc::clone(&rt),
415 },
416 &rt,
417 );
418 }
419 }
420 }
421
422 SipSessionEvent::Expired(dialog) => {
423 if let Ok(message) =
424 dialog.make_request(b"BYE", None)
425 {
426 if let Some((transport, _, _)) =
427 core.get_default_public_identity()
428 {
429 core.get_transaction_manager()
430 .send_request(
431 message,
432 &transport,
433 ClientTransactionNilCallbacks {},
434 &rt,
435 );
436 }
437 }
438 }
439
440 _ => {}
441 }
442 }
443 });
444
445 sip_session.setup_confirmed_dialog(
446 &dialog,
447 MultiConferenceV1SessionDialogEventReceiver {
448 sip_session: Arc::clone(&sip_session),
449 event_listener: Arc::new(Mutex::new(None)),
450 rt: Arc::clone(&rt_),
451 },
452 );
453
454 if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
455 if let Some((transport, _, _)) =
456 core_.get_default_public_identity()
457 {
458 core_.get_transaction_manager().send_request(
459 ack_message,
460 &transport,
461 ClientTransactionNilCallbacks {},
462 &rt_,
463 );
464 }
465 }
466
467 // to-do: start progressing events
468
469 if let Some(event_cb) = event_cb {
470 session.start(conf_rx, event_cb, event_cb_context, &rt_);
471 }
472
473 // rt_.spawn(async move {
474 // while let Some(ev) = conf_rx.recv().await {
475 // if let Some(cb) = event_cb {
476 // let ev = Some(Box::new(ev));
477 // let event_cb_context = event_cb_context.lock().unwrap();
478 // let cb_context = event_cb_context.0.as_ptr();
479 // cb(ev, cb_context);
480 // }
481 // }
482 // });
483
484 // to-do: setup subscriber
485
486 let inner = MultiConferenceV1Internal { sip_session };
487 let inner = Arc::new(inner);
488
489 callback(Some((MultiConferenceV1 { inner }, sdp)));
490
491 return;
492 }
493
494 callback(None)
495 }
496
497 Err(e) => callback(None),
498 }
499 }
500 });
501
502 core.get_transaction_manager().send_request(
503 invite_message,
504 &transport,
505 ConferenceV1InviteCallbacks {
506 client_transaction_tx,
507 rt: Arc::clone(rt),
508 },
509 rt,
510 );
511
512 return;
513 }
514
515 callback(None)
516 }
517}
518
519struct ConferenceV1InviteCallbacks {
520 client_transaction_tx: mpsc::Sender<Result<(SipMessage, String, String), u16>>,
521 rt: Arc<Runtime>,
522}
523
524impl ClientTransactionCallbacks for ConferenceV1InviteCallbacks {
525 fn on_provisional_response(&self, _message: SipMessage) {}
526
527 fn on_final_response(&self, message: SipMessage) {
528 if let SipMessage::Response(l, headers, _) = &message {
529 if l.status_code >= 200 && l.status_code < 300 {
530 if let Some(headers) = headers {
531 if let Some(contact_header) = header::search(headers, b"Contact", true) {
532 if let Some(conference_v1_contact) = contact_header
533 .get_value()
534 .as_header_field()
535 .as_conference_v1_contact()
536 {
537 if let Some(content_type_header) =
538 header::search(headers, b"Content-Type", true)
539 {
540 if let Some(content_type) = content_type_header
541 .get_value()
542 .as_header_field()
543 .as_content_type()
544 {
545 if content_type.major_type.eq_ignore_ascii_case(b"application")
546 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
547 {
548 if let Some(body) = message.get_body() {
549 if let Body::Raw(bytes) = body.as_ref() {
550 if let Ok(sdp) = std::str::from_utf8(bytes) {
551 let sdp = String::from(sdp);
552
553 let tx = self.client_transaction_tx.clone();
554
555 self.rt.spawn(async move {
556 match tx
557 .send(Ok((
558 message,
559 conference_v1_contact
560 .conference_uri,
561 sdp,
562 )))
563 .await
564 {
565 Ok(()) => {}
566 Err(e) => {}
567 }
568 });
569
570 return;
571 }
572 }
573 }
574 }
575 }
576 }
577
578 return;
579 }
580 }
581
582 let tx = self.client_transaction_tx.clone();
583
584 self.rt.spawn(async move {
585 match tx.send(Err(400)).await {
586 Ok(()) => {}
587 Err(e) => {}
588 }
589 });
590
591 return;
592 }
593
594 let code = l.status_code;
595
596 let tx = self.client_transaction_tx.clone();
597
598 self.rt.spawn(async move {
599 match tx.send(Err(code)).await {
600 Ok(()) => {}
601 Err(e) => {}
602 }
603 });
604
605 return;
606 }
607 }
608 }
609
610 fn on_transport_error(&self) {
611 let client_transaction_tx = self.client_transaction_tx.clone();
612 self.rt.spawn(async move {
613 match client_transaction_tx.send(Err(0)).await {
614 Ok(()) => {}
615 Err(e) => {}
616 }
617 });
618 }
619}
620
621pub struct MultiConferenceServiceV1Wrapper {
622 pub service: Arc<MultiConferenceServiceV1>,
623 pub tm: Arc<SipTransactionManager>,
624}
625
626struct MultiConferenceV1InviteResponseInner {
627 pub status_code: u16,
628 pub answer_sdp: String,
629 pub event_listener: MultiConferenceEventListener,
630 pub event_listener_context: MultiConferenceEventListenerContextWrapper,
631}
632
633pub struct MultiConferenceV1InviteResponseReceiver {
634 tx: Option<oneshot::Sender<MultiConferenceV1InviteResponseInner>>,
635}
636
637impl MultiConferenceV1InviteResponseReceiver {
638 pub fn accept_result(&mut self, resp: &MultiConferenceV1InviteResponse) {
639 if let Some(tx) = self.tx.take() {
640 if let Some(event_listener) = resp.event_listener {
641 let answer_sdp = unsafe {
642 CStr::from_ptr(resp.answer_sdp)
643 .to_string_lossy()
644 .into_owned()
645 };
646
647 let event_listener_context = MultiConferenceEventListenerContextWrapper(
648 NonNull::new(resp.event_listener_context).unwrap(),
649 );
650
651 match tx.send(MultiConferenceV1InviteResponseInner {
652 status_code: resp.status_code,
653 answer_sdp,
654 event_listener,
655 event_listener_context,
656 }) {
657 Ok(()) => {}
658 Err(e) => {}
659 }
660 }
661 }
662 }
663
664 pub fn drop_result(&mut self) {
665 self.tx.take();
666 }
667}
668
669impl TransactionHandler for MultiConferenceServiceV1Wrapper {
670 fn handle_transaction(
671 &self,
672 transaction: &Arc<ServerTransaction>,
673 ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
674 channels: &mut Option<(
675 mpsc::Sender<ServerTransactionEvent>,
676 mpsc::Receiver<ServerTransactionEvent>,
677 )>,
678 rt: &Arc<Runtime>,
679 ) -> bool {
680 let message = transaction.message();
681
682 if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
683 if req_line.method == INVITE {
684 let mut is_v1_multi_conference_invite = false;
685
686 if let Some(header) = header::search(req_headers, b"P-Asserted-Service", true) {
687 if header
688 .get_value()
689 .eq_ignore_ascii_case(b"urn:example:conference-service")
690 {
691 is_v1_multi_conference_invite = true;
692 }
693 }
694
695 if is_v1_multi_conference_invite {
696 if let Some(header) = header::search(req_headers, b"Contact", true) {
697 if let Some(conference_v1_contact) = header
698 .get_value()
699 .as_header_field()
700 .as_conference_v1_contact()
701 {
702 if let Some(header) = header::search(req_headers, b"Content-Type", true)
703 {
704 if let Some(content_type) =
705 header.get_value().as_header_field().as_content_type()
706 {
707 if content_type.major_type.eq_ignore_ascii_case(b"application")
708 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
709 {
710 if let Body::Raw(raw) = req_body.as_ref() {
711 if let Ok(sdp_string) = std::str::from_utf8(raw) {
712 let offer_sdp = String::from(sdp_string);
713
714 if let Some(resp_message) =
715 server_transaction::make_response(
716 message,
717 transaction.to_tag(),
718 180,
719 b"Ringing",
720 )
721 {
722 if let Some((tx, mut rx)) = channels.take() {
723 let (d_tx, mut d_rx) = mpsc::channel(1);
724
725 let ongoing_dialogs_ =
726 Arc::clone(&ongoing_dialogs);
727
728 rt.spawn(async move {
729 if let Some(dialog) = d_rx.recv().await
730 {
731 ongoing_dialogs_
732 .remove_dialog(&dialog);
733 }
734 });
735
736 if let Ok(dialog) =
737 SipDialog::try_new_as_uas(
738 message,
739 &resp_message,
740 move |d| match d_tx.blocking_send(d)
741 {
742 Ok(()) => {}
743 Err(e) => {}
744 },
745 )
746 {
747 let dialog = Arc::new(dialog);
748
749 ongoing_dialogs.add_dialog(&dialog);
750
751 let (conf_tx, conf_rx) =
752 mpsc::channel(8);
753
754 let session =
755 MultiConferenceV1Session {
756 conference_id:
757 conference_v1_contact
758 .conference_uri,
759 tx: conf_tx,
760 };
761
762 let session = Arc::new(session);
763
764 let (sess_tx, mut sess_rx) =
765 mpsc::channel(8);
766
767 let sip_session = SipSession::new(
768 &session,
769 SipSessionEventReceiver {
770 tx: sess_tx,
771 rt: Arc::clone(&rt),
772 },
773 );
774
775 let sip_session = Arc::new(sip_session);
776 let sip_session_ =
777 Arc::clone(&sip_session);
778
779 let registered_public_identity =
780 Arc::clone(
781 &self
782 .service
783 .registered_public_identity,
784 );
785
786 let tm_ = Arc::clone(&self.tm);
787 let rt_ = Arc::clone(&rt);
788
789 rt.spawn(async move {
790 while let Some(ev) =
791 sess_rx.recv().await
792 {
793 match ev {
794 SipSessionEvent::ShouldRefresh(dialog) => {
795 if let Ok(mut message) =
796 dialog.make_request(b"UPDATE", None)
797 {
798 message.add_header(Header::new(
799 b"Supported",
800 b"timer",
801 ));
802
803 let guard = registered_public_identity.lock().unwrap();
804
805 if let Some((transport, _, _)) = &*guard
806 {
807 tm_.send_request(
808 message,
809 transport,
810 UpdateMessageCallbacks {
811 // session_expires: None,
812 dialog,
813 sip_session: Arc::clone(
814 &sip_session_,
815 ),
816 rt: Arc::clone(&rt_),
817 },
818 &rt_,
819 );
820 }
821 }
822 }
823
824 SipSessionEvent::Expired(dialog) => {
825 if let Ok(message) =
826 dialog.make_request(b"BYE", None)
827 {
828 let guard = registered_public_identity.lock().unwrap();
829
830 if let Some((transport, _, _)) = &*guard
831 {
832 tm_.send_request(
833 message,
834 transport,
835 ClientTransactionNilCallbacks {},
836 &rt_,
837 );
838 }
839 }
840 }
841
842 _ => {}
843 }
844 }
845 });
846
847 let event_listener =
848 Arc::new(Mutex::new(None));
849 let event_listener_ =
850 Arc::clone(&event_listener);
851
852 sip_session.setup_early_dialog(&dialog, MultiConferenceV1SessionDialogEventReceiver {
853 sip_session: Arc::clone(&sip_session),
854 event_listener,
855 rt: Arc::clone(&rt),
856 });
857
858 let inner = MultiConferenceV1Internal {
859 sip_session,
860 };
861 let inner = Arc::new(inner);
862
863 let conference_v1 =
864 MultiConferenceV1 { inner };
865
866 let (handler_tx, handler_rx) =
867 oneshot::channel();
868
869 let response_receiver = MultiConferenceV1InviteResponseReceiver{
870 tx: Some(handler_tx),
871 };
872
873 (self
874 .service
875 .multi_conference_invite_handler)(
876 conference_v1,
877 offer_sdp,
878 response_receiver,
879 );
880
881 let (tx_, mut rx_) = mpsc::channel::<
882 ServerTransactionEvent,
883 >(
884 8
885 );
886
887 rt.spawn(async move {
888 while let Some(ev) = rx.recv().await {
889 match ev {
890 ServerTransactionEvent::Cancelled => {
891 todo!()
892 }
893 _ => {}
894 }
895
896 match tx_.send(ev).await {
897 Ok(()) => {}
898 Err(e) => {}
899 }
900 }
901 });
902
903 let registered_public_identity =
904 Arc::clone(
905 &self
906 .service
907 .registered_public_identity,
908 );
909
910 let transaction_ =
911 Arc::clone(transaction);
912
913 let rt_ = Arc::clone(&rt);
914
915 rt.spawn(async move {
916
917 let message = transaction_.message();
918
919 match handler_rx.await {
920 Ok(resp) => {
921
922 if resp.status_code >= 200 && resp.status_code < 300 {
923
924 if let Some(mut resp_message) = server_transaction::make_response(message, transaction_.to_tag(), 200, b"OK") {
925
926 {
927 let guard = registered_public_identity.lock().unwrap();
928
929 if let Some((
930 transport,
931 contact_identity,
932 instance_id,
933 )) = &*guard {
934 let transport_ = transaction_.transport();
935 if Arc::ptr_eq(transport, transport_) {
936 resp_message.add_header(Header::new(
937 b"Contact",
938 format!(
939 "<{}>;+sip.instance=\"{}\"",
940 contact_identity, instance_id
941 ),
942 ));
943 } // to-do: treat as error if transport has changed somehow
944 }
945 }
946
947 let sdp_body = resp.answer_sdp.as_bytes().to_vec();
948 let sdp_body = Body::Raw(sdp_body);
949 let sdp_body = Arc::new(sdp_body);
950 let sdp_body_len = resp.answer_sdp.len();
951
952 resp_message.add_header(Header::new(b"Content-Type", b"application/json"));
953
954 resp_message.add_header(Header::new(b"Content-Length", format!("{}", sdp_body_len)));
955
956 resp_message.set_body(sdp_body);
957
958 let mut guard = event_listener_.lock().unwrap();
959
960 *guard = Some((conf_rx, resp.event_listener, resp.event_listener_context));
961
962 rt_.spawn(async move {
963
964 while let Some(ev) = rx_.recv().await {
965 match ev {
966 ServerTransactionEvent::Cancelled => {
967 todo!()
968 },
969 _ => {}
970 }
971 }
972 });
973
974 dialog.confirm();
975
976 server_transaction::send_response(
977 transaction_,
978 resp_message,
979 tx,
980 // &timer,
981 &rt_,
982 );
983
984 return ;
985 }
986 }
987 }
988
989 Err(e) => {
990
991 }
992 }
993
994 if let Some(resp_message) = server_transaction::make_response(message, transaction_.to_tag(), 486, b"Busy Here") {
995 server_transaction::send_response(
996 transaction_,
997 resp_message,
998 tx,
999 // &timer,
1000 &rt_,
1001 );
1002 }
1003 });
1004
1005 return true;
1006 }
1007
1008 server_transaction::send_response(
1009 Arc::clone(transaction),
1010 resp_message,
1011 tx,
1012 // &timer,
1013 rt,
1014 );
1015 }
1016 }
1017
1018 return true;
1019 }
1020 }
1021 }
1022 }
1023 }
1024
1025 if let Some(resp_message) = server_transaction::make_response(
1026 message,
1027 transaction.to_tag(),
1028 400,
1029 b"Bad Request",
1030 ) {
1031 if let Some((tx, mut rx)) = channels.take() {
1032 server_transaction::send_response(
1033 Arc::clone(transaction),
1034 resp_message,
1035 tx,
1036 // &timer,
1037 rt,
1038 );
1039 }
1040 }
1041
1042 return true;
1043 }
1044 }
1045 }
1046 }
1047 }
1048
1049 false
1050 }
1051}