rust_rcs_client/messaging/cpm/
session_invitation.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15extern crate rust_rcs_core;
16extern crate rust_strict_sdp;
17
18use core::panic;
19use std::{
20    pin::Pin,
21    sync::{Arc, Mutex},
22};
23
24use futures::Future;
25use tokio::{runtime::Runtime, sync::mpsc};
26
27use rust_rcs_core::{
28    cpim::CPIMInfo,
29    internet::{Body, Header},
30    io::network::stream::{ClientSocket, ClientStream},
31    msrp::info::{
32        msrp_info_reader::AsMsrpInfo, MsrpDirection, MsrpInfo, MsrpInterfaceType, MsrpSetupMethod,
33    },
34    sip::{
35        sip_core::SipDialogCache,
36        sip_session::{
37            choose_timeout_for_server_transaction_response, Refresher, SipSession, SipSessionEvent,
38            SipSessionEventReceiver,
39        },
40        sip_transaction::{client_transaction::ClientTransactionNilCallbacks, server_transaction},
41        ServerTransaction, ServerTransactionEvent, SipDialog, SipMessage, SipTransactionManager,
42        SipTransport,
43    },
44    util::rand::create_raw_alpha_numeric_string,
45};
46
47use rust_strict_sdp::AsSDP;
48
49use crate::contact::ContactKnownIdentities;
50
51use super::{
52    session::{CPMSession, CPMSessionDialogEventReceiver, CPMSessionInfo, UpdateMessageCallbacks},
53    sip::cpm_contact::CPMServiceType,
54};
55
/// Decision taken on a pending CPM session invitation.
///
/// NOTE(review): this enum is not referenced anywhere else in this file; the variant
/// descriptions below are inferred from their names and should be confirmed against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CPMSessionInvitationResponse {
    /// Presumably: the invitation should be accepted.
    Accept,
    /// Presumably: the invitation should be discarded without being accepted.
    Dispose,
}
60
61pub struct CPMSessionInvitation {
62    session_info: CPMSessionInfo,
63    session_sdp: Arc<Body>,
64
65    server_transaction: Arc<ServerTransaction>,
66
67    tx: mpsc::Sender<ServerTransactionEvent>,
68    rx: mpsc::Receiver<ServerTransactionEvent>,
69
70    dialog: Arc<SipDialog>,
71}
72
73impl CPMSessionInvitation {
74    pub fn new(
75        session_info: CPMSessionInfo,
76        session_sdp: Arc<Body>,
77        server_transaction: Arc<ServerTransaction>,
78        tx: mpsc::Sender<ServerTransactionEvent>,
79        rx: mpsc::Receiver<ServerTransactionEvent>,
80        dialog: Arc<SipDialog>,
81    ) -> CPMSessionInvitation {
82        CPMSessionInvitation {
83            session_info,
84            session_sdp,
85
86            server_transaction,
87            tx,
88            rx,
89
90            dialog,
91        }
92    }
93}
94
95pub fn try_accept_hanging_invitation(
96    invitation: CPMSessionInvitation,
97    msrp_socket_allocator_function: &Arc<
98        dyn Fn(
99                Option<&MsrpInfo>,
100            )
101                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
102            + Send
103            + Sync,
104    >,
105    msrp_socket_connect_function: &Arc<
106        dyn Fn(
107                ClientSocket,
108                &String,
109                u16,
110                bool,
111            )
112                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
113            + Send
114            + Sync,
115    >,
116    message_receive_listener: &Arc<
117        dyn Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync,
118    >,
119    registered_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
120    ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
121    ongoing_sessions: &Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
122    tm: &Arc<SipTransactionManager>,
123    rt: &Arc<Runtime>,
124) {
125    let session_info = invitation.session_info;
126    let session_sdp = invitation.session_sdp;
127
128    let transaction = invitation.server_transaction;
129
130    let message = transaction.message();
131
132    let tx = invitation.tx;
133    let rx = invitation.rx;
134
135    let dialog = Some(invitation.dialog);
136
137    if let Body::Raw(body) = session_sdp.as_ref() {
138        if let Some(sdp) = body.as_sdp() {
139            if let Some(msrp_info) = sdp.as_msrp_info() {
140                try_accept_invitation(
141                    session_info,
142                    tx,
143                    rx,
144                    dialog,
145                    &session_sdp,
146                    msrp_info,
147                    msrp_socket_allocator_function,
148                    msrp_socket_connect_function,
149                    message_receive_listener,
150                    &transaction,
151                    message,
152                    registered_public_identity,
153                    ongoing_dialogs,
154                    ongoing_sessions,
155                    tm,
156                    rt,
157                );
158
159                return;
160            }
161        }
162    }
163
164    if let Some(resp_message) = server_transaction::make_response(
165        message,
166        transaction.to_tag(),
167        500,
168        b"Server Internal Error",
169    ) {
170        server_transaction::send_response(
171            transaction,
172            resp_message,
173            tx,
174            // &timer,
175            rt,
176        );
177    }
178}
179
180pub fn try_accept_invitation<'a, 'b>(
181    session_info: CPMSessionInfo,
182    tx: mpsc::Sender<ServerTransactionEvent>,
183    mut rx: mpsc::Receiver<ServerTransactionEvent>,
184    dialog: Option<Arc<SipDialog>>,
185    r_sdp: &'a Arc<Body>,
186    msrp_info: MsrpInfo<'a>,
187    msrp_socket_allocator_function: &Arc<
188        dyn Fn(
189                Option<&MsrpInfo>,
190            )
191                -> Result<(ClientSocket, String, u16, bool, bool, bool), (u16, &'static str)>
192            + Send
193            + Sync,
194    >,
195    msrp_socket_connect_function: &Arc<
196        dyn Fn(
197                ClientSocket,
198                &String,
199                u16,
200                bool,
201            )
202                -> Pin<Box<dyn Future<Output = Result<ClientStream, (u16, &'static str)>> + Send>>
203            + Send
204            + Sync,
205    >,
206    message_receive_listener: &Arc<
207        dyn Fn(&Arc<SipSession<CPMSession>>, &[u8], &CPIMInfo, &[u8], &[u8]) + Send + Sync,
208    >,
209    transaction: &'b Arc<ServerTransaction>,
210    message: &'b SipMessage,
211    registered_public_identity: &Arc<Mutex<Option<(Arc<SipTransport>, String, String)>>>,
212    ongoing_dialogs: &Arc<Mutex<Vec<Arc<SipDialog>>>>,
213    ongoing_sessions: &Arc<Mutex<Vec<(ContactKnownIdentities, Arc<SipSession<CPMSession>>)>>>,
214    tm: &Arc<SipTransactionManager>,
215    rt: &Arc<Runtime>,
216) {
217    match (msrp_socket_allocator_function)(Some(&msrp_info)) {
218        Ok((cs, host, port, tls, active_setup, ipv6)) => {
219            if let Ok(raddr) = std::str::from_utf8(msrp_info.address) {
220                let raddr = String::from(raddr);
221                let rport = msrp_info.port;
222                let rpath = msrp_info.path.to_vec();
223
224                let path_random = create_raw_alpha_numeric_string(16);
225                let path_random = std::str::from_utf8(&path_random).unwrap();
226                let path = if tls {
227                    format!("msrps://{}:{}/{};tcp", &host, port, path_random)
228                } else {
229                    format!("msrp://{}:{}/{};tcp", &host, port, path_random)
230                };
231
232                let path = path.into_bytes();
233
234                let l_msrp_info: MsrpInfo = MsrpInfo { protocol: if tls {
235                    b"TCP/TLS/MSRP"
236                } else {
237                    b"TCP/MSRP"
238                }, address: host.as_bytes(), interface_type: if ipv6 {
239                    MsrpInterfaceType::IPv6
240                } else {
241                    MsrpInterfaceType::IPv4
242                }, port, path: &path, inactive: false, direction: MsrpDirection::SendReceive, accept_types: match session_info.cpm_contact.service_type {
243                    CPMServiceType::OneToOne => {
244                        b"message/cpim application/im-iscomposing+xm"
245                    },
246                    CPMServiceType::Group => {
247                        b"message/cpim application/conference-info+xml"
248                    },
249                    CPMServiceType::Chatbot => {
250                        b"message/cpim"
251                    },
252                    CPMServiceType::System => {
253                        b"message/cpim"
254                    },
255                }, setup_method: if active_setup {
256                    MsrpSetupMethod::Active
257                } else {
258                    MsrpSetupMethod::Passive
259                }, accept_wrapped_types: match session_info.cpm_contact.service_type {
260                    CPMServiceType::OneToOne => {
261                        Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml")
262                    },
263                    CPMServiceType::Group => {
264                        Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml application/im-iscomposing+xml")
265                    },
266                    CPMServiceType::Chatbot => {
267                        Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml application/vnd.gsma.botmessage.v1.0+json application/vnd.gsma.botsuggestion.v1.0+json application/commontemplate+xml")
268                    },
269                    CPMServiceType::System => {
270                        Some(b"multipart/mixed text/plain application/vnd.gsma.rcs-ft-http+xml application/vnd.gsma.rcspushlocation+xml message/imdn+xml")
271                    },
272                }, file_info: None };
273
274                let l_sdp = String::from(l_msrp_info);
275                let l_sdp = l_sdp.into_bytes();
276                let l_sdp = Body::Raw(l_sdp);
277                let l_sdp = Arc::new(l_sdp);
278
279                let known_identities = session_info.get_contact_known_identities();
280
281                let contact_uri = session_info.asserted_contact_uri.as_bytes().to_vec();
282                let session = CPMSession::new(session_info);
283                let session = Arc::new(session);
284
285                let r_sdp = Arc::clone(r_sdp);
286                if let Ok(_) = session.set_remote_sdp(r_sdp, raddr, rport, rpath) {
287                    if let Ok(_) = session.set_local_sdp(l_sdp, cs, tls, host, port, path) {
288                        if let Some(mut resp_message) = server_transaction::make_response(
289                            message,
290                            transaction.to_tag(),
291                            200,
292                            b"OK",
293                        ) {
294                            let public_user_identity = {
295                                let guard = registered_public_identity.lock().unwrap();
296
297                                if let Some((transport, contact_identity, instance_id)) = &*guard {
298                                    let transport_ = transaction.transport();
299                                    if Arc::ptr_eq(transport, transport_) {
300                                        resp_message.add_header(Header::new(
301                                            b"Contact",
302                                            match session.get_session_info().cpm_contact.service_type {
303                                                CPMServiceType::Chatbot => {
304                                                    format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\";+g.3gpp.iari-ref=\"urn%3Aurn-7%3A3gpp-application.ims.iari.rcs.chatbot\";+g.gsma.rcs.botversion=\"#=1,#=2\"", contact_identity, instance_id)
305                                                },
306                                                _ => {
307                                                    format!("<{}>;+sip.instance=\"{}\";+g.3gpp.icsi-ref=\"urn%3Aurn-7%3A3gpp-service.ims.icsi.oma.cpm.session\"", contact_identity, instance_id)
308                                                },
309                                            },
310                                        ));
311
312                                        String::from(contact_identity)
313                                    } else {
314                                        panic!("")
315                                    }
316                                } else {
317                                    panic!("")
318                                }
319                            };
320
321                            let dialog = if let Some(dialog) = dialog {
322                                dialog
323                            } else {
324                                let (d_tx, mut d_rx) = tokio::sync::mpsc::channel(1);
325
326                                let ongoing_dialogs_ = Arc::clone(ongoing_dialogs);
327
328                                rt.spawn(async move {
329                                    if let Some(dialog) = d_rx.recv().await {
330                                        ongoing_dialogs_.remove_dialog(&dialog);
331                                    }
332                                });
333
334                                if let Ok(dialog) =
335                                    SipDialog::try_new_as_uas(message, &resp_message, move |d| {
336                                        match d_tx.blocking_send(d) {
337                                            Ok(()) => {}
338                                            Err(e) => {}
339                                        }
340                                    })
341                                {
342                                    let dialog = Arc::new(dialog);
343
344                                    ongoing_dialogs.add_dialog(&dialog);
345
346                                    dialog
347                                } else {
348                                    panic!("")
349                                }
350                            };
351
352                            let (sess_tx, mut sess_rx) = mpsc::channel::<SipSessionEvent>(8);
353
354                            let sip_session = SipSession::new(
355                                &session,
356                                SipSessionEventReceiver {
357                                    tx: sess_tx,
358                                    rt: Arc::clone(rt),
359                                },
360                            );
361
362                            let sip_session = Arc::new(sip_session);
363                            let sip_session_ = Arc::clone(&sip_session);
364
365                            let message_receive_listener = Arc::clone(message_receive_listener);
366
367                            sip_session.setup_confirmed_dialog(
368                                &dialog,
369                                CPMSessionDialogEventReceiver {
370                                    public_user_identity,
371                                    sip_session: Arc::clone(&sip_session),
372                                    ongoing_sessions: Arc::clone(ongoing_sessions),
373                                    message_receive_listener: Arc::new(
374                                        move |cpim_info, content_type, content_body| {
375                                            message_receive_listener(
376                                                &sip_session_,
377                                                &contact_uri,
378                                                cpim_info,
379                                                content_type,
380                                                content_body,
381                                            )
382                                        },
383                                    ),
384                                    msrp_socket_connect_function: Arc::clone(
385                                        msrp_socket_connect_function,
386                                    ),
387                                    rt: Arc::clone(rt),
388                                },
389                            );
390
391                            match choose_timeout_for_server_transaction_response(
392                                transaction,
393                                true,
394                                Refresher::UAS,
395                            ) {
396                                Ok(Some((timeout, refresher))) => match refresher {
397                                    Refresher::UAC => {
398                                        resp_message.add_header(Header::new(
399                                            b"Session-Expires",
400                                            format!("{};refresher=uac", timeout),
401                                        ));
402
403                                        sip_session.schedule_refresh(timeout, false, rt);
404                                    }
405                                    Refresher::UAS => {
406                                        resp_message.add_header(Header::new(
407                                            b"Session-Expires",
408                                            format!("{};refresher=uas", timeout),
409                                        ));
410
411                                        sip_session.schedule_refresh(timeout, true, rt);
412                                    }
413                                },
414
415                                Ok(None) => {}
416
417                                Err((error_code, _error_phrase, min_se)) => {
418                                    if error_code == 422 {
419                                        // to-do: provide a larger expire value
420                                    }
421                                }
422                            }
423
424                            let sip_session_ = Arc::clone(&sip_session);
425
426                            let registered_public_identity = Arc::clone(registered_public_identity);
427
428                            let tm = Arc::clone(tm);
429                            let rt_ = Arc::clone(rt);
430
431                            rt.spawn(async move {
432                                while let Some(ev) = sess_rx.recv().await {
433                                    match ev {
434                                        SipSessionEvent::ShouldRefresh(dialog) => {
435                                            if let Ok(mut message) =
436                                                dialog.make_request(b"UPDATE", None)
437                                            {
438                                                message.add_header(Header::new(
439                                                    b"Supported",
440                                                    b"timer",
441                                                ));
442
443                                                let guard =
444                                                    registered_public_identity.lock().unwrap();
445
446                                                if let Some((transport, _, _)) = &*guard {
447                                                    tm.send_request(
448                                                        message,
449                                                        transport,
450                                                        UpdateMessageCallbacks {
451                                                            // session_expires: None,
452                                                            dialog,
453                                                            sip_session: Arc::clone(&sip_session_),
454                                                            rt: Arc::clone(&rt_),
455                                                        },
456                                                        &rt_,
457                                                    );
458                                                }
459                                            }
460                                        }
461
462                                        SipSessionEvent::Expired(dialog) => {
463                                            if let Ok(message) = dialog.make_request(b"BYE", None) {
464                                                let guard =
465                                                    registered_public_identity.lock().unwrap();
466
467                                                if let Some((transport, _, _)) = &*guard {
468                                                    tm.send_request(
469                                                        message,
470                                                        transport,
471                                                        ClientTransactionNilCallbacks {},
472                                                        &rt_,
473                                                    );
474                                                }
475                                            }
476                                        }
477
478                                        _ => {}
479                                    }
480                                }
481                            });
482
483                            ongoing_sessions
484                                .lock()
485                                .unwrap()
486                                .push((known_identities, sip_session));
487
488                            // let msrp_socket_connect_function =
489                            //     Arc::clone(msrp_socket_connect_function);
490                            // let rt_ = Arc::clone(rt);
491                            rt.spawn(async move {
492                                while let Some(ev) = rx.recv().await {
493                                    match ev {
494                                        ServerTransactionEvent::Acked => {
495                                            // session.on_ack(&msrp_socket_connect_function, &rt_)
496                                        }
497                                        _ => {}
498                                    }
499                                }
500                            });
501
502                            server_transaction::send_response(
503                                Arc::clone(transaction),
504                                resp_message,
505                                tx,
506                                // &timer,
507                                rt,
508                            );
509                        }
510
511                        return;
512                    }
513                }
514            }
515
516            if let Some(resp_message) = server_transaction::make_response(
517                message,
518                transaction.to_tag(),
519                400,
520                b"Bad Request",
521            ) {
522                server_transaction::send_response(
523                    Arc::clone(transaction),
524                    resp_message,
525                    tx,
526                    // &timer,
527                    rt,
528                );
529            }
530        }
531
532        Err((error_code, error_phrase)) => {
533            if let Some(resp_message) = server_transaction::make_response(
534                message,
535                transaction.to_tag(),
536                error_code,
537                error_phrase.as_bytes(),
538            ) {
539                server_transaction::send_response(
540                    Arc::clone(transaction),
541                    resp_message,
542                    tx,
543                    // &timer,
544                    rt,
545                );
546            }
547        }
548    }
549}