1use async_std::task;
2use futures::executor::ThreadPool;
3use log::*;
4use std::{
5 any::Any,
6 sync::{
7 atomic::{self, AtomicBool},
8 Arc,
9 },
10 time::Duration,
11};
12
13use cyfs_base::*;
14
15use crate::{
16 history::keystore::{self, Keystore},
17 protocol::{*, v0::*},
18 types::*,
19};
20
21use super::{
22 call_stub::CallStub,
23 net_listener::{MessageSender, NetListener, UdpSender},
24 peer_manager::PeerManager,
25 receipt::*,
26 resend_queue::{ResendQueue, ResendCallbackTrait},
27};
28
29struct ServiceImpl {
36 seq_generator: TempSeqGenerator,
37 key_store: Keystore,
38 local_device_id: DeviceId,
39 local_device: Device,
40 stopped: AtomicBool,
41 contract: Box<dyn SnServiceContractServer + Send + Sync>,
42 thread_pool: ThreadPool,
43
44 peer_mgr: PeerManager,
46 resend_queue: Option<ResendQueue>,
47 call_stub: CallStub,
48
49}
50
51#[derive(Clone)]
52pub struct SnService(Arc<ServiceImpl>);
53
54impl SnService {
55 pub fn new(
56 local_device: Device,
57 local_secret: PrivateKey,
58 contract: Box<dyn SnServiceContractServer + Send + Sync>,
59 ) -> SnService {
60 let thread_pool = ThreadPool::new().unwrap();
61
62 let service = Self(Arc::new(ServiceImpl {
63 seq_generator: TempSeqGenerator::new(),
64 key_store: Keystore::new(
65 local_secret.clone(),
66 local_device.desc().clone(),
67 RsaCPUObjectSigner::new(
68 local_device.desc().public_key().clone(),
69 local_secret.clone(),
70 ),
71 keystore::Config {
72 active_time: Duration::from_secs(600),
74 capacity: 100000,
75 },
76 ),
77 resend_queue: None,local_device_id: local_device.desc().device_id(),
79 local_device: local_device.clone(),
80 stopped: AtomicBool::new(false),
81 peer_mgr: PeerManager::new(),
82 call_stub: CallStub::new(),
83 thread_pool: thread_pool.clone(),
84 contract,
85 }));
90
91 let resend_queue = ResendQueue::new(thread_pool, Duration::from_millis(200), 5, Box::new(service.clone()));
92
93 let mut_service = unsafe { &mut *(Arc::as_ptr(&service.0) as *mut ServiceImpl) };
94 mut_service.resend_queue = Some(resend_queue);
95
96 service
97 }
98
99 pub async fn start(&self) -> BuckyResult<()> {
100 let mut endpoints_v4 = vec![];
101 let mut endpoints_v6 = vec![];
102 for endpoint in self.0.local_device.connect_info().endpoints() {
103 if endpoint.addr().is_ipv4() {
104 endpoints_v4.push(endpoint.clone());
105 } else {
106 endpoints_v6.push(endpoint.clone());
107 };
108 }
109
110 let _listener = match NetListener::listen(&endpoints_v6, &endpoints_v4, self.clone()).await
111 {
112 Ok((listener, udp_count, _)) => {
113 if udp_count == 0 {
114 log::error!("sn-minner start failed for all udp-endpoints listen failed.");
115 Err(BuckyError::new(
116 BuckyErrorCode::Failed,
117 "all udp-endpoint listen failed",
118 ))
119 } else {
120 Ok(listener)
121 }
122 }
123 Err(e) => Err(e),
124 }?;
125
126 let timer = {
128 let service = self.clone();
129 task::spawn(async move {
130 loop {
131 {
132 if service.is_stopped() {
133 return;
134 }
135 service.clean_timeout_resource();
136 }
137 task::sleep(Duration::from_micros(100000)).await;
138 }
139 })
140 };
141
142 timer.await;
144
145 Ok(())
146 }
147
148 pub fn stop(&self) {
149 self.0.stopped.store(true, atomic::Ordering::Relaxed);
150 }
151
152 pub fn is_stopped(&self) -> bool {
153 self.0.stopped.load(atomic::Ordering::Relaxed)
154 }
155
156 pub fn local_device_id(&self) -> &DeviceId {
157 &self.0.local_device_id
158 }
159
160 pub(super) fn key_store(&self) -> &Keystore {
161 &self.0.key_store
162 }
163
164 fn resend_queue(&self) -> &ResendQueue {
165 self.0.resend_queue.as_ref().unwrap()
166 }
167
168 fn peer_manager(&self) -> &PeerManager {
169 &self.0.peer_mgr
170 }
171
172 pub(super) fn thread_pool(&self) -> &ThreadPool {
173 &self.0.thread_pool
174 }
175
176 fn send_resp(&self, mut sender: MessageSender, pkg: DynamicPackage, send_log: String) {
177 self.thread_pool().spawn_ok(async move {
178 if let Err(e) = sender.send(pkg).await {
179 warn!("{} send failed. error: {}.", send_log, e.to_string());
180 } else {
181 debug!("{} send ok.", send_log);
182 }
183
184 if let MessageSender::Tcp(tcp_sender) = sender {
185 tcp_sender.close()
186 }
187 });
188 }
189
190 fn send_resp_udp(&self, sender: Arc<UdpSender>, pkg: DynamicPackage, send_log: String) {
191 self.thread_pool().spawn_ok(async move {
192 let pkg_box = sender.box_pkg(pkg);
193 if let Err(e) = sender.send(&pkg_box).await {
194 warn!("{} send failed. error: {}.", send_log, e.to_string());
195 } else {
196 debug!("{} send ok.", send_log);
197 }
198 });
199 }
200
201 fn clean_timeout_resource(&self) {
202 let now = bucky_time_now();
203
204 if let Some(drops) = self.peer_manager().try_knock_timeout(now) {
205 for device in &drops {
206 self.key_store().reset_peer(device)
207 }
208 }
209
210 self.resend_queue().try_resend(now);
211 self.0.call_stub.recycle(now);
212 }
220
221 pub(super) fn handle(&self, mut pkg_box: PackageBox, resp_sender: MessageSender) {
222 let first_pkg = pkg_box.pop();
223 if first_pkg.is_none() {
224 warn!("fetch none pkg");
225 return;
226 }
227
228 let send_time = bucky_time_now();
229 let first_pkg = first_pkg.unwrap();
230 let cmd_pkg = match first_pkg.cmd_code() {
231 PackageCmdCode::Exchange => {
232 let exchg = <Box<dyn Any + Send>>::downcast::<Exchange>(first_pkg.into_any()); if let Ok(_) = exchg {
234 self.key_store().add_key(pkg_box.key(), pkg_box.remote());
235 } else {
236 warn!("fetch exchange failed, from: {:?}.", resp_sender.remote());
237 return;
238 }
239
240 match pkg_box.pop() {
241 Some(pkg) => pkg,
242 None => {
243 warn!("fetch none cmd-pkg, from: {:?}.", resp_sender.remote());
244 return;
245 }
246 }
247 }
248 _ => first_pkg,
249 };
250
251 match cmd_pkg.cmd_code() {
252 PackageCmdCode::SnPing => {
253 let ping_req = <Box<dyn Any + Send>>::downcast::<SnPing>(cmd_pkg.into_any());
254 if let Ok(ping_req) = ping_req {
255 self.handle_ping(
256 ping_req,
257 resp_sender,
258 Some((pkg_box.key(), pkg_box.remote())),
259 send_time,
260 );
261 } else {
262 warn!("fetch ping-req failed, from: {:?}.", resp_sender.remote());
263 return;
264 }
265 }
266 PackageCmdCode::SnCall => {
267 let call_req = <Box<dyn Any + Send>>::downcast::<SnCall>(cmd_pkg.into_any());
268 if let Ok(call_req) = call_req {
269 self.handle_call(
270 call_req,
271 resp_sender,
272 Some((pkg_box.key(), pkg_box.remote())),
273 send_time,
274 );
275 } else {
276 warn!("fetch sn-call failed, from: {:?}.", resp_sender.remote());
277 return;
278 }
279 }
280 PackageCmdCode::SnCalledResp => {
281 let called_resp =
282 <Box<dyn Any + Send>>::downcast::<SnCalledResp>(cmd_pkg.into_any());
283 if let Ok(called_resp) = called_resp {
284 self.handle_called_resp(called_resp, Some(pkg_box.key()))
285 } else {
286 warn!(
287 "fetch sn-called-resp failed, from: {:?}.",
288 resp_sender.remote()
289 );
290 return;
291 }
292 }
293 _ => warn!("invalid cmd-package, from: {:?}.", resp_sender.remote()),
294 }
295 }
296
297
298 fn handle_ping(
299 &self,
300 ping_req: Box<SnPing>,
301 resp_sender: MessageSender,
302 encryptor: Option<(&MixAesKey, &DeviceId)>,
303 send_time: Timestamp,
304 ) {
305 if resp_sender.local().unwrap().is_ipv4() {
306 self.handle_ipv4_ping(ping_req, resp_sender, encryptor, send_time);
307 } else {
308 self.handle_ipv6_ping(ping_req, resp_sender, encryptor, send_time)
309 }
310 }
311
312 fn handle_ipv6_ping(
313 &self,
314 ping_req: Box<SnPing>,
315 resp_sender: MessageSender,
316 encryptor: Option<(&MixAesKey, &DeviceId)>,
317 _send_time: Timestamp,
318 ) {
319 let from_peer_id = match ping_req.from_peer_id.as_ref() {
320 Some(id) => id,
321 None => match encryptor {
322 Some((_, id)) => id,
323 None => {
324 warn!(
325 "[ping from 'unknow-deviceid' seq({})] without from peer-desc.",
326 ping_req.seq.value()
327 );
328 return;
329 }
330 },
331 };
332
333 let log_key = format!(
334 "[ping from {} seq({})]",
335 from_peer_id.to_string(),
336 ping_req.seq.value()
337 );
338
339 let resp_sender = match resp_sender {
340 MessageSender::Tcp(_) => {
341 warn!("{} from tcp.", log_key);
342 return;
343 }
344 MessageSender::Udp(u) => Arc::new(u),
345 };
346
347 info!("{}", log_key);
348
349 let ping_resp = SnPingResp {
350 seq: ping_req.seq,
351 sn_peer_id: self.local_device_id().clone(),
352 result: BuckyErrorCode::Ok.into_u8(),
353 peer_info: None,
354 end_point_array: vec![Endpoint::from((
355 Protocol::Udp,
356 resp_sender.remote().clone(),
357 ))],
358 receipt: None,
359 };
360
361 self.send_resp_udp(
362 resp_sender,
363 DynamicPackage::from(ping_resp),
364 format!("{}", log_key),
365 );
366
367 }
368
369 fn handle_ipv4_ping(
370 &self,
371 ping_req: Box<SnPing>,
372 resp_sender: MessageSender,
373 encryptor: Option<(&MixAesKey, &DeviceId)>,
374 send_time: Timestamp,
375 ) {
376 let from_peer_id = match ping_req.from_peer_id.as_ref() {
377 Some(id) => id,
378 None => match encryptor {
379 Some((_, id)) => id,
380 None => {
381 warn!(
382 "[ping from 'unknow-deviceid' seq({})] without from peer-desc.",
383 ping_req.seq.value()
384 );
385 return;
386 }
387 },
388 };
389
390 let aes_key = encryptor.map(|(key, _)| key);
391
392 let log_key = format!(
393 "[ping from {} seq({})]",
394 from_peer_id.to_string(),
395 ping_req.seq.value()
396 );
397 let resp_sender = match resp_sender {
398 MessageSender::Tcp(_) => {
399 warn!("{} from tcp.", log_key);
400 return;
401 }
402 MessageSender::Udp(u) => Arc::new(u),
403 };
404
405 info!("{}", log_key);
406
407 if !self.peer_manager().peer_heartbeat(
426 from_peer_id.clone(),
427 &ping_req.peer_info,
428 resp_sender.clone(),
429 aes_key,
430 send_time,
431 ping_req.seq,
432 ) {
433 warn!("{} cache peer failed. the ping maybe is timeout.", log_key);
434 return;
435 };
436
437 let ping_resp = SnPingResp {
438 seq: ping_req.seq,
439 sn_peer_id: self.local_device_id().clone(),
440 result: BuckyErrorCode::Ok.into_u8(),
441 peer_info: Some(self.0.local_device.clone()),
442 end_point_array: vec![Endpoint::from((
443 Protocol::Udp,
444 resp_sender.remote().clone(),
445 ))],
446 receipt: None,
447 };
448
449 self.send_resp_udp(
450 resp_sender,
451 DynamicPackage::from(ping_resp),
452 format!("{}", log_key),
453 );
454 }
455
456 fn handle_call(
542 &self,
543 mut call_req: Box<SnCall>,
544 resp_sender: MessageSender,
545 _encryptor: Option<(&MixAesKey, &DeviceId)>,
546 _send_time: Timestamp,
547 ) {
548 let from_peer_id = &call_req.from_peer_id;
549 let log_key = format!(
550 "[call {}->{} seq({})]",
551 from_peer_id.to_string(),
552 call_req.to_peer_id.to_string(),
553 call_req.seq.value()
554 );
555 info!("{}.", log_key);
556 let call_requestor = self.peer_manager().find_peer(&call_req.from_peer_id);
584
585 if let Some(call_requestor) = call_requestor.as_ref() {
586 call_requestor.peer_status.add_record(call_req.to_peer_id.clone(), call_req.seq);
587 }
588
589 let call_resp =
590 if let Some(to_peer_cache) = self.peer_manager().find_peer(&call_req.to_peer_id) {
591 let from_peer_desc = if call_req.peer_info.is_none() {
593 self.peer_manager().find_peer(from_peer_id).map(|c| c.desc)
594 } else {
595 call_req.peer_info
596 };
597
598 if let Some(from_peer_desc) = from_peer_desc {
599 info!(
600 "{} to-peer found, endpoints: {}, always_call: {}, to-peer.is_wan: {}.",
601 log_key,
602 endpoints_to_string(to_peer_cache.desc.connect_info().endpoints()),
603 call_req.is_always_call,
604 to_peer_cache.is_wan
605 );
606
607 if self.0.call_stub.insert(from_peer_id, &call_req.seq) {
608 if call_req.is_always_call || !to_peer_cache.is_wan {
609 let called_seq = self.0.seq_generator.generate();
610 let mut called_req = SnCalled {
611 seq: called_seq,
612 to_peer_id: call_req.to_peer_id.clone(),
613 sn_peer_id: self.local_device_id().clone(),
614 peer_info: from_peer_desc,
615 call_seq: call_req.seq,
616 call_send_time: call_req.send_time,
617 payload: SizedOwnedData::from(vec![]),
618 reverse_endpoint_array: vec![],
619 active_pn_list: vec![],
620 };
621
622 std::mem::swap(&mut call_req.payload, &mut called_req.payload);
623 if let Some(eps) = call_req.reverse_endpoint_array.as_mut() {
624 std::mem::swap(eps, &mut called_req.reverse_endpoint_array);
625 }
626 if let Some(pn_list) = call_req.active_pn_list.as_mut() {
627 std::mem::swap(pn_list, &mut called_req.active_pn_list);
628 }
629
630 let called_log =
631 format!("{} called-req seq({})", log_key, called_seq.value());
632 log::debug!(
633 "{} will send with payload(len={}) pn_list({:?}).",
634 called_log,
635 called_req.payload.len(),
636 called_req.active_pn_list
637 );
638 self.resend_queue().send(
639 to_peer_cache.sender.clone(),
640 DynamicPackage::from(called_req),
641 called_seq.value(),
642 called_log,
643 );
644 }
646 } else {
647 info!("{} ignore send called req for already exists.", log_key);
648 }
649
650 SnCallResp {
651 seq: call_req.seq,
652 sn_peer_id: self.local_device_id().clone(),
653 result: BuckyErrorCode::Ok.into_u8(),
654 to_peer_info: Some(to_peer_cache.desc),
655 }
656 } else {
657 warn!("{} without from-desc.", log_key);
658
659 SnCallResp {
660 seq: call_req.seq,
661 sn_peer_id: self.local_device_id().clone(),
662 result: BuckyErrorCode::NotFound.into_u8(),
663 to_peer_info: None,
664 }
665 }
666 } else {
667 warn!("{} to-peer not found.", log_key);
668 SnCallResp {
669 seq: call_req.seq,
670 sn_peer_id: self.local_device_id().clone(),
671 result: BuckyErrorCode::NotFound.into_u8(),
672 to_peer_info: None,
673 }
674 };
675
676 match &call_resp.result {
677 0 => { },
678
679 _ => {
680 if let Some(call_requestor) = call_requestor.as_ref() {
681 call_requestor.peer_status.record(call_req.to_peer_id.clone(), call_req.seq, BuckyErrorCode::from(call_resp.result as u16));
682 }
683 }
684
685 }
686
687 self.send_resp(
688 resp_sender,
689 DynamicPackage::from(call_resp),
690 format!("{} call-resp", log_key),
691 );
692 }
693
694 fn handle_called_resp(&self, called_resp: Box<SnCalledResp>, _aes_key: Option<&MixAesKey>) {
695 info!("called-resp seq {}.", called_resp.seq.value());
696 self.resend_queue().confirm_pkg(called_resp.seq.value());
697
698 }
712}
713
714impl ResendCallbackTrait for SnService {
715 fn on_callback(&self, pkg: Arc<PackageBox>, errno: BuckyErrorCode) {
716 if let Some(p) = pkg.packages_no_exchange()
717 .get(0)
718 .map(| p | {
719 let p: &SnCalled = p.as_ref();
720 p
721 }) {
722 self.peer_manager().find_peer(&p.peer_info.desc().device_id())
723 .map(| requestor | {
724 requestor.peer_status.record(p.to_peer_id.clone(), p.call_seq, errno);
725 });
726 }
727 }
728}