vox_types/message.rs
1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId};
8use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
9use vox_schema::opaque_encoded_borrowed;
10
11/// Per-connection settings (ID parity and request limit) advertised by a peer.
///
/// Carried in both directions of the virtual-connection handshake: it is a
/// field of `ConnectionOpen` and of `ConnectionAccept`.
12// r[impl session.connection-settings]
13// r[impl session.parity]
14// r[impl connection.parity]
15// r[impl rpc.flow-control]
16// r[impl rpc.flow-control.max-concurrent-requests]
17// r[impl rpc.flow-control.max-concurrent-requests.default]
18#[derive(Debug, Clone, PartialEq, Eq, Facet)]
19pub struct ConnectionSettings {
20 /// Whether this peer will use odd or even IDs for requests and channels on this connection.
21 pub parity: Parity,
22 /// Maximum number of in-flight requests this peer is willing to accept on this connection.
23 pub max_concurrent_requests: u32,
24}
25
/// Newtype around a 16-byte array.
///
/// Unlike the wire types in this file, it does not derive `Facet`.
// NOTE(review): presumably identifies a session for resumption; how the key is
// issued and validated is not visible in this file — confirm against the spec.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub struct SessionResumeKey(pub [u8; 16]);
28
28impl<'payload> Message<'payload> {
29 // Message has no methods on purpose: it is all just plain data.
30 // Adding constructors or getters is forbidden; this empty block only records that rule.
31}
33
33/// Whether a peer will use odd or even IDs for requests and channels
34/// on a given connection.
35// r[impl session.parity]
36// r[impl session.role]
37// r[impl connection.parity]
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
39#[repr(u8)]
40pub enum Parity {
 /// The peer uses odd IDs (first variant: discriminant 0 under `repr(u8)`).
41 Odd,
 /// The peer uses even IDs (second variant: discriminant 1 under `repr(u8)`).
42 Even,
43}
45
46impl Parity {
47 /// Returns the opposite parity.
48 pub fn other(self) -> Self {
49 match self {
50 Parity::Odd => Parity::Even,
51 Parity::Even => Parity::Odd,
52 }
53 }
54}
55
56structstruck::strike! {
57 /// Protocol message.
58 // r[impl session]
59 // r[impl session.message]
60 // r[impl session.message.connection-id]
61 // r[impl session.peer]
62 // r[impl session.symmetry]
63 #[structstruck::each[derive(Debug, Facet)]]
64 pub struct Message<'payload> {
65 /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
66 pub connection_id: ConnectionId,
67
68 /// Message payload
69 pub payload:
70 #[repr(u8)]
71 // r[impl session.message.payloads]
72 pub enum MessagePayload<'payload> {
73 // ========================================================================
74 // Control (conn 0 only)
75 // ========================================================================
76
77 /// Sent by either peer when the counterpart has violated the protocol.
78 /// The sender closes the transport immediately after sending this message.
79 /// No reply is expected or valid.
80 // r[impl session.protocol-error]
81 ProtocolError(pub struct ProtocolError<'payload> {
82 /// Human-readable description of the protocol violation.
83 pub description: &'payload str,
84 }),
85
86 // ========================================================================
87 // Connection control
88 // ========================================================================
89
90 /// Request a new virtual connection. This is sent on the desired connection
91 /// ID, even though it doesn't exist yet.
92 // r[impl connection.open]
93 // r[impl connection.virtual]
94 // r[impl session.connection-settings.open]
95 ConnectionOpen(pub struct ConnectionOpen<'payload> {
96 /// Connection limits advertised by the opener.
97 /// Parity is included in ConnectionSettings.
98 pub connection_settings: ConnectionSettings,
99
100 /// Metadata associated with the connection.
101 pub metadata: Metadata<'payload>,
102 }),
103
104 /// Accept a virtual connection request — sent on the connection ID requested.
105 // r[impl session.connection-settings.open]
106 ConnectionAccept(pub struct ConnectionAccept<'payload> {
107 /// Connection limits advertised by the accepter.
108 pub connection_settings: ConnectionSettings,
109
110 /// Metadata associated with the connection.
111 pub metadata: Metadata<'payload>,
112 }),
113
114 /// Reject a virtual connection request — sent on the connection ID requested.
115 ConnectionReject(pub struct ConnectionReject<'payload> {
116 /// Metadata associated with the rejection.
117 pub metadata: Metadata<'payload>,
118 }),
119
120 /// Close a virtual connection. Trying to close conn 0 is a protocol error.
121 ConnectionClose(pub struct ConnectionClose<'payload> {
122 /// Metadata associated with the close.
123 pub metadata: Metadata<'payload>,
124 }),
125
126
127 // ========================================================================
128 // RPC
129 // ========================================================================
130
 /// RPC traffic for a single request ID: a call, its response, or a cancellation.
