1use std::{
2 sync::{Arc, Weak, RwLock},
3 collections::{BTreeMap, LinkedList},
4 time::{Duration},
5};
6use async_std::task;
7use futures::future::AbortRegistration;
8use cyfs_base::*;
9use crate::{
10 types::*,
11 interface::{udp::{Interface}},
12 protocol::{*, v0::*},
13 history::keystore,
14 stack::{WeakStack, Stack}
15};
16use super::{
17 udp::{self, *},
18 tcp::{*}
19};
20
21
22#[derive(Clone)]
23pub struct CallConfig {
24 pub timeout: Duration,
25 pub first_try_timeout: Duration,
26 pub udp: udp::Config,
27}
28
29
30struct ManagerImpl {
31 stack: WeakStack,
32 seq_genarator: TempSeqGenerator,
33 sessions: RwLock<BTreeMap<TempSeq, WeakSessions>>,
34}
35
36#[derive(Clone)]
37pub struct CallManager(Arc<ManagerImpl>);
38
39impl std::fmt::Display for CallManager {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 let stack = Stack::from(&self.0.stack);
42 write!(f, "CallManager{{local:{}}}", stack.local_device_id())
43 }
44}
45
46impl CallManager {
47 pub fn create(stack: WeakStack) -> Self {
48 Self(Arc::new(ManagerImpl {
49 stack,
50 seq_genarator: TempSeqGenerator::new(),
51 sessions: RwLock::new(BTreeMap::new()),
52 }))
53 }
54
55 pub async fn call(
56 &self,
57 reverse_endpoints: Option<&[Endpoint]>,
58 remote: &DeviceId,
59 sn_list: &Vec<DeviceId>,
60 payload_generater: impl Fn(&SnCall) -> Vec<u8>
61 ) -> BuckyResult<CallSessions> {
62 let seq = self.0.seq_genarator.generate();
63
64 let stack = Stack::from(&self.0.stack);
65 let active_pn_list = stack.proxy_manager().active_proxies();
66 let local_device = stack.sn_client().ping().default_local();
67
68 let mut sessions = vec![];
69 for sn_id in sn_list {
70 let sn = stack.device_cache().get_inner(sn_id).ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "sn device not cached"))?;
71 let mut call = SnCall {
72 protocol_version: 0,
73 stack_version: 0,
74 seq: seq,
75 to_peer_id: remote.clone(),
76 from_peer_id: stack.local_device_id().clone(),
77 sn_peer_id: sn_id.clone(),
78 reverse_endpoint_array: reverse_endpoints.map(|ep_list| Vec::from(ep_list)),
79 active_pn_list: if active_pn_list.len() > 0 {
80 Some(active_pn_list.clone())
81 } else {
82 None
83 },
84 peer_info: Some(local_device.clone()),
85 payload: SizedOwnedData::from(vec![]),
86 send_time: 0,
87 is_always_call: false
88 };
89 call.payload = SizedOwnedData::from(payload_generater(&call));
90 let session = CallSession::new(self.0.stack.clone(), call, stack.config().sn_client.call.clone()).await;
91 let net_listener = stack.net_manager().listener();
92
93 let mut cached = false;
94 if let Some(active) = stack.sn_client().cache().get_active(sn_id) {
95 info!("{} call with cached active endpoints, sn={}, active={}", self, sn_id, active);
96 if sn.connect_info().endpoints().iter().find(|ep| active.remote().eq(ep)).is_some() {
97 if active.is_udp() {
98 if let Some(local) = net_listener.udp_of(active.local()) {
99 let tunnel = UdpCall::new(session.to_weak(), vec![local.clone()], vec![active.remote().clone()]);
100 session.add_tunnel(tunnel.clone_as_call_tunnel());
101 cached = true;
102 }
103 } else {
104 if net_listener.tcp_of(active.local()).is_some() {
105 let tunnel = TcpCall::new(session.to_weak(), stack.config().sn_client.call.timeout, active.remote().clone());
106 session.add_tunnel(tunnel.clone_as_call_tunnel());
107 cached = true;
108 }
109 }
110 }
111 }
112
113 if !cached {
114 info!("{} remove cached active, sn={}", self, sn_id);
115 stack.sn_client().cache().remove_active(sn_id);
116 {
117 let locals = net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv4()).cloned().collect();
118 let remotes = sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.addr().is_ipv4()).cloned().collect();
119 let tunnel = UdpCall::new(session.to_weak(), locals, remotes);
120 session.add_tunnel(tunnel.clone_as_call_tunnel());
121 }
122
123 if net_listener.tcp().iter().find(|l| l.local().addr().is_ipv4()).is_some() {
124 for remote in sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_tcp() && endpoint.addr().is_ipv4()) {
125 let tunnel = TcpCall::new(session.to_weak(), stack.config().sn_client.call.timeout, remote.clone());
126 session.add_tunnel(tunnel.clone_as_call_tunnel());
127 }
128 }
129 }
130
131 sessions.push(session);
132 }
133
134 let sessions = CallSessions::new(seq, remote.clone(), sessions);
135 self.0.sessions.write().unwrap().insert(seq, sessions.to_weak());
136
137 Ok(sessions)
138 }
139
140 pub(crate) fn on_time_escape(&self, now: Timestamp) {
141 let mut alive = LinkedList::new();
142
143 {
144 let mut dead = LinkedList::new();
145 let mut sessions = self.0.sessions.write().unwrap();
146 for (seq, weak) in &*sessions {
147 if let Some(session) = weak.to_strong() {
148 alive.push_back(session);
149 } else {
150 dead.push_back(seq.clone())
151 }
152 }
153
154 for seq in dead {
155 sessions.remove(&seq);
156 }
157 }
158
159 for session in alive {
160 session.on_time_escape(now);
161 }
162 }
163
164
165 pub(crate) fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
166 let session = self.0.sessions.read().unwrap().get(&resp.seq).cloned().and_then(|weak| weak.to_strong());
167 if let Some(session) = session {
168 session.on_udp_call_resp(resp, local, from);
169 }
170 }
171}
172
173
174#[derive(Clone)]
175pub struct CallSessions(Arc<SessionsImpl>);
176
177#[derive(Clone)]
178struct WeakSessions(Weak<SessionsImpl>);
179
180impl WeakSessions {
181 fn to_strong(&self) -> Option<CallSessions> {
182 self.0.upgrade().map(|ptr| CallSessions(ptr))
183 }
184}
185
186enum SessionsState {
187 Init(StateWaiter),
188 Running {
189 waiter: StateWaiter,
190 pending: LinkedList<CallSession>,
191 finished: LinkedList<CallSession>,
192 },
193 Finished,
194 Canceled(BuckyError)
195}
196
197struct SessionsImpl {
198 seq: TempSeq,
199 remote: DeviceId,
200 sessions: Vec<CallSession>,
201 state: RwLock<SessionsState>
202}
203
204impl CallSessions {
205 fn to_weak(&self) -> WeakSessions {
206 WeakSessions(Arc::downgrade(&self.0))
207 }
208
209 fn new(seq: TempSeq, remote: DeviceId, sessions: Vec<CallSession>) -> Self {
210 Self(Arc::new(SessionsImpl {
211 seq,
212 remote,
213 sessions,
214 state: RwLock::new(SessionsState::Init(StateWaiter::new()))
215 }))
216 }
217
218 fn sync_session(&self, session: CallSession) {
219 let waiter = {
220 let mut state = self.0.state.write().unwrap();
221 match &mut *state {
222 SessionsState::Running {
223 waiter,
224 finished,
225 pending,
226 } => {
227 finished.push_back(session.clone());
228 pending.push_back(session);
229 waiter.pop()
230 },
231 SessionsState::Canceled(_) => None,
232 _ => unreachable!()
233 }
234 };
235
236 if let Some(waiter) = waiter {
237 waiter.abort();
238 }
239 }
240
241 fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
242 if let Some(session) = self.0.sessions.iter().find(|session| resp.sn_peer_id.eq(session.sn())) {
243 session.on_udp_call_resp(resp, local, from);
244 }
245 }
246
247 pub async fn next(&self) -> BuckyResult<Option<CallSession>> {
248 enum NextStep {
249 Start(AbortRegistration),
250 Return(BuckyResult<Option<CallSession>>),
251 Wait(AbortRegistration)
252 }
253
254 let next = {
255 let mut state = self.0.state.write().unwrap();
256 match &mut *state {
257 SessionsState::Init(waiter) => {
258 assert_eq!(waiter.len(), 0);
259 let next = NextStep::Start(waiter.new_waiter());
260 *state = SessionsState::Running {
261 waiter: waiter.transfer(),
262 pending: LinkedList::new(),
263 finished: LinkedList::new()
264 };
265 next
266 },
267 SessionsState::Running {
268 waiter,
269 pending,
270 ..
271 } => {
272 assert_eq!(waiter.len(), 0);
273 if pending.len() > 0 {
274 NextStep::Return(Ok(pending.pop_front()))
275 } else {
276 NextStep::Wait(waiter.new_waiter())
277 }
278 },
279 SessionsState::Finished => NextStep::Return(Ok(None)),
280 SessionsState::Canceled(err) => NextStep::Return(Err(err.clone()))
281 }
282 };
283
284 let state = || {
285 let mut state = self.0.state.write().unwrap();
286 match &mut *state {
287 SessionsState::Running {
288 pending,
289 finished,
290 ..
291 } => {
292 let ret = Ok(pending.pop_front());
293 if pending.len() == 0 && finished.len() == self.0.sessions.len() {
294 *state = SessionsState::Finished;
295 }
296 ret
297 },
298 SessionsState::Finished => Ok(None),
299 SessionsState::Canceled(err) => Err(err.clone()),
300 _ => unreachable!()
301 }
302 };
303
304 match next {
305 NextStep::Start(waiter) => {
306 for session in &self.0.sessions {
307 let sessions = self.clone();
308 let session = session.clone();
309 task::spawn(async move {
310 let _ = session.wait().await;
311 sessions.sync_session(session);
312 });
313 }
314 StateWaiter::wait(waiter, state).await
315 },
316 NextStep::Wait(waiter) => {
317 StateWaiter::wait(waiter, state).await
318 },
319 NextStep::Return(result) => result
320 }
321 }
322
323 fn on_time_escape(&self, now: Timestamp) {
324 for session in &self.0.sessions {
325 session.on_time_escape(now);
326 }
327 }
328}
329
330#[async_trait::async_trait]
331pub(super) trait CallTunnel: Send + Sync + std::fmt::Display {
332 fn clone_as_call_tunnel(&self) -> Box<dyn CallTunnel>;
333 async fn wait(&self) -> (BuckyResult<Device>, Option<EndpointPair>);
334 fn cancel(&self);
335 fn on_time_escape(&self, _now: Timestamp) {
336
337 }
338 fn reset(&self, _timeout: Duration) -> Option<Box<dyn CallTunnel>> {
339 None
340 }
341 fn on_udp_call_resp(&self, _resp: &SnCallResp, _local: &Interface, _from: &Endpoint) {
342
343 }
344}
345
346enum SessionState {
347 Init,
348 FirstTry,
349 SecondTry,
350 Responsed {
351 active: EndpointPair,
352 result: BuckyResult<Device>
353 },
354 Canceled(BuckyError)
355}
356
357struct SessionStateImpl {
358 packages: Arc<PackageBox>,
359 tunnels: Vec<Box<dyn CallTunnel>>,
360 waiter: StateWaiter,
361 start_at: Timestamp,
362 state: SessionState
363}
364
365struct SessionImpl {
366 stack: WeakStack,
367 sn: DeviceId,
368 config: CallConfig,
369 state: RwLock<SessionStateImpl>
370}
371
372
373#[derive(Clone)]
374pub struct CallSession(Arc<SessionImpl>);
375
376#[derive(Clone)]
377pub(super) struct WeakSession(Weak<SessionImpl>);
378
379impl WeakSession {
380 pub fn to_strong(&self) -> Option<CallSession> {
381 self.0.upgrade().map(|ptr| CallSession(ptr))
382 }
383}
384
385
386impl std::fmt::Display for CallSession {
387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388 let stack = Stack::from(&self.0.stack);
389 write!(f, "CallSession{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
390 }
391}
392
393impl std::fmt::Debug for CallSession {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 let stack = Stack::from(&self.0.stack);
396 write!(f, "CallSession{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
397 }
398}
399
400
401impl CallSession {
402 pub(super) fn to_weak(&self) -> WeakSession {
403 WeakSession(Arc::downgrade(&self.0))
404 }
405
406 async fn new(stack: WeakStack, call: SnCall, config: CallConfig) -> Self {
407 let strong_stack = Stack::from(&stack);
408 let sn = call.sn_peer_id.clone();
409 let key_stub = strong_stack.keystore().create_key(strong_stack.device_cache().get_inner(&sn).unwrap().desc(), true);
410 let mut packages = PackageBox::encrypt_box(sn.clone(), key_stub.key.clone());
411 if let keystore::EncryptedKey::Unconfirmed(encrypted) = &key_stub.encrypted {
412 let mut exchange = Exchange::from((&call, encrypted.clone(), key_stub.key.mix_key.clone()));
413 let _ = exchange.sign(strong_stack.keystore().signer()).await.unwrap();
414 packages.push(exchange);
415 }
416 packages.push(call);
417
418 let session = Self(Arc::new(SessionImpl {
419 stack,
420 sn,
421 config,
422 state: RwLock::new(SessionStateImpl {
423 packages: Arc::new(packages),
424 tunnels: vec![],
425 waiter: StateWaiter::new(),
426 start_at: 0,
427 state: SessionState::Init,
428 })
429 }));
430
431
432 info!("{} created, key={}", session, key_stub.key);
433 session
434 }
435
436 pub fn sn(&self) -> &DeviceId {
437 &self.0.sn
438 }
439
440 pub(super) fn stack(&self) -> Stack {
441 Stack::from(&self.0.stack)
442 }
443
444 fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
445 let tunnels = {
446 let state = self.0.state.read().unwrap();
447 match &state.state {
448 SessionState::FirstTry | SessionState::SecondTry => state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect(),
449 _ => vec![]
450 }
451 };
452
453 for tunnel in tunnels {
454 tunnel.on_udp_call_resp(resp, local, from);
455 }
456 }
457
458
459 fn sync_tunnel(&self, _tunnel: &dyn CallTunnel, result: BuckyResult<Device>, active: Option<EndpointPair>) {
460 struct NextStep {
461 waiter: Option<StateWaiter>,
462 to_cancel: Vec<Box<dyn CallTunnel>>,
463 update_cache: Option<EndpointPair>
464 }
465
466 impl NextStep {
467 fn none() -> Self {
468 Self {
469 waiter: None,
470 to_cancel: vec![],
471 update_cache: None
472 }
473 }
474 }
475
476
477 let next = {
478 let mut next = NextStep::none();
479 let mut state = self.0.state.write().unwrap();
480
481 match &state.state {
482 SessionState::FirstTry | SessionState::SecondTry => {
483 if let Some(active) = active {
484 next.update_cache = Some(active.clone());
485 state.state = SessionState::Responsed { active, result };
486 next.waiter = Some(state.waiter.transfer());
487 }
488 },
489 _ => {}
490 };
491
492 if next.waiter.is_some() {
493 std::mem::swap(&mut next.to_cancel, &mut state.tunnels);
494 }
495
496 next
497 };
498
499 if let Some(endpoint) = next.update_cache {
500 let stack = Stack::from(&self.0.stack);
501 stack.sn_client().cache().add_active(self.sn(), endpoint);
502 }
503
504 if let Some(waiter) = next.waiter {
505 waiter.wake();
506 }
507
508 for tunnel in next.to_cancel {
509 tunnel.cancel();
510 }
511 }
512
513 pub fn config(&self) -> &CallConfig {
514 &self.0.config
515 }
516
517 async fn wait(&self) -> Option<EndpointPair> {
518 enum NextStep {
519 Start(AbortRegistration, Vec<Box<dyn CallTunnel>>),
520 Wait(AbortRegistration),
521 Return(Option<EndpointPair>)
522 }
523
524 let next = {
525 let mut state = self.0.state.write().unwrap();
526
527 match &state.state {
528 SessionState::Init => {
529 if state.packages.has_exchange() {
530 state.state = SessionState::SecondTry;
531 } else {
532 state.state = SessionState::FirstTry;
533 }
534 state.start_at = bucky_time_now();
535 NextStep::Start(state.waiter.new_waiter(), state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect())
536 },
537 SessionState::Responsed { active, .. } => NextStep::Return(Some(active.clone())),
538 SessionState::Canceled(_) => NextStep::Return(None),
539 _ => NextStep::Wait(state.waiter.new_waiter())
540 }
541 };
542
543 let state = || {
544 let state = self.0.state.read().unwrap();
545 match &state.state {
546 SessionState::Responsed { active, .. } => Some(active.clone()),
547 SessionState::Canceled(_) => None,
548 _ => unreachable!()
549 }
550 };
551
552 match next {
553 NextStep::Start(waiter, tunnels) => {
554 for tunnel in tunnels {
555 let session = self.clone();
556 task::spawn(async move {
557 let (result, active) = tunnel.wait().await;
558 session.sync_tunnel(tunnel.as_ref(), result, active);
559 });
560 }
561 StateWaiter::wait(waiter, state).await
562 },
563 NextStep::Wait(waiter) => StateWaiter::wait(waiter, state).await,
564 NextStep::Return(active) => active
565 }
566 }
567
568 fn add_tunnel(&self, tunnel: Box<dyn CallTunnel>) {
569 info!("{} add tunnel, tunnel={}", self, tunnel);
570 let mut state = self.0.state.write().unwrap();
571 match &state.state {
572 SessionState::Init => {
573 state.tunnels.push(tunnel);
574 },
575 _ => unreachable!()
576 }
577 }
578
579 pub fn packages(&self) -> Arc<PackageBox> {
580 self.0.state.read().unwrap().packages.clone()
581 }
582
583
584 async fn reset(&self, call: SnCall) {
585 let stack = Stack::from(&self.0.stack);
586 stack.keystore().reset_peer(self.sn());
587
588 let key_stub = stack.keystore().create_key(stack.device_cache().get_inner(self.sn()).unwrap().desc(), true);
589 let mut packages = PackageBox::encrypt_box(self.sn().clone(), key_stub.key.clone());
590 if let keystore::EncryptedKey::Unconfirmed(encrypted) = &key_stub.encrypted {
591 let mut exchange = Exchange::from((&call, encrypted.clone(), key_stub.key.mix_key.clone()));
592 let _ = exchange.sign(stack.keystore().signer()).await.unwrap();
593 packages.push(exchange);
594 }
595 packages.push(call);
596
597 info!("{} reset with key {}", self, key_stub.key);
598
599 let tunnels = {
600 let mut state = self.0.state.write().unwrap();
601 state.packages = Arc::new(packages);
602 let escaped = Duration::from_micros(bucky_time_now() - state.start_at);
603 if escaped < self.config().timeout {
604 let remain = self.config().timeout - escaped;
605 let mut resets = vec![];
606 for tunnel in &state.tunnels {
607 if let Some(reset) = tunnel.reset(remain) {
608 resets.push(reset);
609 }
610 }
611
612 for tunnel in &resets {
613 state.tunnels.push(tunnel.clone_as_call_tunnel());
614 }
615
616 Some(resets)
617 } else {
618 None
619 }
620 };
621
622 if let Some(tunnels) = tunnels {
623 for tunnel in tunnels {
624 let session = self.clone();
625 task::spawn(async move {
626 let (result, active) = tunnel.wait().await;
627 session.sync_tunnel(tunnel.as_ref(), result, active);
628 });
629 }
630 }
631 }
632
633 fn on_time_escape(&self, now: Timestamp) {
634 struct NextStep {
635 waiter: Option<StateWaiter>,
636 reset: Option<SnCall>,
637 callback: Option<Vec<Box<dyn CallTunnel>>>,
638 }
639
640 impl NextStep {
641 fn none() -> Self {
642 Self {
643 waiter: None,
644 reset: None,
645 callback: None
646 }
647 }
648 }
649
650 let mut next = NextStep::none();
651 {
652 let mut state = self.0.state.write().unwrap();
653
654 match &state.state {
655 SessionState::FirstTry => {
656 if now > state.start_at && Duration::from_micros(now - state.start_at) > self.config().first_try_timeout {
657 let call: &SnCall = state.packages.packages_no_exchange()[0].as_ref();
658 next.reset = Some(call.clone());
659 state.state = SessionState::SecondTry;
660 } else {
661 next.callback = Some(state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect());
662 }
663 },
664 SessionState::SecondTry => {
665 if now > state.start_at && Duration::from_micros(now - state.start_at) > self.config().timeout {
666 state.state = SessionState::Canceled(BuckyError::new(BuckyErrorCode::Timeout, "session timeout"));
667 next.waiter = Some(state.waiter.transfer());
668 } else {
669 next.callback = Some(state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect());
670 }
671 },
672 _ => {}
673 }
674 }
675
676 if let Some(waiter) = next.waiter {
677 waiter.wake();
678 }
679
680 if let Some(tunnels) = next.callback {
681 for tunnel in tunnels {
682 tunnel.on_time_escape(now);
683 }
684 }
685
686 if let Some(call) = next.reset {
687 let session = self.clone();
688 task::spawn(async move {
689 session.reset(call).await;
690 });
691 }
692 }
693
694 pub fn result(&self) -> Option<BuckyResult<Device>> {
695 let state = self.0.state.read().unwrap();
696 match &state.state {
697 SessionState::Responsed { result, .. } => Some(result.clone()),
698 SessionState::Canceled(err) => Some(Err(err.clone())),
699 _ => None
700 }
701 }
702}
703
704