cyfs_bdt/sn/client/
manager.rs1use std::{
2 time::Duration,
3 sync::{Arc, RwLock},
4};
5
6use async_std::{
7 task,
8 future
9};
10
11use cyfs_base::*;
12use crate::{
13 types::*,
14 interface::{NetListener, udp::{self, OnUdpPackageBox}},
15 protocol::{*, v0::*},
16 stack::{Stack, WeakStack}
17};
18use super::{
19 cache::*,
20 ping::{PingConfig, PingClients, SnStatus},
21 call::{CallConfig, CallManager}
22};
23
24pub trait PingClientCalledEvent<Context=()>: Send + Sync {
25 fn on_called(&self, called: &SnCalled, context: Context) -> Result<(), BuckyError>;
26}
27
28
29#[derive(Clone)]
30pub struct Config {
31 pub atomic_interval: Duration,
32 pub ping: PingConfig,
33 pub call: CallConfig,
34}
35
36struct ManagerImpl {
37 stack: WeakStack,
38 gen_seq: Arc<TempSeqGenerator>,
39 cache: SnCache,
40 ping: RwLock<PingClients>,
41 call: CallManager,
42}
43
44#[derive(Clone)]
45pub struct ClientManager(Arc<ManagerImpl>);
46
47impl ClientManager {
48 pub fn create(stack: WeakStack, net_listener: NetListener, local_device: Device) -> Self {
49 let strong_stack = Stack::from(&stack);
50 let config = &strong_stack.config().sn_client;
51 let atomic_interval = config.atomic_interval;
52 let gen_seq = Arc::new(TempSeqGenerator::new());
53 let manager = Self(Arc::new(ManagerImpl {
54 cache: SnCache::new(),
55 ping: RwLock::new(PingClients::new(stack.clone(), gen_seq.clone(), net_listener, vec![], local_device)),
56 call: CallManager::create(stack.clone()),
57 gen_seq,
58 stack,
59 }));
60
61 {
62 let manager = manager.clone();
63 task::spawn(async move {
64 loop {
65 let now = bucky_time_now();
66 manager.ping().on_time_escape(now);
67 manager.call().on_time_escape(now);
68 let _ = future::timeout(atomic_interval, future::pending::<()>()).await;
69 }
70 });
71 }
72 manager
73 }
74
75 pub fn cache(&self) -> &SnCache {
76 &self.0.cache
77 }
78
79 pub fn ping(&self) -> PingClients {
80 self.0.ping.read().unwrap().clone()
81 }
82
83 pub fn reset(&self) -> Option<PingClients> {
84 let next = {
85 let mut ping = self.0.ping.write().unwrap();
86 if let Some(status) = ping.status() {
87 if SnStatus::Offline == status {
88 let to_close = ping.clone();
89 let to_start = PingClients::new(
90 self.0.stack.clone(),
91 self.0.gen_seq.clone(),
92 to_close.net_listener().reset(None),
93 to_close.sn_list().clone(),
94 to_close.default_local()
95 );
96 *ping = to_start.clone();
97 Some((to_start, to_close))
98 } else {
99 None
100 }
101 } else {
102 None
103 }
104 };
105
106 if let Some((to_start, to_close)) = next {
107 to_close.stop();
108 Some(to_start)
109 } else {
110 None
111 }
112 }
113
114 pub fn reset_sn_list(&self, sn_list: Vec<Device>) -> PingClients {
115 let (to_start, to_close) = {
116 let mut ping = self.0.ping.write().unwrap();
117 let to_close = ping.clone();
118 let to_start = PingClients::new(
119 self.0.stack.clone(),
120 self.0.gen_seq.clone(),
121 to_close.net_listener().reset(None),
122 sn_list,
123 to_close.default_local()
124 );
125 *ping = to_start.clone();
126 (to_start, to_close)
127 };
128 to_close.stop();
129 to_start
130 }
131
132 pub fn reset_endpoints(&self, net_listener: NetListener, local_device: Device) -> PingClients {
133 let (to_start, to_close) = {
134 let mut ping = self.0.ping.write().unwrap();
135 let to_close = ping.clone();
136 let to_start = to_close.reset(net_listener, local_device);
137 *ping = to_start.clone();
138 (to_start, to_close)
139 };
140 to_close.stop();
141 to_start
142 }
143
144 pub fn call(&self) -> &CallManager {
145 &self.0.call
146 }
147}
148
149impl OnUdpPackageBox for ClientManager {
150 fn on_udp_package_box(&self, package_box: udp::UdpPackageBox) -> Result<(), BuckyError> {
151 let from = package_box.remote().clone();
152 let from_interface = package_box.local();
153 for pkg in package_box.as_ref().packages() {
154 match pkg.cmd_code() {
155 PackageCmdCode::SnPingResp => {
156 match pkg.as_any().downcast_ref::<SnPingResp>() {
157 None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnPingResp")),
158 Some(ping_resp) => {
159 let _ = self.ping().on_udp_ping_resp(ping_resp, &from, from_interface.clone());
160 }
161 }
162 },
163 PackageCmdCode::SnCalled => {
164 match pkg.as_any().downcast_ref::<SnCalled>() {
165 None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnCalled")),
166 Some(called) => {
167 let _ = self.ping().on_called(called, package_box.as_ref(), &from, from_interface.clone());
168 }
169 }
170 },
171 PackageCmdCode::SnCallResp => {
172 match pkg.as_any().downcast_ref::<SnCallResp>() {
173 None => return Err(BuckyError::new(BuckyErrorCode::InvalidData, "should be SnCallResp")),
174 Some(call_resp) => {
175 let _ = self.call().on_udp_call_resp(call_resp, from_interface, &from);
176 }
177 }
178 },
179 _ => {
180 return Err(BuckyError::new(BuckyErrorCode::InvalidData, format!("unkown package({:?})", pkg.cmd_code()).as_str()))
181 }
182 }
183 }
184
185 Ok(())
186 }
187}
188