vox_types/message.rs
1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{
8 BindingDirection, CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId,
9};
10use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
11use vox_schema::opaque_encoded_borrowed;
12
13/// Default per-channel initial credit and inbound queue capacity.
14// r[impl rpc.flow-control.credit.initial]
15pub const DEFAULT_INITIAL_CHANNEL_CREDIT: u32 = 16;
16
17/// Per-connection limits advertised by a peer.
18// r[impl session.connection-settings]
19// r[impl session.parity]
20// r[impl connection.parity]
21// r[impl rpc.flow-control]
22// r[impl rpc.flow-control.max-concurrent-requests]
23// r[impl rpc.flow-control.max-concurrent-requests.default]
24// r[impl rpc.flow-control.credit.initial]
25#[derive(Debug, Clone, PartialEq, Eq, Facet)]
26pub struct ConnectionSettings {
27 /// Whether this peer will use odd or even IDs for requests and channels on this connection.
28 pub parity: Parity,
29 /// Maximum number of in-flight requests this peer is willing to accept on this connection.
30 pub max_concurrent_requests: u32,
31 /// Initial per-channel credit this peer grants for channels it receives.
32 #[facet(default = DEFAULT_INITIAL_CHANNEL_CREDIT)]
33 pub initial_channel_credit: u32,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
37pub struct SessionResumeKey(pub [u8; 16]);
38
39impl<'payload> Message<'payload> {
40 // Message has no methods on purpose. it's all just plain data.
41 // Adding constructors or getters is forbidden.
42}
43
44/// Whether a peer will use odd or even IDs for requests and channels
45/// on a given connection.
46// r[impl session.parity]
47// r[impl session.role]
48// r[impl connection.parity]
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
50#[repr(u8)]
51pub enum Parity {
52 Odd,
53 Even,
54}
55
56impl Parity {
57 /// Returns the opposite parity.
58 pub fn other(self) -> Self {
59 match self {
60 Parity::Odd => Parity::Even,
61 Parity::Even => Parity::Odd,
62 }
63 }
64}
65
66structstruck::strike! {
67 /// Protocol message.
68 // r[impl session]
69 // r[impl session.message]
70 // r[impl session.message.connection-id]
71 // r[impl session.peer]
72 // r[impl session.symmetry]
73 #[structstruck::each[derive(Debug, Facet)]]
74 pub struct Message<'payload> {
75 /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
76 pub connection_id: ConnectionId,
77
78 /// Message payload
79 pub payload:
80 #[repr(u8)]
81 // r[impl session.message.payloads]
82 pub enum MessagePayload<'payload> {
83 // ========================================================================
84 // Control (conn 0 only)
85 // ========================================================================
86
87 /// Sent by either peer when the counterpart has violated the protocol.
88 /// The sender closes the transport immediately after sending this message.
89 /// No reply is expected or valid.
90 // r[impl session.protocol-error]
91 ProtocolError(pub struct ProtocolError<'payload> {
92 /// Human-readable description of the protocol violation.
93 pub description: &'payload str,
94 }),
95
96 // ========================================================================
97 // Connection control
98 // ========================================================================
99
100 /// Request a new virtual connection. This is sent on the desired connection
101 /// ID, even though it doesn't exist yet.
102 // r[impl connection.open]
103 // r[impl connection.virtual]
104 // r[impl session.connection-settings.open]
105 ConnectionOpen(pub struct ConnectionOpen<'payload> {
106 /// Connection limits advertised by the opener.
107 /// Parity is included in ConnectionSettings.
108 pub connection_settings: ConnectionSettings,
109
110 /// Metadata associated with the connection.
111 pub metadata: Metadata<'payload>,
112 }),
113
114 /// Accept a virtual connection request — sent on the connection ID requested.
115 // r[impl session.connection-settings.open]
116 ConnectionAccept(pub struct ConnectionAccept<'payload> {
117 /// Connection limits advertised by the accepter.
118 pub connection_settings: ConnectionSettings,
119
120 /// Metadata associated with the connection.
121 pub metadata: Metadata<'payload>,
122 }),
123
124 /// Reject a virtual connection request — sent on the connection ID requested.
125 ConnectionReject(pub struct ConnectionReject<'payload> {
126 /// Metadata associated with the rejection.
127 pub metadata: Metadata<'payload>,
128 }),
129
130 /// Close a virtual connection. Trying to close conn 0 is a protocol error.
131 ConnectionClose(pub struct ConnectionClose<'payload> {
132 /// Metadata associated with the close.
133 pub metadata: Metadata<'payload>,
134 }),
135
136
137 // ========================================================================
138 // RPC
139 // ========================================================================
140
141 RequestMessage(
142 pub struct RequestMessage<'payload> {
143 /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
144 pub id: RequestId,
145
146 /// Request paylaod
147 pub body:
148 #[repr(u8)]
149 pub enum RequestBody<'payload> {
150 /// Perform a request (or a "call")
151 Call(pub struct RequestCall<'payload> {
152 /// Unique method identifier, hash of fully qualified name + args etc.
153 pub method_id: MethodId,
154
155 /// Metadata associated with this call
156 pub metadata: Metadata<'payload>,
157
158 /// Argument tuple
159 pub args: Payload<'payload>,
160
161 /// CBOR-encoded schemas for this call's args tuple
162 /// Non-empty on the first call for each method on a connection.
163 pub schemas: CborPayload,
164 }),
165
166 /// Respond to a request
167 Response(struct RequestResponse<'payload> {
168 /// Arbitrary response metadata
169 pub metadata: Metadata<'payload>,
170
171 /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
172 pub ret: Payload<'payload>,
173
174 /// CBOR-encoded schemas for this response's return type.
175 /// Non-empty on the first response for each method on a connection.
176 pub schemas: CborPayload,
177 }),
178
179 /// Cancel processing of a request.
180 Cancel(struct RequestCancel<'payload> {
181 /// Arbitrary cancel metadata
182 pub metadata: Metadata<'payload>,
183 }),
184 },
185 }
186 ),
187
188 /// Advertise schemas for a method binding on this connection.
189 ///
190 /// This is sent ahead of payload-bearing messages so a batch can
191 /// establish all required schema bindings before their first use.
192 SchemaMessage(pub struct SchemaMessage {
193 /// Unique method identifier the binding applies to.
194 pub method_id: MethodId,
195
196 /// Whether the binding applies to request args or responses.
197 pub direction: BindingDirection,
198
199 /// CBOR-encoded schema payload for this binding.
200 pub schemas: CborPayload,
201 }),
202
203 // ========================================================================
204 // Channels
205 // ========================================================================
206
207 ChannelMessage(
208 pub struct ChannelMessage<'payload> {
209 /// Channel ID (unique per-connection)
210 pub id: ChannelId,
211
212 /// Channel message body
213 pub body:
214 #[repr(u8)]
215 pub enum ChannelBody<'payload> {
216 /// Send an item on a channel. Channels are not "opened", they are created
217 /// implicitly by calls.
218 Item(pub struct ChannelItem<'payload> {
219 /// The item itself
220 pub item: Payload<'payload>,
221 }),
222
223 /// Close a channel — sent by the sender of the channel when they're gracefully done
224 /// with a channel.
225 Close(pub struct ChannelClose<'payload> {
226 /// Metadata associated with closing the channel.
227 pub metadata: Metadata<'payload>,
228 }),
229
230 /// Reset a channel — sent by the receiver of a channel when they would like the sender
231 /// to please, stop sending items through.
232 Reset(pub struct ChannelReset<'payload> {
233 /// Metadata associated with resetting the channel.
234 pub metadata: Metadata<'payload>,
235 }),
236
237 /// Grant additional send credit to a channel sender.
238 // r[impl rpc.flow-control.credit.grant]
239 GrantCredit(pub struct ChannelGrantCredit {
240 /// Number of additional items the sender may send.
241 pub additional: u32,
242 }),
243 },
244 }
245 ),
246
247 // ========================================================================
248 // Keepalive
249 // ========================================================================
250
251 /// Liveness probe for dead-peer detection.
252 Ping(pub struct Ping {
253 /// Opaque nonce echoed by the Pong response.
254 pub nonce: u64,
255 }),
256
257 /// Reply to a keepalive Ping.
258 Pong(pub struct Pong {
259 /// Echo of the received ping nonce.
260 pub nonce: u64,
261 }),
262
263 },
264 }
265
266}
267
268/// A payload — arguments for a request, or return type for a response.
269///
270/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
271/// serialization/deserialization through the adapter contract:
272/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Borrowed` or `Owned`.
273/// - **Recv path:** `deserialize_build` produces `RawBorrowed` or `RawOwned`.
274// r[impl zerocopy.payload]
275#[derive(Debug, Facet)]
276#[repr(u8)]
277#[facet(opaque = PayloadAdapter, traits(Debug))]
278pub enum Payload<'payload> {
279 // r[impl zerocopy.payload.borrowed]
280 /// Type-erased pointer to caller-owned memory + its Shape.
281 Value {
282 ptr: PtrConst,
283 shape: &'static Shape,
284 _lt: PhantomData<&'payload ()>,
285 },
286
287 // r[impl zerocopy.payload.bytes]
288 /// Raw bytes borrowed from the backing (zero-copy).
289 PostcardBytes(&'payload [u8]),
290}
291
292impl<'payload> Payload<'payload> {
293 /// Construct an outgoing borrowed payload from a concrete value.
294 pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
295 unsafe {
296 Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
297 }
298 }
299
300 /// Construct an outgoing owned payload from a raw pointer + shape.
301 ///
302 /// # Safety
303 ///
304 /// The pointed value must remain alive until serialization has completed.
305 pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
306 Self::Value {
307 ptr,
308 shape,
309 _lt: PhantomData,
310 }
311 }
312
313 /// Create a new `Payload` that borrows the same data with a shorter lifetime.
314 ///
315 /// For `Outgoing`: same ptr/shape, new lifetime.
316 /// For `Incoming`: reborrows the byte slice.
317 pub fn reborrow(&self) -> Payload<'_> {
318 match self {
319 Payload::Value { ptr, shape, .. } => Payload::Value {
320 ptr: *ptr,
321 shape,
322 _lt: PhantomData,
323 },
324 Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
325 }
326 }
327}
328
329// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
330// guarantees the pointee outlives any use across threads.
331unsafe impl<'payload> Send for Payload<'payload> {}
332
333/// Adapter that bridges [`Payload`] through the opaque field contract.
334// r[impl zerocopy.framing.value.opaque]
335pub struct PayloadAdapter;
336
337impl FacetOpaqueAdapter for PayloadAdapter {
338 type Error = String;
339 type SendValue<'a> = Payload<'a>;
340 type RecvValue<'de> = Payload<'de>;
341
342 fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
343 match value {
344 Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
345 Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
346 }
347 }
348
349 fn deserialize_build<'de>(
350 input: OpaqueDeserialize<'de>,
351 ) -> Result<Self::RecvValue<'de>, Self::Error> {
352 match input {
353 OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
354 OpaqueDeserialize::Owned(_) => {
355 Err("payload bytes must be borrowed from backing, not owned".into())
356 }
357 }
358 }
359}
360
361/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
362pub struct MessageFamily;
363
364impl crate::MsgFamily for MessageFamily {
365 type Msg<'a> = Message<'a>;
366}
367
368// SAFETY: all types below are covariant in their lifetime parameter
369// (they contain only Cow<'a, str>, Vec<MetadataEntry<'a>>, etc.).
370crate::impl_reborrow!(
371 Message,
372 RequestMessage,
373 RequestCall,
374 RequestResponse,
375 ConnectionOpen,
376 ConnectionAccept,
377 ConnectionReject,
378 ConnectionClose,
379 ChannelMessage,
380 ChannelItem,
381 ChannelClose,
382 ChannelReset,
383);