rust_rcs_client/messaging/cpm/session.rs
1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate rust_rcs_core;
16extern crate rust_strict_sdp;
17
18use std::io::Read;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21
22use base64::{engine::general_purpose, Engine as _};
23use futures::Future;
24
25use rust_rcs_core::cpim::{CPIMInfo, CPIMMessage};
26use rust_rcs_core::internet::body::VectorReader;
27use rust_rcs_core::internet::header::search;
28use rust_rcs_core::internet::header_field::AsHeaderField;
29use rust_rcs_core::internet::Body;
30use rust_rcs_core::internet::{header, Header};
31
32use rust_rcs_core::internet::headers::{AsContentType, Supported};
33
34use rust_rcs_core::internet::name_addr::AsNameAddr;
35use rust_rcs_core::io::network::stream::{ClientSocket, ClientStream};
36use rust_rcs_core::io::{DynamicChain, Serializable};
37use rust_rcs_core::msrp::info::msrp_info_reader::AsMsrpInfo;
38use rust_rcs_core::msrp::info::{MsrpDirection, MsrpInfo, MsrpInterfaceType, MsrpSetupMethod};
39use rust_rcs_core::msrp::msrp_channel::MsrpChannel;
40use rust_rcs_core::msrp::msrp_chunk::MsrpChunk;
41use rust_rcs_core::msrp::msrp_demuxer::MsrpDemuxer;
42use rust_rcs_core::msrp::msrp_muxer::{MsrpDataWriter, MsrpMessageReceiver, MsrpMuxer};
43use rust_rcs_core::msrp::msrp_transport::msrp_transport_start;
44// use rust_rcs_core::msrp::msrp_transport::MsrpTransportWrapper;
45use rust_rcs_core::sip::sip_core::SipDialogCache;
46use rust_rcs_core::sip::sip_session::{
47 choose_timeout_for_server_transaction_response,
48 choose_timeout_on_client_transaction_completion, Refresher, SipSession, SipSessionEvent,
49 SipSessionEventReceiver,
50};
51use rust_rcs_core::sip::sip_transaction::client_transaction::ClientTransactionNilCallbacks;
52use rust_rcs_core::sip::sip_transaction::server_transaction;
53
54use rust_rcs_core::sip::SipDialog;
55use rust_rcs_core::sip::{ClientTransactionCallbacks, SipTransactionManager};
56use rust_rcs_core::sip::{ServerTransaction, SipDialogEventCallbacks};
57use rust_rcs_core::sip::{ServerTransactionEvent, UPDATE};
58use rust_rcs_core::sip::{SipCore, SipTransport};
59
60use rust_rcs_core::sip::SipMessage;
61use rust_rcs_core::sip::TransactionHandler;
62
63use rust_rcs_core::sip::INVITE;
64
65use rust_rcs_core::sip::sip_headers::AsFromTo;
66
67use rust_rcs_core::util::rand::create_raw_alpha_numeric_string;
68use rust_rcs_core::util::raw_string::StrFind;
69
70use rust_strict_sdp::AsSDP;
71
72use tokio::runtime::Runtime;
73use tokio::sync::mpsc;
74use tokio::sync::oneshot;
75use uuid::Uuid;
76
77use crate::contact::ContactKnownIdentities;
78use crate::messaging::ffi::RecipientType;
79
80use super::session_invitation::{
81 try_accept_hanging_invitation, try_accept_invitation, CPMSessionInvitation,
82 CPMSessionInvitationResponse,
83};
84use super::sip::cpm_accept_contact::CPMAcceptContact;
85use super::sip::cpm_contact::{AsCPMContact, CPMContact, CPMServiceType};
86use super::{make_cpim_message_content_body, CPMMessageParam};
87
88// to-do: use enum
89pub struct CPMSessionInfo {
90 pub cpm_contact: CPMContact,
91 pub asserted_contact_uri: String,
92 pub conversation_id: Option<String>,
93 pub contribution_id: Option<String>,
94 pub subject: Option<String>,
95 pub referred_by: Option<String>,
96 pub is_deferred_session: bool,
97 pub contact_alias: Option<String>,
98 pub conversation_supports_anonymization: bool,
99 pub conversation_is_using_anonymization_token: bool,
100}
101
102impl CPMSessionInfo {
103 pub fn get_contact_known_identities(&self) -> ContactKnownIdentities {
104 let mut known_identities = ContactKnownIdentities::new();
105 known_identities.add_identity(self.asserted_contact_uri.clone());
106 match self.cpm_contact.service_type {
107 CPMServiceType::OneToOne | CPMServiceType::Chatbot => {
108 if let Ok(service_uri) = std::str::from_utf8(&self.cpm_contact.service_uri) {
109 known_identities.add_identity(String::from(service_uri));
110 }
111 }
112 _ => {}
113 }
114 if let Some(contact_alias) = &self.contact_alias {
115 known_identities.add_identity(String::from(contact_alias));
116 }
117 known_identities
118 }
119}
120
121pub fn get_cpm_session_info_from_message(
122 message: &SipMessage,
123 is_outgoing: bool,
124) -> Option<CPMSessionInfo> {
125 if let Some(headers) = message.headers() {
126 let mut contact_header = None;
127 let mut asserted_identity_header = None;
128 let mut subject_header = None;
129 let mut referred_by_header = None;
130
131 let mut from_header = None;
132 let mut to_header = None;
133
134 let mut conversation_id: Option<&[u8]> = None;
135 let mut contribution_id: Option<&[u8]> = None;
136
137 for header in headers {
138 if header.get_name().eq_ignore_ascii_case(b"Contact") {
139 contact_header = Some(header);
140 } else if header
141 .get_name()
142 .eq_ignore_ascii_case(b"P-Asserted-Identity")
143 {
144 asserted_identity_header = Some(header);
145 } else if header.get_name().eq_ignore_ascii_case(b"Conversation-ID") {
146 conversation_id = Some(header.get_value());
147 } else if header.get_name().eq_ignore_ascii_case(b"Contribution-ID") {
148 contribution_id = Some(header.get_value());
149 } else if header.get_name().eq_ignore_ascii_case(b"Subject") {
150 subject_header = Some(header);
151 } else if header.get_name().eq_ignore_ascii_case(b"Referred-By") {
152 referred_by_header = Some(header);
153 } else if header.get_name().eq_ignore_ascii_case(b"From") {
154 from_header = Some(header);
155 } else if header.get_name().eq_ignore_ascii_case(b"To") {
156 to_header = Some(header);
157 }
158 }
159
160 if let Some(contact_header) = contact_header {
161 if let Some(cpm_contact) = contact_header
162 .get_value()
163 .as_header_field()
164 .as_cpm_contact()
165 {
166 let mut asserted_contact_uri = None;
167 let mut is_deferred_session = false;
168 let mut conversation_supports_anonymization = false;
169 let mut conversation_is_using_anonymization_token = false;
170
171 match cpm_contact.service_type {
172 CPMServiceType::OneToOne | CPMServiceType::Chatbot => {
173 if let Some(asserted_identity_header) = asserted_identity_header {
174 if let Some(name_addr) = asserted_identity_header
175 .get_value()
176 .as_name_addresses()
177 .first()
178 {
179 if let Some(uri_part) = &name_addr.uri_part {
180 if uri_part.uri.start_with(b"rcse-standfw@") {
181 is_deferred_session = true;
182 if let Some(referred_by_header) = referred_by_header {
183 if let Some(referred_by) = referred_by_header
184 .get_value()
185 .as_name_addresses()
186 .first()
187 {
188 if let Some(uri_part) = &referred_by.uri_part {
189 asserted_contact_uri = Some(uri_part.uri);
190 for p in referred_by_header
191 .get_value()
192 .as_header_field()
193 .get_parameter_iterator()
194 {
195 if p.name == b"tk" {
196 match p.value {
197 Some(b"on") => {
198 conversation_supports_anonymization = true;
199 conversation_is_using_anonymization_token = true;
200 }
201 Some(b"off") => {
202 conversation_supports_anonymization = true;
203 }
204 _ => {}
205 }
206 break;
207 }
208 }
209 }
210 }
211 }
212 }
213
214 if !is_deferred_session {
215 asserted_contact_uri = Some(uri_part.uri);
216 for p in uri_part.get_parameter_iterator() {
217 if p.name == b"tk" {
218 match p.value {
219 Some(b"on") => {
220 conversation_supports_anonymization = true;
221 conversation_is_using_anonymization_token =
222 true;
223 }
224 Some(b"off") => {
225 conversation_supports_anonymization = true;
226 }
227 _ => {}
228 }
229 break;
230 }
231 }
232 }
233 }
234 }
235 }
236 }
237
238 _ => {
239 if let Some(asserted_identity_header) = asserted_identity_header {
240 if let Some(name_addr) = asserted_identity_header
241 .get_value()
242 .as_name_addresses()
243 .first()
244 {
245 if let Some(uri_part) = &name_addr.uri_part {
246 asserted_contact_uri = Some(uri_part.uri);
247 }
248 }
249 }
250 }
251 }
252
253 if asserted_contact_uri.is_none() {
254 asserted_contact_uri = Some(&cpm_contact.service_uri);
255 }
256
257 if let Some(asserted_contact_uri) = asserted_contact_uri {
258 if let Ok(asserted_contact_uri) = std::str::from_utf8(asserted_contact_uri) {
259 let asserted_contact_uri = String::from(asserted_contact_uri);
260
261 let mut contact_alias = None;
262 if is_outgoing {
263 if let Some(to_header) = to_header {
264 if let Some(name_addr) = to_header
265 .get_value()
266 .as_header_field()
267 .as_from_to()
268 .addresses
269 .first()
270 {
271 if let Some(display_name) = name_addr.display_name {
272 if let Ok(display_name) = std::str::from_utf8(display_name)
273 {
274 contact_alias = Some(String::from(display_name));
275 }
276 }
277 }
278 }
279 } else {
280 if let Some(from_header) = from_header {
281 if let Some(name_addr) = from_header
282 .get_value()
283 .as_header_field()
284 .as_from_to()
285 .addresses
286 .first()
287 {
288 if let Some(display_name) = name_addr.display_name {
289 if let Ok(display_name) = std::str::from_utf8(display_name)
290 {
291 contact_alias = Some(String::from(display_name));
292 }
293 }
294 }
295 }
296 }
297
298 return Some(CPMSessionInfo {
299 cpm_contact,
300 asserted_contact_uri,
301 conversation_id: match conversation_id {
302 Some(conversation_id) => match std::str::from_utf8(conversation_id)
303 {
304 Ok(conversation_id) => Some(String::from(conversation_id)),
305 Err(_) => None,
306 },
307 None => None,
308 },
309 contribution_id: match contribution_id {
310 Some(contribution_id) => match std::str::from_utf8(contribution_id)
311 {
312 Ok(contribution_id) => Some(String::from(contribution_id)),
313 Err(_) => None,
314 },
315 None => None,
316 },
317 subject: match subject_header {
318 Some(subject_header) => {
319 if let Ok(subject) =
320 std::str::from_utf8(subject_header.get_value())
321 {
322 Some(String::from(subject))
323 } else {
324 None
325 }
326 }
327 None => None,
328 },
329 referred_by: match referred_by_header {
330 Some(referred_by_header) => {
331 if let Ok(referred_by) =
332 std::str::from_utf8(referred_by_header.get_value())
333 {
334 Some(String::from(referred_by))
335 } else {
336 None
337 }
338 }
339 None => None,
340 },
341 is_deferred_session,
342 contact_alias,
343 conversation_supports_anonymization,
344 conversation_is_using_anonymization_token,
345 });
346 }
347 }
348 }
349 }
350 }
351
352 None
353}
354
355enum CPMSessionNegotiationState {
356 NoneSet,
357 LocalSet(Arc<Body>, Option<ClientSocket>, bool, String, u16, Vec<u8>),
358 RemoteSet(Arc<Body>, String, u16, Vec<u8>),
359}
360
361enum CPMSessionState {
362 Negotiating(CPMSessionNegotiationState),
363 Negotiated(
364 Arc<Body>,
365 Arc<Body>,
366 Option<ClientSocket>,
367 bool,
368 String,
369 u16,
370 Vec<u8>,
371 String,
372 u16,
373 Vec<u8>,
374 ),
375 Starting(
376 Arc<Body>,
377 Arc<Body>,
378 String,
379 u16,
380 String,
381 u16,
382 // Arc<MsrpChannel>,
383 mpsc::Sender<CPMMessageParam>,
384 ),
385 // Started(
386 // Arc<Body>,
387 // Arc<Body>,
388 // String,
389 // u16,
390 // String,
391 // u16,
392 // Arc<MsrpTransport>,
393 // ),
394 // also
395 // there is no Re-negotiating for CPM Session
396}
397
398pub struct CPMSession {
399 session_info: CPMSessionInfo,
400 session_state: Arc<Mutex<CPMSessionState>>,
401}
402
403impl CPMSession {
404 pub fn new(session_info: CPMSessionInfo) -> CPMSession {
405 CPMSession {
406 session_info,
407 session_state: Arc::new(Mutex::new(CPMSessionState::Negotiating(
408 CPMSessionNegotiationState::NoneSet,
409 ))),
410 }
411 }
412
413 pub fn get_session_info(&self) -> &CPMSessionInfo {
414 &self.session_info
415 }
416
417 pub fn set_local_sdp(
418 &self,
419 l_sdp: Arc<Body>,
420 cs: ClientSocket,
421 tls: bool,
422 laddr: String,
423 lport: u16,
424 lpath: Vec<u8>,
425 ) -> Result<(), &'static str> {
426 let mut guard = self.session_state.lock().unwrap();
427 match &mut *guard {
428 CPMSessionState::Negotiating(ref mut negotiation_state) => match negotiation_state {
429 CPMSessionNegotiationState::NoneSet => {
430 *guard = CPMSessionState::Negotiating(CPMSessionNegotiationState::LocalSet(
431 l_sdp,
432 Some(cs),
433 tls,
434 laddr,
435 lport,
436 lpath,
437 ));
438 Ok(())
439 }
440 CPMSessionNegotiationState::LocalSet(_, _, _, _, _, _) => Err("local already set"),
441 CPMSessionNegotiationState::RemoteSet(
442 ref r_sdp,
443 ref raddr,
444 ref rport,
445 ref mut rpath,
446 ) => {
447 let mut path = Vec::new();
448 path.append(rpath);
449 *guard = CPMSessionState::Negotiated(
450 l_sdp,
451 Arc::clone(r_sdp),
452 Some(cs),
453 tls,
454 laddr,
455 lport,
456 lpath,
457 String::from(raddr),
458 *rport,
459 path,
460 );
461 Ok(())
462 }
463 },
464
465 _ => Err("already negotiated"),
466 }
467 }
468
469 pub fn set_remote_sdp(
470 &self,
471 r_sdp: Arc<Body>,
472 raddr: String,
473 rport: u16,
474 rpath: Vec<u8>,
475 ) -> Result<(), &'static str> {
476 let mut guard = self.session_state.lock().unwrap();
477 match &mut *guard {
478 CPMSessionState::Negotiating(ref mut negotiation_state) => match negotiation_state {
479 CPMSessionNegotiationState::NoneSet => {
480 *guard = CPMSessionState::Negotiating(CPMSessionNegotiationState::RemoteSet(
481 r_sdp, raddr, rport, rpath,
482 ));
483 Ok(())
484 }
485 CPMSessionNegotiationState::LocalSet(
486 ref l_sdp,
487 ref mut sock,
488 ref tls,
489 ref laddr,
490 ref lport,
491 ref mut lpath,
492 ) => {
493 let mut path = Vec::new();
494 path.append(lpath);
495 *guard = CPMSessionState::Negotiated(
496 Arc::clone(l_sdp),
497 r_sdp,
498 sock.take(),
499 *tls,
500 String::from(laddr),
501 *lport,
502 path,
503 raddr,
504 rport,
505 rpath,
506 );
507 Ok(())
508 }
509 CPMSessionNegotiationState::RemoteSet(_, _, _, _) => Err("remote already set"),
510 },
511
512 _ => Err("already negotiated"),
513 }
514 }
515
516 pub fn start<MRL, F>(
517 &self,
518 public_user_identity: &str,
519 message_receive_listener: MRL,
520 connect_function: &Arc<
521 dyn Fn(
522 ClientSocket,
523 &String,
524 u16,
525 bool,
526 ) -> Pin<
527 Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
528 > + Send
529 + Sync
530 + 'static,
531 >,
532 session_end_function: F,
533 rt: &Arc<Runtime>,
534 ) -> Result<mpsc::Sender<CPMMessageParam>, ()>
535 where
536 MRL: Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
537 F: Fn() + Send + Sync + 'static,
538 {
539 let mut guard = self.session_state.lock().unwrap();
540 match &mut *guard {
541 CPMSessionState::Negotiated(
542 ref l_sdp,
543 ref r_sdp,
544 ref mut sock,
545 ref tls,
546 ref laddr,
547 ref lport,
548 ref mut lpath,
549 ref raddr,
550 ref rport,
551 ref mut rpath,
552 ) => {
553 if let Some(sock) = sock.take() {
554 let mut from_path = Vec::with_capacity(lpath.len());
555 from_path.append(lpath);
556 let mut to_path = Vec::with_capacity(rpath.len());
557 to_path.append(rpath);
558 let from_path_ = from_path.clone();
559 let to_path_ = to_path.clone();
560 let demuxer = MsrpDemuxer::new(from_path_, to_path_);
561 let demuxer = Arc::new(demuxer);
562 let demuxer_ = Arc::clone(&demuxer);
563 let message_receive_listener = Arc::new(message_receive_listener);
564 let muxer = MsrpMuxer::new(CPMSessionMessageReceiver {
565 callback: Arc::new(move |cpim_info, content_type, content_body| {
566 message_receive_listener(cpim_info, content_type, content_body)
567 }),
568 });
569
570 let from_path_ = from_path.clone();
571 let to_path_ = to_path.clone();
572
573 let mut session_channel =
574 MsrpChannel::new(from_path_, to_path_, demuxer, muxer);
575 // let session_channel = Arc::new(session_channel);
576 // let session_channel_ = Arc::clone(&session_channel);
577
578 let addr = String::from(raddr);
579 let port = *rport;
580 let tls = *tls;
581
582 let l_sdp = Arc::clone(l_sdp);
583 // let l_sdp_ = Arc::clone(&l_sdp);
584 let r_sdp = Arc::clone(r_sdp);
585 // let r_sdp_ = Arc::clone(&r_sdp);
586
587 let laddr = String::from(laddr);
588 // let laddr_ = String::from(&laddr);
589 let lport = *lport;
590 let raddr = String::from(raddr);
591 // let raddr_ = String::from(&raddr);
592 let rport = *rport;
593
594 let (message_tx, mut message_rx) = mpsc::channel::<CPMMessageParam>(8);
595 let message_tx_ = message_tx.clone();
596
597 *guard = CPMSessionState::Starting(
598 l_sdp, r_sdp, laddr, lport, raddr, rport,
599 // session_channel,
600 message_tx,
601 );
602
603 let t_id = String::from(public_user_identity);
604 let public_user_identity = String::from(public_user_identity);
605
606 let rt_ = Arc::clone(rt);
607
608 // let session_state = Arc::clone(&self.session_state);
609
610 let future = connect_function(sock, &addr, port, tls);
611
612 rt.spawn(async move {
613 if let Ok(cs) = future.await {
614 // let transport = MsrpTransportWrapper::new(
615 // cs,
616 // t_id,
617 // move |message_chunk| session_channel.on_message(message_chunk),
618 // move |transport| session_end_function(),
619 // &rt_,
620 // );
621
622 // let t = transport.get_transport();
623
624 // let mut guard = session_state.lock().unwrap();
625
626 // *guard = CPMSessionState::Started(l_sdp_, r_sdp_, laddr_, lport, raddr_, rport, t);
627
628 let (data_tx, data_rx) = mpsc::channel(8);
629 let data_tx_ = data_tx.clone();
630
631 msrp_transport_start(
632 cs,
633 t_id,
634 data_tx_,
635 data_rx,
636 move |message_chunk| session_channel.on_message(message_chunk),
637 move || session_end_function(),
638 &rt_,
639 );
640
641 let data_tx_ = data_tx.clone();
642
643 let (chunk_tx, mut chunk_rx) = mpsc::channel::<MsrpChunk>(16);
644
645 rt_.spawn(async move {
646 while let Some(chunk) = chunk_rx.recv().await {
647 let size = chunk.estimated_size();
648 let mut data = Vec::with_capacity(size);
649
650 {
651 let mut readers = Vec::new();
652 chunk.get_readers(&mut readers);
653 match DynamicChain::new(readers).read_to_end(&mut data) {
654 Ok(_) => {}
655 Err(_) => {} // to-do: early failure
656 }
657 }
658
659 match data_tx_.send(Some(data)).await {
660 Ok(()) => {}
661 Err(e) => {}
662 }
663 }
664 });
665
666 while let Some(message) = message_rx.recv().await {
667 let content_type = b"message/cpim".to_vec();
668 let content_type = Some(content_type);
669
670 let body = make_cpim_message_content_body(
671 &message.message_type,
672 &message.message_content,
673 &message.recipient_type,
674 &message.recipient_uri,
675 Uuid::new_v4(), // to-do: should be provided by user
676 &public_user_identity,
677 );
678
679 let size = body.estimated_size();
680
681 let mut data = Vec::with_capacity(size);
682 match body.reader() {
683 Ok(mut reader) => {
684 match reader.read_to_end(&mut data) {
685 Ok(_) => {}
686 Err(_) => {} // to-do: early failure
687 }
688 }
689 Err(_) => {} // to-do: early failure
690 }
691
692 let from_path_ = from_path.clone();
693 let to_path_ = to_path.clone();
694
695 let chunk_tx = chunk_tx.clone();
696
697 demuxer_.start(
698 from_path_,
699 to_path_,
700 content_type,
701 size,
702 VectorReader::new(data),
703 move |status_code, reason_phrase| {
704 (message.message_result_callback)(
705 status_code,
706 reason_phrase,
707 );
708 },
709 chunk_tx,
710 &rt_,
711 );
712 }
713 }
714 });
715
716 return Ok(message_tx_);
717 }
718 }
719 _ => {}
720 }
721
722 Err(())
723 }
724
725 pub fn send_message(&self, message: CPMMessageParam, rt: &Arc<Runtime>) {
726 let guard = self.session_state.lock().unwrap();
727 match &*guard {
728 CPMSessionState::Negotiating(_) => todo!(),
729 CPMSessionState::Negotiated(_, _, _, _, _, _, _, _, _, _) => todo!(),
730 CPMSessionState::Starting(_, _, _, _, _, _, message_tx) => {
731 let message_tx = message_tx.clone();
732 rt.spawn(async move {
733 match message_tx.send(message).await {
734 Ok(()) => {}
735 Err(e) => {
736 (e.0.message_result_callback)(500, String::from("Internal Error"))
737 }
738 }
739 });
740 }
741 }
742 }
743}
744
745struct CPMSessionCPIMMessageWriter {
746 message_id: Vec<u8>,
747 message_buf: Vec<u8>,
748 message_callback: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
749}
750
751impl MsrpDataWriter for CPMSessionCPIMMessageWriter {
752 fn write(&mut self, data: &[u8]) -> Result<usize, (u16, &'static str)> {
753 self.message_buf.extend_from_slice(data);
754 Ok(data.len())
755 }
756
757 fn complete(&mut self) -> Result<(), (u16, &'static str)> {
758 match Body::construct_message(&self.message_buf) {
759 Ok(body) => match CPIMMessage::try_from(&body) {
760 Ok(cpim_message) => match cpim_message.get_info() {
761 Ok(cpim_message_info) => {
762 if let Some((content_type, content_body, base64_encoded)) =
763 cpim_message.get_message_body()
764 {
765 let content_type: &[u8] = match content_type {
766 Some(content_type) => content_type,
767 None => b"text/plain",
768 };
769
770 if base64_encoded {
771 if let Ok(decoded_content_body) =
772 general_purpose::STANDARD.decode(content_body)
773 {
774 (self.message_callback)(
775 &cpim_message_info,
776 content_type,
777 &decoded_content_body,
778 );
779 return Ok(());
780 }
781 } else {
782 (self.message_callback)(
783 &cpim_message_info,
784 content_type,
785 content_body,
786 );
787 return Ok(());
788 }
789 }
790
791 Err((400, "failed to get content body"))
792 }
793
794 Err(e) => Err((400, e)),
795 },
796
797 Err(e) => Err((400, e)),
798 },
799
800 Err(e) => Err((400, e)),
801 }
802 }
803}
804
805struct CPMSessionMessageReceiver {
806 callback: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
807}
808
809impl MsrpMessageReceiver for CPMSessionMessageReceiver {
810 fn on_message(
811 &mut self,
812 message_id: &[u8],
813 content_type: &[u8],
814 ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
815 if content_type.eq_ignore_ascii_case(b"message/cpim") {
816 // to-do: should check accept-types
817 let cb = Arc::clone(&self.callback);
818 Ok(Box::new(CPMSessionCPIMMessageWriter {
819 message_id: message_id.to_vec(),
820 message_buf: Vec::with_capacity(1024),
821 message_callback: cb,
822 }))
823 } else {
824 Err((415, "Unsupported Media Type"))
825 }
826 }
827}
828
/// Group events delivered to the group invite event listener of
/// [`CPMSessionService`].
pub enum CPMGroupEvent {
    /// An invite event.
    OnInvite,
    /// A joined event.
    OnJoined,
}
833
834pub struct CPMSessionService {
835 ongoing_sessions: Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
836 ongoing_incoming_invitations: Arc<
837 Mutex<
838 Vec<
839 Arc<(
840 ContactKnownIdentities,
841 mpsc::Sender<CPMSessionInvitationResponse>,
842 )>,
843 >,
844 >,
845 >,
846 ongoing_outgoing_invitations:
847 Arc<Mutex<Vec<Arc<(ContactKnownIdentities, mpsc::Sender<CPMMessageParam>)>>>>,
848
849 registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
850
851 message_receive_listener:
852 Arc<dyn Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,
853
854 msrp_socket_allocator_function: Arc<
855 dyn Fn(
856 Option<&MsrpInfo>,
857 )
858 -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
859 + Send
860 + Sync
861 + 'static,
862 >,
863
864 msrp_socket_connect_function: Arc<
865 dyn Fn(
866 ClientSocket,
867 &String,
868 u16,
869 bool,
870 )
871 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
872 + Send
873 + Sync
874 + 'static,
875 >,
876
877 conversation_invite_handler_function:
878 Box<dyn Fn(bool, &str, &str, &str, oneshot::Receiver<()>) -> u16 + Send + Sync + 'static>,
879
880 group_invite_handler_function: Box<
881 dyn Fn(&str, &str, &str, &str, &str, &str, oneshot::Receiver<()>) -> u16
882 + Send
883 + Sync
884 + 'static,
885 >,
886 group_invite_event_listener_function: Box<dyn Fn(CPMGroupEvent) + Send + Sync + 'static>,
887}
888
889impl CPMSessionService {
890 pub fn new<MRL, MAF, MCF, CF, GF, GL>(
891 message_receive_listener: MRL,
892 msrp_socket_allocator_function: MAF,
893 msrp_socket_connect_function: MCF,
894 conversation_invite_handler_function: CF,
895 group_invite_handler_function: GF,
896 group_invite_event_listener_function: GL,
897 ) -> CPMSessionService
898 where
899 MRL: Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8])
900 + Send
901 + Sync
902 + 'static,
903 MAF: Fn(
904 Option<&MsrpInfo>,
905 )
906 -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
907 + Send
908 + Sync
909 + 'static,
910 MCF: Fn(
911 ClientSocket,
912 &String,
913 u16,
914 bool,
915 )
916 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
917 + Send
918 + Sync
919 + 'static,
920 CF: Fn(bool, &str, &str, &str, oneshot::Receiver<()>) -> u16 + Send + Sync + 'static,
921 GF: Fn(&str, &str, &str, &str, &str, &str, oneshot::Receiver<()>) -> u16
922 + Send
923 + Sync
924 + 'static,
925 GL: Fn(CPMGroupEvent) + Send + Sync + 'static,
926 {
927 CPMSessionService {
928 ongoing_sessions: Arc::new(Mutex::new(Vec::new())),
929 ongoing_incoming_invitations: Arc::new(Mutex::new(Vec::new())),
930 ongoing_outgoing_invitations: Arc::new(Mutex::new(Vec::new())),
931 registered_public_identity: Arc::new(Mutex::new(None)),
932 message_receive_listener: Arc::new(message_receive_listener),
933 msrp_socket_allocator_function: Arc::new(msrp_socket_allocator_function),
934 msrp_socket_connect_function: Arc::new(msrp_socket_connect_function),
935 conversation_invite_handler_function: Box::new(conversation_invite_handler_function),
936 group_invite_handler_function: Box::new(group_invite_handler_function),
937 group_invite_event_listener_function: Box::new(group_invite_event_listener_function),
938 }
939 }
940
941 pub fn set_registered_public_identity(
942 &self,
943 registered_public_identity: String,
944 sip_instance_id: String,
945 transport: Arc<SipTransport>,
946 ) {
947 (*self.registered_public_identity.lock().unwrap()).replace((
948 transport,
949 registered_public_identity,
950 sip_instance_id,
951 ));
952 }
953
954 pub fn accept_and_open_session(&self, recipient: &str, rt: &Arc<Runtime>) {
955 let guard = self.ongoing_incoming_invitations.lock().unwrap();
956 for invitation in &*guard {
957 let (invitation, tx) = invitation.as_ref();
958 if invitation == recipient {
959 let tx = tx.clone();
960 rt.spawn(async move {
961 match tx.send(CPMSessionInvitationResponse::Accept).await {
962 Ok(()) => {}
963 Err(e) => {}
964 }
965 });
966 return;
967 }
968 }
969 }
970
971 pub fn send_message<F>(
972 &self,
973 message_type: &str,
974 message_content: &str,
975 recipient: &str,
976 recipient_type: &RecipientType,
977 recipient_uri: &str,
978 message_result_callback: F,
979 core: &Arc<SipCore>,
980 // ctrl_itf: SipTransactionManagerControlInterface,
981 rt: &Arc<Runtime>,
982 ) where
983 F: FnOnce(u16, String) + Send + Sync + 'static,
984 {
985 let guard = self.ongoing_sessions.lock().unwrap();
986
987 for (known_identities, sip_session) in &*guard {
988 if known_identities == recipient {
989 sip_session.mark_session_active();
990
991 let cpm_session = sip_session.get_inner();
992
993 let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
994 let message_result_callback = move |status_code, reason_phrase| {
995 let mut guard = message_result_callback.lock().unwrap();
996 if let Some(message_result_callback) = guard.take() {
997 message_result_callback(status_code, reason_phrase);
998 }
999 };
1000
1001 let message = CPMMessageParam::new(
1002 String::from(message_type),
1003 String::from(message_content),
1004 recipient,
1005 recipient_type,
1006 recipient_uri,
1007 message_result_callback,
1008 );
1009
1010 cpm_session.send_message(message, rt);
1011
1012 return;
1013 }
1014 }
1015
1016 let guard = self.ongoing_outgoing_invitations.lock().unwrap();
1017
1018 for invitation in &*guard {
1019 let (known_identities, message_tx) = invitation.as_ref();
1020 if known_identities == recipient {
1021 let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
1022 let message_result_callback = move |status_code, reason_phrase| {
1023 let mut guard = message_result_callback.lock().unwrap();
1024 if let Some(message_result_callback) = guard.take() {
1025 message_result_callback(status_code, reason_phrase);
1026 }
1027 };
1028
1029 let message = CPMMessageParam::new(
1030 String::from(message_type),
1031 String::from(message_content),
1032 recipient,
1033 recipient_type,
1034 recipient_uri,
1035 message_result_callback,
1036 );
1037 let message_tx = message_tx.clone();
1038 rt.spawn(async move {
1039 match message_tx.send(message).await {
1040 Ok(()) => {}
1041 Err(e) => {
1042 (e.0.message_result_callback)(403, String::from("Forbidden"));
1043 }
1044 }
1045 });
1046 return;
1047 }
1048 }
1049
1050 if let Ok((sock, host, port, tls, active_setup, ipv6)) =
1051 (self.msrp_socket_allocator_function)(None)
1052 {
1053 let path_random = create_raw_alpha_numeric_string(16);
1054 let path_random = std::str::from_utf8(&path_random).unwrap();
1055 let path = if tls {
1056 format!("msrps://{}:{}/{};tcp", &host, port, path_random)
1057 } else {
1058 format!("msrp://{}:{}/{};tcp", &host, port, path_random)
1059 };
1060
1061 // to-do: accepted types for Group message and others
1062
1063 let path = path.into_bytes();
1064
1065 let msrp_info: MsrpInfo = MsrpInfo {
1066 protocol: if tls { b"TCP/TLS/MSRP" } else { b"TCP/MSRP" },
1067 address: host.as_bytes(),
1068 interface_type: if ipv6 {
1069 MsrpInterfaceType::IPv6
1070 } else {
1071 MsrpInterfaceType::IPv4
1072 },
1073 port,
1074 path: &path,
1075 inactive: false,
1076 direction: MsrpDirection::SendReceive,
1077 accept_types: if let RecipientType::Chatbot = recipient_type {
1078 b"message/cpim"
1079 } else {
1080 b"message/cpim application/im-iscomposing+xm"
1081 },
1082 setup_method: if active_setup {
1083 MsrpSetupMethod::Active
1084 } else {
1085 MsrpSetupMethod::Passive
1086 },
1087 accept_wrapped_types: if let RecipientType::Chatbot = recipient_type {
1088 Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml application/vnd.gsma.botmessage.v1.0+json application/vnd.gsma.botsuggestion.v1.0+json application/commontemplate+xml")
1089 } else {
1090 Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml")
1091 },
1092 file_info: None,
1093 };
1094
1095 let sdp = String::from(msrp_info);
1096 let sdp = sdp.into_bytes();
1097 let sdp = Body::Raw(sdp);
1098 let sdp = Arc::new(sdp);
1099
1100 let guard = self.registered_public_identity.lock().unwrap();
1101
1102 if let Some((transport, contact_identity, instance_id)) = &*guard {
1103 let mut invite_message = SipMessage::new_request(INVITE, recipient.as_bytes());
1104
1105 let call_id = String::from(
1106 Uuid::new_v4()
1107 .as_hyphenated()
1108 .encode_lower(&mut Uuid::encode_buffer()),
1109 );
1110 invite_message.add_header(Header::new(b"Call-ID", call_id));
1111
1112 invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
1113
1114 let tag = create_raw_alpha_numeric_string(8);
1115 let tag = String::from_utf8_lossy(&tag);
1116
1117 invite_message.add_header(Header::new(
1118 b"From",
1119 format!("<{}>;tag={}", contact_identity, tag),
1120 ));
1121
1122 invite_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
1123
1124 invite_message.add_header(Header::new(
1125 b"Contact",
1126 if let RecipientType::Chatbot = recipient_type {
1127 format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\";+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\"", contact_identity, instance_id)
1128 } else {
1129 format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\"", contact_identity, instance_id)
1130 },
1131 ));
1132
1133 invite_message.add_header(Header::new(
1134 b"Accept-Contact",
1135 "*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg\"",
1136 ));
1137
1138 if let RecipientType::Chatbot = recipient_type {
1139 invite_message.add_header(Header::new(b"Accept-Contact", "*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
1140 }
1141
1142 let conversation_id = String::from(
1143 Uuid::new_v4()
1144 .as_hyphenated()
1145 .encode_lower(&mut Uuid::encode_buffer()),
1146 );
1147 invite_message.add_header(Header::new(b"Conversation-ID", conversation_id));
1148
1149 let contribution_id = String::from(
1150 Uuid::new_v4()
1151 .as_hyphenated()
1152 .encode_lower(&mut Uuid::encode_buffer()),
1153 );
1154 invite_message.add_header(Header::new(b"Contribution-ID", contribution_id));
1155
1156 invite_message.add_header(Header::new(
1157 b"Allow",
1158 b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE",
1159 ));
1160
1161 invite_message.add_header(Header::new(b"Supported", b"timer"));
1162
1163 /*
1164 * to-do: 3gpp TS 24.229 5.1.3.1
1165 *
1166 * If the UE supports the Session Timer extension, the UE shall include the option-tag "timer" in the Supported header field and should either insert a Session-Expires header field with the header field value set to the configured session timer interval value, or should not include the Session-Expires header field in the initial INVITE request. The header field value of the Session-Expires header field may be configured using local configuration or using the Session_Timer_Initial_Interval node specified in 3GPP 24.167 [8G]. If the UE is configured with both the local configuration and the Session_Timer_Initial_Interval node specified in 3GPP 24.167 [8G], then the local configuration shall take precedence.
1167 * If the UE inserts the Session-Expires header field in the initial INVITE request, the UE may also include the "refresher" parameter with the "refresher" parameter value set to "uac".
1168 */
1169 invite_message.add_header(Header::new(b"Session-Expires", b"1800"));
1170
1171 invite_message.add_header(Header::new(
1172 b"P-Preferred-Service",
1173 b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.session",
1174 ));
1175
1176 invite_message.add_header(Header::new(
1177 b"P-Preferred-Identity",
1178 String::from(contact_identity),
1179 ));
1180
1181 invite_message.add_header(Header::new(b"Content-Type", "application/sdp"));
1182
1183 let content_length = sdp.estimated_size();
1184 invite_message.add_header(Header::new(
1185 b"Content-Length",
1186 format!("{}", content_length),
1187 ));
1188
1189 let l_sdp_body = Arc::clone(&sdp);
1190 invite_message.set_body(sdp);
1191
1192 let req_headers = invite_message.copy_headers();
1193
1194 let public_user_identity = String::from(contact_identity);
1195
1196 // let (client_transaction_tx, client_transaction_rx) = mpsc::channel::<mpsc::Sender<CPMMessageParam>>(1);
1197 let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel::<
1198 Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>,
1199 >(1);
1200 let (message_tx, mut message_rx) = mpsc::channel::<CPMMessageParam>(8);
1201 let message_receive_listener = Arc::clone(&self.message_receive_listener);
1202 let message_receive_listener_ = Arc::clone(&message_receive_listener);
1203
1204 let connect_function = Arc::clone(&self.msrp_socket_connect_function);
1205
1206 let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
1207 let message_result_callback = move |status_code, reason_phrase| {
1208 let mut guard = message_result_callback.lock().unwrap();
1209 if let Some(message_result_callback) = guard.take() {
1210 message_result_callback(status_code, reason_phrase);
1211 }
1212 };
1213
1214 let message = CPMMessageParam::new(
1215 String::from(message_type),
1216 String::from(message_content),
1217 recipient,
1218 recipient_type,
1219 recipient_uri,
1220 message_result_callback,
1221 );
1222
1223 let ongoing_dialogs = core.get_ongoing_dialogs();
1224
1225 let mut known_identities = ContactKnownIdentities::new();
1226 known_identities.add_identity(String::from(recipient));
1227 let outgoing_invitation = Arc::new((known_identities, message_tx));
1228 let outgoing_invitation_ = Arc::clone(&outgoing_invitation);
1229
1230 let ongoing_outgoing_invitations = Arc::clone(&self.ongoing_outgoing_invitations);
1231 let ongoing_sessions = Arc::clone(&self.ongoing_sessions);
1232
1233 let registered_public_identity = Arc::clone(&self.registered_public_identity);
1234
1235 let tm = core.get_transaction_manager();
1236 let tm_ = Arc::clone(&tm);
1237
1238 let rt_ = Arc::clone(rt);
1239
1240 rt.spawn(async move {
1241 // if let Some(message_tx) = client_transaction_rx.recv().await {
1242 // match message_tx.send(message).await {
1243 // Ok(()) => {},
1244 // Err(e) => {
1245 // (e.0.message_result_callback)(403);
1246 // },
1247 // }
1248
1249 // while let Some(message) = message_rx.recv().await {
1250 // match message_tx.send(message).await {
1251 // Ok(()) => {},
1252 // Err(e) => {
1253 // (e.0.message_result_callback)(403);
1254 // },
1255 // }
1256 // }
1257 // }
1258
1259 if let Some(res) = client_transaction_rx.recv().await {
1260 match res {
1261 Ok((resp_message, session_info, r_sdp_body)) => {
1262 if let Some(l_sdp) = match l_sdp_body.as_ref() {
1263 Body::Raw(raw) => {
1264 raw.as_sdp()
1265 }
1266 _ => None,
1267 } {
1268 if let Some(r_sdp) = match r_sdp_body.as_ref() {
1269 Body::Raw(raw) => {
1270 raw.as_sdp()
1271 }
1272 _ => None,
1273 } {
1274 if let Some(r_msrp_info) = r_sdp.as_msrp_info() {
1275 if let Ok(raddr) = std::str::from_utf8(r_msrp_info.address) {
1276 let raddr = String::from(raddr);
1277 let rport = r_msrp_info.port;
1278 let rpath = r_msrp_info.path.to_vec();
1279
1280 let asserted_contact_uri = session_info.asserted_contact_uri.clone();
1281 let asserted_contact_uri_ = asserted_contact_uri.clone();
1282 let known_identities = session_info.get_contact_known_identities();
1283 let cpm_session = CPMSession::new(session_info);
1284
1285 if let Ok(_) = cpm_session.set_local_sdp(l_sdp_body, sock, tls, host, port, path) {
1286 if let Ok(_) = cpm_session.set_remote_sdp(r_sdp_body, raddr, rport, rpath) {
1287 let cpm_session = Arc::new(cpm_session);
1288 let cpm_session_ = Arc::clone(&cpm_session);
1289
1290 let (d_tx, mut d_rx) = mpsc::channel(1);
1291
1292 let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
1293
1294 rt_.spawn(async move {
1295 if let Some(dialog) = d_rx.recv().await {
1296 ongoing_dialogs_.remove_dialog(&dialog);
1297 }
1298 });
1299
1300 if let Ok(dialog) = SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
1301 match d_tx.blocking_send(d) {
1302 Ok(()) => {},
1303 Err(e) => {},
1304 }
1305 }) {
1306 let dialog = Arc::new(dialog);
1307
1308 ongoing_dialogs.add_dialog(&dialog);
1309
1310 let (sess_tx, mut sess_rx) = mpsc::channel::<SipSessionEvent>(8);
1311
1312 let sip_session = SipSession::new(&cpm_session, SipSessionEventReceiver {
1313 tx: sess_tx,
1314 rt: Arc::clone(&rt_),
1315 });
1316
1317 let sip_session = Arc::new(sip_session);
1318 let sip_session_ = Arc::clone(&sip_session);
1319
1320 let public_user_identity_ = public_user_identity.clone();
1321
1322 sip_session.setup_confirmed_dialog(&dialog, CPMSessionDialogEventReceiver {
1323 public_user_identity: public_user_identity_,
1324 sip_session: Arc::clone(&sip_session),
1325 ongoing_sessions: Arc::clone(&ongoing_sessions),
1326 message_receive_listener: Arc::new(move |cpim_info, content_type, content_body| {
1327 message_receive_listener_(&sip_session_, asserted_contact_uri_.as_bytes(), cpim_info, content_type, content_body)
1328 }),
1329 msrp_socket_connect_function: Arc::clone(
1330 &connect_function,
1331 ),
1332 rt: Arc::clone(&rt_),
1333 });
1334
1335 if let Some((timeout, refresher)) =
1336 choose_timeout_on_client_transaction_completion(None, &resp_message)
1337 {
1338 match refresher {
1339 Refresher::UAC => sip_session.schedule_refresh(timeout, true, &rt_),
1340
1341 Refresher::UAS => sip_session.schedule_refresh(timeout, false, &rt_),
1342 }
1343 }
1344
1345 let sip_session_1 = Arc::clone(&sip_session);
1346 let sip_session_2 = Arc::clone(&sip_session);
1347
1348 let registered_public_identity_ = Arc::clone(®istered_public_identity);
1349
1350 let tm = Arc::clone(&tm_);
1351 let rt = Arc::clone(&rt_);
1352
1353 rt_.spawn(async move {
1354 while let Some(ev) = sess_rx.recv().await {
1355 match ev {
1356 SipSessionEvent::ShouldRefresh(dialog) => {
1357 if let Ok(mut message) =
1358 dialog.make_request(b"UPDATE", None)
1359 {
1360 message.add_header(Header::new(
1361 b"Supported",
1362 b"timer",
1363 ));
1364
1365 let guard =
1366 registered_public_identity_.lock().unwrap();
1367
1368 if let Some((transport, _, _)) = &*guard {
1369 tm.send_request(
1370 message,
1371 transport,
1372 UpdateMessageCallbacks {
1373 // session_expires: None,
1374 dialog,
1375 sip_session: Arc::clone(&sip_session_1),
1376 rt: Arc::clone(&rt),
1377 },
1378 &rt,
1379 );
1380 }
1381 }
1382 }
1383
1384 SipSessionEvent::Expired(dialog) => {
1385 if let Ok(message) = dialog.make_request(b"BYE", None) {
1386 let guard =
1387 registered_public_identity_.lock().unwrap();
1388
1389 if let Some((transport, _, _)) = &*guard {
1390 tm.send_request(
1391 message,
1392 transport,
1393 ClientTransactionNilCallbacks {},
1394 &rt,
1395 );
1396 }
1397 }
1398 }
1399
1400 _ => {}
1401 }
1402 }
1403 });
1404
1405 if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
1406
1407 let rt = Arc::clone(&rt_);
1408
1409 if let Some(transport) = {
1410 let i;
1411 {
1412 let guard = registered_public_identity.lock().unwrap();
1413
1414 if let Some((transport, _, _)) = &*guard {
1415 i = Some(Arc::clone(transport))
1416 } else {
1417 i = None
1418 }
1419 }
1420 i
1421 } {
1422 tm_.send_request(
1423 ack_message,
1424 &transport,
1425 ClientTransactionNilCallbacks {},
1426 &rt_,
1427 );
1428
1429 let ongoing_sessions_ = Arc::clone(&ongoing_sessions);
1430
1431 if let Ok(message_tx) = cpm_session.start(&public_user_identity, move |cpim_info, content_type, mesage_buf| {
1432 message_receive_listener(&sip_session_2, asserted_contact_uri.as_bytes(), cpim_info, content_type, mesage_buf)
1433 }, &connect_function, move || {
1434 let mut guard = ongoing_sessions_.lock().unwrap();
1435 if let Some(idx) = guard.iter().position(|(_, sip_session)| {
1436 let inner = sip_session.get_inner();
1437 Arc::ptr_eq(&inner, &cpm_session_)
1438 }) {
1439 guard.swap_remove(idx);
1440 }
1441 }, &rt) {
1442
1443 match message_tx.send(message).await {
1444 Ok(()) => {},
1445 Err(e) => {
1446 (e.0.message_result_callback)(403, String::from("Forbidden"));
1447 },
1448 }
1449
1450 while let Some(message) = message_rx.recv().await {
1451 match message_tx.send(message).await {
1452 Ok(()) => {},
1453 Err(e) => {
1454 (e.0.message_result_callback)(403, String::from("Forbidden"));
1455 },
1456 }
1457 }
1458
1459 let mut guard = ongoing_sessions.lock().unwrap();
1460
1461 guard.push((known_identities, sip_session));
1462
1463 let mut guard = ongoing_outgoing_invitations.lock().unwrap();
1464
1465 if let Some(idx) = guard.iter().position(|outgoing_invitation| Arc::ptr_eq(outgoing_invitation, &outgoing_invitation_)) {
1466 guard.swap_remove(idx);
1467 }
1468
1469 return;
1470 }
1471 }
1472 }
1473 }
1474 }
1475 }
1476 }
1477 }
1478 }
1479 }
1480 }
1481
1482 Err((error_code, error_reason)) => {
1483 (message.message_result_callback)(error_code, error_reason);
1484 return;
1485 }
1486 }
1487 }
1488
1489 (message.message_result_callback)(500, String::from("Internal Error"));
1490 });
1491
1492 tm.send_request(
1493 invite_message,
1494 transport,
1495 CPMSessionInviteCallbacks {
1496 // l_sdp,
1497 recipient_type: RecipientType::from(recipient_type),
1498 client_transaction_tx,
1499 // message_result_callback: Box::new(message_result_callback),
1500 // message_receive_listener: Arc::clone(&self.message_receive_listener),
1501 rt: Arc::clone(rt),
1502 },
1503 rt,
1504 );
1505
1506 self.ongoing_outgoing_invitations
1507 .lock()
1508 .unwrap()
1509 .push(outgoing_invitation);
1510 return;
1511 }
1512 }
1513
1514 (message_result_callback)(500, String::from("Internal Error"));
1515 }
1516}
1517
1518struct CPMSessionInviteCallbacks {
1519 // l_sdp: Arc<Body>,
1520 recipient_type: RecipientType,
1521 client_transaction_tx:
1522 mpsc::Sender<Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>>,
1523 // client_transaction_tx: mpsc::Sender<mpsc::Sender<CPMMessageParam>>,
1524 // message_result_callback: Box<dyn Fn(u16) + Send + Sync>,
1525 // message_receive_listener: Arc<dyn Fn(&[u8], &[u8], &[u8]) + Send + Sync>,
1526 rt: Arc<Runtime>,
1527}
1528
1529impl ClientTransactionCallbacks for CPMSessionInviteCallbacks {
1530 fn on_provisional_response(&self, message: SipMessage) {}
1531
1532 fn on_final_response(&self, message: SipMessage) {
1533 if let SipMessage::Response(l, _, _) = &message {
1534 let mut status_code = 500;
1535 let mut reason_phrase = String::from("Internal Error");
1536 if l.status_code >= 200 && l.status_code < 300 {
1537 if let Some(session_info) = get_cpm_session_info_from_message(&message, true) {
1538 loop {
1539 let accepting_chatbot_session_is_forbidden =
1540 if let RecipientType::Chatbot = self.recipient_type {
1541 match session_info.cpm_contact.service_type {
1542 CPMServiceType::Chatbot => false,
1543 _ => true,
1544 }
1545 } else {
1546 false
1547 };
1548
1549 if accepting_chatbot_session_is_forbidden {
1550 status_code = 606;
1551 reason_phrase = String::from("Not Acceptable");
1552 break;
1553 }
1554
1555 if let Some(resp_body) = message.get_body() {
1556 // let mut sdp: Option<Sdp> = None;
1557
1558 if let Some(headers) = message.headers() {
1559 if let Some(header) = header::search(headers, b"Content-Type", true)
1560 {
1561 if let Some(content_type) =
1562 header.get_value().as_header_field().as_content_type()
1563 {
1564 if content_type
1565 .major_type
1566 .eq_ignore_ascii_case(b"application")
1567 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1568 {
1569 // match resp_body.as_ref() {
1570 // Body::Raw(raw) => {
1571 // sdp = raw.as_sdp();
1572 // }
1573 // _ => {}
1574 // }
1575 let client_transaction_tx =
1576 self.client_transaction_tx.clone();
1577 let sdp_body = Arc::clone(&resp_body);
1578 self.rt.spawn(async move {
1579 match client_transaction_tx
1580 .send(Ok((message, session_info, sdp_body)))
1581 .await
1582 {
1583 Ok(()) => {}
1584 Err(e) => {}
1585 }
1586 });
1587
1588 return;
1589 }
1590 }
1591 }
1592 }
1593
1594 // let l_sdp_body = Arc::clone(&self.l_sdp);
1595 // let r_sdp_body = resp_body;
1596
1597 // if let Some(sdp) = sdp {
1598 // if let Some(msrp_info) = sdp.as_msrp_info() {
1599 // if let Some(l_sdp) = match l_sdp_body.as_ref() {
1600 // Body::Raw(raw) => {
1601 // raw.as_sdp()
1602 // }
1603 // _ => None,
1604 // } {
1605 // if let Some(l_msrp_info) = l_sdp.as_msrp_info() {
1606 // let message_receive_listener = Arc::clone(&self.message_receive_listener);
1607 // let cpm_session = CPMSession::new(session_info, move |content_type, mesage_buf| {
1608 // message_receive_listener("", content_type, mesage_buf)
1609 // });
1610
1611 // if let Ok(_) = cpm_session.set_local_sdp(l_sdp_body, sock, tls, laddr, lport, lpath) {
1612 // if let Ok(_) = cpm_session.set_remote_sdp(r_sdp_body, raddr, rport, rpath) {
1613
1614 // if let Ok(message_tx) = cpm_session.start(connect_function, session_end_function, &self.rt) {
1615 // let client_transaction_tx = self.client_transaction_tx.clone();
1616
1617 // self.rt.spawn(async move {
1618 // client_transaction_tx.send(message_tx).await;
1619 // });
1620
1621 // return;
1622 // }
1623 // }
1624 // }
1625 // }
1626 // }
1627 // }
1628 // }
1629 }
1630 break;
1631 }
1632 }
1633 } else {
1634 status_code = l.status_code;
1635 reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
1636 }
1637
1638 let client_transaction_tx = self.client_transaction_tx.clone();
1639 self.rt.spawn(async move {
1640 match client_transaction_tx
1641 .send(Err((status_code, reason_phrase)))
1642 .await
1643 {
1644 Ok(()) => {}
1645 Err(e) => {}
1646 }
1647 });
1648 }
1649 }
1650
1651 fn on_transport_error(&self) {
1652 todo!()
1653 }
1654}
1655
1656pub(crate) struct UpdateMessageCallbacks<T> {
1657 // session_expires: Option<Header>,
1658 pub(crate) dialog: Arc<SipDialog>,
1659 pub(crate) sip_session: Arc<SipSession<T>>,
1660 pub(crate) rt: Arc<Runtime>,
1661}
1662
1663impl<T> ClientTransactionCallbacks for UpdateMessageCallbacks<T> {
1664 fn on_provisional_response(&self, message: SipMessage) {}
1665
1666 fn on_final_response(&self, message: SipMessage) {
1667 if let SipMessage::Response(l, _, _) = &message {
1668 if l.status_code >= 200 && l.status_code < 300 {
1669 if let Some((timeout, refresher)) =
1670 choose_timeout_on_client_transaction_completion(None, &message)
1671 {
1672 match refresher {
1673 Refresher::UAC => {
1674 self.sip_session.schedule_refresh(timeout, true, &self.rt)
1675 }
1676
1677 Refresher::UAS => {
1678 self.sip_session.schedule_refresh(timeout, false, &self.rt)
1679 }
1680 }
1681 }
1682 } else {
1683 if l.status_code == 404
1684 || l.status_code == 410
1685 || l.status_code == 416
1686 || (l.status_code >= 482 && l.status_code <= 485)
1687 || l.status_code == 489
1688 || l.status_code == 604
1689 {
1690 self.dialog.on_terminating_response(&message, &self.rt);
1691 }
1692 }
1693 }
1694 }
1695
1696 fn on_transport_error(&self) {}
1697}
1698
1699pub(crate) struct CPMSessionDialogEventReceiver {
1700 pub(crate) sip_session: Arc<SipSession<CPMSession>>,
1701 pub(crate) ongoing_sessions:
1702 Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
1703 pub(crate) public_user_identity: String,
1704 pub(crate) message_receive_listener: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
1705 pub(crate) msrp_socket_connect_function: Arc<
1706 dyn Fn(
1707 ClientSocket,
1708 &String,
1709 u16,
1710 bool,
1711 )
1712 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
1713 + Send
1714 + Sync,
1715 >,
1716 pub(crate) rt: Arc<Runtime>,
1717}
1718
1719impl SipDialogEventCallbacks for CPMSessionDialogEventReceiver {
1720 fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
1721 let ongoing_sessions = Arc::clone(&self.ongoing_sessions);
1722 let sip_session_ = Arc::clone(&self.sip_session);
1723 let session = self.sip_session.get_inner();
1724 let message_receive_listener = Arc::clone(&self.message_receive_listener);
1725 match session.start(
1726 &self.public_user_identity,
1727 move |cpim_info, content_type, content_body| {
1728 message_receive_listener(cpim_info, content_type, content_body)
1729 },
1730 &self.msrp_socket_connect_function,
1731 move || {
1732 let mut guard = ongoing_sessions.lock().unwrap();
1733 if let Some(idx) = guard
1734 .iter()
1735 .position(|(_, sip_session)| Arc::ptr_eq(sip_session, &sip_session_))
1736 {
1737 guard.swap_remove(idx);
1738 }
1739 },
1740 &self.rt,
1741 ) {
1742 Ok(_) => {}
1743 Err(()) => {}
1744 };
1745 }
1746
1747 fn on_new_request(
1748 &self,
1749 transaction: Arc<ServerTransaction>,
1750 tx: mpsc::Sender<ServerTransactionEvent>,
1751 // timer: &Timer,
1752 rt: &Arc<Runtime>,
1753 ) -> Option<(u16, bool)> {
1754 let message = transaction.message();
1755
1756 if let SipMessage::Request(l, req_headers, req_body) = message {
1757 if l.method == UPDATE {
1758 if let Some(headers) = req_headers {
1759 if let Some(h) = search(headers, b"Supported", true) {
1760 if h.supports(b"timer") {
1761 if let Ok(Some((timeout, refresher))) =
1762 choose_timeout_for_server_transaction_response(
1763 &transaction,
1764 true,
1765 Refresher::UAC,
1766 )
1767 {
1768 // to-do: not always UAC ?
1769 if let Some(mut resp_message) = server_transaction::make_response(
1770 message,
1771 transaction.to_tag(),
1772 200,
1773 b"Ok",
1774 ) {
1775 match refresher {
1776 Refresher::UAC => {
1777 resp_message.add_header(Header::new(
1778 b"Session-Expires",
1779 format!("{};refresher=uac", timeout),
1780 ));
1781
1782 self.sip_session.schedule_refresh(timeout, false, rt);
1783 }
1784 Refresher::UAS => {
1785 resp_message.add_header(Header::new(
1786 b"Session-Expires",
1787 format!("{};refresher=uas", timeout),
1788 ));
1789
1790 self.sip_session.schedule_refresh(timeout, true, rt);
1791 }
1792 }
1793
1794 server_transaction::send_response(
1795 transaction,
1796 resp_message,
1797 tx,
1798 rt,
1799 );
1800 }
1801
1802 return Some((200, false));
1803 }
1804 }
1805 }
1806
1807 if let Some(resp_message) =
1808 server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
1809 {
1810 server_transaction::send_response(transaction, resp_message, tx, rt);
1811 }
1812 }
1813
1814 return Some((200, false));
1815 }
1816 }
1817
1818 None
1819 }
1820
1821 fn on_terminating_request(&self, _message: &SipMessage) {
1822 let mut guard = self.ongoing_sessions.lock().unwrap();
1823 if let Some(idx) = guard
1824 .iter()
1825 .position(|(_, sip_session)| Arc::ptr_eq(&sip_session, &self.sip_session))
1826 {
1827 guard.swap_remove(idx);
1828 }
1829 }
1830
1831 fn on_terminating_response(&self, message: &SipMessage) {}
1832}
1833
1834pub struct CPMSessionServiceWrapper {
1835 pub service: Arc<CPMSessionService>,
1836 pub tm: Arc<SipTransactionManager>,
1837}
1838
1839impl TransactionHandler for CPMSessionServiceWrapper {
1840 fn handle_transaction(
1841 &self,
1842 transaction: &Arc<ServerTransaction>,
1843 ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
1844 channels: &mut Option<(
1845 mpsc::Sender<ServerTransactionEvent>,
1846 mpsc::Receiver<ServerTransactionEvent>,
1847 )>,
1848 rt: &Arc<Runtime>,
1849 ) -> bool {
1850 let message = transaction.message();
1851 if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
1852 if req_line.method == INVITE {
1853 let mut is_cpm_session_invitation = false;
1854
1855 for header in req_headers {
1856 if header.get_name().eq_ignore_ascii_case(b"Accept-Contact") {
1857 if header.get_value().as_header_field().contains_icsi_ref(
1858 b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session",
1859 ) {
1860 is_cpm_session_invitation = true;
1861 break;
1862 }
1863 }
1864 }
1865
1866 if is_cpm_session_invitation {
1867 let mut sdp = None;
1868
1869 for header in req_headers {
1870 if header.get_name().eq_ignore_ascii_case(b"Content-Type") {
1871 if let Some(content_type) =
1872 header.get_value().as_header_field().as_content_type()
1873 {
1874 if content_type.major_type.eq_ignore_ascii_case(b"application")
1875 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1876 {
1877 match req_body.as_ref() {
1878 Body::Raw(raw) => {
1879 sdp = raw.as_sdp();
1880 }
1881 _ => {}
1882 }
1883 }
1884 }
1885 }
1886 }
1887
1888 if let Some(sdp) = sdp {
1889 if let Some(msrp_info) = sdp.as_msrp_info() {
1890 if let Some(session_info) =
1891 get_cpm_session_info_from_message(message, false)
1892 {
1893 let auto_accept_response_code: u16;
1894
1895 let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
1896
1897 match session_info.cpm_contact.service_type {
1898 CPMServiceType::Group => loop {
1899 if let (
1900 Some(conversation_id),
1901 Some(contribution_id),
1902 Some(subject),
1903 Some(referred_by),
1904 ) = (
1905 &session_info.conversation_id,
1906 &session_info.contribution_id,
1907 &session_info.subject,
1908 &session_info.referred_by,
1909 ) {
1910 if let Ok(subject) = urlencoding::decode(subject) {
1911 if let Some(referred_by) = referred_by
1912 .as_bytes()
1913 .as_name_addresses()
1914 .first()
1915 {
1916 if let Some(referred_by_name) =
1917 referred_by.display_name
1918 {
1919 if let Ok(referred_by_name) =
1920 std::str::from_utf8(referred_by_name)
1921 {
1922 if let Some(uri_part) =
1923 &referred_by.uri_part
1924 {
1925 if let Ok(referred_by_uri) =
1926 std::str::from_utf8(
1927 uri_part.uri,
1928 )
1929 {
1930 match msrp_info.direction {
1931 MsrpDirection::SendReceive => {
1932 auto_accept_response_code = (self.service.group_invite_handler_function)(&session_info.asserted_contact_uri, conversation_id, contribution_id, &subject, referred_by_name, referred_by_uri, cancel_rx);
1933 break;
1934 }
1935
1936 _ => {
1937 (self.service.group_invite_event_listener_function)(CPMGroupEvent::OnInvite);
1938 return false;
1939 }
1940 }
1941 }
1942 }
1943 }
1944 }
1945 }
1946 }
1947 }
1948
1949 return false;
1950 },
1951
1952 _ => {
1953 let mut is_chatbot_session = false;
1954
1955 for header in req_headers {
1956 if header
1957 .get_name()
1958 .eq_ignore_ascii_case(b"Accept-Contact")
1959 {
1960 if header.get_value().as_header_field().contains_iari_ref(b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot") {
1961 is_chatbot_session = true;
1962 break;
1963 }
1964 }
1965 }
1966
1967 if is_chatbot_session {
1968 match session_info.cpm_contact.service_type {
1969 CPMServiceType::Chatbot => {}
1970 _ => {
1971 if let Some(resp_message) =
1972 server_transaction::make_response(
1973 message,
1974 transaction.to_tag(),
1975 606,
1976 b"Not Acceptable",
1977 )
1978 {
1979 if let Some((tx, _)) = channels.take() {
1980 server_transaction::send_response(
1981 Arc::clone(transaction),
1982 resp_message,
1983 tx,
1984 // &timer,
1985 rt,
1986 );
1987 }
1988 }
1989
1990 return true;
1991 }
1992 }
1993
1994 // to-do: check privacy settings and return 606 if appropriate
1995 }
1996
1997 loop {
1998 if let (Some(conversation_id), Some(contribution_id)) = (
1999 &session_info.conversation_id,
2000 &session_info.contribution_id,
2001 ) {
2002 match msrp_info.direction {
2003 MsrpDirection::SendOnly => {
2004 if session_info.is_deferred_session {
2005 auto_accept_response_code = (self.service.conversation_invite_handler_function)(true, &session_info.asserted_contact_uri, contribution_id, conversation_id, cancel_rx);
2006 break;
2007 }
2008 }
2009
2010 MsrpDirection::SendReceive => {
2011 auto_accept_response_code = (self
2012 .service
2013 .conversation_invite_handler_function)(
2014 false,
2015 &session_info.asserted_contact_uri,
2016 &contribution_id,
2017 &conversation_id,
2018 cancel_rx,
2019 );
2020 break;
2021 }
2022
2023 _ => {}
2024 }
2025 }
2026
2027 return false;
2028 }
2029 }
2030 }
2031
2032 if auto_accept_response_code >= 200
2033 && auto_accept_response_code < 300
2034 {
2035 if let Some((tx, rx)) = channels.take() {
2036 try_accept_invitation(
2037 session_info,
2038 tx,
2039 rx,
2040 None,
2041 req_body,
2042 msrp_info,
2043 &self.service.msrp_socket_allocator_function,
2044 &self.service.msrp_socket_connect_function,
2045 &self.service.message_receive_listener,
2046 transaction,
2047 message,
2048 &self.service.registered_public_identity,
2049 ongoing_dialogs,
2050 &self.service.ongoing_sessions,
2051 &self.tm,
2052 rt,
2053 )
2054 } else {
2055 // 500
2056 }
2057
2058 return true;
2059 } else if auto_accept_response_code >= 100
2060 && auto_accept_response_code < 200
2061 {
2062 if let Some((tx, mut rx)) = channels.take() {
2063 if let Some(mut resp_message) =
2064 server_transaction::make_response(
2065 message,
2066 transaction.to_tag(),
2067 180,
2068 b"Ringing",
2069 )
2070 {
2071 resp_message.add_header(Header::new(b"Allow", b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE"));
2072
2073 {
2074 let guard = self
2075 .service
2076 .registered_public_identity
2077 .lock()
2078 .unwrap();
2079
2080 if let Some((
2081 transport,
2082 contact_identity,
2083 instance_id,
2084 )) = &*guard
2085 {
2086 let transport_ = transaction.transport();
2087 if Arc::ptr_eq(transport, transport_) {
2088 resp_message.add_header(Header::new(
2089 b"Contact",
2090 format!(
2091 "<{}>;+sip.instance=\"{}\"",
2092 contact_identity, instance_id
2093 ),
2094 ));
2095 } // to-do: treat as error if transport has changed somehow
2096 }
2097 }
2098
2099 let (d_tx, mut d_rx) = mpsc::channel(1);
2100
2101 let ongoing_dialogs_ = Arc::clone(ongoing_dialogs);
2102
2103 rt.spawn(async move {
2104 if let Some(dialog) = d_rx.recv().await {
2105 ongoing_dialogs_.remove_dialog(&dialog);
2106 }
2107 });
2108
2109 if let Ok(dialog) = SipDialog::try_new_as_uas(
2110 message,
2111 &resp_message,
2112 move |d| match d_tx.blocking_send(d) {
2113 Ok(()) => {}
2114 Err(e) => {}
2115 },
2116 ) {
2117 let dialog = Arc::new(dialog);
2118
2119 ongoing_dialogs.add_dialog(&dialog);
2120
2121 // dialog.register_transaction(Arc::clone(transaction));
2122
2123 let (tx_, rx_) =
2124 mpsc::channel::<ServerTransactionEvent>(8);
2125
2126 let known_identities =
2127 session_info.get_contact_known_identities();
2128
2129 let invitation = CPMSessionInvitation::new(
2130 session_info,
2131 Arc::clone(&req_body),
2132 Arc::clone(&transaction),
2133 tx.clone(),
2134 rx_,
2135 dialog,
2136 );
2137
2138 let (iv_tx, mut iv_rx) =
2139 mpsc::channel::<CPMSessionInvitationResponse>(
2140 1,
2141 );
2142 let iv_tx_ = iv_tx.clone();
2143 let iv = Arc::new((known_identities, iv_tx_));
2144 let iv_ = Arc::clone(&iv);
2145
2146 let message_receive_listener = Arc::clone(
2147 &self.service.message_receive_listener,
2148 );
2149 let msrp_socket_allocator_function = Arc::clone(
2150 &self.service.msrp_socket_allocator_function,
2151 );
2152 let msrp_socket_connect_function = Arc::clone(
2153 &self.service.msrp_socket_connect_function,
2154 );
2155 let registered_public_identity = Arc::clone(
2156 &self.service.registered_public_identity,
2157 );
2158 let ongoing_dialogs = Arc::clone(ongoing_dialogs);
2159 let ongoing_sessions =
2160 Arc::clone(&self.service.ongoing_sessions);
2161
2162 let ongoing_incoming_invitations_ = Arc::clone(
2163 &self.service.ongoing_incoming_invitations,
2164 );
2165
2166 let tm_ = Arc::clone(&self.tm);
2167 let rt_ = Arc::clone(rt);
2168
2169 rt.spawn(async move {
2170 match iv_rx.recv().await {
2171 Some(
2172 CPMSessionInvitationResponse::Accept,
2173 ) => {
2174 try_accept_hanging_invitation(
2175 invitation,
2176 &msrp_socket_allocator_function,
2177 &msrp_socket_connect_function,
2178 &message_receive_listener,
2179 ®istered_public_identity,
2180 &ongoing_dialogs,
2181 &ongoing_sessions,
2182 &tm_,
2183 &rt_,
2184 );
2185 }
2186 Some(
2187 CPMSessionInvitationResponse::Dispose,
2188 ) => match cancel_tx.send(()) {
2189 Ok(()) => {}
2190 Err(()) => {}
2191 },
2192 _ => {}
2193 }
2194
2195 let mut guard = ongoing_incoming_invitations_
2196 .lock()
2197 .unwrap();
2198 if let Some(idx) = guard
2199 .iter()
2200 .position(|iv| Arc::ptr_eq(iv, &iv_))
2201 {
2202 guard.swap_remove(idx);
2203 }
2204 });
2205
2206 {
2207 self.service
2208 .ongoing_incoming_invitations
2209 .lock()
2210 .unwrap()
2211 .push(iv);
2212 }
2213
2214 rt.spawn(async move {
2215 while let Some(ev) = rx.recv().await {
2216 match ev {
2217 ServerTransactionEvent::Cancelled => {
2218 match iv_tx.send(CPMSessionInvitationResponse::Dispose).await {
2219 Ok(()) => {},
2220 Err(e) => {},
2221 }
2222 }
2223 _ => {}
2224 }
2225
2226 match tx_.send(ev).await {
2227 Ok(()) => {}
2228 Err(e) => {}
2229 }
2230 }
2231 });
2232
2233 server_transaction::send_response(
2234 Arc::clone(transaction),
2235 resp_message,
2236 tx,
2237 // &timer,
2238 rt,
2239 );
2240
2241 return true;
2242 }
2243 }
2244 }
2245
2246 return true;
2247 }
2248 }
2249
2250 return false;
2251 }
2252 }
2253
2254 if let Some(resp_message) = server_transaction::make_response(
2255 message,
2256 transaction.to_tag(),
2257 400,
2258 b"Bad Request",
2259 ) {
2260 if let Some((tx, _)) = channels.take() {
2261 server_transaction::send_response(
2262 Arc::clone(transaction),
2263 resp_message,
2264 tx,
2265 // &timer,
2266 rt,
2267 );
2268 }
2269 }
2270
2271 return true;
2272 }
2273 }
2274 }
2275
2276 false
2277 }
2278}