Skip to main content

roam_types/
message.rs

1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{ChannelId, ConnectionId, Metadata, MethodId, RequestId};
8use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
9use facet_postcard::opaque_encoded_borrowed;
10
11/// Per-connection limits advertised by a peer.
12// r[impl session.connection-settings]
13// r[impl session.parity]
14// r[impl connection.parity]
15// r[impl rpc.flow-control]
16// r[impl rpc.flow-control.max-concurrent-requests]
17// r[impl rpc.flow-control.max-concurrent-requests.default]
18#[derive(Debug, Clone, PartialEq, Eq, Facet)]
19pub struct ConnectionSettings {
20    /// Whether this peer will use odd or even IDs for requests and channels on this connection.
21    pub parity: Parity,
22    /// Maximum number of in-flight requests this peer is willing to accept on this connection.
23    pub max_concurrent_requests: u32,
24}
25
26impl<'payload> Message<'payload> {
27    // Message has no methods on purpose. it's all just plain data.
28    // Adding constructors or getters is forbidden.
29}
30
31/// Whether a peer will use odd or even IDs for requests and channels
32/// on a given connection.
33// r[impl session.parity]
34// r[impl session.role]
35// r[impl connection.parity]
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
37#[repr(u8)]
38pub enum Parity {
39    Odd,
40    Even,
41}
42
43impl Parity {
44    /// Returns the opposite parity.
45    pub fn other(self) -> Self {
46        match self {
47            Parity::Odd => Parity::Even,
48            Parity::Even => Parity::Odd,
49        }
50    }
51}
52
53structstruck::strike! {
54    /// Protocol message.
55    // r[impl session]
56    // r[impl session.message]
57    // r[impl session.message.connection-id]
58    // r[impl session.peer]
59    // r[impl session.symmetry]
60    #[structstruck::each[derive(Debug, Facet)]]
61    pub struct Message<'payload> {
62        /// Connection ID: 0 for control messages (Hello, HelloYourself)
63        pub connection_id: ConnectionId,
64
65        /// Message payload
66        pub payload:
67            #[repr(u8)]
68            // r[impl session.message.payloads]
69            pub enum MessagePayload<'payload> {
70                // ========================================================================
71                // Control (conn 0 only)
72                // ========================================================================
73
74                /// Sent by initiator to acceptor as the first message
75                // r[impl session.handshake]
76                // r[impl session.connection-settings.hello]
77                Hello(pub struct Hello<'payload> {
78                    /// Must be equal to 7
79                    pub version: u32,
80
81                    /// Connection limits advertised by the initiator for the root connection.
82                    /// Parity is included in ConnectionSettings.
83                    pub connection_settings: ConnectionSettings,
84
85                    /// Metadata associated with the connection.
86                    pub metadata: Metadata<'payload>,
87                }),
88
89                /// Sent by acceptor back to initiator. Poetic on purpose, I'm not changing the name.
90                // r[impl session.connection-settings.hello]
91                HelloYourself(pub struct HelloYourself<'payload> {
92                    /// Connection limits advertised by the acceptor for the root connection.
93                    pub connection_settings: ConnectionSettings,
94
95                    /// You can _also_ have metadata if you want.
96                    pub metadata: Metadata<'payload>,
97                }),
98
99                /// Sent by either peer when the counterpart has violated the protocol.
100                /// The sender closes the transport immediately after sending this message.
101                /// No reply is expected or valid.
102                // r[impl session.protocol-error]
103                ProtocolError(pub struct ProtocolError<'payload> {
104                    /// Human-readable description of the protocol violation.
105                    pub description: &'payload str,
106                }),
107
108                // ========================================================================
109                // Connection control
110                // ========================================================================
111
112                /// Request a new virtual connection. This is sent on the desired connection
113                /// ID, even though it doesn't exist yet.
114                // r[impl connection.open]
115                // r[impl connection.virtual]
116                // r[impl session.connection-settings.open]
117                ConnectionOpen(pub struct ConnectionOpen<'payload> {
118                    /// Connection limits advertised by the opener.
119                    /// Parity is included in ConnectionSettings.
120                    pub connection_settings: ConnectionSettings,
121
122                    /// Metadata associated with the connection.
123                    pub metadata: Metadata<'payload>,
124                }),
125
126                /// Accept a virtual connection request — sent on the connection ID requested.
127                // r[impl session.connection-settings.open]
128                ConnectionAccept(pub struct ConnectionAccept<'payload> {
129                    /// Connection limits advertised by the accepter.
130                    pub connection_settings: ConnectionSettings,
131
132                    /// Metadata associated with the connection.
133                    pub metadata: Metadata<'payload>,
134                }),
135
136                /// Reject a virtual connection request — sent on the connection ID requested.
137                ConnectionReject(pub struct ConnectionReject<'payload> {
138                    /// Metadata associated with the rejection.
139                    pub metadata: Metadata<'payload>,
140                }),
141
142                /// Close a virtual connection. Trying to close conn 0 is a protocol error.
143                ConnectionClose(pub struct ConnectionClose<'payload> {
144                    /// Metadata associated with the close.
145                    pub metadata: Metadata<'payload>,
146                }),
147
148
149                // ========================================================================
150                // RPC
151                // ========================================================================
152
153                RequestMessage(
154                    pub struct RequestMessage<'payload> {
155                        /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
156                        pub id: RequestId,
157
158                        /// Request paylaod
159                        pub body:
160                            #[repr(u8)]
161                            pub enum RequestBody<'payload> {
162                                /// Perform a request (or a "call")
163                                Call(pub struct RequestCall<'payload> {
164                                    /// Unique method identifier, hash of fully qualified name + args etc.
165                                    pub method_id: MethodId,
166
167                                    /// Channel identifiers, allocated by the caller, that are passed as part
168                                    /// of the arguments.
169                                    pub channels: Vec<ChannelId>,
170
171                                    /// Metadata associated with this call
172                                    pub metadata: Metadata<'payload>,
173
174                                    /// Argument tuple
175                                    #[facet(trailing)]
176                                    pub args: Payload<'payload>,
177                                }),
178
179                                /// Respond to a request
180                                Response(struct RequestResponse<'payload> {
181                                    /// Channel IDs for streams in the response, in return type declaration order.
182                                    pub channels: Vec<ChannelId>,
183
184                                    /// Arbitrary response metadata
185                                    pub metadata: Metadata<'payload>,
186
187                                    /// Return value (`Result<T, RoamError<E>>`, where E could be Infallible depending on signature)
188                                    #[facet(trailing)]
189                                    pub ret: Payload<'payload>,
190                                }),
191
192                                /// Cancel processing of a request.
193                                Cancel(struct RequestCancel<'payload> {
194                                    /// Arbitrary cancel metadata
195                                    pub metadata: Metadata<'payload>,
196                                }),
197                            },
198                    }
199                ),
200
201                // ========================================================================
202                // Channels
203                // ========================================================================
204
205                ChannelMessage(
206                    pub struct ChannelMessage<'payload> {
207                        /// Channel ID (unique per-connection)
208                        pub id: ChannelId,
209
210                        /// Channel message body
211                        pub body:
212                            #[repr(u8)]
213                            pub enum ChannelBody<'payload> {
214                                /// Send an item on a channel. Channels are not "opened", they are created
215                                /// implicitly by calls.
216                                Item(pub struct ChannelItem<'payload> {
217                                    /// The item itself
218                                    #[facet(trailing)]
219                                    pub item: Payload<'payload>,
220                                }),
221
222                                /// Close a channel — sent by the sender of the channel when they're gracefully done
223                                /// with a channel.
224                                Close(pub struct ChannelClose<'payload> {
225                                    /// Metadata associated with closing the channel.
226                                    pub metadata: Metadata<'payload>,
227                                }),
228
229                                /// Reset a channel — sent by the receiver of a channel when they would like the sender
230                                /// to please, stop sending items through.
231                                Reset(pub struct ChannelReset<'payload> {
232                                    /// Metadata associated with resetting the channel.
233                                    pub metadata: Metadata<'payload>,
234                                }),
235
236                                /// Grant additional send credit to a channel sender.
237                                // r[impl rpc.flow-control.credit.grant]
238                                GrantCredit(pub struct ChannelGrantCredit {
239                                    /// Number of additional items the sender may send.
240                                    pub additional: u32,
241                                }),
242                            },
243                    }
244                ),
245
246                // ========================================================================
247                // Keepalive
248                // ========================================================================
249                //
250                // NOTE: these variants are intentionally appended to preserve
251                // existing discriminants for earlier message payload variants.
252
253                /// Liveness probe for dead-peer detection.
254                Ping(pub struct Ping {
255                    /// Opaque nonce echoed by the Pong response.
256                    pub nonce: u64,
257                }),
258
259                /// Reply to a keepalive Ping.
260                Pong(pub struct Pong {
261                    /// Echo of the received ping nonce.
262                    pub nonce: u64,
263                }),
264
265            },
266    }
267
268}
269
270/// A payload — arguments for a request, or return type for a response.
271///
272/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
273/// serialization/deserialization through the adapter contract:
274/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Borrowed` or `Owned`.
275/// - **Recv path:** `deserialize_build` produces `RawBorrowed` or `RawOwned`.
276// r[impl zerocopy.payload]
277#[derive(Debug, Facet)]
278#[repr(u8)]
279#[facet(opaque = PayloadAdapter, traits(Debug))]
280pub enum Payload<'payload> {
281    // r[impl zerocopy.payload.borrowed]
282    /// Outgoing: type-erased pointer to caller-owned memory + its Shape.
283    Outgoing {
284        ptr: PtrConst,
285        shape: &'static Shape,
286        _lt: PhantomData<&'payload ()>,
287    },
288
289    // r[impl zerocopy.payload.bytes]
290    /// Incoming: raw bytes borrowed from the backing (zero-copy).
291    Incoming(&'payload [u8]),
292}
293
294impl<'payload> Payload<'payload> {
295    /// Construct an outgoing borrowed payload from a concrete value.
296    pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
297        unsafe {
298            Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
299        }
300    }
301
302    /// Construct an outgoing owned payload from a raw pointer + shape.
303    ///
304    /// # Safety
305    ///
306    /// The pointed value must remain alive until serialization has completed.
307    pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
308        Self::Outgoing {
309            ptr,
310            shape,
311            _lt: PhantomData,
312        }
313    }
314
315    // ps: as_incoming_bytes was a bad idea. it's not here anymore.
316}
317
318// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
319// guarantees the pointee outlives any use across threads.
320unsafe impl<'payload> Send for Payload<'payload> {}
321
322/// Adapter that bridges [`Payload`] through the opaque field contract.
323// r[impl zerocopy.framing.value.opaque]
324pub struct PayloadAdapter;
325
326impl FacetOpaqueAdapter for PayloadAdapter {
327    type Error = String;
328    type SendValue<'a> = Payload<'a>;
329    type RecvValue<'de> = Payload<'de>;
330
331    fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
332        match value {
333            Payload::Outgoing { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
334            Payload::Incoming(bytes) => opaque_encoded_borrowed(bytes),
335        }
336    }
337
338    fn deserialize_build<'de>(
339        input: OpaqueDeserialize<'de>,
340    ) -> Result<Self::RecvValue<'de>, Self::Error> {
341        match input {
342            OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::Incoming(bytes)),
343            OpaqueDeserialize::Owned(_) => {
344                Err("payload bytes must be borrowed from backing, not owned".into())
345            }
346        }
347    }
348}
349
350/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
351pub struct MessageFamily;
352
353impl crate::MsgFamily for MessageFamily {
354    type Msg<'a> = Message<'a>;
355}