1const MAX_FRAME_LENGTH: usize = 16 * 1024 * 1024;
22use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
23
24use bytes::Bytes;
25
26use tokio::{
27 io::AsyncWriteExt,
28 net::{
29 tcp::{OwnedReadHalf, OwnedWriteHalf},
30 TcpListener,
31 },
32 select,
33 sync::mpsc::{channel, error::SendError, Receiver, Sender},
34 task::JoinHandle,
35};
36use tokio_stream::StreamExt;
37use tokio_util::{
38 codec::{FramedRead, LengthDelimitedCodec},
39 sync::CancellationToken,
40};
41
42use crate::{
43 hex8,
44 msg::{self, Addr, EncodedMessage, YgwMessage},
45 protobuf::{self, ygw::MessageType},
46 recorder::Recorder,
47 replay_server::start_replay_server,
48 Link, Result, YgwError, YgwLinkNodeProperties, YgwNode,
49};
50
51pub enum CtrlMessage {
52 NewYamcsConnection(YamcsConnection),
53 YamcsConnectionClosed(SocketAddr),
54}
55
56pub struct Server {
57 nodes: Vec<Box<dyn YgwNode>>,
58 addr: SocketAddr,
59 record_replay_conf: Option<(PathBuf, SocketAddr)>,
60}
61
62pub struct ServerBuilder {
63 nodes: Vec<Box<dyn YgwNode>>,
64 addr: SocketAddr,
65 record_replay_conf: Option<(PathBuf, SocketAddr)>,
66}
67
68impl Default for ServerBuilder {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl ServerBuilder {
75 pub fn new() -> Self {
78 Self {
79 addr: ([127, 0, 0, 1], 7897).into(),
80 nodes: Vec::new(),
81 record_replay_conf: None,
82 }
83 }
84
85 pub fn set_addr(mut self, addr: SocketAddr) -> Self {
86 self.addr = addr;
87 self
88 }
89
90 pub fn add_node(mut self, node: Box<dyn YgwNode>) -> Self {
91 self.nodes.push(node);
92 self
93 }
94
95 pub fn with_record_replay_conf(
96 mut self,
97 record_replay_conf: Option<(PathBuf, SocketAddr)>,
98 ) -> Self {
99 self.record_replay_conf = record_replay_conf;
100 self
101 }
102
103 pub fn build(self) -> Server {
104 Server {
105 nodes: self.nodes,
106 addr: self.addr,
107 record_replay_conf: self.record_replay_conf,
108 }
109 }
110}
111
112pub struct ServerHandle {
113 pub addr: SocketAddr,
114 pub jh: JoinHandle<Result<()>>,
115 pub cancel_token: CancellationToken,
116}
117
118impl Server {
119 pub async fn start(mut self) -> Result<ServerHandle> {
128 let mut node_tx_map = HashMap::new();
129 let mut node_data = HashMap::new();
130 let mut node_id = 0;
131 let mut handles = Vec::new();
132
133 let (encoder_tx, encoder_rx) = tokio::sync::mpsc::channel(100);
134
135 let socket = TcpListener::bind(self.addr)
136 .await
137 .map_err(|e| YgwError::IOError(format!("Cannot bind to {}", self.addr), e))?;
138 let addr = socket.local_addr()?;
139 let cancel_token = CancellationToken::new();
140
141 for node in self.nodes.drain(..) {
142 let props = node.properties();
143 let (tx, rx) = tokio::sync::mpsc::channel(100);
144 node_data.insert(node_id, NodeData::new(node_id, props, node.sub_links()));
145
146 let encoder_tx = encoder_tx.clone();
147 log::info!("Starting node {} with id {}", props.name, node_id);
148 let jh = tokio::spawn(async move { node.run(node_id, encoder_tx, rx).await });
149 handles.push(jh);
150
151 node_tx_map.insert(node_id, tx);
152 node_id += 1;
153 }
154 let (ctrl_tx, ctrl_rx) = channel(10);
155
156 let (decoder_tx, decoder_rx) = tokio::sync::mpsc::channel(100);
157 let cancel_token2 = cancel_token.clone();
158 let accepter_jh =
159 tokio::spawn(
160 async move { accepter_task(ctrl_tx, socket, decoder_tx, cancel_token2).await },
161 );
162
163 let cancel_token2 = cancel_token.clone();
164 let cancel_token3 = cancel_token.clone();
165
166 let encoder_jh = tokio::spawn(async move {
167 if let Err(e) = encoder_task(
168 ctrl_rx,
169 encoder_rx,
170 node_data,
171 self.record_replay_conf,
172 cancel_token2,
173 )
174 .await
175 {
176 log::error!("Encoder task failed: {:?}", e);
177 cancel_token3.cancel();
178 Err(e)
179 } else {
180 Ok(())
181 }
182 });
183
184 let decoder_jh = tokio::spawn(async move { decoder_task(decoder_rx, node_tx_map).await });
185
186 let jh: JoinHandle<Result<()>> = tokio::spawn(async move {
187 let (res1, res2, res3) =
188 futures::future::join3(accepter_jh, encoder_jh, decoder_jh).await;
189 res1.map_err(|e| YgwError::from(e))
190 .and(res2.map_err(|e| YgwError::from(e)))
191 .and(res3.map_err(|e| YgwError::from(e)))
192 .map(|_| ())
193 });
194
195 Ok(ServerHandle {
196 jh,
197 addr,
198 cancel_token,
199 })
200 }
201}
202
203impl ServerHandle {
204 pub async fn run(self) -> Result<()> {
205 let ServerHandle {
206 jh, cancel_token, ..
207 } = self;
208
209 tokio::spawn(async move {
211 #[cfg(unix)]
212 {
213 use tokio::signal::unix::{signal, SignalKind};
214 let mut sigint = signal(SignalKind::interrupt())?;
215 let mut sigterm = signal(SignalKind::terminate())?;
216 tokio::select! {
217 _ = sigint.recv() => log::info!("Received SIGINT"),
218 _ = sigterm.recv() => log::info!("Received SIGTERM"),
219 }
220 }
221
222 #[cfg(windows)]
223 {
224 tokio::signal::ctrl_c().await?;
225 log::info!("Received Ctrl+C");
226 }
227
228 cancel_token.cancel();
229 Ok::<(), std::io::Error>(())
230 });
231
232 match jh.await {
233 Ok(result) => result,
234 Err(e) => Err(e.into()),
235 }
236 }
237}
238
239#[derive(Debug)]
240pub struct YamcsConnection {
241 addr: SocketAddr,
242 writer_jh: JoinHandle<Result<()>>,
243 reader_jh: JoinHandle<Result<()>>,
244 chan_tx: Sender<EncodedMessage>,
245 drop_if_full: bool,
247}
248
249impl PartialEq for YamcsConnection {
250 fn eq(&self, other: &Self) -> bool {
251 self.addr == other.addr
252 }
253}
254
255struct NodeData {
257 node_id: u32,
258 props: YgwLinkNodeProperties,
259 links: Vec<Link>,
260 para_defs: protobuf::ygw::ParameterDefinitionList,
263
264 cmd_defs: protobuf::ygw::CommandDefinitionList,
267
268 cmd_opts: protobuf::ygw::CommandOptionList,
271
272 para_values: HashMap<String, protobuf::ygw::ParameterData>,
275
276 link_status: HashMap<u32, protobuf::ygw::LinkStatus>,
279}
280impl NodeData {
281 fn new(node_id: u32, props: &YgwLinkNodeProperties, links: &[Link]) -> Self {
282 Self {
283 node_id,
284 props: props.clone(),
285 links: links.to_vec(),
286 para_defs: protobuf::ygw::ParameterDefinitionList {
287 definitions: Vec::new(),
288 },
289 cmd_defs: protobuf::ygw::CommandDefinitionList {
290 definitions: Vec::new(),
291 },
292 cmd_opts: protobuf::ygw::CommandOptionList {
293 options: Vec::new(),
294 },
295 para_values: HashMap::new(),
296 link_status: HashMap::new(),
297 }
298 }
299
300 fn node_to_proto(&self) -> protobuf::ygw::Node {
301 protobuf::ygw::Node {
302 id: self.node_id,
303 name: self.props.name.clone(),
304 description: Some(self.props.description.clone()),
305 tm_packet: if self.props.tm_packet {
306 Some(true)
307 } else {
308 None
309 },
310 tm_frame: if self.props.tm_frame {
311 Some(true)
312 } else {
313 None
314 },
315 tc: if self.props.tc { Some(true) } else { None },
316 tc_frame: if self.props.tc_frame {
317 Some(true)
318 } else {
319 None
320 },
321 links: self.links.iter().map(|l| l.to_proto()).collect(),
322 }
323 }
324}
325
326async fn encoder_task(
330 mut ctrl_rx: Receiver<CtrlMessage>,
331 mut encoder_rx: Receiver<YgwMessage>,
332 mut nodes: HashMap<u32, NodeData>,
333 recorder_replay_conf: Option<(PathBuf, SocketAddr)>,
334 cancel_token: CancellationToken,
335) -> Result<()> {
336 let mut connections: Vec<YamcsConnection> = Vec::new();
337
338 let mut rn = 0;
339 let recorder_tx: Option<Sender<EncodedMessage>> = match recorder_replay_conf {
340 None => None,
341 Some((dir, replay_addr)) => {
342 log::info!(
343 "Encoder: starting recorder with recording directory {}",
344 dir.display()
345 );
346 let (mut recorder, last_rn) = Recorder::new(&dir)?;
347 if let Some(last_rn) = last_rn {
348 rn = last_rn + 1;
349 }
350
351 let (recorder_tx, recorder_rx) = tokio::sync::mpsc::channel(100);
352 let (query_tx, query_rx) = tokio::sync::mpsc::channel(16); tokio::spawn(async move {
356 if let Err(e) = recorder.record(recorder_rx, query_rx).await {
357 log::error!("Recorder exited with error: {:?}", e);
358 }
359 });
360
361 log::info!(
362 "Encoder: starting the replay server listening on {}",
363 replay_addr
364 );
365 tokio::spawn(async move {
366 if let Err(e) = start_replay_server(replay_addr, query_tx, cancel_token).await {
367 log::error!("Replay server exited with error: {:?}", e);
368 }
369 });
370
371 Some(recorder_tx)
372 }
373 };
374
375 let mut ctrl_select = true;
379 loop {
380 select! {
381 msg = encoder_rx.recv() => {
382 match msg {
384 Some(msg) => {
385 rn+=1;
386 let enc_msg = msg.encode(rn);
387 if let Some(ref recorder_tx) = recorder_tx {
388 if let Err(e) = recorder_tx.send(enc_msg.clone()).await {
389 log::warn!("Error sending data to recorder: {:?}", e);
390 }
391 }
392
393 send_data_to_all(&mut connections, enc_msg).await;
394
395 match msg {
396 YgwMessage::ParameterDefinitions(addr, pdefs) => {
397 if let Some(node) = nodes.get_mut(&addr.node_id()) {
398 for def in pdefs.definitions {
399 if let Some(pos) = node.para_defs.definitions.iter().position(|x| x.relative_name == def.relative_name) {
400 node.para_defs.definitions[pos] = def;
401 } else {
402 node.para_defs.definitions.push(def);
403 }
404 }
405 }
406 },
407 YgwMessage::ParameterData(addr, pvals) => {
408 if let Some(node) = nodes.get_mut(&addr.node_id()) {
409 node.para_values.insert(pvals.group.clone(), pvals);
410 }
411 },
412 YgwMessage::LinkStatus(addr, link_status) => {
413 if let Some(node) = nodes.get_mut(&addr.node_id()) {
414 node.link_status.insert(addr.link_id(), link_status);
415 }
416 },
417 YgwMessage::CommandDefinitions(addr, cmd_defs) => {
418 if let Some(node) = nodes.get_mut(&addr.node_id()) {
419 for def in cmd_defs.definitions {
420 if let Some(pos) = node.cmd_defs.definitions.iter().position(|x| x.relative_name == def.relative_name) {
421 node.cmd_defs.definitions[pos] = def;
422 } else {
423 node.cmd_defs.definitions.push(def);
424 }
425 }
426 }
427 },
428 YgwMessage::CommandOptions(addr, cmd_opts) => {
429 if let Some(node) = nodes.get_mut(&addr.node_id()) {
430 node.cmd_opts.options.extend(cmd_opts.options);
431 }
432 },
433 _ => {}
434 }
435 },
436 None => {
437 log::debug!("Encoder: channel from nodes closed");
438 break
439 }
440 }
441 }
442 msg = ctrl_rx.recv(), if ctrl_select => {
443 match msg {
445 Some(CtrlMessage::NewYamcsConnection(yc)) => {
446 if let Err(_)= send_initial_data(&yc, &nodes).await {
447 log::warn!("Encoder: error sending initial data message to {}", yc.addr);
448 continue;
449 }
450 connections.push(yc);
451 },
452 Some(CtrlMessage::YamcsConnectionClosed(addr)) => connections.retain(|yc| yc.addr != addr),
453 None => {
454 log::debug!("Encoder: channel from accepter closed, waiting for all nodes to quit");
455 ctrl_select = false;
456 },
457 }
458 }
459 }
460 }
461
462 log::debug!("Encoder task exiting");
463 Ok(())
464}
465
466async fn send_data_to_all(connections: &mut Vec<YamcsConnection>, msg: EncodedMessage) {
468 let mut idx = 0;
469 while idx < connections.len() {
470 let msg1 = msg.clone();
471 let yc = &connections[idx];
472 if yc.drop_if_full {
473 if let Err(_) = yc.chan_tx.try_send(msg1) {
474 log::warn!("Channel to {} is full, dropping connection", yc.addr);
475 yc.reader_jh.abort();
476 yc.writer_jh.abort();
477 connections.remove(idx);
478 continue;
479 }
480 } else if let Err(_) = yc.chan_tx.send(msg1).await {
481 connections.remove(idx);
484 continue;
485 }
486 idx += 1;
487 }
488}
489
490async fn send_initial_data(
493 yc: &YamcsConnection,
494 nodes: &HashMap<u32, NodeData>,
495) -> std::result::Result<(), SendError<EncodedMessage>> {
496 let nl = protobuf::ygw::NodeList {
498 nodes: nodes.iter().map(|(_, nd)| nd.node_to_proto()).collect(),
499 };
500 let buf = msg::encode_node_info(&nl);
501 yc.chan_tx.send(buf).await?;
502
503 for nd in nodes.values() {
505 if !nd.para_defs.definitions.is_empty() {
506 let buf = msg::encode_message(
507 0,
508 &Addr::new(nd.node_id, 0),
509 MessageType::ParameterDefinitions,
510 &nd.para_defs,
511 );
512 yc.chan_tx.send(buf).await?;
513 }
514 }
515
516 for nd in nodes.values() {
518 if !nd.cmd_defs.definitions.is_empty() {
519 let buf = msg::encode_message(
520 0,
521 &Addr::new(nd.node_id, 0),
522 MessageType::CommandDefinitions,
523 &nd.cmd_defs,
524 );
525 yc.chan_tx.send(buf).await?;
526 }
527 }
528 for nd in nodes.values() {
530 if !nd.cmd_opts.options.is_empty() {
531 let buf = msg::encode_message(
532 0,
533 &Addr::new(nd.node_id, 0),
534 MessageType::CommandOptions,
535 &nd.cmd_opts,
536 );
537 yc.chan_tx.send(buf).await?;
538 }
539 }
540
541 for nd in nodes.values() {
543 for pdata in nd.para_values.values() {
544 let buf = msg::encode_message(
545 0,
546 &Addr::new(nd.node_id, 0),
547 MessageType::ParameterData,
548 pdata,
549 );
550 yc.chan_tx.send(buf).await?;
551 }
552 }
553
554 for nd in nodes.values() {
556 for (&link_id, lstatus) in nd.link_status.iter() {
557 let buf = msg::encode_message(
558 0,
559 &Addr::new(nd.node_id, link_id),
560 MessageType::LinkStatus,
561 lstatus,
562 );
563 yc.chan_tx.send(buf).await?;
564 }
565 }
566 Ok(())
567}
568
569async fn decoder_task(
572 mut decoder_rx: Receiver<Bytes>,
573 mut nodes: HashMap<u32, Sender<YgwMessage>>,
574) -> Result<()> {
575 loop {
576 match decoder_rx.recv().await {
577 Some(mut buf) => match YgwMessage::decode(&mut buf) {
578 Ok(msg) => {
579 let node_id = msg.node_id();
580 match nodes.get(&node_id) {
581 Some(tx) => {
582 if let Err(_) = tx.send(msg).await {
583 log::warn!("Channel to node {} closed", node_id);
584 nodes.remove(&node_id);
585 }
586 }
587 None => {
588 log::warn!("Received message for unknown node {} ", node_id);
589 }
590 }
591 }
592 Err(err) => log::warn!("Cannot decode data {}: {:?}", hex8(&buf), err),
593 },
594 None => break,
595 };
596 }
597
598 log::debug!("Decoder task exiting");
599 Ok(())
600}
601
602async fn accepter_task(
605 ctrl_tx: Sender<CtrlMessage>,
606 srv_sock: TcpListener,
607 decoder_tx: Sender<Bytes>,
608 cancel_token: CancellationToken,
609) -> Result<()> {
610 loop {
611 tokio::select! {
612 res = srv_sock.accept() => {
613 match res {
614 Ok((sock, addr)) => {
615 log::info!("New Yamcs connection from {}", addr);
616 let (read_sock, write_sock) = sock.into_split();
617 let (chan_tx, chan_rx) = channel(100);
618
619 let decoder_tx2 = decoder_tx.clone();
620 let ctrl_tx2 = ctrl_tx.clone();
621
622 let cancel_token2 = cancel_token.clone();
623 let reader_jh = tokio::spawn(async move { reader_task(ctrl_tx2, addr, read_sock, decoder_tx2, cancel_token2).await });
624 let writer_jh = tokio::spawn(async move { writer_task(write_sock, chan_rx).await });
625
626 let yc = YamcsConnection {
627 addr,
628 reader_jh,
629 writer_jh,
630 chan_tx,
631 drop_if_full: false,
632 };
633
634 if let Err(_) = ctrl_tx.send(CtrlMessage::NewYamcsConnection(yc)).await {
635 break;
637 }
638 }
639 Err(err) => {
640 log::error!("Failed to accept connection: {}", err);
641 break;
642 }
643 }
644 },
645 _ = cancel_token.cancelled() => {
646 log::debug!("Accepter task received cancel signal.");
647 break;
648 }
649 }
650 }
651
652 log::debug!("Accepter task exiting");
653
654 Ok(())
655}
656
657async fn reader_task(
661 ctrl_tx: Sender<CtrlMessage>,
662 addr: SocketAddr,
663 read_sock: OwnedReadHalf,
664 decoder_tx: Sender<Bytes>,
665 cancel_token: CancellationToken,
666) -> Result<()> {
667 let mut codec = LengthDelimitedCodec::new();
668 codec.set_max_frame_length(MAX_FRAME_LENGTH);
669 let mut stream = FramedRead::new(read_sock, codec);
670
671 loop {
672 select! {
673 result = stream.next() => {
674 match result {
675 Some(Ok(buf)) => {
676 let buf = buf.freeze();
677 log::trace!("Received message {:}", hex8(&buf));
678 if let Err(_) = decoder_tx.send(buf).await {
679 break;
681 }
682 }
683 Some(Err(e)) => {
684 log::warn!("Error reading from {}: {:?}", addr, e);
687 let _ = ctrl_tx.send(CtrlMessage::YamcsConnectionClosed(addr)).await;
688 return Err(YgwError::IOError(format!("Error reading from {addr}"), e));
689 }
690 None => {
691 log::info!("Yamcs connection {} closed", addr);
692 let _ = ctrl_tx.send(CtrlMessage::YamcsConnectionClosed(addr)).await;
693 break;
694 }
695 }
696 }
697 _ = cancel_token.cancelled() => {
698 log::debug!("Reader task for {} received cancel signal", addr);
699 break;
700 }
701 }
702 }
703
704 Ok(())
705}
706
707async fn writer_task(mut sock: OwnedWriteHalf, mut chan: Receiver<EncodedMessage>) -> Result<()> {
709 loop {
710 match chan.recv().await {
711 Some(msg) => {
712 sock.write_all(&msg).await?;
713 }
714 None => break,
715 }
716 }
717 log::debug!("Writer task exiting");
718 Ok(())
719}
720
721#[cfg(test)]
722mod tests {
723 use std::{
724 io::ErrorKind,
725 time::{Duration, Instant},
726 };
727
728 use async_trait::async_trait;
729
730 use tokio::{
731 io::{AsyncReadExt, AsyncWriteExt},
732 net::TcpStream,
733 sync::mpsc,
734 };
735 use tokio_util::codec::Framed;
736
737 use crate::{
738 msg::{Addr, TmPacket},
739 protobuf::{ygw::CommandId, ygw::PreparedCommand},
740 Link,
741 };
742
743 use super::*;
744
745 #[tokio::test]
746 async fn test_frame_too_long() {
747 let (addr, _node_id, _node_tx, _node_rx) = setup_test().await;
748
749 let mut conn = TcpStream::connect(addr).await.unwrap();
750 conn.write_u32((MAX_FRAME_LENGTH + 1) as u32).await.unwrap();
751
752 let mut buf = vec![0; 1024];
753 let _ = conn.read_buf(&mut buf).await.unwrap();
754
755 let r = conn.read_u32().await.unwrap_err();
756 assert_eq!(ErrorKind::UnexpectedEof, r.kind());
757 }
758
759 #[tokio::test]
760 async fn test_tm() {
761 let (addr, node_id, node_tx, _node_rx) = setup_test().await;
763
764 let conn = TcpStream::connect(addr).await.unwrap();
765 let mut stream = Framed::new(conn, LengthDelimitedCodec::new());
766
767 tokio::task::yield_now().await;
768 node_tx
769 .send(YgwMessage::TmPacket(
770 Addr::new(node_id, 0),
771 TmPacket {
772 data: vec![1, 2, 3, 7],
773 acq_time: protobuf::now(),
774 },
775 ))
776 .await
777 .unwrap();
778
779 let _ = stream.next().await.unwrap().unwrap();
781 let buf2 = stream.next().await.unwrap().unwrap();
782 assert_eq!(34, buf2.len());
783 assert_eq!([1, 2, 3, 7], buf2[30..34]);
784 }
785
786 #[tokio::test]
789 async fn test_tc() {
790 env_logger::init();
791 let (addr, node_id, _node_tx, mut node_rx) = setup_test().await;
792
793 let mut conn = TcpStream::connect(addr).await.unwrap();
794 let pc = prepared_cmd();
795 let msg = YgwMessage::Tc(Addr::new(node_id, 0), pc.clone());
796 let enc_msg = msg.encode(0);
797
798 conn.write_all(&enc_msg).await.unwrap();
799
800 let msg1 = node_rx.recv().await.unwrap();
801
802 assert_eq!(msg, msg1);
803 }
804
805 async fn _test_performance() {
815 let (addr, node_id, node_tx, _node_rx) = setup_test().await;
817
818 let conn = TcpStream::connect(addr).await.unwrap();
819 let mut stream = Framed::new(conn, LengthDelimitedCodec::new());
820 let n = 1_000_000;
821 let client_handle = tokio::spawn(async move {
822 let mut count = 0;
823 let mut t0 = Instant::now();
824
825 while let Some(Ok(_)) = stream.next().await {
826 if count == 0 {
827 t0 = Instant::now();
828 }
829 count += 1;
830 if count == n {
831 break;
832 }
833 }
834 let d = t0.elapsed();
835 println!(
836 "Received {} messages in {:?}: speed {:.2} msg/millisec",
837 count,
838 d,
839 (count as f64) / (d.as_millis() as f64)
840 );
841 });
842 tokio::time::sleep(Duration::from_secs(1)).await;
843
844 tokio::spawn(async move {
845 let t0 = Instant::now();
846 for _ in 0..n {
847 node_tx
848 .send(YgwMessage::TmPacket(
849 Addr::new(node_id, 0),
850 TmPacket {
851 data: vec![0; 1024],
852 acq_time: protobuf::now(),
853 },
854 ))
855 .await
856 .unwrap();
857 }
858 let d = t0.elapsed();
859 println!(
860 "Sent {} messages; speed {:.2} msg/millisec {} nanosec/message",
861 n,
862 (n as f64) / (d.as_millis() as f64),
863 d.as_nanos() / n
864 );
865 })
866 .await
867 .unwrap();
868
869 client_handle.await.unwrap();
870 }
871
872 async fn setup_test() -> (SocketAddr, u32, Sender<YgwMessage>, Receiver<YgwMessage>) {
873 let (tx, mut rx) = mpsc::channel(1);
874 let props = YgwLinkNodeProperties::new("test_node", "test node")
875 .tm_packet(true)
876 .tc(true);
877
878 let dn = DummyNode { props, tx };
879 let addr = ([127, 0, 0, 1], 0).into();
880 let server = ServerBuilder::new()
881 .set_addr(addr)
882 .add_node(Box::new(dn))
883 .build();
884 let server_handle = server.start().await.unwrap();
885
886 let x = rx.recv().await.unwrap();
887 (server_handle.addr, x.0, x.1, x.2)
888 }
889
890 fn prepared_cmd() -> PreparedCommand {
891 PreparedCommand {
892 command_id: CommandId {
893 generation_time: 100,
894 origin: String::from("test"),
895 sequence_number: 10,
896 command_name: None,
897 },
898 assignments: Vec::new(),
899 extra: HashMap::new(),
900 binary: Some(vec![1, 2, 3]),
901 ygw_cmd_id: Some(10),
902 }
903 }
904
905 struct DummyNode {
907 props: YgwLinkNodeProperties,
908 tx: mpsc::Sender<(u32, Sender<YgwMessage>, Receiver<YgwMessage>)>,
909 }
910
911 #[async_trait]
912 impl YgwNode for DummyNode {
913 fn properties(&self) -> &YgwLinkNodeProperties {
914 &self.props
915 }
916
917 fn sub_links(&self) -> &[Link] {
918 &[]
919 }
920
921 async fn run(
922 self: Box<Self>,
923 node_id: u32,
924 tx: Sender<YgwMessage>,
925 rx: Receiver<YgwMessage>,
926 ) -> Result<()> {
927 self.tx.send((node_id, tx, rx)).await.unwrap();
928
929 tokio::time::sleep(std::time::Duration::from_secs(200)).await;
930 Ok(())
931 }
932 }
933}