vox_types/message.rs
1//! Spec-level wire types.
2//!
3//! Canonical definitions live in `docs/content/spec/_index.md` and `docs/content/shm-spec/_index.md`.
4
5use std::marker::PhantomData;
6
7use crate::{
8 BindingDirection, CborPayload, ChannelId, ConnectionId, Metadata, MethodId, RequestId,
9};
10use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst, Shape};
11use vox_schema::opaque_encoded_borrowed;
12
13/// Per-connection limits advertised by a peer.
14// r[impl session.connection-settings]
15// r[impl session.parity]
16// r[impl connection.parity]
17// r[impl rpc.flow-control]
18// r[impl rpc.flow-control.max-concurrent-requests]
19// r[impl rpc.flow-control.max-concurrent-requests.default]
20#[derive(Debug, Clone, PartialEq, Eq, Facet)]
21pub struct ConnectionSettings {
22 /// Whether this peer will use odd or even IDs for requests and channels on this connection.
23 pub parity: Parity,
24 /// Maximum number of in-flight requests this peer is willing to accept on this connection.
25 pub max_concurrent_requests: u32,
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
29pub struct SessionResumeKey(pub [u8; 16]);
30
31impl<'payload> Message<'payload> {
32 // Message has no methods on purpose. it's all just plain data.
33 // Adding constructors or getters is forbidden.
34}
35
36/// Whether a peer will use odd or even IDs for requests and channels
37/// on a given connection.
38// r[impl session.parity]
39// r[impl session.role]
40// r[impl connection.parity]
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Facet)]
42#[repr(u8)]
43pub enum Parity {
44 Odd,
45 Even,
46}
47
48impl Parity {
49 /// Returns the opposite parity.
50 pub fn other(self) -> Self {
51 match self {
52 Parity::Odd => Parity::Even,
53 Parity::Even => Parity::Odd,
54 }
55 }
56}
57
58structstruck::strike! {
59 /// Protocol message.
60 // r[impl session]
61 // r[impl session.message]
62 // r[impl session.message.connection-id]
63 // r[impl session.peer]
64 // r[impl session.symmetry]
65 #[structstruck::each[derive(Debug, Facet)]]
66 pub struct Message<'payload> {
67 /// Connection ID: 0 for control messages (ProtocolError, Ping, Pong)
68 pub connection_id: ConnectionId,
69
70 /// Message payload
71 pub payload:
72 #[repr(u8)]
73 // r[impl session.message.payloads]
74 pub enum MessagePayload<'payload> {
75 // ========================================================================
76 // Control (conn 0 only)
77 // ========================================================================
78
79 /// Sent by either peer when the counterpart has violated the protocol.
80 /// The sender closes the transport immediately after sending this message.
81 /// No reply is expected or valid.
82 // r[impl session.protocol-error]
83 ProtocolError(pub struct ProtocolError<'payload> {
84 /// Human-readable description of the protocol violation.
85 pub description: &'payload str,
86 }),
87
88 // ========================================================================
89 // Connection control
90 // ========================================================================
91
92 /// Request a new virtual connection. This is sent on the desired connection
93 /// ID, even though it doesn't exist yet.
94 // r[impl connection.open]
95 // r[impl connection.virtual]
96 // r[impl session.connection-settings.open]
97 ConnectionOpen(pub struct ConnectionOpen<'payload> {
98 /// Connection limits advertised by the opener.
99 /// Parity is included in ConnectionSettings.
100 pub connection_settings: ConnectionSettings,
101
102 /// Metadata associated with the connection.
103 pub metadata: Metadata<'payload>,
104 }),
105
106 /// Accept a virtual connection request — sent on the connection ID requested.
107 // r[impl session.connection-settings.open]
108 ConnectionAccept(pub struct ConnectionAccept<'payload> {
109 /// Connection limits advertised by the accepter.
110 pub connection_settings: ConnectionSettings,
111
112 /// Metadata associated with the connection.
113 pub metadata: Metadata<'payload>,
114 }),
115
116 /// Reject a virtual connection request — sent on the connection ID requested.
117 ConnectionReject(pub struct ConnectionReject<'payload> {
118 /// Metadata associated with the rejection.
119 pub metadata: Metadata<'payload>,
120 }),
121
122 /// Close a virtual connection. Trying to close conn 0 is a protocol error.
123 ConnectionClose(pub struct ConnectionClose<'payload> {
124 /// Metadata associated with the close.
125 pub metadata: Metadata<'payload>,
126 }),
127
128
129 // ========================================================================
130 // RPC
131 // ========================================================================
132
133 RequestMessage(
134 pub struct RequestMessage<'payload> {
135 /// Unique (connection-wide) request identifier, caller-allocated (as per parity)
136 pub id: RequestId,
137
138 /// Request paylaod
139 pub body:
140 #[repr(u8)]
141 pub enum RequestBody<'payload> {
142 /// Perform a request (or a "call")
143 Call(pub struct RequestCall<'payload> {
144 /// Unique method identifier, hash of fully qualified name + args etc.
145 pub method_id: MethodId,
146
147 /// Metadata associated with this call
148 pub metadata: Metadata<'payload>,
149
150 /// Argument tuple
151 pub args: Payload<'payload>,
152
153 /// CBOR-encoded schemas for this call's args tuple
154 /// Non-empty on the first call for each method on a connection.
155 pub schemas: CborPayload,
156 }),
157
158 /// Respond to a request
159 Response(struct RequestResponse<'payload> {
160 /// Arbitrary response metadata
161 pub metadata: Metadata<'payload>,
162
163 /// Return value (`Result<T, VoxError<E>>`, where E could be Infallible depending on signature)
164 pub ret: Payload<'payload>,
165
166 /// CBOR-encoded schemas for this response's return type.
167 /// Non-empty on the first response for each method on a connection.
168 pub schemas: CborPayload,
169 }),
170
171 /// Cancel processing of a request.
172 Cancel(struct RequestCancel<'payload> {
173 /// Arbitrary cancel metadata
174 pub metadata: Metadata<'payload>,
175 }),
176 },
177 }
178 ),
179
180 /// Advertise schemas for a method binding on this connection.
181 ///
182 /// This is sent ahead of payload-bearing messages so a batch can
183 /// establish all required schema bindings before their first use.
184 SchemaMessage(pub struct SchemaMessage {
185 /// Unique method identifier the binding applies to.
186 pub method_id: MethodId,
187
188 /// Whether the binding applies to request args or responses.
189 pub direction: BindingDirection,
190
191 /// CBOR-encoded schema payload for this binding.
192 pub schemas: CborPayload,
193 }),
194
195 // ========================================================================
196 // Channels
197 // ========================================================================
198
199 ChannelMessage(
200 pub struct ChannelMessage<'payload> {
201 /// Channel ID (unique per-connection)
202 pub id: ChannelId,
203
204 /// Channel message body
205 pub body:
206 #[repr(u8)]
207 pub enum ChannelBody<'payload> {
208 /// Send an item on a channel. Channels are not "opened", they are created
209 /// implicitly by calls.
210 Item(pub struct ChannelItem<'payload> {
211 /// The item itself
212 pub item: Payload<'payload>,
213 }),
214
215 /// Close a channel — sent by the sender of the channel when they're gracefully done
216 /// with a channel.
217 Close(pub struct ChannelClose<'payload> {
218 /// Metadata associated with closing the channel.
219 pub metadata: Metadata<'payload>,
220 }),
221
222 /// Reset a channel — sent by the receiver of a channel when they would like the sender
223 /// to please, stop sending items through.
224 Reset(pub struct ChannelReset<'payload> {
225 /// Metadata associated with resetting the channel.
226 pub metadata: Metadata<'payload>,
227 }),
228
229 /// Grant additional send credit to a channel sender.
230 // r[impl rpc.flow-control.credit.grant]
231 GrantCredit(pub struct ChannelGrantCredit {
232 /// Number of additional items the sender may send.
233 pub additional: u32,
234 }),
235 },
236 }
237 ),
238
239 // ========================================================================
240 // Keepalive
241 // ========================================================================
242
243 /// Liveness probe for dead-peer detection.
244 Ping(pub struct Ping {
245 /// Opaque nonce echoed by the Pong response.
246 pub nonce: u64,
247 }),
248
249 /// Reply to a keepalive Ping.
250 Pong(pub struct Pong {
251 /// Echo of the received ping nonce.
252 pub nonce: u64,
253 }),
254
255 },
256 }
257
258}
259
260/// A payload — arguments for a request, or return type for a response.
261///
262/// Uses `#[facet(opaque = PayloadAdapter)]` so that format crates handle
263/// serialization/deserialization through the adapter contract:
264/// - **Send path:** `serialize_map` extracts `(ptr, shape)` from `Borrowed` or `Owned`.
265/// - **Recv path:** `deserialize_build` produces `RawBorrowed` or `RawOwned`.
266// r[impl zerocopy.payload]
267#[derive(Debug, Facet)]
268#[repr(u8)]
269#[facet(opaque = PayloadAdapter, traits(Debug))]
270pub enum Payload<'payload> {
271 // r[impl zerocopy.payload.borrowed]
272 /// Type-erased pointer to caller-owned memory + its Shape.
273 Value {
274 ptr: PtrConst,
275 shape: &'static Shape,
276 _lt: PhantomData<&'payload ()>,
277 },
278
279 // r[impl zerocopy.payload.bytes]
280 /// Raw bytes borrowed from the backing (zero-copy).
281 PostcardBytes(&'payload [u8]),
282}
283
284impl<'payload> Payload<'payload> {
285 /// Construct an outgoing borrowed payload from a concrete value.
286 pub fn outgoing<T: Facet<'payload>>(value: &'payload T) -> Self {
287 unsafe {
288 Self::outgoing_unchecked(PtrConst::new((value as *const T).cast::<u8>()), T::SHAPE)
289 }
290 }
291
292 /// Construct an outgoing owned payload from a raw pointer + shape.
293 ///
294 /// # Safety
295 ///
296 /// The pointed value must remain alive until serialization has completed.
297 pub unsafe fn outgoing_unchecked(ptr: PtrConst, shape: &'static Shape) -> Self {
298 Self::Value {
299 ptr,
300 shape,
301 _lt: PhantomData,
302 }
303 }
304
305 /// Create a new `Payload` that borrows the same data with a shorter lifetime.
306 ///
307 /// For `Outgoing`: same ptr/shape, new lifetime.
308 /// For `Incoming`: reborrows the byte slice.
309 pub fn reborrow(&self) -> Payload<'_> {
310 match self {
311 Payload::Value { ptr, shape, .. } => Payload::Value {
312 ptr: *ptr,
313 shape,
314 _lt: PhantomData,
315 },
316 Payload::PostcardBytes(bytes) => Payload::PostcardBytes(bytes),
317 }
318 }
319}
320
321// SAFETY: The pointer in `Outgoing` is valid for `'payload` and the caller
322// guarantees the pointee outlives any use across threads.
323unsafe impl<'payload> Send for Payload<'payload> {}
324
325/// Adapter that bridges [`Payload`] through the opaque field contract.
326// r[impl zerocopy.framing.value.opaque]
327pub struct PayloadAdapter;
328
329impl FacetOpaqueAdapter for PayloadAdapter {
330 type Error = String;
331 type SendValue<'a> = Payload<'a>;
332 type RecvValue<'de> = Payload<'de>;
333
334 fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
335 match value {
336 Payload::Value { ptr, shape, .. } => OpaqueSerialize { ptr: *ptr, shape },
337 Payload::PostcardBytes(bytes) => opaque_encoded_borrowed(bytes),
338 }
339 }
340
341 fn deserialize_build<'de>(
342 input: OpaqueDeserialize<'de>,
343 ) -> Result<Self::RecvValue<'de>, Self::Error> {
344 match input {
345 OpaqueDeserialize::Borrowed(bytes) => Ok(Payload::PostcardBytes(bytes)),
346 OpaqueDeserialize::Owned(_) => {
347 Err("payload bytes must be borrowed from backing, not owned".into())
348 }
349 }
350 }
351}
352
353/// Type-level tag for [`Message`] as a [`MsgFamily`](crate::MsgFamily).
354pub struct MessageFamily;
355
356impl crate::MsgFamily for MessageFamily {
357 type Msg<'a> = Message<'a>;
358}
359
360// SAFETY: all types below are covariant in their lifetime parameter
361// (they contain only Cow<'a, str>, Vec<MetadataEntry<'a>>, etc.).
362crate::impl_reborrow!(
363 Message,
364 RequestMessage,
365 RequestCall,
366 RequestResponse,
367 ConnectionOpen,
368 ConnectionAccept,
369 ConnectionReject,
370 ConnectionClose,
371 ChannelMessage,
372 ChannelItem,
373 ChannelClose,
374 ChannelReset,
375);