Skip to main content

vox_types/
message.rs

1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId};
8use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
9use vox_schema::opaque_encoded_borrowed;
10
/// Per-connection limits advertised by a peer.
///
/// Carried by both [`ConnectionOpen`] (settings of the opener) and
/// [`ConnectionAccept`] (settings of the accepter), so each side of a
/// connection advertises its own values.
// r[impl session.connection-settings]
// r[impl session.parity]
// r[impl connection.parity]
// r[impl rpc.flow-control]
// r[impl rpc.flow-control.max-concurrent-requests]
// r[impl rpc.flow-control.max-concurrent-requests.default]
#[derive(Debug, Clone, PartialEq, Eq, Facet)]
pub struct ConnectionSettings {
    /// Whether this peer will use odd or even IDs for requests and channels on this connection.
    pub parity: Parity,
    /// Maximum number of in-flight requests this peer is willing to accept on this connection.
    pub max_concurrent_requests: u32,
}
25
/// Newtype around a 16-byte key.
///
/// NOTE(review): judging by the name, this identifies a session for resumption;
/// nothing in the visible part of this file uses it, so confirm the exact
/// semantics against the session code.
// Unlike the wire types in this file, this type does not derive `Facet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionResumeKey(pub [u8; 16]);
28
// Intentionally empty: this block exists only to carry the rule stated inside it.
impl<'payload> Message<'payload> {
    // Message has no methods on purpose. It's all just plain data.
    // Adding constructors or getters is forbidden.
}
33
/// Whether a peer will use odd or even IDs for requests and channels
/// on a given connection.
// r[impl session.parity]
// r[impl session.role]
// r[impl connection.parity]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
#[repr(u8)]
pub enum Parity {
    /// The peer uses odd IDs.
    Odd,
    /// The peer uses even IDs.
    Even,
}
45
46impl Parity {
47    /// Returns the opposite parity.
48    pub fn other(self) -> Self {
49        match self {
50            Parity::Odd => Parity::Even,
51            Parity::Even => Parity::Odd,
52        }
53    }
54}
55
56structstruck::strike! {
57    /// Protocol message.
58    // r[impl session]
59    // r[impl session.message]
60    // r[impl session.message.connection-id]
61    // r[impl session.peer]
62    // r[impl session.symmetry]
63    #[structstruck::each[derive(Debug, Facet)]]
64    pub struct Message<'payload> {
65        /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
66        pub connection_id: ConnectionId,
67
68        /// Message payload
69        pub payload:
70            #[repr(u8)]
71            // r[impl session.message.payloads]
72            pub enum MessagePayload<'payload> {
73                // ========================================================================
74                // Control (conn 0 only)
75                // ========================================================================
76
77                /// Sent by either peer when the counterpart has violated the protocol.
78                /// The sender closes the transport immediately after sending this message.
79                /// No reply is expected or valid.
80                // r[impl session.protocol-error]
81                ProtocolError(pub struct ProtocolError<'payload> {
82                    /// Human-readable description of the protocol violation.
83                    pub description: &'payload str,
84                }),
85
86                // ========================================================================
87                // Connection control
88                // ========================================================================
89
90                /// Request a new virtual connection. This is sent on the desired connection
91                /// ID, even though it doesn't exist yet.
92                // r[impl connection.open]
93                // r[impl connection.virtual]
94                // r[impl session.connection-settings.open]
95                ConnectionOpen(pub struct ConnectionOpen<'payload> {
96                    /// Connection limits advertised by the opener.
97                    /// Parity is included in ConnectionSettings.
98                    pub connection_settings: ConnectionSettings,
99
100                    /// Metadata associated with the connection.
101                    pub metadata: Metadata<'payload>,
102                }),
103
104                /// Accept a virtual connection request — sent on the connection ID requested.
105                // r[impl session.connection-settings.open]
106                ConnectionAccept(pub struct ConnectionAccept<'payload> {
107                    /// Connection limits advertised by the accepter.
108                    pub connection_settings: ConnectionSettings,
109
110                    /// Metadata associated with the connection.
111                    pub metadata: Metadata<'payload>,
112                }),
113
114                /// Reject a virtual connection request — sent on the connection ID requested.
115                ConnectionReject(pub struct ConnectionReject<'payload> {
116                    /// Metadata associated with the rejection.
117                    pub metadata: Metadata<'payload>,
118                }),
119
120                /// Close a virtual connection. Trying to close conn 0 is a protocol error.
121                ConnectionClose(pub struct ConnectionClose<'payload> {
122                    /// Metadata associated with the close.
123                    pub metadata: Metadata<'payload>,
124                }),
125
126
127                // ========================================================================
128                // RPC
129                // ========================================================================
130
131                RequestMessage(
132                    pub struct RequestMessage<'payload> {
133                        /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
134                        pub id: RequestId,
135
136                        /// Request paylaod
137                        pub body:
138                            #[repr(u8)]
139                            pub enum RequestBody<'payload> {
140                                /// Perform a request (or a "call")
141                                Call(pub struct RequestCall<'payload> {
142                                    /// Unique method identifier, hash of fully qualified name + args etc.
143                                    pub method_id: MethodId,
144
145                                    /// Metadata associated with this call
146                                    pub metadata: Metadata<'payload>,
147
148                                    /// Argument tuple
149                                    pub args: Payload<'payload>,
150
151                                    /// CBOR-encoded schemas for this call's args tuple
152                                    /// Non-empty on the first call for each method on a connection.
153                                    pub schemas: CborPayload,
154                                }),
155
156                                /// Respond to a request
157                                Response(struct RequestResponse<'payload> {
158                                    /// Arbitrary response metadata
159                                    pub metadata: Metadata<'payload>,
160
161                                    /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
162                                    pub ret: Payload<'payload>,
163
164                                    /// CBOR-encoded schemas for this response's return type.
165                                    /// Non-empty on the first response for each method on a connection.
166                                    pub schemas: CborPayload,
167                                }),
168
169                                /// Cancel processing of a request.
170                                Cancel(struct RequestCancel<'payload> {
171                                    /// Arbitrary cancel metadata
172                                    pub metadata: Metadata<'payload>,
173                                }),
174                            },
175                    }
176                ),
177
178                // ========================================================================
179                // Channels
180                // ========================================================================
181
182                ChannelMessage(
183                    pub struct ChannelMessage<'payload> {
184                        /// Channel ID (unique per-connection)
185                        pub id: ChannelId,
186
187                        /// Channel message body
188                        pub body:
189                            #[repr(u8)]
190                            pub enum ChannelBody<'payload> {
191                                /// Send an item on a channel. Channels are not "opened", they are created
192                                /// implicitly by calls.
193                                Item(pub struct ChannelItem<'payload> {
194                                    /// The item itself
195                                    pub item: Payload<'payload>,
196                                }),
197
198                                /// Close a channel — sent by the sender of the channel when they're gracefully done
199                                /// with a channel.
200                                Close(pub struct ChannelClose<'payload> {
201                                    /// Metadata associated with closing the channel.
202                                    pub metadata: Metadata<'payload>,
203                                }),
204
205                                /// Reset a channel — sent by the receiver of a channel when they would like the sender
206                                /// to please, stop sending items through.
207                                Reset(pub struct ChannelReset<'payload> {
208                                    /// Metadata associated with resetting the channel.
209                                    pub metadata: Metadata<'payload>,
210                                }),
211
212                                /// Grant additional send credit to a channel sender.
213                                // r[impl rpc.flow-control.credit.grant]
214                                GrantCredit(pub struct ChannelGrantCredit {
215                                    /// Number of additional items the sender may send.
216                                    pub additional: u32,
217                                }),
218                            },
219                    }
220                ),
221
222                // ========================================================================
223                // Keepalive
224                // ========================================================================
225
226                /// Liveness probe for dead-peer detection.
227                Ping(pub struct Ping {
228                    /// Opaque nonce echoed by the Pong response.
229                    pub nonce: u64,
230                }),
231
232                /// Reply to a keepalive Ping.
233                Pong(pub struct Pong {
234                    /// Echo of the received ping nonce.
235                    pub nonce: u64,
236                }),
237
238            },
239    }
240
241}
242
/// A payload — arguments for a request, or return type for a response.
///
/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
/// serialization/deserialization through the adapter contract:
/// - **Send path:** `serialize_map` forwards the `(ptr, shape)` of a `Value`, and
///   passes `PostcardBytes` to `opaque_encoded_borrowed`.
/// - **Recv path:** `deserialize_build` produces `PostcardBytes` from borrowed
///   input and returns an error for owned input.
// r[impl zerocopy.payload]
#[derive(Debug, Facet)]
#[repr(u8)]
#[facet(opaque = PayloadAdapter, traits(Debug))]
pub enum Payload<'payload> {
    // r[impl zerocopy.payload.borrowed]
    /// Type-erased pointer to caller-owned memory + its Shape.
    Value {
        /// Type-erased pointer to the value.
        ptr: PtrConst,
        /// Shape describing the value behind `ptr`.
        shape: &'static Shape,
        /// Ties the variant to `'payload`, since neither `ptr` nor `shape` carries it.
        _lt: PhantomData<&'payload ()>,
    },

    // r[impl zerocopy.payload.bytes]
    /// Raw bytes borrowed from the backing (zero-copy).
    PostcardBytes(&'payload [u8]),
}
266
267impl<'payload> Payload<'payload> {
268    /// Construct an outgoing borrowed payload from a concrete value.
269    pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
270        unsafe {
271            Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
272        }
273    }
274
275    /// Construct an outgoing owned payload from a raw pointer + shape.
276    ///
277    /// # Safety
278    ///
279    /// The pointed value must remain alive until serialization has completed.
280    pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
281        Self::Value {
282            ptr,
283            shape,
284            _lt: PhantomData,
285        }
286    }
287
288    /// Create a new `Payload` that borrows the same data with a shorter lifetime.
289    ///
290    /// For `Outgoing`: same ptr/shape, new lifetime.
291    /// For `Incoming`: reborrows the byte slice.
292    pub fn reborrow(&self) -> Payload<'_> {
293        match self {
294            Payload::Value { ptr, shape, .. } => Payload::Value {
295                ptr: *ptr,
296                shape,
297                _lt: PhantomData,
298            },
299            Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
300        }
301    }
302}
303
// SAFETY: The pointer in `Payload::Value` is valid for `'payload` and the caller
// guarantees the pointee outlives any use across threads.
unsafe impl<'payload> Send for Payload<'payload> {}
307
/// Adapter that bridges [`Payload`] through the opaque field contract.
///
/// Unit struct with no state; it is the type named by
/// `#[facet(opaque = PayloadAdapter)]` on [`Payload`].
// r[impl zerocopy.framing.value.opaque]
pub struct PayloadAdapter;
311
312impl FacetOpaqueAdapter for PayloadAdapter {
313    type Error = String;
314    type SendValue<'a> = Payload<'a>;
315    type RecvValue<'de> = Payload<'de>;
316
317    fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
318        match value {
319            Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
320            Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
321        }
322    }
323
324    fn deserialize_build<'de>(
325        input: OpaqueDeserialize<'de>,
326    ) -> Result<Self::RecvValue<'de>, Self::Error> {
327        match input {
328            OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
329            OpaqueDeserialize::Owned(_) => {
330                Err("payload bytes must be borrowed from backing, not owned".into())
331            }
332        }
333    }
334}
335
/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
///
/// Unit struct with no data; it exists only to be named as a type.
pub struct MessageFamily;

// Binds the family's lifetime-generic message type to `Message<'a>`.
impl crate::MsgFamily for MessageFamily {
    type Msg<'a> = Message<'a>;
}
341}