rust_rcs_client/connectivity/
flow.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    ops::Add,
17    sync::{Arc, Mutex, MutexGuard},
18    time::Duration,
19};
20
21use tokio::{
22    runtime::Runtime,
23    select,
24    sync::oneshot,
25    time::{interval, Instant},
26};
27
28use rust_rcs_core::{
29    ffi::log::platform_log,
30    internet::{name_addr::AsNameAddr, Body, Header},
31    sip::{
32        sip_subscription::{subscribe_request::SubscribeRequest, subscriber::SubscriberEvent},
33        sip_subscription::{subscriber::Subscriber, InitialSubscribeContext, SubscriptionManager},
34        SipCore, SipMessage, SipTransactionManager, SipTransport, SUBSCRIBE,
35    },
36    util::{rand, raw_string::StrEq},
37};
38
39use uuid::Uuid;
40
41use super::{
42    reg_info_xml::{parse_xml, ContactNode, RegistrationNode},
43    registration::Registration,
44};
45
46const LOG_TAG: &str = "librust_rcs_client";
47
/// One address-of-record bound to a flow, built from a reginfo `registration` node.
#[derive(Debug)]
pub struct AddressOfRecord {
    /// `true` when the owning contact was reported with the `created` event,
    /// `false` when it was reported as `registered`.
    _is_implicit_address: bool,
    /// Copy of the registration node's `id`; used to look records up and to avoid duplicates.
    record_id: String,
    /// Copy of the registration node's `aor`.
    _public_user_id: String,
}
54
55#[derive(Debug)]
56pub struct FlowInfo {
57    uri: String,
58    flow_id: String,
59    expires: u32,
60    registerred_addresses: Vec<AddressOfRecord>,
61}
62
63pub struct FlowManager {
64    managed_flow: Arc<
65        Mutex<(
66            Option<(
67                Arc<SipTransport>,
68                String,
69                Arc<Registration>,
70                oneshot::Sender<()>,
71            )>,
72            Option<(String, Arc<Subscriber>)>,
73            Vec<FlowInfo>,
74        )>,
75    >,
76
77    sm: Arc<SubscriptionManager>,
78    tm: Arc<SipTransactionManager>,
79
80    state_callback:
81        Box<dyn Fn(bool, Arc<SipTransport>, Option<String>, Arc<Registration>) + Send + Sync>,
82}
83
84impl FlowManager {
85    pub fn new<CB>(
86        sm: &Arc<SubscriptionManager>,
87        tm: &Arc<SipTransactionManager>,
88        state_callback: CB,
89    ) -> FlowManager
90    where
91        CB: Fn(bool, Arc<SipTransport>, Option<String>, Arc<Registration>) + Send + Sync + 'static,
92    {
93        FlowManager {
94            // managed_reg: Arc::new(Mutex::new(None)),
95            // managed_sub: Arc::new(Mutex::new(None)),
96            managed_flow: Arc::new(Mutex::new((None, None, Vec::new()))),
97
98            sm: Arc::clone(sm),
99            tm: Arc::clone(tm),
100
101            state_callback: Box::new(state_callback),
102        }
103    }
104
105    pub fn observe_registration(
106        &self,
107        transport: &Arc<SipTransport>,
108        transport_address: String,
109        registration: &Arc<Registration>,
110        core: &Arc<SipCore>,
111        rt: &Arc<Runtime>,
112    ) {
113        let (tx, mut rx) = oneshot::channel();
114
115        let mut guard = self.managed_flow.lock().unwrap();
116
117        guard.0 = Some((
118            Arc::clone(transport),
119            transport_address,
120            Arc::clone(registration),
121            tx,
122        ));
123
124        if let Some((_, subscriber)) = &guard.1 {
125            subscriber.stop_subscribing(&self.sm, &self.tm, transport, rt);
126
127            guard.1.take();
128            guard.2.clear();
129        }
130
131        let transport_ = Arc::clone(transport);
132
133        let core_ = Arc::clone(core);
134        let rt_ = Arc::clone(rt);
135
136        rt.spawn(async move {
137            let mut interval = interval(Duration::from_secs(120)); // to-do: make it adjustable
138            let mut ticks = 0;
139
140            loop {
141                select! {
142                    _ = &mut rx => {
143                        break
144                    },
145
146                    _ = interval.tick() => {
147                        if ticks > 0 {
148                            let tm = core_.get_transaction_manager();
149                            tm.send_heartbeat(&transport_, &rt_);
150                        }
151                        ticks += 1;
152                    },
153                };
154            }
155        });
156    }
157
158    pub fn schedule_next_subscribe_directly(
159        &self,
160        transport: &Arc<SipTransport>,
161        core: &Arc<SipCore>,
162        rt: &Arc<Runtime>,
163    ) {
164        let sm = core.get_subscription_manager();
165        let tm = core.get_transaction_manager();
166
167        let guard = self.managed_flow.lock().unwrap();
168
169        match &guard.1 {
170            Some((_, subscriber)) => {
171                subscriber.extend_all_subscriptions(3600, &sm, &tm, transport, rt);
172            }
173            None => {}
174        }
175    }
176
177    pub fn stop_observation(&self, transport: &Arc<SipTransport>) -> Option<Arc<Registration>> {
178        let mut guard = self.managed_flow.lock().unwrap();
179        if let Some((transport_, transport_address, registration, tx)) = guard.0.take() {
180            if Arc::ptr_eq(transport, &transport_) {
181                let _ = tx.send(()); // stop heart-beat
182
183                guard.1.take();
184                guard.2.clear();
185
186                return Some(registration);
187            } else {
188                guard
189                    .0
190                    .replace((transport_, transport_address, registration, tx));
191            }
192        }
193
194        None
195    }
196}
197
198pub enum SubscriptionEvent {
199    ExpirationRefreshed(Instant),
200}
201
202pub fn on_registration_authenticated<CB>(
203    flow_manager: &Arc<FlowManager>,
204    impu: String,
205    transport: &Arc<SipTransport>,
206    _transport_address: String,
207    registration: &Arc<Registration>,
208    core: &Arc<SipCore>,
209    rt: &Arc<Runtime>,
210    state_callback: CB,
211) where
212    CB: Fn(SubscriptionEvent) + Send + Sync + 'static,
213{
214    platform_log(LOG_TAG, "on_registration_authenticated()");
215
216    let mut guard = flow_manager.managed_flow.lock().unwrap();
217
218    match &guard.0 {
219        Some((_, _, registration_, _)) => {
220            if !Arc::ptr_eq(registration, registration_) {
221                return;
222            }
223        }
224
225        None => {}
226    }
227
228    let flow_manager_ = Arc::clone(&flow_manager);
229
230    let impu_ = impu.clone();
231
232    let transport_ = Arc::clone(transport);
233    let registration_ = Arc::clone(registration);
234
235    let subscriber = Subscriber::new(
236        "Registrar",
237        // only single source dialog allowed for reg-info subscription
238        move |event| match event {
239            SubscriberEvent::ReceivedNotify(_, content_type, body) => {
240                platform_log(LOG_TAG, "on NOTIFY content");
241                if content_type.equals_bytes(b"application/reginfo+xml", true) {
242                    if let Body::Raw(r) = body.as_ref() {
243                        if let Some(reg_info) = parse_xml(r) {
244                            platform_log(LOG_TAG, "reginfo+xml parsed successfully");
245                            let mut guard = flow_manager_.managed_flow.lock().unwrap();
246
247                            for registration_node in &reg_info.registration_nodes {
248                                for contact_node in &registration_node.contact_nodes {
249                                    if &contact_node.state == "terminated"
250                                        && (&contact_node.event == "unregistered"
251                                            || &contact_node.event == "rejected"
252                                            || &contact_node.event == "deactivated")
253                                    {
254                                        remove_aor_within_flow(
255                                            contact_node,
256                                            registration_node,
257                                            &mut guard,
258                                        );
259                                    } else if &contact_node.state == "active" {
260                                        if &contact_node.event == "created"
261                                            || &contact_node.event == "registered"
262                                        {
263                                            create_aor_within_flow(
264                                                contact_node,
265                                                registration_node,
266                                                &mut guard,
267                                            );
268                                        } else if &contact_node.event == "shortened"
269                                            || &contact_node.event == "refreshed"
270                                        {
271                                            match update_aor_within_flow(contact_node, &mut guard) {
272                                                Ok(()) => {}
273
274                                                Err(()) => {
275                                                    guard.1.take();
276                                                    guard.2.clear();
277
278                                                    if let Some((transport, _, registration, _)) =
279                                                        &guard.0
280                                                    {
281                                                        (flow_manager_.state_callback)(
282                                                            false,
283                                                            Arc::clone(transport),
284                                                            None,
285                                                            Arc::clone(registration),
286                                                        );
287                                                    }
288
289                                                    return;
290                                                }
291                                            }
292                                        }
293                                    }
294                                }
295                            }
296
297                            match &guard.0 {
298                                Some((_, t_addr, _, _)) => {
299                                    match &guard.1 {
300                                        Some((impu, _)) => {
301                                            let uri = if let Some(idx) = impu.find('@') {
302                                                // we should really be more specific about the kind of IMPU that we use to identify our selves
303                                                format!("{}@{}", &impu[0..idx], t_addr)
304                                            } else {
305                                                format!("{}@{}", impu, t_addr)
306                                            };
307
308                                            platform_log(
309                                                LOG_TAG,
310                                                format!("registration is bound at {}", &uri),
311                                            );
312
313                                            for flow in &*guard.2 {
314                                                platform_log(
315                                                    LOG_TAG,
316                                                    format!("active flow {:?}", flow),
317                                                );
318
319                                                let mut is_current_registry = false;
320
321                                                if uri.eq_ignore_ascii_case(&flow.uri) {
322                                                    is_current_registry = true;
323                                                }
324
325                                                if !is_current_registry {
326                                                    if let Some(flow_uri) = flow.uri.as_bytes().as_name_addresses().first()
327                                                    {
328                                                        if let Some(uri_part) = &flow_uri.uri_part {
329
330                                                            if uri
331                                                                .as_bytes()
332                                                                .equals_bytes(uri_part.uri, true)
333                                                            {
334                                                                is_current_registry = true;
335                                                            }
336                                                        }
337                                                    }
338                                                }
339
340                                                if is_current_registry {
341                                                    let public_user_identity = impu.clone();
342                                                    (flow_manager_.state_callback)(
343                                                        true,
344                                                        Arc::clone(&transport_),
345                                                        Some(public_user_identity),
346                                                        Arc::clone(&registration_),
347                                                    );
348
349                                                    let expiration = Instant::now().add(
350                                                        Duration::from_secs(
351                                                            flow.expires as u64,
352                                                        ),
353                                                    );
354
355                                                    state_callback(SubscriptionEvent::ExpirationRefreshed(expiration));
356
357                                                    return;
358                                                }
359
360                                            }
361                                        }
362                                        None => {}
363                                    }
364                                }
365                                None => {}
366                            }
367                        }
368                    }
369                }
370
371                let mut guard = flow_manager_.managed_flow.lock().unwrap();
372
373                guard.1.take();
374                guard.2.clear();
375
376                if let Some((transport, _, registration, _)) = &guard.0 {
377                    (flow_manager_.state_callback)(
378                        false,
379                        Arc::clone(transport),
380                        None,
381                        Arc::clone(registration),
382                    );
383                }
384            }
385            SubscriberEvent::NearExpiration(_) => {
386                todo!()
387            }
388            SubscriberEvent::Terminated(_, _, _) | // only one subscription is allowed so terminating one means terminating all
389            SubscriberEvent::SubscribeFailed(_) => {
390                let mut guard = flow_manager_.managed_flow.lock().unwrap();
391
392                guard.1.take();
393                guard.2.clear();
394
395                if let Some((transport, _, registration, _)) = &guard.0 {
396                    (flow_manager_.state_callback)(
397                        false,
398                        Arc::clone(transport),
399                        None,
400                        Arc::clone(registration),
401                    );
402                }
403            }
404        },
405        move |dialog, expiration| -> Result<SipMessage, ()> {
406            let request_message = match dialog {
407                Some(dialog) => match dialog.make_request(SUBSCRIBE, None) {
408                    Ok(message) => Ok(message),
409                    Err(e) => {
410                        platform_log(LOG_TAG, format!("sip in-dialog message build error {}", e));
411                        Err(())
412                    }
413                },
414
415                None => {
416                    let mut message = SipMessage::new_request(SUBSCRIBE, impu_.as_bytes());
417
418                    message.add_header(Header::new(
419                        b"Call-ID",
420                        String::from(
421                            Uuid::new_v4()
422                                .as_hyphenated()
423                                .encode_lower(&mut Uuid::encode_buffer()),
424                        ),
425                    ));
426
427                    message.add_header(Header::new(b"CSeq", b"1 SUBSCRIBE"));
428
429                    let tag = rand::create_raw_alpha_numeric_string(8);
430                    let tag = String::from_utf8_lossy(&tag);
431
432                    message.add_header(Header::new(b"From", format!("<{}>;tag={}", impu_, tag)));
433
434                    message.add_header(Header::new(b"To", format!("<{}>", impu_)));
435
436                    message.add_header(Header::new(b"Contact", format!("<{}>", impu_))); // to-do: is transport_address neccessary? also check NOTIFY 200 Ok
437
438                    message.add_header(Header::new(b"Expires", expiration.to_string()));
439
440                    message.add_header(Header::new(b"Event", b"reg"));
441
442                    message.add_header(Header::new(b"Accept", b"application/reginfo+xml"));
443
444                    Ok(message)
445                }
446            };
447
448            request_message
449        },
450    );
451
452    let subscriber = Arc::new(subscriber);
453    let subscriber_ = Arc::clone(&subscriber);
454
455    if let Ok(request_message) = subscriber.build_request(None, 3600) {
456        if let Ok(subscribe_request) = SubscribeRequest::new(&request_message, subscriber) {
457            let subscribe_request = Arc::new(subscribe_request);
458
459            core.get_subscription_manager()
460                .register_request(Arc::clone(&subscribe_request), rt);
461
462            // let ongoing_dialogs = core.get_ongoing_dialogs();
463            let core_ = Arc::clone(&core);
464            let rt_ = Arc::clone(rt);
465
466            core.get_transaction_manager().send_request(
467                request_message,
468                &transport,
469                InitialSubscribeContext::new(subscribe_request, core_, rt_),
470                rt,
471            )
472        }
473    }
474
475    guard.1 = Some((impu, subscriber_));
476}
477
478fn create_aor(
479    registerred_addresses: &mut Vec<AddressOfRecord>,
480    registration_node: &RegistrationNode,
481    is_implicit_address: bool,
482) {
483    for aor in &*registerred_addresses {
484        if aor.record_id == registration_node.id {
485            return;
486        }
487    }
488
489    registerred_addresses.push(AddressOfRecord {
490        _is_implicit_address: is_implicit_address,
491        record_id: registration_node.id.clone(),
492        _public_user_id: registration_node.aor.clone(),
493    });
494}
495
496fn create_aor_within_flow(
497    contact_node: &ContactNode,
498    registration_node: &RegistrationNode,
499    guard: &mut MutexGuard<(
500        Option<(
501            Arc<SipTransport>,
502            String,
503            Arc<Registration>,
504            oneshot::Sender<()>,
505        )>,
506        Option<(String, Arc<Subscriber>)>,
507        Vec<FlowInfo>,
508    )>,
509) {
510    platform_log(LOG_TAG, "create_aor_within_flow");
511
512    let is_implicit_address = if &contact_node.event == "created" {
513        true
514    } else {
515        false
516    };
517
518    for flow in &mut guard.2 {
519        if flow.flow_id == contact_node.id {
520            create_aor(
521                &mut flow.registerred_addresses,
522                registration_node,
523                is_implicit_address,
524            );
525            return;
526        }
527    }
528
529    let mut registerred_addresses = Vec::new();
530
531    create_aor(
532        &mut registerred_addresses,
533        registration_node,
534        is_implicit_address,
535    );
536
537    guard.2.push(FlowInfo {
538        uri: contact_node.uri.clone(),
539        flow_id: contact_node.id.clone(),
540        expires: match &contact_node.expires {
541            Some(expires) => match u32::from_str_radix(expires, 10) {
542                Ok(expires) => expires,
543                _ => 0,
544            },
545            None => 0,
546        },
547        registerred_addresses,
548    });
549}
550
551fn update_aor_within_flow(
552    contact_node: &ContactNode,
553    guard: &mut MutexGuard<(
554        Option<(
555            Arc<SipTransport>,
556            String,
557            Arc<Registration>,
558            oneshot::Sender<()>,
559        )>,
560        Option<(String, Arc<Subscriber>)>,
561        Vec<FlowInfo>,
562    )>,
563) -> Result<(), ()> {
564    platform_log(LOG_TAG, "update_aor_within_flow");
565
566    for flow in &mut guard.2 {
567        if flow.flow_id == contact_node.id {
568            flow.expires = match &contact_node.expires {
569                Some(expires) => match u32::from_str_radix(expires, 10) {
570                    Ok(expires) => expires,
571                    _ => 0,
572                },
573                None => 0,
574            };
575            return Ok(());
576        }
577    }
578
579    Err(())
580}
581
582fn remove_aor_within_flow(
583    contact_node: &ContactNode,
584    registration_node: &RegistrationNode,
585    guard: &mut MutexGuard<(
586        Option<(
587            Arc<SipTransport>,
588            String,
589            Arc<Registration>,
590            oneshot::Sender<()>,
591        )>,
592        Option<(String, Arc<Subscriber>)>,
593        Vec<FlowInfo>,
594    )>,
595) {
596    platform_log(LOG_TAG, "remove_aor_within_flow");
597
598    let mut i = 0;
599    for flow in &mut guard.2 {
600        if flow.flow_id == contact_node.id {
601            let mut j = 0;
602            for aor in &mut flow.registerred_addresses {
603                if aor.record_id == registration_node.id {
604                    flow.registerred_addresses.swap_remove(j);
605                    break;
606                }
607                j += 1;
608            }
609            if flow.registerred_addresses.is_empty() {
610                guard.2.swap_remove(i);
611            }
612            break;
613        }
614        i += 1;
615    }
616}