ygw/ygw_server.rs

//! Handles connections from multiple Yamcs clients (or other clients implementing the protocol)
//!
//! Based on tokio tasks:
//!  - one task accepts TCP connections and spawns a reader and a writer task for each connection
//!  - one encoder task receives messages from nodes, encodes them to binary and sends them to all Yamcs writers
//!  - one decoder task receives data from the Yamcs readers, decodes it to messages and sends the messages to the nodes
//!
//! The TCP protocol consists of length-delimited messages:
//!  - 4 bytes length = n (max 16 MB, see `MAX_FRAME_LENGTH`)
//!  - n bytes data
//!
//! For the encoding of the data see [`decode`](msg::decode).
//!
//! Orderly termination:
//!  1. The accepter task listens to a cancellation token to know when it should quit.
//!  2. The token is shared with the Yamcs readers, which also quit when the token is canceled.
//!  3. When the accepter and all the readers quit, the channel to the decoder gets closed and the decoder quits as well.
//!  4. When the decoder quits, the channels between the decoder and the nodes get closed and all the nodes are supposed to terminate gracefully.
//!  5. After all the nodes have terminated, the channel between the nodes and the encoder closes and then the encoder closes as well.
//!  6. Finally, the writers and the recorder close after the channel coming from the encoder closes.
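//!
//! As an illustration of the framing only (a minimal sketch, not part of the server), a client
//! could read one message like this; the 4-byte big-endian length prefix is the same one the
//! `LengthDelimitedCodec` used by the reader tasks below expects:
//!
//! ```ignore
//! use tokio::io::AsyncReadExt;
//! use tokio::net::TcpStream;
//!
//! async fn read_one_frame(sock: &mut TcpStream) -> std::io::Result<Vec<u8>> {
//!     // 4-byte big-endian length prefix
//!     let len = sock.read_u32().await? as usize;
//!     // followed by `len` bytes of encoded message data (see msg::decode)
//!     let mut buf = vec![0u8; len];
//!     sock.read_exact(&mut buf).await?;
//!     Ok(buf)
//! }
//! ```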
const MAX_FRAME_LENGTH: usize = 16 * 1024 * 1024;
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};

use bytes::Bytes;

use tokio::{
    io::AsyncWriteExt,
    net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpListener,
    },
    select,
    sync::mpsc::{channel, error::SendError, Receiver, Sender},
    task::JoinHandle,
};
use tokio_stream::StreamExt;
use tokio_util::{
    codec::{FramedRead, LengthDelimitedCodec},
    sync::CancellationToken,
};

use crate::{
    hex8,
    msg::{self, Addr, EncodedMessage, YgwMessage},
    protobuf::{self, ygw::MessageType},
    recorder::Recorder,
    replay_server::start_replay_server,
    Link, Result, YgwError, YgwLinkNodeProperties, YgwNode,
};

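/// Control messages sent to the encoder task: the accepter task announces new Yamcs
/// connections and the reader tasks announce closed Yamcs connections.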
pub enum CtrlMessage {
    NewYamcsConnection(YamcsConnection),
    YamcsConnectionClosed(SocketAddr),
}

pub struct Server {
    nodes: Vec<Box<dyn YgwNode>>,
    addr: SocketAddr,
    record_replay_conf: Option<(PathBuf, SocketAddr)>,
}

pub struct ServerBuilder {
    nodes: Vec<Box<dyn YgwNode>>,
    addr: SocketAddr,
    record_replay_conf: Option<(PathBuf, SocketAddr)>,
}

impl Default for ServerBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl ServerBuilder {
    /// Creates a new server builder.
    /// The listening address is set to 127.0.0.1:7897.
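    ///
    /// A minimal usage sketch (`my_node` stands in for any [`YgwNode`] implementation and is not
    /// defined here; the record/replay configuration is optional):
    ///
    /// ```ignore
    /// let server = ServerBuilder::new()
    ///     .set_addr(([0, 0, 0, 0], 7897).into())
    ///     // record incoming data to /tmp/ygw-rec and serve replays on port 7898
    ///     .with_record_replay_conf(Some(("/tmp/ygw-rec".into(), ([127, 0, 0, 1], 7898).into())))
    ///     .add_node(Box::new(my_node))
    ///     .build();
    /// ```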
    pub fn new() -> Self {
        Self {
            addr: ([127, 0, 0, 1], 7897).into(),
            nodes: Vec::new(),
            record_replay_conf: None,
        }
    }

    pub fn set_addr(mut self, addr: SocketAddr) -> Self {
        self.addr = addr;
        self
    }

    pub fn add_node(mut self, node: Box<dyn YgwNode>) -> Self {
        self.nodes.push(node);
        self
    }

    pub fn with_record_replay_conf(
        mut self,
        record_replay_conf: Option<(PathBuf, SocketAddr)>,
    ) -> Self {
        self.record_replay_conf = record_replay_conf;
        self
    }

    pub fn build(self) -> Server {
        Server {
            nodes: self.nodes,
            addr: self.addr,
            record_replay_conf: self.record_replay_conf,
        }
    }
}

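/// Handle returned by [`Server::start`]: the actual bound address, the join handle of the
/// server tasks and a cancellation token that triggers an orderly shutdown when canceled.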
pub struct ServerHandle {
    pub addr: SocketAddr,
    pub jh: JoinHandle<Result<()>>,
    pub cancel_token: CancellationToken,
}

impl Server {
    /// Runs the YgwServer.
    ///
    /// Starts the following Tokio tasks:
    ///  * One accepter task for accepting Yamcs connections.
    ///  * One encoder task for encoding messages from nodes and sending them to Yamcs.
    ///  * One decoder task for decoding messages from Yamcs and sending them to the nodes.
    ///
    /// Returns a [`ServerHandle`] that can be used to await or cancel the server.
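    ///
    /// A minimal end-to-end sketch (assuming `server` was built with [`ServerBuilder`] as shown
    /// in its documentation):
    ///
    /// ```ignore
    /// let handle = server.start().await?;
    /// log::info!("Yamcs gateway listening on {}", handle.addr);
    /// // waits for SIGINT/SIGTERM (Ctrl+C on Windows) and then shuts down orderly
    /// handle.run().await?;
    /// ```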
    pub async fn start(mut self) -> Result<ServerHandle> {
        let mut node_tx_map = HashMap::new();
        let mut node_data = HashMap::new();
        let mut node_id = 0;
        let mut handles = Vec::new();

        let (encoder_tx, encoder_rx) = tokio::sync::mpsc::channel(100);

        let socket = TcpListener::bind(self.addr)
            .await
            .map_err(|e| YgwError::IOError(format!("Cannot bind to {}", self.addr), e))?;
        let addr = socket.local_addr()?;
        let cancel_token = CancellationToken::new();

        for node in self.nodes.drain(..) {
            let props = node.properties();
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            node_data.insert(node_id, NodeData::new(node_id, props, node.sub_links()));

            let encoder_tx = encoder_tx.clone();
            log::info!("Starting node {} with id {}", props.name, node_id);
            let jh = tokio::spawn(async move { node.run(node_id, encoder_tx, rx).await });
            handles.push(jh);

            node_tx_map.insert(node_id, tx);
            node_id += 1;
        }
        let (ctrl_tx, ctrl_rx) = channel(10);

        let (decoder_tx, decoder_rx) = tokio::sync::mpsc::channel(100);
        let cancel_token2 = cancel_token.clone();
        let accepter_jh =
            tokio::spawn(
                async move { accepter_task(ctrl_tx, socket, decoder_tx, cancel_token2).await },
            );

        let cancel_token2 = cancel_token.clone();
        let cancel_token3 = cancel_token.clone();

        let encoder_jh = tokio::spawn(async move {
            if let Err(e) = encoder_task(
                ctrl_rx,
                encoder_rx,
                node_data,
                self.record_replay_conf,
                cancel_token2,
            )
            .await
            {
                log::error!("Encoder task failed: {:?}", e);
                cancel_token3.cancel();
                Err(e)
            } else {
                Ok(())
            }
        });

        let decoder_jh = tokio::spawn(async move { decoder_task(decoder_rx, node_tx_map).await });

        let jh: JoinHandle<Result<()>> = tokio::spawn(async move {
            let (res1, res2, res3) =
                futures::future::join3(accepter_jh, encoder_jh, decoder_jh).await;
            res1.map_err(|e| YgwError::from(e))
                .and(res2.map_err(|e| YgwError::from(e)))
                .and(res3.map_err(|e| YgwError::from(e)))
                .map(|_| ())
        });

        Ok(ServerHandle {
            jh,
            addr,
            cancel_token,
        })
    }
}

impl ServerHandle {
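    /// Runs the server to completion.
    ///
    /// Spawns a task that waits for SIGINT/SIGTERM (Ctrl+C on Windows), cancels the cancellation
    /// token to trigger the orderly shutdown described in the module documentation and then
    /// awaits the server tasks.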
    pub async fn run(self) -> Result<()> {
        let ServerHandle {
            jh, cancel_token, ..
        } = self;

        // Spawn a task to listen for shutdown signals
        tokio::spawn(async move {
            #[cfg(unix)]
            {
                use tokio::signal::unix::{signal, SignalKind};
                let mut sigint = signal(SignalKind::interrupt())?;
                let mut sigterm = signal(SignalKind::terminate())?;
                tokio::select! {
                    _ = sigint.recv() => log::info!("Received SIGINT"),
                    _ = sigterm.recv() => log::info!("Received SIGTERM"),
                }
            }

            #[cfg(windows)]
            {
                tokio::signal::ctrl_c().await?;
                log::info!("Received Ctrl+C");
            }

            cancel_token.cancel();
            Ok::<(), std::io::Error>(())
        });

        match jh.await {
            Ok(result) => result,
            Err(e) => Err(e.into()),
        }
    }
}

#[derive(Debug)]
pub struct YamcsConnection {
    addr: SocketAddr,
    writer_jh: JoinHandle<Result<()>>,
    reader_jh: JoinHandle<Result<()>>,
    chan_tx: Sender<EncodedMessage>,
    // if this option is true, the encoder will drop the Yamcs connection if it cannot keep up with the data
    drop_if_full: bool,
}

impl PartialEq for YamcsConnection {
    fn eq(&self, other: &Self) -> bool {
        self.addr == other.addr
    }
}

/// Used by the encoder to store data about the nodes
struct NodeData {
    node_id: u32,
    props: YgwLinkNodeProperties,
    links: Vec<Link>,
    /// collects the parameter definitions as sent by the node
    /// sent in bulk to Yamcs upon connection
    para_defs: protobuf::ygw::ParameterDefinitionList,

    /// collects the command definitions as sent by the node
    /// sent in bulk to Yamcs upon connection
    cmd_defs: protobuf::ygw::CommandDefinitionList,

    /// collects the command options as sent by the node
    /// sent in bulk to Yamcs upon connection
    cmd_opts: protobuf::ygw::CommandOptionList,

    /// collects the parameter values per group as sent by the node
    /// sent in bulk to Yamcs upon connection
    para_values: HashMap<String, protobuf::ygw::ParameterData>,

    /// collects the link status per link
    /// sent in bulk to Yamcs upon connection
    link_status: HashMap<u32, protobuf::ygw::LinkStatus>,
}
impl NodeData {
    fn new(node_id: u32, props: &YgwLinkNodeProperties, links: &[Link]) -> Self {
        Self {
            node_id,
            props: props.clone(),
            links: links.to_vec(),
            para_defs: protobuf::ygw::ParameterDefinitionList {
                definitions: Vec::new(),
            },
            cmd_defs: protobuf::ygw::CommandDefinitionList {
                definitions: Vec::new(),
            },
            cmd_opts: protobuf::ygw::CommandOptionList {
                options: Vec::new(),
            },
            para_values: HashMap::new(),
            link_status: HashMap::new(),
        }
    }

    fn node_to_proto(&self) -> protobuf::ygw::Node {
        protobuf::ygw::Node {
            id: self.node_id,
            name: self.props.name.clone(),
            description: Some(self.props.description.clone()),
            tm_packet: if self.props.tm_packet {
                Some(true)
            } else {
                None
            },
            tm_frame: if self.props.tm_frame {
                Some(true)
            } else {
                None
            },
            tc: if self.props.tc { Some(true) } else { None },
            tc_frame: if self.props.tc_frame {
                Some(true)
            } else {
                None
            },
            links: self.links.iter().map(|l| l.to_proto()).collect(),
        }
    }
}

/// Listens for new messages from the nodes,
/// encodes them to bytes and sends the bytes to all writer tasks (to be sent to Yamcs).
///
async fn encoder_task(
    mut ctrl_rx: Receiver<CtrlMessage>,
    mut encoder_rx: Receiver<YgwMessage>,
    mut nodes: HashMap<u32, NodeData>,
    recorder_replay_conf: Option<(PathBuf, SocketAddr)>,
    cancel_token: CancellationToken,
) -> Result<()> {
    let mut connections: Vec<YamcsConnection> = Vec::new();

    let mut rn = 0;
    let recorder_tx: Option<Sender<EncodedMessage>> = match recorder_replay_conf {
        None => None,
        Some((dir, replay_addr)) => {
            log::info!(
                "Encoder: starting recorder with recording directory {}",
                dir.display()
            );
            let (mut recorder, last_rn) = Recorder::new(&dir)?;
            if let Some(last_rn) = last_rn {
                rn = last_rn + 1;
            }

            let (recorder_tx, recorder_rx) = tokio::sync::mpsc::channel(100);
            let (query_tx, query_rx) = tokio::sync::mpsc::channel(16); // For file queries

            // Spawn the recorder task
            tokio::spawn(async move {
                if let Err(e) = recorder.record(recorder_rx, query_rx).await {
                    log::error!("Recorder exited with error: {:?}", e);
                }
            });

            log::info!(
                "Encoder: starting the replay server listening on {}",
                replay_addr
            );
            tokio::spawn(async move {
                if let Err(e) = start_replay_server(replay_addr, query_tx, cancel_token).await {
                    log::error!("Replay server exited with error: {:?}", e);
                }
            });

            Some(recorder_tx)
        }
    };

    // the ctrl_select changes to false if the channel from the accepter closes.
    // We stop reading from the ctrl_rx but keep reading from the encoder_rx (coming from nodes).
    // When all the nodes quit, the encoder_rx will be closed and only then the encoder quits.
    let mut ctrl_select = true;
    loop {
        select! {
            msg = encoder_rx.recv() => {
                // message received from a node
                match msg {
                    Some(msg) => {
                        rn += 1;
                        let enc_msg = msg.encode(rn);
                        if let Some(ref recorder_tx) = recorder_tx {
                            if let Err(e) = recorder_tx.send(enc_msg.clone()).await {
                                log::warn!("Error sending data to recorder: {:?}", e);
                            }
                        }

                        send_data_to_all(&mut connections, enc_msg).await;

                        match msg {
                            YgwMessage::ParameterDefinitions(addr, pdefs) => {
                                if let Some(node) = nodes.get_mut(&addr.node_id()) {
                                    for def in pdefs.definitions {
                                        if let Some(pos) = node.para_defs.definitions.iter().position(|x| x.relative_name == def.relative_name) {
                                            node.para_defs.definitions[pos] = def;
                                        } else {
                                            node.para_defs.definitions.push(def);
                                        }
                                    }
                                }
                            },
                            YgwMessage::ParameterData(addr, pvals) => {
                                if let Some(node) = nodes.get_mut(&addr.node_id()) {
                                    node.para_values.insert(pvals.group.clone(), pvals);
                                }
                            },
                            YgwMessage::LinkStatus(addr, link_status) => {
                                if let Some(node) = nodes.get_mut(&addr.node_id()) {
                                    node.link_status.insert(addr.link_id(), link_status);
                                }
                            },
                            YgwMessage::CommandDefinitions(addr, cmd_defs) => {
                                if let Some(node) = nodes.get_mut(&addr.node_id()) {
                                    for def in cmd_defs.definitions {
                                        if let Some(pos) = node.cmd_defs.definitions.iter().position(|x| x.relative_name == def.relative_name) {
                                            node.cmd_defs.definitions[pos] = def;
                                        } else {
                                            node.cmd_defs.definitions.push(def);
                                        }
                                    }
                                }
                            },
                            YgwMessage::CommandOptions(addr, cmd_opts) => {
                                if let Some(node) = nodes.get_mut(&addr.node_id()) {
                                    node.cmd_opts.options.extend(cmd_opts.options);
                                }
                            },
                            _ => {}
                        }
                    },
                    None => {
                        log::debug!("Encoder: channel from nodes closed");
                        break
                    }
                }
            }
            msg = ctrl_rx.recv(), if ctrl_select => {
                // control message
                match msg {
                    Some(CtrlMessage::NewYamcsConnection(yc)) => {
                        if let Err(_) = send_initial_data(&yc, &nodes).await {
                            log::warn!("Encoder: error sending initial data message to {}", yc.addr);
                            continue;
                        }
                        connections.push(yc);
                    },
                    Some(CtrlMessage::YamcsConnectionClosed(addr)) => connections.retain(|yc| yc.addr != addr),
                    None => {
                        log::debug!("Encoder: channel from accepter closed, waiting for all nodes to quit");
                        ctrl_select = false;
                    },
                }
            }
        }
    }

    log::debug!("Encoder task exiting");
    Ok(())
}

/// Sends an encoded message to all connected Yamcs servers
async fn send_data_to_all(connections: &mut Vec<YamcsConnection>, msg: EncodedMessage) {
    let mut idx = 0;
    while idx < connections.len() {
        let msg1 = msg.clone();
        let yc = &connections[idx];
        if yc.drop_if_full {
            if let Err(_) = yc.chan_tx.try_send(msg1) {
                log::warn!("Channel to {} is full, dropping connection", yc.addr);
                yc.reader_jh.abort();
                yc.writer_jh.abort();
                connections.remove(idx);
                continue;
            }
        } else if let Err(_) = yc.chan_tx.send(msg1).await {
            // channel closed, the writer quit
            // (it hopefully printed an informative log message so no need to log anything extra here)
            connections.remove(idx);
            continue;
        }
        idx += 1;
    }
}

/// Called when there is a new Yamcs connection.
/// Sends the node information, parameter and command definitions, command options, parameter values and link status.
async fn send_initial_data(
    yc: &YamcsConnection,
    nodes: &HashMap<u32, NodeData>,
) -> std::result::Result<(), SendError<EncodedMessage>> {
    // send the node information
    let nl = protobuf::ygw::NodeList {
        nodes: nodes.iter().map(|(_, nd)| nd.node_to_proto()).collect(),
    };
    let buf = msg::encode_node_info(&nl);
    yc.chan_tx.send(buf).await?;

    // send the parameter definitions
    for nd in nodes.values() {
        if !nd.para_defs.definitions.is_empty() {
            let buf = msg::encode_message(
                0,
                &Addr::new(nd.node_id, 0),
                MessageType::ParameterDefinitions,
                &nd.para_defs,
            );
            yc.chan_tx.send(buf).await?;
        }
    }

    // send the command definitions
    for nd in nodes.values() {
        if !nd.cmd_defs.definitions.is_empty() {
            let buf = msg::encode_message(
                0,
                &Addr::new(nd.node_id, 0),
                MessageType::CommandDefinitions,
                &nd.cmd_defs,
            );
            yc.chan_tx.send(buf).await?;
        }
    }
    // send the command options
    for nd in nodes.values() {
        if !nd.cmd_opts.options.is_empty() {
            let buf = msg::encode_message(
                0,
                &Addr::new(nd.node_id, 0),
                MessageType::CommandOptions,
                &nd.cmd_opts,
            );
            yc.chan_tx.send(buf).await?;
        }
    }

    // send the parameter values
    for nd in nodes.values() {
        for pdata in nd.para_values.values() {
            let buf = msg::encode_message(
                0,
                &Addr::new(nd.node_id, 0),
                MessageType::ParameterData,
                pdata,
            );
            yc.chan_tx.send(buf).await?;
        }
    }

    // send the link status
    for nd in nodes.values() {
        for (&link_id, lstatus) in nd.link_status.iter() {
            let buf = msg::encode_message(
                0,
                &Addr::new(nd.node_id, link_id),
                MessageType::LinkStatus,
                lstatus,
            );
            yc.chan_tx.send(buf).await?;
        }
    }
    Ok(())
}

/// Receives data from the Yamcs readers, converts it to messages
/// and sends the messages to the nodes.
async fn decoder_task(
    mut decoder_rx: Receiver<Bytes>,
    mut nodes: HashMap<u32, Sender<YgwMessage>>,
) -> Result<()> {
    loop {
        match decoder_rx.recv().await {
            Some(mut buf) => match YgwMessage::decode(&mut buf) {
                Ok(msg) => {
                    let node_id = msg.node_id();
                    match nodes.get(&node_id) {
                        Some(tx) => {
                            if let Err(_) = tx.send(msg).await {
                                log::warn!("Channel to node {} closed", node_id);
                                nodes.remove(&node_id);
                            }
                        }
                        None => {
                            log::warn!("Received message for unknown node {} ", node_id);
                        }
                    }
                }
                Err(err) => log::warn!("Cannot decode data {}: {:?}", hex8(&buf), err),
            },
            None => break,
        };
    }

    log::debug!("Decoder task exiting");
    Ok(())
}

/// Accepts connections from Yamcs and spawns a new reader task and a new writer task for each connection.
/// A channel to the writer task is created and handed over to the encoder task.
async fn accepter_task(
    ctrl_tx: Sender<CtrlMessage>,
    srv_sock: TcpListener,
    decoder_tx: Sender<Bytes>,
    cancel_token: CancellationToken,
) -> Result<()> {
    loop {
        tokio::select! {
            res = srv_sock.accept() => {
                match res {
                    Ok((sock, addr)) => {
                        log::info!("New Yamcs connection from {}", addr);
                        let (read_sock, write_sock) = sock.into_split();
                        let (chan_tx, chan_rx) = channel(100);

                        let decoder_tx2 = decoder_tx.clone();
                        let ctrl_tx2 = ctrl_tx.clone();

                        let cancel_token2 = cancel_token.clone();
                        let reader_jh = tokio::spawn(async move { reader_task(ctrl_tx2, addr, read_sock, decoder_tx2, cancel_token2).await });
                        let writer_jh = tokio::spawn(async move { writer_task(write_sock, chan_rx).await });

                        let yc = YamcsConnection {
                            addr,
                            reader_jh,
                            writer_jh,
                            chan_tx,
                            drop_if_full: false,
                        };

                        if let Err(_) = ctrl_tx.send(CtrlMessage::NewYamcsConnection(yc)).await {
                            // channel closed
                            break;
                        }
                    }
                    Err(err) => {
                        log::error!("Failed to accept connection: {}", err);
                        break;
                    }
                }
            },
            _ = cancel_token.cancelled() => {
                log::debug!("Accepter task received cancel signal.");
                break;
            }
        }
    }

    log::debug!("Accepter task exiting");

    Ok(())
}

/// Reads data from one Yamcs server and sends it to the decoder.
/// If the connection is closed, sends a message to the encoder
/// so that it knows not to send messages to that Yamcs server anymore.
async fn reader_task(
    ctrl_tx: Sender<CtrlMessage>,
    addr: SocketAddr,
    read_sock: OwnedReadHalf,
    decoder_tx: Sender<Bytes>,
    cancel_token: CancellationToken,
) -> Result<()> {
    let mut codec = LengthDelimitedCodec::new();
    codec.set_max_frame_length(MAX_FRAME_LENGTH);
    let mut stream = FramedRead::new(read_sock, codec);

    loop {
        select! {
            result = stream.next() => {
                match result {
                    Some(Ok(buf)) => {
                        let buf = buf.freeze();
                        log::trace!("Received message {:}", hex8(&buf));
                        if let Err(_) = decoder_tx.send(buf).await {
                            // channel to decoder closed
                            break;
                        }
                    }
                    Some(Err(e)) => {
                        // this can happen if the length of the data (first 4 bytes) is longer than the max
                        // also if the socket closes in the middle of a message
                        log::warn!("Error reading from {}: {:?}", addr, e);
                        let _ = ctrl_tx.send(CtrlMessage::YamcsConnectionClosed(addr)).await;
                        return Err(YgwError::IOError(format!("Error reading from {addr}"), e));
                    }
                    None => {
                        log::info!("Yamcs connection {} closed", addr);
                        let _ = ctrl_tx.send(CtrlMessage::YamcsConnectionClosed(addr)).await;
                        break;
                    }
                }
            }
            _ = cancel_token.cancelled() => {
                log::debug!("Reader task for {} received cancel signal", addr);
                break;
            }
        }
    }

    Ok(())
}

/// Writes data to one Yamcs server
async fn writer_task(mut sock: OwnedWriteHalf, mut chan: Receiver<EncodedMessage>) -> Result<()> {
    loop {
        match chan.recv().await {
            Some(msg) => {
                sock.write_all(&msg).await?;
            }
            None => break,
        }
    }
    log::debug!("Writer task exiting");
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        io::ErrorKind,
        time::{Duration, Instant},
    };

    use async_trait::async_trait;

    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
        sync::mpsc,
    };
    use tokio_util::codec::Framed;

    use crate::{
        msg::{Addr, TmPacket},
        protobuf::{ygw::CommandId, ygw::PreparedCommand},
        Link,
    };

    use super::*;

    #[tokio::test]
    async fn test_frame_too_long() {
        let (addr, _node_id, _node_tx, _node_rx) = setup_test().await;

        let mut conn = TcpStream::connect(addr).await.unwrap();
        conn.write_u32((MAX_FRAME_LENGTH + 1) as u32).await.unwrap();

        let mut buf = vec![0; 1024];
        let _ = conn.read_buf(&mut buf).await.unwrap();

        let r = conn.read_u32().await.unwrap_err();
        assert_eq!(ErrorKind::UnexpectedEof, r.kind());
    }

    #[tokio::test]
    async fn test_tm() {
        //env_logger::init();
        let (addr, node_id, node_tx, _node_rx) = setup_test().await;

        let conn = TcpStream::connect(addr).await.unwrap();
        let mut stream = Framed::new(conn, LengthDelimitedCodec::new());

        tokio::task::yield_now().await;
        node_tx
            .send(YgwMessage::TmPacket(
                Addr::new(node_id, 0),
                TmPacket {
                    data: vec![1, 2, 3, 7],
                    acq_time: protobuf::now(),
                },
            ))
            .await
            .unwrap();

        // first message is the node info
        let _ = stream.next().await.unwrap().unwrap();
        let buf2 = stream.next().await.unwrap().unwrap();
        assert_eq!(34, buf2.len());
        assert_eq!([1, 2, 3, 7], buf2[30..34]);
    }

    // TODO: this test fails because the EncodedMessage contains an 8-byte recording number whereas Yamcs does not send it;
    // we should make the communication symmetrical again by introducing the 8-byte rn even in the messages coming from Yamcs
    #[tokio::test]
    async fn test_tc() {
        env_logger::init();
        let (addr, node_id, _node_tx, mut node_rx) = setup_test().await;

        let mut conn = TcpStream::connect(addr).await.unwrap();
        let pc = prepared_cmd();
        let msg = YgwMessage::Tc(Addr::new(node_id, 0), pc.clone());
        let enc_msg = msg.encode(0);

        conn.write_all(&enc_msg).await.unwrap();

        let msg1 = node_rx.recv().await.unwrap();

        assert_eq!(msg, msg1);
    }

    // performance test sending a TM packet from the node to a TCP client
    // this is able to send about 620k msg/sec multithreaded and about 530k msg/sec single threaded on an i7
    // That is about 1600 nanoseconds/message. Out of that, at least 1000 are spent on the syscall
    // overhead for sending to the socket and probably at least half of the remaining
    // on the memory allocation/deallocation for each packet (we have two allocations: one for the packet and
    //    one for the encoded message).
    // This throughput is probably 2-4 orders of magnitude better than what we expect the system to be used for
    // #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
    // #[tokio::test(flavor = "current_thread")]
    async fn _test_performance() {
        //env_logger::init();
        let (addr, node_id, node_tx, _node_rx) = setup_test().await;

        let conn = TcpStream::connect(addr).await.unwrap();
        let mut stream = Framed::new(conn, LengthDelimitedCodec::new());
        let n = 1_000_000;
        let client_handle = tokio::spawn(async move {
            let mut count = 0;
            let mut t0 = Instant::now();

            while let Some(Ok(_)) = stream.next().await {
                if count == 0 {
                    t0 = Instant::now();
                }
                count += 1;
                if count == n {
                    break;
                }
            }
            let d = t0.elapsed();
            println!(
                "Received {} messages in {:?}: speed {:.2} msg/millisec",
                count,
                d,
                (count as f64) / (d.as_millis() as f64)
            );
        });
        tokio::time::sleep(Duration::from_secs(1)).await;

        tokio::spawn(async move {
            let t0 = Instant::now();
            for _ in 0..n {
                node_tx
                    .send(YgwMessage::TmPacket(
                        Addr::new(node_id, 0),
                        TmPacket {
                            data: vec![0; 1024],
                            acq_time: protobuf::now(),
                        },
                    ))
                    .await
                    .unwrap();
            }
            let d = t0.elapsed();
            println!(
                "Sent {} messages; speed {:.2} msg/millisec {} nanosec/message",
                n,
                (n as f64) / (d.as_millis() as f64),
                d.as_nanos() / n
            );
        })
        .await
        .unwrap();

        client_handle.await.unwrap();
    }

    async fn setup_test() -> (SocketAddr, u32, Sender<YgwMessage>, Receiver<YgwMessage>) {
        let (tx, mut rx) = mpsc::channel(1);
        let props = YgwLinkNodeProperties::new("test_node", "test node")
            .tm_packet(true)
            .tc(true);

        let dn = DummyNode { props, tx };
        let addr = ([127, 0, 0, 1], 0).into();
        let server = ServerBuilder::new()
            .set_addr(addr)
            .add_node(Box::new(dn))
            .build();
        let server_handle = server.start().await.unwrap();

        let x = rx.recv().await.unwrap();
        (server_handle.addr, x.0, x.1, x.2)
    }

    fn prepared_cmd() -> PreparedCommand {
        PreparedCommand {
            command_id: CommandId {
                generation_time: 100,
                origin: String::from("test"),
                sequence_number: 10,
                command_name: None,
            },
            assignments: Vec::new(),
            extra: HashMap::new(),
            binary: Some(vec![1, 2, 3]),
            ygw_cmd_id: Some(10),
        }
    }

    /// Waits for the rx/tx channels used to communicate between the node and the server and passes them out to be used in the test
    struct DummyNode {
        props: YgwLinkNodeProperties,
        tx: mpsc::Sender<(u32, Sender<YgwMessage>, Receiver<YgwMessage>)>,
    }

    #[async_trait]
    impl YgwNode for DummyNode {
        fn properties(&self) -> &YgwLinkNodeProperties {
            &self.props
        }

        fn sub_links(&self) -> &[Link] {
            &[]
        }

        async fn run(
            self: Box<Self>,
            node_id: u32,
            tx: Sender<YgwMessage>,
            rx: Receiver<YgwMessage>,
        ) -> Result<()> {
            self.tx.send((node_id, tx, rx)).await.unwrap();

            tokio::time::sleep(std::time::Duration::from_secs(200)).await;
            Ok(())
        }
    }
933}