Skip to main content

vox_types/
message.rs

1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{
8    BindingDirection, CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId,
9};
10use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
11use vox_schema::opaque_encoded_borrowed;
12
13/// Per-connection limits advertised by a peer.
14// r[impl session.connection-settings]
15// r[impl session.parity]
16// r[impl connection.parity]
17// r[impl rpc.flow-control]
18// r[impl rpc.flow-control.max-concurrent-requests]
19// r[impl rpc.flow-control.max-concurrent-requests.default]
20#[derive(Debug, Clone, PartialEq, Eq, Facet)]
21pub struct ConnectionSettings {
22    /// Whether this peer will use odd or even IDs for requests and channels on this connection.
23    pub parity: Parity,
24    /// Maximum number of in-flight requests this peer is willing to accept on this connection.
25    pub max_concurrent_requests: u32,
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
29pub struct SessionResumeKey(pub [u8; 16]);
30
31impl<'payload> Message<'payload> {
32    // Message has no methods on purpose. it's all just plain data.
33    // Adding constructors or getters is forbidden.
34}
35
36/// Whether a peer will use odd or even IDs for requests and channels
37/// on a given connection.
38// r[impl session.parity]
39// r[impl session.role]
40// r[impl connection.parity]
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
42#[repr(u8)]
43pub enum Parity {
44    Odd,
45    Even,
46}
47
48impl Parity {
49    /// Returns the opposite parity.
50    pub fn other(self) -> Self {
51        match self {
52            Parity::Odd => Parity::Even,
53            Parity::Even => Parity::Odd,
54        }
55    }
56}
57
58structstruck::strike! {
59    /// Protocol message.
60    // r[impl session]
61    // r[impl session.message]
62    // r[impl session.message.connection-id]
63    // r[impl session.peer]
64    // r[impl session.symmetry]
65    #[structstruck::each[derive(Debug, Facet)]]
66    pub struct Message<'payload> {
67        /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
68        pub connection_id: ConnectionId,
69
70        /// Message payload
71        pub payload:
72            #[repr(u8)]
73            // r[impl session.message.payloads]
74            pub enum MessagePayload<'payload> {
75                // ========================================================================
76                // Control (conn 0 only)
77                // ========================================================================
78
79                /// Sent by either peer when the counterpart has violated the protocol.
80                /// The sender closes the transport immediately after sending this message.
81                /// No reply is expected or valid.
82                // r[impl session.protocol-error]
83                ProtocolError(pub struct ProtocolError<'payload> {
84                    /// Human-readable description of the protocol violation.
85                    pub description: &'payload str,
86                }),
87
88                // ========================================================================
89                // Connection control
90                // ========================================================================
91
92                /// Request a new virtual connection. This is sent on the desired connection
93                /// ID, even though it doesn't exist yet.
94                // r[impl connection.open]
95                // r[impl connection.virtual]
96                // r[impl session.connection-settings.open]
97                ConnectionOpen(pub struct ConnectionOpen<'payload> {
98                    /// Connection limits advertised by the opener.
99                    /// Parity is included in ConnectionSettings.
100                    pub connection_settings: ConnectionSettings,
101
102                    /// Metadata associated with the connection.
103                    pub metadata: Metadata<'payload>,
104                }),
105
106                /// Accept a virtual connection request — sent on the connection ID requested.
107                // r[impl session.connection-settings.open]
108                ConnectionAccept(pub struct ConnectionAccept<'payload> {
109                    /// Connection limits advertised by the accepter.
110                    pub connection_settings: ConnectionSettings,
111
112                    /// Metadata associated with the connection.
113                    pub metadata: Metadata<'payload>,
114                }),
115
116                /// Reject a virtual connection request — sent on the connection ID requested.
117                ConnectionReject(pub struct ConnectionReject<'payload> {
118                    /// Metadata associated with the rejection.
119                    pub metadata: Metadata<'payload>,
120                }),
121
122                /// Close a virtual connection. Trying to close conn 0 is a protocol error.
123                ConnectionClose(pub struct ConnectionClose<'payload> {
124                    /// Metadata associated with the close.
125                    pub metadata: Metadata<'payload>,
126                }),
127
128
129                // ========================================================================
130                // RPC
131                // ========================================================================
132
133                RequestMessage(
134                    pub struct RequestMessage<'payload> {
135                        /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
136                        pub id: RequestId,
137
138                        /// Request paylaod
139                        pub body:
140                            #[repr(u8)]
141                            pub enum RequestBody<'payload> {
142                                /// Perform a request (or a "call")
143                                Call(pub struct RequestCall<'payload> {
144                                    /// Unique method identifier, hash of fully qualified name + args etc.
145                                    pub method_id: MethodId,
146
147                                    /// Metadata associated with this call
148                                    pub metadata: Metadata<'payload>,
149
150                                    /// Argument tuple
151                                    pub args: Payload<'payload>,
152
153                                    /// CBOR-encoded schemas for this call's args tuple
154                                    /// Non-empty on the first call for each method on a connection.
155                                    pub schemas: CborPayload,
156                                }),
157
158                                /// Respond to a request
159                                Response(struct RequestResponse<'payload> {
160                                    /// Arbitrary response metadata
161                                    pub metadata: Metadata<'payload>,
162
163                                    /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
164                                    pub ret: Payload<'payload>,
165
166                                    /// CBOR-encoded schemas for this response's return type.
167                                    /// Non-empty on the first response for each method on a connection.
168                                    pub schemas: CborPayload,
169                                }),
170
171                                /// Cancel processing of a request.
172                                Cancel(struct RequestCancel<'payload> {
173                                    /// Arbitrary cancel metadata
174                                    pub metadata: Metadata<'payload>,
175                                }),
176                            },
177                    }
178                ),
179
180                /// Advertise schemas for a method binding on this connection.
181                ///
182                /// This is sent ahead of payload-bearing messages so a batch can
183                /// establish all required schema bindings before their first use.
184                SchemaMessage(pub struct SchemaMessage {
185                    /// Unique method identifier the binding applies to.
186                    pub method_id: MethodId,
187
188                    /// Whether the binding applies to request args or responses.
189                    pub direction: BindingDirection,
190
191                    /// CBOR-encoded schema payload for this binding.
192                    pub schemas: CborPayload,
193                }),
194
195                // ========================================================================
196                // Channels
197                // ========================================================================
198
199                ChannelMessage(
200                    pub struct ChannelMessage<'payload> {
201                        /// Channel ID (unique per-connection)
202                        pub id: ChannelId,
203
204                        /// Channel message body
205                        pub body:
206                            #[repr(u8)]
207                            pub enum ChannelBody<'payload> {
208                                /// Send an item on a channel. Channels are not "opened", they are created
209                                /// implicitly by calls.
210                                Item(pub struct ChannelItem<'payload> {
211                                    /// The item itself
212                                    pub item: Payload<'payload>,
213                                }),
214
215                                /// Close a channel — sent by the sender of the channel when they're gracefully done
216                                /// with a channel.
217                                Close(pub struct ChannelClose<'payload> {
218                                    /// Metadata associated with closing the channel.
219                                    pub metadata: Metadata<'payload>,
220                                }),
221
222                                /// Reset a channel — sent by the receiver of a channel when they would like the sender
223                                /// to please, stop sending items through.
224                                Reset(pub struct ChannelReset<'payload> {
225                                    /// Metadata associated with resetting the channel.
226                                    pub metadata: Metadata<'payload>,
227                                }),
228
229                                /// Grant additional send credit to a channel sender.
230                                // r[impl rpc.flow-control.credit.grant]
231                                GrantCredit(pub struct ChannelGrantCredit {
232                                    /// Number of additional items the sender may send.
233                                    pub additional: u32,
234                                }),
235                            },
236                    }
237                ),
238
239                // ========================================================================
240                // Keepalive
241                // ========================================================================
242
243                /// Liveness probe for dead-peer detection.
244                Ping(pub struct Ping {
245                    /// Opaque nonce echoed by the Pong response.
246                    pub nonce: u64,
247                }),
248
249                /// Reply to a keepalive Ping.
250                Pong(pub struct Pong {
251                    /// Echo of the received ping nonce.
252                    pub nonce: u64,
253                }),
254
255            },
256    }
257
258}
259
260/// A payload — arguments for a request, or return type for a response.
261///
262/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
263/// serialization/deserialization through the adapter contract:
264/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Borrowed` or `Owned`.
265/// - **Recv path:** `deserialize_build` produces `RawBorrowed` or `RawOwned`.
266// r[impl zerocopy.payload]
267#[derive(Debug, Facet)]
268#[repr(u8)]
269#[facet(opaque = PayloadAdapter, traits(Debug))]
270pub enum Payload<'payload> {
271    // r[impl zerocopy.payload.borrowed]
272    /// Type-erased pointer to caller-owned memory + its Shape.
273    Value {
274        ptr: PtrConst,
275        shape: &'static Shape,
276        _lt: PhantomData<&'payload ()>,
277    },
278
279    // r[impl zerocopy.payload.bytes]
280    /// Raw bytes borrowed from the backing (zero-copy).
281    PostcardBytes(&'payload [u8]),
282}
283
284impl<'payload> Payload<'payload> {
285    /// Construct an outgoing borrowed payload from a concrete value.
286    pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
287        unsafe {
288            Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
289        }
290    }
291
292    /// Construct an outgoing owned payload from a raw pointer + shape.
293    ///
294    /// # Safety
295    ///
296    /// The pointed value must remain alive until serialization has completed.
297    pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
298        Self::Value {
299            ptr,
300            shape,
301            _lt: PhantomData,
302        }
303    }
304
305    /// Create a new `Payload` that borrows the same data with a shorter lifetime.
306    ///
307    /// For `Outgoing`: same ptr/shape, new lifetime.
308    /// For `Incoming`: reborrows the byte slice.
309    pub fn reborrow(&self) -> Payload<'_> {
310        match self {
311            Payload::Value { ptr, shape, .. } => Payload::Value {
312                ptr: *ptr,
313                shape,
314                _lt: PhantomData,
315            },
316            Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
317        }
318    }
319}
320
321// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
322// guarantees the pointee outlives any use across threads.
323unsafe impl<'payload> Send for Payload<'payload> {}
324
325/// Adapter that bridges [`Payload`] through the opaque field contract.
326// r[impl zerocopy.framing.value.opaque]
327pub struct PayloadAdapter;
328
329impl FacetOpaqueAdapter for PayloadAdapter {
330    type Error = String;
331    type SendValue<'a> = Payload<'a>;
332    type RecvValue<'de> = Payload<'de>;
333
334    fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
335        match value {
336            Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
337            Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
338        }
339    }
340
341    fn deserialize_build<'de>(
342        input: OpaqueDeserialize<'de>,
343    ) -> Result<Self::RecvValue<'de>, Self::Error> {
344        match input {
345            OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
346            OpaqueDeserialize::Owned(_) => {
347                Err("payload bytes must be borrowed from backing, not owned".into())
348            }
349        }
350    }
351}
352
353/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
354pub struct MessageFamily;
355
356impl crate::MsgFamily for MessageFamily {
357    type Msg<'a> = Message<'a>;
358}
359
360// SAFETY: all types below are covariant in their lifetime parameter
361// (they contain only Cow<'a, str>, Vec<MetadataEntry<'a>>, etc.).
362crate::impl_reborrow!(
363    Message,
364    RequestMessage,
365    RequestCall,
366    RequestResponse,
367    ConnectionOpen,
368    ConnectionAccept,
369    ConnectionReject,
370    ConnectionClose,
371    ChannelMessage,
372    ChannelItem,
373    ChannelClose,
374    ChannelReset,
375);