131 RequestMessage(
132 pub struct RequestMessage<'payload> {
133 /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
134 pub id: RequestId,
135
136 /// Request payload
137 pub body:
138 #[repr(u8)]
139 pub enum RequestBody<'payload> {
140 /// Perform a request (or a "call")
141 Call(pub struct RequestCall<'payload> {
142 /// Unique method identifier, hash of fully qualified name + args etc.
143 pub method_id: MethodId,
144
145 /// Metadata associated with this call
146 pub metadata: Metadata<'payload>,
147
148 /// Argument tuple
149 pub args: Payload<'payload>,
150
151 /// CBOR-encoded schemas for this call's args tuple
152 /// Non-empty on the first call for each method on a connection.
153 pub schemas: CborPayload,
154 }),
155
 // NOTE(review): `RequestResponse` and `RequestCancel` below are declared
 // without `pub`, unlike the sibling `RequestCall` — confirm this is intended.
156 /// Respond to a request
157 Response(struct RequestResponse<'payload> {
158 /// Arbitrary response metadata
159 pub metadata: Metadata<'payload>,
160
161 /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
162 pub ret: Payload<'payload>,
163
164 /// CBOR-encoded schemas for this response's return type.
165 /// Non-empty on the first response for each method on a connection.
166 pub schemas: CborPayload,
167 }),
168
169 /// Cancel processing of a request.
170 Cancel(struct RequestCancel<'payload> {
171 /// Arbitrary cancel metadata
172 pub metadata: Metadata<'payload>,
173 }),
174 },
175 }
176 ),
177
178 // ========================================================================
179 // Channels
180 // ========================================================================
181
 /// Channel traffic for a single channel ID: an item, a close, a reset, or a credit grant.
