meadow/node/udp/
active.rs

1use crate::node::network_config::{Nonblocking, Udp};
2use crate::node::Interface;
3use crate::node::Node;
4use crate::node::{Active, Idle};
5use crate::prelude::*;
6use std::marker::PhantomData;
7use std::net::SocketAddr;
8use std::ops::DerefMut;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
12use crate::node::udp::*;
13
14use chrono::Utc;
15
16use postcard::{from_bytes, to_allocvec};
17#[cfg(feature = "quic")]
18use quinn::Connection as QuicConnection;
19use std::result::Result;
20use tracing::*;
21
22impl<T: Message> From<Node<Nonblocking, Udp, Idle, T>> for Node<Nonblocking, Udp, Active, T> {
23    fn from(node: Node<Nonblocking, Udp, Idle, T>) -> Self {
24        Self {
25            __state: PhantomData,
26            __data_type: PhantomData,
27            cfg: node.cfg,
28            runtime: node.runtime,
29            rt_handle: node.rt_handle,
30            stream: node.stream,
31            topic: node.topic,
32            socket: node.socket,
33            buffer: node.buffer,
34            #[cfg(feature = "quic")]
35            endpoint: node.endpoint,
36            #[cfg(feature = "quic")]
37            connection: node.connection,
38            subscription_data: node.subscription_data,
39            task_subscribe: None,
40        }
41    }
42}
43
44use crate::node::Block;
45use std::fmt::Debug;
46
47impl<T: Message + 'static, B: Block + Debug> Node<B, Udp, Active, T> {
48    #[tracing::instrument]
49    #[inline]
50    async fn publish_internal(&self, val: T) -> Result<(), Error> {
51        let packet = Msg::new(MsgType::Set, self.topic.clone(), val)
52            .to_generic()?
53            .as_bytes()?;
54
55        let socket = match self.socket.as_ref() {
56            Some(socket) => socket,
57            None => return Err(Error::AccessSocket),
58        };
59
60        socket
61            .send_to(&packet, self.cfg.network_cfg.host_addr)
62            .await?;
63        Ok(())
64    }
65
66    #[tracing::instrument]
67    #[inline]
68    async fn publish_msg_internal(&self, msg: Msg<T>) -> Result<(), Error> {
69        let packet = msg.to_generic()?.as_bytes()?;
70        let socket = match self.socket.as_ref() {
71            Some(socket) => socket,
72            None => return Err(Error::AccessSocket),
73        };
74
75        socket
76            .send_to(&packet, self.cfg.network_cfg.host_addr)
77            .await?;
78        Ok(())
79    }
80
81    #[tracing::instrument]
82    #[inline]
83    async fn request_nth_back_internal(&self, n: usize) -> Result<Msg<T>, Error> {
84        let packet = GenericMsg::get_nth::<T>(self.topic.clone(), n).as_bytes()?;
85        let buffer = self.buffer.clone();
86
87        if let Some(socket) = &self.socket {
88            send_msg(socket, packet, self.cfg.network_cfg.host_addr).await?;
89            let msg = await_response(socket, buffer).await?.try_into()?;
90            Ok(msg)
91        } else {
92            Err(Error::AccessSocket)
93        }
94    }
95
96    #[tracing::instrument]
97    #[inline]
98    async fn topics_internal(&self) -> Result<Msg<Vec<String>>, Error> {
99        let packet = GenericMsg::topics().as_bytes()?;
100        let buffer = self.buffer.clone();
101
102        if let Some(socket) = &self.socket {
103            send_msg(socket, packet, self.cfg.network_cfg.host_addr).await?;
104            let msg = await_response(socket, buffer).await?.try_into()?;
105            Ok(msg)
106        } else {
107            Err(Error::AccessSocket)
108        }
109    }
110}
111
112impl<T: Message + 'static> Node<Nonblocking, Udp, Active, T> {
113    #[tracing::instrument]
114    #[inline]
115    pub async fn publish(&self, val: T) -> Result<(), Error> {
116        self.publish_internal(val).await?;
117        Ok(())
118    }
119
120    pub async fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
121        self.publish_msg_internal(msg).await?;
122        Ok(())
123    }
124
125    #[tracing::instrument]
126    #[inline]
127    pub async fn request(&self) -> Result<Msg<T>, Error> {
128        let msg = self.request_nth_back_internal(0).await?;
129        Ok(msg)
130    }
131
132    #[tracing::instrument]
133    #[inline]
134    pub async fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
135        let msg = self.topics_internal().await?;
136        Ok(msg)
137    }
138}
139
140//--------
141
142use crate::node::network_config::Blocking;
143
144impl<T: Message> From<Node<Blocking, Udp, Idle, T>> for Node<Blocking, Udp, Active, T> {
145    fn from(node: Node<Blocking, Udp, Idle, T>) -> Self {
146        Self {
147            __state: PhantomData,
148            __data_type: PhantomData,
149            cfg: node.cfg,
150            runtime: node.runtime,
151            rt_handle: node.rt_handle,
152            stream: node.stream,
153            topic: node.topic,
154            socket: node.socket,
155            buffer: node.buffer,
156            #[cfg(feature = "quic")]
157            endpoint: node.endpoint,
158            #[cfg(feature = "quic")]
159            connection: node.connection,
160            subscription_data: node.subscription_data,
161            task_subscribe: None,
162        }
163    }
164}
165
166impl<T: Message + 'static> Node<Blocking, Udp, Active, T> {
167    #[tracing::instrument]
168    #[inline]
169    pub fn publish(&self, val: T) -> Result<(), Error> {
170        match &self.rt_handle {
171            Some(handle) => handle.block_on(async {
172                self.publish_internal(val).await?;
173                Ok(())
174            }),
175            None => Err(Error::HandleAccess),
176        }
177    }
178
179    #[tracing::instrument]
180    #[inline]
181    pub fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
182        match &self.rt_handle {
183            Some(handle) => handle.block_on(async {
184                self.publish_msg_internal(msg).await?;
185                Ok(())
186            }),
187            None => Err(Error::HandleAccess),
188        }
189    }
190
191    #[tracing::instrument]
192    #[inline]
193    pub fn request(&self) -> Result<Msg<T>, Error> {
194        match &self.rt_handle {
195            Some(handle) => handle.block_on(async {
196                let msg = self.request_nth_back_internal(0).await?;
197                Ok(msg)
198            }),
199            None => Err(Error::HandleAccess),
200        }
201    }
202
203    #[tracing::instrument]
204    #[inline]
205    pub fn request_nth_back(&self, n: usize) -> Result<Msg<T>, Error> {
206        match &self.rt_handle {
207            Some(handle) => handle.block_on(async {
208                let msg = self.request_nth_back_internal(n).await?;
209                Ok(msg)
210            }),
211            None => Err(Error::HandleAccess),
212        }
213    }
214
215    #[tracing::instrument]
216    #[inline]
217    pub fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
218        match &self.rt_handle {
219            Some(handle) => handle.block_on(async {
220                let msg = self.topics_internal().await?;
221                Ok(msg)
222            }),
223            None => Err(Error::HandleAccess),
224        }
225    }
226}