ergot_base/socket/
mod.rs

1//! The "Sockets"
2//!
3//! Ergot is oriented around type-safe sockets. Rather than TCP/IP sockets,
4//! which provide users with either streams or frames of bytes (e.g. `[u8]`),
5//! Ergot sockets are always of a certain Rust data type, such as structs or
6//! enums. They provide an API very similar to "channels", a common way of
7//! passing data around within Rust programs.
8//!
9//! When messages are received from outside of the current application/firmware,
10//! messages are deserialized using the `postcard` serialization format, a
11//! compact, non-self-describing, binary format.
12//!
13//! When messages are sent locally within a device, no serialization or
14//! deserialization occurs, meaning that fundamentally sending data to an
15//! Ergot socket locally has no cost over using a normal channel.
16//!
17//! In general: Sockets **receive**, and the NetStack **sends**.
18//!
19//! ### Non-stateful sockets
20//!
21//! Currently, sockets in Ergot are not stateful, meaning that they only serve
22//! to receive messages. Replies may be made by sending a response to the
23//! source address of the received message, using the [`NetStack`] to send
24//! the response.
25//!
26//! Conceptually, this makes Ergot sockets similar to UDP sockets: delivery
27//! is not guaranteed.
28//!
29//! ### A variety of sockets
30//!
31//! Ergot allows for different implementations of what a "socket" is, with
32//! a common subset of functionality. Normally, this might sound like just the
33//! problem to solve with a Rust `trait`, however as we would like to store
34//! all of these items in a single intrusive linked list, this becomes
35//! problematic.
36//!
37//! Instead, the pinned sockets all feature a common socket header, which
38//! includes a hand-crafted vtable used to interact with the socket. This allows
39//! us to have the moral equivalent to `List<dyn Socket>`, but in a way that
40//! is easier to support on embedded devices without an allocator.
41//!
42//! This indirection allows us to be flexible both in intent of a socket, for
43//! example a socket that expects a single one-shot response, or a socket that
44//! expects a stream of requests; as well as flexible in the means of storage
//! of a socket, for example using stackful bounded queues of messages on
46//! embedded systems, or heapful unbounded queues of messages on systems with
47//! an allocator.
48//!
49//! This approach of using a linked list, common header, and vtable, is NEARLY
50//! IDENTICAL to how most async executors operate in Rust, particularly how
51//! Tasks containing differently-typed Futures are handled by the executor
52//! itself when it comes to polling or dropping a Task.
53//!
54//! [`NetStack`]: crate::NetStack
55
56use core::{
57    any::TypeId,
58    ptr::{self, NonNull},
59};
60
61use crate::{FrameKind, HeaderSeq, Key, ProtocolError};
62use cordyceps::{Linked, list::Links};
63
64pub mod raw;
65
// Generates the typed public wrappers (`Socket`, `SocketHdl`, `Recv`) around the
// `raw` socket types for one concrete storage backend.
//
// Arguments:
// * `$sto`: the storage type handed to `raw::Socket` as its first type parameter.
// * `$arr` (optional): the name of a `const usize` generic parameter that is
//   added to every generated type, for storages sized by a const generic.
//
// The macro expands to items named `Socket`, `SocketHdl` and `Recv`, so it must
// be invoked at most once per module.
macro_rules! wrapper {
    ($sto: ty, $($arr: ident)?) => {
        /// A socket receiving messages of type `T`, wrapping a `raw::Socket`
        /// with this module's storage type.
        #[repr(transparent)]
        pub struct Socket<T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            socket: $crate::socket::raw::Socket<$sto, T, R, M>,
        }

        /// Handle to an attached [`Socket`], borrowing the pinned socket for `'a`.
        ///
        /// Wraps the corresponding `raw::SocketHdl`.
        pub struct SocketHdl<'a, T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            hdl: $crate::socket::raw::SocketHdl<'a, $sto, T, R, M>,
        }

        /// Future returned by [`SocketHdl::recv`]; wraps the corresponding `raw::Recv`.
        pub struct Recv<'a, 'b, T, R, M, $(const $arr: usize)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            recv: $crate::socket::raw::Recv<'a, 'b, $sto, T, R, M>,
        }

        impl<T, R, M, $(const $arr: usize)?> Socket<T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            /// Attaches the pinned socket by delegating to the raw socket's
            /// `attach`, returning a handle that borrows the socket for `'a`.
            pub fn attach<'a>(self: core::pin::Pin<&'a mut Self>) -> SocketHdl<'a, T, R, M, $($arr)?> {
                // SAFETY: this only projects the pin onto the wrapper's single
                // field; nothing in this wrapper moves `socket` out of the
                // pinned value, so the inner socket stays pinned as well.
                let socket: core::pin::Pin<&'a mut $crate::socket::raw::Socket<$sto, T, R, M>> = unsafe { self.map_unchecked_mut(|me| &mut me.socket) };
                SocketHdl {
                    hdl: socket.attach(),
                }
            }

            /// Same as [`Socket::attach`], but delegates to the raw socket's
            /// `attach_broadcast` instead.
            pub fn attach_broadcast<'a>(
                self: core::pin::Pin<&'a mut Self>,
            ) -> SocketHdl<'a, T, R, M, $($arr)?> {
                // SAFETY: same field-only pin projection as in `attach`.
                let socket: core::pin::Pin<&'a mut $crate::socket::raw::Socket<$sto, T, R, M>> = unsafe { self.map_unchecked_mut(|me| &mut me.socket) };
                SocketHdl {
                    hdl: socket.attach_broadcast(),
                }
            }

            /// Returns the `NetStack` reported by the underlying raw socket.
            pub fn stack(&self) -> &'static crate::net_stack::NetStack<R, M> {
                self.socket.stack()
            }
        }

        impl<'a, T, R, M, $(const $arr: usize)?> SocketHdl<'a, T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            /// Returns the port reported by the underlying raw handle.
            pub fn port(&self) -> u8 {
                self.hdl.port()
            }

            /// Returns the `NetStack` reported by the underlying raw handle.
            pub fn stack(&self) -> &'static crate::net_stack::NetStack<R, M> {
                self.hdl.stack()
            }

            // TODO: This future is !Send? I don't fully understand why, but rustc complains
            // that since `NonNull<OwnedSocket<E>>` is !Sync, then this future can't be Send,
            // BUT impl'ing Sync unsafely on OwnedSocketHdl + OwnedSocket doesn't seem to help.
            /// Creates a future that resolves to the next [`Response`] for this socket.
            ///
            /// The future mutably borrows the handle for `'b`, so only one
            /// receive can be outstanding per handle at a time.
            ///
            /// [`Response`]: crate::socket::Response
            pub fn recv<'b>(&'b mut self) -> Recv<'b, 'a, T, R, M, $($arr)?> {
                Recv {
                    recv: self.hdl.recv(),
                }
            }
        }

        impl<T, R, M, $(const $arr: usize)?> Future for Recv<'_, '_, T, R, M, $($arr)?>
        where
            T: serde::Serialize + Clone + serde::de::DeserializeOwned + 'static,
            R: mutex::ScopedRawMutex + 'static,
            M: $crate::interface_manager::InterfaceManager + 'static,
        {
            type Output = $crate::socket::Response<T>;

            // Forwards the poll to the wrapped raw receive future.
            fn poll(
                self: core::pin::Pin<&mut Self>,
                cx: &mut core::task::Context<'_>,
            ) -> core::task::Poll<Self::Output> {
                // SAFETY: field-only pin projection; this wrapper never moves
                // `recv` out of the pinned value.
                let recv: core::pin::Pin<&mut $crate::socket::raw::Recv<'_, '_, $sto, T, R, M>> = unsafe { self.map_unchecked_mut(|me| &mut me.recv) };
                recv.poll(cx)
            }
        }
    };
}
165
166pub mod single {
167    use mutex::ScopedRawMutex;
168    use serde::{Serialize, de::DeserializeOwned};
169
170    use crate::{
171        Key,
172        interface_manager::InterfaceManager,
173        net_stack::NetStack,
174        socket::{Attributes, raw},
175    };
176
177    impl<T: 'static> raw::Storage<T> for Option<T> {
178        #[inline]
179        fn is_full(&self) -> bool {
180            self.is_some()
181        }
182
183        #[inline]
184        fn is_empty(&self) -> bool {
185            self.is_none()
186        }
187
188        #[inline]
189        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
190            if self.is_some() {
191                return Err(raw::StorageFull);
192            }
193            *self = Some(t);
194            Ok(())
195        }
196
197        #[inline]
198        fn try_pop(&mut self) -> Option<T> {
199            self.take()
200        }
201    }
202
203    wrapper!(Option<super::Response<T>>,);
204
205    impl<T, R, M> Socket<T, R, M>
206    where
207        T: Serialize + Clone + DeserializeOwned + 'static,
208        R: ScopedRawMutex + 'static,
209        M: InterfaceManager + 'static,
210    {
211        #[inline]
212        pub const fn new(net: &'static NetStack<R, M>, key: Key, attrs: Attributes) -> Self {
213            Self {
214                socket: raw::Socket::new(net, key, attrs, None),
215            }
216        }
217    }
218}
219
220pub mod std_bounded {
221    use mutex::ScopedRawMutex;
222    use serde::{Serialize, de::DeserializeOwned};
223    use std::collections::VecDeque;
224
225    use crate::{Key, NetStack, interface_manager::InterfaceManager};
226
227    use super::{Attributes, raw};
228
229    pub struct Bounded<T> {
230        storage: std::collections::VecDeque<T>,
231        max_len: usize,
232    }
233
234    impl<T> Bounded<T> {
235        pub fn with_bound(bound: usize) -> Self {
236            Self {
237                storage: VecDeque::new(),
238                max_len: bound,
239            }
240        }
241    }
242
243    impl<T: 'static> raw::Storage<T> for Bounded<T> {
244        #[inline]
245        fn is_full(&self) -> bool {
246            self.storage.len() >= self.max_len
247        }
248
249        #[inline]
250        fn is_empty(&self) -> bool {
251            self.storage.is_empty()
252        }
253
254        #[inline]
255        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
256            if self.is_full() {
257                return Err(raw::StorageFull);
258            }
259            self.storage.push_back(t);
260            Ok(())
261        }
262
263        #[inline]
264        fn try_pop(&mut self) -> Option<T> {
265            self.storage.pop_front()
266        }
267    }
268
269    wrapper!(Bounded<super::Response<T>>,);
270
271    impl<T, R, M> Socket<T, R, M>
272    where
273        T: Serialize + Clone + DeserializeOwned + 'static,
274        R: ScopedRawMutex + 'static,
275        M: InterfaceManager + 'static,
276    {
277        #[inline]
278        pub fn new(
279            net: &'static NetStack<R, M>,
280            key: Key,
281            attrs: Attributes,
282            bound: usize,
283        ) -> Self {
284            Self {
285                socket: raw::Socket::new(net, key, attrs, Bounded::with_bound(bound)),
286            }
287        }
288    }
289}
290
291pub mod stack_vec {
292    use mutex::ScopedRawMutex;
293    use serde::{Serialize, de::DeserializeOwned};
294
295    use crate::{Key, NetStack, interface_manager::InterfaceManager};
296
297    use super::{Attributes, raw};
298
299    pub struct Bounded<T: 'static, const N: usize> {
300        storage: heapless::Vec<T, N>,
301    }
302
303    impl<T: 'static, const N: usize> Bounded<T, N> {
304        pub const fn new() -> Self {
305            Self {
306                storage: heapless::Vec::new(),
307            }
308        }
309    }
310
311    impl<T: 'static, const N: usize> Default for Bounded<T, N> {
312        fn default() -> Self {
313            Self::new()
314        }
315    }
316
317    impl<T: 'static, const N: usize> raw::Storage<T> for Bounded<T, N> {
318        #[inline]
319        fn is_full(&self) -> bool {
320            self.storage.is_full()
321        }
322
323        #[inline]
324        fn is_empty(&self) -> bool {
325            self.storage.is_empty()
326        }
327
328        #[inline]
329        fn push(&mut self, t: T) -> Result<(), raw::StorageFull> {
330            self.storage.push(t).map_err(|_| raw::StorageFull)
331        }
332
333        #[inline]
334        fn try_pop(&mut self) -> Option<T> {
335            self.storage.pop()
336        }
337    }
338
339    wrapper!(Bounded<super::Response<T>, N>, N);
340
341    impl<T, R, M, const N: usize> Socket<T, R, M, N>
342    where
343        T: Serialize + Clone + DeserializeOwned + 'static,
344        R: ScopedRawMutex + 'static,
345        M: InterfaceManager + 'static,
346    {
347        #[inline]
348        pub const fn new(net: &'static NetStack<R, M>, key: Key, attrs: Attributes) -> Self {
349            Self {
350                socket: raw::Socket::new(net, key, attrs, Bounded::new()),
351            }
352        }
353    }
354}
355
/// Per-socket attributes, stored in the [`SocketHeader`].
#[derive(Debug)]
pub struct Attributes {
    /// The [`FrameKind`] associated with this socket.
    // NOTE(review): presumably the kind of frames this socket accepts; confirm
    // against the NetStack delivery code.
    pub kind: FrameKind,
    // If true: participates in service discovery and responds to ANY delivery.
    // if false: is not included in service discovery, and only responds to specific port addressing.
    pub discoverable: bool,
}
363
/// The common header shared by every socket implementation.
///
/// It carries the intrusive list links and the hand-crafted vtable described
/// in the module documentation, so differently-typed sockets can live in one
/// linked list.
#[derive(Debug)]
pub struct SocketHeader {
    /// Intrusive linked-list links; exposed to `cordyceps` through the
    /// `Linked` implementation for this type.
    pub(crate) links: Links<SocketHeader>,
    /// Function table used to interact with the concrete socket behind this header.
    pub(crate) vtable: &'static SocketVTable,
    /// The [`Key`] this socket was created with.
    pub(crate) key: Key,
    /// The socket's [`Attributes`].
    pub(crate) attrs: Attributes,
    /// The socket's port number.
    // NOTE(review): presumably assigned when the socket is attached; confirm in `raw`.
    pub(crate) port: u8,
}
372
373// TODO: Way of signaling "socket consumed"?
/// Reasons a socket's receive function can refuse a message.
///
/// Each variant has a corresponding [`ProtocolError`] value; see
/// [`SocketSendError::to_error`].
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SocketSendError {
    /// Maps to `ProtocolError::SSE_NO_SPACE`.
    // NOTE(review): presumably raised when the socket's storage is full; confirm in `raw`.
    NoSpace,
    /// Maps to `ProtocolError::SSE_DESER_FAILED`.
    // NOTE(review): presumably a failed deserialization of a raw frame; confirm in `raw`.
    DeserFailed,
    /// Maps to `ProtocolError::SSE_TYPE_MISMATCH`.
    // NOTE(review): presumably the `TypeId` passed to `RecvOwned` did not match; confirm in `raw`.
    TypeMismatch,
    /// Maps to `ProtocolError::SSE_WHAT_THE_HELL`.
    // NOTE(review): looks like a catch-all for unexpected conditions; confirm usage.
    WhatTheHell,
}
382
/// The hand-crafted vtable stored in every [`SocketHeader`].
///
/// Each entry is a plain function pointer taking a type-erased pointer to the
/// socket as its first argument. Entries wrapped in `Option` may be absent.
#[derive(Debug, Clone)]
pub struct SocketVTable {
    /// Optional handler for owned values; see [`RecvOwned`].
    pub(crate) recv_owned: Option<RecvOwned>,
    /// Optional handler for borrowed values; see [`RecvBorrowed`].
    pub(crate) recv_bor: Option<RecvBorrowed>,
    /// Handler for raw byte frames; see [`RecvRaw`]. Always present.
    pub(crate) recv_raw: RecvRaw,
    /// Optional handler for protocol errors; see [`RecvError`].
    pub(crate) recv_err: Option<RecvError>,
    // NOTE: We do *not* have a `drop` impl here, because the list
    // doesn't ACTUALLY own the nodes, so it is not responsible for dropping
    // them. They are naturally destroyed by their true owner.
}
393
/// A received payload of type `T` together with the header it arrived with.
#[derive(Debug)]
pub struct OwnedMessage<T: 'static> {
    /// The header accompanying the payload.
    pub hdr: HeaderSeq,
    /// The payload itself.
    pub t: T,
}
399
/// What a socket's receive future yields: either a message carrying a `T`, or
/// a message carrying a [`ProtocolError`]. Both sides include the header.
pub type Response<T> = Result<OwnedMessage<T>, OwnedMessage<ProtocolError>>;
401
402// TODO: replace with header and handle kind and stuff right!
403
// Morally: &T, TypeOf<T>, src, dst
// If return OK: the type has been moved OUT of the source
// May serialize, or may be just moved.
/// Vtable entry for delivering an owned value to a socket.
///
/// Arguments, in order: a type-erased pointer to the socket, a type-erased
/// pointer to the value, the message header, and the `TypeId` of the value.
/// Returns `Ok(())` on success, in which case the value has been moved out of
/// its source, or a [`SocketSendError`] otherwise.
pub type RecvOwned = fn(
    // The socket ptr
    NonNull<()>,
    // The T ptr
    NonNull<()>,
    // the header
    HeaderSeq,
    // The T ty
    &TypeId,
) -> Result<(), SocketSendError>;
// Morally: &T, src, dst
// Always a serialize
/// Vtable entry for delivering a borrowed value to a socket.
///
/// Arguments, in order: a type-erased pointer to the socket, a type-erased
/// pointer to the value, and the message header. Returns `Ok(())` on success
/// or a [`SocketSendError`] otherwise.
pub type RecvBorrowed = fn(
    // The socket ptr
    NonNull<()>,
    // The T ptr
    NonNull<()>,
    // the header
    HeaderSeq,
) -> Result<(), SocketSendError>;
// Morally: it's a packet
// Never a serialize, sometimes a deserialize
/// Vtable entry for delivering a raw byte packet to a socket.
///
/// Arguments, in order: a type-erased pointer to the socket, the packet
/// bytes, and the message header. Returns `Ok(())` on success or a
/// [`SocketSendError`] otherwise.
pub type RecvRaw = fn(
    // The socket ptr
    NonNull<()>,
    // The packet
    &[u8],
    // the header
    HeaderSeq,
) -> Result<(), SocketSendError>;
437
/// Vtable entry for handing a [`ProtocolError`] to a socket.
///
/// Arguments, in order: a type-erased pointer to the socket, the message
/// header, and the error. Unlike the other entries it returns nothing, so it
/// has no way to report a failure to its caller.
pub type RecvError = fn(
    // The socket ptr
    NonNull<()>,
    // the header
    HeaderSeq,
    // The Error
    ProtocolError,
);
446
447// --------------------------------------------------------------------------
448// impl SocketHeader
449// --------------------------------------------------------------------------
450
451unsafe impl Linked<Links<SocketHeader>> for SocketHeader {
452    type Handle = NonNull<SocketHeader>;
453
454    fn into_ptr(r: Self::Handle) -> std::ptr::NonNull<Self> {
455        r
456    }
457
458    unsafe fn from_ptr(ptr: std::ptr::NonNull<Self>) -> Self::Handle {
459        ptr
460    }
461
462    unsafe fn links(target: NonNull<Self>) -> NonNull<Links<SocketHeader>> {
463        // Safety: using `ptr::addr_of!` avoids creating a temporary
464        // reference, which stacked borrows dislikes.
465        let node = unsafe { ptr::addr_of_mut!((*target.as_ptr()).links) };
466        unsafe { NonNull::new_unchecked(node) }
467    }
468}
469
470impl SocketSendError {
471    pub fn to_error(&self) -> ProtocolError {
472        match self {
473            SocketSendError::NoSpace => ProtocolError::SSE_NO_SPACE,
474            SocketSendError::DeserFailed => ProtocolError::SSE_DESER_FAILED,
475            SocketSendError::TypeMismatch => ProtocolError::SSE_TYPE_MISMATCH,
476            SocketSendError::WhatTheHell => ProtocolError::SSE_WHAT_THE_HELL,
477        }
478    }
479}