Skip to main content

moq_transport/message/
mod.rs

1//! Low-level message sent over the wire, as defined in the specification.
2//!
3//! All of these messages are sent over a bidirectional QUIC stream.
4//! This introduces some head-of-line blocking but preserves ordering.
5//! The only exception are OBJECT "messages", which are sent over dedicated QUIC streams.
6//!
7
8mod fetch;
9mod fetch_cancel;
10mod fetch_error;
11mod fetch_ok;
12mod fetch_type;
13mod filter_type;
14mod go_away;
15mod group_order;
16mod max_request_id;
17mod pubilsh_namespace_done;
18mod publish;
19mod publish_done;
20mod publish_error;
21mod publish_namespace;
22mod publish_namespace_cancel;
23mod publish_namespace_error;
24mod publish_namespace_ok;
25mod publish_ok;
26mod publisher;
27mod requests_blocked;
28mod subscribe;
29mod subscribe_error;
30mod subscribe_namespace;
31mod subscribe_namespace_error;
32mod subscribe_namespace_ok;
33mod subscribe_ok;
34mod subscribe_update;
35mod subscriber;
36mod track_status;
37mod track_status_error;
38mod track_status_ok;
39mod unsubscribe;
40mod unsubscribe_namespace;
41
42pub use fetch::*;
43pub use fetch_cancel::*;
44pub use fetch_error::*;
45pub use fetch_ok::*;
46pub use fetch_type::*;
47pub use filter_type::*;
48pub use go_away::*;
49pub use group_order::*;
50pub use max_request_id::*;
51pub use pubilsh_namespace_done::*;
52pub use publish::*;
53pub use publish_done::*;
54pub use publish_error::*;
55pub use publish_namespace::*;
56pub use publish_namespace_cancel::*;
57pub use publish_namespace_error::*;
58pub use publish_namespace_ok::*;
59pub use publish_ok::*;
60pub use publisher::*;
61pub use requests_blocked::*;
62pub use subscribe::*;
63pub use subscribe_error::*;
64pub use subscribe_namespace::*;
65pub use subscribe_namespace_error::*;
66pub use subscribe_namespace_ok::*;
67pub use subscribe_ok::*;
68pub use subscribe_update::*;
69pub use subscriber::*;
70pub use track_status::*;
71pub use track_status_error::*;
72pub use track_status_ok::*;
73pub use unsubscribe::*;
74pub use unsubscribe_namespace::*;
75
76use crate::coding::{Decode, DecodeError, Encode, EncodeError};
77use std::fmt;
78
79// Use a macro to generate the message types rather than copy-paste.
80// This implements a decode/encode method that uses the specified type.
81macro_rules! message_types {
82    {$($name:ident = $val:expr,)*} => {
83		/// All supported message types.
84		#[derive(Clone)]
85		pub enum Message {
86			$($name($name)),*
87		}
88
89		impl Decode for Message {
90			fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
91				let t = u64::decode(r)?;
92				let _len = u16::decode(r)?;
93
94				// TODO: Check the length of the message.
95
96				match t {
97					$($val => {
98						let msg = $name::decode(r)?;
99						Ok(Self::$name(msg))
100					})*
101					_ => Err(DecodeError::InvalidMessage(t)),
102				}
103			}
104		}
105
106		impl Encode for Message {
107			fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
108				match self {
109					$(Self::$name(ref m) => {
110						self.id().encode(w)?;
111
112						// Find out the length of the message
113						// by encoding it into a buffer and then encoding the length.
114						// This is a bit wasteful, but it's the only way to know the length.
115                        // TODO SLG - perhaps we can store the position of the Length field in the BufMut and
116                        //       write the length later, to avoid the copy of the message bytes?
117						let mut buf = Vec::new();
118						m.encode(&mut buf).unwrap();
119                        if buf.len() > u16::MAX as usize {
120                            return Err(EncodeError::MsgBoundsExceeded);
121                        }
122                        (buf.len() as u16).encode(w)?;
123
124						// At least don't encode the message twice.
125						// Instead, write the buffer directly to the writer.
126                        Self::encode_remaining(w, buf.len())?;
127						w.put_slice(&buf);
128						Ok(())
129					},)*
130				}
131			}
132		}
133
134		impl Message {
135			pub fn id(&self) -> u64 {
136				match self {
137					$(Self::$name(_) => {
138						$val
139					},)*
140				}
141			}
142
143			pub fn name(&self) -> &'static str {
144				match self {
145					$(Self::$name(_) => {
146						stringify!($name)
147					},)*
148				}
149			}
150		}
151
152		$(impl From<$name> for Message {
153			fn from(m: $name) -> Self {
154				Message::$name(m)
155			}
156		})*
157
158		impl fmt::Debug for Message {
159			// Delegate to the message formatter
160			fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161				match self {
162					$(Self::$name(ref m) => m.fmt(f),)*
163				}
164			}
165		}
166    }
167}
168
169// Each message is prefixed with the given VarInt type.
170message_types! {
171    // NOTE: Setup messages are in another module.
172    // SetupClient = 0x20
173    // SetupServer = 0x21
174    // SetupClient = 0x40  // legacy, used in draft versions <= 10
175    // SetupServer = 0x41  // legacy, used in draft versions <= 10
176
177    // Misc
178    GoAway = 0x10,
179    MaxRequestId = 0x15,
180    RequestsBlocked = 0x1a,
181
182    // SUBSCRIBE family, sent by subscriber
183    SubscribeUpdate = 0x2,
184    Subscribe = 0x3,
185    Unsubscribe = 0xa,
186    // SUBSCRIBE family, sent by publisher
187    SubscribeOk = 0x4,
188    SubscribeError = 0x5,
189
190    // ANNOUNCE family, sent by publisher
191    PublishNamespace = 0x6,
192    PublishNamespaceDone = 0x9,
193    // ANNOUNCE family, sent by subscriber
194    PublishNamespaceOk = 0x7,
195    PublishNamespaceError = 0x8,
196    PublishNamespaceCancel = 0xc,
197
198    // TRACK_STATUS family, sent by subscriber
199    TrackStatus = 0xd,
200    // TRACK_STATUS family, sent by publisher
201    TrackStatusOk = 0xe,
202    TrackStatusError = 0xf,
203
204    // NAMESPACE family, sent by subscriber
205    SubscribeNamespace = 0x11,
206    UnsubscribeNamespace = 0x14,
207    // NAMESPACE family, sent by publisher
208    SubscribeNamespaceOk = 0x12,
209    SubscribeNamespaceError = 0x13,
210
211    // FETCH family, sent by subscriber
212    Fetch = 0x16,
213    FetchCancel = 0x17,
214    // FETCH family, sent by publisher
215    FetchOk = 0x18,
216    FetchError = 0x19,
217
218    // PUBLISH family, sent by publisher
219    Publish = 0x1d,
220    PublishDone = 0xb,
221    // PUBLISH family, sent by subscriber
222    PublishOk = 0x1e,
223    PublishError = 0x1f,
224}