ergot_base/socket/
mod.rs

1//! The "Sockets"
2//!
3//! Ergot is oriented around type-safe sockets. Rather than TCP/IP sockets,
4//! which provide users with either streams or frames of bytes (e.g. `[u8]`),
5//! Ergot sockets are always of a certain Rust data type, such as structs or
6//! enums. They provide an API very similar to "channels", a common way of
7//! passing data around within Rust programs.
8//!
9//! When messages are received from outside of the current application/firmware,
10//! messages are deserialized using the `postcard` serialization format, a
11//! compact, non-self-describing, binary format.
12//!
13//! When messages are sent locally within a device, no serialization or
14//! deserialization occurs, meaning that fundamentally sending data to an
15//! Ergot socket locally has no cost over using a normal channel.
16//!
17//! In general: Sockets **receive**, and the NetStack **sends**.
18//!
19//! ### Non-stateful sockets
20//!
21//! Currently, sockets in Ergot are not stateful, meaning that they only serve
22//! to receive messages. Replies may be made by sending a response to the
23//! source address of the received message, using the [`NetStack`] to send
24//! the response.
25//!
26//! Conceptually, this makes Ergot sockets similar to UDP sockets: delivery
27//! is not guaranteed.
28//!
29//! ### A variety of sockets
30//!
31//! Ergot allows for different implementations of what a "socket" is, with
32//! a common subset of functionality. Normally, this might sound like just the
33//! problem to solve with a Rust `trait`, however as we would like to store
34//! all of these items in a single intrusive linked list, this becomes
35//! problematic.
36//!
37//! Instead, the pinned sockets all feature a common socket header, which
38//! includes a hand-crafted vtable used to interact with the socket. This allows
39//! us to have the moral equivalent to `List<dyn Socket>`, but in a way that
40//! is easier to support on embedded devices without an allocator.
41//!
//! This indirection allows us to be flexible both in the intent of a socket,
//! for example a socket that expects a single one-shot response, or a socket
//! that expects a stream of requests; as well as flexible in the means of
//! storage of a socket, for example using stackful bounded queues of messages
//! on embedded systems, or heapful unbounded queues of messages on systems
//! with an allocator.
48//!
49//! This approach of using a linked list, common header, and vtable, is NEARLY
50//! IDENTICAL to how most async executors operate in Rust, particularly how
51//! Tasks containing differently-typed Futures are handled by the executor
52//! itself when it comes to polling or dropping a Task.
53//!
54//! [`NetStack`]: crate::NetStack
55
56use core::{
57    any::TypeId,
58    ptr::{self, NonNull},
59};
60
61use crate::{FrameKind, HeaderSeq, Key, ProtocolError};
62use cordyceps::{Linked, list::Links};
63
64pub mod raw;
65
/// Generates a storage-specific family of socket types on top of the generic
/// types in `crate::socket::raw`.
///
/// Usage: `wrapper!(StorageTy, [N]);` where `StorageTy` is the storage type
/// handed to the raw socket (it may mention `T` and the optional const
/// parameter), and the optional identifier names a `const usize` generic that
/// is appended to every generated type.
///
/// The expansion defines, in the invoking module:
///
/// * `Socket<T, R, M, ..>`: a `#[repr(transparent)]` newtype over
///   `raw::Socket<StorageTy, T, R, M>`,
/// * `SocketHdl<'a, T, R, M, ..>`: a newtype over `raw::SocketHdl`,
/// * `Recv<'a, 'b, T, R, M, ..>`: a future newtype over `raw::Recv`, resolving
///   to `crate::socket::Response<T>`.
///
/// The invoking module is expected to add its own `Socket::new` constructor,
/// since the generated struct's only field is private to that module.
///
/// All crate-internal paths go through `$crate`, and `Future` is named by its
/// full `core::future` path, so the expansion does not depend on what the
/// invoking module has in scope (beyond the `mutex` and `serde` crates).
macro_rules! wrapper {
    ($sto: ty, $($arr: ident)?) => {
        /// A socket whose storage type is fixed by the enclosing module.
        ///
        /// This is a transparent newtype over the generic raw socket; every
        /// method forwards to the wrapped value.
        #[repr(transparent)]
        pub struct Socket<T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            socket: $crate::socket::raw::Socket<$sto, T, R, M>,
        }

        /// Handle obtained from `Socket::attach` / `Socket::attach_broadcast`.
        ///
        /// Borrows the pinned socket for `'a` and wraps the raw socket handle.
        pub struct SocketHdl<'a, T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            hdl: $crate::socket::raw::SocketHdl<'a, $sto, T, R, M>,
        }

        /// Future returned by `SocketHdl::recv`; wraps the raw receive future.
        #[must_use = "futures do nothing unless you `.await` or poll them"]
        pub struct Recv<'a, 'b, T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            recv: $crate::socket::raw::Recv<'a, 'b, $sto, T, R, M>,
        }

        impl<T, R, M, $(const $arr: usize)?> Socket<T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            /// Projects a pinned wrapper onto its pinned raw socket.
            ///
            /// Single home for the pin projection shared by `attach` and
            /// `attach_broadcast`.
            #[inline]
            fn raw_pinned<'a>(
                self: core::pin::Pin<&'a mut Self>,
            ) -> core::pin::Pin<&'a mut $crate::socket::raw::Socket<$sto, T, R, M>> {
                // SAFETY: `socket` is the only field of this wrapper and is
                // treated as structurally pinned: the wrapper never moves it
                // out of a pinned `Self`, defines no `Drop` impl, is not
                // `repr(packed)`, and has no manual `Unpin` impl (so it is
                // `Unpin` only if the raw socket is). The projected reference
                // therefore stays valid and unmoved for as long as `self` does.
                unsafe { self.map_unchecked_mut(|me| &mut me.socket) }
            }

            /// Attaches the socket by delegating to the raw socket's `attach`,
            /// returning a handle that borrows the pinned socket for `'a`.
            pub fn attach<'a>(
                self: core::pin::Pin<&'a mut Self>,
            ) -> SocketHdl<'a, T, R, M, $($arr)?> {
                SocketHdl {
                    hdl: self.raw_pinned().attach(),
                }
            }

            /// Same as `attach`, but delegates to the raw socket's
            /// `attach_broadcast`.
            pub fn attach_broadcast<'a>(
                self: core::pin::Pin<&'a mut Self>,
            ) -> SocketHdl<'a, T, R, M, $($arr)?> {
                SocketHdl {
                    hdl: self.raw_pinned().attach_broadcast(),
                }
            }

            /// Returns the net stack reported by the wrapped raw socket.
            pub fn stack(&self) -> &'static $crate::net_stack::NetStack<R, M> {
                self.socket.stack()
            }
        }

        impl<'a, T, R, M, $(const $arr: usize)?> SocketHdl<'a, T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            /// Returns the port reported by the wrapped raw handle.
            pub fn port(&self) -> u8 {
                self.hdl.port()
            }

            /// Returns the net stack reported by the wrapped raw handle.
            pub fn stack(&self) -> &'static $crate::net_stack::NetStack<R, M> {
                self.hdl.stack()
            }

            // TODO: This future is !Send? I don't fully understand why, but rustc complains
            // that since `NonNull<OwnedSocket<E>>` is !Sync, then this future can't be Send,
            // BUT impl'ing Sync unsafely on OwnedSocketHdl + OwnedSocket doesn't seem to help.
            /// Creates a future that resolves to the next response, by wrapping
            /// the raw handle's `recv` future.
            pub fn recv<'b>(&'b mut self) -> Recv<'b, 'a, T, R, M, $($arr)?> {
                Recv {
                    recv: self.hdl.recv(),
                }
            }
        }

        impl<T, R, M, $(const $arr: usize)?> core::future::Future
            for Recv<'_, '_, T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            type Output = $crate::socket::Response<T>;

            fn poll(
                self: core::pin::Pin<&mut Self>,
                cx: &mut core::task::Context<'_>,
            ) -> core::task::Poll<Self::Output> {
                // SAFETY: `recv` is the only field of this wrapper and is
                // treated as structurally pinned: it is never moved out of a
                // pinned `Self`, and the wrapper has neither a `Drop` impl nor
                // a manual `Unpin` impl.
                let recv: core::pin::Pin<
                    &mut $crate::socket::raw::Recv<'_, '_, $sto, T, R, M>,
                > = unsafe { self.map_unchecked_mut(|me| &mut me.recv) };
                // Fully qualified so the expansion does not need `Future` in
                // scope at the call site.
                core::future::Future::poll(recv, cx)
            }
        }
    };
}
165
166pub mod single {
167    use mutex::ScopedRawMutex;
168    use serde::{Serialize, de::DeserializeOwned};
169
170    use crate::{
171        Key,
172        interface_manager::InterfaceManager,
173        net_stack::NetStack,
174        socket::{Attributes, raw},
175    };
176
177    impl<T: 'static> raw::Storage<T> for Option<T> {
178        #[inline]
179        fn is_full(&self) -> bool {
180            self.is_some()
181        }
182
183        #[inline]
184        fn is_empty(&self) -> bool {
185            self.is_none()
186        }
187
188        #[inline]
189        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
190            if self.is_some() {
191                return Err(raw::StorageFull);
192            }
193            *self = Some(t);
194            Ok(())
195        }
196
197        #[inline]
198        fn try_pop(&mut self) -> Option<T> {
199            self.take()
200        }
201    }
202
203    wrapper!(Option<super::Response<T>>,);
204
205    impl<T, R, M> Socket<T, R, M>
206    where
207        T: Serialize + Clone + DeserializeOwned + 'static,
208        R: ScopedRawMutex + 'static,
209        M: InterfaceManager + 'static,
210    {
211        #[inline]
212        pub const fn new(net: &'static NetStack<R, M>, key: Key, attrs: Attributes) -> Self {
213            Self {
214                socket: raw::Socket::new(net, key, attrs, None),
215            }
216        }
217    }
218}
219
/// Sockets backed by a heap-allocated queue with a run-time length limit.
/// Only available with the `std` feature.
#[cfg(feature = "std")]
pub mod std_bounded {
    use mutex::ScopedRawMutex;
    use serde::{Serialize, de::DeserializeOwned};
    use std::collections::VecDeque;

