ygw/nodes/
tm_udp.rs

1//! Handler for UDP telemetry
2//! one UDP datagram = one TM packet
3//! There is an option to strip some bytes from the beginning of the datagram (similar with the UdpTmDataLink from Yamcs)
4
5use std::{
6    net::SocketAddr,
7    time::{self},
8};
9
10use async_trait::async_trait;
11use tokio::{
12    net::UdpSocket,
13    sync::mpsc::{Receiver, Sender},
14};
15
16use tokio::time as tokio_time;
17
18use crate::{
19    msg::{Addr, TmPacket, YgwMessage},
20    LinkStatus, Result, YgwError, YgwLinkNodeProperties, YgwNode,
21};
22const MAX_DATAGRAM_LEN: usize = 2000;
23
24pub struct TmUdpNode {
25    props: YgwLinkNodeProperties,
26    socket: UdpSocket,
27    initial_bytes_to_strip: usize,
28}
29
30#[async_trait]
31impl YgwNode for TmUdpNode {
32    fn properties(&self) -> &crate::YgwLinkNodeProperties {
33        &self.props
34    }
35
36    fn sub_links(&self) -> &[crate::Link] {
37        &[]
38    }
39
40    async fn run(
41        self: Box<Self>,
42        node_id: u32,
43        tx: Sender<YgwMessage>,
44        mut rx: Receiver<YgwMessage>,
45    ) -> Result<()> {
46        let mut buf = [0; MAX_DATAGRAM_LEN];
47        let ibs = self.initial_bytes_to_strip;
48
49        let addr = Addr::new(node_id, 0);
50        let mut link_status = LinkStatus::new(addr);
51
52        let mut interval = tokio_time::interval(time::Duration::from_secs(1));
53        interval.set_missed_tick_behavior(tokio_time::MissedTickBehavior::Skip);
54
55        loop {
56            tokio::select! {
57                len = self.socket.recv(&mut buf) => {
58                    match len {
59                        Ok(len) => {
60                            link_status.data_in(1, len as u64);
61                            log::trace!("Got packet of size {len}");
62
63                            let pkt = TmPacket {
64                                data: buf[ibs..len].to_vec(),
65                                acq_time: crate::protobuf::now(),
66                            };
67                            if let Err(_) = tx.send(YgwMessage::TmPacket(addr, pkt)).await {
68                                return Err(YgwError::ServerShutdown);
69                            }
70                        },
71                        Err(err) => {
72                            log::warn!("Error receiving data from UDP socket: {:?}", err);
73                            return Err(YgwError::IOError("Error receiving data from UDP socket".into(), err));
74                        }
75                    }
76                }
77                _ = interval.tick() => {
78                    link_status.send(&tx).await?
79                }
80                msg = rx.recv() => {
81                    match msg {
82                        Some(msg) => log::info!("Received unexpected message {:?}", msg),
83                        None => break
84
85                    }
86                }
87            }
88        }
89
90        log::debug!("TM UDP node exiting");
91        Ok(())
92    }
93}
94
95impl TmUdpNode {
96    pub async fn new(
97        name: &str,
98        description: &str,
99        addr: SocketAddr,
100        initial_bytes_to_strip: usize,
101    ) -> Result<Self> {
102        let socket = UdpSocket::bind(addr)
103            .await
104            .map_err(|e| YgwError::IOError(format!("Failed to bind to {}", addr), e))?;
105
106        Ok(Self {
107            socket,
108            initial_bytes_to_strip,
109            props: YgwLinkNodeProperties::new(name, description)
110                .tm_packet(true),
111        })
112    }
113}
114
115/*
116#[cfg(test)]
117mod tests {
118    use std::ops::Add;
119
120    use tokio::sync::mpsc::channel;
121
122    use crate::msg::TmPacket;
123
124    use super::*;
125
126    #[tokio::test]
127    async fn test1() {
128        let (tx, mut rx) = channel(10);
129
130        env_logger::init();
131        let addr: SocketAddr = ([127, 0, 0, 1], 9090).into();
132        let conf = TmUdpConfig {
133            id: 3,
134            addr,
135            initial_bytes_to_strip: 2,
136        };
137
138        let udp_tm_handler = UdpTmHandle::start(conf, tx).await.unwrap();
139
140        let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
141        socket.connect(addr).await.unwrap();
142
143        let buf = vec![1, 2, 3, 4, 5, 6];
144        socket.send(&buf).await.unwrap();
145
146        let msg = rx.recv().await.unwrap();
147        let addr = Addr::new(0, 0);
148
149        assert!(
150            matches!(msg, YgwMessage::TmPacket(addr1, TmPacket{data,..}) if addr1 == addr && data == &buf[2..])
151        );
152
153        udp_tm_handler.stop();
154    }
155}
156 */