cyfs_bdt/sn/client/call/
client.rs

1use std::{
2    sync::{Arc, Weak, RwLock}, 
3    collections::{BTreeMap, LinkedList}, 
4    time::{Duration}, 
5};
6use async_std::task;
7use futures::future::AbortRegistration;
8use cyfs_base::*;
9use crate::{
10    types::*,
11    interface::{udp::{Interface}}, 
12    protocol::{*, v0::*}, 
13    history::keystore, 
14    stack::{WeakStack, Stack}
15};
16use super::{
17    udp::{self, *}, 
18    tcp::{*}
19};
20
21
22#[derive(Clone)]
23pub struct CallConfig {
24    pub timeout: Duration, 
25    pub first_try_timeout: Duration, 
26    pub udp: udp::Config, 
27}
28
29
30struct ManagerImpl {
31    stack: WeakStack,
32    seq_genarator: TempSeqGenerator,
33    sessions: RwLock<BTreeMap<TempSeq, WeakSessions>>,
34}
35
36#[derive(Clone)]
37pub struct CallManager(Arc<ManagerImpl>);
38
39impl std::fmt::Display for CallManager {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        let stack = Stack::from(&self.0.stack);
42        write!(f, "CallManager{{local:{}}}", stack.local_device_id())
43    }
44}
45
46impl CallManager {
47    pub fn create(stack: WeakStack) -> Self {
48        Self(Arc::new(ManagerImpl {
49            stack,
50            seq_genarator: TempSeqGenerator::new(),
51            sessions: RwLock::new(BTreeMap::new()),
52        }))
53    }
54
55    pub async fn call(
56        &self, 
57        reverse_endpoints: Option<&[Endpoint]>, 
58        remote: &DeviceId, 
59        sn_list: &Vec<DeviceId>, 
60        payload_generater: impl Fn(&SnCall) -> Vec<u8>
61    ) -> BuckyResult<CallSessions> {
62        let seq = self.0.seq_genarator.generate();
63    
64        let stack = Stack::from(&self.0.stack);
65        let active_pn_list = stack.proxy_manager().active_proxies();
66        let local_device = stack.sn_client().ping().default_local();
67
68        let mut sessions = vec![];
69        for sn_id in sn_list {
70            let sn = stack.device_cache().get_inner(sn_id).ok_or_else(|| BuckyError::new(BuckyErrorCode::NotFound, "sn device not cached"))?;
71            let mut call = SnCall {
72                protocol_version: 0, 
73                stack_version: 0, 
74                seq: seq,
75                to_peer_id: remote.clone(),
76                from_peer_id: stack.local_device_id().clone(),
77                sn_peer_id: sn_id.clone(),
78                reverse_endpoint_array: reverse_endpoints.map(|ep_list| Vec::from(ep_list)),
79                active_pn_list: if active_pn_list.len() > 0 {
80                    Some(active_pn_list.clone())
81                } else {
82                    None
83                }, 
84                peer_info: Some(local_device.clone()),
85                payload: SizedOwnedData::from(vec![]),
86                send_time: 0,
87                is_always_call: false
88            };
89            call.payload = SizedOwnedData::from(payload_generater(&call));
90            let session = CallSession::new(self.0.stack.clone(), call, stack.config().sn_client.call.clone()).await;
91            let net_listener = stack.net_manager().listener();
92
93            let mut cached = false;
94            if let Some(active) = stack.sn_client().cache().get_active(sn_id) {
95                info!("{} call with cached active endpoints, sn={}, active={}", self, sn_id, active);
96                if sn.connect_info().endpoints().iter().find(|ep| active.remote().eq(ep)).is_some() {
97                    if active.is_udp() {
98                        if let Some(local) = net_listener.udp_of(active.local()) {
99                            let tunnel = UdpCall::new(session.to_weak(), vec![local.clone()], vec![active.remote().clone()]);
100                            session.add_tunnel(tunnel.clone_as_call_tunnel());
101                            cached = true;
102                        }
103                    } else {
104                        if net_listener.tcp_of(active.local()).is_some() {
105                            let tunnel = TcpCall::new(session.to_weak(), stack.config().sn_client.call.timeout, active.remote().clone());
106                            session.add_tunnel(tunnel.clone_as_call_tunnel());
107                            cached = true;
108                        }
109                    }
110                }
111            }
112            
113            if !cached {
114                info!("{} remove cached active, sn={}", self, sn_id);
115                stack.sn_client().cache().remove_active(sn_id);
116                {
117                    let locals = net_listener.udp().iter().filter(|interface| interface.local().addr().is_ipv4()).cloned().collect();
118                    let remotes = sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_udp() && endpoint.addr().is_ipv4()).cloned().collect();
119                    let tunnel = UdpCall::new(session.to_weak(), locals, remotes);
120                    session.add_tunnel(tunnel.clone_as_call_tunnel());
121                }
122            
123                if net_listener.tcp().iter().find(|l| l.local().addr().is_ipv4()).is_some() {
124                    for remote in  sn.connect_info().endpoints().iter().filter(|endpoint| endpoint.is_tcp() && endpoint.addr().is_ipv4()) {
125                        let tunnel = TcpCall::new(session.to_weak(), stack.config().sn_client.call.timeout, remote.clone());
126                        session.add_tunnel(tunnel.clone_as_call_tunnel());
127                    }
128                }
129            }
130
131            sessions.push(session);
132        }   
133        
134        let sessions = CallSessions::new(seq, remote.clone(), sessions);
135        self.0.sessions.write().unwrap().insert(seq, sessions.to_weak());
136
137        Ok(sessions)
138    }
139
140    pub(crate) fn on_time_escape(&self, now: Timestamp) {
141        let mut alive = LinkedList::new();
142
143        {
144            let mut dead = LinkedList::new();
145            let mut sessions = self.0.sessions.write().unwrap();
146            for (seq, weak) in &*sessions {
147                if let Some(session) = weak.to_strong() {
148                    alive.push_back(session);
149                } else {
150                    dead.push_back(seq.clone())
151                }
152            }
153
154            for seq in dead {
155                sessions.remove(&seq);
156            }
157        }
158        
159        for session in alive {
160            session.on_time_escape(now);
161        }
162    }
163
164
165    pub(crate) fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
166        let session = self.0.sessions.read().unwrap().get(&resp.seq).cloned().and_then(|weak| weak.to_strong());
167        if let Some(session) = session {
168            session.on_udp_call_resp(resp, local, from);
169        }
170    }
171}
172
173
174#[derive(Clone)]
175pub struct CallSessions(Arc<SessionsImpl>);
176
177#[derive(Clone)]
178struct WeakSessions(Weak<SessionsImpl>);
179
180impl WeakSessions {
181    fn to_strong(&self) -> Option<CallSessions> {
182        self.0.upgrade().map(|ptr| CallSessions(ptr))
183    }
184}
185
186enum SessionsState {
187    Init(StateWaiter), 
188    Running {
189        waiter: StateWaiter, 
190        pending: LinkedList<CallSession>, 
191        finished: LinkedList<CallSession>, 
192    },
193    Finished, 
194    Canceled(BuckyError)
195}
196
197struct SessionsImpl {
198    seq: TempSeq, 
199    remote: DeviceId, 
200    sessions: Vec<CallSession>, 
201    state: RwLock<SessionsState>
202}
203
204impl CallSessions {
205    fn to_weak(&self) -> WeakSessions {
206        WeakSessions(Arc::downgrade(&self.0))
207    }
208
209    fn new(seq: TempSeq, remote: DeviceId, sessions: Vec<CallSession>) -> Self {
210        Self(Arc::new(SessionsImpl {
211            seq, 
212            remote, 
213            sessions, 
214            state: RwLock::new(SessionsState::Init(StateWaiter::new()))
215        }))
216    }
217
218    fn sync_session(&self, session: CallSession) {
219        let waiter = {
220            let mut state = self.0.state.write().unwrap();
221            match &mut *state {
222                SessionsState::Running {
223                    waiter, 
224                    finished, 
225                    pending,
226                } => {
227                    finished.push_back(session.clone());
228                    pending.push_back(session);
229                    waiter.pop()
230                },  
231                SessionsState::Canceled(_) => None, 
232                _ => unreachable!()
233            }
234        };
235
236        if let Some(waiter) = waiter {
237            waiter.abort();
238        }
239    }
240
241    fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
242        if let Some(session) = self.0.sessions.iter().find(|session| resp.sn_peer_id.eq(session.sn())) {
243            session.on_udp_call_resp(resp, local, from);
244        }
245    }
246
247    pub async fn next(&self) -> BuckyResult<Option<CallSession>> {
248        enum NextStep {
249            Start(AbortRegistration), 
250            Return(BuckyResult<Option<CallSession>>), 
251            Wait(AbortRegistration)
252        }
253
254        let next = {
255            let mut state = self.0.state.write().unwrap();
256            match &mut *state {
257                SessionsState::Init(waiter) => {
258                    assert_eq!(waiter.len(), 0);
259                    let next = NextStep::Start(waiter.new_waiter());
260                    *state = SessionsState::Running {
261                        waiter: waiter.transfer(), 
262                        pending: LinkedList::new(), 
263                        finished: LinkedList::new()
264                    };
265                    next
266                }, 
267                SessionsState::Running {
268                    waiter,  
269                    pending, 
270                    ..
271                } => {
272                    assert_eq!(waiter.len(), 0);
273                    if pending.len() > 0 {
274                        NextStep::Return(Ok(pending.pop_front()))
275                    } else {
276                        NextStep::Wait(waiter.new_waiter())
277                    }   
278                },  
279                SessionsState::Finished => NextStep::Return(Ok(None)), 
280                SessionsState::Canceled(err) => NextStep::Return(Err(err.clone()))
281            }
282        };
283
284        let state = || {
285            let mut state = self.0.state.write().unwrap();
286            match &mut *state {
287                SessionsState::Running { 
288                    pending, 
289                    finished, 
290                    ..
291                } => {
292                    let ret = Ok(pending.pop_front());
293                    if pending.len() == 0 && finished.len() == self.0.sessions.len() {
294                        *state = SessionsState::Finished;
295                    }
296                    ret
297                },  
298                SessionsState::Finished => Ok(None), 
299                SessionsState::Canceled(err) => Err(err.clone()),
300                _ => unreachable!()
301            }
302        };
303        
304        match next {
305            NextStep::Start(waiter) => {
306                for session in &self.0.sessions {
307                    let sessions = self.clone();
308                    let session = session.clone();
309                    task::spawn(async move {
310                        let _ = session.wait().await;
311                        sessions.sync_session(session);
312                    });
313                }
314                StateWaiter::wait(waiter, state).await
315            }, 
316            NextStep::Wait(waiter) => {
317                StateWaiter::wait(waiter, state).await
318            },
319            NextStep::Return(result) => result
320        }
321    }
322
323    fn on_time_escape(&self, now: Timestamp) {
324        for session in &self.0.sessions {
325            session.on_time_escape(now);
326        }
327    }
328}
329
330#[async_trait::async_trait]
331pub(super) trait CallTunnel: Send + Sync + std::fmt::Display {
332    fn clone_as_call_tunnel(&self) -> Box<dyn CallTunnel>;
333    async fn wait(&self) -> (BuckyResult<Device>, Option<EndpointPair>);
334    fn cancel(&self);
335    fn on_time_escape(&self, _now: Timestamp) {
336
337    }
338    fn reset(&self, _timeout: Duration) -> Option<Box<dyn CallTunnel>> {
339        None
340    }
341    fn on_udp_call_resp(&self, _resp: &SnCallResp, _local: &Interface, _from: &Endpoint) {
342
343    }
344}
345
346enum SessionState {
347    Init, 
348    FirstTry, 
349    SecondTry, 
350    Responsed {
351        active: EndpointPair, 
352        result: BuckyResult<Device>
353    }, 
354    Canceled(BuckyError)
355}
356
357struct SessionStateImpl {
358    packages: Arc<PackageBox>, 
359    tunnels: Vec<Box<dyn CallTunnel>>, 
360    waiter: StateWaiter, 
361    start_at: Timestamp, 
362    state: SessionState
363}
364
365struct SessionImpl {
366    stack: WeakStack, 
367    sn: DeviceId, 
368    config: CallConfig, 
369    state: RwLock<SessionStateImpl>
370}
371
372
373#[derive(Clone)]
374pub struct CallSession(Arc<SessionImpl>);
375
376#[derive(Clone)]
377pub(super) struct WeakSession(Weak<SessionImpl>);
378
379impl WeakSession {
380    pub fn to_strong(&self) -> Option<CallSession> {
381        self.0.upgrade().map(|ptr| CallSession(ptr))
382    }
383}
384
385
386impl std::fmt::Display for CallSession {
387    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388        let stack = Stack::from(&self.0.stack);
389        write!(f, "CallSession{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
390    }
391}
392
393impl std::fmt::Debug for CallSession {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        let stack = Stack::from(&self.0.stack);
396        write!(f, "CallSession{{local:{}, sn:{}}}", stack.local_device_id(), self.sn())
397    }
398}
399
400
401impl CallSession {
402    pub(super) fn to_weak(&self) -> WeakSession {
403        WeakSession(Arc::downgrade(&self.0))
404    }
405
406    async fn new(stack: WeakStack, call: SnCall, config: CallConfig) -> Self {
407        let strong_stack = Stack::from(&stack);
408        let sn = call.sn_peer_id.clone();
409        let key_stub = strong_stack.keystore().create_key(strong_stack.device_cache().get_inner(&sn).unwrap().desc(), true);
410        let mut packages = PackageBox::encrypt_box(sn.clone(), key_stub.key.clone());
411        if let keystore::EncryptedKey::Unconfirmed(encrypted) = &key_stub.encrypted {
412            let mut exchange = Exchange::from((&call, encrypted.clone(), key_stub.key.mix_key.clone()));
413            let _ = exchange.sign(strong_stack.keystore().signer()).await.unwrap();
414            packages.push(exchange);
415        }
416        packages.push(call);
417
418        let session = Self(Arc::new(SessionImpl {
419            stack, 
420            sn, 
421            config, 
422            state: RwLock::new(SessionStateImpl {
423                packages: Arc::new(packages), 
424                tunnels: vec![], 
425                waiter: StateWaiter::new(),
426                start_at: 0,  
427                state: SessionState::Init, 
428            })
429        }));
430
431
432        info!("{} created, key={}", session, key_stub.key);
433        session
434    }
435
436    pub fn sn(&self) -> &DeviceId {
437        &self.0.sn
438    }
439
440    pub(super) fn stack(&self) -> Stack {
441        Stack::from(&self.0.stack)
442    }
443
444    fn on_udp_call_resp(&self, resp: &SnCallResp, local: &Interface, from: &Endpoint) {
445        let tunnels = {
446            let state = self.0.state.read().unwrap();
447            match &state.state {
448                SessionState::FirstTry | SessionState::SecondTry => state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect(), 
449                _ => vec![]
450            }
451        };
452
453        for tunnel in tunnels {
454            tunnel.on_udp_call_resp(resp, local, from);
455        }
456    }
457
458
459    fn sync_tunnel(&self, _tunnel: &dyn CallTunnel, result: BuckyResult<Device>, active: Option<EndpointPair>) {
460        struct NextStep {
461            waiter: Option<StateWaiter>, 
462            to_cancel: Vec<Box<dyn CallTunnel>>, 
463            update_cache: Option<EndpointPair>
464        }
465
466        impl NextStep {
467            fn none() -> Self {
468                Self {
469                    waiter: None,
470                    to_cancel: vec![], 
471                    update_cache: None
472                }
473            }
474        }
475
476
477        let next = {
478            let mut next = NextStep::none();
479            let mut state = self.0.state.write().unwrap();
480
481            match &state.state {
482                SessionState::FirstTry | SessionState::SecondTry => {
483                    if let Some(active) = active {
484                        next.update_cache = Some(active.clone());
485                        state.state = SessionState::Responsed { active, result };
486                        next.waiter = Some(state.waiter.transfer());
487                    }
488                }, 
489                _ => {}
490            };
491
492            if next.waiter.is_some() {
493                std::mem::swap(&mut next.to_cancel, &mut state.tunnels);
494            }
495
496            next
497        };
498
499        if let Some(endpoint) = next.update_cache {
500            let stack = Stack::from(&self.0.stack);
501            stack.sn_client().cache().add_active(self.sn(), endpoint);
502        }
503       
504        if let Some(waiter) = next.waiter {
505            waiter.wake();
506        }
507
508        for tunnel in next.to_cancel {
509            tunnel.cancel();
510        }
511    }
512
513    pub fn config(&self) -> &CallConfig {
514        &self.0.config
515    }
516
517    async fn wait(&self) -> Option<EndpointPair> {
518        enum NextStep {
519            Start(AbortRegistration, Vec<Box<dyn CallTunnel>>), 
520            Wait(AbortRegistration),
521            Return(Option<EndpointPair>)
522        }
523
524        let next = {
525            let mut state = self.0.state.write().unwrap();
526
527            match &state.state {
528                SessionState::Init => {
529                    if state.packages.has_exchange() {
530                        state.state = SessionState::SecondTry;
531                    } else {
532                        state.state = SessionState::FirstTry;
533                    }
534                    state.start_at = bucky_time_now();
535                    NextStep::Start(state.waiter.new_waiter(), state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect())
536                }, 
537                SessionState::Responsed { active, .. } => NextStep::Return(Some(active.clone())), 
538                SessionState::Canceled(_) => NextStep::Return(None), 
539                _ => NextStep::Wait(state.waiter.new_waiter())
540            }
541        };
542
543        let state = || {
544            let state = self.0.state.read().unwrap();
545            match &state.state {
546                SessionState::Responsed { active, .. } => Some(active.clone()), 
547                SessionState::Canceled(_) => None, 
548                _ => unreachable!()
549            }
550        };
551        
552        match next {
553            NextStep::Start(waiter, tunnels) => {
554                for tunnel in tunnels {
555                    let session = self.clone();
556                    task::spawn(async move {
557                        let (result, active) = tunnel.wait().await;
558                        session.sync_tunnel(tunnel.as_ref(), result, active);
559                    });
560                }
561                StateWaiter::wait(waiter, state).await
562            }, 
563            NextStep::Wait(waiter) => StateWaiter::wait(waiter, state).await,
564            NextStep::Return(active) => active
565        }
566    }
567
568    fn add_tunnel(&self, tunnel: Box<dyn CallTunnel>) {
569        info!("{} add tunnel, tunnel={}", self, tunnel);
570        let mut state = self.0.state.write().unwrap();
571        match &state.state {
572            SessionState::Init => {
573                state.tunnels.push(tunnel);
574            },
575            _ => unreachable!()
576        }
577    }
578
579    pub fn packages(&self) -> Arc<PackageBox> {
580        self.0.state.read().unwrap().packages.clone()
581    }
582
583
584    async fn reset(&self, call: SnCall) {
585        let stack = Stack::from(&self.0.stack);
586        stack.keystore().reset_peer(self.sn());
587
588        let key_stub = stack.keystore().create_key(stack.device_cache().get_inner(self.sn()).unwrap().desc(), true);
589        let mut packages = PackageBox::encrypt_box(self.sn().clone(), key_stub.key.clone());
590        if let keystore::EncryptedKey::Unconfirmed(encrypted) = &key_stub.encrypted {
591            let mut exchange = Exchange::from((&call, encrypted.clone(), key_stub.key.mix_key.clone()));
592            let _ = exchange.sign(stack.keystore().signer()).await.unwrap();
593            packages.push(exchange);
594        }
595        packages.push(call);
596
597        info!("{} reset with key {}", self, key_stub.key);
598
599        let tunnels = {
600            let mut state = self.0.state.write().unwrap();
601            state.packages = Arc::new(packages);
602            let escaped = Duration::from_micros(bucky_time_now() - state.start_at);
603            if escaped < self.config().timeout {
604                let remain = self.config().timeout - escaped;
605                let mut resets = vec![];
606                for tunnel in &state.tunnels {
607                    if let Some(reset) = tunnel.reset(remain) {
608                        resets.push(reset);
609                    }
610                }
611                
612                for tunnel in &resets {
613                    state.tunnels.push(tunnel.clone_as_call_tunnel());
614                }
615               
616                Some(resets)
617            } else {
618                None
619            }
620        };
621       
622        if let Some(tunnels) = tunnels {
623            for tunnel in tunnels {
624                let session = self.clone();
625                task::spawn(async move {
626                    let (result, active) = tunnel.wait().await;
627                    session.sync_tunnel(tunnel.as_ref(), result, active);
628                });
629            }
630        }
631    }
632
633    fn on_time_escape(&self, now: Timestamp) {
634        struct NextStep {
635            waiter: Option<StateWaiter>, 
636            reset: Option<SnCall>, 
637            callback: Option<Vec<Box<dyn CallTunnel>>>, 
638        }
639
640        impl NextStep {
641            fn none() -> Self {
642                Self {
643                    waiter: None, 
644                    reset: None, 
645                    callback: None
646                }
647            }
648        }
649
650        let mut next = NextStep::none();
651        {
652            let mut state = self.0.state.write().unwrap();
653           
654            match &state.state {
655                SessionState::FirstTry => {
656                    if now > state.start_at && Duration::from_micros(now - state.start_at) > self.config().first_try_timeout {
657                        let call: &SnCall = state.packages.packages_no_exchange()[0].as_ref();
658                        next.reset = Some(call.clone());
659                        state.state = SessionState::SecondTry;
660                    } else {
661                        next.callback = Some(state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect());
662                    }
663                }, 
664                SessionState::SecondTry => {
665                    if now > state.start_at && Duration::from_micros(now - state.start_at) > self.config().timeout {
666                        state.state = SessionState::Canceled(BuckyError::new(BuckyErrorCode::Timeout, "session timeout"));
667                        next.waiter = Some(state.waiter.transfer());
668                    } else {
669                        next.callback = Some(state.tunnels.iter().map(|t| t.clone_as_call_tunnel()).collect());
670                    }
671                }, 
672                _ => {}
673            }
674        }
675       
676        if let Some(waiter) = next.waiter {
677            waiter.wake();
678        }
679
680        if let Some(tunnels) = next.callback {
681            for tunnel in tunnels {
682                tunnel.on_time_escape(now);
683            }
684        }
685
686        if let Some(call) = next.reset {
687            let session = self.clone();
688            task::spawn(async move {
689                session.reset(call).await;
690            });
691        }
692    }
693
694    pub fn result(&self) -> Option<BuckyResult<Device>> {
695        let state = self.0.state.read().unwrap();
696        match &state.state {
697            SessionState::Responsed { result, .. } => Some(result.clone()), 
698            SessionState::Canceled(err) => Some(Err(err.clone())), 
699            _ => None
700        }
701    }
702}
703
704