rust_rcs_client/messaging/cpm/
session.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate rust_rcs_core;
16extern crate rust_strict_sdp;
17
18use std::io::Read;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21
22use base64::{engine::general_purpose, Engine as _};
23use futures::Future;
24
25use rust_rcs_core::cpim::{CPIMInfo, CPIMMessage};
26use rust_rcs_core::internet::body::VectorReader;
27use rust_rcs_core::internet::header::search;
28use rust_rcs_core::internet::header_field::AsHeaderField;
29use rust_rcs_core::internet::Body;
30use rust_rcs_core::internet::{header, Header};
31
32use rust_rcs_core::internet::headers::{AsContentType, Supported};
33
34use rust_rcs_core::internet::name_addr::AsNameAddr;
35use rust_rcs_core::io::network::stream::{ClientSocket, ClientStream};
36use rust_rcs_core::io::{DynamicChain, Serializable};
37use rust_rcs_core::msrp::info::msrp_info_reader::AsMsrpInfo;
38use rust_rcs_core::msrp::info::{MsrpDirection, MsrpInfo, MsrpInterfaceType, MsrpSetupMethod};
39use rust_rcs_core::msrp::msrp_channel::MsrpChannel;
40use rust_rcs_core::msrp::msrp_chunk::MsrpChunk;
41use rust_rcs_core::msrp::msrp_demuxer::MsrpDemuxer;
42use rust_rcs_core::msrp::msrp_muxer::{MsrpDataWriter, MsrpMessageReceiver, MsrpMuxer};
43use rust_rcs_core::msrp::msrp_transport::msrp_transport_start;
44// use rust_rcs_core::msrp::msrp_transport::MsrpTransportWrapper;
45use rust_rcs_core::sip::sip_core::SipDialogCache;
46use rust_rcs_core::sip::sip_session::{
47    choose_timeout_for_server_transaction_response,
48    choose_timeout_on_client_transaction_completion, Refresher, SipSession, SipSessionEvent,
49    SipSessionEventReceiver,
50};
51use rust_rcs_core::sip::sip_transaction::client_transaction::ClientTransactionNilCallbacks;
52use rust_rcs_core::sip::sip_transaction::server_transaction;
53
54use rust_rcs_core::sip::SipDialog;
55use rust_rcs_core::sip::{ClientTransactionCallbacks, SipTransactionManager};
56use rust_rcs_core::sip::{ServerTransaction, SipDialogEventCallbacks};
57use rust_rcs_core::sip::{ServerTransactionEvent, UPDATE};
58use rust_rcs_core::sip::{SipCore, SipTransport};
59
60use rust_rcs_core::sip::SipMessage;
61use rust_rcs_core::sip::TransactionHandler;
62
63use rust_rcs_core::sip::INVITE;
64
65use rust_rcs_core::sip::sip_headers::AsFromTo;
66
67use rust_rcs_core::util::rand::create_raw_alpha_numeric_string;
68use rust_rcs_core::util::raw_string::StrFind;
69
70use rust_strict_sdp::AsSDP;
71
72use tokio::runtime::Runtime;
73use tokio::sync::mpsc;
74use tokio::sync::oneshot;
75use uuid::Uuid;
76
77use crate::contact::ContactKnownIdentities;
78use crate::messaging::ffi::RecipientType;
79
80use super::session_invitation::{
81    try_accept_hanging_invitation, try_accept_invitation, CPMSessionInvitation,
82    CPMSessionInvitationResponse,
83};
84use super::sip::cpm_accept_contact::CPMAcceptContact;
85use super::sip::cpm_contact::{AsCPMContact, CPMContact, CPMServiceType};
86use super::{make_cpim_message_content_body, CPMMessageParam};
87
88// to-do: use enum
89pub struct CPMSessionInfo {
90    pub cpm_contact: CPMContact,
91    pub asserted_contact_uri: String,
92    pub conversation_id: Option<String>,
93    pub contribution_id: Option<String>,
94    pub subject: Option<String>,
95    pub referred_by: Option<String>,
96    pub is_deferred_session: bool,
97    pub contact_alias: Option<String>,
98    pub conversation_supports_anonymization: bool,
99    pub conversation_is_using_anonymization_token: bool,
100}
101
102impl CPMSessionInfo {
103    pub fn get_contact_known_identities(&self) -> ContactKnownIdentities {
104        let mut known_identities = ContactKnownIdentities::new();
105        known_identities.add_identity(self.asserted_contact_uri.clone());
106        match self.cpm_contact.service_type {
107            CPMServiceType::OneToOne | CPMServiceType::Chatbot => {
108                if let Ok(service_uri) = std::str::from_utf8(&self.cpm_contact.service_uri) {
109                    known_identities.add_identity(String::from(service_uri));
110                }
111            }
112            _ => {}
113        }
114        if let Some(contact_alias) = &self.contact_alias {
115            known_identities.add_identity(String::from(contact_alias));
116        }
117        known_identities
118    }
119}
120
121pub fn get_cpm_session_info_from_message(
122    message: &SipMessage,
123    is_outgoing: bool,
124) -> Option<CPMSessionInfo> {
125    if let Some(headers) = message.headers() {
126        let mut contact_header = None;
127        let mut asserted_identity_header = None;
128        let mut subject_header = None;
129        let mut referred_by_header = None;
130
131        let mut from_header = None;
132        let mut to_header = None;
133
134        let mut conversation_id: Option<&[u8]> = None;
135        let mut contribution_id: Option<&[u8]> = None;
136
137        for header in headers {
138            if header.get_name().eq_ignore_ascii_case(b"Contact") {
139                contact_header = Some(header);
140            } else if header
141                .get_name()
142                .eq_ignore_ascii_case(b"P-Asserted-Identity")
143            {
144                asserted_identity_header = Some(header);
145            } else if header.get_name().eq_ignore_ascii_case(b"Conversation-ID") {
146                conversation_id = Some(header.get_value());
147            } else if header.get_name().eq_ignore_ascii_case(b"Contribution-ID") {
148                contribution_id = Some(header.get_value());
149            } else if header.get_name().eq_ignore_ascii_case(b"Subject") {
150                subject_header = Some(header);
151            } else if header.get_name().eq_ignore_ascii_case(b"Referred-By") {
152                referred_by_header = Some(header);
153            } else if header.get_name().eq_ignore_ascii_case(b"From") {
154                from_header = Some(header);
155            } else if header.get_name().eq_ignore_ascii_case(b"To") {
156                to_header = Some(header);
157            }
158        }
159
160        if let Some(contact_header) = contact_header {
161            if let Some(cpm_contact) = contact_header
162                .get_value()
163                .as_header_field()
164                .as_cpm_contact()
165            {
166                let mut asserted_contact_uri = None;
167                let mut is_deferred_session = false;
168                let mut conversation_supports_anonymization = false;
169                let mut conversation_is_using_anonymization_token = false;
170
171                match cpm_contact.service_type {
172                    CPMServiceType::OneToOne | CPMServiceType::Chatbot => {
173                        if let Some(asserted_identity_header) = asserted_identity_header {
174                            if let Some(name_addr) = asserted_identity_header
175                                .get_value()
176                                .as_name_addresses()
177                                .first()
178                            {
179                                if let Some(uri_part) = &name_addr.uri_part {
180                                    if uri_part.uri.start_with(b"rcse-standfw@") {
181                                        is_deferred_session = true;
182                                        if let Some(referred_by_header) = referred_by_header {
183                                            if let Some(referred_by) = referred_by_header
184                                                .get_value()
185                                                .as_name_addresses()
186                                                .first()
187                                            {
188                                                if let Some(uri_part) = &referred_by.uri_part {
189                                                    asserted_contact_uri = Some(uri_part.uri);
190                                                    for p in referred_by_header
191                                                        .get_value()
192                                                        .as_header_field()
193                                                        .get_parameter_iterator()
194                                                    {
195                                                        if p.name == b"tk" {
196                                                            match p.value {
197                                                                Some(b"on") => {
198                                                                    conversation_supports_anonymization = true;
199                                                                    conversation_is_using_anonymization_token = true;
200                                                                }
201                                                                Some(b"off") => {
202                                                                    conversation_supports_anonymization = true;
203                                                                }
204                                                                _ => {}
205                                                            }
206                                                            break;
207                                                        }
208                                                    }
209                                                }
210                                            }
211                                        }
212                                    }
213
214                                    if !is_deferred_session {
215                                        asserted_contact_uri = Some(uri_part.uri);
216                                        for p in uri_part.get_parameter_iterator() {
217                                            if p.name == b"tk" {
218                                                match p.value {
219                                                    Some(b"on") => {
220                                                        conversation_supports_anonymization = true;
221                                                        conversation_is_using_anonymization_token =
222                                                            true;
223                                                    }
224                                                    Some(b"off") => {
225                                                        conversation_supports_anonymization = true;
226                                                    }
227                                                    _ => {}
228                                                }
229                                                break;
230                                            }
231                                        }
232                                    }
233                                }
234                            }
235                        }
236                    }
237
238                    _ => {
239                        if let Some(asserted_identity_header) = asserted_identity_header {
240                            if let Some(name_addr) = asserted_identity_header
241                                .get_value()
242                                .as_name_addresses()
243                                .first()
244                            {
245                                if let Some(uri_part) = &name_addr.uri_part {
246                                    asserted_contact_uri = Some(uri_part.uri);
247                                }
248                            }
249                        }
250                    }
251                }
252
253                if asserted_contact_uri.is_none() {
254                    asserted_contact_uri = Some(&cpm_contact.service_uri);
255                }
256
257                if let Some(asserted_contact_uri) = asserted_contact_uri {
258                    if let Ok(asserted_contact_uri) = std::str::from_utf8(asserted_contact_uri) {
259                        let asserted_contact_uri = String::from(asserted_contact_uri);
260
261                        let mut contact_alias = None;
262                        if is_outgoing {
263                            if let Some(to_header) = to_header {
264                                if let Some(name_addr) = to_header
265                                    .get_value()
266                                    .as_header_field()
267                                    .as_from_to()
268                                    .addresses
269                                    .first()
270                                {
271                                    if let Some(display_name) = name_addr.display_name {
272                                        if let Ok(display_name) = std::str::from_utf8(display_name)
273                                        {
274                                            contact_alias = Some(String::from(display_name));
275                                        }
276                                    }
277                                }
278                            }
279                        } else {
280                            if let Some(from_header) = from_header {
281                                if let Some(name_addr) = from_header
282                                    .get_value()
283                                    .as_header_field()
284                                    .as_from_to()
285                                    .addresses
286                                    .first()
287                                {
288                                    if let Some(display_name) = name_addr.display_name {
289                                        if let Ok(display_name) = std::str::from_utf8(display_name)
290                                        {
291                                            contact_alias = Some(String::from(display_name));
292                                        }
293                                    }
294                                }
295                            }
296                        }
297
298                        return Some(CPMSessionInfo {
299                            cpm_contact,
300                            asserted_contact_uri,
301                            conversation_id: match conversation_id {
302                                Some(conversation_id) => match std::str::from_utf8(conversation_id)
303                                {
304                                    Ok(conversation_id) => Some(String::from(conversation_id)),
305                                    Err(_) => None,
306                                },
307                                None => None,
308                            },
309                            contribution_id: match contribution_id {
310                                Some(contribution_id) => match std::str::from_utf8(contribution_id)
311                                {
312                                    Ok(contribution_id) => Some(String::from(contribution_id)),
313                                    Err(_) => None,
314                                },
315                                None => None,
316                            },
317                            subject: match subject_header {
318                                Some(subject_header) => {
319                                    if let Ok(subject) =
320                                        std::str::from_utf8(subject_header.get_value())
321                                    {
322                                        Some(String::from(subject))
323                                    } else {
324                                        None
325                                    }
326                                }
327                                None => None,
328                            },
329                            referred_by: match referred_by_header {
330                                Some(referred_by_header) => {
331                                    if let Ok(referred_by) =
332                                        std::str::from_utf8(referred_by_header.get_value())
333                                    {
334                                        Some(String::from(referred_by))
335                                    } else {
336                                        None
337                                    }
338                                }
339                                None => None,
340                            },
341                            is_deferred_session,
342                            contact_alias,
343                            conversation_supports_anonymization,
344                            conversation_is_using_anonymization_token,
345                        });
346                    }
347                }
348            }
349        }
350    }
351
352    None
353}
354
355enum CPMSessionNegotiationState {
356    NoneSet,
357    LocalSet(Arc<Body>, Option<ClientSocket>, bool, String, u16, Vec<u8>),
358    RemoteSet(Arc<Body>, String, u16, Vec<u8>),
359}
360
361enum CPMSessionState {
362    Negotiating(CPMSessionNegotiationState),
363    Negotiated(
364        Arc<Body>,
365        Arc<Body>,
366        Option<ClientSocket>,
367        bool,
368        String,
369        u16,
370        Vec<u8>,
371        String,
372        u16,
373        Vec<u8>,
374    ),
375    Starting(
376        Arc<Body>,
377        Arc<Body>,
378        String,
379        u16,
380        String,
381        u16,
382        // Arc<MsrpChannel>,
383        mpsc::Sender<CPMMessageParam>,
384    ),
385    // Started(
386    //     Arc<Body>,
387    //     Arc<Body>,
388    //     String,
389    //     u16,
390    //     String,
391    //     u16,
392    //     Arc<MsrpTransport>,
393    // ),
394    // also
395    //  there is no Re-negotiating for CPM Session
396}
397
398pub struct CPMSession {
399    session_info: CPMSessionInfo,
400    session_state: Arc<Mutex<CPMSessionState>>,
401}
402
403impl CPMSession {
404    pub fn new(session_info: CPMSessionInfo) -> CPMSession {
405        CPMSession {
406            session_info,
407            session_state: Arc::new(Mutex::new(CPMSessionState::Negotiating(
408                CPMSessionNegotiationState::NoneSet,
409            ))),
410        }
411    }
412
413    pub fn get_session_info(&self) -> &CPMSessionInfo {
414        &self.session_info
415    }
416
417    pub fn set_local_sdp(
418        &self,
419        l_sdp: Arc<Body>,
420        cs: ClientSocket,
421        tls: bool,
422        laddr: String,
423        lport: u16,
424        lpath: Vec<u8>,
425    ) -> Result<(), &'static str> {
426        let mut guard = self.session_state.lock().unwrap();
427        match &mut *guard {
428            CPMSessionState::Negotiating(ref mut negotiation_state) => match negotiation_state {
429                CPMSessionNegotiationState::NoneSet => {
430                    *guard = CPMSessionState::Negotiating(CPMSessionNegotiationState::LocalSet(
431                        l_sdp,
432                        Some(cs),
433                        tls,
434                        laddr,
435                        lport,
436                        lpath,
437                    ));
438                    Ok(())
439                }
440                CPMSessionNegotiationState::LocalSet(_, _, _, _, _, _) => Err("local already set"),
441                CPMSessionNegotiationState::RemoteSet(
442                    ref r_sdp,
443                    ref raddr,
444                    ref rport,
445                    ref mut rpath,
446                ) => {
447                    let mut path = Vec::new();
448                    path.append(rpath);
449                    *guard = CPMSessionState::Negotiated(
450                        l_sdp,
451                        Arc::clone(r_sdp),
452                        Some(cs),
453                        tls,
454                        laddr,
455                        lport,
456                        lpath,
457                        String::from(raddr),
458                        *rport,
459                        path,
460                    );
461                    Ok(())
462                }
463            },
464
465            _ => Err("already negotiated"),
466        }
467    }
468
469    pub fn set_remote_sdp(
470        &self,
471        r_sdp: Arc<Body>,
472        raddr: String,
473        rport: u16,
474        rpath: Vec<u8>,
475    ) -> Result<(), &'static str> {
476        let mut guard = self.session_state.lock().unwrap();
477        match &mut *guard {
478            CPMSessionState::Negotiating(ref mut negotiation_state) => match negotiation_state {
479                CPMSessionNegotiationState::NoneSet => {
480                    *guard = CPMSessionState::Negotiating(CPMSessionNegotiationState::RemoteSet(
481                        r_sdp, raddr, rport, rpath,
482                    ));
483                    Ok(())
484                }
485                CPMSessionNegotiationState::LocalSet(
486                    ref l_sdp,
487                    ref mut sock,
488                    ref tls,
489                    ref laddr,
490                    ref lport,
491                    ref mut lpath,
492                ) => {
493                    let mut path = Vec::new();
494                    path.append(lpath);
495                    *guard = CPMSessionState::Negotiated(
496                        Arc::clone(l_sdp),
497                        r_sdp,
498                        sock.take(),
499                        *tls,
500                        String::from(laddr),
501                        *lport,
502                        path,
503                        raddr,
504                        rport,
505                        rpath,
506                    );
507                    Ok(())
508                }
509                CPMSessionNegotiationState::RemoteSet(_, _, _, _) => Err("remote already set"),
510            },
511
512            _ => Err("already negotiated"),
513        }
514    }
515
516    pub fn start<MRL, F>(
517        &self,
518        public_user_identity: &str,
519        message_receive_listener: MRL,
520        connect_function: &Arc<
521            dyn Fn(
522                    ClientSocket,
523                    &String,
524                    u16,
525                    bool,
526                ) -> Pin<
527                    Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
528                > + Send
529                + Sync
530                + 'static,
531        >,
532        session_end_function: F,
533        rt: &Arc<Runtime>,
534    ) -> Result<mpsc::Sender<CPMMessageParam>, ()>
535    where
536        MRL: Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
537        F: Fn() + Send + Sync + 'static,
538    {
539        let mut guard = self.session_state.lock().unwrap();
540        match &mut *guard {
541            CPMSessionState::Negotiated(
542                ref l_sdp,
543                ref r_sdp,
544                ref mut sock,
545                ref tls,
546                ref laddr,
547                ref lport,
548                ref mut lpath,
549                ref raddr,
550                ref rport,
551                ref mut rpath,
552            ) => {
553                if let Some(sock) = sock.take() {
554                    let mut from_path = Vec::with_capacity(lpath.len());
555                    from_path.append(lpath);
556                    let mut to_path = Vec::with_capacity(rpath.len());
557                    to_path.append(rpath);
558                    let from_path_ = from_path.clone();
559                    let to_path_ = to_path.clone();
560                    let demuxer = MsrpDemuxer::new(from_path_, to_path_);
561                    let demuxer = Arc::new(demuxer);
562                    let demuxer_ = Arc::clone(&demuxer);
563                    let message_receive_listener = Arc::new(message_receive_listener);
564                    let muxer = MsrpMuxer::new(CPMSessionMessageReceiver {
565                        callback: Arc::new(move |cpim_info, content_type, content_body| {
566                            message_receive_listener(cpim_info, content_type, content_body)
567                        }),
568                    });
569
570                    let from_path_ = from_path.clone();
571                    let to_path_ = to_path.clone();
572
573                    let mut session_channel =
574                        MsrpChannel::new(from_path_, to_path_, demuxer, muxer);
575                    // let session_channel = Arc::new(session_channel);
576                    // let session_channel_ = Arc::clone(&session_channel);
577
578                    let addr = String::from(raddr);
579                    let port = *rport;
580                    let tls = *tls;
581
582                    let l_sdp = Arc::clone(l_sdp);
583                    // let l_sdp_ = Arc::clone(&l_sdp);
584                    let r_sdp = Arc::clone(r_sdp);
585                    // let r_sdp_ = Arc::clone(&r_sdp);
586
587                    let laddr = String::from(laddr);
588                    // let laddr_ = String::from(&laddr);
589                    let lport = *lport;
590                    let raddr = String::from(raddr);
591                    // let raddr_ = String::from(&raddr);
592                    let rport = *rport;
593
594                    let (message_tx, mut message_rx) = mpsc::channel::<CPMMessageParam>(8);
595                    let message_tx_ = message_tx.clone();
596
597                    *guard = CPMSessionState::Starting(
598                        l_sdp, r_sdp, laddr, lport, raddr, rport,
599                        // session_channel,
600                        message_tx,
601                    );
602
603                    let t_id = String::from(public_user_identity);
604                    let public_user_identity = String::from(public_user_identity);
605
606                    let rt_ = Arc::clone(rt);
607
608                    // let session_state = Arc::clone(&self.session_state);
609
610                    let future = connect_function(sock, &addr, port, tls);
611
612                    rt.spawn(async move {
613                        if let Ok(cs) = future.await {
614                            // let transport = MsrpTransportWrapper::new(
615                            //     cs,
616                            //     t_id,
617                            //     move |message_chunk| session_channel.on_message(message_chunk),
618                            //     move |transport| session_end_function(),
619                            //     &rt_,
620                            // );
621
622                            // let t = transport.get_transport();
623
624                            // let mut guard = session_state.lock().unwrap();
625
626                            // *guard = CPMSessionState::Started(l_sdp_, r_sdp_, laddr_, lport, raddr_, rport, t);
627
628                            let (data_tx, data_rx) = mpsc::channel(8);
629                            let data_tx_ = data_tx.clone();
630
631                            msrp_transport_start(
632                                cs,
633                                t_id,
634                                data_tx_,
635                                data_rx,
636                                move |message_chunk| session_channel.on_message(message_chunk),
637                                move || session_end_function(),
638                                &rt_,
639                            );
640
641                            let data_tx_ = data_tx.clone();
642
643                            let (chunk_tx, mut chunk_rx) = mpsc::channel::<MsrpChunk>(16);
644
645                            rt_.spawn(async move {
646                                while let Some(chunk) = chunk_rx.recv().await {
647                                    let size = chunk.estimated_size();
648                                    let mut data = Vec::with_capacity(size);
649
650                                    {
651                                        let mut readers = Vec::new();
652                                        chunk.get_readers(&mut readers);
653                                        match DynamicChain::new(readers).read_to_end(&mut data) {
654                                            Ok(_) => {}
655                                            Err(_) => {} // to-do: early failure
656                                        }
657                                    }
658
659                                    match data_tx_.send(Some(data)).await {
660                                        Ok(()) => {}
661                                        Err(e) => {}
662                                    }
663                                }
664                            });
665
666                            while let Some(message) = message_rx.recv().await {
667                                let content_type = b"message/cpim".to_vec();
668                                let content_type = Some(content_type);
669
670                                let body = make_cpim_message_content_body(
671                                    &message.message_type,
672                                    &message.message_content,
673                                    &message.recipient_type,
674                                    &message.recipient_uri,
675                                    Uuid::new_v4(), // to-do: should be provided by user
676                                    &public_user_identity,
677                                );
678
679                                let size = body.estimated_size();
680
681                                let mut data = Vec::with_capacity(size);
682                                match body.reader() {
683                                    Ok(mut reader) => {
684                                        match reader.read_to_end(&mut data) {
685                                            Ok(_) => {}
686                                            Err(_) => {} // to-do: early failure
687                                        }
688                                    }
689                                    Err(_) => {} // to-do: early failure
690                                }
691
692                                let from_path_ = from_path.clone();
693                                let to_path_ = to_path.clone();
694
695                                let chunk_tx = chunk_tx.clone();
696
697                                demuxer_.start(
698                                    from_path_,
699                                    to_path_,
700                                    content_type,
701                                    size,
702                                    VectorReader::new(data),
703                                    move |status_code, reason_phrase| {
704                                        (message.message_result_callback)(
705                                            status_code,
706                                            reason_phrase,
707                                        );
708                                    },
709                                    chunk_tx,
710                                    &rt_,
711                                );
712                            }
713                        }
714                    });
715
716                    return Ok(message_tx_);
717                }
718            }
719            _ => {}
720        }
721
722        Err(())
723    }
724
725    pub fn send_message(&self, message: CPMMessageParam, rt: &Arc<Runtime>) {
726        let guard = self.session_state.lock().unwrap();
727        match &*guard {
728            CPMSessionState::Negotiating(_) => todo!(),
729            CPMSessionState::Negotiated(_, _, _, _, _, _, _, _, _, _) => todo!(),
730            CPMSessionState::Starting(_, _, _, _, _, _, message_tx) => {
731                let message_tx = message_tx.clone();
732                rt.spawn(async move {
733                    match message_tx.send(message).await {
734                        Ok(()) => {}
735                        Err(e) => {
736                            (e.0.message_result_callback)(500, String::from("Internal Error"))
737                        }
738                    }
739                });
740            }
741        }
742    }
743}
744
745struct CPMSessionCPIMMessageWriter {
746    message_id: Vec<u8>,
747    message_buf: Vec<u8>,
748    message_callback: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
749}
750
751impl MsrpDataWriter for CPMSessionCPIMMessageWriter {
752    fn write(&mut self, data: &[u8]) -> Result<usize, (u16, &'static str)> {
753        self.message_buf.extend_from_slice(data);
754        Ok(data.len())
755    }
756
757    fn complete(&mut self) -> Result<(), (u16, &'static str)> {
758        match Body::construct_message(&self.message_buf) {
759            Ok(body) => match CPIMMessage::try_from(&body) {
760                Ok(cpim_message) => match cpim_message.get_info() {
761                    Ok(cpim_message_info) => {
762                        if let Some((content_type, content_body, base64_encoded)) =
763                            cpim_message.get_message_body()
764                        {
765                            let content_type: &[u8] = match content_type {
766                                Some(content_type) => content_type,
767                                None => b"text/plain",
768                            };
769
770                            if base64_encoded {
771                                if let Ok(decoded_content_body) =
772                                    general_purpose::STANDARD.decode(content_body)
773                                {
774                                    (self.message_callback)(
775                                        &cpim_message_info,
776                                        content_type,
777                                        &decoded_content_body,
778                                    );
779                                    return Ok(());
780                                }
781                            } else {
782                                (self.message_callback)(
783                                    &cpim_message_info,
784                                    content_type,
785                                    content_body,
786                                );
787                                return Ok(());
788                            }
789                        }
790
791                        Err((400, "failed to get content body"))
792                    }
793
794                    Err(e) => Err((400, e)),
795                },
796
797                Err(e) => Err((400, e)),
798            },
799
800            Err(e) => Err((400, e)),
801        }
802    }
803}
804
805struct CPMSessionMessageReceiver {
806    callback: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
807}
808
809impl MsrpMessageReceiver for CPMSessionMessageReceiver {
810    fn on_message(
811        &mut self,
812        message_id: &[u8],
813        content_type: &[u8],
814    ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
815        if content_type.eq_ignore_ascii_case(b"message/cpim") {
816            // to-do: should check accept-types
817            let cb = Arc::clone(&self.callback);
818            Ok(Box::new(CPMSessionCPIMMessageWriter {
819                message_id: message_id.to_vec(),
820                message_buf: Vec::with_capacity(1024),
821                message_callback: cb,
822            }))
823        } else {
824            Err((415, "Unsupported Media Type"))
825        }
826    }
827}
828
/// Event passed to the group-invite event listener registered on
/// `CPMSessionService`.
pub enum CPMGroupEvent {
    // NOTE(review): presumably raised when a group invitation arrives — confirm
    // against the code that emits it.
    OnInvite,
    // NOTE(review): presumably raised once the group has been joined — confirm
    // against the code that emits it.
    OnJoined,
}
833
/// Service-level state for CPM sessions: established sessions, pending
/// invitations, the registered identity and the callbacks supplied by the
/// embedding application.
pub struct CPMSessionService {
    /// Sessions paired with the known identities of their remote contact;
    /// `send_message` looks a recipient up here first.
    ongoing_sessions: Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
    /// Incoming invitations, each paired with the channel used to deliver the
    /// invitation response (see `accept_and_open_session`).
    ongoing_incoming_invitations: Arc<
        Mutex<
            Vec<
                Arc<(
                    ContactKnownIdentities,
                    mpsc::Sender<CPMSessionInvitationResponse>,
                )>,
            >,
        >,
    >,
    /// Outgoing invitations, each paired with a channel that accepts further
    /// messages for the same recipient.
    ongoing_outgoing_invitations:
        Arc<Mutex<Vec<Arc<(ContactKnownIdentities, mpsc::Sender<CPMMessageParam>)>>>>,

