garage_net/
message.rs

1use std::fmt;
2use std::marker::PhantomData;
3use std::sync::Arc;
4
5use bytes::{BufMut, Bytes, BytesMut};
6use rand::prelude::*;
7use serde::{Deserialize, Serialize};
8
9use futures::stream::StreamExt;
10
11use crate::error::*;
12use crate::stream::*;
13use crate::util::*;
14
/// Priority of a request (click to read more about priorities).
///
/// This priority value is used to prioritize messages
/// in the send queue of the client, and their responses in the send queue of the
/// server. Lower values mean higher priority.
///
/// This mechanism is useful for messages bigger than the maximum chunk size
/// (set at `0x4000` bytes), such as large file transfers.
/// In such a case, all of the messages in the send queue with the highest priority
/// will take turns to send individual chunks, in a round-robin fashion.
/// Once all highest priority messages are sent successfully, the messages with
/// the next highest priority will begin being sent in the same way.
///
/// The same priority value is given to a request and to its associated response.
///
/// A priority is built by OR-ing a class (`PRIO_HIGH`, `PRIO_NORMAL`,
/// `PRIO_BACKGROUND`) with a rank within that class (`PRIO_PRIMARY`,
/// `PRIO_SECONDARY`). Since lower values mean higher priority, every
/// priority of a class is served before any priority of a lower class
/// (e.g. `PRIO_HIGH | PRIO_SECONDARY` = `0x21` < `PRIO_NORMAL` = `0x40`).
pub type RequestPriority = u8;

// Usage of priority levels in Garage:
//
// PRIO_HIGH
//      for liveness check events such as pings and important
//      reconfiguration events such as layout changes
//
// PRIO_NORMAL
//      for standard interactive requests to exchange metadata
//
// PRIO_NORMAL | PRIO_SECONDARY
//      for standard interactive requests to exchange block data
//
// PRIO_BACKGROUND
//      for background resync requests to exchange metadata
//
// PRIO_BACKGROUND | PRIO_SECONDARY
//      for background resync requests to exchange block data

/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;

