cyfs_bdt/sn/client/
manager.rs

1use std::{
2    time::Duration, 
3    sync::{Arc, RwLock}, 
4};
5
6use async_std::{
7    task, 
8    future
9};
10
11use cyfs_base::*;
12use crate::{
13    types::*, 
14    interface::{NetListener, udp::{self, OnUdpPackageBox}}, 
15    protocol::{*, v0::*}, 
16    stack::{Stack, WeakStack}
17};
18use super::{
19    cache::*, 
20    ping::{PingConfig, PingClients, SnStatus}, 
21    call::{CallConfig, CallManager}
22};
23
24pub trait PingClientCalledEvent<Context=()>: Send + Sync {
25    fn on_called(&self, called: &SnCalled, context: Context) -> Result<(), BuckyError>;
26}
27
28
29#[derive(Clone)]
30pub struct Config {
31    pub atomic_interval: Duration, 
32    pub ping: PingConfig, 
33    pub call: CallConfig,
34}
35
36struct ManagerImpl {
37    stack: WeakStack, 
38    gen_seq: Arc<TempSeqGenerator>, 
39    cache: SnCache, 
40    ping: RwLock<PingClients>, 
41    call: CallManager,
42}
43
44#[derive(Clone)]
45pub struct ClientManager(Arc<ManagerImpl>);
46
47impl ClientManager {
48    pub fn create(stack: WeakStack, net_listener: NetListener, local_device: Device) -> Self {
49        let strong_stack = Stack::from(&stack); 
50        let config = &strong_stack.config().sn_client;
51        let atomic_interval = config.atomic_interval;
52        let gen_seq = Arc::new(TempSeqGenerator::new());
53        let manager = Self(Arc::new(ManagerImpl {
54            cache: SnCache::new(), 
55            ping: RwLock::new(PingClients::new(stack.clone(), gen_seq.clone(), net_listener, vec![], local_device)),
56            call: CallManager::create(stack.clone()), 
57            gen_seq, 
58            stack, 
59        }));
60
61        {
62            let manager = manager.clone();
63            task::spawn(async move {
64                loop {
65                    let now = bucky_time_now();
66                    manager.ping().on_time_escape(now);
67                    manager.call().on_time_escape(now);
68                    let _ = future::timeout(atomic_interval, future::pending::<()>()).await;
69                }
70            });
71        }
72        manager
73    }
74
75    pub fn cache(&self) -> &SnCache {
76        &self.0.cache
77    }
78
79    pub fn ping(&self) -> PingClients {
80        self.0.ping.read().unwrap().clone()
81    }
82
83    pub fn reset(&self) -> Option<PingClients> {
84        let next = {
85            let mut ping = self.0.ping.write().unwrap();
86            if let Some(status) = ping.status() {
87                if SnStatus::Offline == status {
88                    let to_close = ping.clone();
89                    let to_start = PingClients::new(
90                        self.0.stack.clone(), 
91                        self.0.gen_seq.clone(), 
92                        to_close.net_listener().reset(None), 
93                        to_close.sn_list().clone(), 
94                        to_close.default_local()
95                    );
96                    *ping = to_start.clone();
97                    Some((to_start, to_close))
98                } else {
99                    None
100                }
101            } else {
102                None
103            }
104        };
105
106        if let Some((to_start, to_close)) = next {
107            to_close.stop();
108            Some(to_start)
109        } else {
110            None
111        }
112    }
113
114    pub fn reset_sn_list(&self, sn_list: Vec<Device>) -> PingClients {
115        let (to_start, to_close) = {
116            let mut ping = self.0.ping.write().unwrap();
117            let to_close = ping.clone();
118            let to_start = PingClients::new(
119                self.0.stack.clone(), 
120                self.0.gen_seq.clone(), 
121                to_close.net_listener().reset(None), 
122                sn_list, 
123                to_close.default_local()
124            );
125            *ping = to_start.clone();
126            (to_start, to_close)
127        };
128        to_close.stop();
129        to_start
130    }
131
132    pub fn reset_endpoints(&self, net_listener: NetListener, local_device: Device) -> PingClients {
133        let (to_start, to_close) = {
134            let mut ping = self.0.ping.write().unwrap();
135            let to_close = ping.clone();
136            let to_start = to_close.reset(net_listener, local_device);
137            *ping = to_start.clone();
138            (to_start, to_close)
139        };
140        to_close.stop();
141        to_start
142    }
143
144    pub fn call(&self) -> &CallManager {
145        &self.0.call
146    }
147}
148
149impl OnUdpPackageBox for ClientManager {
150    fn on_udp_package_box(&self, package_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
151        let from = package_box.remote().clone();
152        let from_interface = package_box.local();
153        for pkg in package_box.as_ref().packages() {
154            match pkg.cmd_code() {
155                PackageCmdCode::SnPingResp => {
156                    match pkg.as_any().downcast_ref::<SnPingResp>() {
157                        None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnPingResp")),
158                        Some(ping_resp) => {
159                            let _ = self.ping().on_udp_ping_resp(ping_resp, &from, from_interface.clone());
160                        }
161                    }
162                },
163                PackageCmdCode::SnCalled => {
164                    match pkg.as_any().downcast_ref::<SnCalled>() {
165                        None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnCalled")),
166                        Some(called) => {
167                            let _ = self.ping().on_called(called, package_box.as_ref(), &from, from_interface.clone());
168                        }
169                    }
170                },
171                PackageCmdCode::SnCallResp => {
172                    match pkg.as_any().downcast_ref::<SnCallResp>() {
173                        None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnCallResp")),
174                        Some(call_resp) => {
175                            let _ = self.call().on_udp_call_resp(call_resp, from_interface, &from);
176                        }
177                    }
178                },
179                _ => {
180                    return Err(BuckyError::new(BuckyErrorCode::InvalidData, format!("unkown package({:?})", pkg.cmd_code()).as_str()))
181                }
182            }
183        }
184
185        Ok(())
186    }
187}
188