rust_rcs_client/messaging/cpm/
standalone_messaging.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate base64;
16
17use std::io::Read;
18use std::pin::Pin;
19use std::sync::{Arc, Mutex};
20
21use base64::{engine::general_purpose, Engine as _};
22use futures::Future;
23use rust_rcs_core::ffi::log::platform_log;
24use rust_rcs_core::internet::body::multipart_body::MultipartBody;
25use rust_rcs_core::internet::body::VectorReader;
26use rust_rcs_core::io::network::stream::{ClientSocket, ClientStream};
27use rust_rcs_core::msrp::info::msrp_info_reader::AsMsrpInfo;
28use rust_rcs_core::msrp::info::{MsrpDirection, MsrpInfo, MsrpInterfaceType, MsrpSetupMethod};
29use rust_rcs_core::msrp::msrp_channel::MsrpChannel;
30use rust_rcs_core::msrp::msrp_demuxer::MsrpDemuxer;
31use rust_rcs_core::msrp::msrp_muxer::{MsrpDataWriter, MsrpMessageReceiver, MsrpMuxer};
32use rust_rcs_core::msrp::msrp_transport::msrp_transport_start;
33use rust_rcs_core::sip::sip_core::SipDialogCache;
34use rust_rcs_core::sip::sip_session::{
35    choose_timeout_for_server_transaction_response, Refresher, SipSession, SipSessionEvent,
36    SipSessionEventReceiver,
37};
38use rust_rcs_core::sip::sip_transaction::client_transaction::ClientTransactionNilCallbacks;
39
40use rust_strict_sdp::{AsSDP, Sdp};
41
42use tokio::runtime::Runtime;
43use tokio::sync::mpsc;
44
45use uuid::Uuid;
46
47use rust_rcs_core::cpim::{CPIMInfo, CPIMMessage};
48
49use rust_rcs_core::internet::body::message_body::MessageBody;
50use rust_rcs_core::internet::header::{search, HeaderSearch};
51use rust_rcs_core::internet::header_field::AsHeaderField;
52use rust_rcs_core::internet::Body;
53use rust_rcs_core::internet::{header, Header};
54
55use rust_rcs_core::internet::headers::{AsContentType, Supported};
56
57use rust_rcs_core::internet::name_addr::AsNameAddr;
58
59use rust_rcs_core::io::{DynamicChain, Serializable};
60use rust_rcs_core::sip::sip_transaction::server_transaction;
61use rust_rcs_core::sip::SipDialog;
62use rust_rcs_core::sip::{ClientTransactionCallbacks, SipDialogEventCallbacks};
63use rust_rcs_core::sip::{ServerTransaction, UPDATE};
64use rust_rcs_core::sip::{ServerTransactionEvent, SipTransactionManager};
65use rust_rcs_core::sip::{SipCore, SipTransport};
66
67use rust_rcs_core::sip::SipMessage;
68use rust_rcs_core::sip::TransactionHandler;
69
70use rust_rcs_core::sip::INVITE;
71use rust_rcs_core::sip::MESSAGE;
72
73use rust_rcs_core::util::rand::{self, create_raw_alpha_numeric_string};
74use rust_rcs_core::util::raw_string::{StrEq, StrFind};
75// use rust_rcs_core::util::timer::Timer;
76
77use crate::chat_bot::cpim::GetBotInfo;
78use crate::messaging::ffi::RecipientType;
79
80use super::session::{get_cpm_session_info_from_message, CPMSessionInfo, UpdateMessageCallbacks};
81use super::sip::cpm_accept_contact::CPMAcceptContact;
82use super::sip::cpm_contact::{AsCPMContact, CPMServiceType};
83use super::{make_cpim_message_content_body, CPMMessageParam};
84
// Tag string for this module's log output. NOTE(review): presumably passed to
// `platform_log`; no call site is visible in this part of the file — confirm.
const LOG_TAG: &str = "messaging";
86
87struct StandaloneLargeModeSendSession {
88    state: Arc<
89        Mutex<(
90            Arc<Body>,
91            Arc<Body>,
92            Option<ClientSocket>,
93            bool,
94            String,
95            u16,
96            Vec<u8>,
97            String,
98            u16,
99            Vec<u8>,
100            Option<mpsc::Sender<Option<Vec<u8>>>>,
101        )>,
102    >,
103
104    message_body: Arc<Body>,
105}
106
107impl StandaloneLargeModeSendSession {
108    pub fn new(
109        l_sdp: Arc<Body>,
110        r_sdp: Arc<Body>,
111        sock: ClientSocket,
112        tls: bool,
113        laddr: String,
114        lport: u16,
115        lpath: Vec<u8>,
116        raddr: String,
117        rport: u16,
118        rpath: Vec<u8>,
119        message_body: Arc<Body>,
120    ) -> StandaloneLargeModeSendSession {
121        StandaloneLargeModeSendSession {
122            state: Arc::new(Mutex::new((
123                l_sdp,
124                r_sdp,
125                Some(sock),
126                tls,
127                laddr,
128                lport,
129                lpath,
130                raddr,
131                rport,
132                rpath,
133                None,
134            ))),
135            message_body,
136        }
137    }
138
139    pub fn start(
140        &self,
141        message_result_callback: &Arc<dyn Fn(u16, String) + Send + Sync + 'static>,
142        connect_function: &Arc<
143            dyn Fn(
144                    ClientSocket,
145                    &String,
146                    u16,
147                    bool,
148                ) -> Pin<
149                    Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
150                > + Send
151                + Sync
152                + 'static,
153        >,
154        rt: &Arc<Runtime>,
155    ) {
156        let mut guard = self.state.lock().unwrap();
157        if let Some(sock) = guard.2.take() {
158            let addr = String::from(&guard.7);
159            let port = guard.8;
160            let tls = guard.3;
161
162            let lpath = guard.6.clone();
163            let rpath = guard.9.clone();
164
165            let (data_tx, data_rx) = mpsc::channel(8);
166            let data_tx_ = data_tx.clone();
167
168            let message_result_callback = Arc::clone(message_result_callback);
169            let message_result_callback_ = Arc::clone(&message_result_callback);
170
171            let connect_function = Arc::clone(&connect_function);
172
173            let message_body = Arc::clone(&self.message_body);
174
175            let rt_ = Arc::clone(rt);
176
177            rt.spawn(async move {
178                if let Ok(cs) = connect_function(sock, &addr, port, tls).await {
179                    let t_id = format!("");
180
181                    let from_path = lpath.clone();
182                    let to_path = rpath.clone();
183
184                    let demuxer = MsrpDemuxer::new(from_path, to_path);
185                    let demuxer = Arc::new(demuxer);
186                    let demuxer_ = Arc::clone(&demuxer);
187
188                    let muxer = MsrpMuxer::new(StandaloneLargeModeNilReceiver {});
189
190                    let rt = Arc::clone(&rt_);
191
192                    let from_path = lpath.clone();
193                    let to_path = rpath.clone();
194
195                    let data_tx_ = data_tx.clone();
196
197                    let mut session_channel = MsrpChannel::new(from_path, to_path, demuxer_, muxer);
198
199                    msrp_transport_start(
200                        cs,
201                        t_id,
202                        data_tx_,
203                        data_rx,
204                        move |message_chunk| session_channel.on_message(message_chunk),
205                        move || message_result_callback(0, String::from("Transport Error")),
206                        &rt_,
207                    );
208
209                    let content_type = b"message/cpim".to_vec();
210                    let content_type = Some(content_type);
211
212                    let size = message_body.estimated_size();
213
214                    let mut data = Vec::with_capacity(size);
215                    match message_body.reader() {
216                        Ok(mut reader) => {
217                            match reader.read_to_end(&mut data) {
218                                Ok(_) => {}
219                                Err(_) => {} // to-do: early failure
220                            }
221                        }
222                        Err(_) => {} // to-do: early failure
223                    }
224
225                    let from_path = lpath.clone();
226                    let to_path = rpath.clone();
227
228                    let (chunk_tx, mut chunk_rx) = mpsc::channel(8);
229
230                    demuxer.start(
231                        from_path,
232                        to_path,
233                        content_type,
234                        size,
235                        VectorReader::new(data),
236                        move |status_code, reason_phrase| {
237                            message_result_callback_(status_code, reason_phrase)
238                        },
239                        chunk_tx,
240                        &rt,
241                    );
242
243                    rt.spawn(async move {
244                        while let Some(chunk) = chunk_rx.recv().await {
245                            let size = chunk.estimated_size();
246                            let mut data = Vec::with_capacity(size);
247
248                            {
249                                let mut readers = Vec::new();
250                                chunk.get_readers(&mut readers);
251                                match DynamicChain::new(readers).read_to_end(&mut data) {
252                                    Ok(_) => {}
253                                    Err(_) => {} // to-do: early failure
254                                }
255                            }
256
257                            match data_tx.send(Some(data)).await {
258                                Ok(()) => {}
259                                Err(e) => {}
260                            }
261                        }
262                    });
263                }
264            });
265
266            guard.10.replace(data_tx_);
267        }
268    }
269
270    pub fn stop(&self, rt: &Arc<Runtime>) {
271        let mut guard = self.state.lock().unwrap();
272        if let Some(tx) = guard.10.take() {
273            rt.spawn(async move {
274                match tx.send(None).await {
275                    Ok(()) => {}
276                    Err(e) => {}
277                }
278            });
279        }
280        guard.2.take();
281    }
282}
283
284struct StandaloneLargeModeNilReceiver {}
285
286impl MsrpMessageReceiver for StandaloneLargeModeNilReceiver {
287    fn on_message(
288        &mut self,
289        message_id: &[u8],
290        content_type: &[u8],
291    ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
292        Err((415, "Unsupported Media Type"))
293    }
294}
295
296struct StandaloneLargeModeReceiveSession {
297    state: Arc<
298        Mutex<(
299            Arc<Body>,
300            Arc<Body>,
301            Option<ClientSocket>,
302            bool,
303            String,
304            u16,
305            Vec<u8>,
306            String,
307            u16,
308            Vec<u8>,
309            Option<mpsc::Sender<Option<Vec<u8>>>>,
310        )>,
311    >,
312
313    contact_uri: Arc<Vec<u8>>,
314
315    sip_cpim_body: Option<Arc<Body>>,
316}
317
318impl StandaloneLargeModeReceiveSession {
319    pub fn new(
320        l_sdp: Arc<Body>,
321        r_sdp: Arc<Body>,
322        cs: ClientSocket,
323        tls: bool,
324        laddr: String,
325        lport: u16,
326        lpath: Vec<u8>,
327        raddr: String,
328        rport: u16,
329        rpath: Vec<u8>,
330        contact_uri: Arc<Vec<u8>>,
331        sip_cpim_body: Option<Arc<Body>>,
332    ) -> StandaloneLargeModeReceiveSession {
333        StandaloneLargeModeReceiveSession {
334            state: Arc::new(Mutex::new((
335                l_sdp,
336                r_sdp,
337                Some(cs),
338                tls,
339                laddr,
340                lport,
341                lpath,
342                raddr,
343                rport,
344                rpath,
345                None,
346            ))),
347            contact_uri,
348            sip_cpim_body,
349        }
350    }
351
352    pub fn start(
353        &self,
354        message_receive_listener: &Arc<
355            dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
356        >,
357        connect_function: &Arc<
358            dyn Fn(
359                    ClientSocket,
360                    &String,
361                    u16,
362                    bool,
363                ) -> Pin<
364                    Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
365                > + Send
366                + Sync
367                + 'static,
368        >,
369        rt: &Arc<Runtime>,
370    ) {
371        let mut guard = self.state.lock().unwrap();
372        if let Some(sock) = guard.2.take() {
373            let addr = String::from(&guard.7);
374            let port = guard.8;
375            let tls = guard.3;
376
377            let lpath = guard.6.clone();
378            let rpath = guard.9.clone();
379
380            let sip_cpim_body = if let Some(sip_cpim_body) = &self.sip_cpim_body {
381                Some(Arc::clone(sip_cpim_body))
382            } else {
383                None
384            };
385
386            let (data_tx, data_rx) = mpsc::channel(8);
387            let data_tx_ = data_tx.clone();
388
389            let message_receive_listener = Arc::clone(message_receive_listener);
390
391            let connect_function = Arc::clone(&connect_function);
392
393            let contact_uri = Arc::clone(&self.contact_uri);
394
395            let rt_ = Arc::clone(rt);
396
397            rt.spawn(async move {
398                if let Ok(cs) = connect_function(sock, &addr, port, tls).await {
399                    let t_id = format!("");
400
401                    let from_path = lpath.clone();
402                    let to_path = rpath.clone();
403
404                    let demuxer = MsrpDemuxer::new(from_path, to_path);
405                    let demuxer = Arc::new(demuxer);
406
407                    let muxer = MsrpMuxer::new(StandaloneLargeModeReceiver {
408                        message_receive_listener: Some(Box::new(
409                            move |contact_uri, cpim_info, content_type, content_body| {
410                                message_receive_listener(
411                                    contact_uri,
412                                    cpim_info,
413                                    content_type,
414                                    content_body,
415                                )
416                            },
417                        )),
418                        contact_uri,
419                        sip_cpim_body,
420                    });
421
422                    let mut session_channel = MsrpChannel::new(lpath, rpath, demuxer, muxer);
423
424                    msrp_transport_start(
425                        cs,
426                        t_id,
427                        data_tx_,
428                        data_rx,
429                        move |message_chunk| session_channel.on_message(message_chunk),
430                        || {},
431                        &rt_,
432                    );
433                }
434            });
435
436            guard.10.replace(data_tx);
437        }
438    }
439
440    pub fn stop(&self, rt: &Arc<Runtime>) {
441        let mut guard = self.state.lock().unwrap();
442        if let Some(tx) = guard.10.take() {
443            rt.spawn(async move {
444                match tx.send(None).await {
445                    Ok(()) => {}
446                    Err(e) => {}
447                }
448            });
449        }
450        guard.2.take();
451    }
452}
453
454struct StandaloneLargeModeMessageDataWriter {
455    data: Vec<u8>,
456    contact_uri: Arc<Vec<u8>>,
457    sip_cpim_body: Option<Arc<Body>>,
458    message_receive_listener: Option<Box<dyn FnOnce(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>>,
459}
460
461impl MsrpDataWriter for StandaloneLargeModeMessageDataWriter {
462    fn write(&mut self, data: &[u8]) -> Result<usize, (u16, &'static str)> {
463        self.data.extend_from_slice(data);
464        Ok(data.len())
465    }
466
467    fn complete(&mut self) -> Result<(), (u16, &'static str)> {
468        if let Some(message_receive_listener) = self.message_receive_listener.take() {
469            match Body::construct_message(&self.data) {
470                Ok(body) => match CPIMMessage::try_from(&body) {
471                    Ok(cpim_message) => {
472                        if let Some(sip_cpim_body) = &self.sip_cpim_body {
473                            match CPIMMessage::try_from(sip_cpim_body) {
474                                Ok(sip_cpim_message) => match sip_cpim_message.get_info() {
475                                    Ok(sip_cpim_info) => {
476                                        if let Some((content_type, content_body, base64_encoded)) =
477                                            cpim_message.get_message_body()
478                                        {
479                                            let content_type: &[u8] = match content_type {
480                                                Some(content_type) => content_type,
481                                                None => b"text/plain",
482                                            };
483                                            if base64_encoded {
484                                                if let Ok(decoded_content_body) =
485                                                    general_purpose::STANDARD.decode(content_body)
486                                                {
487                                                    message_receive_listener(
488                                                        &self.contact_uri,
489                                                        &sip_cpim_info,
490                                                        content_type,
491                                                        &decoded_content_body,
492                                                    );
493                                                    return Ok(());
494                                                }
495                                            } else {
496                                                message_receive_listener(
497                                                    &self.contact_uri,
498                                                    &sip_cpim_info,
499                                                    content_type,
500                                                    content_body,
501                                                );
502                                                return Ok(());
503                                            }
504                                        }
505
506                                        Err((400, "failed to get content body"))
507                                    }
508
509                                    Err(e) => Err((400, e)),
510                                },
511
512                                Err(e) => Err((400, e)),
513                            }
514                        } else {
515                            match cpim_message.get_info() {
516                                Ok(cpim_info) => {
517                                    if let Some((content_type, b, e)) =
518                                        cpim_message.get_message_body()
519                                    {
520                                        let content_type: &[u8] = match content_type {
521                                            Some(content_type) => content_type,
522                                            None => b"text/plain",
523                                        };
524
525                                        if e {
526                                            if let Ok(d) = general_purpose::STANDARD.decode(b) {
527                                                message_receive_listener(
528                                                    &self.contact_uri,
529                                                    &cpim_info,
530                                                    content_type,
531                                                    &d,
532                                                );
533                                                return Ok(());
534                                            }
535                                        } else {
536                                            message_receive_listener(
537                                                &self.contact_uri,
538                                                &cpim_info,
539                                                content_type,
540                                                b,
541                                            );
542                                            return Ok(());
543                                        }
544                                    }
545
546                                    Err((400, "failed to get content body"))
547                                }
548
549                                Err(e) => Err((400, e)),
550                            }
551                        }
552                    }
553                    Err(e) => Err((400, e)),
554                },
555                Err(e) => Err((400, e)),
556            }
557        } else {
558            Err((500, "message already received"))
559        }
560    }
561}
562
563struct StandaloneLargeModeReceiver {
564    contact_uri: Arc<Vec<u8>>,
565    sip_cpim_body: Option<Arc<Body>>,
566    message_receive_listener: Option<Box<dyn FnOnce(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>>,
567}
568
569impl MsrpMessageReceiver for StandaloneLargeModeReceiver {
570    fn on_message(
571        &mut self,
572        message_id: &[u8],
573        content_type: &[u8],
574    ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
575        if let Some(message_receive_listener) = self.message_receive_listener.take() {
576            return Ok(Box::new(StandaloneLargeModeMessageDataWriter {
577                data: Vec::with_capacity(1024),
578                contact_uri: Arc::clone(&self.contact_uri),
579                sip_cpim_body: match &self.sip_cpim_body {
580                    Some(body) => Some(Arc::clone(body)),
581                    None => None,
582                },
583                message_receive_listener: Some(message_receive_listener),
584            }));
585        }
586
587        Err((500, ""))
588    }
589}
590
591pub struct StandaloneMessagingService {
592    registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
593    message_receive_listener: Arc<dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,
594    msrp_socket_allocator_function: Arc<
595        dyn Fn(
596                Option<&MsrpInfo>,
597            )
598                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
599            + Send
600            + Sync,
601    >,
602    msrp_socket_connect_function: Arc<
603        dyn Fn(
604                ClientSocket,
605                &String,
606                u16,
607                bool,
608            )
609                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
610            + Send
611            + Sync,
612    >,
613}
614
615impl StandaloneMessagingService {
616    pub fn new<MRL, MAF, MCF>(
617        message_receive_listener: MRL,
618        msrp_socket_allocator_function: MAF,
619        msrp_socket_connect_function: MCF,
620    ) -> StandaloneMessagingService
621    where
622        MRL: Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
623        MAF: Fn(
624                Option<&MsrpInfo>,
625            )
626                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
627            + Send
628            + Sync
629            + 'static,
630        MCF: Fn(
631                ClientSocket,
632                &String,
633                u16,
634                bool,
635            )
636                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
637            + Send
638            + Sync
639            + 'static,
640    {
641        StandaloneMessagingService {
642            registered_public_identity: Arc::new(Mutex::new(None)),
643            message_receive_listener: Arc::new(message_receive_listener),
644            msrp_socket_allocator_function: Arc::new(msrp_socket_allocator_function),
645            msrp_socket_connect_function: Arc::new(msrp_socket_connect_function),
646        }
647    }
648
649    pub fn set_registered_public_identity(
650        &self,
651        registered_public_identity: String,
652        sip_instance_id: String,
653        transport: Arc<SipTransport>,
654    ) {
655        (*self.registered_public_identity.lock().unwrap()).replace((
656            transport,
657            registered_public_identity,
658            sip_instance_id,
659        ));
660    }
661
662    pub fn send_large_mode_message<MRF>(
663        &self,
664        message_type: &str,
665        message_content: &str,
666        recipient: &str,
667        recipient_type: &RecipientType,
668        recipient_uri: &str,
669        message_result_callback: MRF,
670        core: &Arc<SipCore>,
671        rt: &Arc<Runtime>,
672    ) where
673        MRF: FnOnce(u16, String) + Send + Sync + 'static,
674    {
675        match (self.msrp_socket_allocator_function)(None) {
676            Ok((sock, host, port, tls, active_setup, ipv6)) => {
677                let path_random = create_raw_alpha_numeric_string(16);
678                let path_random = std::str::from_utf8(&path_random).unwrap();
679                let path = if tls {
680                    format!("msrps://{}:{}/{};tcp", &host, port, path_random)
681                } else {
682                    format!("msrp://{}:{}/{};tcp", &host, port, path_random)
683                };
684
685                let path = path.into_bytes();
686
687                let msrp_info: MsrpInfo = MsrpInfo {
688                    protocol: if tls { b"TCP/TLS/MSRP" } else { b"TCP/MSRP" },
689                    address: host.as_bytes(),
690                    interface_type: if ipv6 {
691                        MsrpInterfaceType::IPv6
692                    } else {
693                        MsrpInterfaceType::IPv4
694                    },
695                    port,
696                    path: &path,
697                    inactive: false,
698                    direction: MsrpDirection::SendOnly,
699                    accept_types: b"message/cpim",
700                    setup_method: if active_setup {
701                        MsrpSetupMethod::Active
702                    } else {
703                        MsrpSetupMethod::Passive
704                    },
705                    accept_wrapped_types: Some(message_type.as_bytes()),
706                    file_info: None,
707                };
708
709                let sdp = String::from(msrp_info);
710                let sdp_bytes = sdp.into_bytes();
711                let sdp_body = Body::Raw(sdp_bytes);
712
713                if let Some((transport, public_user_identity, instance_id)) =
714                    core.get_default_public_identity()
715                {
716                    let mut invite_message =
717                        SipMessage::new_request(INVITE, recipient_uri.as_bytes());
718
719                    invite_message.add_header(Header::new(
720                        b"Call-ID",
721                        String::from(
722                            Uuid::new_v4()
723                                .as_hyphenated()
724                                .encode_lower(&mut Uuid::encode_buffer()),
725                        ),
726                    ));
727
728                    invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
729
730                    let tag = rand::create_raw_alpha_numeric_string(8);
731                    let tag = String::from_utf8_lossy(&tag);
732
733                    invite_message.add_header(Header::new(
734                        b"From",
735                        format!("<{}>;tag={}", public_user_identity, tag),
736                    ));
737
738                    invite_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
739
740                    invite_message.add_header(Header::new(b"Contact", if let RecipientType::Chatbot = recipient_type {
741                        format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\";+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\"", &public_user_identity, &instance_id)
742                    } else {
743                        format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"", &public_user_identity, &instance_id)
744                    }));
745
746                    invite_message.add_header(Header::new(b"Accept-Contact", format!("*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"")));
747
748                    if message_type.eq_ignore_ascii_case("application/vnd.gsma.rcs-ft-http+xml") {
749                        invite_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp\";require;explicit"));
750                    }
751
752                    // to-do: build up for other recipient types
753
754                    if let RecipientType::Chatbot = recipient_type {
755                        invite_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
756                    }
757
758                    invite_message.add_header(Header::new(
759                        b"Allow",
760                        b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE",
761                    ));
762
763                    invite_message.add_header(Header::new(b"Supported", b"timer"));
764
765                    invite_message.add_header(Header::new(b"Session-Expires", b"1800"));
766
767                    invite_message.add_header(Header::new(
768                        b"P-Preferred-Service",
769                        b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.largemsg",
770                    ));
771
772                    invite_message.add_header(Header::new(
773                        b"P-Preferred-Identity",
774                        String::from(&public_user_identity),
775                    ));
776
777                    let message_imdn_id = Uuid::new_v4();
778
779                    let cpim_body = make_cpim_message_content_body(
780                        message_type,
781                        "",
782                        &recipient_type,
783                        recipient_uri,
784                        message_imdn_id,
785                        &public_user_identity,
786                    );
787
788                    let boundary = create_raw_alpha_numeric_string(16);
789                    let boundary_ = String::from_utf8_lossy(&boundary);
790                    let mut parts =
791                        Vec::with_capacity(if let RecipientType::ResourceList = recipient_type {
792                            3
793                        } else {
794                            2
795                        });
796
797                    if let RecipientType::ResourceList = recipient_type {
798                        let resource_list_body: Vec<u8> = recipient.as_bytes().to_vec();
799
800                        let mut resource_list_headers = Vec::new();
801
802                        let resource_list_length = resource_list_body.len();
803
804                        resource_list_headers.push(Header::new(
805                            b"Content-Type",
806                            b"application/resource-lists+xml",
807                        ));
808                        resource_list_headers
809                            .push(Header::new(b"Content-Disposition", b"recipient-list"));
810                        resource_list_headers.push(Header::new(
811                            b"Content-Length",
812                            format!("{}", resource_list_length),
813                        ));
814
815                        let resource_list_body = Body::Message(MessageBody {
816                            headers: resource_list_headers,
817                            body: Arc::new(Body::Raw(resource_list_body)),
818                        });
819
820                        parts.push(Arc::new(resource_list_body));
821                    }
822
823                    let mut cpim_part_headers = Vec::new();
824                    cpim_part_headers.push(Header::new(b"Content-Type", b"message/cpim"));
825                    let cpim_content_length = cpim_body.estimated_size();
826                    cpim_part_headers.push(Header::new(
827                        b"Content-Length",
828                        format!("{}", cpim_content_length),
829                    ));
830
831                    parts.push(Arc::new(Body::Message(MessageBody {
832                        headers: cpim_part_headers,
833                        body: Arc::new(cpim_body),
834                    })));
835
836                    let mut sdp_part_headers = Vec::new();
837                    sdp_part_headers.push(Header::new(b"Content-Type", b"application/sdp"));
838                    let sdp_content_length = sdp_body.estimated_size();
839                    sdp_part_headers.push(Header::new(
840                        b"Content-Length",
841                        format!("{}", sdp_content_length),
842                    ));
843
844                    let sdp_body = Arc::new(sdp_body);
845                    parts.push(Arc::new(Body::Message(MessageBody {
846                        headers: sdp_part_headers,
847                        body: Arc::clone(&sdp_body),
848                    })));
849
850                    invite_message.add_header(Header::new(
851                        b"Content-Type",
852                        format!("multipart/mixed; boundary={}", boundary_),
853                    ));
854
855                    let multipart = Body::Multipart(MultipartBody { boundary, parts });
856
857                    let content_length = multipart.estimated_size();
858                    invite_message.add_header(Header::new(
859                        b"Content-Length",
860                        format!("{}", content_length),
861                    ));
862
863                    let multipart = Arc::new(multipart);
864                    invite_message.set_body(multipart);
865
866                    let req_headers = invite_message.copy_headers();
867
868                    let message_result_callback =
869                        Arc::new(Mutex::new(Some(message_result_callback)));
870                    let message_result_callback = move |status_code, reason_phrase| {
871                        let mut guard = message_result_callback.lock().unwrap();
872                        if let Some(message_result_callback) = guard.take() {
873                            message_result_callback(status_code, reason_phrase);
874                        }
875                    };
876
877                    let message = CPMMessageParam::new(
878                        String::from(message_type),
879                        String::from(message_content),
880                        recipient,
881                        recipient_type,
882                        recipient_uri,
883                        message_result_callback,
884                    );
885
886                    let (client_transaction_tx, mut client_transaction_rx) =
887                        mpsc::channel::<
888                            Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>,
889                        >(1);
890
891                    let connect_function = Arc::clone(&self.msrp_socket_connect_function);
892
893                    let ongoing_dialogs = core.get_ongoing_dialogs();
894
895                    let core_ = Arc::clone(core);
896                    let rt_ = Arc::clone(rt);
897
898                    rt.spawn(async move {
899                        if let Some(res) = client_transaction_rx.recv().await {
900                            match res {
901                                Ok((resp_message, session_info, r_sdp_body)) => {
902                                    if let Some(r_sdp) = match r_sdp_body.as_ref() {
903                                        Body::Raw(raw) => {
904                                            raw.as_sdp()
905                                        }
906                                        _ => None,
907                                    } {
908                                        if let Some(r_msrp_info) = r_sdp.as_msrp_info() {
909                                            if let Ok(raddr) = std::str::from_utf8(r_msrp_info.address) {
910                                                let raddr = String::from(raddr);
911                                                let rport = r_msrp_info.port;
912                                                let rpath = r_msrp_info.path.to_vec();
913
914                                                let (d_tx, mut d_rx) = mpsc::channel(1);
915
916                                                let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
917
918                                                rt_.spawn(async move {
919                                                    if let Some(dialog) = d_rx.recv().await {
920                                                        ongoing_dialogs_.remove_dialog(&dialog);
921                                                    }
922                                                });
923
924                                                if let Ok(dialog) = SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
925                                                    match d_tx.blocking_send(d) {
926                                                        Ok(()) => {},
927                                                        Err(e) => {},
928                                                    }
929                                                }) {
930                                                    let dialog = Arc::new(dialog);
931
932                                                    let message_body = make_cpim_message_content_body(
933                                                        &message.message_type,
934                                                        &message.message_content,
935                                                        &message.recipient_type,
936                                                        &message.recipient_uri,
937                                                        message_imdn_id,
938                                                        &public_user_identity,
939                                                    );
940                                                    let message_body = Arc::new(message_body);
941
942                                                    let session = StandaloneLargeModeSendSession::new(sdp_body, r_sdp_body, sock, tls, host, port, path, raddr, rport, rpath, message_body);
943
944                                                    let session = Arc::new(session);
945
946                                                    let (sess_tx, mut sess_rx) = mpsc::channel(8);
947
948                                                    let sip_session = SipSession::new(&session, SipSessionEventReceiver {
949                                                        tx: sess_tx,
950                                                        rt: Arc::clone(&rt_),
951                                                    });
952
953                                                    let sip_session = Arc::new(sip_session);
954                                                    let sip_session_ = Arc::clone(&sip_session);
955
956                                                    let core = Arc::clone(&core_);
957                                                    let rt = Arc::clone(&rt_);
958
959                                                    rt_.spawn(async move {
960                                                        let rt_ = Arc::clone(&rt);
961                                                        let core_ = Arc::clone(&core);
962                                                        while let Some(ev) = sess_rx.recv().await {
963                                                            match ev {
964                                                                SipSessionEvent::ShouldRefresh(dialog) => {
965                                                                    if let Ok(mut message) =
966                                                                        dialog.make_request(b"UPDATE", None)
967                                                                    {
968                                                                        message.add_header(Header::new(
969                                                                            b"Supported",
970                                                                            b"timer",
971                                                                        ));
972
973                                                                        if let Some((transport, _, _)) = core_.get_default_public_identity() {
974                                                                            core_.get_transaction_manager().send_request(
975                                                                                message,
976                                                                                &transport,
977                                                                                UpdateMessageCallbacks {
978                                                                                    // session_expires: None,
979                                                                                    dialog,
980                                                                                    sip_session: Arc::clone(&sip_session_),
981                                                                                    rt: Arc::clone(&rt_),
982                                                                                },
983                                                                                &rt_,
984                                                                            );
985                                                                        }
986                                                                    }
987                                                                }
988
989                                                                SipSessionEvent::Expired(dialog) => {
990                                                                    if let Ok(message) = dialog.make_request(b"BYE", None) {
991
992                                                                        if let Some((transport, _, _)) = core_.get_default_public_identity() {
993                                                                            core_.get_transaction_manager().send_request(
994                                                                                message,
995                                                                                &transport,
996                                                                                ClientTransactionNilCallbacks {},
997                                                                                &rt_,
998                                                                            );
999                                                                        }
1000                                                                    }
1001                                                                }
1002
1003                                                                _ => {}
1004                                                            }
1005                                                        }
1006                                                    });
1007
1008                                                    sip_session.setup_confirmed_dialog(&dialog, StandaloneLargeModeSendSessionDialogEventReceiver {
1009                                                        sip_session: Arc::clone(&sip_session),
1010                                                        rt: Arc::clone(&rt_),
1011                                                    });
1012
1013                                                    if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
1014
1015                                                        if let Some((transport, _, _)) = core_.get_default_public_identity(){
1016
1017                                                            core_.get_transaction_manager().send_request(
1018                                                                ack_message,
1019                                                                &transport,
1020                                                                ClientTransactionNilCallbacks {},
1021                                                                &rt_,
1022                                                            );
1023
1024                                                            session.start(&message.message_result_callback, &connect_function, &rt_);
1025                                                        }
1026                                                    }
1027                                                }
1028                                            }
1029                                        }
1030                                    }
1031                                }
1032
1033                                Err((error_code, error_reason)) => {
1034                                    (message.message_result_callback)(error_code, error_reason);
1035                                    return;
1036                                }
1037                            }
1038                        }
1039
1040                        (message.message_result_callback)(500, String::from("Internal Error"))
1041                    });
1042
1043                    core.get_transaction_manager().send_request(
1044                        invite_message,
1045                        &transport,
1046                        StandaloneLargeModeInviteCallbacks {
1047                            recipient_type: RecipientType::from(recipient_type),
1048                            client_transaction_tx,
1049                            rt: Arc::clone(rt),
1050                        },
1051                        rt,
1052                    );
1053
1054                    return;
1055                }
1056
1057                message_result_callback(408, String::from("Timeout"));
1058            }
1059
1060            Err((error_code, error_reason)) => {
1061                message_result_callback(error_code, String::from(error_reason))
1062            }
1063        }
1064    }
1065}
1066
/// Client-transaction callbacks attached to the large-mode standalone-message
/// INVITE.
///
/// The callbacks translate the transaction outcome into a single `Result`
/// that is pushed onto `client_transaction_tx`.
1067struct StandaloneLargeModeInviteCallbacks {
    /// Kind of recipient the INVITE was addressed to. When it is
    /// `RecipientType::Chatbot`, a 2xx answer whose contact service type is
    /// not a chatbot is reported as `606 Not Acceptable`.
1068    recipient_type: RecipientType,
    /// Channel carrying the outcome: `Ok((response, session info, SDP body))`
    /// for an accepted 2xx answer, `Err((status code, reason))` otherwise.
1069    client_transaction_tx:
1070        mpsc::Sender<Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>>,
    /// Runtime on which the asynchronous channel send is spawned.
1071    rt: Arc<Runtime>,
1072}
1073
1074impl ClientTransactionCallbacks for StandaloneLargeModeInviteCallbacks {
1075    fn on_provisional_response(&self, _message: SipMessage) {}
1076
1077    fn on_final_response(&self, message: SipMessage) {
1078        if let SipMessage::Response(l, _, _) = &message {
1079            let mut status_code = 500;
1080            let mut reason_phrase = String::from("Internal Error");
1081            if l.status_code >= 200 && l.status_code < 300 {
1082                if let Some(session_info) = get_cpm_session_info_from_message(&message, true) {
1083                    loop {
1084                        let accepting_chatbot_session_is_forbidden =
1085                            if let RecipientType::Chatbot = self.recipient_type {
1086                                match session_info.cpm_contact.service_type {
1087                                    CPMServiceType::Chatbot => false,
1088                                    _ => true,
1089                                }
1090                            } else {
1091                                false
1092                            };
1093
1094                        if accepting_chatbot_session_is_forbidden {
1095                            status_code = 606;
1096                            reason_phrase = String::from("Not Acceptable");
1097                            break;
1098                        }
1099
1100                        if let Some(resp_body) = message.get_body() {
1101                            // let mut sdp: Option<Sdp> = None;
1102
1103                            if let Some(headers) = message.headers() {
1104                                if let Some(header) = header::search(headers, b"Content-Type", true)
1105                                {
1106                                    if let Some(content_type) =
1107                                        header.get_value().as_header_field().as_content_type()
1108                                    {
1109                                        if content_type
1110                                            .major_type
1111                                            .eq_ignore_ascii_case(b"application")
1112                                            && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1113                                        {
1114                                            let client_transaction_tx =
1115                                                self.client_transaction_tx.clone();
1116                                            let sdp_body = Arc::clone(&resp_body);
1117                                            self.rt.spawn(async move {
1118                                                match client_transaction_tx
1119                                                    .send(Ok((message, session_info, sdp_body)))
1120                                                    .await
1121                                                {
1122                                                    Ok(()) => {}
1123                                                    Err(e) => {}
1124                                                }
1125                                            });
1126
1127                                            return;
1128                                        }
1129                                    }
1130                                }
1131                            }
1132                        }
1133
1134                        break;
1135                    }
1136                }
1137            } else {
1138                status_code = l.status_code;
1139                reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
1140            }
1141
1142            let client_transaction_tx = self.client_transaction_tx.clone();
1143            self.rt.spawn(async move {
1144                match client_transaction_tx
1145                    .send(Err((status_code, reason_phrase)))
1146                    .await
1147                {
1148                    Ok(()) => {}
1149                    Err(e) => {}
1150                }
1151            });
1152        }
1153    }
1154
1155    fn on_transport_error(&self) {
1156        let client_transaction_tx = self.client_transaction_tx.clone();
1157        self.rt.spawn(async move {
1158            match client_transaction_tx
1159                .send(Err((0, String::from("Transport Error"))))
1160                .await
1161            {
1162                Ok(()) => {}
1163                Err(e) => {}
1164            }
1165        });
1166    }
1167}
1168
/// Bundles a [`StandaloneMessagingService`] with a [`SipTransactionManager`].
///
/// This wrapper is the type that implements `TransactionHandler` for
/// standalone messaging.
1169pub struct StandaloneMessagingServiceWrapper {
    /// The wrapped standalone messaging service; the transaction handler
    /// calls its `msrp_socket_allocator_function`.
1170    pub service: Arc<StandaloneMessagingService>,
    /// SIP transaction manager held alongside the service.
    // NOTE(review): its use is not visible in this part of the file — confirm.
1171    pub tm: Arc<SipTransactionManager>,
1172}
1173
1174impl TransactionHandler for StandaloneMessagingServiceWrapper {
1175    fn handle_transaction(
1176        &self,
1177        transaction: &Arc<ServerTransaction>,
1178        ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
1179        channels: &mut Option<(
1180            mpsc::Sender<ServerTransactionEvent>,
1181            mpsc::Receiver<ServerTransactionEvent>,
1182        )>,
1183        // timer: &Timer,
1184        rt: &Arc<Runtime>,
1185    ) -> bool {
1186        platform_log(LOG_TAG, "handle_transaction");
1187
1188        let message = transaction.message();
1189        if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
1190            if req_line.method == INVITE {
1191                platform_log(LOG_TAG, "is INVITE request");
1192
1193                let mut is_large_mode_cpm_standalone_message = false;
1194
1195                'check_large_mode: for header in
1196                    HeaderSearch::new(req_headers, b"Accept-Contact", true)
1197                {
1198                    let header_field = header.get_value().as_header_field();
1199                    if header_field
1200                        .contains_icsi_ref(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg")
1201                        || header_field.contains_icsi_ref(
1202                            b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.deferred",
1203                        )
1204                    {
1205                        is_large_mode_cpm_standalone_message = true;
1206                        break 'check_large_mode;
1207                    }
1208                }
1209
1210                if is_large_mode_cpm_standalone_message {
1211                    let mut conversation_id: &[u8] = &[];
1212                    let mut contribution_id: &[u8] = &[];
1213
1214                    for header in req_headers {
1215                        if header.get_name().eq_ignore_ascii_case(b"Conversation-ID") {
1216                            conversation_id = header.get_value();
1217                        } else if header.get_name().eq_ignore_ascii_case(b"Contribution-ID") {
1218                            contribution_id = header.get_value();
1219                        }
1220                    }
1221
1222                    if conversation_id.len() == 0 || contribution_id.len() == 0 {
1223                        if let Some(resp_message) = server_transaction::make_response(
1224                            message,
1225                            transaction.to_tag(),
1226                            400,
1227                            b"Bad Request",
1228                        ) {
1229                            if let Some((tx, _)) = channels.take() {
1230                                server_transaction::send_response(
1231                                    Arc::clone(transaction),
1232                                    resp_message,
1233                                    tx,
1234                                    // &timer,
1235                                    rt,
1236                                );
1237                            }
1238                        }
1239
1240                        return true;
1241                    }
1242
1243                    let mut sdp: Option<Sdp> = None;
1244                    let mut sip_cpim_body: Option<Arc<Body>> = None;
1245
1246                    if let Some(header) = search(req_headers, b"Content-Type", true) {
1247                        if let Some(content_type) =
1248                            header.get_value().as_header_field().as_content_type()
1249                        {
1250                            if content_type.major_type.eq_ignore_ascii_case(b"application")
1251                                && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1252                            {
1253                                match req_body.as_ref() {
1254                                    Body::Raw(raw) => {
1255                                        sdp = raw.as_sdp();
1256                                    }
1257                                    _ => {}
1258                                }
1259                            } else if content_type.major_type.eq_ignore_ascii_case(b"multipart")
1260                                && content_type.sub_type.eq_ignore_ascii_case(b"mixed")
1261                            {
1262                                match req_body.as_ref() {
1263                                    Body::Multipart(multipart) => {
1264                                        for part in &multipart.parts {
1265                                            if let Body::Message(body) = part.as_ref() {
1266                                                if let Some(header) =
1267                                                    search(&body.headers, b"Content-Type", true)
1268                                                {
1269                                                    if let Some(content_type) = header
1270                                                        .get_value()
1271                                                        .as_header_field()
1272                                                        .as_content_type()
1273                                                    {
1274                                                        if content_type
1275                                                            .major_type
1276                                                            .eq_ignore_ascii_case(b"application")
1277                                                            && content_type
1278                                                                .sub_type
1279                                                                .eq_ignore_ascii_case(b"sdp")
1280                                                        {
1281                                                            match body.body.as_ref() {
1282                                                                Body::Raw(raw) => {
1283                                                                    sdp = raw.as_sdp();
1284                                                                }
1285                                                                _ => {}
1286                                                            }
1287                                                        } else if content_type
1288                                                            .major_type
1289                                                            .eq_ignore_ascii_case(b"message")
1290                                                            && content_type
1291                                                                .sub_type
1292                                                                .eq_ignore_ascii_case(b"cpim")
1293                                                        {
1294                                                            sip_cpim_body =
1295                                                                Some(Arc::clone(&body.body));
1296                                                        }
1297                                                    }
1298                                                }
1299                                            }
1300                                        }
1301                                    }
1302                                    _ => {}
1303                                }
1304                            }
1305                        }
1306                    }
1307
1308                    if let Some(sdp) = sdp {
1309                        let mut contact_uri: Option<Vec<u8>> = None;
1310
1311                        if let Some(header) = search(req_headers, b"P-Asserted-Identity", true) {
1312                            if let Some(asserted_identity) =
1313                                header.get_value().as_name_addresses().first()
1314                            {
1315                                if let Some(uri_part) = &asserted_identity.uri_part {
1316                                    let data_size = uri_part.estimated_size();
1317                                    let mut data = Vec::with_capacity(data_size);
1318                                    {
1319                                        let mut readers = Vec::new();
1320                                        uri_part.get_readers(&mut readers);
1321                                        match DynamicChain::new(readers).read_to_end(&mut data) {
1322                                            Ok(_) => {}
1323                                            Err(_) => {} // to-do: early failure
1324                                        }
1325                                    }
1326
1327                                    contact_uri.replace(data);
1328                                }
1329                            }
1330                        }
1331
1332                        if contact_uri.is_none() {
1333                            if let Some(sip_cpim_body) = &sip_cpim_body {
1334                                match CPIMMessage::try_from(sip_cpim_body) {
1335                                    Ok(sip_cpim_message) => {
1336                                        if let Ok(sip_cpim_info) = sip_cpim_message.get_info() {
1337                                            if let Some(from_uri) = sip_cpim_info.from_uri {
1338                                                contact_uri = Some(from_uri.string_representation_without_query_and_fragment());
1339                                            }
1340                                        }
1341                                    }
1342                                    Err(e) => {
1343                                        platform_log(LOG_TAG, format!("CPIM decode error: {}", e));
1344                                    }
1345                                }
1346                            }
1347                        }
1348
1349                        if contact_uri.is_none() {
1350                            if let Some(header) = search(req_headers, b"Contact", true) {
1351                                if let Some(cpm_contact) =
1352                                    header.get_value().as_header_field().as_cpm_contact()
1353                                {
1354                                    contact_uri = Some(cpm_contact.service_uri.clone());
1355                                }
1356                            }
1357                        }
1358
1359                        if let Some(contact_uri) = contact_uri {
1360                            let contact_uri = Arc::new(contact_uri);
1361                            if let Some(msrp_info) = sdp.as_msrp_info() {
1362                                match (self.service.msrp_socket_allocator_function)(Some(
1363                                    &msrp_info,
1364                                )) {
1365                                    Ok((sock, host, port, tls, active_setup, ipv6)) => {
1366                                        if let Ok(raddr) = std::str::from_utf8(msrp_info.address) {
1367                                            let raddr = String::from(raddr);
1368                                            let rport = msrp_info.port;
1369                                            let rpath = msrp_info.path.to_vec();
1370
1371                                            let path_random = create_raw_alpha_numeric_string(16);
1372                                            let path_random =
1373                                                std::str::from_utf8(&path_random).unwrap();
1374                                            let path = if tls {
1375                                                format!(
1376                                                    "msrps://{}:{}/{};tcp",
1377                                                    &host, port, path_random
1378                                                )
1379                                            } else {
1380                                                format!(
1381                                                    "msrp://{}:{}/{};tcp",
1382                                                    &host, port, path_random
1383                                                )
1384                                            };
1385
1386                                            let path = path.into_bytes();
1387
1388                                            let l_msrp_info: MsrpInfo = MsrpInfo {
1389                                                protocol: if tls {
1390                                                    b"TCP/TLS/MSRP"
1391                                                } else {
1392                                                    b"TCP/MSRP"
1393                                                },
1394                                                address: host.as_bytes(),
1395                                                interface_type: if ipv6 {
1396                                                    MsrpInterfaceType::IPv6
1397                                                } else {
1398                                                    MsrpInterfaceType::IPv4
1399                                                },
1400                                                port,
1401                                                path: &path,
1402                                                inactive: false,
1403                                                direction: MsrpDirection::ReceiveOnly,
1404                                                accept_types: msrp_info.accept_types,
1405                                                setup_method: if active_setup {
1406                                                    MsrpSetupMethod::Active
1407                                                } else {
1408                                                    MsrpSetupMethod::Passive
1409                                                },
1410                                                accept_wrapped_types: msrp_info
1411                                                    .accept_wrapped_types,
1412                                                file_info: None,
1413                                            };
1414
1415                                            let l_sdp = String::from(l_msrp_info);
1416                                            let l_sdp = l_sdp.into_bytes();
1417                                            let l_sdp = Body::Raw(l_sdp);
1418                                            let l_sdp = Arc::new(l_sdp);
1419
1420                                            let r_sdp = Arc::clone(req_body);
1421
1422                                            if let Some(mut resp_message) =
1423                                                server_transaction::make_response(
1424                                                    message,
1425                                                    transaction.to_tag(),
1426                                                    200,
1427                                                    b"Ok",
1428                                                )
1429                                            {
1430                                                let registered_public_identity = Arc::clone(
1431                                                    &self.service.registered_public_identity,
1432                                                );
1433
1434                                                {
1435                                                    if let Some((
1436                                                        _,
1437                                                        contact_identity,
1438                                                        instance_id,
1439                                                    )) =
1440                                                        &*registered_public_identity.lock().unwrap()
1441                                                    {
1442                                                        resp_message.add_header(Header::new(
1443                                                            b"Contact",
1444                                                            format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"", contact_identity, instance_id),
1445                                                        ));
1446                                                    }
1447                                                }
1448
1449                                                let (d_tx, mut d_rx) = mpsc::channel(1);
1450
1451                                                let ongoing_dialogs_ = Arc::clone(ongoing_dialogs);
1452
1453                                                rt.spawn(async move {
1454                                                    if let Some(dialog) = d_rx.recv().await {
1455                                                        ongoing_dialogs_.remove_dialog(&dialog);
1456                                                    }
1457                                                });
1458
1459                                                if let Ok(dialog) = SipDialog::try_new_as_uas(
1460                                                    message,
1461                                                    &resp_message,
1462                                                    move |d| match d_tx.blocking_send(d) {
1463                                                        Ok(()) => {}
1464                                                        Err(e) => {}
1465                                                    },
1466                                                ) {
1467                                                    let dialog = Arc::new(dialog);
1468
1469                                                    let session =
1470                                                        StandaloneLargeModeReceiveSession::new(
1471                                                            l_sdp,
1472                                                            r_sdp,
1473                                                            sock,
1474                                                            tls,
1475                                                            host,
1476                                                            port,
1477                                                            path,
1478                                                            raddr,
1479                                                            rport,
1480                                                            rpath,
1481                                                            contact_uri,
1482                                                            sip_cpim_body,
1483                                                        );
1484                                                    let session = Arc::new(session);
1485
1486                                                    let (sess_tx, mut sess_rx) = mpsc::channel(8);
1487
1488                                                    let tm = Arc::clone(&self.tm);
1489
1490                                                    let sip_session = SipSession::new(
1491                                                        &session,
1492                                                        SipSessionEventReceiver {
1493                                                            tx: sess_tx,
1494                                                            rt: Arc::clone(&rt),
1495                                                        },
1496                                                    );
1497
1498                                                    let sip_session = Arc::new(sip_session);
1499                                                    let sip_session_ = Arc::clone(&sip_session);
1500
1501                                                    let rt_ = Arc::clone(&rt);
1502
1503                                                    rt.spawn(async move {
1504                                                        let rt = Arc::clone(&rt_);
1505                                                        while let Some(ev) = sess_rx.recv().await {
1506                                                            match ev {
1507                                                                SipSessionEvent::ShouldRefresh(dialog) => {
1508                                                                    if let Ok(mut message) =
1509                                                                        dialog.make_request(b"UPDATE", None)
1510                                                                    {
1511                                                                        message.add_header(Header::new(
1512                                                                            b"Supported",
1513                                                                            b"timer",
1514                                                                        ));
1515
1516                                                                        let guard =
1517                                                                        registered_public_identity.lock().unwrap();
1518
1519                                                                        if let Some((transport, _, _)) = &*guard {
1520                                                                            tm.send_request(
1521                                                                                message,
1522                                                                                transport,
1523                                                                                UpdateMessageCallbacks {
1524                                                                                    // session_expires: None,
1525                                                                                    dialog,
1526                                                                                    sip_session: Arc::clone(&sip_session_),
1527                                                                                    rt: Arc::clone(&rt),
1528                                                                                },
1529                                                                                &rt,
1530                                                                            );
1531                                                                        }
1532                                                                    }
1533                                                                }
1534
1535                                                                SipSessionEvent::Expired(dialog) => {
1536                                                                    if let Ok(message) = dialog.make_request(b"BYE", None) {
1537                                                                        let guard =
1538                                                                        registered_public_identity.lock().unwrap();
1539
1540                                                                        if let Some((transport, _, _)) = &*guard {
1541                                                                            tm.send_request(
1542                                                                                message,
1543                                                                                transport,
1544                                                                                ClientTransactionNilCallbacks {},
1545                                                                                &rt,
1546                                                                            );
1547                                                                        }
1548                                                                    }
1549                                                                }
1550
1551                                                                _ => {}
1552                                                            }
1553                                                        }
1554                                                    });
1555
1556                                                    sip_session.setup_confirmed_dialog(&dialog, StandaloneLargeModeReceiveSessionDialogEventReceiver {
1557                                                        sip_session: Arc::clone(&sip_session),
1558                                                        message_receive_listener: Arc::clone(&self.service.message_receive_listener),
1559                                                        connect_function: Arc::clone(&self.service.msrp_socket_connect_function),
1560                                                        rt: Arc::clone(&rt),
1561                                                    })
1562                                                }
1563
1564                                                if let Some((tx, mut rx)) = channels.take() {
1565                                                    rt.spawn(async move{
1566                                                        while let Some(ev) = rx.recv().await {
1567                                                            match ev {
1568                                                                ServerTransactionEvent::Cancelled => {
1569                                                                    todo!()
1570                                                                },
1571                                                                _ => {},
1572                                                            }
1573                                                        }
1574                                                    });
1575
1576                                                    server_transaction::send_response(
1577                                                        Arc::clone(transaction),
1578                                                        resp_message,
1579                                                        tx,
1580                                                        // &timer,
1581                                                        rt,
1582                                                    );
1583                                                }
1584                                            }
1585
1586                                            return true;
1587                                        }
1588
1589                                        if let Some(resp_message) =
1590                                            server_transaction::make_response(
1591                                                message,
1592                                                transaction.to_tag(),
1593                                                400,
1594                                                b"Bad Request",
1595                                            )
1596                                        {
1597                                            if let Some((tx, _)) = channels.take() {
1598                                                server_transaction::send_response(
1599                                                    Arc::clone(transaction),
1600                                                    resp_message,
1601                                                    tx,
1602                                                    // &timer,
1603                                                    rt,
1604                                                );
1605                                            }
1606                                        }
1607                                    }
1608
1609                                    Err((error_code, error_phrase)) => {
1610                                        if let Some(resp_message) =
1611                                            server_transaction::make_response(
1612                                                message,
1613                                                transaction.to_tag(),
1614                                                error_code,
1615                                                error_phrase.as_bytes(),
1616                                            )
1617                                        {
1618                                            if let Some((tx, _rx)) = channels.take() {
1619                                                server_transaction::send_response(
1620                                                    Arc::clone(transaction),
1621                                                    resp_message,
1622                                                    tx,
1623                                                    // &timer,
1624                                                    rt,
1625                                                );
1626                                            }
1627                                        }
1628                                    }
1629                                }
1630
1631                                return true;
1632                            }
1633                        }
1634                    }
1635
1636                    if let Some(resp_message) = server_transaction::make_response(
1637                        message,
1638                        transaction.to_tag(),
1639                        400,
1640                        b"Bad Request",
1641                    ) {
1642                        if let Some((tx, _)) = channels.take() {
1643                            server_transaction::send_response(
1644                                Arc::clone(transaction),
1645                                resp_message,
1646                                tx,
1647                                // &timer,
1648                                rt,
1649                            );
1650                        }
1651                    }
1652
1653                    return true;
1654                }
1655            } else if req_line.method == MESSAGE {
1656                platform_log(LOG_TAG, "is MESSAGE request");
1657
1658                if let Some(content_type_header) =
1659                    header::search(req_headers, b"Content-Type", true)
1660                {
1661                    let content_type_header_field =
1662                        content_type_header.get_value().as_header_field();
1663                    if let Some(content_type) = content_type_header_field.as_content_type() {
1664                        let mut is_imdn = false;
1665                        let mut cpim_message: Option<CPIMMessage> = None;
1666
1667                        if content_type.major_type.equals_bytes(b"message", true)
1668                            && content_type.sub_type.equals_bytes(b"cpim", true)
1669                        {
1670                            match CPIMMessage::try_from(req_body) {
1671                                Ok(message) => {
1672                                    cpim_message = Some(message);
1673                                }
1674                                Err(e) => {
1675                                    platform_log(LOG_TAG, format!("CPIM decode error: {}", e));
1676                                }
1677                            }
1678                        }
1679
1680                        if let Some(cpim_message) = &cpim_message {
1681                            is_imdn = cpim_message.contains_imdn();
1682                        }
1683
1684                        let mut is_pager_mode_cpm_standalone_message = false;
1685
1686                        'check_pager_mode: for header in
1687                            HeaderSearch::new(req_headers, b"Accept-Contact", true)
1688                        {
1689                            let header_field = header.get_value().as_header_field();
1690                            for parameter in header_field.get_parameter_iterator() {
1691                                if parameter.name == b"+g.3gpp.icsi-ref" {
1692                                    if let Some(value) = parameter.value {
1693                                        if let Some(_) = value.index_of(
1694                                            b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg",
1695                                        ) {
1696                                            is_pager_mode_cpm_standalone_message = true;
1697                                            break 'check_pager_mode;
1698                                        } else if let Some(_) = value.index_of(
1699                                            b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.deferred",
1700                                        ) {
1701                                            is_pager_mode_cpm_standalone_message = true;
1702                                            break 'check_pager_mode;
1703                                        } else if is_imdn {
1704                                            if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg") {
1705                                                is_pager_mode_cpm_standalone_message = true;
1706                                                break 'check_pager_mode;
1707                                            } else if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session") {
1708                                                is_pager_mode_cpm_standalone_message = true;
1709                                                break 'check_pager_mode;
1710                                            } else if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.filetransfer") {
1711                                                is_pager_mode_cpm_standalone_message = true;
1712                                                break 'check_pager_mode;
1713                                            }
1714                                        }
1715                                    }
1716                                } else if parameter.name == b"+g.3gpp.iari-ref" {
1717                                    if let Some(value) = parameter.value {
1718                                        if let Some(_) = value.index_of(
1719                                            b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp",
1720                                        ) {
1721                                            is_pager_mode_cpm_standalone_message = true;
1722                                            break 'check_pager_mode;
1723                                        }
1724                                    }
1725                                }
1726                            }
1727                        }
1728
1729                        if is_pager_mode_cpm_standalone_message {
1730                            platform_log(LOG_TAG, "is PAGER MODE");
1731
1732                            let mut is_chatbot_message = false;
1733
1734                            'check_chatbot: for header in
1735                                HeaderSearch::new(req_headers, b"Accept-Contact", true)
1736                            {
1737                                let header_field = header.get_value().as_header_field();
1738                                for parameter in header_field.get_parameter_iterator() {
1739                                    if parameter.name == b"+g.3gpp.iari-ref" {
1740                                        if let Some(value) = parameter.value {
1741                                            if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa") {
1742                                                is_chatbot_message = true;
1743                                                break 'check_chatbot;
1744                                            }
1745                                        }
1746                                    }
1747                                }
1748                            }
1749
1750                            if is_chatbot_message {
1751                                // to-do: check chatbot parameters
1752                            }
1753
1754                            if cpim_message.is_none() {
1755                                if let Body::Multipart(multipart) = req_body.as_ref() {
1756                                    platform_log(
1757                                        LOG_TAG,
1758                                        "trying to find CPIM in multipart messages",
1759                                    );
1760                                    for part in &multipart.parts {
1761                                        if let Body::Message(message) = part.as_ref() {
1762                                            if let Some(content_type_header) = header::search(
1763                                                &message.headers,
1764                                                b"Content-Type",
1765                                                false,
1766                                            ) {
1767                                                let content_type_header_field = content_type_header
1768                                                    .get_value()
1769                                                    .as_header_field();
1770                                                if let Some(content_type) =
1771                                                    content_type_header_field.as_content_type()
1772                                                {
1773                                                    if content_type
1774                                                        .major_type
1775                                                        .equals_bytes(b"message", true)
1776                                                        && content_type
1777                                                            .sub_type
1778                                                            .equals_bytes(b"cpim", true)
1779                                                    {
1780                                                        match CPIMMessage::try_from(req_body) {
1781                                                            Ok(cpim) => {
1782                                                                cpim_message = Some(cpim);
1783                                                                break;
1784                                                            }
1785                                                            Err(e) => {
1786                                                                platform_log(
1787                                                                    LOG_TAG,
1788                                                                    format!(
1789                                                                        "CPIM decode error: {}",
1790                                                                        e
1791                                                                    ),
1792                                                                );
1793                                                            }
1794                                                        }
1795                                                    }
1796                                                }
1797                                            }
1798                                        }
1799                                    }
1800                                }
1801                            }
1802
1803                            if let Some(cpim_message) = cpim_message {
1804                                if let Ok(cpim_info) = cpim_message.get_info() {
1805                                    if !is_imdn && is_chatbot_message {
1806                                        if let None = cpim_info.get_bot_related_info() {
1807                                            if let Some(resp_message) =
1808                                                server_transaction::make_response(
1809                                                    message,
1810                                                    transaction.to_tag(),
1811                                                    606,
1812                                                    b"Not Acceptable",
1813                                                )
1814                                            {
1815                                                if let Some((tx, _)) = channels.take() {
1816                                                    server_transaction::send_response(
1817                                                        Arc::clone(transaction),
1818                                                        resp_message,
1819                                                        tx,
1820                                                        // &timer,
1821                                                        rt,
1822                                                    );
1823                                                }
1824                                            }
1825
1826                                            return true;
1827                                        }
1828                                    }
1829                                }
1830
1831                                let mut contact_uri: Option<Vec<u8>> = None;
1832
1833                                if let Some(p_asserted_identity_header) =
1834                                    header::search(req_headers, b"P-Asserted-Identity", true)
1835                                {
1836                                    if let Some(asserted_identity) = p_asserted_identity_header
1837                                        .get_value()
1838                                        .as_name_addresses()
1839                                        .first()
1840                                    {
1841                                        if let Some(uri_part) = &asserted_identity.uri_part {
1842                                            let data_size = uri_part.estimated_size();
1843                                            let mut data = Vec::with_capacity(data_size);
1844                                            {
1845                                                let mut readers = Vec::new();
1846                                                uri_part.get_readers(&mut readers);
1847                                                match DynamicChain::new(readers)
1848                                                    .read_to_end(&mut data)
1849                                                {
1850                                                    Ok(_) => {}
1851                                                    Err(_) => {} // to-do: early failure
1852                                                }
1853                                            }
1854
1855                                            contact_uri.replace(data);
1856                                        }
1857                                    }
1858                                }
1859
1860                                if contact_uri.is_none() {
1861                                    if let Ok(message_info) = cpim_message.get_info() {
1862                                        if let Some(uri) = message_info.from_uri {
1863                                            let from_uri = uri
1864                                                .string_representation_without_query_and_fragment();
1865
1866                                            if let Some(from_uri) =
1867                                                from_uri.as_name_addresses().first()
1868                                            {
1869                                                if let Some(uri_part) = &from_uri.uri_part {
1870                                                    let data_size = uri_part.estimated_size();
1871                                                    let mut data = Vec::with_capacity(data_size);
1872                                                    {
1873                                                        let mut readers = Vec::new();
1874                                                        uri_part.get_readers(&mut readers);
1875                                                        match DynamicChain::new(readers)
1876                                                            .read_to_end(&mut data)
1877                                                        {
1878                                                            Ok(_) => {}
1879                                                            Err(_) => {} // to-do: early failure
1880                                                        }
1881                                                    }
1882
1883                                                    contact_uri.replace(data);
1884                                                }
1885                                            }
1886                                        }
1887                                    }
1888                                }
1889
1890                                if let Some(contact_uri) = contact_uri {
1891                                    if let Ok(message_info) = cpim_message.get_info() {
1892                                        if let Some((content_type, body, encoded)) =
1893                                            cpim_message.get_message_body()
1894                                        {
1895                                            let content_type: &[u8] = match content_type {
1896                                                Some(content_type) => content_type,
1897                                                None => match message_info.payload_type {
1898                                                    Some(payload_type) => payload_type,
1899                                                    None => b"text/plain",
1900                                                },
1901                                            };
1902
1903                                            if encoded {
1904                                                if let Ok(decoded_body) =
1905                                                    general_purpose::STANDARD.decode(body)
1906                                                {
1907                                                    (self.service.message_receive_listener)(
1908                                                        &contact_uri,
1909                                                        &message_info,
1910                                                        content_type,
1911                                                        &decoded_body,
1912                                                    )
1913                                                }
1914                                            } else {
1915                                                (self.service.message_receive_listener)(
1916                                                    &contact_uri,
1917                                                    &message_info,
1918                                                    content_type,
1919                                                    body,
1920                                                )
1921                                            }
1922                                        }
1923
1924                                        if let Some(resp_message) =
1925                                            server_transaction::make_response(
1926                                                message,
1927                                                transaction.to_tag(),
1928                                                200,
1929                                                b"Ok",
1930                                            )
1931                                        {
1932                                            if let Some((tx, _)) = channels.take() {
1933                                                server_transaction::send_response(
1934                                                    Arc::clone(transaction),
1935                                                    resp_message,
1936                                                    tx,
1937                                                    // &timer,
1938                                                    rt,
1939                                                );
1940                                            }
1941                                        }
1942
1943                                        return true;
1944                                    } else {
1945                                        platform_log(LOG_TAG, "incomplete CPIM information");
1946                                    }
1947                                } else {
1948                                    platform_log(LOG_TAG, "cannot retrieve contact information")
1949                                }
1950                            } else {
1951                                platform_log(LOG_TAG, "cannot decode CPIM message");
1952                            }
1953                        }
1954
1955                        return false;
1956                    }
1957                }
1958
1959                if let Some(resp_message) = server_transaction::make_response(
1960                    message,
1961                    transaction.to_tag(),
1962                    400,
1963                    b"Bad Request",
1964                ) {
1965                    if let Some((tx, _)) = channels.take() {
1966                        server_transaction::send_response(
1967                            Arc::clone(transaction),
1968                            resp_message,
1969                            tx,
1970                            // &timer,
1971                            rt,
1972                        );
1973                    }
1974                }
1975
1976                return true;
1977            }
1978        }
1979
1980        false
1981    }
1982}
1983
1984pub fn send_message<F>(
1985    message_type: &str,
1986    message_content: &str,
1987    recipient: &str,
1988    recipient_type: &RecipientType,
1989    recipient_uri: &str,
1990    message_result_callback: F,
1991    core: Arc<SipCore>,
1992    transport: &Arc<SipTransport>,
1993    public_user_identity: &str,
1994    rt: &Arc<Runtime>,
1995) where
1996    F: FnOnce(u16, String) + Send + Sync + 'static,
1997{
1998    let mut req_message = SipMessage::new_request(MESSAGE, recipient_uri.as_bytes());
1999
2000    req_message.add_header(Header::new(
2001        b"Call-ID",
2002        String::from(
2003            Uuid::new_v4()
2004                .as_hyphenated()
2005                .encode_lower(&mut Uuid::encode_buffer()),
2006        ),
2007    ));
2008
2009    req_message.add_header(Header::new(b"CSeq", b"1 MESSAGE"));
2010
2011    let tag = rand::create_raw_alpha_numeric_string(8);
2012    let tag = String::from_utf8_lossy(&tag);
2013
2014    req_message.add_header(Header::new(
2015        b"From",
2016        format!("<{}>;tag={}", public_user_identity, tag),
2017    ));
2018
2019    req_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
2020
2021    req_message.add_header(Header::new(
2022        b"Accept-Contact",
2023        b"*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg\"",
2024    ));
2025
2026    if message_type.eq_ignore_ascii_case("application/vnd.gsma.rcs-ft-http+xml") {
2027        req_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp\";require;explicit"));
2028    }
2029
2030    if let RecipientType::Chatbot = recipient_type {
2031        req_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
2032    }
2033
2034    req_message.add_header(Header::new(
2035        b"Contribution-ID",
2036        String::from(
2037            Uuid::new_v4()
2038                .as_hyphenated()
2039                .encode_lower(&mut Uuid::encode_buffer()),
2040        ),
2041    ));
2042
2043    req_message.add_header(Header::new(
2044        b"Conversation-ID",
2045        String::from(
2046            Uuid::new_v4()
2047                .as_hyphenated()
2048                .encode_lower(&mut Uuid::encode_buffer()),
2049        ),
2050    ));
2051
2052    if let RecipientType::ResourceList = recipient_type {
2053        req_message.add_header(Header::new(b"Require", b"recipient-list-message"));
2054
2055        req_message.add_header(Header::new(
2056            b"P-Preferred-Service",
2057            b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.msg.group",
2058        ));
2059    } else {
2060        req_message.add_header(Header::new(
2061            b"P-Preferred-Service",
2062            b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.msg",
2063        ));
2064    }
2065
2066    req_message.add_header(Header::new(
2067        b"P-Preferred-Identity",
2068        String::from(public_user_identity),
2069    ));
2070
2071    let cpim_body = make_cpim_message_content_body(
2072        message_type,
2073        message_content,
2074        &recipient_type,
2075        recipient_uri,
2076        Uuid::new_v4(),
2077        public_user_identity,
2078    );
2079
2080    if let RecipientType::ResourceList = recipient_type {
2081        let boundary = create_raw_alpha_numeric_string(16);
2082        let boundary_ = String::from_utf8_lossy(&boundary);
2083        let mut parts = Vec::with_capacity(2);
2084
2085        let resource_list_body: Vec<u8> = recipient.as_bytes().to_vec();
2086
2087        let mut resource_list_body_headers = Vec::with_capacity(2);
2088
2089        let resource_list_length = resource_list_body.len();
2090
2091        resource_list_body_headers.push(Header::new(
2092            b"Content-Type",
2093            b"application/resource-lists+xml",
2094        ));
2095        resource_list_body_headers.push(Header::new(b"Content-Disposition", b"recipient-list"));
2096        resource_list_body_headers.push(Header::new(
2097            b"Content-Length",
2098            format!("{}", resource_list_length),
2099        ));
2100
2101        let resource_list_body_part = Body::Message(MessageBody {
2102            headers: resource_list_body_headers,
2103            body: Arc::new(Body::Raw(resource_list_body)),
2104        });
2105
2106        let mut cpim_body_headers = Vec::with_capacity(2);
2107
2108        let cpim_body_len = cpim_body.estimated_size();
2109
2110        cpim_body_headers.push(Header::new(b"Content-Type", "message/cpim"));
2111        cpim_body_headers.push(Header::new(b"Content-Length", format!("{}", cpim_body_len)));
2112
2113        let cpim_body_part = Body::Message(MessageBody {
2114            headers: cpim_body_headers,
2115            body: Arc::new(cpim_body),
2116        });
2117
2118        parts.push(Arc::new(resource_list_body_part));
2119        parts.push(Arc::new(cpim_body_part));
2120
2121        req_message.add_header(Header::new(
2122            b"Content-Type",
2123            format!("multipart/mixed; boundary={}", boundary_),
2124        ));
2125
2126        let sip_body = Body::Multipart(MultipartBody { boundary, parts });
2127
2128        let sip_body_len = sip_body.estimated_size();
2129
2130        req_message.set_body(Arc::new(sip_body));
2131
2132        req_message.add_header(Header::new(b"Content-Length", format!("{}", sip_body_len)));
2133    } else {
2134        let sip_body_len = cpim_body.estimated_size();
2135
2136        req_message.set_body(Arc::new(cpim_body));
2137
2138        req_message.add_header(Header::new(b"Content-Type", b"message/cpim"));
2139
2140        req_message.add_header(Header::new(b"Content-Length", format!("{}", sip_body_len)));
2141    };
2142
2143    let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel(1);
2144
2145    rt.spawn(async move {
2146        if let Some((status_code, reason_phrase)) = client_transaction_rx.recv().await {
2147            message_result_callback(status_code, reason_phrase);
2148        }
2149    });
2150
2151    core.get_transaction_manager().send_request(
2152        // ctrl_itf,
2153        req_message,
2154        transport,
2155        OutgoingPagerModeContext {
2156            client_transaction_tx,
2157            rt: Arc::clone(&rt),
2158        },
2159        &rt,
2160    );
2161}
2162
2163struct StandaloneLargeModeSendSessionDialogEventReceiver {
2164    sip_session: Arc<SipSession<StandaloneLargeModeSendSession>>,
2165    rt: Arc<Runtime>,
2166}
2167
2168impl SipDialogEventCallbacks for StandaloneLargeModeSendSessionDialogEventReceiver {
2169    fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {}
2170
2171    fn on_new_request(
2172        &self,
2173        transaction: Arc<ServerTransaction>,
2174        tx: mpsc::Sender<ServerTransactionEvent>,
2175        // timer: &Timer,
2176        rt: &Arc<Runtime>,
2177    ) -> Option<(u16, bool)> {
2178        let message = transaction.message();
2179
2180        if let SipMessage::Request(l, req_headers, req_body) = message {
2181            if l.method == UPDATE {
2182                if let Some(headers) = req_headers {
2183                    if let Some(h) = search(headers, b"Supported", true) {
2184                        if h.supports(b"timer") {
2185                            if let Ok(Some((timeout, refresher))) =
2186                                choose_timeout_for_server_transaction_response(
2187                                    &transaction,
2188                                    true,
2189                                    Refresher::UAC,
2190                                )
2191                            {
2192                                // to-do: not always UAC ?
2193                                if let Some(mut resp_message) = server_transaction::make_response(
2194                                    message,
2195                                    transaction.to_tag(),
2196                                    200,
2197                                    b"Ok",
2198                                ) {
2199                                    match refresher {
2200                                        Refresher::UAC => {
2201                                            resp_message.add_header(Header::new(
2202                                                b"Session-Expires",
2203                                                format!("{};refresher=uac", timeout),
2204                                            ));
2205
2206                                            self.sip_session.schedule_refresh(timeout, false, rt);
2207                                        }
2208                                        Refresher::UAS => {
2209                                            resp_message.add_header(Header::new(
2210                                                b"Session-Expires",
2211                                                format!("{};refresher=uas", timeout),
2212                                            ));
2213
2214                                            self.sip_session.schedule_refresh(timeout, true, rt);
2215                                        }
2216                                    }
2217
2218                                    server_transaction::send_response(
2219                                        transaction,
2220                                        resp_message,
2221                                        tx,
2222                                        rt,
2223                                    );
2224                                }
2225
2226                                return Some((200, false));
2227                            }
2228                        }
2229                    }
2230
2231                    if let Some(resp_message) =
2232                        server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
2233                    {
2234                        server_transaction::send_response(transaction, resp_message, tx, rt);
2235                    }
2236                }
2237
2238                return Some((200, false));
2239            }
2240        }
2241
2242        None
2243    }
2244
2245    fn on_terminating_request(&self, _message: &SipMessage) {
2246        self.sip_session.get_inner().stop(&self.rt);
2247    }
2248
2249    fn on_terminating_response(&self, _message: &SipMessage) {
2250        self.sip_session.get_inner().stop(&self.rt);
2251    }
2252}
2253
2254struct StandaloneLargeModeReceiveSessionDialogEventReceiver {
2255    sip_session: Arc<SipSession<StandaloneLargeModeReceiveSession>>,
2256    message_receive_listener: Arc<dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,
2257    connect_function: Arc<
2258        dyn Fn(
2259                ClientSocket,
2260                &String,
2261                u16,
2262                bool,
2263            )
2264                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
2265            + Send
2266            + Sync,
2267    >,
2268    rt: Arc<Runtime>,
2269}
2270
2271impl SipDialogEventCallbacks for StandaloneLargeModeReceiveSessionDialogEventReceiver {
2272    fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
2273        self.sip_session.get_inner().start(
2274            &self.message_receive_listener,
2275            &self.connect_function,
2276            &self.rt,
2277        );
2278    }
2279
2280    fn on_new_request(
2281        &self,
2282        transaction: Arc<ServerTransaction>,
2283        tx: mpsc::Sender<ServerTransactionEvent>,
2284        // timer: &Timer,
2285        rt: &Arc<Runtime>,
2286    ) -> Option<(u16, bool)> {
2287        let message = transaction.message();
2288
2289        if let SipMessage::Request(l, req_headers, req_body) = message {
2290            if l.method == UPDATE {
2291                if let Some(headers) = req_headers {
2292                    if let Some(h) = search(headers, b"Supported", true) {
2293                        if h.supports(b"timer") {
2294                            if let Ok(Some((timeout, refresher))) =
2295                                choose_timeout_for_server_transaction_response(
2296                                    &transaction,
2297                                    true,
2298                                    Refresher::UAC,
2299                                )
2300                            {
2301                                // to-do: not always UAC ?
2302                                if let Some(mut resp_message) = server_transaction::make_response(
2303                                    message,
2304                                    transaction.to_tag(),
2305                                    200,
2306                                    b"Ok",
2307                                ) {
2308                                    match refresher {
2309                                        Refresher::UAC => {
2310                                            resp_message.add_header(Header::new(
2311                                                b"Session-Expires",
2312                                                format!("{};refresher=uac", timeout),
2313                                            ));
2314
2315                                            self.sip_session.schedule_refresh(timeout, false, rt);
2316                                        }
2317                                        Refresher::UAS => {
2318                                            resp_message.add_header(Header::new(
2319                                                b"Session-Expires",
2320                                                format!("{};refresher=uas", timeout),
2321                                            ));
2322
2323                                            self.sip_session.schedule_refresh(timeout, true, rt);
2324                                        }
2325                                    }
2326
2327                                    server_transaction::send_response(
2328                                        transaction,
2329                                        resp_message,
2330                                        tx,
2331                                        rt,
2332                                    );
2333                                }
2334
2335                                return Some((200, false));
2336                            }
2337                        }
2338                    }
2339
2340                    if let Some(resp_message) =
2341                        server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
2342                    {
2343                        server_transaction::send_response(transaction, resp_message, tx, rt);
2344                    }
2345                }
2346
2347                return Some((200, false));
2348            }
2349        }
2350
2351        None
2352    }
2353
2354    fn on_terminating_request(&self, message: &SipMessage) {
2355        self.sip_session.get_inner().stop(&self.rt);
2356    }
2357
2358    fn on_terminating_response(&self, message: &SipMessage) {
2359        self.sip_session.get_inner().stop(&self.rt);
2360    }
2361}
2362
2363struct OutgoingPagerModeContext {
2364    client_transaction_tx: mpsc::Sender<(u16, String)>,
2365    rt: Arc<Runtime>,
2366}
2367
2368impl ClientTransactionCallbacks for OutgoingPagerModeContext {
2369    fn on_provisional_response(&self, _message: SipMessage) {}
2370
2371    fn on_final_response(&self, message: SipMessage) {
2372        if let SipMessage::Response(l, _, _) = message {
2373            let status_code = l.status_code;
2374            let reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
2375            let tx = self.client_transaction_tx.clone();
2376            self.rt.spawn(async move {
2377                match tx.send((status_code, reason_phrase)).await {
2378                    Ok(()) => {}
2379                    Err(e) => {}
2380                }
2381            });
2382        }
2383    }
2384
2385    fn on_transport_error(&self) {
2386        let tx = self.client_transaction_tx.clone();
2387        self.rt.spawn(async move {
2388            match tx.send((0, String::from("Transport Error"))).await {
2389                Ok(()) => {}
2390                Err(e) => {}
2391            }
2392        });
2393    }
2394}