cyfs_bdt/sn/service/
service.rs

1use async_std::task;
2use futures::executor::ThreadPool;
3use log::*;
4use std::{
5    any::Any,
6    sync::{
7        atomic::{self, AtomicBool},
8        Arc,
9    },
10    time::Duration,
11};
12
13use cyfs_base::*;
14
15use crate::{
16    history::keystore::{self, Keystore},
17    protocol::{*, v0::*},
18    types::*,
19};
20
21use super::{
22    call_stub::CallStub,
23    net_listener::{MessageSender, NetListener, UdpSender},
24    peer_manager::PeerManager,
25    receipt::*,
26    resend_queue::{ResendQueue, ResendCallbackTrait},
27};
28
29// const TRACKER_INTERVAL: Duration = Duration::from_secs(60);
30// struct CallTracker {
31//     calls: HashMap<TempSeq, (u64, Instant, DeviceId)>, // <called_seq, (call_send_time, called_send_time)>
32//     begin_time: Instant,
33// }
34
35struct ServiceImpl {
36    seq_generator: TempSeqGenerator,
37    key_store: Keystore,
38    local_device_id: DeviceId,
39    local_device: Device,
40    stopped: AtomicBool,
41    contract: Box<dyn SnServiceContractServer + Send + Sync>,
42    thread_pool: ThreadPool,
43
44    // call_tracker: CallTracker,
45    peer_mgr: PeerManager,
46    resend_queue: Option<ResendQueue>,
47    call_stub: CallStub,
48
49}
50
51#[derive(Clone)]
52pub struct SnService(Arc<ServiceImpl>);
53
54impl SnService {
55    pub fn new(
56        local_device: Device,
57        local_secret: PrivateKey,
58        contract: Box<dyn SnServiceContractServer + Send + Sync>,
59    ) -> SnService {
60        let thread_pool = ThreadPool::new().unwrap();
61
62        let service = Self(Arc::new(ServiceImpl {
63            seq_generator: TempSeqGenerator::new(),
64            key_store: Keystore::new(
65                local_secret.clone(),
66                local_device.desc().clone(),
67                RsaCPUObjectSigner::new(
68                    local_device.desc().public_key().clone(),
69                    local_secret.clone(),
70                ),
71                keystore::Config {
72                    // <TODO>提供配置
73                    active_time: Duration::from_secs(600),
74                    capacity: 100000,
75                },
76            ),
77            resend_queue: None,/* ResendQueue::new(thread_pool.clone(), Duration::from_millis(200), 5), */
78            local_device_id: local_device.desc().device_id(),
79            local_device: local_device.clone(),
80            stopped: AtomicBool::new(false),
81            peer_mgr: PeerManager::new(),
82            call_stub: CallStub::new(),
83            thread_pool: thread_pool.clone(),
84            contract,
85            // call_tracker: CallTracker {
86            //     calls: Default::default(),
87            //     begin_time: Instant::now()
88            // }
89        }));
90
91        let resend_queue = ResendQueue::new(thread_pool, Duration::from_millis(200), 5, Box::new(service.clone()));
92
93        let mut_service = unsafe { &mut *(Arc::as_ptr(&service.0) as *mut ServiceImpl) };
94        mut_service.resend_queue = Some(resend_queue);
95
96        service
97    }
98
99    pub async fn start(&self) -> BuckyResult<()> {
100        let mut endpoints_v4 = vec![];
101        let mut endpoints_v6 = vec![];
102        for endpoint in self.0.local_device.connect_info().endpoints() {
103            if endpoint.addr().is_ipv4() {
104                endpoints_v4.push(endpoint.clone());
105            } else {
106                endpoints_v6.push(endpoint.clone());
107            };
108        }
109
110        let _listener = match NetListener::listen(&endpoints_v6, &endpoints_v4, self.clone()).await
111        {
112            Ok((listener, udp_count, _)) => {
113                if udp_count == 0 {
114                    log::error!("sn-minner start failed for all udp-endpoints listen failed.");
115                    Err(BuckyError::new(
116                        BuckyErrorCode::Failed,
117                        "all udp-endpoint listen failed",
118                    ))
119                } else {
120                    Ok(listener)
121                }
122            }
123            Err(e) => Err(e),
124        }?;
125
126        // 清理过期数据
127        let timer = {
128            let service = self.clone();
129            task::spawn(async move {
130                loop {
131                    {
132                        if service.is_stopped() {
133                            return;
134                        }
135                        service.clean_timeout_resource();
136                    }
137                    task::sleep(Duration::from_micros(100000)).await;
138                }
139            })
140        };
141
142        // 没有stop
143        timer.await;
144
145        Ok(())
146    }
147
148    pub fn stop(&self) {
149        self.0.stopped.store(true, atomic::Ordering::Relaxed);
150    }
151
152    pub fn is_stopped(&self) -> bool {
153        self.0.stopped.load(atomic::Ordering::Relaxed)
154    }
155
156    pub fn local_device_id(&self) -> &DeviceId {
157        &self.0.local_device_id
158    }
159
160    pub(super) fn key_store(&self) -> &Keystore {
161        &self.0.key_store
162    }
163
164    fn resend_queue(&self) -> &ResendQueue {
165        self.0.resend_queue.as_ref().unwrap()
166    }
167
168    fn peer_manager(&self) -> &PeerManager {
169        &self.0.peer_mgr
170    }
171
172    pub(super) fn thread_pool(&self) -> &ThreadPool {
173        &self.0.thread_pool
174    }
175
176    fn send_resp(&self, mut sender: MessageSender, pkg: DynamicPackage, send_log: String) {
177        self.thread_pool().spawn_ok(async move {
178            if let Err(e) = sender.send(pkg).await {
179                warn!("{} send failed. error: {}.", send_log, e.to_string());
180            } else {
181                debug!("{} send ok.", send_log);
182            }
183
184            if let MessageSender::Tcp(tcp_sender) = sender {
185                tcp_sender.close()
186            }
187        });
188    }
189
190    fn send_resp_udp(&self, sender: Arc<UdpSender>, pkg: DynamicPackage, send_log: String) {
191        self.thread_pool().spawn_ok(async move {
192            let pkg_box = sender.box_pkg(pkg);
193            if let Err(e) = sender.send(&pkg_box).await {
194                warn!("{} send failed. error: {}.", send_log, e.to_string());
195            } else {
196                debug!("{} send ok.", send_log);
197            }
198        });
199    }
200
201    fn clean_timeout_resource(&self) {
202        let now = bucky_time_now();
203
204        if let Some(drops) = self.peer_manager().try_knock_timeout(now) {
205            for device in &drops {
206                self.key_store().reset_peer(device)
207            }
208        }
209
210        self.resend_queue().try_resend(now);
211        self.0.call_stub.recycle(now);
212        // {
213        //     let tracker = &mut self.call_tracker;
214        //     if let Ordering::Greater = now.duration_since(tracker.begin_time).cmp(&TRACKER_INTERVAL) {
215        //         tracker.calls.clear();
216        //         tracker.begin_time = now;
217        //     }
218        // }
219    }
220
221    pub(super) fn handle(&self, mut pkg_box: PackageBox, resp_sender: MessageSender) {
222        let first_pkg = pkg_box.pop();
223        if first_pkg.is_none() {
224            warn!("fetch none pkg");
225            return;
226        }
227
228        let send_time = bucky_time_now();
229        let first_pkg = first_pkg.unwrap();
230        let cmd_pkg = match first_pkg.cmd_code() {
231            PackageCmdCode::Exchange => {
232                let exchg = <Box<dyn Any + Send>>::downcast::<Exchange>(first_pkg.into_any()); // pkg.into_any().downcast::<Exchange>();
233                if let Ok(_) = exchg {
234                    self.key_store().add_key(pkg_box.key(), pkg_box.remote());
235                } else {
236                    warn!("fetch exchange failed, from: {:?}.", resp_sender.remote());
237                    return;
238                }
239
240                match pkg_box.pop() {
241                    Some(pkg) => pkg,
242                    None => {
243                        warn!("fetch none cmd-pkg, from: {:?}.", resp_sender.remote());
244                        return;
245                    }
246                }
247            }
248            _ => first_pkg,
249        };
250
251        match cmd_pkg.cmd_code() {
252            PackageCmdCode::SnPing => {
253                let ping_req = <Box<dyn Any + Send>>::downcast::<SnPing>(cmd_pkg.into_any());
254                if let Ok(ping_req) = ping_req {
255                    self.handle_ping(
256                        ping_req,
257                        resp_sender,
258                        Some((pkg_box.key(), pkg_box.remote())),
259                        send_time,
260                    );
261                } else {
262                    warn!("fetch ping-req failed, from: {:?}.", resp_sender.remote());
263                    return;
264                }
265            }
266            PackageCmdCode::SnCall => {
267                let call_req = <Box<dyn Any + Send>>::downcast::<SnCall>(cmd_pkg.into_any());
268                if let Ok(call_req) = call_req {
269                    self.handle_call(
270                        call_req,
271                        resp_sender,
272                        Some((pkg_box.key(), pkg_box.remote())),
273                        send_time,
274                    );
275                } else {
276                    warn!("fetch sn-call failed, from: {:?}.", resp_sender.remote());
277                    return;
278                }
279            }
280            PackageCmdCode::SnCalledResp => {
281                let called_resp =
282                    <Box<dyn Any + Send>>::downcast::<SnCalledResp>(cmd_pkg.into_any());
283                if let Ok(called_resp) = called_resp {
284                    self.handle_called_resp(called_resp, Some(pkg_box.key()))
285                } else {
286                    warn!(
287                        "fetch sn-called-resp failed, from: {:?}.",
288                        resp_sender.remote()
289                    );
290                    return;
291                }
292            }
293            _ => warn!("invalid cmd-package, from: {:?}.", resp_sender.remote()),
294        }
295    }
296
297
298    fn handle_ping(
299        &self,
300        ping_req: Box<SnPing>,
301        resp_sender: MessageSender,
302        encryptor: Option<(&MixAesKey, &DeviceId)>,
303        send_time: Timestamp,
304    ) {
305        if resp_sender.local().unwrap().is_ipv4() {
306            self.handle_ipv4_ping(ping_req, resp_sender, encryptor, send_time);
307        } else {
308            self.handle_ipv6_ping(ping_req, resp_sender, encryptor, send_time)
309        }
310    }
311
312    fn handle_ipv6_ping(
313        &self,
314        ping_req: Box<SnPing>,
315        resp_sender: MessageSender,
316        encryptor: Option<(&MixAesKey, &DeviceId)>,
317        _send_time: Timestamp,
318    ) {
319        let from_peer_id = match ping_req.from_peer_id.as_ref() {
320            Some(id) => id,
321            None => match encryptor {
322                Some((_, id)) => id,
323                None => {
324                    warn!(
325                        "[ping from 'unknow-deviceid' seq({})] without from peer-desc.",
326                        ping_req.seq.value()
327                    );
328                    return;
329                }
330            },
331        };
332
333        let log_key = format!(
334            "[ping from {} seq({})]",
335            from_peer_id.to_string(),
336            ping_req.seq.value()
337        );
338
339        let resp_sender = match resp_sender {
340            MessageSender::Tcp(_) => {
341                warn!("{} from tcp.", log_key);
342                return;
343            }
344            MessageSender::Udp(u) => Arc::new(u),
345        };
346
347        info!("{}", log_key);
348
349        let ping_resp = SnPingResp {
350            seq: ping_req.seq,
351            sn_peer_id: self.local_device_id().clone(),
352            result: BuckyErrorCode::Ok.into_u8(),
353            peer_info: None,
354            end_point_array: vec![Endpoint::from((
355                Protocol::Udp,
356                resp_sender.remote().clone(),
357            ))],
358            receipt: None,
359        };
360
361        self.send_resp_udp(
362            resp_sender,
363            DynamicPackage::from(ping_resp),
364            format!("{}", log_key),
365        );
366
367    }
368
369    fn handle_ipv4_ping(
370        &self,
371        ping_req: Box<SnPing>,
372        resp_sender: MessageSender,
373        encryptor: Option<(&MixAesKey, &DeviceId)>,
374        send_time: Timestamp,
375    ) {
376        let from_peer_id = match ping_req.from_peer_id.as_ref() {
377            Some(id) => id,
378            None => match encryptor {
379                Some((_, id)) => id,
380                None => {
381                    warn!(
382                        "[ping from 'unknow-deviceid' seq({})] without from peer-desc.",
383                        ping_req.seq.value()
384                    );
385                    return;
386                }
387            },
388        };
389
390        let aes_key = encryptor.map(|(key, _)| key);
391
392        let log_key = format!(
393            "[ping from {} seq({})]",
394            from_peer_id.to_string(),
395            ping_req.seq.value()
396        );
397        let resp_sender = match resp_sender {
398            MessageSender::Tcp(_) => {
399                warn!("{} from tcp.", log_key);
400                return;
401            }
402            MessageSender::Udp(u) => Arc::new(u),
403        };
404
405        info!("{}", log_key);
406
407        // let (result, endpoints, receipt) = if let Some((accept, local_receipt)) = self.ping_receipt(&ping_req, from_peer_id) {
408        //     let receipt = match accept {
409        //         IsAcceptClient::Refuse => {
410        //             return;
411        //         }
412        //         IsAcceptClient::Accept(is_request_receipt) => if is_request_receipt {
413        //             Some(local_receipt)
414        //         } else {
415        //             None
416        //         }
417        //     };
418
419        //     info!("{} from-endpoint: {}", log_key, resp_sender.remote());
420        //     (BuckyErrorCode::Ok as u8, vec![Endpoint::from((Protocol::Udp, resp_sender.remote().clone()))], receipt)
421        // } else {
422        //     (BuckyErrorCode::NotFound as u8, vec![], None)
423        // };
424
425        if !self.peer_manager().peer_heartbeat(
426            from_peer_id.clone(),
427            &ping_req.peer_info,
428            resp_sender.clone(),
429            aes_key,
430            send_time,
431            ping_req.seq,
432        ) {
433            warn!("{} cache peer failed. the ping maybe is timeout.", log_key);
434            return;
435        };
436
437        let ping_resp = SnPingResp {
438            seq: ping_req.seq,
439            sn_peer_id: self.local_device_id().clone(),
440            result: BuckyErrorCode::Ok.into_u8(),
441            peer_info: Some(self.0.local_device.clone()),
442            end_point_array: vec![Endpoint::from((
443                Protocol::Udp,
444                resp_sender.remote().clone(),
445            ))],
446            receipt: None,
447        };
448
449        self.send_resp_udp(
450            resp_sender,
451            DynamicPackage::from(ping_resp),
452            format!("{}", log_key),
453        );
454    }
455
456    // fn verify_receipt_sign(
457    //     &self,
458    //     client_desc: &DeviceDesc,
459    //     signed_receipt: &Option<ReceiptWithSignature>) -> bool {
460    //     match signed_receipt {
461    //         None => false,
462    //         Some(receipt) => {
463    //             receipt.receipt().verify(sn_peerid, receipt.signature(), client_desc)
464    //         }
465    //     }
466    // }
467
    // // Handle the ping service receipt (proof of service)
469    // fn ping_receipt(&self, ping_req: &SnPing, from_id: &DeviceId) -> Option<(IsAcceptClient, SnServiceReceipt)> {
470    //     let mut cache_peer = self.peer_mgr.find_peer(from_id, FindPeerReason::Other);
471
472    //     let (device, local_receipt, last_receipt_request_time) = match &cache_peer {
473    //         Some(cache) => (&cache.desc, cache.receipt.clone(), cache.last_receipt_request_time),
474    //         None => {
475    //             let dev = match ping_req.peer_info.as_ref() {
476    //                 Some(dev) => dev,
477    //                 None => return None,
478    //             };
479    //             (
480    //                 dev,
481    //                 SnServiceReceipt::default(),
482    //                 ReceiptRequestTime::None
483    //             )
484    //         }
485    //     };
486
487    //     let is_verify_ok = self.verify_receipt_sign(ping_req.peer_info.desc(), &ping_req.receipt);
488    //     let client_receipt = if is_verify_ok { &ping_req.receipt } else { &None };
489    //     let check_receipt = self.contract.check_receipt(device, &local_receipt, client_receipt, &last_receipt_request_time);
490
491    //     let is_reset_receipt = if is_verify_ok {
492    //         match cache_peer.as_mut() {
493    //             Some(cache_peer) => match last_receipt_request_time {
494    //                 ReceiptRequestTime::Wait(t) => {
495    //                     cache_peer.last_receipt_request_time = ReceiptRequestTime::Last(t);
    //                     // reset the statistics counters
497    //                     true
498    //                 }
499    //                 _ => false
500    //             }
501    //             None => false
502    //         }
503    //     } else {
504    //         false
505    //     };
506
507    //     let is_request_receipt = match check_receipt {
508    //         IsAcceptClient::Refuse => {
509    //             warn!("[ping from {} seq({})] refused by contract.", from_id, ping_req.seq.value());
510    //             return Some((IsAcceptClient::Refuse, local_receipt))
511    //         },
512    //         IsAcceptClient::Accept(r) => r,
513    //     };
514
515    //     if let Some(cache_peer) = cache_peer {
516    //         if is_reset_receipt {
517    //             cache_peer.receipt.start_time = SystemTime::now();
518    //             cache_peer.receipt.ping_count = 0;
519    //             cache_peer.receipt.ping_resp_count = 0;
520    //             cache_peer.receipt.called_count = 0;
521    //             cache_peer.receipt.call_peer_count = 0;
522    //             cache_peer.call_peers.clear();
523    //         }
524
525    //         if cache_peer.last_ping_seq != ping_req.seq {
526    //             cache_peer.receipt.ping_count += 1;
527    //             cache_peer.receipt.ping_resp_count += 1;
528    //             cache_peer.last_ping_seq = ping_req.seq;
529    //         }
530
531    //         if is_request_receipt {
    //             if let ReceiptRequestTime::Last(_) = cache_peer.last_receipt_request_time { // a new request
533    //                 cache_peer.last_receipt_request_time = ReceiptRequestTime::Wait(SystemTime::now());
534    //             }
535    //         }
536    //     }
537
538    //     Some((IsAcceptClient::Accept(is_request_receipt), local_receipt))
539    // }
540
541    fn handle_call(
542        &self,
543        mut call_req: Box<SnCall>,
544        resp_sender: MessageSender,
545        _encryptor: Option<(&MixAesKey, &DeviceId)>,
546        _send_time: Timestamp,
547    ) {
548        let from_peer_id = &call_req.from_peer_id;
549        let log_key = format!(
550            "[call {}->{} seq({})]",
551            from_peer_id.to_string(),
552            call_req.to_peer_id.to_string(),
553            call_req.seq.value()
554        );
555        info!("{}.", log_key);
556        // if let IsAcceptClient::Refuse = self.contract.verify_auth(&call_req.to_peer_id) {
557        //     warn!("{} refused by contract.", log_key);
558        //     send_responce(self,
559        //                   resp_sender,
560        //                   call_req.seq,
561        //                   BuckyErrorCode::PermissionDenied,
562        //                   None,
563        //                   log_key.as_str()
564        //     );
565        //     return;
566        // }
567
568        // if let Some(cached_from) = self.peer_mgr.find_peer(from_peer_id, FindPeerReason::CallFrom(*send_time)) {
569        //     if &cached_from.last_call_time > send_time {
570        //         warn!("{} ignore for timeout.", log_key);
571        //         return;
572        //     } else {
573        //         if from_peer_desc.is_none() {
574        //             from_peer_desc = Some(cached_from.desc.clone());
575        //         }
576        //     }
577        // } else {
578        //     warn!("{} without from-desc.", log_key);
579        //     call_result = BuckyErrorCode::NotFound;
580        // };
581
582
583        let call_requestor = self.peer_manager().find_peer(&call_req.from_peer_id);
584
585        if let Some(call_requestor) = call_requestor.as_ref() {
586            call_requestor.peer_status.add_record(call_req.to_peer_id.clone(), call_req.seq);
587        }
588
589        let call_resp =
590            if let Some(to_peer_cache) = self.peer_manager().find_peer(&call_req.to_peer_id) {
591                // Self::call_stat_contract(to_peer_cache, &call_req);
592                let from_peer_desc = if call_req.peer_info.is_none() {
593                    self.peer_manager().find_peer(from_peer_id).map(|c| c.desc)
594                } else {
595                    call_req.peer_info
596                };
597
598                if let Some(from_peer_desc) = from_peer_desc {
599                    info!(
600                        "{} to-peer found, endpoints: {}, always_call: {}, to-peer.is_wan: {}.",
601                        log_key,
602                        endpoints_to_string(to_peer_cache.desc.connect_info().endpoints()),
603                        call_req.is_always_call,
604                        to_peer_cache.is_wan
605                    );
606
607                    if self.0.call_stub.insert(from_peer_id, &call_req.seq) {
608                        if call_req.is_always_call || !to_peer_cache.is_wan {
609                            let called_seq = self.0.seq_generator.generate();
610                            let mut called_req = SnCalled {
611                                seq: called_seq,
612                                to_peer_id: call_req.to_peer_id.clone(),
613                                sn_peer_id: self.local_device_id().clone(),
614                                peer_info: from_peer_desc,
615                                call_seq: call_req.seq,
616                                call_send_time: call_req.send_time,
617                                payload: SizedOwnedData::from(vec![]),
618                                reverse_endpoint_array: vec![],
619                                active_pn_list: vec![],
620                            };
621
622                            std::mem::swap(&mut call_req.payload, &mut called_req.payload);
623                            if let Some(eps) = call_req.reverse_endpoint_array.as_mut() {
624                                std::mem::swap(eps, &mut called_req.reverse_endpoint_array);
625                            }
626                            if let Some(pn_list) = call_req.active_pn_list.as_mut() {
627                                std::mem::swap(pn_list, &mut called_req.active_pn_list);
628                            }
629
630                            let called_log =
631                                format!("{} called-req seq({})", log_key, called_seq.value());
632                            log::debug!(
633                                "{} will send with payload(len={}) pn_list({:?}).",
634                                called_log,
635                                called_req.payload.len(),
636                                called_req.active_pn_list
637                            );
638                            self.resend_queue().send(
639                                to_peer_cache.sender.clone(),
640                                DynamicPackage::from(called_req),
641                                called_seq.value(),
642                                called_log,
643                            );
644                            // self.call_tracker.calls.insert(called_seq, (call_req.send_time, Instant::now(), call_req.to_peer_id.clone()));
645                        }
646                    } else {
647                        info!("{} ignore send called req for already exists.", log_key);
648                    }
649
650                    SnCallResp {
651                        seq: call_req.seq,
652                        sn_peer_id: self.local_device_id().clone(),
653                        result: BuckyErrorCode::Ok.into_u8(),
654                        to_peer_info: Some(to_peer_cache.desc),
655                    }
656                } else {
657                    warn!("{} without from-desc.", log_key);
658
659                    SnCallResp {
660                        seq: call_req.seq,
661                        sn_peer_id: self.local_device_id().clone(),
662                        result: BuckyErrorCode::NotFound.into_u8(),
663                        to_peer_info: None,
664                    }
665                }
666            } else {
667                warn!("{} to-peer not found.", log_key);
668                SnCallResp {
669                    seq: call_req.seq,
670                    sn_peer_id: self.local_device_id().clone(),
671                    result: BuckyErrorCode::NotFound.into_u8(),
672                    to_peer_info: None,
673                }
674            };
675
676        match &call_resp.result {
677            0 => { /* wait confirm */ },
678
679            _ => {
680                if let Some(call_requestor) = call_requestor.as_ref() {
681                    call_requestor.peer_status.record(call_req.to_peer_id.clone(), call_req.seq, BuckyErrorCode::from(call_resp.result as u16));
682                }
683            }
684
685        }
686
687        self.send_resp(
688            resp_sender,
689            DynamicPackage::from(call_resp),
690            format!("{} call-resp", log_key),
691        );
692    }
693
694    fn handle_called_resp(&self, called_resp: Box<SnCalledResp>, _aes_key: Option<&MixAesKey>) {
695        info!("called-resp seq {}.", called_resp.seq.value());
696        self.resend_queue().confirm_pkg(called_resp.seq.value());
697
698        // 统计性能
699        // if let Some((call_send_time, called_send_time, peerid)) = self.call_tracker.calls.remove(&called_resp.seq) {
700        //     if let Some(cached_peer) = self.peer_mgr.find_peer(&peerid, FindPeerReason::Other) {
701        //         let now_time_stamp = bucky_time_now();
702        //         if now_time_stamp > call_send_time {
703        //             let call_delay = (now_time_stamp - call_send_time) / 1000;
704        //             cached_peer.receipt.call_delay = ((cached_peer.receipt.call_delay as u64 * 7 + call_delay) / 8) as u16;
705        //         }
706
707        //         let rto = Instant::now().duration_since(called_send_time).as_millis() as u32;
708        //         cached_peer.receipt.rto = ((cached_peer.receipt.rto as u32 * 7 + rto) / 8) as u16;
709        //     }
710        // }
711    }
712}
713
714impl ResendCallbackTrait for SnService {
715    fn on_callback(&self, pkg: Arc<PackageBox>, errno: BuckyErrorCode) {
716        if let Some(p) = pkg.packages_no_exchange()
717                                    .get(0)
718                                    .map(| p | {
719                                        let p: &SnCalled = p.as_ref();
720                                        p
721                                    }) {
722            self.peer_manager().find_peer(&p.peer_info.desc().device_id())
723                .map(| requestor | {
724                    requestor.peer_status.record(p.to_peer_id.clone(), p.call_seq, errno);
725                });
726        }
727    }
728}