cyfs_bdt/interface/
udp.rs

1use crate::{
2    types::*, 
3    history::keystore,
4    protocol::*,
5    stack::{Stack, WeakStack}
6};
7use super::{
8    manager::UpdateOuterResult
9};
10use async_std::sync::Arc;
11use cyfs_base::*;
12use log::*;
13use std::{cell::RefCell, net::UdpSocket, sync::RwLock, thread};
14use socket2::{Socket, Domain, Type};
15
/// Tunables for a UDP interface.
///
/// `Debug` is derived so the configuration can be logged and inspected.
#[derive(Clone, Debug)]
pub struct Config {
    /// Simulated packet loss: when non-zero, an outgoing datagram is silently
    /// dropped whenever a random `u8` is below this value.
    pub sim_loss_rate: u8,
    /// Requested size of the socket receive buffer, in bytes.
    pub recv_buffer: usize,
    /// When set, only SN package boxes are sent or dispatched and raw data is rejected.
    pub sn_only: bool,
}
22
23pub struct UdpPackageBox {
24    package_box: PackageBox,
25    remote: Endpoint,
26    local: Interface,
27}
28
29impl UdpPackageBox {
30    pub fn new(package_box: PackageBox, local: Interface, remote: Endpoint) -> Self {
31        Self {
32            package_box,
33            local,
34            remote,
35        }
36    }
37
38    pub fn remote(&self) -> &Endpoint {
39        &self.remote
40    }
41    pub fn local(&self) -> &Interface {
42        &self.local
43    }
44}
45
46impl Into<PackageBox> for UdpPackageBox {
47    fn into(self) -> PackageBox {
48        self.package_box
49    }
50}
51
52impl AsRef<PackageBox> for UdpPackageBox {
53    fn as_ref(&self) -> &PackageBox {
54        &self.package_box
55    }
56}
57
58//const MAGIC_NUMBER: u16 = u16::from_be_bytes([0u8, 0x80u8]);
59
/// Receiver of package boxes decoded from UDP datagrams.
///
/// In this file the stack's implementation is invoked from `Interface::on_recv`
/// for every successfully decoded box.
pub trait OnUdpPackageBox {
    /// Handles one decoded box together with the route it was received on.
    fn on_udp_package_box(&self, package_box: UdpPackageBox) -> Result<(), BuckyError>;
}
63
/// Receiver of raw (non package-box) UDP payloads.
///
/// `Context` is chosen by the implementor. `Interface::on_recv` in this file
/// passes `(interface, peer id, key, sender endpoint)`.
pub trait OnUdpRawData<Context> {
    /// Handles the payload bytes that follow the mix-hash prefix of a datagram.
    fn on_udp_raw_data(&self, data: &[u8], context: Context) -> Result<(), BuckyError>;
}
67
/// Datagram size limit in bytes.
// NOTE(review): not referenced in this file; presumably a 1500-byte Ethernet MTU
// minus the IPv4 (20) and UDP (8) headers — confirm against callers.
pub const MTU: usize = 1472;
/// Size in bytes (30 KiB) of the per-thread scratch buffers declared below.
pub const MTU_LARGE: usize = 1024*30;

thread_local! {
    // Per-thread buffer that `Interface::recv_loop` receives datagrams into.
    static UDP_RECV_BUFFER: RefCell<[u8; MTU_LARGE]> = RefCell::new([0u8; MTU_LARGE]);
    // Per-thread buffer that `send_box_to` / `send_box_mult` encode a box into before sending.
    static BOX_CRYPTO_BUFFER: RefCell<[u8; MTU_LARGE]> = RefCell::new([0u8; MTU_LARGE]);
}
75
/// Shared state behind every clone of an [`Interface`].
struct InterfaceImpl {
    /// Behaviour switches supplied at bind time.
    config: Config, 
    /// The bound UDP socket used for both receiving and sending.
    socket: UdpSocket, 
    /// Value handed to `bind` and returned by `mapping_port()`; not otherwise used in this file.
    mapping_port: Option<u16>,
    /// Local endpoint reported by `local()`; replaced by `reset()`.
    local: RwLock<Endpoint>,
    /// Outer endpoint reported by `outer()`; set by `update_outer()`, cleared by `reset()`.
    outer: RwLock<Option<Endpoint>>,
}