    use crate::{Key, NetStack, interface_manager::InterfaceManager};

    use super::{Attributes, raw};

    /// FIFO queue that refuses new items once it holds `max_len` of them.
    pub struct Bounded<T> {
        storage: VecDeque<T>,
        max_len: usize,
    }

    impl<T> Bounded<T> {
        /// Creates an empty queue that will hold at most `bound` items.
        ///
        /// No memory is reserved up front; the queue starts from
        /// `VecDeque::new()`.
        pub fn with_bound(bound: usize) -> Self {
            Self {
                max_len: bound,
                storage: VecDeque::new(),
            }
        }
    }

    impl<T: 'static> raw::Storage<T> for Bounded<T> {
        /// Full once the current length has reached the configured bound.
        #[inline]
        fn is_full(&self) -> bool {
            self.max_len <= self.storage.len()
        }

        /// Empty when the underlying queue holds nothing.
        #[inline]
        fn is_empty(&self) -> bool {
            self.storage.is_empty()
        }

        /// Appends `t` at the back while there is room; otherwise returns
        /// `raw::StorageFull` without modifying the queue.
        #[inline]
        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
            if self.storage.len() < self.max_len {
                self.storage.push_back(t);
                Ok(())
            } else {
                Err(raw::StorageFull)
            }
        }

        /// Removes and returns the oldest item, if any.
        #[inline]
        fn try_pop(&mut self) -> Option<T> {
            self.storage.pop_front()
        }
    }

    wrapper!(Bounded<super::Response<T>>,);

    impl<T, R, M> Socket<T, R, M>
    where
        T: Serialize + Clone + DeserializeOwned + 'static,
        R: ScopedRawMutex + 'static,
        M: InterfaceManager + 'static,
    {
        /// Builds a socket for `net` with the given `key` and `attrs`, whose
        /// queue accepts at most `bound` pending responses.
        #[inline]
        pub fn new(
            net: &'static NetStack<R, M>,
            key: Key,
            attrs: Attributes,
            bound: usize,
        ) -> Self {
            let queue = Bounded::with_bound(bound);
            let socket = raw::Socket::new(net, key, attrs, queue);
            Self { socket }
        }
    }
}
291
292pub mod stack_vec {
293    use mutex::ScopedRawMutex;
294    use serde::{Serialize, de::DeserializeOwned};
295
296    use crate::{Key, NetStack, interface_manager::InterfaceManager};
297
298    use super::{Attributes, raw};
299
300    pub struct Bounded<T: 'static, const N: usize> {
301        storage: heapless::Deque<T, N>,
302    }
303
304    impl<T: 'static, const N: usize> Bounded<T, N> {
305        pub const fn new() -> Self {
306            Self {
307                storage: heapless::Deque::new(),
308            }
309        }
310    }
311
312    impl<T: 'static, const N: usize> Default for Bounded<T, N> {
313        fn default() -> Self {
314            Self::new()
315        }
316    }
317
318    impl<T: 'static, const N: usize> raw::Storage<T> for Bounded<T, N> {
319        #[inline]
320        fn is_full(&self) -> bool {
321            self.storage.is_full()
322        }
323
324        #[inline]
325        fn is_empty(&self) -> bool {
326            self.storage.is_empty()
327        }
328
329        #[inline]
330        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
331            self.storage.push_back(t).map_err(|_| raw::StorageFull)
332        }
333
334        #[inline]
335        fn try_pop(&mut self) -> Option<T> {
336            self.storage.pop_front()
337        }
338    }
339
340    wrapper!(Bounded<super::Response<T>, N>, N);
341
342    impl<T, R, M, const N: usize> Socket<T, R, M, N>
343    where
344        T: Serialize + Clone + DeserializeOwned + 'static,
345        R: ScopedRawMutex + 'static,
346        M: InterfaceManager + 'static,
347    {
348        #[inline]
349        pub const fn new(net: &'static NetStack<R, M>, key: Key, attrs: Attributes) -> Self {
350            Self {
351                socket: raw::Socket::new(net, key, attrs, Bounded::new()),
352            }
353        }
354    }
355}
356
357#[derive(Debug)]
358pub struct Attributes {
359    pub kind: FrameKind,
360    // If true: participates in service discovery and responds to ANY delivery.
361    // if false: is not included in service discovery, and only responds to specific port addressing.
362    pub discoverable: bool,
363}
364
365#[derive(Debug)]
366pub struct SocketHeader {
367    pub(crate) links: Links<SocketHeader>,
368    pub(crate) vtable: &'static SocketVTable,
369    pub(crate) key: Key,
370    pub(crate) attrs: Attributes,
371    pub(crate) port: u8,
372}
373
// TODO: Way of signaling "socket consumed"?
/// Reasons a socket's receive function can reject a message.
///
/// Every variant has a matching `ProtocolError::SSE_*` value, obtained through
/// [`SocketSendError::to_error`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it. It derives `Clone` so that values which
/// embed it (for example a stored `Result`) can themselves be cloned.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SocketSendError {
    /// The socket had no room for the message.
    // NOTE(review): presumably produced when the storage `push` fails; confirm in `raw`.
    NoSpace,
    /// The message could not be deserialized.
    DeserFailed,
    /// The message's type did not match what the socket expects.
    TypeMismatch,
    /// Catch-all for a failure that fits none of the other variants.
    // NOTE(review): exact trigger is not visible in this file; confirm at the use sites.
    WhatTheHell,
}
383
384#[derive(Debug, Clone)]
385pub struct SocketVTable {
386    pub(crate) recv_owned: Option<RecvOwned>,
387    pub(crate) recv_bor: Option<RecvBorrowed>,
388    pub(crate) recv_raw: RecvRaw,
389    pub(crate) recv_err: Option<RecvError>,
390    // NOTE: We do *not* have a `drop` impl here, because the list
391    // doesn't ACTUALLY own the nodes, so it is not responsible for dropping
392    // them. They are naturally destroyed by their true owner.
393}
394
395#[derive(Debug)]
396pub struct OwnedMessage<T: 'static> {
397    pub hdr: HeaderSeq,
398    pub t: T,
399}
400
401pub type Response<T> = Result<OwnedMessage<T>, OwnedMessage<ProtocolError>>;
402
403// TODO: replace with header and handle kind and stuff right!
404
405// Morally: &T, TypeOf<T>, src, dst
406// If return OK: the type has been moved OUT of the source
407// May serialize, or may be just moved.
408pub type RecvOwned = fn(
409    // The socket ptr
410    NonNull<()>,
411    // The T ptr
412    NonNull<()>,
413    // the header
414    HeaderSeq,
415    // The T ty
416    &TypeId,
417) -> Result<(), SocketSendError>;
418// Morally: &T, src, dst
419// Always a serialize
420pub type RecvBorrowed = fn(
421    // The socket ptr
422    NonNull<()>,
423    // The T ptr
424    NonNull<()>,
425    // the header
426    HeaderSeq,
427) -> Result<(), SocketSendError>;
428// Morally: it's a packet
429// Never a serialize, sometimes a deserialize
430pub type RecvRaw = fn(
431    // The socket ptr
432    NonNull<()>,
433    // The packet
434    &[u8],
435    // the header
436    HeaderSeq,
437) -> Result<(), SocketSendError>;
438
439pub type RecvError = fn(
440    // The socket ptr
441    NonNull<()>,
442    // the header
443    HeaderSeq,
444    // The Error
445    ProtocolError,
446);
447
448// --------------------------------------------------------------------------
449// impl SocketHeader
450// --------------------------------------------------------------------------
451
452unsafe impl Linked<Links<SocketHeader>> for SocketHeader {
453    type Handle = NonNull<SocketHeader>;
454
455    fn into_ptr(r: Self::Handle) -> core::ptr::NonNull<Self> {
456        r
457    }
458
459    unsafe fn from_ptr(ptr: core::ptr::NonNull<Self>) -> Self::Handle {
460        ptr
461    }
462
463    unsafe fn links(target: NonNull<Self>) -> NonNull<Links<SocketHeader>> {
464        // Safety: using `ptr::addr_of!` avoids creating a temporary
465        // reference, which stacked borrows dislikes.
466        let node = unsafe { ptr::addr_of_mut!((*target.as_ptr()).links) };
467        unsafe { NonNull::new_unchecked(node) }
468    }
469}
470
471impl SocketSendError {
472    pub fn to_error(&self) -> ProtocolError {
473        match self {
474            SocketSendError::NoSpace => ProtocolError::SSE_NO_SPACE,
475            SocketSendError::DeserFailed => ProtocolError::SSE_DESER_FAILED,
476            SocketSendError::TypeMismatch => ProtocolError::SSE_TYPE_MISMATCH,
477            SocketSendError::WhatTheHell => ProtocolError::SSE_WHAT_THE_HELL,
478        }
479    }
480}