roam_types/message.rs
1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{ChannelId, ConnectionId, Metadata, MethodId, RequestId};
8use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
9use facet_postcard::opaque_encoded_borrowed;
10
/// Per-connection limits advertised by a peer.
///
/// Embedded in the `Hello`, `HelloYourself`, `ConnectionOpen` and
/// `ConnectionAccept` payloads, so each side states its own settings for the
/// connection being established.
// r[impl session.connection-settings]
// r[impl session.parity]
// r[impl connection.parity]
// r[impl rpc.flow-control]
// r[impl rpc.flow-control.max-concurrent-requests]
// r[impl rpc.flow-control.max-concurrent-requests.default]
#[derive(Debug, Clone, PartialEq, Eq, Facet)]
pub struct ConnectionSettings {
    /// Whether this peer will use odd or even IDs for requests and channels on this connection.
    pub parity: Parity,
    /// Maximum number of in-flight requests this peer is willing to accept on this connection.
    pub max_concurrent_requests: u32,
}
25
impl<'payload> Message<'payload> {
    // `Message` has no methods on purpose: it is all just plain data.
    // Adding constructors or getters is forbidden.
}
30
/// Whether a peer will use odd or even IDs for requests and channels
/// on a given connection.
///
/// The two peers of a connection are expected to hold opposite parities;
/// see [`Parity::other`] for deriving one from the other.
// r[impl session.parity]
// r[impl session.role]
// r[impl connection.parity]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
#[repr(u8)]
pub enum Parity {
    /// The peer allocates odd IDs.
    Odd,
    /// The peer allocates even IDs.
    Even,
}
42
43impl Parity {
44 /// Returns the opposite parity.
45 pub fn other(self) -> Self {
46 match self {
47 Parity::Odd => Parity::Even,
48 Parity::Even => Parity::Odd,
49 }
50 }
51}
52
53structstruck::strike! {
54 /// Protocol message.
55 // r[impl session]
56 // r[impl session.message]
57 // r[impl session.message.connection-id]
58 // r[impl session.peer]
59 // r[impl session.symmetry]
60 #[structstruck::each[derive(Debug, Facet)]]
61 pub struct Message<'payload> {
62 /// Connection ID: 0 for control messages (Hello, HelloYourself)
63 pub connection_id: ConnectionId,
64
65 /// Message payload
66 pub payload:
67 #[repr(u8)]
68 // r[impl session.message.payloads]
69 pub enum MessagePayload<'payload> {
70 // ========================================================================
71 // Control (conn 0 only)
72 // ========================================================================
73
74 /// Sent by initiator to acceptor as the first message
75 // r[impl session.handshake]
76 // r[impl session.connection-settings.hello]
77 Hello(pub struct Hello<'payload> {
78 /// Must be equal to 7
79 pub version: u32,
80
81 /// Connection limits advertised by the initiator for the root connection.
82 /// Parity is included in ConnectionSettings.
83 pub connection_settings: ConnectionSettings,
84
85 /// Metadata associated with the connection.
86 pub metadata: Metadata<'payload>,
87 }),
88
89 /// Sent by acceptor back to initiator. Poetic on purpose, I'm not changing the name.
90 // r[impl session.connection-settings.hello]
91 HelloYourself(pub struct HelloYourself<'payload> {
92 /// Connection limits advertised by the acceptor for the root connection.
93 pub connection_settings: ConnectionSettings,
94
95 /// You can _also_ have metadata if you want.
96 pub metadata: Metadata<'payload>,
97 }),
98
99 /// Sent by either peer when the counterpart has violated the protocol.
100 /// The sender closes the transport immediately after sending this message.
101 /// No reply is expected or valid.
102 // r[impl session.protocol-error]
103 ProtocolError(pub struct ProtocolError<'payload> {
104 /// Human-readable description of the protocol violation.
105 pub description: &'payload str,
106 }),
107
108 // ========================================================================
109 // Connection control
110 // ========================================================================
111
112 /// Request a new virtual connection. This is sent on the desired connection
113 /// ID, even though it doesn't exist yet.
114 // r[impl connection.open]
115 // r[impl connection.virtual]
116 // r[impl session.connection-settings.open]
117 ConnectionOpen(pub struct ConnectionOpen<'payload> {
118 /// Connection limits advertised by the opener.
119 /// Parity is included in ConnectionSettings.
120 pub connection_settings: ConnectionSettings,
121
122 /// Metadata associated with the connection.
123 pub metadata: Metadata<'payload>,
124 }),
125
126 /// Accept a virtual connection request — sent on the connection ID requested.
127 // r[impl session.connection-settings.open]
128 ConnectionAccept(pub struct ConnectionAccept<'payload> {
129 /// Connection limits advertised by the accepter.
130 pub connection_settings: ConnectionSettings,
131
132 /// Metadata associated with the connection.
133 pub metadata: Metadata<'payload>,
134 }),
135
136 /// Reject a virtual connection request — sent on the connection ID requested.
137 ConnectionReject(pub struct ConnectionReject<'payload> {
138 /// Metadata associated with the rejection.
139 pub metadata: Metadata<'payload>,
140 }),
141
142 /// Close a virtual connection. Trying to close conn 0 is a protocol error.
143 ConnectionClose(pub struct ConnectionClose<'payload> {
144 /// Metadata associated with the close.
145 pub metadata: Metadata<'payload>,
146 }),
147
148
149 // ========================================================================
150 // RPC
151 // ========================================================================
152
153 RequestMessage(
154 pub struct RequestMessage<'payload> {
155 /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
156 pub id: RequestId,
157
158 /// Request paylaod
159 pub body:
160 #[repr(u8)]
161 pub enum RequestBody<'payload> {
162 /// Perform a request (or a "call")
163 Call(pub struct RequestCall<'payload> {
164 /// Unique method identifier, hash of fully qualified name + args etc.
165 pub method_id: MethodId,
166
167 /// Channel identifiers, allocated by the caller, that are passed as part
168 /// of the arguments.
169 pub channels: Vec<ChannelId>,
170
171 /// Metadata associated with this call
172 pub metadata: Metadata<'payload>,
173
174 /// Argument tuple
175 #[facet(trailing)]
176 pub args: Payload<'payload>,
177 }),
178
179 /// Respond to a request
180 Response(struct RequestResponse<'payload> {
181 /// Channel IDs for streams in the response, in return type declaration order.
182 pub channels: Vec<ChannelId>,
183
184 /// Arbitrary response metadata
185 pub metadata: Metadata<'payload>,
186
187 /// Return value (`Result<T, RoamError<E>>`, where E could be Infallible depending on signature)
188 #[facet(trailing)]
189 pub ret: Payload<'payload>,
190 }),
191
192 /// Cancel processing of a request.
193 Cancel(struct RequestCancel<'payload> {
194 /// Arbitrary cancel metadata
195 pub metadata: Metadata<'payload>,
196 }),
197 },
198 }
199 ),
200
201 // ========================================================================
202 // Channels
203 // ========================================================================
204
205 ChannelMessage(
206 pub struct ChannelMessage<'payload> {
207 /// Channel ID (unique per-connection)
208 pub id: ChannelId,
209
210 /// Channel message body
211 pub body:
212 #[repr(u8)]
213 pub enum ChannelBody<'payload> {
214 /// Send an item on a channel. Channels are not "opened", they are created
215 /// implicitly by calls.
216 Item(pub struct ChannelItem<'payload> {
217 /// The item itself
218 #[facet(trailing)]
219 pub item: Payload<'payload>,
220 }),
221
222 /// Close a channel — sent by the sender of the channel when they're gracefully done
223 /// with a channel.
224 Close(pub struct ChannelClose<'payload> {
225 /// Metadata associated with closing the channel.
226 pub metadata: Metadata<'payload>,
227 }),
228
229 /// Reset a channel — sent by the receiver of a channel when they would like the sender
230 /// to please, stop sending items through.
231 Reset(pub struct ChannelReset<'payload> {
232 /// Metadata associated with resetting the channel.
233 pub metadata: Metadata<'payload>,
234 }),
235
236 /// Grant additional send credit to a channel sender.
237 // r[impl rpc.flow-control.credit.grant]
238 GrantCredit(pub struct ChannelGrantCredit {
239 /// Number of additional items the sender may send.
240 pub additional: u32,
241 }),
242 },
243 }
244 ),
245
246 // ========================================================================
247 // Keepalive
248 // ========================================================================
249 //
250 // NOTE: these variants are intentionally appended to preserve
251 // existing discriminants for earlier message payload variants.
252
253 /// Liveness probe for dead-peer detection.
254 Ping(pub struct Ping {
255 /// Opaque nonce echoed by the Pong response.
256 pub nonce: u64,
257 }),
258
259 /// Reply to a keepalive Ping.
260 Pong(pub struct Pong {
261 /// Echo of the received ping nonce.
262 pub nonce: u64,
263 }),
264
265 },
266 }
267
268}
269
/// A payload — arguments for a request, return value for a response, or an
/// item sent on a channel.
///
/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
/// serialization/deserialization through the adapter contract:
/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Outgoing`, and
///   hands `Incoming` bytes to `opaque_encoded_borrowed`.
/// - **Recv path:** `deserialize_build` produces `Incoming` from borrowed bytes
///   and rejects owned bytes with an error.
// r[impl zerocopy.payload]
#[derive(Debug, Facet)]
#[repr(u8)]
#[facet(opaque = PayloadAdapter, traits(Debug))]
pub enum Payload<'payload> {
    // r[impl zerocopy.payload.borrowed]
    /// Outgoing: type-erased pointer to caller-owned memory + its Shape.
    Outgoing {
        /// Type-erased pointer to the value to serialize.
        ptr: PtrConst,
        /// Shape handed to the serializer alongside `ptr`.
        shape: &'static Shape,
        /// Zero-sized marker tying this variant to the `'payload` lifetime.
        _lt: PhantomData<&'payload ()>,
    },

    // r[impl zerocopy.payload.bytes]
    /// Incoming: raw bytes borrowed from the backing (zero-copy).
    Incoming(&'payload [u8]),
}
293
294impl<'payload> Payload<'payload> {
295 /// Construct an outgoing borrowed payload from a concrete value.
296 pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
297 unsafe {
298 Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
299 }
300 }
301
302 /// Construct an outgoing owned payload from a raw pointer + shape.
303 ///
304 /// # Safety
305 ///
306 /// The pointed value must remain alive until serialization has completed.
307 pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
308 Self::Outgoing {
309 ptr,
310 shape,
311 _lt: PhantomData,
312 }
313 }
314
315 // ps: as_incoming_bytes was a bad idea. it's not here anymore.
316}
317
// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
// guarantees the pointee outlives any use across threads.
// NOTE(review): `Payload::outgoing` places no `Sync` bound on the pointee type,
// so this impl also relies on the pointee being safe to read from whichever
// thread performs serialization — confirm callers uphold that.
unsafe impl<'payload> Send for Payload<'payload> {}
321
/// Adapter that bridges [`Payload`] through the opaque field contract.
///
/// Zero-sized marker type: it carries no state and exists only to host the
/// `FacetOpaqueAdapter` implementation named by `#[facet(opaque = PayloadAdapter)]`.
// r[impl zerocopy.framing.value.opaque]
pub struct PayloadAdapter;
325
326impl FacetOpaqueAdapter for PayloadAdapter {
327 type Error = String;
328 type SendValue<'a> = Payload<'a>;
329 type RecvValue<'de> = Payload<'de>;
330
331 fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
332 match value {
333 Payload::Outgoing { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
334 Payload::Incoming(bytes) => opaque_encoded_borrowed(bytes),
335 }
336 }
337
338 fn deserialize_build<'de>(
339 input: OpaqueDeserialize<'de>,
340 ) -> Result<Self::RecvValue<'de>, Self::Error> {
341 match input {
342 OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::Incoming(bytes)),
343 OpaqueDeserialize::Owned(_) => {
344 Err("payload bytes must be borrowed from backing, not owned".into())
345 }
346 }
347 }
348}
349
/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
///
/// Zero-sized: it only selects `Message<'a>` as the family's message type.
pub struct MessageFamily;
352
impl crate::MsgFamily for MessageFamily {
    /// The family's message type at lifetime `'a` is [`Message<'a>`].
    type Msg<'a> = Message<'a>;
}