    /// `(transport, public identity, SIP instance id)` as stored by
    /// `set_registered_public_identity`; `None` until it has been called.
    registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,

    /// Invoked for received messages with the session, the asserted contact URI
    /// bytes, the CPIM info, the content type and the content body.
    message_receive_listener:
        Arc<dyn Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,

    /// Allocates a local MSRP socket. On success the tuple is destructured by
    /// `send_message` as `(socket, host, port, tls, active_setup, ipv6)`.
    msrp_socket_allocator_function: Arc<
        dyn Fn(
                Option<&MsrpInfo>,
            )
                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
            + Send
            + Sync
            + 'static,
    >,

    /// Asynchronously turns an allocated socket into a connected stream.
    // NOTE(review): the `&String`, `u16`, `bool` arguments are presumably the
    // remote host, port and TLS flag — confirm at the call site.
    msrp_socket_connect_function: Arc<
        dyn Fn(
                ClientSocket,
                &String,
                u16,
                bool,
            )
                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
            + Send
            + Sync
            + 'static,
    >,

    /// Application handler for 1-to-1 conversation invitations; returns a `u16`.
    // NOTE(review): the meaning of the arguments and of the returned code is
    // not visible here — confirm at the call site.
    conversation_invite_handler_function:
        Box<dyn Fn(bool, &str, &str, &str, oneshot::Receiver<()>) -> u16 + Send + Sync + 'static>,