/// Cheaply clonable handle to a bound UDP socket; clones share the same state.
#[derive(Clone)]
pub struct Interface(Arc<InterfaceImpl>);
86
87impl std::fmt::Display for Interface {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "UdpInterface {{local:{}}}", self.local())
90    }
91}
92
93impl std::fmt::Debug for Interface {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(f, "UdpInterface {{local:{}}}", self.local())
96    }
97}
98
99
100
101impl Interface {
102    pub fn bind(local: Endpoint, out: Option<Endpoint>, mapping_port: Option<u16>, config: Config) -> Result<Self, BuckyError> {
103        fn bind_socket(bind_addr: &Endpoint, recv_buffer: usize) -> Result<UdpSocket, BuckyError> {
104            let domain = if bind_addr.addr().is_ipv6() {
105                Domain::IPV6
106            } else {
107                Domain::IPV4
108            };
109
110            let socket = Socket::new(domain, Type::DGRAM, None).unwrap();
111
112            let _ = socket.set_recv_buffer_size(recv_buffer);
113            match socket.bind(&bind_addr.addr().clone().into()) {
114                Ok(_) => Ok(socket.into()), 
115                Err(err) => Err(BuckyError::from(err))
116            }
117        }
118 
119        let socket = {
120            if local.addr().is_ipv6() {
121                #[cfg(windows)]
122                {
123                    let mut default_local = Endpoint::default_udp(&local);
124                    default_local.mut_addr().set_port(local.addr().port());
125                    match bind_socket(&default_local, config.recv_buffer) {
126                        Ok(socket) => {
127                            // 避免udp被对端reset
128                            cyfs_util::init_udp_socket(&socket).map(|_| socket)
129                        }
130                        Err(err) => Err(BuckyError::from(err)),
131                    }
132                }
133                #[cfg(not(windows))]
134                {
135                    use std::os::unix::io::FromRawFd;
136                    unsafe {
137                        let raw_sock = libc::socket(libc::AF_INET6, libc::SOCK_DGRAM, 0);
138                        let yes: libc::c_int = 1;
139                        libc::setsockopt(
140                            raw_sock,
141                            libc::IPPROTO_IPV6,
142                            libc::IPV6_V6ONLY,
143                            &yes as *const libc::c_int as *const libc::c_void,
144                            std::mem::size_of::<libc::c_int>().try_into().unwrap(),
145                        );
146                        libc::setsockopt(
147                            raw_sock,
148                            libc::SOL_SOCKET,
149                            libc::SO_REUSEADDR | libc::SO_BROADCAST,
150                            &yes as *const libc::c_int as *const libc::c_void,
151                            std::mem::size_of::<libc::c_int>().try_into().unwrap(),
152                        );
153
154                        let recv_buf: libc::c_int = config.recv_buffer as libc::c_int;                        
155                        libc::setsockopt(raw_sock, 
156                            libc::SOL_SOCKET, 
157                            libc::SO_RCVBUF, 
158                            &recv_buf as *const libc::c_int as *const libc::c_void,
159                            std::mem::size_of::<libc::c_int>().try_into().unwrap(),);
160
161                        let addr = libc::sockaddr_in6 {
162                            #[cfg(any(target_os = "macos",target_os = "ios"))]
163                            sin6_len: 24,
164                            sin6_family: libc::AF_INET6 as libc::sa_family_t,
165                            sin6_port: local.addr().port().to_be(),
166                            sin6_flowinfo: 0,
167                            sin6_addr: libc::in6_addr { s6_addr: [0u8; 16] },
168                            sin6_scope_id: 0,
169                        };
170                        if libc::bind(
171                            raw_sock,
172                            &addr as *const libc::sockaddr_in6 as *const libc::sockaddr,
173                            std::mem::size_of::<libc::sockaddr_in6>()
174                                .try_into()
175                                .unwrap(),
176                        ) < 0
177                        {
178                            Err(BuckyError::from((
179                                BuckyErrorCode::AlreadyExists,
180                                "bind port failed",
181                            )))
182                        } else {
183                            Ok(UdpSocket::from_raw_fd(raw_sock))
184                        }
185                    }
186                }
187            } else {
188                let bind_addr = {
189                    if local.is_sys_default() {
190                        let mut default_local = Endpoint::default_udp(&local);
191                        default_local.mut_addr().set_port(local.addr().port());
192        
193                        default_local
194                    } else {
195                        local
196                    }
197                };
198
199                bind_socket(&bind_addr, config.recv_buffer)
200            }
201        }?;
202
203        Ok(Self(Arc::new(InterfaceImpl {
204            config, 
205            mapping_port, 
206            local: RwLock::new(local),
207            socket,
208            outer: RwLock::new(out),
209        })))
210    }
211
212
213    pub fn mapping_port(&self) -> Option<u16> {
214        self.0.mapping_port
215    }
216
217    pub fn reset(&self, local: &Endpoint) -> Self {
218        info!("{} reset with {}", self, local);
219        let new =  self.clone();
220        *new.0.local.write().unwrap() = local.clone();
221        *new.0.outer.write().unwrap() = None;
222        new
223    }
224
225    pub fn start(&self, stack: WeakStack) {
226        let ci = self.clone();
227        thread::spawn(move || {
228            info!("{} start on thread {:?}", ci, thread::current().id());
229            ci.recv_loop(stack);
230        });
231    }
232
233    pub fn close(&self) {
234        #[cfg(windows)]
235        {
236            use std::os::windows::io::AsRawSocket;
237            use winapi::um::winsock2::closesocket;
238            unsafe {
239                let raw = self.0.socket.as_raw_socket();
240                closesocket(raw.try_into().unwrap());
241            }
242        }
243        #[cfg(not(windows))]
244        {
245            use std::os::unix::io::AsRawFd;
246            unsafe {
247                let raw = self.0.socket.as_raw_fd();
248                libc::close(raw);
249            }
250        }
251    }
252
253    pub fn local(&self) -> Endpoint {
254        *self.0.local.read().unwrap()
255    }
256
257    pub fn outer(&self) -> Option<Endpoint> {
258        *self.0.outer.read().unwrap()
259    }
260
261    pub fn update_outer(&self, outer: &Endpoint) -> UpdateOuterResult {
262        let self_outer = &mut *self.0.outer.write().unwrap();
263        if let Some(outer_ep) = self_outer.as_ref() {
264            if *outer_ep != *outer {
265                info!("{} reset outer to {}", self, outer);
266                *self_outer = Some(*outer);
267                UpdateOuterResult::Reset
268            } else {
269                trace!("{} ignore update outer to {}", self, outer);
270                UpdateOuterResult::None
271            }
272        } else {
273            info!("{} update outer to {}", self, outer);
274            *self_outer = Some(*outer);
275            UpdateOuterResult::Update
276        }
277    }
278
279    pub fn is_same(&self, other: &Self) -> bool {
280        Arc::ptr_eq(&self.0, &other.0)
281    }
282
283    fn on_recv(&self, stack: Stack, recv: &mut [u8], from: Endpoint) {
284        if recv.len() == 0 {
285            return
286        }
287
288        if recv[0] & 0x80 != 0 {
289            match KeyMixHash::raw_decode(recv) {
290                Ok((mut mix_hash, raw_data)) => {
291                    mix_hash.as_mut()[0] &= 0x7f;
292                    if let Some(found_key) =
293                        stack.keystore().get_key_by_mix_hash(&mix_hash, true, true) {
294                        if self.0.config.sn_only {
295                            return; 
296                        }
297
298                        let _ = 
299                            stack.on_udp_raw_data(raw_data, (self.clone(), found_key.peerid, found_key.key, from));
300
301                        return;
302                    }
303                }
304                Err(err) => {
305                    error!("{} decode failed, from={}, len={}, e={}", self, from, recv.len(), &err);
306                    return;
307                }
308            }
309        }
310        let ctx =
311            PackageBoxDecodeContext::new_inplace(recv.as_mut_ptr(), recv.len(), stack.keystore());
312        match PackageBox::raw_decode_with_context(recv, ctx) {
313            Ok((package_box, _)) => {
314                if self.0.config.sn_only && !package_box.is_sn() {
315                    return;
316                }
317                let local_interface = self.clone();
318                if package_box.has_exchange() {
319                    async_std::task::spawn(async move {
320                        let exchange: &Exchange = package_box.packages()[0].as_ref();
321                        if !exchange.verify(stack.local_device_id()).await {
322                            warn!("{} exchg verify failed, from {}.", local_interface, from);
323                            return;
324                        }
325                        let _ = stack.on_udp_package_box(UdpPackageBox::new(
326                            package_box,
327                            local_interface,
328                            from,
329                        ));
330                    });
331                } else {
332                    let _ = stack.on_udp_package_box(UdpPackageBox::new(
333                        package_box,
334                        local_interface,
335                        from,
336                    ));
337                }
338            }
339            Err(err) => {
340                // do nothing
341                error!("{} decode failed, from={}, len={}, e={}", self, from, recv.len(), &err);
342            }
343        }
344    }
345
346    fn recv_loop(&self, weak_stack: WeakStack) {
347        UDP_RECV_BUFFER.with(|thread_recv_buf| {
348            let recv_buf = &mut thread_recv_buf.borrow_mut()[..];
349            loop {
350                let rr = self.0.socket.recv_from(recv_buf);
351                if rr.is_ok() {
352                    let stack = Stack::from(&weak_stack);
353                    let (len, from) = rr.unwrap();
354                    trace!("{} recv {} bytes from {}", self, len, from);
355                    let recv = &mut recv_buf[..len];
356                    // FIXME: 分发到工作线程去
357                    self.on_recv(stack, recv, Endpoint::from((Protocol::Udp, from)));
358                } else {
359                    let err = rr.err().unwrap();
360                    if let Some(10054i32) = err.raw_os_error() {
361                        // In Windows, if host A use UDP socket and call sendto() to send something to host B,
362                        // but B doesn't bind any port so that B doesn't receive the message,
363                        // and then host A call recvfrom() to receive some message,
364                        // recvfrom() will failed, and WSAGetLastError() will return 10054.
365                        // It's a bug of Windows.
366                        trace!("{} socket recv failed for {}, ingore this error", self, err);
367                    } else {
368                        info!("{} socket recv failed for {}, break recv loop", self, err);
369                        break;
370                    }
371                }
372            }
373        });
374    }
375
376    pub fn send_box_to(
377        &self,
378        context: &mut PackageBoxEncodeContext,
379        package_box: &PackageBox,
380        to: &Endpoint,
381    ) -> Result<usize, BuckyError> {
382        if self.0.config.sn_only && !package_box.is_sn() {
383            return Err(BuckyError::new(BuckyErrorCode::UnSupport, "interface is only for sn"));
384        } 
385        BOX_CRYPTO_BUFFER.with(|thread_crypto_buf| {
386            let crypto_buf = &mut thread_crypto_buf.borrow_mut()[..];
387           
388            let buf_len = crypto_buf.len();
389            let next_ptr = package_box
390                .raw_encode_with_context(crypto_buf, context, &None)
391                .map_err(|e| {
392                    error!("send_box_to encode failed, package_box: {:?}, e:{}", package_box, &e);
393                    e
394                })?;
395            let send_len = buf_len - next_ptr.len();
396            self.send_buf_to(&crypto_buf[..send_len], to)
397        })
398    }
399
400    pub fn send_box_mult(
401        context: &mut PackageBoxEncodeContext,
402        package_box: &PackageBox,
403        iter: impl Iterator<Item = (Self, Endpoint)>,
404        on_result: impl Fn(&Self, &Endpoint, Result<usize, BuckyError>) -> bool,
405    ) -> Result<usize, BuckyError> {
406        BOX_CRYPTO_BUFFER.with(|thread_crypto_buf| {
407            let crypto_buf = &mut thread_crypto_buf.borrow_mut()[..];
408            let buf_len = crypto_buf.len();
409            let next_ptr = package_box
410                .raw_encode_with_context(crypto_buf, context, &None)
411                .map_err(|e| {
412                    error!("send_box_mult encode failed, package_box: {:?}, e:{}", package_box, &e);
413                    e
414                })?;
415            let send_len = buf_len - next_ptr.len();
416            let mut send_count = 0;
417            for (from, to) in iter {
418                if from.local().is_same_ip_version(&to) {
419                    send_count += 1;
420                    let is_continue =
421                        on_result(&from, &to, from.send_buf_to(&crypto_buf[..send_len], &to));
422                    if !is_continue {
423                        break;
424                    }
425                }
426            }
427            Ok(send_count)
428        })
429    }
430
431    pub fn send_raw_data_to(
432        &self,
433        key: &MixAesKey,
434        data: &mut [u8],
435        to: &Endpoint,
436    ) -> Result<usize, BuckyError> {
437        if self.0.config.sn_only {
438            return Err(BuckyError::new(BuckyErrorCode::UnSupport, "interface is only for sn"));
439        }
440        let mix_hash = key.mix_hash();
441        let _ = mix_hash.raw_encode(data, &None)?;
442        data[0] |= 0x80;
443        self.send_buf_to(data, to)
444    }
445
446    pub fn send_buf_to(&self, buf: &[u8], to: &Endpoint) -> Result<usize, BuckyError> {
447        trace!("{} send {} bytes to {}", self, buf.len(), to);
448        if self.0.config.sim_loss_rate > 0 {
449            if rand::random::<u8>() < self.0.config.sim_loss_rate {
450                trace!("{} sim loss", self);
451                return Ok(buf.len());
452            }
453        }
454        self.0
455            .socket
456            .send_to(buf, to.addr())
457            .map_err(|e| BuckyError::from(e))
458    }
459}
460
461pub struct PackageBoxEncodeContext {
462    plaintext: bool,
463    ignore_exchange: bool, 
464    fixed_values: merge_context::FixedValues,
465    merged_values: Option<merge_context::ContextNames>,
466}
467
468impl PackageBoxEncodeContext {
469    pub fn plaintext(&self) -> bool {
470        self.plaintext
471    }
472
473    pub fn set_plaintext(&mut self, b: bool) {
474        self.plaintext = b
475    }
476
477    pub fn set_ignore_exchange(&mut self, b: bool) {
478        self.ignore_exchange = b
479    }
480}
481
482// 编码SnCall::payload
483impl From<&SnCall> for PackageBoxEncodeContext {
484    fn from(sn_call: &SnCall) -> Self {
485        let fixed_values: merge_context::FixedValues = sn_call.into();
486        let merged_values = fixed_values.clone_merged();
487        Self {
488            plaintext: false,
489            ignore_exchange: false, 
490            fixed_values,
491            merged_values: Some(merged_values),
492        }
493    }
494}
495
496// impl From<(&DeviceDesc, Timestamp)> for PackageBoxEncodeContext {
497//     fn from(params: (&DeviceDesc, Timestamp)) -> Self {
498//         let mut fixed_values = merge_context::FixedValues::new();
499//         fixed_values.insert("send_time", params.1);
500//         Self {
501//             ignore_exchange: false, 
502//             remote_const: Some(params.0.clone()),
503//             fixed_values,
504//             merged_values: None,
505//         }
506//     }
507// }
508
509impl Default for PackageBoxEncodeContext {
510    fn default() -> Self {
511        Self {
512            plaintext: false,
513            ignore_exchange: false, 
514            fixed_values: merge_context::FixedValues::new(),
515            merged_values: None,
516        }
517    }
518}
519
/// Where `PackageBoxDecodeContext::decrypt_buf` places the bytes to decrypt.
enum DecryptBuffer<'de> {
    /// A separate buffer; the encrypted bytes are copied to its start.
    Copy(&'de mut [u8]),
    /// Base pointer and total length of the received buffer; decryption works
    /// directly on its trailing bytes.
    Inplace(*mut u8, usize),
}
524
/// Source of the version number associated with a remote device.
// NOTE(review): no implementor or caller is visible in this file — confirm usage elsewhere.
pub trait PackageBoxVersionGetter {
    /// Returns the version number for `remote`.
    fn version_of(&self, remote: &DeviceId) -> u8; 
}
528
529pub struct PackageBoxDecodeContext<'de> {
530    decrypt_buf: DecryptBuffer<'de>,
531    keystore: &'de keystore::Keystore, 
532}
533
534impl<'de> PackageBoxDecodeContext<'de> {
535    pub fn new_copy(
536        decrypt_buf: &'de mut [u8], 
537        keystore: &'de keystore::Keystore, 
538    ) -> Self {
539        Self {
540            decrypt_buf: DecryptBuffer::Copy(decrypt_buf),
541            keystore, 
542        }
543    }
544
545    pub fn new_inplace(
546        ptr: *mut u8, 
547        len: usize, 
548        keystore: &'de keystore::Keystore, 
549    ) -> Self {
550        Self {
551            decrypt_buf: DecryptBuffer::Inplace(ptr, len),
552            keystore, 
553        }
554    }
555
556    // 返回用于aes 解码的buffer
557    pub unsafe fn decrypt_buf(self, data: &[u8]) -> &'de mut [u8] {
558        use DecryptBuffer::*;
559        match self.decrypt_buf {
560            Copy(decrypt_buf) => {
561                decrypt_buf[..data.len()].copy_from_slice(data);
562                decrypt_buf
563            }
564            Inplace(ptr, len) => {
565                std::slice::from_raw_parts_mut(ptr.offset((len - data.len()) as isize), data.len())
566            }
567        }
568    }
569    // 拿到local私钥
570    pub fn local_secret(&self) -> &PrivateKey {
571        self.keystore.private_key()
572    }
573
574    pub fn local_public_key(&self) -> &PublicKey {
575        self.keystore.public_key()
576    }
577
578    pub fn key_from_mixhash(&self, mix_hash: &KeyMixHash) -> Option<(DeviceId, MixAesKey)> {
579        self.keystore
580            .get_key_by_mix_hash(mix_hash, true, true)
581            .map(|k| (k.peerid, k.key))
582    }
583
584    pub fn version_of(&self, _remote: &DeviceId) -> u8 {
585        0
586    }
587}
588
589impl RawEncodeWithContext<PackageBoxEncodeContext> for PackageBox {
590    fn raw_measure_with_context(
591        &self,
592        _: &mut PackageBoxEncodeContext,
593        _purpose: &Option<RawEncodePurpose>,
594    ) -> Result<usize, BuckyError> {
595        //TODO
596        Ok(2048)
597    }
598
599    fn raw_encode_with_context<'a>(
600        &self,
601        buf: &'a mut [u8],
602        context: &mut PackageBoxEncodeContext,
603        purpose: &Option<RawEncodePurpose>,
604    ) -> Result<&'a mut [u8], BuckyError> {
605        let mut buf = buf;
606        if self.has_exchange() && !context.ignore_exchange {
607            let exchange: &Exchange = self.packages()[0].as_ref();
608            if buf.len() < exchange.key_encrypted.len() {
609                log::error!("try encode exchange without public-key");
610                    assert!(false);
611                    return Err(BuckyError::new(
612                        BuckyErrorCode::Failed,
613                        "try encode exchange without public-key",
614                    ));
615            }
616            // 首先用对端的const info加密aes key
617            buf[..exchange.key_encrypted.len()].copy_from_slice(&exchange.key_encrypted[..]);
618            buf = &mut buf[exchange.key_encrypted.len()..];
619        }
620
621        // 写入 key的mixhash
622        let mixhash = self.key().mix_hash();
623        let _ = mixhash.raw_encode(buf, purpose)?;
624        if context.plaintext {
625            buf[0] |= 0x80;
626        }
627        let buf = &mut buf[8..];
628
629        let mut encrypt_in_len = buf.len();
630        let to_encrypt_buf = buf;
631
632        // 编码所有包
633        let packages = if context.ignore_exchange {
634            self.packages_no_exchange()
635        } else {
636            self.packages()
637        };
638        let (mut other_context, mut buf, packages) = match context.merged_values.as_ref() {
639            Some(merged_values) => (
640                merge_context::OtherEncode::new(merged_values.clone(), Some(&context.fixed_values)),
641                &mut to_encrypt_buf[..],
642                packages,
643            ),
644            None => {
645                let mut first_context = merge_context::FirstEncode::from(&context.fixed_values); // merge_context::FirstEncode::new();
646                let enc: &dyn RawEncodeWithContext<merge_context::FirstEncode> =
647                    packages.get(0).unwrap().as_ref();
648                let buf = enc.raw_encode_with_context(to_encrypt_buf, &mut first_context, purpose)?;
649                (first_context.into(), buf, &packages[1..])
650            }
651        };
652        for p in packages {
653            let enc: &dyn RawEncodeWithContext<merge_context::OtherEncode> = p.as_ref();
654            buf = enc.raw_encode_with_context(buf, &mut other_context, purpose)?;
655        }
656        //let buf_len = buf.len();
657        encrypt_in_len -= buf.len();
658        // 用aes 加密package的部分
659        let len = if context.plaintext {
660            encrypt_in_len
661        } else {
662            self.key().enc_key.inplace_encrypt(to_encrypt_buf, encrypt_in_len)?
663        };
664
665        //info!("package_box udp encode: encrypt_in_len={} len={} buf_len={} plaintext={}", 
666        //    encrypt_in_len, len, buf_len, context.plaintext);
667
668        Ok(&mut to_encrypt_buf[len..])
669    }
670}
671
672impl<'de> RawDecodeWithContext<'de, PackageBoxDecodeContext<'de>> for PackageBox {
673    fn raw_decode_with_context(
674        buf: &'de [u8],
675        context: PackageBoxDecodeContext<'de>,
676    ) -> Result<(Self, &'de [u8]), BuckyError> {
677        Self::raw_decode_with_context(buf, (context, None))
678    }
679}
680
681impl<'de>
682    RawDecodeWithContext<
683        'de,
684        (
685            PackageBoxDecodeContext<'de>,
686            Option<merge_context::OtherDecode>,
687        ),
688    > for PackageBox
689{
690    fn raw_decode_with_context(
691        buf: &'de [u8],
692        c: (
693            PackageBoxDecodeContext<'de>,
694            Option<merge_context::OtherDecode>,
695        ),
696    ) -> Result<(Self, &'de [u8]), BuckyError> {
697        let (context, merged_values) = c;
698        let (mix_hash, hash_buf) = KeyMixHash::raw_decode(buf)?;
699
700        enum KeyStub {
701            Exist(DeviceId), 
702            Exchange(Vec<u8>)
703        }
704
705        struct KeyInfo {
706            enc_key: AesKey, 
707            mix_hash: KeyMixHash, 
708            stub: KeyStub
709        }
710
711
712        let mut mix_key = None;
713        let (key_info, buf) = {
714            match context.key_from_mixhash(&mix_hash) {
715                Some((remote, key)) => {
716                        mix_key = Some(key.mix_key);
717
718                    (KeyInfo {
719                        stub: KeyStub::Exist(remote), 
720                        enc_key: key.enc_key, 
721                        mix_hash
722                    }, hash_buf)
723                }, 
724                None => {
725                    let mut enc_key = AesKey::default();
726                    let (remain, _) = context.local_secret().decrypt_aeskey(buf, enc_key.as_mut_slice()).map_err(|e|{
727                        error!("decrypt aeskey err={}. (maybe: 1. local/remote device time is not correct 2. the packet is broken 3. the packet not contains Exchange info etc.. )", e);
728                        e
729                    })?;
730                    let encrypted = Vec::from(&buf[..buf.len() - remain.len()]);
731                    let (mix_hash, remain) = KeyMixHash::raw_decode(remain)?;
732                    (KeyInfo {
733                        stub: KeyStub::Exchange(encrypted), 
734                        enc_key, 
735                        mix_hash, 
736                    }, remain)
737                }
738            }
739        };
740
741        let mut version = if let KeyStub::Exist(remote) = &key_info.stub {
742            context.version_of(remote)
743        } else {
744            0
745        };
746        // 把原数据拷贝到context 给的buffer上去
747        let decrypt_buf = unsafe { context.decrypt_buf(buf) };
748        // 用key 解密数据
749        let decrypt_len =  key_info.enc_key.inplace_decrypt(decrypt_buf, buf.len())?;
750        let remain_buf = &buf[buf.len()..];
751        let decrypt_buf = &decrypt_buf[..decrypt_len];
752
753        let mut packages = vec![];
754
755        //解码所有package
756        if decrypt_len != 0 {
757            match merged_values {
758                Some(mut merged) => {
759                    let (package, buf) =
760                        DynamicPackage::raw_decode_with_context(decrypt_buf, (&mut merged, &mut version))?;
761                    packages.push(package);
762                    let mut buf_ptr = buf;
763                    while buf_ptr.len() > 0 {
764                        match DynamicPackage::raw_decode_with_context(buf_ptr, (&mut merged, &mut version)) {
765                            Ok((package, buf)) => {
766                                buf_ptr = buf;
767                                packages.push(package);
768                            }, 
769                            Err(err) => {
770                                if err.code() == BuckyErrorCode::NotSupport {
771                                    break;
772                                } else {
773                                    Err(err)?;
774                                }
775                            }
776                        };
777                    }
778                }
779                None => {
780                    let mut context = merge_context::FirstDecode::new();
781                    let (package, buf) = DynamicPackage::raw_decode_with_context(
782                        decrypt_buf[0..decrypt_len].as_ref(),
783                        (&mut context, &mut version)
784                    )?;
785                    packages.push(package);
786                    let mut context: merge_context::OtherDecode = context.into();
787                    let mut buf_ptr = buf;
788                    while buf_ptr.len() > 0 {
789                        match DynamicPackage::raw_decode_with_context(buf_ptr, (&mut context, &mut version)) {
790                            Ok((package, buf)) => {
791                                buf_ptr = buf;
792                                packages.push(package);
793                            }, 
794                            Err(err) => {
795                                if err.code() == BuckyErrorCode::NotSupport {
796                                    break;
797                                } else {
798                                    Err(err)?;
799                                }
800                            }
801                        };
802                    }
803                }
804            }
805        }
806
807        if mix_key.is_none() {
808            if packages.len() > 0 && packages[0].cmd_code().is_exchange() {
809                let exchange: &Exchange = packages[0].as_ref();
810                mix_key = Some(exchange.mix_key.clone());
811            } else {
812                return Err(BuckyError::new(BuckyErrorCode::ErrorState, "unkown mix_key"));
813            }
814        }
815
816        let key = MixAesKey {
817            enc_key: key_info.enc_key, 
818            mix_key: mix_key.unwrap()
819        };
820        match key_info.stub {
821            KeyStub::Exist(remote) => {
822                let mut package_box = PackageBox::encrypt_box(remote,key );
823                package_box.append(packages);
824                Ok((package_box, remain_buf))
825            }
826            KeyStub::Exchange(encrypted) => {
827                if packages.len() > 0 && packages[0].cmd_code().is_exchange() {
828                    let exchange: &mut Exchange = packages[0].as_mut();
829                    exchange.key_encrypted = encrypted;
830
831                    let mut package_box =
832                        PackageBox::encrypt_box(exchange.from_device_desc.desc().device_id(), key);
833                    package_box.append(packages);
834                    Ok((package_box, remain_buf))
835                } else {
836                    Err(BuckyError::new(BuckyErrorCode::InvalidData, "unkown from"))
837                }
838            }
839        }
840    }
841}