182 ChannelMessage(
183 pub struct ChannelMessage<'payload> {
184 /// Channel ID (unique per-connection)
185 pub id: ChannelId,
186
187 /// Channel message body
188 pub body:
189 #[repr(u8)]
190 pub enum ChannelBody<'payload> {
191 /// Send an item on a channel. Channels are not "opened", they are created
192 /// implicitly by calls.
193 Item(pub struct ChannelItem<'payload> {
194 /// The item itself
195 pub item: Payload<'payload>,
196 }),
197
198 /// Close a channel — sent by the sender of the channel when they're gracefully done
199 /// with a channel.
200 Close(pub struct ChannelClose<'payload> {
201 /// Metadata associated with closing the channel.
202 pub metadata: Metadata<'payload>,
203 }),
204
205 /// Reset a channel — sent by the receiver of a channel when they would like the sender
206 /// to please, stop sending items through.
207 Reset(pub struct ChannelReset<'payload> {
208 /// Metadata associated with resetting the channel.
209 pub metadata: Metadata<'payload>,
210 }),
211
212 /// Grant additional send credit to a channel sender.
213 // r[impl rpc.flow-control.credit.grant]
214 GrantCredit(pub struct ChannelGrantCredit {
215 /// Number of additional items the sender may send.
216 pub additional: u32,
217 }),
218 },
219 }
220 ),
221
222 // ========================================================================
223 // Keepalive
224 // ========================================================================
225
226 /// Liveness probe for dead-peer detection.
227 Ping(pub struct Ping {
228 /// Opaque nonce echoed by the Pong response.
229 pub nonce: u64,
230 }),
231
232 /// Reply to a keepalive Ping.
233 Pong(pub struct Pong {
234 /// Echo of the received ping nonce.
235 pub nonce: u64,
236 }),
237
238 },
239 }
240
241}
242
243/// A payload — arguments for a request, the return value of a response, or a channel item.
244///
245/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
246/// serialization/deserialization through the adapter contract:
247/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Value`, and hands `PostcardBytes` to `opaque_encoded_borrowed`.
248/// - **Recv path:** `deserialize_build` produces `PostcardBytes` from borrowed input and rejects owned input.
249// r[impl zerocopy.payload]
250#[derive(Debug, Facet)]
251#[repr(u8)]
252#[facet(opaque = PayloadAdapter, traits(Debug))]
253pub enum Payload<'payload> {
254 // r[impl zerocopy.payload.borrowed]
255 /// Type-erased pointer to caller-owned memory + its Shape.
256 Value {
257 ptr: PtrConst,
258 shape: &'static Shape,
259 _lt: PhantomData<&'payload ()>,
260 },
261
262 // r[impl zerocopy.payload.bytes]
263 /// Raw bytes borrowed from the backing (zero-copy).
264 PostcardBytes(&'payload [u8]),
265}
266
267impl<'payload> Payload<'payload> {
268 /// Construct an outgoing borrowed payload from a concrete value.
269 pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
270 unsafe {
271 Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
272 }
273 }
274
275 /// Construct an outgoing owned payload from a raw pointer + shape.
276 ///
277 /// # Safety
278 ///
279 /// The pointed value must remain alive until serialization has completed.
280 pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
281 Self::Value {
282 ptr,
283 shape,
284 _lt: PhantomData,
285 }
286 }
287
288 /// Create a new `Payload` that borrows the same data with a shorter lifetime.
289 ///
290 /// For `Outgoing`: same ptr/shape, new lifetime.
291 /// For `Incoming`: reborrows the byte slice.
292 pub fn reborrow(&self) -> Payload<'_> {
293 match self {
294 Payload::Value { ptr, shape, .. } => Payload::Value {
295 ptr: *ptr,
296 shape,
297 _lt: PhantomData,
298 },
299 Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
300 }
301 }
302}
303
304// SAFETY: The pointer in `Payload::Value` is valid for `'payload` and the caller
305// guarantees the pointee outlives any use across threads.
// NOTE(review): only `Send` is asserted here, not `Sync`; presumably the raw
// `PtrConst` in `Value` is what blocks the auto impl — confirm.
306unsafe impl<'payload> Send for Payload<'payload> {}
307
308/// Adapter that bridges [`Payload`] through the opaque field contract.
///
/// Stateless unit type: it carries the `FacetOpaqueAdapter` impl that
/// `#[facet(opaque = PayloadAdapter)]` on `Payload` refers to.
309// r[impl zerocopy.framing.value.opaque]
310pub struct PayloadAdapter;
311
312impl FacetOpaqueAdapter for PayloadAdapter {
313 type Error = String;
314 type SendValue<'a> = Payload<'a>;
315 type RecvValue<'de> = Payload<'de>;
316
317 fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
318 match value {
319 Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
320 Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
321 }
322 }
323
324 fn deserialize_build<'de>(
325 input: OpaqueDeserialize<'de>,
326 ) -> Result<Self::RecvValue<'de>, Self::Error> {
327 match input {
328 OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
329 OpaqueDeserialize::Owned(_) => {
330 Err("payload bytes must be borrowed from backing, not owned".into())
331 }
332 }
333 }
334}
335
336/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
///
/// Unit struct with no data; it exists only to be named as a type parameter.
337pub struct MessageFamily;
338
339impl crate::MsgFamily for MessageFamily {
 // For any lifetime `'a`, this family's message type is `Message<'a>`.
340 type Msg<'a> = Message<'a>;
341}