1extern crate alloc;
2use crate::prelude::*;
3
4use crate::node::network_config::{Nonblocking, Tcp};
5use crate::node::*;
6
7use tcp::try_connection;
8use tokio::net::UdpSocket;
9use tokio::sync::Mutex as TokioMutex;
10use tokio::time::{sleep, Duration};
11
12use tracing::*;
13
14use std::convert::TryInto;
15use std::net::SocketAddr;
16use std::result::Result;
17use std::sync::Arc;
18
19use alloc::vec::Vec;
20use postcard::{from_bytes, to_allocvec};
21use std::marker::PhantomData;
22
23#[cfg(feature = "quic")]
24use quinn::Endpoint;
25
26use crate::msg::*;
27use chrono::Utc;
28
29impl<T: Message> From<Node<Nonblocking, Tcp, Idle, T>> for Node<Nonblocking, Tcp, Active, T> {
30 fn from(node: Node<Nonblocking, Tcp, Idle, T>) -> Self {
31 Self {
32 __state: PhantomData,
33 __data_type: PhantomData,
34 cfg: node.cfg,
35 runtime: node.runtime,
36 rt_handle: node.rt_handle,
37 stream: node.stream,
38 topic: node.topic,
39 socket: node.socket,
40 buffer: node.buffer,
41 #[cfg(feature = "quic")]
42 endpoint: node.endpoint,
43 #[cfg(feature = "quic")]
44 connection: node.connection,
45 subscription_data: node.subscription_data,
46 task_subscribe: None,
47 }
48 }
49}
50
51impl<T: Message> From<Node<Nonblocking, Tcp, Idle, T>> for Node<Nonblocking, Tcp, Subscription, T> {
52 fn from(node: Node<Nonblocking, Tcp, Idle, T>) -> Self {
53 Self {
54 __state: PhantomData,
55 __data_type: PhantomData,
56 cfg: node.cfg,
57 runtime: node.runtime,
58 rt_handle: node.rt_handle,
59 stream: node.stream,
60 topic: node.topic,
61 socket: node.socket,
62 buffer: node.buffer,
63 #[cfg(feature = "quic")]
64 endpoint: node.endpoint,
65 #[cfg(feature = "quic")]
66 connection: node.connection,
67 subscription_data: node.subscription_data,
68 task_subscribe: None,
69 }
70 }
71}
72
73use crate::node::tcp::handshake;
74use tokio::net::TcpStream;
75
76impl<T: Message + 'static> Node<Nonblocking, Tcp, Idle, T> {
77 #[tracing::instrument(skip_all)]
79 pub async fn activate(mut self) -> Result<Node<Nonblocking, Tcp, Active, T>, Error> {
80 let addr = self.cfg.network_cfg.host_addr;
81 let topic = self.topic.clone();
82
83 let stream: Result<TcpStream, Error> = {
84 let stream = try_connection(addr).await?;
85 let stream = handshake(stream, topic).await?;
86 Ok(stream)
87 };
88 if let Ok(stream) = stream {
89 debug!(
90 "Established Node<=>Host TCP stream: {:?}",
91 stream.local_addr()
92 );
93 self.stream = Some(stream);
94 }
95
96 Ok(Node::<Nonblocking, Tcp, Active, T>::from(self))
97 }
98
99 #[tracing::instrument]
100 pub async fn subscribe(
101 mut self,
102 rate: Duration,
103 ) -> Result<Node<Nonblocking, Tcp, Subscription, T>, Error> {
104 let addr = self.cfg.network_cfg.host_addr;
105 let topic = self.topic.clone();
106
107 let subscription_data: Arc<TokioMutex<Option<Msg<T>>>> = Arc::new(TokioMutex::new(None));
108 let data = Arc::clone(&subscription_data);
109
110 let buffer = self.buffer.clone();
111 let packet = GenericMsg::subscribe(&topic, rate)?;
112
113 let task_subscribe = tokio::spawn(async move {
114 if let Ok(stream) = try_connection(addr).await {
115 if let Ok(stream) = handshake(stream, topic.clone()).await {
116 loop {
117 if let Err(e) = run_subscription::<T>(
118 packet.clone(),
119 buffer.clone(),
120 &stream,
121 data.clone(),
122 )
123 .await
124 {
125 error!("{:?}", e);
126 }
127 }
128 }
129 }
130 });
131 self.task_subscribe = Some(task_subscribe);
132
133 let mut subscription_node = Node::<Nonblocking, Tcp, Subscription, T>::from(self);
134 subscription_node.subscription_data = subscription_data;
135
136 Ok(subscription_node)
137 }
138}
139
140use crate::node::tcp::{await_response, send_msg};
141async fn run_subscription<T: Message>(
142 packet: GenericMsg,
143 buffer: Arc<TokioMutex<Vec<u8>>>,
144 stream: &TcpStream,
145 data: Arc<TokioMutex<Option<Msg<T>>>>,
146) -> Result<(), Error> {
147 send_msg(stream, packet.as_bytes()?).await?;
148
149 let mut buffer = buffer.lock().await;
150 loop {
151 match await_response(stream, &mut buffer).await {
152 Ok(msg) => {
153 match TryInto::<Msg<T>>::try_into(msg) {
154 Ok(msg) => {
155 let mut data = data.lock().await;
156 use std::ops::DerefMut;
157 match data.deref_mut() {
158 Some(existing) => {
159 let delta = msg.timestamp - existing.timestamp;
160 if delta <= chrono::Duration::zero() {
162 continue;
164 }
165
166 *data = Some(msg);
167 }
168 None => {
169 *data = Some(msg);
170 }
171 }
172 }
173 Err(e) => {
174 error!("{}", e);
175 }
176 }
177 }
178 Err(e) => {
179 error!("Subscription Error: {:?}", e);
180 continue;
181 }
182 };
183 }
184}
185
186impl<T: Message> From<Node<Blocking, Tcp, Idle, T>> for Node<Blocking, Tcp, Active, T> {
189 fn from(node: Node<Blocking, Tcp, Idle, T>) -> Self {
190 Self {
191 __state: PhantomData,
192 __data_type: PhantomData,
193 cfg: node.cfg,
194 runtime: node.runtime,
195 rt_handle: node.rt_handle,
196 stream: node.stream,
197 topic: node.topic,
198 socket: node.socket,
199 buffer: node.buffer,
200 #[cfg(feature = "quic")]
201 endpoint: node.endpoint,
202 #[cfg(feature = "quic")]
203 connection: node.connection,
204 subscription_data: node.subscription_data,
205 task_subscribe: None,
206 }
207 }
208}
209
210impl<T: Message> From<Node<Blocking, Tcp, Idle, T>> for Node<Blocking, Tcp, Subscription, T> {
211 fn from(node: Node<Blocking, Tcp, Idle, T>) -> Self {
212 Self {
213 __state: PhantomData,
214 __data_type: PhantomData,
215 cfg: node.cfg,
216 runtime: node.runtime,
217 rt_handle: node.rt_handle,
218 stream: node.stream,
219 topic: node.topic,
220 socket: node.socket,
221 buffer: node.buffer,
222 #[cfg(feature = "quic")]
223 endpoint: node.endpoint,
224 #[cfg(feature = "quic")]
225 connection: node.connection,
226 subscription_data: node.subscription_data,
227 task_subscribe: None,
228 }
229 }
230}
231
232use crate::node::network_config::Blocking;
233impl<T: Message + 'static> Node<Blocking, Tcp, Idle, T> {
234 #[tracing::instrument(skip_all)]
236 pub fn activate(mut self) -> Result<Node<Blocking, Tcp, Active, T>, Error> {
237 let addr = self.cfg.network_cfg.host_addr;
238 let topic = self.topic.clone();
239
240 let handle = match &self.rt_handle {
241 Some(handle) => handle,
242 None => return Err(Error::HandleAccess),
243 };
244
245 let stream: Result<TcpStream, Error> = handle.block_on(async move {
246 let stream = try_connection(addr).await?;
247 let stream = handshake(stream, topic).await?;
248 Ok(stream)
249 });
250 if let Ok(stream) = stream {
251 debug!(
252 "Established Node<=>Host TCP stream: {:?}",
253 stream.local_addr()
254 );
255 self.stream = Some(stream);
256 }
257
258 Ok(Node::<Blocking, Tcp, Active, T>::from(self))
259 }
260
261 #[tracing::instrument]
262 pub fn subscribe(
263 mut self,
264 rate: Duration,
265 ) -> Result<Node<Blocking, Tcp, Subscription, T>, Error> {
266 let addr = self.cfg.network_cfg.host_addr;
267 let topic = self.topic.clone();
268
269 let subscription_data: Arc<TokioMutex<Option<Msg<T>>>> = Arc::new(TokioMutex::new(None));
270 let data = Arc::clone(&subscription_data);
271
272 let buffer = self.buffer.clone();
273 let packet = GenericMsg::subscribe(&topic, rate)?;
274
275 let handle = match &self.rt_handle {
276 Some(handle) => handle,
277 None => return Err(Error::HandleAccess),
278 };
279
280 let task_subscribe = handle.spawn(async move {
281 if let Ok(stream) = try_connection(addr).await {
282 if let Ok(stream) = handshake(stream, topic.clone()).await {
283 loop {
284 if let Err(e) = run_subscription::<T>(
285 packet.clone(),
286 buffer.clone(),
287 &stream,
288 data.clone(),
289 )
290 .await
291 {
292 error!("{:?}", e);
293 }
294 }
295 }
296 }
297 });
298 self.task_subscribe = Some(task_subscribe);
299
300 let mut subscription_node = Node::<Blocking, Tcp, Subscription, T>::from(self);
301 subscription_node.subscription_data = subscription_data;
302
303 Ok(subscription_node)
304 }
305}