Skip to main content

vox_types/
message.rs

1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{
8    BindingDirection, CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId,
9};
10use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
11use vox_schema::opaque_encoded_borrowed;
12
13/// Default per-channel initial credit and inbound queue capacity.
14// r[impl rpc.flow-control.credit.initial]
15pub const DEFAULT_INITIAL_CHANNEL_CREDIT: u32 = 16;
16
17/// Per-connection limits advertised by a peer.
18// r[impl session.connection-settings]
19// r[impl session.parity]
20// r[impl connection.parity]
21// r[impl rpc.flow-control]
22// r[impl rpc.flow-control.max-concurrent-requests]
23// r[impl rpc.flow-control.max-concurrent-requests.default]
24// r[impl rpc.flow-control.credit.initial]
25#[derive(Debug, Clone, PartialEq, Eq, Facet)]
26pub struct ConnectionSettings {
27    /// Whether this peer will use odd or even IDs for requests and channels on this connection.
28    pub parity: Parity,
29    /// Maximum number of in-flight requests this peer is willing to accept on this connection.
30    pub max_concurrent_requests: u32,
31    /// Initial per-channel credit this peer grants for channels it receives.
32    #[facet(default = DEFAULT_INITIAL_CHANNEL_CREDIT)]
33    pub initial_channel_credit: u32,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
37pub struct SessionResumeKey(pub [u8; 16]);
38
39impl<'payload> Message<'payload> {
40    // Message has no methods on purpose. it's all just plain data.
41    // Adding constructors or getters is forbidden.
42}
43
44/// Whether a peer will use odd or even IDs for requests and channels
45/// on a given connection.
46// r[impl session.parity]
47// r[impl session.role]
48// r[impl connection.parity]
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
50#[repr(u8)]
51pub enum Parity {
52    Odd,
53    Even,
54}
55
56impl Parity {
57    /// Returns the opposite parity.
58    pub fn other(self) -> Self {
59        match self {
60            Parity::Odd => Parity::Even,
61            Parity::Even => Parity::Odd,
62        }
63    }
64}
65
66structstruck::strike! {
67    /// Protocol message.
68    // r[impl session]
69    // r[impl session.message]
70    // r[impl session.message.connection-id]
71    // r[impl session.peer]
72    // r[impl session.symmetry]
73    #[structstruck::each[derive(Debug, Facet)]]
74    pub struct Message<'payload> {
75        /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
76        pub connection_id: ConnectionId,
77
78        /// Message payload
79        pub payload:
80            #[repr(u8)]
81            // r[impl session.message.payloads]
82            pub enum MessagePayload<'payload> {
83                // ========================================================================
84                // Control (conn 0 only)
85                // ========================================================================
86
87                /// Sent by either peer when the counterpart has violated the protocol.
88                /// The sender closes the transport immediately after sending this message.
89                /// No reply is expected or valid.
90                // r[impl session.protocol-error]
91                ProtocolError(pub struct ProtocolError<'payload> {
92                    /// Human-readable description of the protocol violation.
93                    pub description: &'payload str,
94                }),
95
96                // ========================================================================
97                // Connection control
98                // ========================================================================
99
100                /// Request a new virtual connection. This is sent on the desired connection
101                /// ID, even though it doesn't exist yet.
102                // r[impl connection.open]
103                // r[impl connection.virtual]
104                // r[impl session.connection-settings.open]
105                ConnectionOpen(pub struct ConnectionOpen<'payload> {
106                    /// Connection limits advertised by the opener.
107                    /// Parity is included in ConnectionSettings.
108                    pub connection_settings: ConnectionSettings,
109
110                    /// Metadata associated with the connection.
111                    pub metadata: Metadata<'payload>,
112                }),
113
114                /// Accept a virtual connection request — sent on the connection ID requested.
115                // r[impl session.connection-settings.open]
116                ConnectionAccept(pub struct ConnectionAccept<'payload> {
117                    /// Connection limits advertised by the accepter.
118                    pub connection_settings: ConnectionSettings,
119
120                    /// Metadata associated with the connection.
121                    pub metadata: Metadata<'payload>,
122                }),
123
124                /// Reject a virtual connection request — sent on the connection ID requested.
125                ConnectionReject(pub struct ConnectionReject<'payload> {
126                    /// Metadata associated with the rejection.
127                    pub metadata: Metadata<'payload>,
128                }),
129
130                /// Close a virtual connection. Trying to close conn 0 is a protocol error.
131                ConnectionClose(pub struct ConnectionClose<'payload> {
132                    /// Metadata associated with the close.
133                    pub metadata: Metadata<'payload>,
134                }),
135
136
137                // ========================================================================
138                // RPC
139                // ========================================================================
140
141                RequestMessage(
142                    pub struct RequestMessage<'payload> {
143                        /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
144                        pub id: RequestId,
145
146                        /// Request paylaod
147                        pub body:
148                            #[repr(u8)]
149                            pub enum RequestBody<'payload> {
150                                /// Perform a request (or a "call")
151                                Call(pub struct RequestCall<'payload> {
152                                    /// Unique method identifier, hash of fully qualified name + args etc.
153                                    pub method_id: MethodId,
154
155                                    /// Metadata associated with this call
156                                    pub metadata: Metadata<'payload>,
157
158                                    /// Argument tuple
159                                    pub args: Payload<'payload>,
160
161                                    /// CBOR-encoded schemas for this call's args tuple
162                                    /// Non-empty on the first call for each method on a connection.
163                                    pub schemas: CborPayload,
164                                }),
165
166                                /// Respond to a request
167                                Response(struct RequestResponse<'payload> {
168                                    /// Arbitrary response metadata
169                                    pub metadata: Metadata<'payload>,
170
171                                    /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
172                                    pub ret: Payload<'payload>,
173
174                                    /// CBOR-encoded schemas for this response's return type.
175                                    /// Non-empty on the first response for each method on a connection.
176                                    pub schemas: CborPayload,
177                                }),
178
179                                /// Cancel processing of a request.
180                                Cancel(struct RequestCancel<'payload> {
181                                    /// Arbitrary cancel metadata
182                                    pub metadata: Metadata<'payload>,
183                                }),
184                            },
185                    }
186                ),
187
188                /// Advertise schemas for a method binding on this connection.
189                ///
190                /// This is sent ahead of payload-bearing messages so a batch can
191                /// establish all required schema bindings before their first use.
192                SchemaMessage(pub struct SchemaMessage {
193                    /// Unique method identifier the binding applies to.
194                    pub method_id: MethodId,
195
196                    /// Whether the binding applies to request args or responses.
197                    pub direction: BindingDirection,
198
199                    /// CBOR-encoded schema payload for this binding.
200                    pub schemas: CborPayload,
201                }),
202
203                // ========================================================================
204                // Channels
205                // ========================================================================
206
207                ChannelMessage(
208                    pub struct ChannelMessage<'payload> {
209                        /// Channel ID (unique per-connection)
210                        pub id: ChannelId,
211
212                        /// Channel message body
213                        pub body:
214                            #[repr(u8)]
215                            pub enum ChannelBody<'payload> {
216                                /// Send an item on a channel. Channels are not "opened", they are created
217                                /// implicitly by calls.
218                                Item(pub struct ChannelItem<'payload> {
219                                    /// The item itself
220                                    pub item: Payload<'payload>,
221                                }),
222
223                                /// Close a channel — sent by the sender of the channel when they're gracefully done
224                                /// with a channel.
225                                Close(pub struct ChannelClose<'payload> {
226                                    /// Metadata associated with closing the channel.
227                                    pub metadata: Metadata<'payload>,
228                                }),
229
230                                /// Reset a channel — sent by the receiver of a channel when they would like the sender
231                                /// to please, stop sending items through.
232                                Reset(pub struct ChannelReset<'payload> {
233                                    /// Metadata associated with resetting the channel.
234                                    pub metadata: Metadata<'payload>,
235                                }),
236
237                                /// Grant additional send credit to a channel sender.
238                                // r[impl rpc.flow-control.credit.grant]
239                                GrantCredit(pub struct ChannelGrantCredit {
240                                    /// Number of additional items the sender may send.
241                                    pub additional: u32,
242                                }),
243                            },
244                    }
245                ),
246
247                // ========================================================================
248                // Keepalive
249                // ========================================================================
250
251                /// Liveness probe for dead-peer detection.
252                Ping(pub struct Ping {
253                    /// Opaque nonce echoed by the Pong response.
254                    pub nonce: u64,
255                }),
256
257                /// Reply to a keepalive Ping.
258                Pong(pub struct Pong {
259                    /// Echo of the received ping nonce.
260                    pub nonce: u64,
261                }),
262
263            },
264    }
265
266}
267
268/// A payload — arguments for a request, or return type for a response.
269///
270/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
271/// serialization/deserialization through the adapter contract:
272/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Borrowed` or `Owned`.
273/// - **Recv path:** `deserialize_build` produces `RawBorrowed` or `RawOwned`.
274// r[impl zerocopy.payload]
275#[derive(Debug, Facet)]
276#[repr(u8)]
277#[facet(opaque = PayloadAdapter, traits(Debug))]
278pub enum Payload<'payload> {
279    // r[impl zerocopy.payload.borrowed]
280    /// Type-erased pointer to caller-owned memory + its Shape.
281    Value {
282        ptr: PtrConst,
283        shape: &'static Shape,
284        _lt: PhantomData<&'payload ()>,
285    },
286
287    // r[impl zerocopy.payload.bytes]
288    /// Raw bytes borrowed from the backing (zero-copy).
289    PostcardBytes(&'payload [u8]),
290}
291
292impl<'payload> Payload<'payload> {
293    /// Construct an outgoing borrowed payload from a concrete value.
294    pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
295        unsafe {
296            Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
297        }
298    }
299
300    /// Construct an outgoing owned payload from a raw pointer + shape.
301    ///
302    /// # Safety
303    ///
304    /// The pointed value must remain alive until serialization has completed.
305    pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
306        Self::Value {
307            ptr,
308            shape,
309            _lt: PhantomData,
310        }
311    }
312
313    /// Create a new `Payload` that borrows the same data with a shorter lifetime.
314    ///
315    /// For `Outgoing`: same ptr/shape, new lifetime.
316    /// For `Incoming`: reborrows the byte slice.
317    pub fn reborrow(&self) -> Payload<'_> {
318        match self {
319            Payload::Value { ptr, shape, .. } => Payload::Value {
320                ptr: *ptr,
321                shape,
322                _lt: PhantomData,
323            },
324            Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
325        }
326    }
327}
328
329// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
330// guarantees the pointee outlives any use across threads.
331unsafe impl<'payload> Send for Payload<'payload> {}
332
333/// Adapter that bridges [`Payload`] through the opaque field contract.
334// r[impl zerocopy.framing.value.opaque]
335pub struct PayloadAdapter;
336
337impl FacetOpaqueAdapter for PayloadAdapter {
338    type Error = String;
339    type SendValue<'a> = Payload<'a>;
340    type RecvValue<'de> = Payload<'de>;
341
342    fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
343        match value {
344            Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
345            Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
346        }
347    }
348
349    fn deserialize_build<'de>(
350        input: OpaqueDeserialize<'de>,
351    ) -> Result<Self::RecvValue<'de>, Self::Error> {
352        match input {
353            OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
354            OpaqueDeserialize::Owned(_) => {
355                Err("payload bytes must be borrowed from backing, not owned".into())
356            }
357        }
358    }
359}
360
361/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
362pub struct MessageFamily;
363
364impl crate::MsgFamily for MessageFamily {
365    type Msg<'a> = Message<'a>;
366}
367
368// SAFETY: all types below are covariant in their lifetime parameter
369// (they contain only Cow<'a, str>, Vec<MetadataEntry<'a>>, etc.).
370crate::impl_reborrow!(
371    Message,
372    RequestMessage,
373    RequestCall,
374    RequestResponse,
375    ConnectionOpen,
376    ConnectionAccept,
377    ConnectionReject,
378    ConnectionClose,
379    ChannelMessage,
380    ChannelItem,
381    ChannelClose,
382    ChannelReset,
383);