1extern crate base64;
16
17use std::io::Read;
18use std::pin::Pin;
19use std::sync::{Arc, Mutex};
20
21use base64::{engine::general_purpose, Engine as _};
22use futures::Future;
23use rust_rcs_core::ffi::log::platform_log;
24use rust_rcs_core::internet::body::multipart_body::MultipartBody;
25use rust_rcs_core::internet::body::VectorReader;
26use rust_rcs_core::io::network::stream::{ClientSocket, ClientStream};
27use rust_rcs_core::msrp::info::msrp_info_reader::AsMsrpInfo;
28use rust_rcs_core::msrp::info::{MsrpDirection, MsrpInfo, MsrpInterfaceType, MsrpSetupMethod};
29use rust_rcs_core::msrp::msrp_channel::MsrpChannel;
30use rust_rcs_core::msrp::msrp_demuxer::MsrpDemuxer;
31use rust_rcs_core::msrp::msrp_muxer::{MsrpDataWriter, MsrpMessageReceiver, MsrpMuxer};
32use rust_rcs_core::msrp::msrp_transport::msrp_transport_start;
33use rust_rcs_core::sip::sip_core::SipDialogCache;
34use rust_rcs_core::sip::sip_session::{
35 choose_timeout_for_server_transaction_response, Refresher, SipSession, SipSessionEvent,
36 SipSessionEventReceiver,
37};
38use rust_rcs_core::sip::sip_transaction::client_transaction::ClientTransactionNilCallbacks;
39
40use rust_strict_sdp::{AsSDP, Sdp};
41
42use tokio::runtime::Runtime;
43use tokio::sync::mpsc;
44
45use uuid::Uuid;
46
47use rust_rcs_core::cpim::{CPIMInfo, CPIMMessage};
48
49use rust_rcs_core::internet::body::message_body::MessageBody;
50use rust_rcs_core::internet::header::{search, HeaderSearch};
51use rust_rcs_core::internet::header_field::AsHeaderField;
52use rust_rcs_core::internet::Body;
53use rust_rcs_core::internet::{header, Header};
54
55use rust_rcs_core::internet::headers::{AsContentType, Supported};
56
57use rust_rcs_core::internet::name_addr::AsNameAddr;
58
59use rust_rcs_core::io::{DynamicChain, Serializable};
60use rust_rcs_core::sip::sip_transaction::server_transaction;
61use rust_rcs_core::sip::SipDialog;
62use rust_rcs_core::sip::{ClientTransactionCallbacks, SipDialogEventCallbacks};
63use rust_rcs_core::sip::{ServerTransaction, UPDATE};
64use rust_rcs_core::sip::{ServerTransactionEvent, SipTransactionManager};
65use rust_rcs_core::sip::{SipCore, SipTransport};
66
67use rust_rcs_core::sip::SipMessage;
68use rust_rcs_core::sip::TransactionHandler;
69
70use rust_rcs_core::sip::INVITE;
71use rust_rcs_core::sip::MESSAGE;
72
73use rust_rcs_core::util::rand::{self, create_raw_alpha_numeric_string};
74use rust_rcs_core::util::raw_string::{StrEq, StrFind};
75use crate::chat_bot::cpim::GetBotInfo;
78use crate::messaging::ffi::RecipientType;
79
80use super::session::{get_cpm_session_info_from_message, CPMSessionInfo, UpdateMessageCallbacks};
81use super::sip::cpm_accept_contact::CPMAcceptContact;
82use super::sip::cpm_contact::{AsCPMContact, CPMServiceType};
83use super::{make_cpim_message_content_body, CPMMessageParam};
84
85const LOG_TAG: &str = "messaging";
86
87struct StandaloneLargeModeSendSession {
88 state: Arc<
89 Mutex<(
90 Arc<Body>,
91 Arc<Body>,
92 Option<ClientSocket>,
93 bool,
94 String,
95 u16,
96 Vec<u8>,
97 String,
98 u16,
99 Vec<u8>,
100 Option<mpsc::Sender<Option<Vec<u8>>>>,
101 )>,
102 >,
103
104 message_body: Arc<Body>,
105}
106
107impl StandaloneLargeModeSendSession {
108 pub fn new(
109 l_sdp: Arc<Body>,
110 r_sdp: Arc<Body>,
111 sock: ClientSocket,
112 tls: bool,
113 laddr: String,
114 lport: u16,
115 lpath: Vec<u8>,
116 raddr: String,
117 rport: u16,
118 rpath: Vec<u8>,
119 message_body: Arc<Body>,
120 ) -> StandaloneLargeModeSendSession {
121 StandaloneLargeModeSendSession {
122 state: Arc::new(Mutex::new((
123 l_sdp,
124 r_sdp,
125 Some(sock),
126 tls,
127 laddr,
128 lport,
129 lpath,
130 raddr,
131 rport,
132 rpath,
133 None,
134 ))),
135 message_body,
136 }
137 }
138
139 pub fn start(
140 &self,
141 message_result_callback: &Arc<dyn Fn(u16, String) + Send + Sync + 'static>,
142 connect_function: &Arc<
143 dyn Fn(
144 ClientSocket,
145 &String,
146 u16,
147 bool,
148 ) -> Pin<
149 Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
150 > + Send
151 + Sync
152 + 'static,
153 >,
154 rt: &Arc<Runtime>,
155 ) {
156 let mut guard = self.state.lock().unwrap();
157 if let Some(sock) = guard.2.take() {
158 let addr = String::from(&guard.7);
159 let port = guard.8;
160 let tls = guard.3;
161
162 let lpath = guard.6.clone();
163 let rpath = guard.9.clone();
164
165 let (data_tx, data_rx) = mpsc::channel(8);
166 let data_tx_ = data_tx.clone();
167
168 let message_result_callback = Arc::clone(message_result_callback);
169 let message_result_callback_ = Arc::clone(&message_result_callback);
170
171 let connect_function = Arc::clone(&connect_function);
172
173 let message_body = Arc::clone(&self.message_body);
174
175 let rt_ = Arc::clone(rt);
176
177 rt.spawn(async move {
178 if let Ok(cs) = connect_function(sock, &addr, port, tls).await {
179 let t_id = format!("");
180
181 let from_path = lpath.clone();
182 let to_path = rpath.clone();
183
184 let demuxer = MsrpDemuxer::new(from_path, to_path);
185 let demuxer = Arc::new(demuxer);
186 let demuxer_ = Arc::clone(&demuxer);
187
188 let muxer = MsrpMuxer::new(StandaloneLargeModeNilReceiver {});
189
190 let rt = Arc::clone(&rt_);
191
192 let from_path = lpath.clone();
193 let to_path = rpath.clone();
194
195 let data_tx_ = data_tx.clone();
196
197 let mut session_channel = MsrpChannel::new(from_path, to_path, demuxer_, muxer);
198
199 msrp_transport_start(
200 cs,
201 t_id,
202 data_tx_,
203 data_rx,
204 move |message_chunk| session_channel.on_message(message_chunk),
205 move || message_result_callback(0, String::from("Transport Error")),
206 &rt_,
207 );
208
209 let content_type = b"message/cpim".to_vec();
210 let content_type = Some(content_type);
211
212 let size = message_body.estimated_size();
213
214 let mut data = Vec::with_capacity(size);
215 match message_body.reader() {
216 Ok(mut reader) => {
217 match reader.read_to_end(&mut data) {
218 Ok(_) => {}
219 Err(_) => {} }
221 }
222 Err(_) => {} }
224
225 let from_path = lpath.clone();
226 let to_path = rpath.clone();
227
228 let (chunk_tx, mut chunk_rx) = mpsc::channel(8);
229
230 demuxer.start(
231 from_path,
232 to_path,
233 content_type,
234 size,
235 VectorReader::new(data),
236 move |status_code, reason_phrase| {
237 message_result_callback_(status_code, reason_phrase)
238 },
239 chunk_tx,
240 &rt,
241 );
242
243 rt.spawn(async move {
244 while let Some(chunk) = chunk_rx.recv().await {
245 let size = chunk.estimated_size();
246 let mut data = Vec::with_capacity(size);
247
248 {
249 let mut readers = Vec::new();
250 chunk.get_readers(&mut readers);
251 match DynamicChain::new(readers).read_to_end(&mut data) {
252 Ok(_) => {}
253 Err(_) => {} }
255 }
256
257 match data_tx.send(Some(data)).await {
258 Ok(()) => {}
259 Err(e) => {}
260 }
261 }
262 });
263 }
264 });
265
266 guard.10.replace(data_tx_);
267 }
268 }
269
270 pub fn stop(&self, rt: &Arc<Runtime>) {
271 let mut guard = self.state.lock().unwrap();
272 if let Some(tx) = guard.10.take() {
273 rt.spawn(async move {
274 match tx.send(None).await {
275 Ok(()) => {}
276 Err(e) => {}
277 }
278 });
279 }
280 guard.2.take();
281 }
282}
283
284struct StandaloneLargeModeNilReceiver {}
285
286impl MsrpMessageReceiver for StandaloneLargeModeNilReceiver {
287 fn on_message(
288 &mut self,
289 message_id: &[u8],
290 content_type: &[u8],
291 ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
292 Err((415, "Unsupported Media Type"))
293 }
294}
295
296struct StandaloneLargeModeReceiveSession {
297 state: Arc<
298 Mutex<(
299 Arc<Body>,
300 Arc<Body>,
301 Option<ClientSocket>,
302 bool,
303 String,
304 u16,
305 Vec<u8>,
306 String,
307 u16,
308 Vec<u8>,
309 Option<mpsc::Sender<Option<Vec<u8>>>>,
310 )>,
311 >,
312
313 contact_uri: Arc<Vec<u8>>,
314
315 sip_cpim_body: Option<Arc<Body>>,
316}
317
318impl StandaloneLargeModeReceiveSession {
319 pub fn new(
320 l_sdp: Arc<Body>,
321 r_sdp: Arc<Body>,
322 cs: ClientSocket,
323 tls: bool,
324 laddr: String,
325 lport: u16,
326 lpath: Vec<u8>,
327 raddr: String,
328 rport: u16,
329 rpath: Vec<u8>,
330 contact_uri: Arc<Vec<u8>>,
331 sip_cpim_body: Option<Arc<Body>>,
332 ) -> StandaloneLargeModeReceiveSession {
333 StandaloneLargeModeReceiveSession {
334 state: Arc::new(Mutex::new((
335 l_sdp,
336 r_sdp,
337 Some(cs),
338 tls,
339 laddr,
340 lport,
341 lpath,
342 raddr,
343 rport,
344 rpath,
345 None,
346 ))),
347 contact_uri,
348 sip_cpim_body,
349 }
350 }
351
352 pub fn start(
353 &self,
354 message_receive_listener: &Arc<
355 dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
356 >,
357 connect_function: &Arc<
358 dyn Fn(
359 ClientSocket,
360 &String,
361 u16,
362 bool,
363 ) -> Pin<
364 Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>,
365 > + Send
366 + Sync
367 + 'static,
368 >,
369 rt: &Arc<Runtime>,
370 ) {
371 let mut guard = self.state.lock().unwrap();
372 if let Some(sock) = guard.2.take() {
373 let addr = String::from(&guard.7);
374 let port = guard.8;
375 let tls = guard.3;
376
377 let lpath = guard.6.clone();
378 let rpath = guard.9.clone();
379
380 let sip_cpim_body = if let Some(sip_cpim_body) = &self.sip_cpim_body {
381 Some(Arc::clone(sip_cpim_body))
382 } else {
383 None
384 };
385
386 let (data_tx, data_rx) = mpsc::channel(8);
387 let data_tx_ = data_tx.clone();
388
389 let message_receive_listener = Arc::clone(message_receive_listener);
390
391 let connect_function = Arc::clone(&connect_function);
392
393 let contact_uri = Arc::clone(&self.contact_uri);
394
395 let rt_ = Arc::clone(rt);
396
397 rt.spawn(async move {
398 if let Ok(cs) = connect_function(sock, &addr, port, tls).await {
399 let t_id = format!("");
400
401 let from_path = lpath.clone();
402 let to_path = rpath.clone();
403
404 let demuxer = MsrpDemuxer::new(from_path, to_path);
405 let demuxer = Arc::new(demuxer);
406
407 let muxer = MsrpMuxer::new(StandaloneLargeModeReceiver {
408 message_receive_listener: Some(Box::new(
409 move |contact_uri, cpim_info, content_type, content_body| {
410 message_receive_listener(
411 contact_uri,
412 cpim_info,
413 content_type,
414 content_body,
415 )
416 },
417 )),
418 contact_uri,
419 sip_cpim_body,
420 });
421
422 let mut session_channel = MsrpChannel::new(lpath, rpath, demuxer, muxer);
423
424 msrp_transport_start(
425 cs,
426 t_id,
427 data_tx_,
428 data_rx,
429 move |message_chunk| session_channel.on_message(message_chunk),
430 || {},
431 &rt_,
432 );
433 }
434 });
435
436 guard.10.replace(data_tx);
437 }
438 }
439
440 pub fn stop(&self, rt: &Arc<Runtime>) {
441 let mut guard = self.state.lock().unwrap();
442 if let Some(tx) = guard.10.take() {
443 rt.spawn(async move {
444 match tx.send(None).await {
445 Ok(()) => {}
446 Err(e) => {}
447 }
448 });
449 }
450 guard.2.take();
451 }
452}
453
454struct StandaloneLargeModeMessageDataWriter {
455 data: Vec<u8>,
456 contact_uri: Arc<Vec<u8>>,
457 sip_cpim_body: Option<Arc<Body>>,
458 message_receive_listener: Option<Box<dyn FnOnce(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>>,
459}
460
461impl MsrpDataWriter for StandaloneLargeModeMessageDataWriter {
462 fn write(&mut self, data: &[u8]) -> Result<usize, (u16, &'static str)> {
463 self.data.extend_from_slice(data);
464 Ok(data.len())
465 }
466
467 fn complete(&mut self) -> Result<(), (u16, &'static str)> {
468 if let Some(message_receive_listener) = self.message_receive_listener.take() {
469 match Body::construct_message(&self.data) {
470 Ok(body) => match CPIMMessage::try_from(&body) {
471 Ok(cpim_message) => {
472 if let Some(sip_cpim_body) = &self.sip_cpim_body {
473 match CPIMMessage::try_from(sip_cpim_body) {
474 Ok(sip_cpim_message) => match sip_cpim_message.get_info() {
475 Ok(sip_cpim_info) => {
476 if let Some((content_type, content_body, base64_encoded)) =
477 cpim_message.get_message_body()
478 {
479 let content_type: &[u8] = match content_type {
480 Some(content_type) => content_type,
481 None => b"text/plain",
482 };
483 if base64_encoded {
484 if let Ok(decoded_content_body) =
485 general_purpose::STANDARD.decode(content_body)
486 {
487 message_receive_listener(
488 &self.contact_uri,
489 &sip_cpim_info,
490 content_type,
491 &decoded_content_body,
492 );
493 return Ok(());
494 }
495 } else {
496 message_receive_listener(
497 &self.contact_uri,
498 &sip_cpim_info,
499 content_type,
500 content_body,
501 );
502 return Ok(());
503 }
504 }
505
506 Err((400, "failed to get content body"))
507 }
508
509 Err(e) => Err((400, e)),
510 },
511
512 Err(e) => Err((400, e)),
513 }
514 } else {
515 match cpim_message.get_info() {
516 Ok(cpim_info) => {
517 if let Some((content_type, b, e)) =
518 cpim_message.get_message_body()
519 {
520 let content_type: &[u8] = match content_type {
521 Some(content_type) => content_type,
522 None => b"text/plain",
523 };
524
525 if e {
526 if let Ok(d) = general_purpose::STANDARD.decode(b) {
527 message_receive_listener(
528 &self.contact_uri,
529 &cpim_info,
530 content_type,
531 &d,
532 );
533 return Ok(());
534 }
535 } else {
536 message_receive_listener(
537 &self.contact_uri,
538 &cpim_info,
539 content_type,
540 b,
541 );
542 return Ok(());
543 }
544 }
545
546 Err((400, "failed to get content body"))
547 }
548
549 Err(e) => Err((400, e)),
550 }
551 }
552 }
553 Err(e) => Err((400, e)),
554 },
555 Err(e) => Err((400, e)),
556 }
557 } else {
558 Err((500, "message already received"))
559 }
560 }
561}
562
563struct StandaloneLargeModeReceiver {
564 contact_uri: Arc<Vec<u8>>,
565 sip_cpim_body: Option<Arc<Body>>,
566 message_receive_listener: Option<Box<dyn FnOnce(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>>,
567}
568
569impl MsrpMessageReceiver for StandaloneLargeModeReceiver {
570 fn on_message(
571 &mut self,
572 message_id: &[u8],
573 content_type: &[u8],
574 ) -> Result<Box<dyn MsrpDataWriter + Send + Sync>, (u16, &'static str)> {
575 if let Some(message_receive_listener) = self.message_receive_listener.take() {
576 return Ok(Box::new(StandaloneLargeModeMessageDataWriter {
577 data: Vec::with_capacity(1024),
578 contact_uri: Arc::clone(&self.contact_uri),
579 sip_cpim_body: match &self.sip_cpim_body {
580 Some(body) => Some(Arc::clone(body)),
581 None => None,
582 },
583 message_receive_listener: Some(message_receive_listener),
584 }));
585 }
586
587 Err((500, ""))
588 }
589}
590
591pub struct StandaloneMessagingService {
592 registered_public_identity: Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
593 message_receive_listener: Arc<dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,
594 msrp_socket_allocator_function: Arc<
595 dyn Fn(
596 Option<&MsrpInfo>,
597 )
598 -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
599 + Send
600 + Sync,
601 >,
602 msrp_socket_connect_function: Arc<
603 dyn Fn(
604 ClientSocket,
605 &String,
606 u16,
607 bool,
608 )
609 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
610 + Send
611 + Sync,
612 >,
613}
614
615impl StandaloneMessagingService {
616 pub fn new<MRL, MAF, MCF>(
617 message_receive_listener: MRL,
618 msrp_socket_allocator_function: MAF,
619 msrp_socket_connect_function: MCF,
620 ) -> StandaloneMessagingService
621 where
622 MRL: Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync + 'static,
623 MAF: Fn(
624 Option<&MsrpInfo>,
625 )
626 -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
627 + Send
628 + Sync
629 + 'static,
630 MCF: Fn(
631 ClientSocket,
632 &String,
633 u16,
634 bool,
635 )
636 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
637 + Send
638 + Sync
639 + 'static,
640 {
641 StandaloneMessagingService {
642 registered_public_identity: Arc::new(Mutex::new(None)),
643 message_receive_listener: Arc::new(message_receive_listener),
644 msrp_socket_allocator_function: Arc::new(msrp_socket_allocator_function),
645 msrp_socket_connect_function: Arc::new(msrp_socket_connect_function),
646 }
647 }
648
649 pub fn set_registered_public_identity(
650 &self,
651 registered_public_identity: String,
652 sip_instance_id: String,
653 transport: Arc<SipTransport>,
654 ) {
655 (*self.registered_public_identity.lock().unwrap()).replace((
656 transport,
657 registered_public_identity,
658 sip_instance_id,
659 ));
660 }
661
662 pub fn send_large_mode_message<MRF>(
663 &self,
664 message_type: &str,
665 message_content: &str,
666 recipient: &str,
667 recipient_type: &RecipientType,
668 recipient_uri: &str,
669 message_result_callback: MRF,
670 core: &Arc<SipCore>,
671 rt: &Arc<Runtime>,
672 ) where
673 MRF: FnOnce(u16, String) + Send + Sync + 'static,
674 {
675 match (self.msrp_socket_allocator_function)(None) {
676 Ok((sock, host, port, tls, active_setup, ipv6)) => {
677 let path_random = create_raw_alpha_numeric_string(16);
678 let path_random = std::str::from_utf8(&path_random).unwrap();
679 let path = if tls {
680 format!("msrps://{}:{}/{};tcp", &host, port, path_random)
681 } else {
682 format!("msrp://{}:{}/{};tcp", &host, port, path_random)
683 };
684
685 let path = path.into_bytes();
686
687 let msrp_info: MsrpInfo = MsrpInfo {
688 protocol: if tls { b"TCP/TLS/MSRP" } else { b"TCP/MSRP" },
689 address: host.as_bytes(),
690 interface_type: if ipv6 {
691 MsrpInterfaceType::IPv6
692 } else {
693 MsrpInterfaceType::IPv4
694 },
695 port,
696 path: &path,
697 inactive: false,
698 direction: MsrpDirection::SendOnly,
699 accept_types: b"message/cpim",
700 setup_method: if active_setup {
701 MsrpSetupMethod::Active
702 } else {
703 MsrpSetupMethod::Passive
704 },
705 accept_wrapped_types: Some(message_type.as_bytes()),
706 file_info: None,
707 };
708
709 let sdp = String::from(msrp_info);
710 let sdp_bytes = sdp.into_bytes();
711 let sdp_body = Body::Raw(sdp_bytes);
712
713 if let Some((transport, public_user_identity, instance_id)) =
714 core.get_default_public_identity()
715 {
716 let mut invite_message =
717 SipMessage::new_request(INVITE, recipient_uri.as_bytes());
718
719 invite_message.add_header(Header::new(
720 b"Call-ID",
721 String::from(
722 Uuid::new_v4()
723 .as_hyphenated()
724 .encode_lower(&mut Uuid::encode_buffer()),
725 ),
726 ));
727
728 invite_message.add_header(Header::new(b"CSeq", b"1 INVITE"));
729
730 let tag = rand::create_raw_alpha_numeric_string(8);
731 let tag = String::from_utf8_lossy(&tag);
732
733 invite_message.add_header(Header::new(
734 b"From",
735 format!("<{}>;tag={}", public_user_identity, tag),
736 ));
737
738 invite_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
739
740 invite_message.add_header(Header::new(b"Contact", if let RecipientType::Chatbot = recipient_type {
741 format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\";+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\"", &public_user_identity, &instance_id)
742 } else {
743 format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"", &public_user_identity, &instance_id)
744 }));
745
746 invite_message.add_header(Header::new(b"Accept-Contact", format!("*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"")));
747
748 if message_type.eq_ignore_ascii_case("application/vnd.gsma.rcs-ft-http+xml") {
749 invite_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp\";require;explicit"));
750 }
751
752 if let RecipientType::Chatbot = recipient_type {
755 invite_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
756 }
757
758 invite_message.add_header(Header::new(
759 b"Allow",
760 b"NOTIFY, OPTIONS, INVITE, UPDATE, CANCEL, BYE, ACK, MESSAGE",
761 ));
762
763 invite_message.add_header(Header::new(b"Supported", b"timer"));
764
765 invite_message.add_header(Header::new(b"Session-Expires", b"1800"));
766
767 invite_message.add_header(Header::new(
768 b"P-Preferred-Service",
769 b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.largemsg",
770 ));
771
772 invite_message.add_header(Header::new(
773 b"P-Preferred-Identity",
774 String::from(&public_user_identity),
775 ));
776
777 let message_imdn_id = Uuid::new_v4();
778
779 let cpim_body = make_cpim_message_content_body(
780 message_type,
781 "",
782 &recipient_type,
783 recipient_uri,
784 message_imdn_id,
785 &public_user_identity,
786 );
787
788 let boundary = create_raw_alpha_numeric_string(16);
789 let boundary_ = String::from_utf8_lossy(&boundary);
790 let mut parts =
791 Vec::with_capacity(if let RecipientType::ResourceList = recipient_type {
792 3
793 } else {
794 2
795 });
796
797 if let RecipientType::ResourceList = recipient_type {
798 let resource_list_body: Vec<u8> = recipient.as_bytes().to_vec();
799
800 let mut resource_list_headers = Vec::new();
801
802 let resource_list_length = resource_list_body.len();
803
804 resource_list_headers.push(Header::new(
805 b"Content-Type",
806 b"application/resource-lists+xml",
807 ));
808 resource_list_headers
809 .push(Header::new(b"Content-Disposition", b"recipient-list"));
810 resource_list_headers.push(Header::new(
811 b"Content-Length",
812 format!("{}", resource_list_length),
813 ));
814
815 let resource_list_body = Body::Message(MessageBody {
816 headers: resource_list_headers,
817 body: Arc::new(Body::Raw(resource_list_body)),
818 });
819
820 parts.push(Arc::new(resource_list_body));
821 }
822
823 let mut cpim_part_headers = Vec::new();
824 cpim_part_headers.push(Header::new(b"Content-Type", b"message/cpim"));
825 let cpim_content_length = cpim_body.estimated_size();
826 cpim_part_headers.push(Header::new(
827 b"Content-Length",
828 format!("{}", cpim_content_length),
829 ));
830
831 parts.push(Arc::new(Body::Message(MessageBody {
832 headers: cpim_part_headers,
833 body: Arc::new(cpim_body),
834 })));
835
836 let mut sdp_part_headers = Vec::new();
837 sdp_part_headers.push(Header::new(b"Content-Type", b"application/sdp"));
838 let sdp_content_length = sdp_body.estimated_size();
839 sdp_part_headers.push(Header::new(
840 b"Content-Length",
841 format!("{}", sdp_content_length),
842 ));
843
844 let sdp_body = Arc::new(sdp_body);
845 parts.push(Arc::new(Body::Message(MessageBody {
846 headers: sdp_part_headers,
847 body: Arc::clone(&sdp_body),
848 })));
849
850 invite_message.add_header(Header::new(
851 b"Content-Type",
852 format!("multipart/mixed; boundary={}", boundary_),
853 ));
854
855 let multipart = Body::Multipart(MultipartBody { boundary, parts });
856
857 let content_length = multipart.estimated_size();
858 invite_message.add_header(Header::new(
859 b"Content-Length",
860 format!("{}", content_length),
861 ));
862
863 let multipart = Arc::new(multipart);
864 invite_message.set_body(multipart);
865
866 let req_headers = invite_message.copy_headers();
867
868 let message_result_callback =
869 Arc::new(Mutex::new(Some(message_result_callback)));
870 let message_result_callback = move |status_code, reason_phrase| {
871 let mut guard = message_result_callback.lock().unwrap();
872 if let Some(message_result_callback) = guard.take() {
873 message_result_callback(status_code, reason_phrase);
874 }
875 };
876
877 let message = CPMMessageParam::new(
878 String::from(message_type),
879 String::from(message_content),
880 recipient,
881 recipient_type,
882 recipient_uri,
883 message_result_callback,
884 );
885
886 let (client_transaction_tx, mut client_transaction_rx) =
887 mpsc::channel::<
888 Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>,
889 >(1);
890
891 let connect_function = Arc::clone(&self.msrp_socket_connect_function);
892
893 let ongoing_dialogs = core.get_ongoing_dialogs();
894
895 let core_ = Arc::clone(core);
896 let rt_ = Arc::clone(rt);
897
898 rt.spawn(async move {
899 if let Some(res) = client_transaction_rx.recv().await {
900 match res {
901 Ok((resp_message, session_info, r_sdp_body)) => {
902 if let Some(r_sdp) = match r_sdp_body.as_ref() {
903 Body::Raw(raw) => {
904 raw.as_sdp()
905 }
906 _ => None,
907 } {
908 if let Some(r_msrp_info) = r_sdp.as_msrp_info() {
909 if let Ok(raddr) = std::str::from_utf8(r_msrp_info.address) {
910 let raddr = String::from(raddr);
911 let rport = r_msrp_info.port;
912 let rpath = r_msrp_info.path.to_vec();
913
914 let (d_tx, mut d_rx) = mpsc::channel(1);
915
916 let ongoing_dialogs_ = Arc::clone(&ongoing_dialogs);
917
918 rt_.spawn(async move {
919 if let Some(dialog) = d_rx.recv().await {
920 ongoing_dialogs_.remove_dialog(&dialog);
921 }
922 });
923
924 if let Ok(dialog) = SipDialog::try_new_as_uac(&req_headers, &resp_message, move |d| {
925 match d_tx.blocking_send(d) {
926 Ok(()) => {},
927 Err(e) => {},
928 }
929 }) {
930 let dialog = Arc::new(dialog);
931
932 let message_body = make_cpim_message_content_body(
933 &message.message_type,
934 &message.message_content,
935 &message.recipient_type,
936 &message.recipient_uri,
937 message_imdn_id,
938 &public_user_identity,
939 );
940 let message_body = Arc::new(message_body);
941
942 let session = StandaloneLargeModeSendSession::new(sdp_body, r_sdp_body, sock, tls, host, port, path, raddr, rport, rpath, message_body);
943
944 let session = Arc::new(session);
945
946 let (sess_tx, mut sess_rx) = mpsc::channel(8);
947
948 let sip_session = SipSession::new(&session, SipSessionEventReceiver {
949 tx: sess_tx,
950 rt: Arc::clone(&rt_),
951 });
952
953 let sip_session = Arc::new(sip_session);
954 let sip_session_ = Arc::clone(&sip_session);
955
956 let core = Arc::clone(&core_);
957 let rt = Arc::clone(&rt_);
958
959 rt_.spawn(async move {
960 let rt_ = Arc::clone(&rt);
961 let core_ = Arc::clone(&core);
962 while let Some(ev) = sess_rx.recv().await {
963 match ev {
964 SipSessionEvent::ShouldRefresh(dialog) => {
965 if let Ok(mut message) =
966 dialog.make_request(b"UPDATE", None)
967 {
968 message.add_header(Header::new(
969 b"Supported",
970 b"timer",
971 ));
972
973 if let Some((transport, _, _)) = core_.get_default_public_identity() {
974 core_.get_transaction_manager().send_request(
975 message,
976 &transport,
977 UpdateMessageCallbacks {
978 dialog,
980 sip_session: Arc::clone(&sip_session_),
981 rt: Arc::clone(&rt_),
982 },
983 &rt_,
984 );
985 }
986 }
987 }
988
989 SipSessionEvent::Expired(dialog) => {
990 if let Ok(message) = dialog.make_request(b"BYE", None) {
991
992 if let Some((transport, _, _)) = core_.get_default_public_identity() {
993 core_.get_transaction_manager().send_request(
994 message,
995 &transport,
996 ClientTransactionNilCallbacks {},
997 &rt_,
998 );
999 }
1000 }
1001 }
1002
1003 _ => {}
1004 }
1005 }
1006 });
1007
1008 sip_session.setup_confirmed_dialog(&dialog, StandaloneLargeModeSendSessionDialogEventReceiver {
1009 sip_session: Arc::clone(&sip_session),
1010 rt: Arc::clone(&rt_),
1011 });
1012
1013 if let Ok(ack_message) = dialog.make_request(b"ACK", Some(1)) {
1014
1015 if let Some((transport, _, _)) = core_.get_default_public_identity(){
1016
1017 core_.get_transaction_manager().send_request(
1018 ack_message,
1019 &transport,
1020 ClientTransactionNilCallbacks {},
1021 &rt_,
1022 );
1023
1024 session.start(&message.message_result_callback, &connect_function, &rt_);
1025 }
1026 }
1027 }
1028 }
1029 }
1030 }
1031 }
1032
1033 Err((error_code, error_reason)) => {
1034 (message.message_result_callback)(error_code, error_reason);
1035 return;
1036 }
1037 }
1038 }
1039
1040 (message.message_result_callback)(500, String::from("Internal Error"))
1041 });
1042
1043 core.get_transaction_manager().send_request(
1044 invite_message,
1045 &transport,
1046 StandaloneLargeModeInviteCallbacks {
1047 recipient_type: RecipientType::from(recipient_type),
1048 client_transaction_tx,
1049 rt: Arc::clone(rt),
1050 },
1051 rt,
1052 );
1053
1054 return;
1055 }
1056
1057 message_result_callback(408, String::from("Timeout"));
1058 }
1059
1060 Err((error_code, error_reason)) => {
1061 message_result_callback(error_code, String::from(error_reason))
1062 }
1063 }
1064 }
1065}
1066
1067struct StandaloneLargeModeInviteCallbacks {
1068 recipient_type: RecipientType,
1069 client_transaction_tx:
1070 mpsc::Sender<Result<(SipMessage, CPMSessionInfo, Arc<Body>), (u16, String)>>,
1071 rt: Arc<Runtime>,
1072}
1073
1074impl ClientTransactionCallbacks for StandaloneLargeModeInviteCallbacks {
1075 fn on_provisional_response(&self, _message: SipMessage) {}
1076
1077 fn on_final_response(&self, message: SipMessage) {
1078 if let SipMessage::Response(l, _, _) = &message {
1079 let mut status_code = 500;
1080 let mut reason_phrase = String::from("Internal Error");
1081 if l.status_code >= 200 && l.status_code < 300 {
1082 if let Some(session_info) = get_cpm_session_info_from_message(&message, true) {
1083 loop {
1084 let accepting_chatbot_session_is_forbidden =
1085 if let RecipientType::Chatbot = self.recipient_type {
1086 match session_info.cpm_contact.service_type {
1087 CPMServiceType::Chatbot => false,
1088 _ => true,
1089 }
1090 } else {
1091 false
1092 };
1093
1094 if accepting_chatbot_session_is_forbidden {
1095 status_code = 606;
1096 reason_phrase = String::from("Not Acceptable");
1097 break;
1098 }
1099
1100 if let Some(resp_body) = message.get_body() {
1101 if let Some(headers) = message.headers() {
1104 if let Some(header) = header::search(headers, b"Content-Type", true)
1105 {
1106 if let Some(content_type) =
1107 header.get_value().as_header_field().as_content_type()
1108 {
1109 if content_type
1110 .major_type
1111 .eq_ignore_ascii_case(b"application")
1112 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1113 {
1114 let client_transaction_tx =
1115 self.client_transaction_tx.clone();
1116 let sdp_body = Arc::clone(&resp_body);
1117 self.rt.spawn(async move {
1118 match client_transaction_tx
1119 .send(Ok((message, session_info, sdp_body)))
1120 .await
1121 {
1122 Ok(()) => {}
1123 Err(e) => {}
1124 }
1125 });
1126
1127 return;
1128 }
1129 }
1130 }
1131 }
1132 }
1133
1134 break;
1135 }
1136 }
1137 } else {
1138 status_code = l.status_code;
1139 reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
1140 }
1141
1142 let client_transaction_tx = self.client_transaction_tx.clone();
1143 self.rt.spawn(async move {
1144 match client_transaction_tx
1145 .send(Err((status_code, reason_phrase)))
1146 .await
1147 {
1148 Ok(()) => {}
1149 Err(e) => {}
1150 }
1151 });
1152 }
1153 }
1154
1155 fn on_transport_error(&self) {
1156 let client_transaction_tx = self.client_transaction_tx.clone();
1157 self.rt.spawn(async move {
1158 match client_transaction_tx
1159 .send(Err((0, String::from("Transport Error"))))
1160 .await
1161 {
1162 Ok(()) => {}
1163 Err(e) => {}
1164 }
1165 });
1166 }
1167}
1168
1169pub struct StandaloneMessagingServiceWrapper {
1170 pub service: Arc<StandaloneMessagingService>,
1171 pub tm: Arc<SipTransactionManager>,
1172}
1173
1174impl TransactionHandler for StandaloneMessagingServiceWrapper {
1175 fn handle_transaction(
1176 &self,
1177 transaction: &Arc<ServerTransaction>,
1178 ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
1179 channels: &mut Option<(
1180 mpsc::Sender<ServerTransactionEvent>,
1181 mpsc::Receiver<ServerTransactionEvent>,
1182 )>,
1183 rt: &Arc<Runtime>,
1185 ) -> bool {
1186 platform_log(LOG_TAG, "handle_transaction");
1187
1188 let message = transaction.message();
1189 if let SipMessage::Request(req_line, Some(req_headers), Some(req_body)) = message {
1190 if req_line.method == INVITE {
1191 platform_log(LOG_TAG, "is INVITE request");
1192
1193 let mut is_large_mode_cpm_standalone_message = false;
1194
1195 'check_large_mode: for header in
1196 HeaderSearch::new(req_headers, b"Accept-Contact", true)
1197 {
1198 let header_field = header.get_value().as_header_field();
1199 if header_field
1200 .contains_icsi_ref(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg")
1201 || header_field.contains_icsi_ref(
1202 b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.deferred",
1203 )
1204 {
1205 is_large_mode_cpm_standalone_message = true;
1206 break 'check_large_mode;
1207 }
1208 }
1209
1210 if is_large_mode_cpm_standalone_message {
1211 let mut conversation_id: &[u8] = &[];
1212 let mut contribution_id: &[u8] = &[];
1213
1214 for header in req_headers {
1215 if header.get_name().eq_ignore_ascii_case(b"Conversation-ID") {
1216 conversation_id = header.get_value();
1217 } else if header.get_name().eq_ignore_ascii_case(b"Contribution-ID") {
1218 contribution_id = header.get_value();
1219 }
1220 }
1221
1222 if conversation_id.len() == 0 || contribution_id.len() == 0 {
1223 if let Some(resp_message) = server_transaction::make_response(
1224 message,
1225 transaction.to_tag(),
1226 400,
1227 b"Bad Request",
1228 ) {
1229 if let Some((tx, _)) = channels.take() {
1230 server_transaction::send_response(
1231 Arc::clone(transaction),
1232 resp_message,
1233 tx,
1234 rt,
1236 );
1237 }
1238 }
1239
1240 return true;
1241 }
1242
1243 let mut sdp: Option<Sdp> = None;
1244 let mut sip_cpim_body: Option<Arc<Body>> = None;
1245
1246 if let Some(header) = search(req_headers, b"Content-Type", true) {
1247 if let Some(content_type) =
1248 header.get_value().as_header_field().as_content_type()
1249 {
1250 if content_type.major_type.eq_ignore_ascii_case(b"application")
1251 && content_type.sub_type.eq_ignore_ascii_case(b"sdp")
1252 {
1253 match req_body.as_ref() {
1254 Body::Raw(raw) => {
1255 sdp = raw.as_sdp();
1256 }
1257 _ => {}
1258 }
1259 } else if content_type.major_type.eq_ignore_ascii_case(b"multipart")
1260 && content_type.sub_type.eq_ignore_ascii_case(b"mixed")
1261 {
1262 match req_body.as_ref() {
1263 Body::Multipart(multipart) => {
1264 for part in &multipart.parts {
1265 if let Body::Message(body) = part.as_ref() {
1266 if let Some(header) =
1267 search(&body.headers, b"Content-Type", true)
1268 {
1269 if let Some(content_type) = header
1270 .get_value()
1271 .as_header_field()
1272 .as_content_type()
1273 {
1274 if content_type
1275 .major_type
1276 .eq_ignore_ascii_case(b"application")
1277 && content_type
1278 .sub_type
1279 .eq_ignore_ascii_case(b"sdp")
1280 {
1281 match body.body.as_ref() {
1282 Body::Raw(raw) => {
1283 sdp = raw.as_sdp();
1284 }
1285 _ => {}
1286 }
1287 } else if content_type
1288 .major_type
1289 .eq_ignore_ascii_case(b"message")
1290 && content_type
1291 .sub_type
1292 .eq_ignore_ascii_case(b"cpim")
1293 {
1294 sip_cpim_body =
1295 Some(Arc::clone(&body.body));
1296 }
1297 }
1298 }
1299 }
1300 }
1301 }
1302 _ => {}
1303 }
1304 }
1305 }
1306 }
1307
1308 if let Some(sdp) = sdp {
1309 let mut contact_uri: Option<Vec<u8>> = None;
1310
1311 if let Some(header) = search(req_headers, b"P-Asserted-Identity", true) {
1312 if let Some(asserted_identity) =
1313 header.get_value().as_name_addresses().first()
1314 {
1315 if let Some(uri_part) = &asserted_identity.uri_part {
1316 let data_size = uri_part.estimated_size();
1317 let mut data = Vec::with_capacity(data_size);
1318 {
1319 let mut readers = Vec::new();
1320 uri_part.get_readers(&mut readers);
1321 match DynamicChain::new(readers).read_to_end(&mut data) {
1322 Ok(_) => {}
1323 Err(_) => {} }
1325 }
1326
1327 contact_uri.replace(data);
1328 }
1329 }
1330 }
1331
1332 if contact_uri.is_none() {
1333 if let Some(sip_cpim_body) = &sip_cpim_body {
1334 match CPIMMessage::try_from(sip_cpim_body) {
1335 Ok(sip_cpim_message) => {
1336 if let Ok(sip_cpim_info) = sip_cpim_message.get_info() {
1337 if let Some(from_uri) = sip_cpim_info.from_uri {
1338 contact_uri = Some(from_uri.string_representation_without_query_and_fragment());
1339 }
1340 }
1341 }
1342 Err(e) => {
1343 platform_log(LOG_TAG, format!("CPIM decode error: {}", e));
1344 }
1345 }
1346 }
1347 }
1348
1349 if contact_uri.is_none() {
1350 if let Some(header) = search(req_headers, b"Contact", true) {
1351 if let Some(cpm_contact) =
1352 header.get_value().as_header_field().as_cpm_contact()
1353 {
1354 contact_uri = Some(cpm_contact.service_uri.clone());
1355 }
1356 }
1357 }
1358
1359 if let Some(contact_uri) = contact_uri {
1360 let contact_uri = Arc::new(contact_uri);
1361 if let Some(msrp_info) = sdp.as_msrp_info() {
1362 match (self.service.msrp_socket_allocator_function)(Some(
1363 &msrp_info,
1364 )) {
1365 Ok((sock, host, port, tls, active_setup, ipv6)) => {
1366 if let Ok(raddr) = std::str::from_utf8(msrp_info.address) {
1367 let raddr = String::from(raddr);
1368 let rport = msrp_info.port;
1369 let rpath = msrp_info.path.to_vec();
1370
1371 let path_random = create_raw_alpha_numeric_string(16);
1372 let path_random =
1373 std::str::from_utf8(&path_random).unwrap();
1374 let path = if tls {
1375 format!(
1376 "msrps://{}:{}/{};tcp",
1377 &host, port, path_random
1378 )
1379 } else {
1380 format!(
1381 "msrp://{}:{}/{};tcp",
1382 &host, port, path_random
1383 )
1384 };
1385
1386 let path = path.into_bytes();
1387
1388 let l_msrp_info: MsrpInfo = MsrpInfo {
1389 protocol: if tls {
1390 b"TCP/TLS/MSRP"
1391 } else {
1392 b"TCP/MSRP"
1393 },
1394 address: host.as_bytes(),
1395 interface_type: if ipv6 {
1396 MsrpInterfaceType::IPv6
1397 } else {
1398 MsrpInterfaceType::IPv4
1399 },
1400 port,
1401 path: &path,
1402 inactive: false,
1403 direction: MsrpDirection::ReceiveOnly,
1404 accept_types: msrp_info.accept_types,
1405 setup_method: if active_setup {
1406 MsrpSetupMethod::Active
1407 } else {
1408 MsrpSetupMethod::Passive
1409 },
1410 accept_wrapped_types: msrp_info
1411 .accept_wrapped_types,
1412 file_info: None,
1413 };
1414
1415 let l_sdp = String::from(l_msrp_info);
1416 let l_sdp = l_sdp.into_bytes();
1417 let l_sdp = Body::Raw(l_sdp);
1418 let l_sdp = Arc::new(l_sdp);
1419
1420 let r_sdp = Arc::clone(req_body);
1421
1422 if let Some(mut resp_message) =
1423 server_transaction::make_response(
1424 message,
1425 transaction.to_tag(),
1426 200,
1427 b"Ok",
1428 )
1429 {
1430 let registered_public_identity = Arc::clone(
1431 &self.service.registered_public_identity,
1432 );
1433
1434 {
1435 if let Some((
1436 _,
1437 contact_identity,
1438 instance_id,
1439 )) =
1440 &*registered_public_identity.lock().unwrap()
1441 {
1442 resp_message.add_header(Header::new(
1443 b"Contact",
1444 format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg\"", contact_identity, instance_id),
1445 ));
1446 }
1447 }
1448
1449 let (d_tx, mut d_rx) = mpsc::channel(1);
1450
1451 let ongoing_dialogs_ = Arc::clone(ongoing_dialogs);
1452
1453 rt.spawn(async move {
1454 if let Some(dialog) = d_rx.recv().await {
1455 ongoing_dialogs_.remove_dialog(&dialog);
1456 }
1457 });
1458
1459 if let Ok(dialog) = SipDialog::try_new_as_uas(
1460 message,
1461 &resp_message,
1462 move |d| match d_tx.blocking_send(d) {
1463 Ok(()) => {}
1464 Err(e) => {}
1465 },
1466 ) {
1467 let dialog = Arc::new(dialog);
1468
1469 let session =
1470 StandaloneLargeModeReceiveSession::new(
1471 l_sdp,
1472 r_sdp,
1473 sock,
1474 tls,
1475 host,
1476 port,
1477 path,
1478 raddr,
1479 rport,
1480 rpath,
1481 contact_uri,
1482 sip_cpim_body,
1483 );
1484 let session = Arc::new(session);
1485
1486 let (sess_tx, mut sess_rx) = mpsc::channel(8);
1487
1488 let tm = Arc::clone(&self.tm);
1489
1490 let sip_session = SipSession::new(
1491 &session,
1492 SipSessionEventReceiver {
1493 tx: sess_tx,
1494 rt: Arc::clone(&rt),
1495 },
1496 );
1497
1498 let sip_session = Arc::new(sip_session);
1499 let sip_session_ = Arc::clone(&sip_session);
1500
1501 let rt_ = Arc::clone(&rt);
1502
1503 rt.spawn(async move {
1504 let rt = Arc::clone(&rt_);
1505 while let Some(ev) = sess_rx.recv().await {
1506 match ev {
1507 SipSessionEvent::ShouldRefresh(dialog) => {
1508 if let Ok(mut message) =
1509 dialog.make_request(b"UPDATE", None)
1510 {
1511 message.add_header(Header::new(
1512 b"Supported",
1513 b"timer",
1514 ));
1515
1516 let guard =
1517 registered_public_identity.lock().unwrap();
1518
1519 if let Some((transport, _, _)) = &*guard {
1520 tm.send_request(
1521 message,
1522 transport,
1523 UpdateMessageCallbacks {
1524 dialog,
1526 sip_session: Arc::clone(&sip_session_),
1527 rt: Arc::clone(&rt),
1528 },
1529 &rt,
1530 );
1531 }
1532 }
1533 }
1534
1535 SipSessionEvent::Expired(dialog) => {
1536 if let Ok(message) = dialog.make_request(b"BYE", None) {
1537 let guard =
1538 registered_public_identity.lock().unwrap();
1539
1540 if let Some((transport, _, _)) = &*guard {
1541 tm.send_request(
1542 message,
1543 transport,
1544 ClientTransactionNilCallbacks {},
1545 &rt,
1546 );
1547 }
1548 }
1549 }
1550
1551 _ => {}
1552 }
1553 }
1554 });
1555
1556 sip_session.setup_confirmed_dialog(&dialog, StandaloneLargeModeReceiveSessionDialogEventReceiver {
1557 sip_session: Arc::clone(&sip_session),
1558 message_receive_listener: Arc::clone(&self.service.message_receive_listener),
1559 connect_function: Arc::clone(&self.service.msrp_socket_connect_function),
1560 rt: Arc::clone(&rt),
1561 })
1562 }
1563
1564 if let Some((tx, mut rx)) = channels.take() {
1565 rt.spawn(async move{
1566 while let Some(ev) = rx.recv().await {
1567 match ev {
1568 ServerTransactionEvent::Cancelled => {
1569 todo!()
1570 },
1571 _ => {},
1572 }
1573 }
1574 });
1575
1576 server_transaction::send_response(
1577 Arc::clone(transaction),
1578 resp_message,
1579 tx,
1580 rt,
1582 );
1583 }
1584 }
1585
1586 return true;
1587 }
1588
1589 if let Some(resp_message) =
1590 server_transaction::make_response(
1591 message,
1592 transaction.to_tag(),
1593 400,
1594 b"Bad Request",
1595 )
1596 {
1597 if let Some((tx, _)) = channels.take() {
1598 server_transaction::send_response(
1599 Arc::clone(transaction),
1600 resp_message,
1601 tx,
1602 rt,
1604 );
1605 }
1606 }
1607 }
1608
1609 Err((error_code, error_phrase)) => {
1610 if let Some(resp_message) =
1611 server_transaction::make_response(
1612 message,
1613 transaction.to_tag(),
1614 error_code,
1615 error_phrase.as_bytes(),
1616 )
1617 {
1618 if let Some((tx, _rx)) = channels.take() {
1619 server_transaction::send_response(
1620 Arc::clone(transaction),
1621 resp_message,
1622 tx,
1623 rt,
1625 );
1626 }
1627 }
1628 }
1629 }
1630
1631 return true;
1632 }
1633 }
1634 }
1635
1636 if let Some(resp_message) = server_transaction::make_response(
1637 message,
1638 transaction.to_tag(),
1639 400,
1640 b"Bad Request",
1641 ) {
1642 if let Some((tx, _)) = channels.take() {
1643 server_transaction::send_response(
1644 Arc::clone(transaction),
1645 resp_message,
1646 tx,
1647 rt,
1649 );
1650 }
1651 }
1652
1653 return true;
1654 }
1655 } else if req_line.method == MESSAGE {
1656 platform_log(LOG_TAG, "is MESSAGE request");
1657
1658 if let Some(content_type_header) =
1659 header::search(req_headers, b"Content-Type", true)
1660 {
1661 let content_type_header_field =
1662 content_type_header.get_value().as_header_field();
1663 if let Some(content_type) = content_type_header_field.as_content_type() {
1664 let mut is_imdn = false;
1665 let mut cpim_message: Option<CPIMMessage> = None;
1666
1667 if content_type.major_type.equals_bytes(b"message", true)
1668 && content_type.sub_type.equals_bytes(b"cpim", true)
1669 {
1670 match CPIMMessage::try_from(req_body) {
1671 Ok(message) => {
1672 cpim_message = Some(message);
1673 }
1674 Err(e) => {
1675 platform_log(LOG_TAG, format!("CPIM decode error: {}", e));
1676 }
1677 }
1678 }
1679
1680 if let Some(cpim_message) = &cpim_message {
1681 is_imdn = cpim_message.contains_imdn();
1682 }
1683
1684 let mut is_pager_mode_cpm_standalone_message = false;
1685
1686 'check_pager_mode: for header in
1687 HeaderSearch::new(req_headers, b"Accept-Contact", true)
1688 {
1689 let header_field = header.get_value().as_header_field();
1690 for parameter in header_field.get_parameter_iterator() {
1691 if parameter.name == b"+g.3gpp.icsi-ref" {
1692 if let Some(value) = parameter.value {
1693 if let Some(_) = value.index_of(
1694 b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg",
1695 ) {
1696 is_pager_mode_cpm_standalone_message = true;
1697 break 'check_pager_mode;
1698 } else if let Some(_) = value.index_of(
1699 b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.deferred",
1700 ) {
1701 is_pager_mode_cpm_standalone_message = true;
1702 break 'check_pager_mode;
1703 } else if is_imdn {
1704 if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.largemsg") {
1705 is_pager_mode_cpm_standalone_message = true;
1706 break 'check_pager_mode;
1707 } else if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session") {
1708 is_pager_mode_cpm_standalone_message = true;
1709 break 'check_pager_mode;
1710 } else if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.filetransfer") {
1711 is_pager_mode_cpm_standalone_message = true;
1712 break 'check_pager_mode;
1713 }
1714 }
1715 }
1716 } else if parameter.name == b"+g.3gpp.iari-ref" {
1717 if let Some(value) = parameter.value {
1718 if let Some(_) = value.index_of(
1719 b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp",
1720 ) {
1721 is_pager_mode_cpm_standalone_message = true;
1722 break 'check_pager_mode;
1723 }
1724 }
1725 }
1726 }
1727 }
1728
1729 if is_pager_mode_cpm_standalone_message {
1730 platform_log(LOG_TAG, "is PAGER MODE");
1731
1732 let mut is_chatbot_message = false;
1733
1734 'check_chatbot: for header in
1735 HeaderSearch::new(req_headers, b"Accept-Contact", true)
1736 {
1737 let header_field = header.get_value().as_header_field();
1738 for parameter in header_field.get_parameter_iterator() {
1739 if parameter.name == b"+g.3gpp.iari-ref" {
1740 if let Some(value) = parameter.value {
1741 if let Some(_) = value.index_of(b"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa") {
1742 is_chatbot_message = true;
1743 break 'check_chatbot;
1744 }
1745 }
1746 }
1747 }
1748 }
1749
1750 if is_chatbot_message {
1751 }
1753
1754 if cpim_message.is_none() {
1755 if let Body::Multipart(multipart) = req_body.as_ref() {
1756 platform_log(
1757 LOG_TAG,
1758 "trying to find CPIM in multipart messages",
1759 );
1760 for part in &multipart.parts {
1761 if let Body::Message(message) = part.as_ref() {
1762 if let Some(content_type_header) = header::search(
1763 &message.headers,
1764 b"Content-Type",
1765 false,
1766 ) {
1767 let content_type_header_field = content_type_header
1768 .get_value()
1769 .as_header_field();
1770 if let Some(content_type) =
1771 content_type_header_field.as_content_type()
1772 {
1773 if content_type
1774 .major_type
1775 .equals_bytes(b"message", true)
1776 && content_type
1777 .sub_type
1778 .equals_bytes(b"cpim", true)
1779 {
1780 match CPIMMessage::try_from(req_body) {
1781 Ok(cpim) => {
1782 cpim_message = Some(cpim);
1783 break;
1784 }
1785 Err(e) => {
1786 platform_log(
1787 LOG_TAG,
1788 format!(
1789 "CPIM decode error: {}",
1790 e
1791 ),
1792 );
1793 }
1794 }
1795 }
1796 }
1797 }
1798 }
1799 }
1800 }
1801 }
1802
1803 if let Some(cpim_message) = cpim_message {
1804 if let Ok(cpim_info) = cpim_message.get_info() {
1805 if !is_imdn && is_chatbot_message {
1806 if let None = cpim_info.get_bot_related_info() {
1807 if let Some(resp_message) =
1808 server_transaction::make_response(
1809 message,
1810 transaction.to_tag(),
1811 606,
1812 b"Not Acceptable",
1813 )
1814 {
1815 if let Some((tx, _)) = channels.take() {
1816 server_transaction::send_response(
1817 Arc::clone(transaction),
1818 resp_message,
1819 tx,
1820 rt,
1822 );
1823 }
1824 }
1825
1826 return true;
1827 }
1828 }
1829 }
1830
1831 let mut contact_uri: Option<Vec<u8>> = None;
1832
1833 if let Some(p_asserted_identity_header) =
1834 header::search(req_headers, b"P-Asserted-Identity", true)
1835 {
1836 if let Some(asserted_identity) = p_asserted_identity_header
1837 .get_value()
1838 .as_name_addresses()
1839 .first()
1840 {
1841 if let Some(uri_part) = &asserted_identity.uri_part {
1842 let data_size = uri_part.estimated_size();
1843 let mut data = Vec::with_capacity(data_size);
1844 {
1845 let mut readers = Vec::new();
1846 uri_part.get_readers(&mut readers);
1847 match DynamicChain::new(readers)
1848 .read_to_end(&mut data)
1849 {
1850 Ok(_) => {}
1851 Err(_) => {} }
1853 }
1854
1855 contact_uri.replace(data);
1856 }
1857 }
1858 }
1859
1860 if contact_uri.is_none() {
1861 if let Ok(message_info) = cpim_message.get_info() {
1862 if let Some(uri) = message_info.from_uri {
1863 let from_uri = uri
1864 .string_representation_without_query_and_fragment();
1865
1866 if let Some(from_uri) =
1867 from_uri.as_name_addresses().first()
1868 {
1869 if let Some(uri_part) = &from_uri.uri_part {
1870 let data_size = uri_part.estimated_size();
1871 let mut data = Vec::with_capacity(data_size);
1872 {
1873 let mut readers = Vec::new();
1874 uri_part.get_readers(&mut readers);
1875 match DynamicChain::new(readers)
1876 .read_to_end(&mut data)
1877 {
1878 Ok(_) => {}
1879 Err(_) => {} }
1881 }
1882
1883 contact_uri.replace(data);
1884 }
1885 }
1886 }
1887 }
1888 }
1889
1890 if let Some(contact_uri) = contact_uri {
1891 if let Ok(message_info) = cpim_message.get_info() {
1892 if let Some((content_type, body, encoded)) =
1893 cpim_message.get_message_body()
1894 {
1895 let content_type: &[u8] = match content_type {
1896 Some(content_type) => content_type,
1897 None => match message_info.payload_type {
1898 Some(payload_type) => payload_type,
1899 None => b"text/plain",
1900 },
1901 };
1902
1903 if encoded {
1904 if let Ok(decoded_body) =
1905 general_purpose::STANDARD.decode(body)
1906 {
1907 (self.service.message_receive_listener)(
1908 &contact_uri,
1909 &message_info,
1910 content_type,
1911 &decoded_body,
1912 )
1913 }
1914 } else {
1915 (self.service.message_receive_listener)(
1916 &contact_uri,
1917 &message_info,
1918 content_type,
1919 body,
1920 )
1921 }
1922 }
1923
1924 if let Some(resp_message) =
1925 server_transaction::make_response(
1926 message,
1927 transaction.to_tag(),
1928 200,
1929 b"Ok",
1930 )
1931 {
1932 if let Some((tx, _)) = channels.take() {
1933 server_transaction::send_response(
1934 Arc::clone(transaction),
1935 resp_message,
1936 tx,
1937 rt,
1939 );
1940 }
1941 }
1942
1943 return true;
1944 } else {
1945 platform_log(LOG_TAG, "incomplete CPIM information");
1946 }
1947 } else {
1948 platform_log(LOG_TAG, "cannot retrieve contact information")
1949 }
1950 } else {
1951 platform_log(LOG_TAG, "cannot decode CPIM message");
1952 }
1953 }
1954
1955 return false;
1956 }
1957 }
1958
1959 if let Some(resp_message) = server_transaction::make_response(
1960 message,
1961 transaction.to_tag(),
1962 400,
1963 b"Bad Request",
1964 ) {
1965 if let Some((tx, _)) = channels.take() {
1966 server_transaction::send_response(
1967 Arc::clone(transaction),
1968 resp_message,
1969 tx,
1970 rt,
1972 );
1973 }
1974 }
1975
1976 return true;
1977 }
1978 }
1979
1980 false
1981 }
1982}
1983
1984pub fn send_message<F>(
1985 message_type: &str,
1986 message_content: &str,
1987 recipient: &str,
1988 recipient_type: &RecipientType,
1989 recipient_uri: &str,
1990 message_result_callback: F,
1991 core: Arc<SipCore>,
1992 transport: &Arc<SipTransport>,
1993 public_user_identity: &str,
1994 rt: &Arc<Runtime>,
1995) where
1996 F: FnOnce(u16, String) + Send + Sync + 'static,
1997{
1998 let mut req_message = SipMessage::new_request(MESSAGE, recipient_uri.as_bytes());
1999
2000 req_message.add_header(Header::new(
2001 b"Call-ID",
2002 String::from(
2003 Uuid::new_v4()
2004 .as_hyphenated()
2005 .encode_lower(&mut Uuid::encode_buffer()),
2006 ),
2007 ));
2008
2009 req_message.add_header(Header::new(b"CSeq", b"1 MESSAGE"));
2010
2011 let tag = rand::create_raw_alpha_numeric_string(8);
2012 let tag = String::from_utf8_lossy(&tag);
2013
2014 req_message.add_header(Header::new(
2015 b"From",
2016 format!("<{}>;tag={}", public_user_identity, tag),
2017 ));
2018
2019 req_message.add_header(Header::new(b"To", format!("<{}>", recipient_uri)));
2020
2021 req_message.add_header(Header::new(
2022 b"Accept-Contact",
2023 b"*;+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.msg\"",
2024 ));
2025
2026 if message_type.eq_ignore_ascii_case("application/vnd.gsma.rcs-ft-http+xml") {
2027 req_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.fthttp\";require;explicit"));
2028 }
2029
2030 if let RecipientType::Chatbot = recipient_type {
2031 req_message.add_header(Header::new(b"Accept-Contact", b"*;+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot.sa\";+g.gsma.rcs.botversion=\"#=1,#=2\";require;explicit"));
2032 }
2033
2034 req_message.add_header(Header::new(
2035 b"Contribution-ID",
2036 String::from(
2037 Uuid::new_v4()
2038 .as_hyphenated()
2039 .encode_lower(&mut Uuid::encode_buffer()),
2040 ),
2041 ));
2042
2043 req_message.add_header(Header::new(
2044 b"Conversation-ID",
2045 String::from(
2046 Uuid::new_v4()
2047 .as_hyphenated()
2048 .encode_lower(&mut Uuid::encode_buffer()),
2049 ),
2050 ));
2051
2052 if let RecipientType::ResourceList = recipient_type {
2053 req_message.add_header(Header::new(b"Require", b"recipient-list-message"));
2054
2055 req_message.add_header(Header::new(
2056 b"P-Preferred-Service",
2057 b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.msg.group",
2058 ));
2059 } else {
2060 req_message.add_header(Header::new(
2061 b"P-Preferred-Service",
2062 b"urn:urn-7:3gpp-service.ims.icsi.oma.cpm.msg",
2063 ));
2064 }
2065
2066 req_message.add_header(Header::new(
2067 b"P-Preferred-Identity",
2068 String::from(public_user_identity),
2069 ));
2070
2071 let cpim_body = make_cpim_message_content_body(
2072 message_type,
2073 message_content,
2074 &recipient_type,
2075 recipient_uri,
2076 Uuid::new_v4(),
2077 public_user_identity,
2078 );
2079
2080 if let RecipientType::ResourceList = recipient_type {
2081 let boundary = create_raw_alpha_numeric_string(16);
2082 let boundary_ = String::from_utf8_lossy(&boundary);
2083 let mut parts = Vec::with_capacity(2);
2084
2085 let resource_list_body: Vec<u8> = recipient.as_bytes().to_vec();
2086
2087 let mut resource_list_body_headers = Vec::with_capacity(2);
2088
2089 let resource_list_length = resource_list_body.len();
2090
2091 resource_list_body_headers.push(Header::new(
2092 b"Content-Type",
2093 b"application/resource-lists+xml",
2094 ));
2095 resource_list_body_headers.push(Header::new(b"Content-Disposition", b"recipient-list"));
2096 resource_list_body_headers.push(Header::new(
2097 b"Content-Length",
2098 format!("{}", resource_list_length),
2099 ));
2100
2101 let resource_list_body_part = Body::Message(MessageBody {
2102 headers: resource_list_body_headers,
2103 body: Arc::new(Body::Raw(resource_list_body)),
2104 });
2105
2106 let mut cpim_body_headers = Vec::with_capacity(2);
2107
2108 let cpim_body_len = cpim_body.estimated_size();
2109
2110 cpim_body_headers.push(Header::new(b"Content-Type", "message/cpim"));
2111 cpim_body_headers.push(Header::new(b"Content-Length", format!("{}", cpim_body_len)));
2112
2113 let cpim_body_part = Body::Message(MessageBody {
2114 headers: cpim_body_headers,
2115 body: Arc::new(cpim_body),
2116 });
2117
2118 parts.push(Arc::new(resource_list_body_part));
2119 parts.push(Arc::new(cpim_body_part));
2120
2121 req_message.add_header(Header::new(
2122 b"Content-Type",
2123 format!("multipart/mixed; boundary={}", boundary_),
2124 ));
2125
2126 let sip_body = Body::Multipart(MultipartBody { boundary, parts });
2127
2128 let sip_body_len = sip_body.estimated_size();
2129
2130 req_message.set_body(Arc::new(sip_body));
2131
2132 req_message.add_header(Header::new(b"Content-Length", format!("{}", sip_body_len)));
2133 } else {
2134 let sip_body_len = cpim_body.estimated_size();
2135
2136 req_message.set_body(Arc::new(cpim_body));
2137
2138 req_message.add_header(Header::new(b"Content-Type", b"message/cpim"));
2139
2140 req_message.add_header(Header::new(b"Content-Length", format!("{}", sip_body_len)));
2141 };
2142
2143 let (client_transaction_tx, mut client_transaction_rx) = mpsc::channel(1);
2144
2145 rt.spawn(async move {
2146 if let Some((status_code, reason_phrase)) = client_transaction_rx.recv().await {
2147 message_result_callback(status_code, reason_phrase);
2148 }
2149 });
2150
2151 core.get_transaction_manager().send_request(
2152 req_message,
2154 transport,
2155 OutgoingPagerModeContext {
2156 client_transaction_tx,
2157 rt: Arc::clone(&rt),
2158 },
2159 &rt,
2160 );
2161}
2162
2163struct StandaloneLargeModeSendSessionDialogEventReceiver {
2164 sip_session: Arc<SipSession<StandaloneLargeModeSendSession>>,
2165 rt: Arc<Runtime>,
2166}
2167
2168impl SipDialogEventCallbacks for StandaloneLargeModeSendSessionDialogEventReceiver {
2169 fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {}
2170
2171 fn on_new_request(
2172 &self,
2173 transaction: Arc<ServerTransaction>,
2174 tx: mpsc::Sender<ServerTransactionEvent>,
2175 rt: &Arc<Runtime>,
2177 ) -> Option<(u16, bool)> {
2178 let message = transaction.message();
2179
2180 if let SipMessage::Request(l, req_headers, req_body) = message {
2181 if l.method == UPDATE {
2182 if let Some(headers) = req_headers {
2183 if let Some(h) = search(headers, b"Supported", true) {
2184 if h.supports(b"timer") {
2185 if let Ok(Some((timeout, refresher))) =
2186 choose_timeout_for_server_transaction_response(
2187 &transaction,
2188 true,
2189 Refresher::UAC,
2190 )
2191 {
2192 if let Some(mut resp_message) = server_transaction::make_response(
2194 message,
2195 transaction.to_tag(),
2196 200,
2197 b"Ok",
2198 ) {
2199 match refresher {
2200 Refresher::UAC => {
2201 resp_message.add_header(Header::new(
2202 b"Session-Expires",
2203 format!("{};refresher=uac", timeout),
2204 ));
2205
2206 self.sip_session.schedule_refresh(timeout, false, rt);
2207 }
2208 Refresher::UAS => {
2209 resp_message.add_header(Header::new(
2210 b"Session-Expires",
2211 format!("{};refresher=uas", timeout),
2212 ));
2213
2214 self.sip_session.schedule_refresh(timeout, true, rt);
2215 }
2216 }
2217
2218 server_transaction::send_response(
2219 transaction,
2220 resp_message,
2221 tx,
2222 rt,
2223 );
2224 }
2225
2226 return Some((200, false));
2227 }
2228 }
2229 }
2230
2231 if let Some(resp_message) =
2232 server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
2233 {
2234 server_transaction::send_response(transaction, resp_message, tx, rt);
2235 }
2236 }
2237
2238 return Some((200, false));
2239 }
2240 }
2241
2242 None
2243 }
2244
2245 fn on_terminating_request(&self, _message: &SipMessage) {
2246 self.sip_session.get_inner().stop(&self.rt);
2247 }
2248
2249 fn on_terminating_response(&self, _message: &SipMessage) {
2250 self.sip_session.get_inner().stop(&self.rt);
2251 }
2252}
2253
2254struct StandaloneLargeModeReceiveSessionDialogEventReceiver {
2255 sip_session: Arc<SipSession<StandaloneLargeModeReceiveSession>>,
2256 message_receive_listener: Arc<dyn Fn(&[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync>,
2257 connect_function: Arc<
2258 dyn Fn(
2259 ClientSocket,
2260 &String,
2261 u16,
2262 bool,
2263 )
2264 -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
2265 + Send
2266 + Sync,
2267 >,
2268 rt: Arc<Runtime>,
2269}
2270
2271impl SipDialogEventCallbacks for StandaloneLargeModeReceiveSessionDialogEventReceiver {
2272 fn on_ack(&self, _transaction: &Arc<ServerTransaction>) {
2273 self.sip_session.get_inner().start(
2274 &self.message_receive_listener,
2275 &self.connect_function,
2276 &self.rt,
2277 );
2278 }
2279
2280 fn on_new_request(
2281 &self,
2282 transaction: Arc<ServerTransaction>,
2283 tx: mpsc::Sender<ServerTransactionEvent>,
2284 rt: &Arc<Runtime>,
2286 ) -> Option<(u16, bool)> {
2287 let message = transaction.message();
2288
2289 if let SipMessage::Request(l, req_headers, req_body) = message {
2290 if l.method == UPDATE {
2291 if let Some(headers) = req_headers {
2292 if let Some(h) = search(headers, b"Supported", true) {
2293 if h.supports(b"timer") {
2294 if let Ok(Some((timeout, refresher))) =
2295 choose_timeout_for_server_transaction_response(
2296 &transaction,
2297 true,
2298 Refresher::UAC,
2299 )
2300 {
2301 if let Some(mut resp_message) = server_transaction::make_response(
2303 message,
2304 transaction.to_tag(),
2305 200,
2306 b"Ok",
2307 ) {
2308 match refresher {
2309 Refresher::UAC => {
2310 resp_message.add_header(Header::new(
2311 b"Session-Expires",
2312 format!("{};refresher=uac", timeout),
2313 ));
2314
2315 self.sip_session.schedule_refresh(timeout, false, rt);
2316 }
2317 Refresher::UAS => {
2318 resp_message.add_header(Header::new(
2319 b"Session-Expires",
2320 format!("{};refresher=uas", timeout),
2321 ));
2322
2323 self.sip_session.schedule_refresh(timeout, true, rt);
2324 }
2325 }
2326
2327 server_transaction::send_response(
2328 transaction,
2329 resp_message,
2330 tx,
2331 rt,
2332 );
2333 }
2334
2335 return Some((200, false));
2336 }
2337 }
2338 }
2339
2340 if let Some(resp_message) =
2341 server_transaction::make_response(message, transaction.to_tag(), 200, b"Ok")
2342 {
2343 server_transaction::send_response(transaction, resp_message, tx, rt);
2344 }
2345 }
2346
2347 return Some((200, false));
2348 }
2349 }
2350
2351 None
2352 }
2353
2354 fn on_terminating_request(&self, message: &SipMessage) {
2355 self.sip_session.get_inner().stop(&self.rt);
2356 }
2357
2358 fn on_terminating_response(&self, message: &SipMessage) {
2359 self.sip_session.get_inner().stop(&self.rt);
2360 }
2361}
2362
2363struct OutgoingPagerModeContext {
2364 client_transaction_tx: mpsc::Sender<(u16, String)>,
2365 rt: Arc<Runtime>,
2366}
2367
2368impl ClientTransactionCallbacks for OutgoingPagerModeContext {
2369 fn on_provisional_response(&self, _message: SipMessage) {}
2370
2371 fn on_final_response(&self, message: SipMessage) {
2372 if let SipMessage::Response(l, _, _) = message {
2373 let status_code = l.status_code;
2374 let reason_phrase = String::from_utf8_lossy(&l.reason_phrase).to_string();
2375 let tx = self.client_transaction_tx.clone();
2376 self.rt.spawn(async move {
2377 match tx.send((status_code, reason_phrase)).await {
2378 Ok(()) => {}
2379 Err(e) => {}
2380 }
2381 });
2382 }
2383 }
2384
2385 fn on_transport_error(&self) {
2386 let tx = self.client_transaction_tx.clone();
2387 self.rt.spawn(async move {
2388 match tx.send((0, String::from("Transport Error"))).await {
2389 Ok(()) => {}
2390 Err(e) => {}
2391 }
2392 });
2393 }
2394}