1use std::{
16 ops::Add,
17 sync::{Arc, Mutex, MutexGuard},
18 time::Duration,
19};
20
21use tokio::{
22 runtime::Runtime,
23 select,
24 sync::oneshot,
25 time::{interval, Instant},
26};
27
28use rust_rcs_core::{
29 ffi::log::platform_log,
30 internet::{name_addr::AsNameAddr, Body, Header},
31 sip::{
32 sip_subscription::{subscribe_request::SubscribeRequest, subscriber::SubscriberEvent},
33 sip_subscription::{subscriber::Subscriber, InitialSubscribeContext, SubscriptionManager},
34 SipCore, SipMessage, SipTransactionManager, SipTransport, SUBSCRIBE,
35 },
36 util::{rand, raw_string::StrEq},
37};
38
39use uuid::Uuid;
40
41use super::{
42 reg_info_xml::{parse_xml, ContactNode, RegistrationNode},
43 registration::Registration,
44};
45
46const LOG_TAG: &str = "librust_rcs_client";
47
/// One address-of-record reported for a flow by a `reginfo+xml` NOTIFY.
#[derive(Debug)]
pub struct AddressOfRecord {
    /// `true` when the contact that introduced this record carried the
    /// `created` event, `false` otherwise.
    _is_implicit_address: bool,
    /// Identifier of the registration node; used to find and remove the record.
    record_id: String,
    /// Value of the registration node's `aor` attribute.
    _public_user_id: String,
}
54
55#[derive(Debug)]
56pub struct FlowInfo {
57 uri: String,
58 flow_id: String,
59 expires: u32,
60 registerred_addresses: Vec<AddressOfRecord>,
61}
62
63pub struct FlowManager {
64 managed_flow: Arc<
65 Mutex<(
66 Option<(
67 Arc<SipTransport>,
68 String,
69 Arc<Registration>,
70 oneshot::Sender<()>,
71 )>,
72 Option<(String, Arc<Subscriber>)>,
73 Vec<FlowInfo>,
74 )>,
75 >,
76
77 sm: Arc<SubscriptionManager>,
78 tm: Arc<SipTransactionManager>,
79
80 state_callback:
81 Box<dyn Fn(bool, Arc<SipTransport>, Option<String>, Arc<Registration>) + Send + Sync>,
82}
83
84impl FlowManager {
85 pub fn new<CB>(
86 sm: &Arc<SubscriptionManager>,
87 tm: &Arc<SipTransactionManager>,
88 state_callback: CB,
89 ) -> FlowManager
90 where
91 CB: Fn(bool, Arc<SipTransport>, Option<String>, Arc<Registration>) + Send + Sync + 'static,
92 {
93 FlowManager {
94 managed_flow: Arc::new(Mutex::new((None, None, Vec::new()))),
97
98 sm: Arc::clone(sm),
99 tm: Arc::clone(tm),
100
101 state_callback: Box::new(state_callback),
102 }
103 }
104
105 pub fn observe_registration(
106 &self,
107 transport: &Arc<SipTransport>,
108 transport_address: String,
109 registration: &Arc<Registration>,
110 core: &Arc<SipCore>,
111 rt: &Arc<Runtime>,
112 ) {
113 let (tx, mut rx) = oneshot::channel();
114
115 let mut guard = self.managed_flow.lock().unwrap();
116
117 guard.0 = Some((
118 Arc::clone(transport),
119 transport_address,
120 Arc::clone(registration),
121 tx,
122 ));
123
124 if let Some((_, subscriber)) = &guard.1 {
125 subscriber.stop_subscribing(&self.sm, &self.tm, transport, rt);
126
127 guard.1.take();
128 guard.2.clear();
129 }
130
131 let transport_ = Arc::clone(transport);
132
133 let core_ = Arc::clone(core);
134 let rt_ = Arc::clone(rt);
135
136 rt.spawn(async move {
137 let mut interval = interval(Duration::from_secs(120)); let mut ticks = 0;
139
140 loop {
141 select! {
142 _ = &mut rx => {
143 break
144 },
145
146 _ = interval.tick() => {
147 if ticks > 0 {
148 let tm = core_.get_transaction_manager();
149 tm.send_heartbeat(&transport_, &rt_);
150 }
151 ticks += 1;
152 },
153 };
154 }
155 });
156 }
157
158 pub fn schedule_next_subscribe_directly(
159 &self,
160 transport: &Arc<SipTransport>,
161 core: &Arc<SipCore>,
162 rt: &Arc<Runtime>,
163 ) {
164 let sm = core.get_subscription_manager();
165 let tm = core.get_transaction_manager();
166
167 let guard = self.managed_flow.lock().unwrap();
168
169 match &guard.1 {
170 Some((_, subscriber)) => {
171 subscriber.extend_all_subscriptions(3600, &sm, &tm, transport, rt);
172 }
173 None => {}
174 }
175 }
176
177 pub fn stop_observation(&self, transport: &Arc<SipTransport>) -> Option<Arc<Registration>> {
178 let mut guard = self.managed_flow.lock().unwrap();
179 if let Some((transport_, transport_address, registration, tx)) = guard.0.take() {
180 if Arc::ptr_eq(transport, &transport_) {
181 let _ = tx.send(()); guard.1.take();
184 guard.2.clear();
185
186 return Some(registration);
187 } else {
188 guard
189 .0
190 .replace((transport_, transport_address, registration, tx));
191 }
192 }
193
194 None
195 }
196}
197
198pub enum SubscriptionEvent {
199 ExpirationRefreshed(Instant),
200}
201
202pub fn on_registration_authenticated<CB>(
203 flow_manager: &Arc<FlowManager>,
204 impu: String,
205 transport: &Arc<SipTransport>,
206 _transport_address: String,
207 registration: &Arc<Registration>,
208 core: &Arc<SipCore>,
209 rt: &Arc<Runtime>,
210 state_callback: CB,
211) where
212 CB: Fn(SubscriptionEvent) + Send + Sync + 'static,
213{
214 platform_log(LOG_TAG, "on_registration_authenticated()");
215
216 let mut guard = flow_manager.managed_flow.lock().unwrap();
217
218 match &guard.0 {
219 Some((_, _, registration_, _)) => {
220 if !Arc::ptr_eq(registration, registration_) {
221 return;
222 }
223 }
224
225 None => {}
226 }
227
228 let flow_manager_ = Arc::clone(&flow_manager);
229
230 let impu_ = impu.clone();
231
232 let transport_ = Arc::clone(transport);
233 let registration_ = Arc::clone(registration);
234
235 let subscriber = Subscriber::new(
236 "Registrar",
237 move |event| match event {
239 SubscriberEvent::ReceivedNotify(_, content_type, body) => {
240 platform_log(LOG_TAG, "on NOTIFY content");
241 if content_type.equals_bytes(b"application/reginfo+xml", true) {
242 if let Body::Raw(r) = body.as_ref() {
243 if let Some(reg_info) = parse_xml(r) {
244 platform_log(LOG_TAG, "reginfo+xml parsed successfully");
245 let mut guard = flow_manager_.managed_flow.lock().unwrap();
246
247 for registration_node in ®_info.registration_nodes {
248 for contact_node in ®istration_node.contact_nodes {
249 if &contact_node.state == "terminated"
250 && (&contact_node.event == "unregistered"
251 || &contact_node.event == "rejected"
252 || &contact_node.event == "deactivated")
253 {
254 remove_aor_within_flow(
255 contact_node,
256 registration_node,
257 &mut guard,
258 );
259 } else if &contact_node.state == "active" {
260 if &contact_node.event == "created"
261 || &contact_node.event == "registered"
262 {
263 create_aor_within_flow(
264 contact_node,
265 registration_node,
266 &mut guard,
267 );
268 } else if &contact_node.event == "shortened"
269 || &contact_node.event == "refreshed"
270 {
271 match update_aor_within_flow(contact_node, &mut guard) {
272 Ok(()) => {}
273
274 Err(()) => {
275 guard.1.take();
276 guard.2.clear();
277
278 if let Some((transport, _, registration, _)) =
279 &guard.0
280 {
281 (flow_manager_.state_callback)(
282 false,
283 Arc::clone(transport),
284 None,
285 Arc::clone(registration),
286 );
287 }
288
289 return;
290 }
291 }
292 }
293 }
294 }
295 }
296
297 match &guard.0 {
298 Some((_, t_addr, _, _)) => {
299 match &guard.1 {
300 Some((impu, _)) => {
301 let uri = if let Some(idx) = impu.find('@') {
302 format!("{}@{}", &impu[0..idx], t_addr)
304 } else {
305 format!("{}@{}", impu, t_addr)
306 };
307
308 platform_log(
309 LOG_TAG,
310 format!("registration is bound at {}", &uri),
311 );
312
313 for flow in &*guard.2 {
314 platform_log(
315 LOG_TAG,
316 format!("active flow {:?}", flow),
317 );
318
319 let mut is_current_registry = false;
320
321 if uri.eq_ignore_ascii_case(&flow.uri) {
322 is_current_registry = true;
323 }
324
325 if !is_current_registry {
326 if let Some(flow_uri) = flow.uri.as_bytes().as_name_addresses().first()
327 {
328 if let Some(uri_part) = &flow_uri.uri_part {
329
330 if uri
331 .as_bytes()
332 .equals_bytes(uri_part.uri, true)
333 {
334 is_current_registry = true;
335 }
336 }
337 }
338 }
339
340 if is_current_registry {
341 let public_user_identity = impu.clone();
342 (flow_manager_.state_callback)(
343 true,
344 Arc::clone(&transport_),
345 Some(public_user_identity),
346 Arc::clone(®istration_),
347 );
348
349 let expiration = Instant::now().add(
350 Duration::from_secs(
351 flow.expires as u64,
352 ),
353 );
354
355 state_callback(SubscriptionEvent::ExpirationRefreshed(expiration));
356
357 return;
358 }
359
360 }
361 }
362 None => {}
363 }
364 }
365 None => {}
366 }
367 }
368 }
369 }
370
371 let mut guard = flow_manager_.managed_flow.lock().unwrap();
372
373 guard.1.take();
374 guard.2.clear();
375
376 if let Some((transport, _, registration, _)) = &guard.0 {
377 (flow_manager_.state_callback)(
378 false,
379 Arc::clone(transport),
380 None,
381 Arc::clone(registration),
382 );
383 }
384 }
385 SubscriberEvent::NearExpiration(_) => {
386 todo!()
387 }
388 SubscriberEvent::Terminated(_, _, _) | SubscriberEvent::SubscribeFailed(_) => {
390 let mut guard = flow_manager_.managed_flow.lock().unwrap();
391
392 guard.1.take();
393 guard.2.clear();
394
395 if let Some((transport, _, registration, _)) = &guard.0 {
396 (flow_manager_.state_callback)(
397 false,
398 Arc::clone(transport),
399 None,
400 Arc::clone(registration),
401 );
402 }
403 }
404 },
405 move |dialog, expiration| -> Result<SipMessage, ()> {
406 let request_message = match dialog {
407 Some(dialog) => match dialog.make_request(SUBSCRIBE, None) {
408 Ok(message) => Ok(message),
409 Err(e) => {
410 platform_log(LOG_TAG, format!("sip in-dialog message build error {}", e));
411 Err(())
412 }
413 },
414
415 None => {
416 let mut message = SipMessage::new_request(SUBSCRIBE, impu_.as_bytes());
417
418 message.add_header(Header::new(
419 b"Call-ID",
420 String::from(
421 Uuid::new_v4()
422 .as_hyphenated()
423 .encode_lower(&mut Uuid::encode_buffer()),
424 ),
425 ));
426
427 message.add_header(Header::new(b"CSeq", b"1 SUBSCRIBE"));
428
429 let tag = rand::create_raw_alpha_numeric_string(8);
430 let tag = String::from_utf8_lossy(&tag);
431
432 message.add_header(Header::new(b"From", format!("<{}>;tag={}", impu_, tag)));
433
434 message.add_header(Header::new(b"To", format!("<{}>", impu_)));
435
436 message.add_header(Header::new(b"Contact", format!("<{}>", impu_))); message.add_header(Header::new(b"Expires", expiration.to_string()));
439
440 message.add_header(Header::new(b"Event", b"reg"));
441
442 message.add_header(Header::new(b"Accept", b"application/reginfo+xml"));
443
444 Ok(message)
445 }
446 };
447
448 request_message
449 },
450 );
451
452 let subscriber = Arc::new(subscriber);
453 let subscriber_ = Arc::clone(&subscriber);
454
455 if let Ok(request_message) = subscriber.build_request(None, 3600) {
456 if let Ok(subscribe_request) = SubscribeRequest::new(&request_message, subscriber) {
457 let subscribe_request = Arc::new(subscribe_request);
458
459 core.get_subscription_manager()
460 .register_request(Arc::clone(&subscribe_request), rt);
461
462 let core_ = Arc::clone(&core);
464 let rt_ = Arc::clone(rt);
465
466 core.get_transaction_manager().send_request(
467 request_message,
468 &transport,
469 InitialSubscribeContext::new(subscribe_request, core_, rt_),
470 rt,
471 )
472 }
473 }
474
475 guard.1 = Some((impu, subscriber_));
476}
477
478fn create_aor(
479 registerred_addresses: &mut Vec<AddressOfRecord>,
480 registration_node: &RegistrationNode,
481 is_implicit_address: bool,
482) {
483 for aor in &*registerred_addresses {
484 if aor.record_id == registration_node.id {
485 return;
486 }
487 }
488
489 registerred_addresses.push(AddressOfRecord {
490 _is_implicit_address: is_implicit_address,
491 record_id: registration_node.id.clone(),
492 _public_user_id: registration_node.aor.clone(),
493 });
494}
495
496fn create_aor_within_flow(
497 contact_node: &ContactNode,
498 registration_node: &RegistrationNode,
499 guard: &mut MutexGuard<(
500 Option<(
501 Arc<SipTransport>,
502 String,
503 Arc<Registration>,
504 oneshot::Sender<()>,
505 )>,
506 Option<(String, Arc<Subscriber>)>,
507 Vec<FlowInfo>,
508 )>,
509) {
510 platform_log(LOG_TAG, "create_aor_within_flow");
511
512 let is_implicit_address = if &contact_node.event == "created" {
513 true
514 } else {
515 false
516 };
517
518 for flow in &mut guard.2 {
519 if flow.flow_id == contact_node.id {
520 create_aor(
521 &mut flow.registerred_addresses,
522 registration_node,
523 is_implicit_address,
524 );
525 return;
526 }
527 }
528
529 let mut registerred_addresses = Vec::new();
530
531 create_aor(
532 &mut registerred_addresses,
533 registration_node,
534 is_implicit_address,
535 );
536
537 guard.2.push(FlowInfo {
538 uri: contact_node.uri.clone(),
539 flow_id: contact_node.id.clone(),
540 expires: match &contact_node.expires {
541 Some(expires) => match u32::from_str_radix(expires, 10) {
542 Ok(expires) => expires,
543 _ => 0,
544 },
545 None => 0,
546 },
547 registerred_addresses,
548 });
549}
550
551fn update_aor_within_flow(
552 contact_node: &ContactNode,
553 guard: &mut MutexGuard<(
554 Option<(
555 Arc<SipTransport>,
556 String,
557 Arc<Registration>,
558 oneshot::Sender<()>,
559 )>,
560 Option<(String, Arc<Subscriber>)>,
561 Vec<FlowInfo>,
562 )>,
563) -> Result<(), ()> {
564 platform_log(LOG_TAG, "update_aor_within_flow");
565
566 for flow in &mut guard.2 {
567 if flow.flow_id == contact_node.id {
568 flow.expires = match &contact_node.expires {
569 Some(expires) => match u32::from_str_radix(expires, 10) {
570 Ok(expires) => expires,
571 _ => 0,
572 },
573 None => 0,
574 };
575 return Ok(());
576 }
577 }
578
579 Err(())
580}
581
582fn remove_aor_within_flow(
583 contact_node: &ContactNode,
584 registration_node: &RegistrationNode,
585 guard: &mut MutexGuard<(
586 Option<(
587 Arc<SipTransport>,
588 String,
589 Arc<Registration>,
590 oneshot::Sender<()>,
591 )>,
592 Option<(String, Arc<Subscriber>)>,
593 Vec<FlowInfo>,
594 )>,
595) {
596 platform_log(LOG_TAG, "remove_aor_within_flow");
597
598 let mut i = 0;
599 for flow in &mut guard.2 {
600 if flow.flow_id == contact_node.id {
601 let mut j = 0;
602 for aor in &mut flow.registerred_addresses {
603 if aor.record_id == registration_node.id {
604 flow.registerred_addresses.swap_remove(j);
605 break;
606 }
607 j += 1;
608 }
609 if flow.registerred_addresses.is_empty() {
610 guard.2.swap_remove(i);
611 }
612 break;
613 }
614 i += 1;
615 }
616}