1use crate::{
2 protocol::{self, *},
3 stack::{Stack, WeakStack},
4 tunnel::{BuildTunnelParams, TunnelContainer, TunnelState},
5 types::*,
6 MTU
7};
8use async_std::{pin::Pin, sync::Arc, task};
9use cyfs_base::*;
10use cyfs_debug::Mutex;
11use futures::{
12 task::{Context, Poll},
13 Future,
14};
15use log::*;
16use std::{
17 collections::{LinkedList, HashMap},
18 ops::{Deref, Drop},
19 task::Waker,
20 time::Duration,
21};
22
23#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
24pub struct DatagramSource {
25 pub remote: DeviceId,
26 pub vport: u16,
27}
28
29#[derive(Clone)]
30pub struct DatagramOptions {
31 pub sequence: Option<TempSeq>,
32 pub author_id: Option<DeviceId>,
33 pub create_time: Option<Timestamp>,
34 pub send_time: Option<Timestamp>,
35 pub plaintext: bool,
36}
37
38impl Default for DatagramOptions {
39 fn default() -> Self {
40 Self {
41 sequence: None,
42 author_id: None,
43 create_time: None,
44 send_time: None,
45 plaintext: false,
46 }
47 }
48}
49
50pub struct Datagram {
51 pub source: DatagramSource,
52 pub options: DatagramOptions,
53 pub data: Vec<u8>,
54}
55
56struct RecvBuffer {
57 capability: usize,
58 waker: Option<Waker>,
59 buffer: LinkedList<Datagram>,
60}
61
62struct DatagramFragment {
63 author_id: DeviceId,
64 from_vport: u16,
65 sequence: TempSeq,
66 to_vport: u16,
67 expire_time: u64,
68 datagrams: HashMap<u8, protocol::v0::Datagram>,
69 fragment_total: usize,
70}
71
72struct DatagramFragments {
73 fragments: HashMap<String, DatagramFragment>,
74 frag_data_size: usize,
75 frag_data_max_size: usize,
76 frag_expired_us: u64,
77}
78
79struct DatagramTunnelImpl {
80 stack: WeakStack,
81 sequence: TempSeqGenerator,
82 vport: u16,
83 recv_buffer: Mutex<RecvBuffer>,
84 frag_buffer: Arc<Mutex<DatagramFragments>>,
85}
86
87impl DatagramTunnelImpl {
88 fn poll_recv_v(
89 &self,
90 cx: &mut Context<'_>,
91 ) -> Poll<Result<LinkedList<Datagram>, std::io::Error>> {
92 let mut recv_buffer = self.recv_buffer.lock().unwrap();
93 if recv_buffer.buffer.len() == 0 {
94 recv_buffer.waker = Some(cx.waker().clone());
96 Poll::Pending
97 } else {
98 let mut datagrams = LinkedList::new();
99 datagrams.append(&mut recv_buffer.buffer);
100 Poll::Ready(Ok(datagrams))
101 }
102 }
103}
104
105impl std::fmt::Display for DatagramTunnelImpl {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "DatagramTunnel{{vport:{}}}", self.vport)
108 }
109}
110
111#[derive(Clone)]
112pub struct DatagramTunnel(Arc<DatagramTunnelImpl>);
113
114impl AsRef<DatagramTunnelImpl> for DatagramTunnel {
115 fn as_ref(&self) -> &DatagramTunnelImpl {
116 &self.0
117 }
118}
119
120impl DatagramTunnel {
121 pub(crate) fn new(stack: WeakStack, vport: u16, recv_buffer: usize) -> DatagramTunnel {
122 let cfg = Stack::from(&stack).config().datagram.clone();
123 let expired_tick_sec = cfg.expired_tick_sec;
124 let fragment_cache_size = cfg.fragment_cache_size;
125 let fragment_expired_us = cfg.fragment_expired_us;
126
127 let datagram_tunnel = DatagramTunnel(Arc::new(DatagramTunnelImpl {
128 stack,
129 sequence: TempSeqGenerator::new(),
130 vport,
131 recv_buffer: Mutex::new(RecvBuffer {
132 capability: recv_buffer,
133 waker: None,
134 buffer: LinkedList::new(),
135 }),
136 frag_buffer: Arc::new(Mutex::new(
137 DatagramFragments::new(fragment_cache_size, fragment_expired_us)
138 )),
139 }));
140
141 datagram_tunnel.fragment_timer(expired_tick_sec);
142
143 return datagram_tunnel;
144 }
145
146 pub fn recv_v(&self) -> impl Future<Output = Result<LinkedList<Datagram>, std::io::Error>> {
147 RecvV {
148 tunnel: self.clone(),
149 }
150 }
151
152 pub fn measure_data(&self, _options: &DatagramOptions) -> BuckyResult<usize> {
153 Ok(1024)
183 }
184
185 pub fn send_to_v(
186 &self,
187 _buf: &[&[u8]],
188 _options: &DatagramOptions,
189 _remote: &DeviceId,
190 _vport: u16,
191 ) -> Result<(), std::io::Error> {
192 unimplemented!()
193 }
194
195 fn package_max_len(&self, remote: &DeviceId) -> usize {
196 let stack = Stack::from(&self.as_ref().stack);
197 let tunnel = stack.tunnel_manager().container_of(remote);
198 if let Some(tunnel) = tunnel {
199 if tunnel.state() != TunnelState::Dead {
200 return tunnel.mtu();
201 }
202 }
203
204 return MTU-12;
205 }
206
207 fn send_datagram(
208 &self,
209 datagram: protocol::v0::Datagram,
210 remote: &DeviceId,
211 plaintext: bool
212 ) -> Result<(), std::io::Error> {
213 let stack = Stack::from(&self.as_ref().stack);
214 let tunnel = stack.tunnel_manager().container_of(remote);
215 if let Some(tunnel) = tunnel {
216 if tunnel.state() == TunnelState::Dead
217 || tunnel.state() == TunnelState::Connecting {
218 debug!(
219 "{} tunnel to {} dead, will build tunnel",
220 self.as_ref(),
221 remote
222 );
223 let arc_self = self.clone();
224 let remote = remote.to_owned();
225 task::spawn(async move {
226 if let Some(remote_device) = stack.device_cache().get(&remote).await {
227 let build_params = BuildTunnelParams {
228 remote_const: remote_device.desc().clone(),
229 remote_sn: None,
230 remote_desc: Some(remote_device),
231 };
232 let _ = tunnel.build_send(DynamicPackage::from(datagram), build_params, plaintext);
233 } else {
234 warn!(
235 "{} build tunnel to {} failed for device not in cache",
236 arc_self.as_ref(),
237 remote
238 );
239 }
240 });
241 Err(std::io::Error::new(
242 std::io::ErrorKind::NotConnected,
243 "pending on building tunnel",
244 ))
245 } else {
246 tunnel.send_package(DynamicPackage::from(datagram), plaintext)
247 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.msg()))
248 }
249 } else {
250 debug!(
251 "{} tunnel to {} not exists, will build tunnel",
252 self.as_ref(),
253 remote
254 );
255 let arc_self = self.clone();
256 let remote = remote.to_owned();
257 task::spawn(async move {
258 if let Some(remote_device) = stack.device_cache().get(&remote).await {
259 let tunnel = stack
260 .tunnel_manager()
261 .create_container(remote_device.desc())
262 .unwrap();
263 let build_params = BuildTunnelParams {
264 remote_const: remote_device.desc().clone(),
265 remote_sn: None,
266 remote_desc: Some(remote_device),
267 };
268 let _ = tunnel.build_send(DynamicPackage::from(datagram), build_params, plaintext);
269 } else {
270 warn!(
271 "{} build tunnel to {} failed for device not in cache",
272 arc_self.as_ref(),
273 remote
274 );
275 }
276 });
277 Err(std::io::Error::new(
278 std::io::ErrorKind::NotConnected,
279 "pending on building tunnel",
280 ))
281 }
282 }
283
284 fn build_datagram(
285 &self,
286 buf: &[u8],
287 options: &mut DatagramOptions,
288 remote: &DeviceId,
289 vport: u16,
290 piece: Option<(u8, u8)>,
291 ) -> protocol::v0::Datagram {
292 let datagram = protocol::v0::Datagram {
293 to_vport: vport,
294 from_vport: self.0.vport,
295 dest_zone: None,
296 hop_limit: None,
297 sequence: if options.sequence.is_some() {
298 let seq = options.sequence.unwrap();
299 if seq == TempSeq::default() {
300 let seq = self.0.sequence.generate();
301 options.sequence = Some(seq);
302 Some(seq)
303 } else {
304 Some(seq)
305 }
306 } else {
307 None
308 },
309 piece,
310 send_time: if options.send_time.is_some() {
311 let sendtime = bucky_time_now();
312 options.send_time = Some(sendtime);
313 Some(sendtime)
314 } else {
315 None
316 },
317 create_time: options.create_time,
318 author_id: options.author_id.as_ref().map(|id| id.clone()),
319 author: None,
320 inner_type: protocol::v0::DatagramType::Data,
321 data: TailedOwnedData::from(buf),
322 };
323
324 trace!(
325 "{} try send {} to {}:{}",
326 self.as_ref(),
327 datagram,
328 remote,
329 vport
330 );
331
332 datagram
333 }
334
335 pub fn send_to(
336 &self,
337 buf: &[u8],
338 options: &mut DatagramOptions,
339 remote: &DeviceId,
340 vport: u16,
341 ) -> Result<(), std::io::Error> {
342 let mtu = MTU;
343 let mut datagram = self.build_datagram(buf, options, remote, vport, None);
344 let mut fragment_len = datagram.fragment_len(mtu, options.plaintext);
345
346 if fragment_len == 0 {
347 self.send_datagram(datagram, remote, options.plaintext)
348 } else {
349 if options.sequence.is_none() {
350 let seq = self.0.sequence.generate();
351 options.sequence = Some(seq);
352 datagram.sequence = Some(seq);
353 fragment_len = datagram.fragment_len(mtu, options.plaintext);
354 }
355
356 let count = (buf.len() as f64 / fragment_len as f64).ceil() as u8;
357 let mut start = 0;
358 let mut end = fragment_len;
359 for i in 0..count {
360 let datagram = self.build_datagram(&buf[start..end], options, remote, vport, Some((i, count)));
361 let _ = self.send_datagram(datagram, remote, options.plaintext);
362
363 start += fragment_len;
364 end += fragment_len;
365 if end > buf.len() {
366 end = buf.len();
367 }
368 }
369
370 Ok(())
371 }
372 }
373
374 pub fn vport(&self) -> u16 {
375 self.0.vport
376 }
377
378 pub fn close(&self) {
379 let stack = Stack::from(&self.0.stack);
380 stack.datagram_manager().unbind(self.vport());
381 }
382
383
384 fn fragment_timer(&self, tick_sec: u64) {
385 let frag_buffer = self.0.frag_buffer.clone();
386 task::spawn(async move {
387 loop {
388 let fragments = frag_buffer.clone();
389 task::sleep(Duration::from_secs(tick_sec)).await;
390 {
391 let mut fragments = fragments.lock().unwrap();
392 fragments.expired_clear();
393 }
394 }
395 });
396 }
397
398 fn on_datagram(
399 &self,
400 pkg: &protocol::v0::Datagram,
401 from: &TunnelContainer,
402 plaintext: bool
403 ) -> Result<OnPackageResult, BuckyError> {
404 let datagram = Datagram {
405 source: DatagramSource {
406 remote: from.remote().clone(),
407 vport: pkg.from_vport,
408 },
409 options: DatagramOptions {
410 sequence: pkg.sequence,
411 author_id: pkg.author_id.as_ref().map(|id| id.clone()),
412 create_time: pkg.create_time,
413 send_time: pkg.send_time,
414 plaintext,
415 },
416 data: Vec::from(pkg.data.as_ref()),
417 };
418
419 if let Some(waker) = {
420 let mut recv_buffer = self.0.recv_buffer.lock().unwrap();
421 if recv_buffer.buffer.len() == recv_buffer.capability {
422 let _ = recv_buffer.buffer.pop_front();
423 }
424 recv_buffer.buffer.push_back(datagram);
425 if let Some(ref waker) = recv_buffer.waker {
426 let waker = waker.clone();
427 recv_buffer.waker = None;
428 Some(waker)
429 } else {
430 None
431 }
432 } {
433 waker.wake();
434 }
435 Ok(OnPackageResult::Handled)
436 }
437}
438
439impl OnPackage<protocol::v0::Datagram, (&TunnelContainer, bool)> for DatagramTunnel {
441 fn on_package(
442 &self,
443 pkg: &protocol::v0::Datagram,
444 context: (&TunnelContainer, bool),
445 ) -> Result<OnPackageResult, BuckyError> {
446 let (from, plaintext) = context;
447 log::trace!("{} recv {} from {}", self.as_ref(), pkg, from);
448 assert_eq!(pkg.to_vport, self.vport());
449
450 if pkg.piece.is_some() {
451 let reassemble_result = {
452 let mut frag_buffer = self.0.frag_buffer.lock().unwrap();
453 frag_buffer.reassemble(pkg, from)
454 };
455 match reassemble_result {
456 Ok(ret) => {
457 if let Some(p) = ret {
458 self.on_datagram(&p, from, plaintext)
459 } else {
460 return Ok(OnPackageResult::Handled);
461 }
462 }
463 Err(_) => {
464 return Ok(OnPackageResult::Handled);
465 }
466 }
467 } else {
468 self.on_datagram(pkg, from, plaintext)
469 }
470 }
471}
472
473pub struct RecvV {
474 tunnel: DatagramTunnel,
475}
476
477impl Future for RecvV {
478 type Output = Result<LinkedList<Datagram>, std::io::Error>;
479
480 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
481 let tunnel = self.tunnel.clone();
482 tunnel.0.poll_recv_v(cx)
483 }
484}
485
486struct DatagramTunnelGuardImpl(DatagramTunnel);
487
488impl Drop for DatagramTunnelGuardImpl {
489 fn drop(&mut self) {
490 self.0.close();
491 }
492}
493
494#[derive(Clone)]
495pub struct DatagramTunnelGuard(Arc<DatagramTunnelGuardImpl>);
496
497impl From<DatagramTunnel> for DatagramTunnelGuard {
498 fn from(tunnel: DatagramTunnel) -> Self {
499 Self(Arc::new(DatagramTunnelGuardImpl(tunnel)))
500 }
501}
502
503impl Deref for DatagramTunnelGuard {
504 type Target = DatagramTunnel;
505 fn deref(&self) -> &DatagramTunnel {
506 &(*self.0).0
507 }
508}
509
510impl DatagramFragments {
511 pub fn new(frag_data_max_size: usize, frag_expired_us: u64) -> Self {
512 DatagramFragments {
513 fragments: HashMap::new(),
514 frag_data_size: 0,
515 frag_data_max_size,
516 frag_expired_us
517 }
518 }
519
520 pub fn expired_clear(&mut self) {
521 let now = bucky_time_now();
522
523 let mut clear_size = 0;
524 for (_, fragment) in self.fragments.iter() {
525 if fragment.expire_time < now {
526 for (_, pkg) in fragment.datagrams.iter() {
527 clear_size += pkg.data.as_ref().len();
528 }
529 }
530 }
531
532 self.fragments.retain(|_, pkg| pkg.expire_time >= now);
533 if clear_size > 0 {
534 if self.frag_data_size < clear_size {
535 error!("size wrong. frag_data_size={} clear_size={}", self.frag_data_size, clear_size);
536
537 self.frag_data_size = 0;
538 } else {
539 info!("expired clear frag_data_size={} clear_size={}", self.frag_data_size, clear_size);
540
541 self.frag_data_size -= clear_size;
542 }
543 }
544 }
545
546 pub fn reassemble(&mut self, pkg: &protocol::v0::Datagram, from: &TunnelContainer) -> BuckyResult<Option<protocol::v0::Datagram>> {
547 if pkg.piece.is_none() || pkg.sequence.is_none() {
548 return Ok(None);
549 }
550
551 let mut fragment_add_check = |pkg: &protocol::v0::Datagram, from: &TunnelContainer| -> bool {let payload_size = pkg.data.as_ref().len();
553 if self.frag_data_size + payload_size > self.frag_data_max_size {
554 error!("fragment from={} from_vport={} to_vport={} sequence={:?} frage_data_size={} too many fragment, drop",
555 from.remote(), pkg.from_vport,
556 pkg.to_vport,
557 pkg.sequence,
558 self.frag_data_size);
559
560 return false;
561 }
562
563 self.frag_data_size += payload_size;
564
565 return true;
566 };
567
568 let datagram_key = |pkg: &protocol::v0::Datagram, from: &TunnelContainer| -> String {
569 format!("{}:{}:{}", from.remote(), pkg.from_vport, pkg.sequence.unwrap().value())
570 };
571
572 let payload_merge = |fragment: &DatagramFragment| -> protocol::v0::Datagram {
573 let mut payload_size = 0;
574 for i in 0..fragment.fragment_total {
575 let n = i as u8;
576 let frag = fragment.datagrams.get(&n).unwrap();
577 payload_size += frag.data.as_ref().len();
578 }
579
580 let mut payload = vec![0u8;payload_size];
581 let mut pos = 0;
582 for i in 0..fragment.fragment_total {
583 let n = i as u8;
584 let frag = fragment.datagrams.get(&n).unwrap();
585 let len = frag.data.as_ref().len();
586 payload[pos..pos+len].copy_from_slice(frag.data.as_ref());
587 pos += len;
588 }
589
590 let pkg = fragment.datagrams.get(&0).unwrap();
591 protocol::v0::Datagram {
592 to_vport: pkg.to_vport,
593 from_vport: pkg.from_vport,
594 dest_zone: pkg.dest_zone.clone(),
595 hop_limit: pkg.hop_limit.clone(),
596 sequence: pkg.sequence.clone(),
597 piece: pkg.piece.clone(),
598 send_time: pkg.send_time.clone(),
599 create_time: pkg.create_time.clone(),
600 author_id: pkg.author_id.clone(),
601 author: pkg.author.clone(),
602 inner_type: pkg.inner_type,
603 data: TailedOwnedData::from(payload),
604 }
605 };
606
607 let key = datagram_key(pkg, from);
608 if let Some(fragment) = self.fragments.get_mut(&key) {
609 let (fragment_index, _) = pkg.piece.unwrap();
610 if let Some(_) = fragment.datagrams.get(&fragment_index) {return Ok(None);
612 }
613
614 if !fragment_add_check(pkg, from) {
615 return Ok(None);
616 }
617
618 fragment.datagrams.insert(fragment_index, pkg.clone());
619
620 if fragment.datagrams.len() == fragment.fragment_total {let pkg = payload_merge(fragment);
622 self.fragments.remove(&key);
623 if self.frag_data_size < pkg.data.as_ref().len() {
624 error!("size wrong. frag_data_size={} pkg_data={}", self.frag_data_size, pkg.data.as_ref().len());
625
626 self.frag_data_size = 0;
627 } else {
628 self.frag_data_size -= pkg.data.as_ref().len();
629 }
630
631 return Ok(Some(pkg))
632 }
633
634 return Ok(None);
635 }
636
637 if !fragment_add_check(pkg, from) {
639 return Ok(None);
640 }
641
642 let expire_time = bucky_time_now() + self.frag_expired_us;
643 let (fragment_index, fragment_total) = pkg.piece.unwrap();
644
645 let mut fragment = DatagramFragment {
646 author_id: from.remote().clone(),
647 from_vport: pkg.from_vport,
648 sequence: pkg.sequence.unwrap(),
649 to_vport: pkg.to_vport,
650 expire_time,
651 datagrams: HashMap::new(),
652 fragment_total: fragment_total as usize,
653 };
654
655 fragment.datagrams.insert(fragment_index, pkg.clone());
656
657 self.fragments.insert(datagram_key(pkg, from), fragment);
658
659 Ok(None)
660 }
661}