1use crate::node::network_config::{Nonblocking, Udp};
2use crate::node::Interface;
3use crate::node::Node;
4use crate::node::{Active, Idle};
5use crate::prelude::*;
6use std::marker::PhantomData;
7use std::net::SocketAddr;
8use std::ops::DerefMut;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
12use crate::node::udp::*;
13
14use chrono::Utc;
15
16use postcard::{from_bytes, to_allocvec};
17#[cfg(feature = "quic")]
18use quinn::Connection as QuicConnection;
19use std::result::Result;
20use tracing::*;
21
22impl<T: Message> From<Node<Nonblocking, Udp, Idle, T>> for Node<Nonblocking, Udp, Active, T> {
23 fn from(node: Node<Nonblocking, Udp, Idle, T>) -> Self {
24 Self {
25 __state: PhantomData,
26 __data_type: PhantomData,
27 cfg: node.cfg,
28 runtime: node.runtime,
29 rt_handle: node.rt_handle,
30 stream: node.stream,
31 topic: node.topic,
32 socket: node.socket,
33 buffer: node.buffer,
34 #[cfg(feature = "quic")]
35 endpoint: node.endpoint,
36 #[cfg(feature = "quic")]
37 connection: node.connection,
38 subscription_data: node.subscription_data,
39 task_subscribe: None,
40 }
41 }
42}
43
44use crate::node::Block;
45use std::fmt::Debug;
46
47impl<T: Message + 'static, B: Block + Debug> Node<B, Udp, Active, T> {
48 #[tracing::instrument]
49 #[inline]
50 async fn publish_internal(&self, val: T) -> Result<(), Error> {
51 let packet = Msg::new(MsgType::Set, self.topic.clone(), val)
52 .to_generic()?
53 .as_bytes()?;
54
55 let socket = match self.socket.as_ref() {
56 Some(socket) => socket,
57 None => return Err(Error::AccessSocket),
58 };
59
60 socket
61 .send_to(&packet, self.cfg.network_cfg.host_addr)
62 .await?;
63 Ok(())
64 }
65
66 #[tracing::instrument]
67 #[inline]
68 async fn publish_msg_internal(&self, msg: Msg<T>) -> Result<(), Error> {
69 let packet = msg.to_generic()?.as_bytes()?;
70 let socket = match self.socket.as_ref() {
71 Some(socket) => socket,
72 None => return Err(Error::AccessSocket),
73 };
74
75 socket
76 .send_to(&packet, self.cfg.network_cfg.host_addr)
77 .await?;
78 Ok(())
79 }
80
81 #[tracing::instrument]
82 #[inline]
83 async fn request_nth_back_internal(&self, n: usize) -> Result<Msg<T>, Error> {
84 let packet = GenericMsg::get_nth::<T>(self.topic.clone(), n).as_bytes()?;
85 let buffer = self.buffer.clone();
86
87 if let Some(socket) = &self.socket {
88 send_msg(socket, packet, self.cfg.network_cfg.host_addr).await?;
89 let msg = await_response(socket, buffer).await?.try_into()?;
90 Ok(msg)
91 } else {
92 Err(Error::AccessSocket)
93 }
94 }
95
96 #[tracing::instrument]
97 #[inline]
98 async fn topics_internal(&self) -> Result<Msg<Vec<String>>, Error> {
99 let packet = GenericMsg::topics().as_bytes()?;
100 let buffer = self.buffer.clone();
101
102 if let Some(socket) = &self.socket {
103 send_msg(socket, packet, self.cfg.network_cfg.host_addr).await?;
104 let msg = await_response(socket, buffer).await?.try_into()?;
105 Ok(msg)
106 } else {
107 Err(Error::AccessSocket)
108 }
109 }
110}
111
112impl<T: Message + 'static> Node<Nonblocking, Udp, Active, T> {
113 #[tracing::instrument]
114 #[inline]
115 pub async fn publish(&self, val: T) -> Result<(), Error> {
116 self.publish_internal(val).await?;
117 Ok(())
118 }
119
120 pub async fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
121 self.publish_msg_internal(msg).await?;
122 Ok(())
123 }
124
125 #[tracing::instrument]
126 #[inline]
127 pub async fn request(&self) -> Result<Msg<T>, Error> {
128 let msg = self.request_nth_back_internal(0).await?;
129 Ok(msg)
130 }
131
132 #[tracing::instrument]
133 #[inline]
134 pub async fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
135 let msg = self.topics_internal().await?;
136 Ok(msg)
137 }
138}
139
140use crate::node::network_config::Blocking;
143
144impl<T: Message> From<Node<Blocking, Udp, Idle, T>> for Node<Blocking, Udp, Active, T> {
145 fn from(node: Node<Blocking, Udp, Idle, T>) -> Self {
146 Self {
147 __state: PhantomData,
148 __data_type: PhantomData,
149 cfg: node.cfg,
150 runtime: node.runtime,
151 rt_handle: node.rt_handle,
152 stream: node.stream,
153 topic: node.topic,
154 socket: node.socket,
155 buffer: node.buffer,
156 #[cfg(feature = "quic")]
157 endpoint: node.endpoint,
158 #[cfg(feature = "quic")]
159 connection: node.connection,
160 subscription_data: node.subscription_data,
161 task_subscribe: None,
162 }
163 }
164}
165
166impl<T: Message + 'static> Node<Blocking, Udp, Active, T> {
167 #[tracing::instrument]
168 #[inline]
169 pub fn publish(&self, val: T) -> Result<(), Error> {
170 match &self.rt_handle {
171 Some(handle) => handle.block_on(async {
172 self.publish_internal(val).await?;
173 Ok(())
174 }),
175 None => Err(Error::HandleAccess),
176 }
177 }
178
179 #[tracing::instrument]
180 #[inline]
181 pub fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
182 match &self.rt_handle {
183 Some(handle) => handle.block_on(async {
184 self.publish_msg_internal(msg).await?;
185 Ok(())
186 }),
187 None => Err(Error::HandleAccess),
188 }
189 }
190
191 #[tracing::instrument]
192 #[inline]
193 pub fn request(&self) -> Result<Msg<T>, Error> {
194 match &self.rt_handle {
195 Some(handle) => handle.block_on(async {
196 let msg = self.request_nth_back_internal(0).await?;
197 Ok(msg)
198 }),
199 None => Err(Error::HandleAccess),
200 }
201 }
202
203 #[tracing::instrument]
204 #[inline]
205 pub fn request_nth_back(&self, n: usize) -> Result<Msg<T>, Error> {
206 match &self.rt_handle {
207 Some(handle) => handle.block_on(async {
208 let msg = self.request_nth_back_internal(n).await?;
209 Ok(msg)
210 }),
211 None => Err(Error::HandleAccess),
212 }
213 }
214
215 #[tracing::instrument]
216 #[inline]
217 pub fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
218 match &self.rt_handle {
219 Some(handle) => handle.block_on(async {
220 let msg = self.topics_internal().await?;
221 Ok(msg)
222 }),
223 None => Err(Error::HandleAccess),
224 }
225 }
226}