meadow/node/tcp/
idle.rs

1extern crate alloc;
2use crate::prelude::*;
3
4use crate::node::network_config::{Nonblocking, Tcp};
5use crate::node::*;
6
7use tcp::try_connection;
8use tokio::net::UdpSocket;
9use tokio::sync::Mutex as TokioMutex;
10use tokio::time::{sleep, Duration};
11
12use tracing::*;
13
14use std::convert::TryInto;
15use std::net::SocketAddr;
16use std::result::Result;
17use std::sync::Arc;
18
19use alloc::vec::Vec;
20use postcard::{from_bytes, to_allocvec};
21use std::marker::PhantomData;
22
23#[cfg(feature = "quic")]
24use quinn::Endpoint;
25
26use crate::msg::*;
27use chrono::Utc;
28
29impl<T: Message> From<Node<Nonblocking, Tcp, Idle, T>> for Node<Nonblocking, Tcp, Active, T> {
30    fn from(node: Node<Nonblocking, Tcp, Idle, T>) -> Self {
31        Self {
32            __state: PhantomData,
33            __data_type: PhantomData,
34            cfg: node.cfg,
35            runtime: node.runtime,
36            rt_handle: node.rt_handle,
37            stream: node.stream,
38            topic: node.topic,
39            socket: node.socket,
40            buffer: node.buffer,
41            #[cfg(feature = "quic")]
42            endpoint: node.endpoint,
43            #[cfg(feature = "quic")]
44            connection: node.connection,
45            subscription_data: node.subscription_data,
46            task_subscribe: None,
47        }
48    }
49}
50
51impl<T: Message> From<Node<Nonblocking, Tcp, Idle, T>> for Node<Nonblocking, Tcp, Subscription, T> {
52    fn from(node: Node<Nonblocking, Tcp, Idle, T>) -> Self {
53        Self {
54            __state: PhantomData,
55            __data_type: PhantomData,
56            cfg: node.cfg,
57            runtime: node.runtime,
58            rt_handle: node.rt_handle,
59            stream: node.stream,
60            topic: node.topic,
61            socket: node.socket,
62            buffer: node.buffer,
63            #[cfg(feature = "quic")]
64            endpoint: node.endpoint,
65            #[cfg(feature = "quic")]
66            connection: node.connection,
67            subscription_data: node.subscription_data,
68            task_subscribe: None,
69        }
70    }
71}
72
73use crate::node::tcp::handshake;
74use tokio::net::TcpStream;
75
76impl<T: Message + 'static> Node<Nonblocking, Tcp, Idle, T> {
77    /// Attempt connection from the Node to the Host located at the specified address
78    #[tracing::instrument(skip_all)]
79    pub async fn activate(mut self) -> Result<Node<Nonblocking, Tcp, Active, T>, Error> {
80        let addr = self.cfg.network_cfg.host_addr;
81        let topic = self.topic.clone();
82
83        let stream: Result<TcpStream, Error> = {
84            let stream = try_connection(addr).await?;
85            let stream = handshake(stream, topic).await?;
86            Ok(stream)
87        };
88        if let Ok(stream) = stream {
89            debug!(
90                "Established Node<=>Host TCP stream: {:?}",
91                stream.local_addr()
92            );
93            self.stream = Some(stream);
94        }
95
96        Ok(Node::<Nonblocking, Tcp, Active, T>::from(self))
97    }
98
99    #[tracing::instrument]
100    pub async fn subscribe(
101        mut self,
102        rate: Duration,
103    ) -> Result<Node<Nonblocking, Tcp, Subscription, T>, Error> {
104        let addr = self.cfg.network_cfg.host_addr;
105        let topic = self.topic.clone();
106
107        let subscription_data: Arc<TokioMutex<Option<Msg<T>>>> = Arc::new(TokioMutex::new(None));
108        let data = Arc::clone(&subscription_data);
109
110        let buffer = self.buffer.clone();
111        let packet = GenericMsg::subscribe(&topic, rate)?;
112
113        let task_subscribe = tokio::spawn(async move {
114            if let Ok(stream) = try_connection(addr).await {
115                if let Ok(stream) = handshake(stream, topic.clone()).await {
116                    loop {
117                        if let Err(e) = run_subscription::<T>(
118                            packet.clone(),
119                            buffer.clone(),
120                            &stream,
121                            data.clone(),
122                        )
123                        .await
124                        {
125                            error!("{:?}", e);
126                        }
127                    }
128                }
129            }
130        });
131        self.task_subscribe = Some(task_subscribe);
132
133        let mut subscription_node = Node::<Nonblocking, Tcp, Subscription, T>::from(self);
134        subscription_node.subscription_data = subscription_data;
135
136        Ok(subscription_node)
137    }
138}
139
140use crate::node::tcp::{await_response, send_msg};
141async fn run_subscription<T: Message>(
142    packet: GenericMsg,
143    buffer: Arc<TokioMutex<Vec<u8>>>,
144    stream: &TcpStream,
145    data: Arc<TokioMutex<Option<Msg<T>>>>,
146) -> Result<(), Error> {
147    send_msg(stream, packet.as_bytes()?).await?;
148
149    let mut buffer = buffer.lock().await;
150    loop {
151        match await_response(stream, &mut buffer).await {
152            Ok(msg) => {
153                match TryInto::<Msg<T>>::try_into(msg) {
154                    Ok(msg) => {
155                        let mut data = data.lock().await;
156                        use std::ops::DerefMut;
157                        match data.deref_mut() {
158                            Some(existing) => {
159                                let delta = msg.timestamp - existing.timestamp;
160                                // println!("The time difference between msg tx/rx is: {} us",delta);
161                                if delta <= chrono::Duration::zero() {
162                                    // println!("Data is not newer, skipping to next subscription iteration");
163                                    continue;
164                                }
165
166                                *data = Some(msg);
167                            }
168                            None => {
169                                *data = Some(msg);
170                            }
171                        }
172                    }
173                    Err(e) => {
174                        error!("{}", e);
175                    }
176                }
177            }
178            Err(e) => {
179                error!("Subscription Error: {:?}", e);
180                continue;
181            }
182        };
183    }
184}
185
186//------
187
188impl<T: Message> From<Node<Blocking, Tcp, Idle, T>> for Node<Blocking, Tcp, Active, T> {
189    fn from(node: Node<Blocking, Tcp, Idle, T>) -> Self {
190        Self {
191            __state: PhantomData,
192            __data_type: PhantomData,
193            cfg: node.cfg,
194            runtime: node.runtime,
195            rt_handle: node.rt_handle,
196            stream: node.stream,
197            topic: node.topic,
198            socket: node.socket,
199            buffer: node.buffer,
200            #[cfg(feature = "quic")]
201            endpoint: node.endpoint,
202            #[cfg(feature = "quic")]
203            connection: node.connection,
204            subscription_data: node.subscription_data,
205            task_subscribe: None,
206        }
207    }
208}
209
210impl<T: Message> From<Node<Blocking, Tcp, Idle, T>> for Node<Blocking, Tcp, Subscription, T> {
211    fn from(node: Node<Blocking, Tcp, Idle, T>) -> Self {
212        Self {
213            __state: PhantomData,
214            __data_type: PhantomData,
215            cfg: node.cfg,
216            runtime: node.runtime,
217            rt_handle: node.rt_handle,
218            stream: node.stream,
219            topic: node.topic,
220            socket: node.socket,
221            buffer: node.buffer,
222            #[cfg(feature = "quic")]
223            endpoint: node.endpoint,
224            #[cfg(feature = "quic")]
225            connection: node.connection,
226            subscription_data: node.subscription_data,
227            task_subscribe: None,
228        }
229    }
230}
231
232use crate::node::network_config::Blocking;
233impl<T: Message + 'static> Node<Blocking, Tcp, Idle, T> {
234    /// Attempt connection from the Node to the Host located at the specified address
235    #[tracing::instrument(skip_all)]
236    pub fn activate(mut self) -> Result<Node<Blocking, Tcp, Active, T>, Error> {
237        let addr = self.cfg.network_cfg.host_addr;
238        let topic = self.topic.clone();
239
240        let handle = match &self.rt_handle {
241            Some(handle) => handle,
242            None => return Err(Error::HandleAccess),
243        };
244
245        let stream: Result<TcpStream, Error> = handle.block_on(async move {
246            let stream = try_connection(addr).await?;
247            let stream = handshake(stream, topic).await?;
248            Ok(stream)
249        });
250        if let Ok(stream) = stream {
251            debug!(
252                "Established Node<=>Host TCP stream: {:?}",
253                stream.local_addr()
254            );
255            self.stream = Some(stream);
256        }
257
258        Ok(Node::<Blocking, Tcp, Active, T>::from(self))
259    }
260
261    #[tracing::instrument]
262    pub fn subscribe(
263        mut self,
264        rate: Duration,
265    ) -> Result<Node<Blocking, Tcp, Subscription, T>, Error> {
266        let addr = self.cfg.network_cfg.host_addr;
267        let topic = self.topic.clone();
268
269        let subscription_data: Arc<TokioMutex<Option<Msg<T>>>> = Arc::new(TokioMutex::new(None));
270        let data = Arc::clone(&subscription_data);
271
272        let buffer = self.buffer.clone();
273        let packet = GenericMsg::subscribe(&topic, rate)?;
274
275        let handle = match &self.rt_handle {
276            Some(handle) => handle,
277            None => return Err(Error::HandleAccess),
278        };
279
280        let task_subscribe = handle.spawn(async move {
281            if let Ok(stream) = try_connection(addr).await {
282                if let Ok(stream) = handshake(stream, topic.clone()).await {
283                    loop {
284                        if let Err(e) = run_subscription::<T>(
285                            packet.clone(),
286                            buffer.clone(),
287                            &stream,
288                            data.clone(),
289                        )
290                        .await
291                        {
292                            error!("{:?}", e);
293                        }
294                    }
295                }
296            }
297        });
298        self.task_subscribe = Some(task_subscribe);
299
300        let mut subscription_node = Node::<Blocking, Tcp, Subscription, T>::from(self);
301        subscription_node.subscription_data = subscription_data;
302
303        Ok(subscription_node)
304    }
305}