    /// Application handler for group invitations; returns a `u16`.
    // NOTE(review): the meaning of the arguments and of the returned code is
    // not visible here — confirm at the call site.
    group_invite_handler_function: Box<
        dyn Fn(&str, &str, &str, &str, &str, &str, oneshot::Receiver<()>) -> u16
            + Send
            + Sync
            + 'static,
    >,
    /// Receives `CPMGroupEvent` notifications.
    group_invite_event_listener_function: Box<dyn Fn(CPMGroupEvent) + Send + Sync + 'static>,
}
888
889impl CPMSessionService {
890    pub fn new<MRL, MAF, MCF, CF, GF, GL>(
891        message_receive_listener: MRL,
892        msrp_socket_allocator_function: MAF,
893        msrp_socket_connect_function: MCF,
894        conversation_invite_handler_function: CF,
895        group_invite_handler_function: GF,
896        group_invite_event_listener_function: GL,
897    ) -> CPMSessionService
898    where
899        MRL: Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8])
900            + Send
901            + Sync
902            + 'static,
903        MAF: Fn(
904                Option<&MsrpInfo>,
905            )
906                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
907            + Send
908            + Sync
909            + 'static,
910        MCF: Fn(
911                ClientSocket,
912                &String,
913                u16,
914                bool,
915            )
916                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
917            + Send
918            + Sync
919            + 'static,
920        CF: Fn(bool, &str, &str, &str, oneshot::Receiver<()>) -> u16 + Send + Sync + 'static,
921        GF: Fn(&str, &str, &str, &str, &str, &str, oneshot::Receiver<()>) -> u16
922            + Send
923            + Sync
924            + 'static,
925        GL: Fn(CPMGroupEvent) + Send + Sync + 'static,
926    {
927        CPMSessionService {
928            ongoing_sessions: Arc::new(Mutex::new(Vec::new())),
929            ongoing_incoming_invitations: Arc::new(Mutex::new(Vec::new())),
930            ongoing_outgoing_invitations: Arc::new(Mutex::new(Vec::new())),
931            registered_public_identity: Arc::new(Mutex::new(None)),
932            message_receive_listener: Arc::new(message_receive_listener),
933            msrp_socket_allocator_function: Arc::new(msrp_socket_allocator_function),
934            msrp_socket_connect_function: Arc::new(msrp_socket_connect_function),
935            conversation_invite_handler_function: Box::new(conversation_invite_handler_function),
936            group_invite_handler_function: Box::new(group_invite_handler_function),
937            group_invite_event_listener_function: Box::new(group_invite_event_listener_function),
938        }
939    }
940
941    pub fn set_registered_public_identity(
942        &self,
943        registered_public_identity: String,
944        sip_instance_id: String,
945        transport: Arc<SipTransport>,
946    ) {
947        (*self.registered_public_identity.lock().unwrap()).replace((
948            transport,
949            registered_public_identity,
950            sip_instance_id,
951        ));
952    }
953
954    pub fn accept_and_open_session(&self, recipient: &str, rt: &Arc<Runtime>) {
955        let guard = self.ongoing_incoming_invitations.lock().unwrap();
956        for invitation in &*guard {
957            let (invitation, tx) = invitation.as_ref();
958            if invitation == recipient {
959                let tx = tx.clone();
960                rt.spawn(async move {
961                    match tx.send(CPMSessionInvitationResponse::Accept).await {
962                        Ok(()) => {}
963                        Err(e) => {}
964                    }
965                });
966                return;
967            }
968        }
969    }
970
971    pub fn send_message<F>(
972        &self,
973        message_type: &str,
974        message_content: &str,
975        recipient: &str,
976        recipient_type: &RecipientType,
977        recipient_uri: &str,
978        message_result_callback: F,
979        core: &Arc<SipCore>,
980        // ctrl_itf: SipTransactionManagerControlInterface,
981        rt: &Arc<Runtime>,
982    ) where
983        F: FnOnce(u16, String) + Send + Sync + 'static,
984    {
985        let guard = self.ongoing_sessions.lock().unwrap();
986
987        for (known_identities, sip_session) in &*guard {
988            if known_identities == recipient {
989                sip_session.mark_session_active();
990
991                let cpm_session = sip_session.get_inner();
992
993                let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
994                let message_result_callback = move |status_code, reason_phrase| {
995                    let mut guard = message_result_callback.lock().unwrap();
996                    if let Some(message_result_callback) = guard.take() {
997                        message_result_callback(status_code, reason_phrase);
998                    }
999                };
1000
1001                let message = CPMMessageParam::new(
1002                    String::from(message_type),
1003                    String::from(message_content),
1004                    recipient,
1005                    recipient_type,
1006                    recipient_uri,
1007                    message_result_callback,
1008                );
1009
1010                cpm_session.send_message(message, rt);
1011
1012                return;
1013            }
1014        }
1015
1016        let guard = self.ongoing_outgoing_invitations.lock().unwrap();
1017
1018        for invitation in &*guard {
1019            let (known_identities, message_tx) = invitation.as_ref();
1020            if known_identities == recipient {
1021                let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
1022                let message_result_callback = move |status_code, reason_phrase| {
1023                    let mut guard = message_result_callback.lock().unwrap();
1024                    if let Some(message_result_callback) = guard.take() {
1025                        message_result_callback(status_code, reason_phrase);
1026                    }
1027                };
1028
1029                let message = CPMMessageParam::new(
1030                    String::from(message_type),
1031                    String::from(message_content),
1032                    recipient,
1033                    recipient_type,
1034                    recipient_uri,
1035                    message_result_callback,
1036                );
1037                let message_tx = message_tx.clone();
1038                rt.spawn(async move {
1039                    match message_tx.send(message).await {
1040                        Ok(()) => {}
1041                        Err(e) => {
1042                            (e.0.message_result_callback)(403, String::from("Forbidden"));
1043                        }
1044                    }
1045                });
1046                return;
1047            }
1048        }
1049
1050        if let Ok((sock, host, port, tls, active_setup, ipv6)) =
1051            (self.msrp_socket_allocator_function)(None)
1052        {
1053            let path_random = create_raw_alpha_numeric_string(16);
1054            let path_random = std::str::from_utf8(&path_random).unwrap();
1055            let path = if tls {
1056                format!("msrps://{}:{}/{};tcp", &host, port, path_random)
1057            } else {
1058                format!("msrp://{}:{}/{};tcp", &host, port, path_random)
1059            };
1060
1061            // to-do: accepted types for Group message and others
1062
1063            let path = path.into_bytes();
1064
1065            let msrp_info: MsrpInfo = MsrpInfo {
1066                protocol: if tls { b"TCP/TLS/MSRP" } else { b"TCP/MSRP" },
1067                address: host.as_bytes(),
1068                interface_type: if ipv6 {
1069                    MsrpInterfaceType::IPv6
1070                } else {
1071                    MsrpInterfaceType::IPv4
1072                },
1073                port,
1074                path: &path,
1075                inactive: false,
1076                direction: MsrpDirection::SendReceive,
1077                accept_types: if let RecipientType::Chatbot = recipient_type {
1078                    b"message/cpim"
1079                } else {
1080                    b"message/cpim application/im-iscomposing+xm"
1081                },
1082                setup_method: if active_setup {
1083                    MsrpSetupMethod::Active
1084                } else {
1085                    MsrpSetupMethod::Passive
1086                },
1087                accept_wrapped_types: if let RecipientType::Chatbot = recipient_type {
1088                    Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml application/vnd.gsma.botmessage.v1.0+json application/vnd.gsma.botsuggestion.v1.0+json application/commontemplate+xml")
1089                } else {
1090                    Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml")
1091                },
1092                file_info: None,
1093            };
1094
1095            let sdp = String::from(msrp_info);
1096            let sdp = sdp.into_bytes();
1097            let sdp = Body::Raw(sdp);
1098            let sdp = Arc::new(sdp);
1099
1100            let guard = self.registered_public_identity.lock().unwrap();
1101
1102            if let Some((transport, contact_identity, instance_id)) = &*guard {
1103                let mut invite_message = SipMessage::new_request(INVITE, recipient.as_bytes());
1104
1105                let call_id = String::from(
1106                    Uuid::new_v4()
1107                        .as_hyphenated()
1108                        .encode_lower(&mut Uuid::encode_buffer()),
1109                );
1110                invite_message.add_header(Header::new(b"Call-ID", call_id));
1111
1112                invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
1113
1114                let tag = create_raw_alpha_numeric_string(8);
1115                let tag = String::from_utf8_lossy(&tag);
1116
1117                invite_message.add_header(Header::new(
1118                    b"From",
1119                    format!("<{}>;tag={}", contact_identity, tag),
1120                ));
1121
1122                invite_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
1123
1124                invite_message.add_header(Header::new(
1125                    b"Contact",
1126                    if let RecipientType::Chatbot = recipient_type {
1127                        format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\";+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\"", contact_identity, instance_id)
1128                    } else {
1129                        format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\"", contact_identity, instance_id)
1130                    },
1131                ));
1132
1133                invite_message.add_header(Header::new(
1134                    b"Accept-Contact",
1135                    "*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg\"",
1136                ));
1137
1138                if let RecipientType::Chatbot = recipient_type {
1139                    invite_message.add_header(Header::new(b"Accept-Contact", "*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
1140                }
1141
1142                let conversation_id = String::from(
1143                    Uuid::new_v4()
1144                        .as_hyphenated()
1145                        .encode_lower(&mut Uuid::encode_buffer()),
1146                );
1147                invite_message.add_header(Header::new(b"Conversation-ID", conversation_id));
1148
1149                let contribution_id = String::from(
1150                    Uuid::new_v4()
1151                        .as_hyphenated()
1152                        .encode_lower(&mut Uuid::encode_buffer()),
1153                );
1154                invite_message.add_header(Header::new(b"Contribution-ID", contribution_id));
1155
1156                invite_message.add_header(Header::new(
1157                    b"Allow",
1158                    b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE",
1159                ));
1160
1161                invite_message.add_header(Header::new(b"Supported", b"timer"));
1162
1163                /*
1164                 * to-do: 3gpp TS 24.229 5.1.3.1
1165                 *
1166                 * If the UE supports the Session Timer extension, the UE shall include the option-tag "timer" in the Supported header field and should either insert a Session-Expires header field with the header field value set to the configured session timer interval value, or should not include the Session-Expires header field in the initial INVITE request. The header field value of the Session-Expires header field may be configured using local configuration or using the Session_Timer_Initial_Interval node specified in 3GPP 24.167 [8G]. If the UE is configured with both the local configuration and the Session_Timer_Initial_Interval node specified in 3GPP 24.167 [8G], then the local configuration shall take precedence.
1167                 * If the UE inserts the Session-Expires header field in the initial INVITE request, the UE may also include the "refresher" parameter with the "refresher" parameter value set to "uac".
1168                 */
1169                invite_message.add_header(Header::new(b"Session-Expires", b"1800"));
1170
1171                invite_message.add_header(Header::new(
1172                    b"P-Preferred-Service",
1173                    b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.session",
1174                ));
1175
1176                invite_message.add_header(Header::new(
1177                    b"P-Preferred-Identity",
1178                    String::from(contact_identity),
1179                ));
1180
1181                invite_message.add_header(Header::new(b"Content-Type", "application/sdp"));
1182
1183                let content_length = sdp.estimated_size();
1184                invite_message.add_header(Header::new(
1185                    b"Content-Length",
1186                    format!("{}", content_length),
1187                ));
1188
1189                let l_sdp_body = Arc::clone(&sdp);
1190                invite_message.set_body(sdp);
1191
1192                let req_headers = invite_message.copy_headers();
1193
1194                let public_user_identity = String::from(contact_identity);
1195
1196                // let (client_transaction_tx, client_transaction_rx) = mpsc::channel::<mpsc::Sender<CPMMessageParam>>(1);
1197                let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel::<
1198                    Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>,
1199                >(1);
1200                let (message_tx, mut message_rx) = mpsc::channel::<CPMMessageParam>(8);
1201                let message_receive_listener = Arc::clone(&self.message_receive_listener);
1202                let message_receive_listener_ = Arc::clone(&message_receive_listener);
1203
1204                let connect_function = Arc::clone(&self.msrp_socket_connect_function);
1205
1206                let message_result_callback = Arc::new(Mutex::new(Some(message_result_callback)));
1207                let message_result_callback = move |status_code, reason_phrase| {
1208                    let mut guard = message_result_callback.lock().unwrap();
1209                    if let Some(message_result_callback) = guard.take() {
1210                        message_result_callback(status_code, reason_phrase);
1211                    }
1212                };
1213
1214                let message = CPMMessageParam::new(
1215                    String::from(message_type),
1216                    String::from(message_content),
1217                    recipient,
1218                    recipient_type,
1219                    recipient_uri,
1220                    message_result_callback,
1221                );
1222
1223                let ongoing_dialogs = core.get_ongoing_dialogs();
1224
1225                let mut known_identities = ContactKnownIdentities::new();
1226                known_identities.add_identity(String::from(recipient));
1227                let outgoing_invitation = Arc::new((known_identities, message_tx));
1228                let outgoing_invitation_ = Arc::clone(&outgoing_invitation);
1229
1230                let ongoing_outgoing_invitations = Arc::clone(&self.ongoing_outgoing_invitations);
1231                let ongoing_sessions = Arc::clone(&self.ongoing_sessions);
1232
1233                let registered_public_identity = Arc::clone(&self.registered_public_identity);
1234
1235                let tm = core.get_transaction_manager();
1236                let tm_ = Arc::clone(&tm);
1237
1238                let rt_ = Arc::clone(rt);
1239
1240                rt.spawn(async move {
1241                    // if let Some(message_tx) = client_transaction_rx.recv().await {
1242                    //     match message_tx.send(message).await {
1243                    //         Ok(()) => {},
1244                    //         Err(e) => {
1245                    //             (e.0.message_result_callback)(403);
1246                    //         },
1247                    //     }
1248
1249                    //     while let Some(message) = message_rx.recv().await {
1250                    //         match message_tx.send(message).await {
1251                    //             Ok(()) => {},
1252                    //             Err(e) => {
1253                    //                 (e.0.message_result_callback)(403);
1254                    //             },
1255                    //         }
1256                    //     }
1257                    // }
1258
1259                    if let Some(res) = client_transaction_rx.recv().await {
1260                        match res {
1261                            Ok((resp_message, session_info, r_sdp_body)) => {
1262                                if let Some(l_sdp) = match l_sdp_body.as_ref() {
1263                                    Body::Raw(raw) => {
1264                                        raw.as_sdp()
1265                                    }
1266                                    _ => None,
1267                                } {
1268                                    if let Some(r_sdp) = match r_sdp_body.as_ref() {
1269                                        Body::Raw(raw) => {
1270                                            raw.as_sdp()
1271                                        }
1272                                        _ => None,
1273                                    } {
1274                                        if let Some(r_msrp_info) = r_sdp.as_msrp_info() {
1275                                            if let Ok(raddr) = std::str::from_utf8(r_msrp_info.address) {
1276                                                let raddr = String::from(raddr);
1277                                                let rport = r_msrp_info.port;
1278                                                let rpath = r_msrp_info.path.to_vec();
1279
1280                                                let asserted_contact_uri = session_info.asserted_contact_uri.clone();
1281                                                let asserted_contact_uri_ = asserted_contact_uri.clone();
1282                                                let known_identities = session_info.get_contact_known_identities();
1283                                                let cpm_session = CPMSession::new(session_info);
1284
1285                                                if let Ok(_) = cpm_session.set_local_sdp(l_sdp_body, sock, tls, host, port, path) {
1286                                                    if let Ok(_) = cpm_session.set_remote_sdp(r_sdp_body, raddr, rport, rpath) {
1287                                                        let cpm_session = Arc::new(cpm_session);
1288                                                        let cpm_session_ = Arc::clone(&cpm_session);
1289
1290                                                        let (d_tx, mut d_rx) = mpsc::channel(1);
1291
1292                                                        let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
1293
1294                                                        rt_.spawn(async move {
1295                                                            if let Some(dialog) = d_rx.recv().await {
1296                                                                ongoing_dialogs_.remove_dialog(&dialog);
1297                                                            }
1298                                                        });
1299
1300                                                        if let Ok(dialog) = SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
1301                                                            match d_tx.blocking_send(d) {
1302                                                                Ok(()) => {},
1303                                                                Err(e) => {},
1304                                                            }
1305                                                        }) {
1306                                                            let dialog = Arc::new(dialog);
1307
1308                                                            ongoing_dialogs.add_dialog(&dialog);
1309
1310                                                            let (sess_tx, mut sess_rx) = mpsc::channel::<SipSessionEvent>(8);
1311
1312                                                            let sip_session = SipSession::new(&cpm_session, SipSessionEventReceiver {
1313                                                                tx: sess_tx,
1314                                                                rt: Arc::clone(&rt_),
1315                                                            });
1316
1317                                                            let sip_session = Arc::new(sip_session);
1318                                                            let sip_session_ = Arc::clone(&sip_session);
1319
1320                                                            let public_user_identity_ = public_user_identity.clone();
1321
1322                                                            sip_session.setup_confirmed_dialog(&dialog, CPMSessionDialogEventReceiver {
1323                                                                public_user_identity: public_user_identity_,
1324                                                                sip_session: Arc::clone(&sip_session),
1325                                                                ongoing_sessions: Arc::clone(&ongoing_sessions),
1326                                                                message_receive_listener: Arc::new(move |cpim_info, content_type, content_body| {
1327                                                                    message_receive_listener_(&sip_session_, asserted_contact_uri_.as_bytes(), cpim_info, content_type, content_body)
1328                                                                }),
1329                                                                msrp_socket_connect_function: Arc::clone(
1330                                                                    &connect_function,
1331                                                                ),
1332                                                                rt: Arc::clone(&rt_),
1333                                                            });
1334
1335                                                            if let Some((timeout, refresher)) =
1336                                                            choose_timeout_on_client_transaction_completion(None, &resp_message)
1337                                                            {
1338                                                                match refresher {
1339                                                                    Refresher::UAC => sip_session.schedule_refresh(timeout, true, &rt_),
1340
1341                                                                    Refresher::UAS => sip_session.schedule_refresh(timeout, false, &rt_),
1342                                                                }
1343                                                            }
1344
1345                                                            let sip_session_1 = Arc::clone(&sip_session);
1346                                                            let sip_session_2 = Arc::clone(&sip_session);
1347
1348                                                            let registered_public_identity_ = Arc::clone(&registered_public_identity);
1349
1350                                                            let tm = Arc::clone(&tm_);
1351                                                            let rt = Arc::clone(&rt_);
1352
1353                                                            rt_.spawn(async move {
1354                                                                while let Some(ev) = sess_rx.recv().await {
1355                                                                    match ev {
1356                                                                        SipSessionEvent::ShouldRefresh(dialog) => {
1357                                                                            if let Ok(mut message) =
1358                                                                                dialog.make_request(b"UPDATE", None)
1359                                                                            {
1360                                                                                message.add_header(Header::new(
1361                                                                                    b"Supported",
1362                                                                                    b"timer",
1363                                                                                ));
1364
1365                                                                                let guard =
1366                                                                                registered_public_identity_.lock().unwrap();
1367
1368                                                                                if let Some((transport, _, _)) = &*guard {
1369                                                                                    tm.send_request(
1370                                                                                        message,
1371                                                                                        transport,
1372                                                                                        UpdateMessageCallbacks {
1373                                                                                            // session_expires: None,
1374                                                                                            dialog,
1375                                                                                            sip_session: Arc::clone(&sip_session_1),
1376                                                                                            rt: Arc::clone(&rt),
1377                                                                                        },
1378                                                                                        &rt,
1379                                                                                    );
1380                                                                                }
1381                                                                            }
1382                                                                        }
1383
1384                                                                        SipSessionEvent::Expired(dialog) => {
1385                                                                            if let Ok(message) = dialog.make_request(b"BYE", None) {
1386                                                                                let guard =
1387                                                                                registered_public_identity_.lock().unwrap();
1388
1389                                                                                if let Some((transport, _, _)) = &*guard {
1390                                                                                    tm.send_request(
1391                                                                                        message,
1392                                                                                        transport,
1393                                                                                        ClientTransactionNilCallbacks {},
1394                                                                                        &rt,
1395                                                                                    );
1396                                                                                }
1397                                                                            }
1398                                                                        }
1399
1400                                                                        _ => {}
1401                                                                    }
1402                                                                }
1403                                                            });
1404
1405                                                            if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
1406
1407                                                                let rt = Arc::clone(&rt_);
1408
1409                                                                if let Some(transport) = {
1410                                                                    let i;
1411                                                                    {
1412                                                                        let guard = registered_public_identity.lock().unwrap();
1413
1414                                                                        if let Some((transport, _, _)) = &*guard {
1415                                                                            i = Some(Arc::clone(transport))
1416                                                                        } else {
1417                                                                            i = None
1418                                                                        }
1419                                                                    }
1420                                                                    i
1421                                                                } {
1422                                                                    tm_.send_request(
1423                                                                        ack_message,
1424                                                                        &transport,
1425                                                                        ClientTransactionNilCallbacks {},
1426                                                                        &rt_,
1427                                                                    );
1428
1429                                                                    let ongoing_sessions_ = Arc::clone(&ongoing_sessions);
1430
1431                                                                    if let Ok(message_tx) = cpm_session.start(&public_user_identity, move |cpim_info, content_type, mesage_buf| {
1432                                                                        message_receive_listener(&sip_session_2, asserted_contact_uri.as_bytes(), cpim_info, content_type, mesage_buf)
1433                                                                    }, &connect_function, move || {
1434                                                                        let mut guard = ongoing_sessions_.lock().unwrap();
1435                                                                        if let Some(idx) = guard.iter().position(|(_, sip_session)| {
1436                                                                            let inner = sip_session.get_inner();
1437                                                                            Arc::ptr_eq(&inner, &cpm_session_)
1438                                                                        }) {
1439                                                                            guard.swap_remove(idx);
1440                                                                        }
1441                                                                    }, &rt) {
1442
1443                                                                        match message_tx.send(message).await {
1444                                                                            Ok(()) => {},
1445                                                                            Err(e) => {
1446                                                                                (e.0.message_result_callback)(403, String::from("Forbidden"));
1447                                                                            },
1448                                                                        }
1449
1450                                                                        while let Some(message) = message_rx.recv().await {
1451                                                                            match message_tx.send(message).await {
1452                                                                                Ok(()) => {},
1453                                                                                Err(e) => {
1454                                                                                    (e.0.message_result_callback)(403, String::from("Forbidden"));
1455                                                                                },
1456                                                                            }
1457                                                                        }
1458
1459                                                                        let mut guard = ongoing_sessions.lock().unwrap();
1460
1461                                                                        guard.push((known_identities, sip_session));
1462
1463                                                                        let mut guard = ongoing_outgoing_invitations.lock().unwrap();
1464
1465                                                                        if let Some(idx) = guard.iter().position(|outgoing_invitation| Arc::ptr_eq(outgoing_invitation, &outgoing_invitation_)) {
1466                                                                            guard.swap_remove(idx);
1467                                                                        }
1468
1469                                                                        return;
1470                                                                    }
1471                                                                }
1472                                                            }
1473                                                        }
1474                                                    }
1475                                                }
1476                                            }
1477                                        }
1478                                    }
1479                                }
1480                            }
1481
1482                            Err((error_code, error_reason)) => {
1483                                (message.message_result_callback)(error_code, error_reason);
1484                                return;
1485                            }
1486                        }
1487                    }
1488
1489                    (message.message_result_callback)(500, String::from("Internal Error"));
1490                });
1491
1492                tm.send_request(
1493                    invite_message,
1494                    transport,
1495                    CPMSessionInviteCallbacks {
1496                        // l_sdp,
1497                        recipient_type: RecipientType::from(recipient_type),
1498                        client_transaction_tx,
1499                        // message_result_callback: Box::new(message_result_callback),
1500                        // message_receive_listener: Arc::clone(&self.message_receive_listener),
1501                        rt: Arc::clone(rt),
1502                    },
1503                    rt,
1504                );
1505
1506                self.ongoing_outgoing_invitations
1507                    .lock()
1508                    .unwrap()
1509                    .push(outgoing_invitation);
1510                return;
1511            }
1512        }
1513
1514        (message_result_callback)(500, String::from("Internal Error"));
1515    }
1516}
1517
1518struct CPMSessionInviteCallbacks {
1519    // l_sdp: Arc<Body>,
1520    recipient_type: RecipientType,
1521    client_transaction_tx:
1522        mpsc::Sender<Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>>,
1523    // client_transaction_tx: mpsc::Sender<mpsc::Sender<CPMMessageParam>>,
1524    // message_result_callback: Box<dyn Fn(u16) + Send + Sync>,
1525    // message_receive_listener: Arc<dyn Fn(&[u8], &[u8], &[u8]) + Send + Sync>,
1526    rt: Arc<Runtime>,
1527}
1528
1529impl ClientTransactionCallbacks for CPMSessionInviteCallbacks {
1530    fn on_provisional_response(&self, message: SipMessage) {}
1531
1532    fn on_final_response(&self, message: SipMessage) {
1533        if let SipMessage::Response(l, _, _) = &message {
1534            let mut status_code = 500;
1535            let mut reason_phrase = String::from("Internal Error");
1536            if l.status_code >= 200 && l.status_code < 300 {
1537                if let Some(session_info) = get_cpm_session_info_from_message(&message, true) {
1538                    loop {
1539                        let accepting_chatbot_session_is_forbidden =
1540                            if let RecipientType::Chatbot = self.recipient_type {
1541                                match session_info.cpm_contact.service_type {
1542                                    CPMServiceType::Chatbot => false,
1543                                    _ => true,
1544                                }
1545                            } else {
1546                                false
1547                            };
1548
1549                        if accepting_chatbot_session_is_forbidden {
1550                            status_code = 606;
1551                            reason_phrase = String::from("Not Acceptable");
1552                            break;
1553                        }
1554
1555                        if let Some(resp_body) = message.get_body() {
1556                            // let mut sdp: Option<Sdp> = None;
1557
1558                            if let Some(headers) = message.headers() {
1559                                if let Some(header) = header::search(headers, b"Content-Type", true)
1560                                {
1561                                    if let Some(content_type) =
1562                                        header.get_value().as_header_field().as_content_type()
1563                                    {
1564                                        if content_type
1565                                            .major_type
1566                                            .eq_ignore_ascii_case(b"application")
1567                                            && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1568                                        {
1569                                            // match resp_body.as_ref() {
1570                                            //     Body::Raw(raw) => {
1571                                            //         sdp = raw.as_sdp();
1572                                            //     }
1573                                            //     _ => {}
1574                                            // }
1575                                            let client_transaction_tx =
1576                                                self.client_transaction_tx.clone();
1577                                            let sdp_body = Arc::clone(&resp_body);
1578                                            self.rt.spawn(async move {
1579                                                match client_transaction_tx
1580                                                    .send(Ok((message, session_info, sdp_body)))
1581                                                    .await
1582                                                {
1583                                                    Ok(()) => {}
1584                                                    Err(e) => {}
1585                                                }
1586                                            });
1587
1588                                            return;
1589                                        }
1590                                    }
1591                                }
1592                            }
1593
1594                            // let l_sdp_body = Arc::clone(&self.l_sdp);
1595                            // let r_sdp_body = resp_body;
1596
1597                            // if let Some(sdp) = sdp {
1598                            //     if let Some(msrp_info) = sdp.as_msrp_info() {
1599                            //         if let Some(l_sdp) = match l_sdp_body.as_ref() {
1600                            //             Body::Raw(raw) => {
1601                            //                 raw.as_sdp()
1602                            //             }
1603                            //             _ => None,
1604                            //         } {
1605                            //             if let Some(l_msrp_info) = l_sdp.as_msrp_info() {
1606                            //                 let message_receive_listener = Arc::clone(&self.message_receive_listener);
1607                            //                 let cpm_session = CPMSession::new(session_info, move |content_type, mesage_buf| {
1608                            //                     message_receive_listener("", content_type, mesage_buf)
1609                            //                 });
1610
1611                            //                 if let Ok(_) = cpm_session.set_local_sdp(l_sdp_body, sock, tls, laddr, lport, lpath) {
1612                            //                     if let Ok(_) = cpm_session.set_remote_sdp(r_sdp_body, raddr, rport, rpath) {
1613
1614                            //                         if let Ok(message_tx) = cpm_session.start(connect_function, session_end_function, &self.rt) {
1615                            //                             let client_transaction_tx = self.client_transaction_tx.clone();
1616
1617                            //                             self.rt.spawn(async move {
1618                            //                                 client_transaction_tx.send(message_tx).await;
1619                            //                             });
1620
1621                            //                             return;
1622                            //                         }
1623                            //                     }
1624                            //                 }
1625                            //             }
1626                            //         }
1627                            //     }
1628                            // }
1629                        }
1630                        break;
1631                    }
1632                }
1633            } else {
1634                status_code = l.status_code;
1635                reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
1636            }
1637
1638            let client_transaction_tx = self.client_transaction_tx.clone();
1639            self.rt.spawn(async move {
1640                match client_transaction_tx
1641                    .send(Err((status_code, reason_phrase)))
1642                    .await
1643                {
1644                    Ok(()) => {}
1645                    Err(e) => {}
1646                }
1647            });
1648        }
1649    }
1650
1651    fn on_transport_error(&self) {
1652        todo!()
1653    }
1654}
1655
1656pub(crate) struct UpdateMessageCallbacks<T> {
1657    // session_expires: Option<Header>,
1658    pub(crate) dialog: Arc<SipDialog>,
1659    pub(crate) sip_session: Arc<SipSession<T>>,
1660    pub(crate) rt: Arc<Runtime>,
1661}
1662
1663impl<T> ClientTransactionCallbacks for UpdateMessageCallbacks<T> {
1664    fn on_provisional_response(&self, message: SipMessage) {}
1665
1666    fn on_final_response(&self, message: SipMessage) {
1667        if let SipMessage::Response(l, _, _) = &message {
1668            if l.status_code >= 200 && l.status_code < 300 {
1669                if let Some((timeout, refresher)) =
1670                    choose_timeout_on_client_transaction_completion(None, &message)
1671                {
1672                    match refresher {
1673                        Refresher::UAC => {
1674                            self.sip_session.schedule_refresh(timeout, true, &self.rt)
1675                        }
1676
1677                        Refresher::UAS => {
1678                            self.sip_session.schedule_refresh(timeout, false, &self.rt)
1679                        }
1680                    }
1681                }
1682            } else {
1683                if l.status_code == 404
1684                    || l.status_code == 410
1685                    || l.status_code == 416
1686                    || (l.status_code >= 482 && l.status_code <= 485)
1687                    || l.status_code == 489
1688                    || l.status_code == 604
1689                {
1690                    self.dialog.on_terminating_response(&message, &self.rt);
1691                }
1692            }
1693        }
1694    }
1695
1696    fn on_transport_error(&self) {}
1697}
1698
1699pub(crate) struct CPMSessionDialogEventReceiver {
1700    pub(crate) sip_session: Arc<SipSession<CPMSession>>,
1701    pub(crate) ongoing_sessions:
1702        Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
1703    pub(crate) public_user_identity: String,
1704    pub(crate) message_receive_listener: Arc<dyn Fn(&CPIMInfo, &[u8], &[u8]) + Send + Sync>,
1705    pub(crate) msrp_socket_connect_function: Arc<
1706        dyn Fn(
1707                ClientSocket,
1708                &String,
1709                u16,
1710                bool,
1711            )
1712                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
1713            + Send
1714            + Sync,
1715    >,
1716    pub(crate) rt: Arc<Runtime>,
1717}
1718
1719impl SipDialogEventCallbacks for CPMSessionDialogEventReceiver {
1720    fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
1721        let ongoing_sessions = Arc::clone(&self.ongoing_sessions);
1722        let sip_session_ = Arc::clone(&self.sip_session);
1723        let session = self.sip_session.get_inner();
1724        let message_receive_listener = Arc::clone(&self.message_receive_listener);
1725        match session.start(
1726            &self.public_user_identity,
1727            move |cpim_info, content_type, content_body| {
1728                message_receive_listener(cpim_info, content_type, content_body)
1729            },
1730            &self.msrp_socket_connect_function,
1731            move || {
1732                let mut guard = ongoing_sessions.lock().unwrap();
1733                if let Some(idx) = guard
1734                    .iter()
1735                    .position(|(_, sip_session)| Arc::ptr_eq(sip_session, &sip_session_))
1736                {
1737                    guard.swap_remove(idx);
1738                }
1739            },
1740            &self.rt,
1741        ) {
1742            Ok(_) => {}
1743            Err(()) => {}
1744        };
1745    }
1746
1747    fn on_new_request(
1748        &self,
1749        transaction: Arc<ServerTransaction>,
1750        tx: mpsc::Sender<ServerTransactionEvent>,
1751        // timer: &Timer,
1752        rt: &Arc<Runtime>,
1753    ) -> Option<(u16, bool)> {
1754        let message = transaction.message();
1755
1756        if let SipMessage::Request(l, req_headers, req_body) = message {
1757            if l.method == UPDATE {
1758                if let Some(headers) = req_headers {
1759                    if let Some(h) = search(headers, b"Supported", true) {
1760                        if h.supports(b"timer") {
1761                            if let Ok(Some((timeout, refresher))) =
1762                                choose_timeout_for_server_transaction_response(
1763                                    &transaction,
1764                                    true,
1765                                    Refresher::UAC,
1766                                )
1767                            {
1768                                // to-do: not always UAC ?
1769                                if let Some(mut resp_message) = server_transaction::make_response(
1770                                    message,
1771                                    transaction.to_tag(),
1772                                    200,
1773                                    b"Ok",
1774                                ) {
1775                                    match refresher {
1776                                        Refresher::UAC => {
1777                                            resp_message.add_header(Header::new(
1778                                                b"Session-Expires",
1779                                                format!("{};refresher=uac", timeout),
1780                                            ));
1781
1782                                            self.sip_session.schedule_refresh(timeout, false, rt);
1783                                        }
1784                                        Refresher::UAS => {
1785                                            resp_message.add_header(Header::new(
1786                                                b"Session-Expires",
1787                                                format!("{};refresher=uas", timeout),
1788                                            ));
1789
1790                                            self.sip_session.schedule_refresh(timeout, true, rt);
1791                                        }
1792                                    }
1793
1794                                    server_transaction::send_response(
1795                                        transaction,
1796                                        resp_message,
1797                                        tx,
1798                                        rt,
1799                                    );
1800                                }
1801
1802                                return Some((200, false));
1803                            }
1804                        }
1805                    }
1806
1807                    if let Some(resp_message) =
1808                        server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
1809                    {
1810                        server_transaction::send_response(transaction, resp_message, tx, rt);
1811                    }
1812                }
1813
1814                return Some((200, false));
1815            }
1816        }
1817
1818        None
1819    }
1820
1821    fn on_terminating_request(&self, _message: &SipMessage) {
1822        let mut guard = self.ongoing_sessions.lock().unwrap();
1823        if let Some(idx) = guard
1824            .iter()
1825            .position(|(_, sip_session)| Arc::ptr_eq(&sip_session, &self.sip_session))
1826        {
1827            guard.swap_remove(idx);
1828        }
1829    }
1830
1831    fn on_terminating_response(&self, message: &SipMessage) {}
1832}
1833
1834pub struct CPMSessionServiceWrapper {
1835    pub service: Arc<CPMSessionService>,
1836    pub tm: Arc<SipTransactionManager>,
1837}
1838
1839impl TransactionHandler for CPMSessionServiceWrapper {
1840    fn handle_transaction(
1841        &self,
1842        transaction: &Arc<ServerTransaction>,
1843        ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
1844        channels: &mut Option<(
1845            mpsc::Sender<ServerTransactionEvent>,
1846            mpsc::Receiver<ServerTransactionEvent>,
1847        )>,
1848        rt: &Arc<Runtime>,
1849    ) -> bool {
1850        let message = transaction.message();
1851        if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
1852            if req_line.method == INVITE {
1853                let mut is_cpm_session_invitation = false;
1854
1855                for header in req_headers {
1856                    if header.get_name().eq_ignore_ascii_case(b"Accept-Contact") {
1857                        if header.get_value().as_header_field().contains_icsi_ref(
1858                            b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session",
1859                        ) {
1860                            is_cpm_session_invitation = true;
1861                            break;
1862                        }
1863                    }
1864                }
1865
1866                if is_cpm_session_invitation {
1867                    let mut sdp = None;
1868
1869                    for header in req_headers {
1870                        if header.get_name().eq_ignore_ascii_case(b"Content-Type") {
1871                            if let Some(content_type) =
1872                                header.get_value().as_header_field().as_content_type()
1873                            {
1874                                if content_type.major_type.eq_ignore_ascii_case(b"application")
1875                                    && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1876                                {
1877                                    match req_body.as_ref() {
1878                                        Body::Raw(raw) => {
1879                                            sdp = raw.as_sdp();
1880                                        }
1881                                        _ => {}
1882                                    }
1883                                }
1884                            }
1885                        }
1886                    }
1887
1888                    if let Some(sdp) = sdp {
1889                        if let Some(msrp_info) = sdp.as_msrp_info() {
1890                            if let Some(session_info) =
1891                                get_cpm_session_info_from_message(message, false)
1892                            {
1893                                let auto_accept_response_code: u16;
1894
1895                                let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
1896
1897                                match session_info.cpm_contact.service_type {
1898                                    CPMServiceType::Group => loop {
1899                                        if let (
1900                                            Some(conversation_id),
1901                                            Some(contribution_id),
1902                                            Some(subject),
1903                                            Some(referred_by),
1904                                        ) = (
1905                                            &session_info.conversation_id,
1906                                            &session_info.contribution_id,
1907                                            &session_info.subject,
1908                                            &session_info.referred_by,
1909                                        ) {
1910                                            if let Ok(subject) = urlencoding::decode(subject) {
1911                                                if let Some(referred_by) = referred_by
1912                                                    .as_bytes()
1913                                                    .as_name_addresses()
1914                                                    .first()
1915                                                {
1916                                                    if let Some(referred_by_name) =
1917                                                        referred_by.display_name
1918                                                    {
1919                                                        if let Ok(referred_by_name) =
1920                                                            std::str::from_utf8(referred_by_name)
1921                                                        {
1922                                                            if let Some(uri_part) =
1923                                                                &referred_by.uri_part
1924                                                            {
1925                                                                if let Ok(referred_by_uri) =
1926                                                                    std::str::from_utf8(
1927                                                                        uri_part.uri,
1928                                                                    )
1929                                                                {
1930                                                                    match msrp_info.direction {
1931                                                                            MsrpDirection::SendReceive => {
1932                                                                                auto_accept_response_code = (self.service.group_invite_handler_function)(&session_info.asserted_contact_uri, conversation_id, contribution_id, &subject, referred_by_name, referred_by_uri, cancel_rx);
1933                                                                                break;
1934                                                                            }
1935
1936                                                                            _ => {
1937                                                                                (self.service.group_invite_event_listener_function)(CPMGroupEvent::OnInvite);
1938                                                                                return false;
1939                                                                            }
1940                                                                        }
1941                                                                }
1942                                                            }
1943                                                        }
1944                                                    }
1945                                                }
1946                                            }
1947                                        }
1948
1949                                        return false;
1950                                    },
1951
1952                                    _ => {
1953                                        let mut is_chatbot_session = false;
1954
1955                                        for header in req_headers {
1956                                            if header
1957                                                .get_name()
1958                                                .eq_ignore_ascii_case(b"Accept-Contact")
1959                                            {
1960                                                if header.get_value().as_header_field().contains_iari_ref(b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot") {
1961                                                    is_chatbot_session = true;
1962                                                    break;
1963                                                }
1964                                            }
1965                                        }
1966
1967                                        if is_chatbot_session {
1968                                            match session_info.cpm_contact.service_type {
1969                                                CPMServiceType::Chatbot => {}
1970                                                _ => {
1971                                                    if let Some(resp_message) =
1972                                                        server_transaction::make_response(
1973                                                            message,
1974                                                            transaction.to_tag(),
1975                                                            606,
1976                                                            b"Not Acceptable",
1977                                                        )
1978                                                    {
1979                                                        if let Some((tx, _)) = channels.take() {
1980                                                            server_transaction::send_response(
1981                                                                Arc::clone(transaction),
1982                                                                resp_message,
1983                                                                tx,
1984                                                                // &timer,
1985                                                                rt,
1986                                                            );
1987                                                        }
1988                                                    }
1989
1990                                                    return true;
1991                                                }
1992                                            }
1993
1994                                            // to-do: check privacy settings and return 606 if appropriate
1995                                        }
1996
1997                                        loop {
1998                                            if let (Some(conversation_id), Some(contribution_id)) = (
1999                                                &session_info.conversation_id,
2000                                                &session_info.contribution_id,
2001                                            ) {
2002                                                match msrp_info.direction {
2003                                                    MsrpDirection::SendOnly => {
2004                                                        if session_info.is_deferred_session {
2005                                                            auto_accept_response_code = (self.service.conversation_invite_handler_function)(true, &session_info.asserted_contact_uri, contribution_id, conversation_id, cancel_rx);
2006                                                            break;
2007                                                        }
2008                                                    }
2009
2010                                                    MsrpDirection::SendReceive => {
2011                                                        auto_accept_response_code = (self
2012                                                            .service
2013                                                            .conversation_invite_handler_function)(
2014                                                            false,
2015                                                            &session_info.asserted_contact_uri,
2016                                                            &contribution_id,
2017                                                            &conversation_id,
2018                                                            cancel_rx,
2019                                                        );
2020                                                        break;
2021                                                    }
2022
2023                                                    _ => {}
2024                                                }
2025                                            }
2026
2027                                            return false;
2028                                        }
2029                                    }
2030                                }
2031
2032                                if auto_accept_response_code >= 200
2033                                    && auto_accept_response_code < 300
2034                                {
2035                                    if let Some((tx, rx)) = channels.take() {
2036                                        try_accept_invitation(
2037                                            session_info,
2038                                            tx,
2039                                            rx,
2040                                            None,
2041                                            req_body,
2042                                            msrp_info,
2043                                            &self.service.msrp_socket_allocator_function,
2044                                            &self.service.msrp_socket_connect_function,
2045                                            &self.service.message_receive_listener,
2046                                            transaction,
2047                                            message,
2048                                            &self.service.registered_public_identity,
2049                                            ongoing_dialogs,
2050                                            &self.service.ongoing_sessions,
2051                                            &self.tm,
2052                                            rt,
2053                                        )
2054                                    } else {
2055                                        // 500
2056                                    }
2057
2058                                    return true;
2059                                } else if auto_accept_response_code >= 100
2060                                    && auto_accept_response_code < 200
2061                                {
2062                                    if let Some((tx, mut rx)) = channels.take() {
2063                                        if let Some(mut resp_message) =
2064                                            server_transaction::make_response(
2065                                                message,
2066                                                transaction.to_tag(),
2067                                                180,
2068                                                b"Ringing",
2069                                            )
2070                                        {
2071                                            resp_message.add_header(Header::new(b"Allow", b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE"));
2072
2073                                            {
2074                                                let guard = self
2075                                                    .service
2076                                                    .registered_public_identity
2077                                                    .lock()
2078                                                    .unwrap();
2079
2080                                                if let Some((
2081                                                    transport,
2082                                                    contact_identity,
2083                                                    instance_id,
2084                                                )) = &*guard
2085                                                {
2086                                                    let transport_ = transaction.transport();
2087                                                    if Arc::ptr_eq(transport, transport_) {
2088                                                        resp_message.add_header(Header::new(
2089                                                            b"Contact",
2090                                                            format!(
2091                                                                "<{}>;+sip.instance=\"{}\"",
2092                                                                contact_identity, instance_id
2093                                                            ),
2094                                                        ));
2095                                                    } // to-do: treat as error if transport has changed somehow
2096                                                }
2097                                            }
2098
2099                                            let (d_tx, mut d_rx) = mpsc::channel(1);
2100
2101                                            let ongoing_dialogs_ = Arc::clone(ongoing_dialogs);
2102
2103                                            rt.spawn(async move {
2104                                                if let Some(dialog) = d_rx.recv().await {
2105                                                    ongoing_dialogs_.remove_dialog(&dialog);
2106                                                }
2107                                            });
2108
2109                                            if let Ok(dialog) = SipDialog::try_new_as_uas(
2110                                                message,
2111                                                &resp_message,
2112                                                move |d| match d_tx.blocking_send(d) {
2113                                                    Ok(()) => {}
2114                                                    Err(e) => {}
2115                                                },
2116                                            ) {
2117                                                let dialog = Arc::new(dialog);
2118
2119                                                ongoing_dialogs.add_dialog(&dialog);
2120
2121                                                // dialog.register_transaction(Arc::clone(transaction));
2122
2123                                                let (tx_, rx_) =
2124                                                    mpsc::channel::<ServerTransactionEvent>(8);
2125
2126                                                let known_identities =
2127                                                    session_info.get_contact_known_identities();
2128
2129                                                let invitation = CPMSessionInvitation::new(
2130                                                    session_info,
2131                                                    Arc::clone(&req_body),
2132                                                    Arc::clone(&transaction),
2133                                                    tx.clone(),
2134                                                    rx_,
2135                                                    dialog,
2136                                                );
2137
2138                                                let (iv_tx, mut iv_rx) =
2139                                                    mpsc::channel::<CPMSessionInvitationResponse>(
2140                                                        1,
2141                                                    );
2142                                                let iv_tx_ = iv_tx.clone();
2143                                                let iv = Arc::new((known_identities, iv_tx_));
2144                                                let iv_ = Arc::clone(&iv);
2145
2146                                                let message_receive_listener = Arc::clone(
2147                                                    &self.service.message_receive_listener,
2148                                                );
2149                                                let msrp_socket_allocator_function = Arc::clone(
2150                                                    &self.service.msrp_socket_allocator_function,
2151                                                );
2152                                                let msrp_socket_connect_function = Arc::clone(
2153                                                    &self.service.msrp_socket_connect_function,
2154                                                );
2155                                                let registered_public_identity = Arc::clone(
2156                                                    &self.service.registered_public_identity,
2157                                                );
2158                                                let ongoing_dialogs = Arc::clone(ongoing_dialogs);
2159                                                let ongoing_sessions =
2160                                                    Arc::clone(&self.service.ongoing_sessions);
2161
2162                                                let ongoing_incoming_invitations_ = Arc::clone(
2163                                                    &self.service.ongoing_incoming_invitations,
2164                                                );
2165
2166                                                let tm_ = Arc::clone(&self.tm);
2167                                                let rt_ = Arc::clone(rt);
2168
2169                                                rt.spawn(async move {
2170                                                    match iv_rx.recv().await {
2171                                                        Some(
2172                                                            CPMSessionInvitationResponse::Accept,
2173                                                        ) => {
2174                                                            try_accept_hanging_invitation(
2175                                                                invitation,
2176                                                                &msrp_socket_allocator_function,
2177                                                                &msrp_socket_connect_function,
2178                                                                &message_receive_listener,
2179                                                                &registered_public_identity,
2180                                                                &ongoing_dialogs,
2181                                                                &ongoing_sessions,
2182                                                                &tm_,
2183                                                                &rt_,
2184                                                            );
2185                                                        }
2186                                                        Some(
2187                                                            CPMSessionInvitationResponse::Dispose,
2188                                                        ) => match cancel_tx.send(()) {
2189                                                            Ok(()) => {}
2190                                                            Err(()) => {}
2191                                                        },
2192                                                        _ => {}
2193                                                    }
2194
2195                                                    let mut guard = ongoing_incoming_invitations_
2196                                                        .lock()
2197                                                        .unwrap();
2198                                                    if let Some(idx) = guard
2199                                                        .iter()
2200                                                        .position(|iv| Arc::ptr_eq(iv, &iv_))
2201                                                    {
2202                                                        guard.swap_remove(idx);
2203                                                    }
2204                                                });
2205
2206                                                {
2207                                                    self.service
2208                                                        .ongoing_incoming_invitations
2209                                                        .lock()
2210                                                        .unwrap()
2211                                                        .push(iv);
2212                                                }
2213
2214                                                rt.spawn(async move {
2215                                                    while let Some(ev) = rx.recv().await {
2216                                                        match ev {
2217                                                            ServerTransactionEvent::Cancelled => {
2218                                                                match iv_tx.send(CPMSessionInvitationResponse::Dispose).await {
2219                                                                    Ok(()) => {},
2220                                                                    Err(e) => {},
2221                                                                }
2222                                                            }
2223                                                            _ => {}
2224                                                        }
2225
2226                                                        match tx_.send(ev).await {
2227                                                            Ok(()) => {}
2228                                                            Err(e) => {}
2229                                                        }
2230                                                    }
2231                                                });
2232
2233                                                server_transaction::send_response(
2234                                                    Arc::clone(transaction),
2235                                                    resp_message,
2236                                                    tx,
2237                                                    // &timer,
2238                                                    rt,
2239                                                );
2240
2241                                                return true;
2242                                            }
2243                                        }
2244                                    }
2245
2246                                    return true;
2247                                }
2248                            }
2249
2250                            return false;
2251                        }
2252                    }
2253
2254                    if let Some(resp_message) = server_transaction::make_response(
2255                        message,
2256                        transaction.to_tag(),
2257                        400,
2258                        b"Bad Request",
2259                    ) {
2260                        if let Some((tx, _)) = channels.take() {
2261                            server_transaction::send_response(
2262                                Arc::clone(transaction),
2263                                resp_message,
2264                                tx,
2265                                // &timer,
2266                                rt,
2267                            );
2268                        }
2269                    }
2270
2271                    return true;
2272                }
2273            }
2274        }
2275
2276        false
2277    }
2278}