/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
pub const PRIO_SECONDARY: RequestPriority = 0x01;
59
60// ----
61
62/// An order tag can be added to a message or a response to indicate
63/// whether it should be sent after or before other messages with order tags
64/// referencing a same stream
65#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
66pub struct OrderTag(pub(crate) u64, pub(crate) u64);
67
68/// A stream is an opaque identifier that defines a set of messages
69/// or responses that are ordered wrt one another using to order tags.
70#[derive(Clone, Copy)]
71pub struct OrderTagStream(u64);
72
73impl OrderTag {
74	/// Create a new stream from which to generate order tags. Example:
75	/// ```ignore
76	/// let stream = OrderTag.stream();
77	/// let tag_1 = stream.order(1);
78	/// let tag_2 = stream.order(2);
79	/// ```
80	pub fn stream() -> OrderTagStream {
81		OrderTagStream(thread_rng().gen())
82	}
83}
84impl OrderTagStream {
85	/// Create the order tag for message `order` in this stream
86	pub fn order(&self, order: u64) -> OrderTag {
87		OrderTag(self.0, order)
88	}
89}
90
91// ----
92
93/// This trait should be implemented by all messages your application
94/// wants to handle. It specifies which data type should be sent
95/// as a response to this message in the RPC protocol.
96pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
97	/// The type of the response that is sent in response to this message
98	type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
99}
100
101// ----
102
103/// The Req<M> is a helper object used to create requests and attach them
104/// a stream of data. If the stream is a fixed Bytes and not a ByteStream,
105/// Req<M> is cheaply cloneable to allow the request to be sent to different
106/// peers (Clone will panic if the stream is a ByteStream).
107pub struct Req<M: Message> {
108	pub(crate) msg: Arc<M>,
109	pub(crate) msg_ser: Option<Bytes>,
110	pub(crate) stream: AttachedStream,
111	pub(crate) order_tag: Option<OrderTag>,
112}
113
114impl<M: Message> Req<M> {
115	/// Creates a new request from a base message `M`
116	pub fn new(v: M) -> Result<Self, Error> {
117		Ok(v.into_req()?)
118	}
119
120	/// Attach a stream to message in request, where the stream is streamed
121	/// from a fixed `Bytes` buffer
122	pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
123		Self {
124			stream: AttachedStream::Fixed(b),
125			..self
126		}
127	}
128
129	/// Attach a stream to message in request, where the stream is
130	/// an instance of `ByteStream`. Note than when a `Req<M>` has an attached
131	/// stream which is a `ByteStream` instance, it can no longer be cloned
132	/// to be sent to different nodes (`.clone()` will panic)
133	pub fn with_stream(self, b: ByteStream) -> Self {
134		Self {
135			stream: AttachedStream::Stream(b),
136			..self
137		}
138	}
139
140	/// Add an order tag to this request to indicate in which order it should
141	/// be sent.
142	pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
143		Self {
144			order_tag: Some(order_tag),
145			..self
146		}
147	}
148
149	/// Get a reference to the message `M` contained in this request
150	pub fn msg(&self) -> &M {
151		&self.msg
152	}
153
154	/// Takes out the stream attached to this request, if any
155	pub fn take_stream(&mut self) -> Option<ByteStream> {
156		std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
157	}
158
159	pub(crate) fn into_enc(
160		self,
161		prio: RequestPriority,
162		path: Bytes,
163		telemetry_id: Bytes,
164	) -> ReqEnc {
165		ReqEnc {
166			prio,
167			path,
168			telemetry_id,
169			msg: self.msg_ser.unwrap(),
170			stream: self.stream.into_stream(),
171			order_tag: self.order_tag,
172		}
173	}
174
175	pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
176		let msg = rmp_serde::decode::from_slice(&enc.msg)?;
177		Ok(Req {
178			msg: Arc::new(msg),
179			msg_ser: Some(enc.msg),
180			stream: enc
181				.stream
182				.map(AttachedStream::Stream)
183				.unwrap_or(AttachedStream::None),
184			order_tag: enc.order_tag,
185		})
186	}
187}
188
189/// `IntoReq<M>` represents any object that can be transformed into `Req<M>`
190pub trait IntoReq<M: Message> {
191	/// Transform the object into a `Req<M>`, serializing the message M
192	/// to be sent to remote nodes
193	fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
194	/// Transform the object into a `Req<M>`, skipping the serialization
195	/// of message M, in the case we are not sending this RPC message to
196	/// a remote node
197	fn into_req_local(self) -> Req<M>;
198}
199
200impl<M: Message> IntoReq<M> for M {
201	fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
202		let msg_ser = rmp_to_vec_all_named(&self)?;
203		Ok(Req {
204			msg: Arc::new(self),
205			msg_ser: Some(Bytes::from(msg_ser)),
206			stream: AttachedStream::None,
207			order_tag: None,
208		})
209	}
210	fn into_req_local(self) -> Req<M> {
211		Req {
212			msg: Arc::new(self),
213			msg_ser: None,
214			stream: AttachedStream::None,
215			order_tag: None,
216		}
217	}
218}
219
220impl<M: Message> IntoReq<M> for Req<M> {
221	fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
222		Ok(self)
223	}
224	fn into_req_local(self) -> Req<M> {
225		self
226	}
227}
228
229impl<M: Message> Clone for Req<M> {
230	fn clone(&self) -> Self {
231		let stream = match &self.stream {
232			AttachedStream::None => AttachedStream::None,
233			AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
234			AttachedStream::Stream(_) => {
235				panic!("Cannot clone a Req<_> with a non-buffer attached stream")
236			}
237		};
238		Self {
239			msg: self.msg.clone(),
240			msg_ser: self.msg_ser.clone(),
241			stream,
242			order_tag: self.order_tag,
243		}
244	}
245}
246
247impl<M> fmt::Debug for Req<M>
248where
249	M: Message + fmt::Debug,
250{
251	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
252		write!(f, "Req[{:?}", self.msg)?;
253		match &self.stream {
254			AttachedStream::None => write!(f, "]"),
255			AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
256			AttachedStream::Stream(_) => write!(f, "; stream]"),
257		}
258	}
259}
260
261// ----
262
263/// The Resp<M> represents a full response from a RPC that may have
264/// an attached stream.
265pub struct Resp<M: Message> {
266	pub(crate) _phantom: PhantomData<M>,
267	pub(crate) msg: M::Response,
268	pub(crate) stream: AttachedStream,
269	pub(crate) order_tag: Option<OrderTag>,
270}
271
272impl<M: Message> Resp<M> {
273	/// Creates a new response from a base response message
274	pub fn new(v: M::Response) -> Self {
275		Resp {
276			_phantom: Default::default(),
277			msg: v,
278			stream: AttachedStream::None,
279			order_tag: None,
280		}
281	}
282
283	/// Attach a stream to message in response, where the stream is streamed
284	/// from a fixed `Bytes` buffer
285	pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
286		Self {
287			stream: AttachedStream::Fixed(b),
288			..self
289		}
290	}
291
292	/// Attach a stream to message in response, where the stream is
293	/// an instance of `ByteStream`.
294	pub fn with_stream(self, b: ByteStream) -> Self {
295		Self {
296			stream: AttachedStream::Stream(b),
297			..self
298		}
299	}
300
301	/// Add an order tag to this response to indicate in which order it should
302	/// be sent.
303	pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
304		Self {
305			order_tag: Some(order_tag),
306			..self
307		}
308	}
309
310	/// Get a reference to the response message contained in this request
311	pub fn msg(&self) -> &M::Response {
312		&self.msg
313	}
314
315	/// Transforms the `Resp<M>` into the response message it contains,
316	/// dropping everything else (including attached data stream)
317	pub fn into_msg(self) -> M::Response {
318		self.msg
319	}
320
321	/// Transforms the `Resp<M>` into, on the one side, the response message
322	/// it contains, and on the other side, the associated data stream
323	/// if it exists
324	pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
325		(self.msg, self.stream.into_stream())
326	}
327
328	pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
329		Ok(RespEnc {
330			msg: rmp_to_vec_all_named(&self.msg)?.into(),
331			stream: self.stream.into_stream(),
332			order_tag: self.order_tag,
333		})
334	}
335
336	pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
337		let msg = rmp_serde::decode::from_slice(&enc.msg)?;
338		Ok(Self {
339			_phantom: Default::default(),
340			msg,
341			stream: enc
342				.stream
343				.map(AttachedStream::Stream)
344				.unwrap_or(AttachedStream::None),
345			order_tag: enc.order_tag,
346		})
347	}
348}
349
350impl<M> fmt::Debug for Resp<M>
351where
352	M: Message,
353	<M as Message>::Response: fmt::Debug,
354{
355	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
356		write!(f, "Resp[{:?}", self.msg)?;
357		match &self.stream {
358			AttachedStream::None => write!(f, "]"),
359			AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
360			AttachedStream::Stream(_) => write!(f, "; stream]"),
361		}
362	}
363}
364
365// ----
366
367pub(crate) enum AttachedStream {
368	None,
369	Fixed(Bytes),
370	Stream(ByteStream),
371}
372
373impl AttachedStream {
374	pub fn into_stream(self) -> Option<ByteStream> {
375		match self {
376			AttachedStream::None => None,
377			AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
378			AttachedStream::Stream(s) => Some(s),
379		}
380	}
381}
382
383// ---- ----
384
385/// Encoding for requests into a ByteStream:
386/// - priority: u8
387/// - path length: u8
388/// - path: [u8; path length]
389/// - telemetry id length: u8
390/// - telemetry id: [u8; telemetry id length]
391/// - msg len: u32
392/// - msg [u8; ..]
393/// - the attached stream as the rest of the encoded stream
394pub(crate) struct ReqEnc {
395	pub(crate) prio: RequestPriority,
396	pub(crate) path: Bytes,
397	pub(crate) telemetry_id: Bytes,
398	pub(crate) msg: Bytes,
399	pub(crate) stream: Option<ByteStream>,
400	pub(crate) order_tag: Option<OrderTag>,
401}
402
403impl ReqEnc {
404	pub(crate) fn encode(self) -> (ByteStream, Option<OrderTag>) {
405		let mut buf = BytesMut::with_capacity(
406			self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
407		);
408
409		buf.put_u8(self.prio);
410
411		buf.put_u8(self.path.len() as u8);
412		buf.put(self.path);
413
414		buf.put_u8(self.telemetry_id.len() as u8);
415		buf.put(&self.telemetry_id[..]);
416
417		buf.put_u32(self.msg.len() as u32);
418
419		let header = buf.freeze();
420
421		let res_stream: ByteStream = if let Some(stream) = self.stream {
422			Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream))
423		} else {
424			Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]))
425		};
426		(res_stream, self.order_tag)
427	}
428
429	pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
430		Self::decode_aux(stream)
431			.await
432			.map_err(read_exact_error_to_error)
433	}
434
435	async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
436		let mut reader = ByteStreamReader::new(stream);
437
438		let prio = reader.read_u8().await?;
439
440		let path_len = reader.read_u8().await?;
441		let path = reader.read_exact(path_len as usize).await?;
442
443		let telemetry_id_len = reader.read_u8().await?;
444		let telemetry_id = reader.read_exact(telemetry_id_len as usize).await?;
445
446		let msg_len = reader.read_u32().await?;
447		let msg = reader.read_exact(msg_len as usize).await?;
448
449		Ok(Self {
450			prio,
451			path,
452			telemetry_id,
453			msg,
454			stream: Some(reader.into_stream()),
455			order_tag: None,
456		})
457	}
458}
459
460/// Encoding for responses into a ByteStream:
461/// IF SUCCESS:
462/// - 0: u8
463/// - msg len: u32
464/// - msg [u8; ..]
465/// - the attached stream as the rest of the encoded stream
466/// IF ERROR:
467/// - message length + 1: u8
468/// - error code: u8
469/// - message: [u8; message_length]
470pub(crate) struct RespEnc {
471	msg: Bytes,
472	stream: Option<ByteStream>,
473	order_tag: Option<OrderTag>,
474}
475
476impl RespEnc {
477	pub(crate) fn encode(resp: Result<Self, Error>) -> (ByteStream, Option<OrderTag>) {
478		match resp {
479			Ok(Self {
480				msg,
481				stream,
482				order_tag,
483			}) => {
484				let mut buf = BytesMut::with_capacity(4);
485				buf.put_u32(msg.len() as u32);
486				let header = buf.freeze();
487
488				let res_stream: ByteStream = if let Some(stream) = stream {
489					Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream))
490				} else {
491					Box::pin(futures::stream::iter([Ok(header), Ok(msg)]))
492				};
493				(res_stream, order_tag)
494			}
495			Err(err) => {
496				let err = std::io::Error::new(
497					std::io::ErrorKind::Other,
498					format!("netapp error: {}", err),
499				);
500				(
501					Box::pin(futures::stream::once(async move { Err(err) })),
502					None,
503				)
504			}
505		}
506	}
507
508	pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
509		Self::decode_aux(stream)
510			.await
511			.map_err(read_exact_error_to_error)
512	}
513
514	async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
515		let mut reader = ByteStreamReader::new(stream);
516
517		let msg_len = reader.read_u32().await?;
518		let msg = reader.read_exact(msg_len as usize).await?;
519
520		// Check whether the response stream still has data or not.
521		// If no more data is coming, this will defuse the request canceller.
522		// If we didn't do this, and the client doesn't try to read from the stream,
523		// the request canceller doesn't know that we read everything and
524		// sends a cancellation message to the server (which they don't care about).
525		reader.fill_buffer().await;
526
527		Ok(Self {
528			msg,
529			stream: Some(reader.into_stream()),
530			order_tag: None,
531		})
532	}
533}
534
535fn read_exact_error_to_error(e: ReadExactError) -> Error {
536	match e {
537		ReadExactError::Stream(err) => Error::Remote(err.kind(), err.to_string()),
538		ReadExactError::UnexpectedEos => Error::Framing,
539	}
540}