1use std::{
6 net::SocketAddr,
7 time::{self},
8};
9
10use async_trait::async_trait;
11use tokio::{
12 net::UdpSocket,
13 sync::mpsc::{Receiver, Sender},
14};
15
16use tokio::time as tokio_time;
17
18use crate::{
19 msg::{Addr, TmPacket, YgwMessage},
20 LinkStatus, Result, YgwError, YgwLinkNodeProperties, YgwNode,
21};
/// Size, in bytes, of the buffer a single UDP datagram is received into.
const MAX_DATAGRAM_LEN: usize = 2_000;
23
24pub struct TmUdpNode {
25 props: YgwLinkNodeProperties,
26 socket: UdpSocket,
27 initial_bytes_to_strip: usize,
28}
29
30#[async_trait]
31impl YgwNode for TmUdpNode {
32 fn properties(&self) -> &crate::YgwLinkNodeProperties {
33 &self.props
34 }
35
36 fn sub_links(&self) -> &[crate::Link] {
37 &[]
38 }
39
40 async fn run(
41 self: Box<Self>,
42 node_id: u32,
43 tx: Sender<YgwMessage>,
44 mut rx: Receiver<YgwMessage>,
45 ) -> Result<()> {
46 let mut buf = [0; MAX_DATAGRAM_LEN];
47 let ibs = self.initial_bytes_to_strip;
48
49 let addr = Addr::new(node_id, 0);
50 let mut link_status = LinkStatus::new(addr);
51
52 let mut interval = tokio_time::interval(time::Duration::from_secs(1));
53 interval.set_missed_tick_behavior(tokio_time::MissedTickBehavior::Skip);
54
55 loop {
56 tokio::select! {
57 len = self.socket.recv(&mut buf) => {
58 match len {
59 Ok(len) => {
60 link_status.data_in(1, len as u64);
61 log::trace!("Got packet of size {len}");
62
63 let pkt = TmPacket {
64 data: buf[ibs..len].to_vec(),
65 acq_time: crate::protobuf::now(),
66 };
67 if let Err(_) = tx.send(YgwMessage::TmPacket(addr, pkt)).await {
68 return Err(YgwError::ServerShutdown);
69 }
70 },
71 Err(err) => {
72 log::warn!("Error receiving data from UDP socket: {:?}", err);
73 return Err(YgwError::IOError("Error receiving data from UDP socket".into(), err));
74 }
75 }
76 }
77 _ = interval.tick() => {
78 link_status.send(&tx).await?
79 }
80 msg = rx.recv() => {
81 match msg {
82 Some(msg) => log::info!("Received unexpected message {:?}", msg),
83 None => break
84
85 }
86 }
87 }
88 }
89
90 log::debug!("TM UDP node exiting");
91 Ok(())
92 }
93}
94
95impl TmUdpNode {
96 pub async fn new(
97 name: &str,
98 description: &str,
99 addr: SocketAddr,
100 initial_bytes_to_strip: usize,
101 ) -> Result<Self> {
102 let socket = UdpSocket::bind(addr)
103 .await
104 .map_err(|e| YgwError::IOError(format!("Failed to bind to {}", addr), e))?;
105
106 Ok(Self {
107 socket,
108 initial_bytes_to_strip,
109 props: YgwLinkNodeProperties::new(name, description)
110 .tm_packet(true),
111 })
112 }
113}
114
115