cyfs_bdt/datagram/
tunnel.rs

1use crate::{
2    protocol::{self, *},
3    stack::{Stack, WeakStack},
4    tunnel::{BuildTunnelParams, TunnelContainer, TunnelState},
5    types::*, 
6    MTU
7};
8use async_std::{pin::Pin, sync::Arc, task};
9use cyfs_base::*;
10use cyfs_debug::Mutex;
11use futures::{
12    task::{Context, Poll},
13    Future,
14};
15use log::*;
16use std::{
17    collections::{LinkedList, HashMap},
18    ops::{Deref, Drop},
19    task::Waker,
20    time::Duration,
21};
22
23#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
24pub struct DatagramSource {
25    pub remote: DeviceId,
26    pub vport: u16,
27}
28
29#[derive(Clone)]
30pub struct DatagramOptions {
31    pub sequence: Option<TempSeq>,
32    pub author_id: Option<DeviceId>,
33    pub create_time: Option<Timestamp>,
34    pub send_time: Option<Timestamp>,
35    pub plaintext: bool,
36}
37
38impl Default for DatagramOptions {
39    fn default() -> Self {
40        Self {
41            sequence: None,
42            author_id: None,
43            create_time: None,
44            send_time: None,
45            plaintext: false,
46        }
47    }
48}
49
50pub struct Datagram {
51    pub source: DatagramSource,
52    pub options: DatagramOptions,
53    pub data: Vec<u8>,
54}
55
56struct RecvBuffer {
57    capability: usize,
58    waker: Option<Waker>,
59    buffer: LinkedList<Datagram>,
60}
61
62struct DatagramFragment {
63    author_id: DeviceId,
64    from_vport: u16,
65    sequence: TempSeq,
66    to_vport: u16,
67	expire_time: u64,
68	datagrams: HashMap<u8, protocol::v0::Datagram>,
69	fragment_total: usize,
70}
71
72struct DatagramFragments {
73    fragments: HashMap<String, DatagramFragment>,
74    frag_data_size: usize,
75    frag_data_max_size: usize,
76    frag_expired_us: u64,
77}
78
79struct DatagramTunnelImpl {
80    stack: WeakStack,
81    sequence: TempSeqGenerator,
82    vport: u16,
83    recv_buffer: Mutex<RecvBuffer>,
84    frag_buffer: Arc<Mutex<DatagramFragments>>,
85}
86
87impl DatagramTunnelImpl {
88    fn poll_recv_v(
89        &self,
90        cx: &mut Context<'_>,
91    ) -> Poll<Result<LinkedList<Datagram>, std::io::Error>> {
92        let mut recv_buffer = self.recv_buffer.lock().unwrap();
93        if recv_buffer.buffer.len() == 0 {
94            // assert_eq!(recv_buffer.waker.is_none(), true);
95            recv_buffer.waker = Some(cx.waker().clone());
96            Poll::Pending
97        } else {
98            let mut datagrams = LinkedList::new();
99            datagrams.append(&mut recv_buffer.buffer);
100            Poll::Ready(Ok(datagrams))
101        }
102    }
103}
104
105impl std::fmt::Display for DatagramTunnelImpl {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "DatagramTunnel{{vport:{}}}", self.vport)
108    }
109}
110
111#[derive(Clone)]
112pub struct DatagramTunnel(Arc<DatagramTunnelImpl>);
113
114impl AsRef<DatagramTunnelImpl> for DatagramTunnel {
115    fn as_ref(&self) -> &DatagramTunnelImpl {
116        &self.0
117    }
118}
119
120impl DatagramTunnel {
121    pub(crate) fn new(stack: WeakStack, vport: u16, recv_buffer: usize) -> DatagramTunnel {
122        let cfg = Stack::from(&stack).config().datagram.clone();
123        let expired_tick_sec = cfg.expired_tick_sec;
124        let fragment_cache_size = cfg.fragment_cache_size;
125        let fragment_expired_us = cfg.fragment_expired_us;
126
127        let datagram_tunnel = DatagramTunnel(Arc::new(DatagramTunnelImpl {
128            stack,
129            sequence: TempSeqGenerator::new(),
130            vport,
131            recv_buffer: Mutex::new(RecvBuffer {
132                capability: recv_buffer,
133                waker: None,
134                buffer: LinkedList::new(),
135            }),
136            frag_buffer: Arc::new(Mutex::new(
137                DatagramFragments::new(fragment_cache_size, fragment_expired_us)
138            )),
139        }));
140
141        datagram_tunnel.fragment_timer(expired_tick_sec);
142
143        return datagram_tunnel;
144    }
145
146    pub fn recv_v(&self) -> impl Future<Output = Result<LinkedList<Datagram>, std::io::Error>> {
147        RecvV {
148            tunnel: self.clone(),
149        }
150    }
151
152    pub fn measure_data(&self, _options: &DatagramOptions) -> BuckyResult<usize> {
153        // let datagram = protocol::Datagram {
154        //     to_vport: vport,
155        //     from_vport: self.vport(),
156        //     dest_zone: None,
157        //     hop_limit: None,
158        //     sequence: if options.with_sequence.is_some() {
159        //         let seq = self.0.sequence.generate();
160        //         options.with_sequence = Some(seq);
161        //         Some(seq)
162        //     } else {
163        //         None
164        //     },
165        //     piece: None,
166        //     send_time: if options.with_sendtime.is_some() {
167        //         let sendtime = bucky_time_now();
168        //         options.with_sendtime = Some(sendtime);
169        //         Some(sendtime)
170        //     } else {
171        //         None
172        //     },
173        //     create_time: options.create_time,
174        //     author_id: options.author_id,
175        //     author: None,
176        //     inner_type: protocol::DatagramType::Data,
177        //     data: TailedOwnedData::from(vec![]),
178        // };
179        // let size = datagram.raw_measure(purpose)?;
180        // Ok(interface::udp::MTU - KeyMixHash::raw_bytes().unwrap() - size)
181        // FIXME: 正确的实现
182        Ok(1024)
183    }
184
185    pub fn send_to_v(
186        &self,
187        _buf: &[&[u8]],
188        _options: &DatagramOptions,
189        _remote: &DeviceId,
190        _vport: u16,
191    ) -> Result<(), std::io::Error> {
192        unimplemented!()
193    }
194
195    fn package_max_len(&self, remote: &DeviceId) -> usize {
196        let stack = Stack::from(&self.as_ref().stack);
197        let tunnel = stack.tunnel_manager().container_of(remote);
198        if let Some(tunnel) = tunnel {
199            if tunnel.state() != TunnelState::Dead {
200                return tunnel.mtu();
201            }
202        }
203
204        return MTU-12;
205    }
206
207    fn send_datagram(
208        &self,
209        datagram: protocol::v0::Datagram,
210        remote: &DeviceId, 
211        plaintext: bool
212    ) -> Result<(), std::io::Error> {
213        let stack = Stack::from(&self.as_ref().stack);
214        let tunnel = stack.tunnel_manager().container_of(remote);
215        if let Some(tunnel) = tunnel {
216            if tunnel.state() == TunnelState::Dead
217                || tunnel.state() == TunnelState::Connecting {
218                debug!(
219                    "{} tunnel to {} dead, will build tunnel",
220                    self.as_ref(),
221                    remote
222                );
223                let arc_self = self.clone();
224                let remote = remote.to_owned();
225                task::spawn(async move {
226                    if let Some(remote_device) = stack.device_cache().get(&remote).await {
227                        let build_params = BuildTunnelParams {
228                            remote_const: remote_device.desc().clone(),
229                            remote_sn: None,
230                            remote_desc: Some(remote_device),
231                        };
232                        let _ = tunnel.build_send(DynamicPackage::from(datagram), build_params, plaintext);
233                    } else {
234                        warn!(
235                            "{} build tunnel to {} failed for device not in cache",
236                            arc_self.as_ref(),
237                            remote
238                        );
239                    }
240                });
241                Err(std::io::Error::new(
242                    std::io::ErrorKind::NotConnected,
243                    "pending on building tunnel",
244                ))
245            } else {
246                tunnel.send_package(DynamicPackage::from(datagram), plaintext)
247                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.msg()))
248            }
249        } else {
250            debug!(
251                "{} tunnel to {} not exists, will build tunnel",
252                self.as_ref(),
253                remote
254            );
255            let arc_self = self.clone();
256            let remote = remote.to_owned();
257            task::spawn(async move {
258                if let Some(remote_device) = stack.device_cache().get(&remote).await {
259                    let tunnel = stack
260                        .tunnel_manager()
261                        .create_container(remote_device.desc())
262                        .unwrap();
263                    let build_params = BuildTunnelParams {
264                        remote_const: remote_device.desc().clone(),
265                        remote_sn: None,
266                        remote_desc: Some(remote_device),
267                    };
268                    let _ = tunnel.build_send(DynamicPackage::from(datagram), build_params, plaintext);
269                } else {
270                    warn!(
271                        "{} build tunnel to {} failed for device not in cache",
272                        arc_self.as_ref(),
273                        remote
274                    );
275                }
276            });
277            Err(std::io::Error::new(
278                std::io::ErrorKind::NotConnected,
279                "pending on building tunnel",
280            ))
281        }
282    }
283
284    fn build_datagram(
285        &self,
286        buf: &[u8],
287        options: &mut DatagramOptions,
288        remote: &DeviceId,
289        vport: u16,
290        piece: Option<(u8, u8)>,
291    ) -> protocol::v0::Datagram {
292        let datagram = protocol::v0::Datagram {
293            to_vport: vport,
294            from_vport: self.0.vport,
295            dest_zone: None,
296            hop_limit: None,
297            sequence: if options.sequence.is_some() {
298                let seq = options.sequence.unwrap();
299                if seq == TempSeq::default() {
300                    let seq = self.0.sequence.generate();
301                    options.sequence = Some(seq);
302                    Some(seq)
303                } else {
304                    Some(seq)
305                }
306            } else {
307                None
308            },
309            piece,
310            send_time: if options.send_time.is_some() {
311                let sendtime = bucky_time_now();
312                options.send_time = Some(sendtime);
313                Some(sendtime)
314            } else {
315                None
316            },
317            create_time: options.create_time,
318            author_id: options.author_id.as_ref().map(|id| id.clone()),
319            author: None,
320            inner_type: protocol::v0::DatagramType::Data,
321            data: TailedOwnedData::from(buf),
322        };
323
324        trace!(
325            "{} try send {} to {}:{}",
326            self.as_ref(),
327            datagram,
328            remote,
329            vport
330        );
331
332        datagram
333    }
334
335    pub fn send_to(
336        &self,
337        buf: &[u8],
338        options: &mut DatagramOptions,
339        remote: &DeviceId,
340        vport: u16,
341    ) -> Result<(), std::io::Error> {
342        let mtu = MTU;
343        let mut datagram = self.build_datagram(buf, options, remote, vport, None);
344        let mut fragment_len = datagram.fragment_len(mtu, options.plaintext);
345
346        if fragment_len == 0 {
347            self.send_datagram(datagram, remote, options.plaintext)
348        } else {
349            if options.sequence.is_none() {
350                let seq = self.0.sequence.generate();
351                options.sequence = Some(seq);
352                datagram.sequence = Some(seq);
353                fragment_len = datagram.fragment_len(mtu, options.plaintext);
354            }
355
356            let count = (buf.len() as f64 / fragment_len as f64).ceil() as u8;
357            let mut start = 0;
358            let mut end = fragment_len;
359            for i in 0..count {
360                let datagram = self.build_datagram(&buf[start..end], options, remote, vport, Some((i, count)));
361                let _ = self.send_datagram(datagram, remote, options.plaintext);
362
363                start += fragment_len;
364                end += fragment_len;
365                if end > buf.len() {
366                    end = buf.len();
367                }
368            }
369
370            Ok(())
371        }
372    }
373
374    pub fn vport(&self) -> u16 {
375        self.0.vport
376    }
377
378    pub fn close(&self) {
379        let stack = Stack::from(&self.0.stack);
380        stack.datagram_manager().unbind(self.vport());
381    }
382
383
384    fn fragment_timer(&self, tick_sec: u64) {
385        let frag_buffer = self.0.frag_buffer.clone();
386        task::spawn(async move {
387            loop {
388                let fragments = frag_buffer.clone();
389                task::sleep(Duration::from_secs(tick_sec)).await;
390                {
391                    let mut fragments = fragments.lock().unwrap();
392                    fragments.expired_clear();
393                }
394            }
395        });
396    }
397
398    fn on_datagram(
399        &self,
400        pkg: &protocol::v0::Datagram,
401        from: &TunnelContainer, 
402        plaintext: bool
403    ) -> Result<OnPackageResult, BuckyError> {
404        let datagram = Datagram {
405            source: DatagramSource {
406                remote: from.remote().clone(),
407                vport: pkg.from_vport,
408            },
409            options: DatagramOptions {
410                sequence: pkg.sequence,
411                author_id: pkg.author_id.as_ref().map(|id| id.clone()),
412                create_time: pkg.create_time,
413                send_time: pkg.send_time,
414                plaintext,
415            },
416            data: Vec::from(pkg.data.as_ref()),
417        };
418
419        if let Some(waker) = {
420            let mut recv_buffer = self.0.recv_buffer.lock().unwrap();
421            if recv_buffer.buffer.len() == recv_buffer.capability {
422                let _ = recv_buffer.buffer.pop_front();
423            }
424            recv_buffer.buffer.push_back(datagram);
425            if let Some(ref waker) = recv_buffer.waker {
426                let waker = waker.clone();
427                recv_buffer.waker = None;
428                Some(waker)
429            } else {
430                None
431            }
432        } {
433            waker.wake();
434        }
435        Ok(OnPackageResult::Handled)
436    }
437}
438
439// FIXME: 整个 OnPackage体系的 package参数改成转移不是引用,这里就可以不用拷贝data
440impl OnPackage<protocol::v0::Datagram, (&TunnelContainer, bool)> for DatagramTunnel {
441    fn on_package(
442        &self,
443        pkg: &protocol::v0::Datagram,
444        context: (&TunnelContainer, bool),
445    ) -> Result<OnPackageResult, BuckyError> {
446        let (from, plaintext) = context;
447        log::trace!("{} recv {} from {}", self.as_ref(), pkg, from);
448        assert_eq!(pkg.to_vport, self.vport());
449
450        if pkg.piece.is_some()  {
451            let reassemble_result = {
452                let mut frag_buffer = self.0.frag_buffer.lock().unwrap();
453                frag_buffer.reassemble(pkg, from)
454            };
455            match reassemble_result {
456                Ok(ret) => {
457                    if let Some(p) = ret {
458                        self.on_datagram(&p, from, plaintext)
459                    } else {
460                        return Ok(OnPackageResult::Handled);
461                    }
462                }
463                Err(_) => {
464                    return Ok(OnPackageResult::Handled);
465                }
466            }
467        } else {
468            self.on_datagram(pkg, from, plaintext)
469        }
470    }
471}
472
473pub struct RecvV {
474    tunnel: DatagramTunnel,
475}
476
477impl Future for RecvV {
478    type Output = Result<LinkedList<Datagram>, std::io::Error>;
479
480    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
481        let tunnel = self.tunnel.clone();
482        tunnel.0.poll_recv_v(cx)
483    }
484}
485
486struct DatagramTunnelGuardImpl(DatagramTunnel);
487
488impl Drop for DatagramTunnelGuardImpl {
489    fn drop(&mut self) {
490        self.0.close();
491    }
492}
493
494#[derive(Clone)]
495pub struct DatagramTunnelGuard(Arc<DatagramTunnelGuardImpl>);
496
497impl From<DatagramTunnel> for DatagramTunnelGuard {
498    fn from(tunnel: DatagramTunnel) -> Self {
499        Self(Arc::new(DatagramTunnelGuardImpl(tunnel)))
500    }
501}
502
503impl Deref for DatagramTunnelGuard {
504    type Target = DatagramTunnel;
505    fn deref(&self) -> &DatagramTunnel {
506        &(*self.0).0
507    }
508}
509
510impl DatagramFragments {
511    pub fn new(frag_data_max_size: usize, frag_expired_us: u64) -> Self {
512        DatagramFragments {
513            fragments: HashMap::new(),
514            frag_data_size: 0,
515            frag_data_max_size,
516            frag_expired_us
517        }
518    }
519
520    pub fn expired_clear(&mut self) {
521        let now = bucky_time_now();
522
523        let mut clear_size = 0;
524        for (_, fragment) in self.fragments.iter() {
525            if fragment.expire_time < now {
526                for (_, pkg) in fragment.datagrams.iter() {
527                    clear_size += pkg.data.as_ref().len();
528                }
529            }
530        }
531
532        self.fragments.retain(|_, pkg| pkg.expire_time >= now);
533        if clear_size > 0 {
534            if self.frag_data_size < clear_size {
535                error!("size wrong. frag_data_size={} clear_size={}", self.frag_data_size, clear_size);
536
537                self.frag_data_size = 0;
538            } else {
539                info!("expired clear frag_data_size={} clear_size={}", self.frag_data_size, clear_size);
540
541                self.frag_data_size -= clear_size;
542            }
543        }
544    }
545
546    pub fn reassemble(&mut self, pkg: &protocol::v0::Datagram, from: &TunnelContainer) -> BuckyResult<Option<protocol::v0::Datagram>> {
547        if pkg.piece.is_none() || pkg.sequence.is_none() {
548            return Ok(None);
549        }
550
551        let mut fragment_add_check = |pkg: &protocol::v0::Datagram, from: &TunnelContainer| -> bool {//check size
552            let payload_size = pkg.data.as_ref().len();
553            if self.frag_data_size + payload_size > self.frag_data_max_size {
554                error!("fragment from={} from_vport={} to_vport={} sequence={:?} frage_data_size={} too many fragment, drop", 
555                    from.remote(), pkg.from_vport, 
556                    pkg.to_vport, 
557                    pkg.sequence,
558                    self.frag_data_size);
559    
560                return false;
561            }
562
563            self.frag_data_size += payload_size;
564
565            return true;
566        };
567
568        let datagram_key = |pkg: &protocol::v0::Datagram, from: &TunnelContainer| -> String {
569            format!("{}:{}:{}", from.remote(), pkg.from_vport, pkg.sequence.unwrap().value())
570        };
571
572        let payload_merge = |fragment: &DatagramFragment| -> protocol::v0::Datagram { 
573            let mut payload_size = 0;
574            for i in 0..fragment.fragment_total {
575                let n = i as u8;
576                let frag = fragment.datagrams.get(&n).unwrap();
577                payload_size += frag.data.as_ref().len();
578            }
579    
580            let mut payload = vec![0u8;payload_size];
581            let mut pos = 0;
582            for i in 0..fragment.fragment_total {
583                let n = i as u8;
584                let frag = fragment.datagrams.get(&n).unwrap();
585                let len = frag.data.as_ref().len();
586                payload[pos..pos+len].copy_from_slice(frag.data.as_ref());
587                pos += len;
588            }
589
590            let pkg = fragment.datagrams.get(&0).unwrap();
591            protocol::v0::Datagram {
592                to_vport: pkg.to_vport,
593                from_vport: pkg.from_vport,
594                dest_zone: pkg.dest_zone.clone(),
595                hop_limit: pkg.hop_limit.clone(),
596                sequence: pkg.sequence.clone(),
597                piece: pkg.piece.clone(),
598                send_time: pkg.send_time.clone(),
599                create_time: pkg.create_time.clone(),
600                author_id: pkg.author_id.clone(),
601                author: pkg.author.clone(),
602                inner_type: pkg.inner_type,
603                data: TailedOwnedData::from(payload),
604            }
605        };
606
607        let key = datagram_key(pkg, from);
608        if let Some(fragment) = self.fragments.get_mut(&key) {
609            let (fragment_index, _) = pkg.piece.unwrap();
610            if let Some(_) = fragment.datagrams.get(&fragment_index) {//duplicate
611                return Ok(None);
612            }
613
614            if !fragment_add_check(pkg, from) {
615                return Ok(None);
616            }
617
618            fragment.datagrams.insert(fragment_index, pkg.clone());
619
620            if fragment.datagrams.len() == fragment.fragment_total {//complete
621                let pkg = payload_merge(fragment);
622                self.fragments.remove(&key);
623                if self.frag_data_size < pkg.data.as_ref().len() {
624                    error!("size wrong. frag_data_size={} pkg_data={}", self.frag_data_size, pkg.data.as_ref().len());
625
626                    self.frag_data_size = 0;
627                } else {
628                    self.frag_data_size -= pkg.data.as_ref().len();
629                }
630
631                return Ok(Some(pkg))
632            }
633
634            return Ok(None);
635        }
636
637        //new
638        if !fragment_add_check(pkg, from) {
639            return Ok(None);
640        }
641
642        let expire_time = bucky_time_now() + self.frag_expired_us;
643        let (fragment_index, fragment_total) = pkg.piece.unwrap();
644
645        let mut fragment = DatagramFragment {
646            author_id: from.remote().clone(),
647            from_vport: pkg.from_vport,
648            sequence: pkg.sequence.unwrap(),
649            to_vport: pkg.to_vport,
650            expire_time,
651            datagrams: HashMap::new(),
652            fragment_total: fragment_total as usize,
653        };
654
655        fragment.datagrams.insert(fragment_index, pkg.clone());
656
657        self.fragments.insert(datagram_key(pkg, from), fragment);
658
659        Ok(None)
660    }
661}