1use std::fmt;
2use std::marker::PhantomData;
3use std::sync::Arc;
4
5use bytes::{BufMut, Bytes, BytesMut};
6use rand::prelude::*;
7use serde::{Deserialize, Serialize};
8
9use futures::stream::StreamExt;
10
11use crate::error::*;
12use crate::stream::*;
13use crate::util::*;
14
/// Numeric priority attached to a request, stored as a single byte.
///
/// See the `PRIO_*` constants for the predefined values.
pub type RequestPriority = u8;
30
31pub const PRIO_HIGH: RequestPriority = 0x20;
50pub const PRIO_NORMAL: RequestPriority = 0x40;
52pub const PRIO_BACKGROUND: RequestPriority = 0x80;
54
55pub const PRIO_PRIMARY: RequestPriority = 0x00;
57pub const PRIO_SECONDARY: RequestPriority = 0x01;
59
60#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
66pub struct OrderTag(pub(crate) u64, pub(crate) u64);
67
/// Handle identifying one stream of order tags; wraps the stream's `u64` identifier.
#[derive(Clone, Copy)]
pub struct OrderTagStream(u64);
72
73impl OrderTag {
74 pub fn stream() -> OrderTagStream {
81 OrderTagStream(thread_rng().gen())
82 }
83}
84impl OrderTagStream {
85 pub fn order(&self, order: u64) -> OrderTag {
87 OrderTag(self.0, order)
88 }
89}
90
91pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
97 type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
99}
100
101pub struct Req<M: Message> {
108 pub(crate) msg: Arc<M>,
109 pub(crate) msg_ser: Option<Bytes>,
110 pub(crate) stream: AttachedStream,
111 pub(crate) order_tag: Option<OrderTag>,
112}
113
114impl<M: Message> Req<M> {
115 pub fn new(v: M) -> Result<Self, Error> {
117 Ok(v.into_req()?)
118 }
119
120 pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
123 Self {
124 stream: AttachedStream::Fixed(b),
125 ..self
126 }
127 }
128
129 pub fn with_stream(self, b: ByteStream) -> Self {
134 Self {
135 stream: AttachedStream::Stream(b),
136 ..self
137 }
138 }
139
140 pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
143 Self {
144 order_tag: Some(order_tag),
145 ..self
146 }
147 }
148
149 pub fn msg(&self) -> &M {
151 &self.msg
152 }
153
154 pub fn take_stream(&mut self) -> Option<ByteStream> {
156 std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
157 }
158
159 pub(crate) fn into_enc(
160 self,
161 prio: RequestPriority,
162 path: Bytes,
163 telemetry_id: Bytes,
164 ) -> ReqEnc {
165 ReqEnc {
166 prio,
167 path,
168 telemetry_id,
169 msg: self.msg_ser.unwrap(),
170 stream: self.stream.into_stream(),
171 order_tag: self.order_tag,
172 }
173 }
174
175 pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
176 let msg = rmp_serde::decode::from_slice(&enc.msg)?;
177 Ok(Req {
178 msg: Arc::new(msg),
179 msg_ser: Some(enc.msg),
180 stream: enc
181 .stream
182 .map(AttachedStream::Stream)
183 .unwrap_or(AttachedStream::None),
184 order_tag: enc.order_tag,
185 })
186 }
187}
188
189pub trait IntoReq<M: Message> {
191 fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
194 fn into_req_local(self) -> Req<M>;
198}
199
200impl<M: Message> IntoReq<M> for M {
201 fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
202 let msg_ser = rmp_to_vec_all_named(&self)?;
203 Ok(Req {
204 msg: Arc::new(self),
205 msg_ser: Some(Bytes::from(msg_ser)),
206 stream: AttachedStream::None,
207 order_tag: None,
208 })
209 }
210 fn into_req_local(self) -> Req<M> {
211 Req {
212 msg: Arc::new(self),
213 msg_ser: None,
214 stream: AttachedStream::None,
215 order_tag: None,
216 }
217 }
218}
219
220impl<M: Message> IntoReq<M> for Req<M> {
221 fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
222 Ok(self)
223 }
224 fn into_req_local(self) -> Req<M> {
225 self
226 }
227}
228
229impl<M: Message> Clone for Req<M> {
230 fn clone(&self) -> Self {
231 let stream = match &self.stream {
232 AttachedStream::None => AttachedStream::None,
233 AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
234 AttachedStream::Stream(_) => {
235 panic!("Cannot clone a Req<_> with a non-buffer attached stream")
236 }
237 };
238 Self {
239 msg: self.msg.clone(),
240 msg_ser: self.msg_ser.clone(),
241 stream,
242 order_tag: self.order_tag,
243 }
244 }
245}
246
247impl<M> fmt::Debug for Req<M>
248where
249 M: Message + fmt::Debug,
250{
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
252 write!(f, "Req[{:?}", self.msg)?;
253 match &self.stream {
254 AttachedStream::None => write!(f, "]"),
255 AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
256 AttachedStream::Stream(_) => write!(f, "; stream]"),
257 }
258 }
259}
260
261pub struct Resp<M: Message> {
266 pub(crate) _phantom: PhantomData<M>,
267 pub(crate) msg: M::Response,
268 pub(crate) stream: AttachedStream,
269 pub(crate) order_tag: Option<OrderTag>,
270}
271
272impl<M: Message> Resp<M> {
273 pub fn new(v: M::Response) -> Self {
275 Resp {
276 _phantom: Default::default(),
277 msg: v,
278 stream: AttachedStream::None,
279 order_tag: None,
280 }
281 }
282
283 pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
286 Self {
287 stream: AttachedStream::Fixed(b),
288 ..self
289 }
290 }
291
292 pub fn with_stream(self, b: ByteStream) -> Self {
295 Self {
296 stream: AttachedStream::Stream(b),
297 ..self
298 }
299 }
300
301 pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
304 Self {
305 order_tag: Some(order_tag),
306 ..self
307 }
308 }
309
310 pub fn msg(&self) -> &M::Response {
312 &self.msg
313 }
314
315 pub fn into_msg(self) -> M::Response {
318 self.msg
319 }
320
321 pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
325 (self.msg, self.stream.into_stream())
326 }
327
328 pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
329 Ok(RespEnc {
330 msg: rmp_to_vec_all_named(&self.msg)?.into(),
331 stream: self.stream.into_stream(),
332 order_tag: self.order_tag,
333 })
334 }
335
336 pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
337 let msg = rmp_serde::decode::from_slice(&enc.msg)?;
338 Ok(Self {
339 _phantom: Default::default(),
340 msg,
341 stream: enc
342 .stream
343 .map(AttachedStream::Stream)
344 .unwrap_or(AttachedStream::None),
345 order_tag: enc.order_tag,
346 })
347 }
348}
349
350impl<M> fmt::Debug for Resp<M>
351where
352 M: Message,
353 <M as Message>::Response: fmt::Debug,
354{
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
356 write!(f, "Resp[{:?}", self.msg)?;
357 match &self.stream {
358 AttachedStream::None => write!(f, "]"),
359 AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
360 AttachedStream::Stream(_) => write!(f, "; stream]"),
361 }
362 }
363}
364
365pub(crate) enum AttachedStream {
368 None,
369 Fixed(Bytes),
370 Stream(ByteStream),
371}
372
373impl AttachedStream {
374 pub fn into_stream(self) -> Option<ByteStream> {
375 match self {
376 AttachedStream::None => None,
377 AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
378 AttachedStream::Stream(s) => Some(s),
379 }
380 }
381}
382
383pub(crate) struct ReqEnc {
395 pub(crate) prio: RequestPriority,
396 pub(crate) path: Bytes,
397 pub(crate) telemetry_id: Bytes,
398 pub(crate) msg: Bytes,
399 pub(crate) stream: Option<ByteStream>,
400 pub(crate) order_tag: Option<OrderTag>,
401}
402
403impl ReqEnc {
404 pub(crate) fn encode(self) -> (ByteStream, Option<OrderTag>) {
405 let mut buf = BytesMut::with_capacity(
406 self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
407 );
408
409 buf.put_u8(self.prio);
410
411 buf.put_u8(self.path.len() as u8);
412 buf.put(self.path);
413
414 buf.put_u8(self.telemetry_id.len() as u8);
415 buf.put(&self.telemetry_id[..]);
416
417 buf.put_u32(self.msg.len() as u32);
418
419 let header = buf.freeze();
420
421 let res_stream: ByteStream = if let Some(stream) = self.stream {
422 Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream))
423 } else {
424 Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]))
425 };
426 (res_stream, self.order_tag)
427 }
428
429 pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
430 Self::decode_aux(stream)
431 .await
432 .map_err(read_exact_error_to_error)
433 }
434
435 async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
436 let mut reader = ByteStreamReader::new(stream);
437
438 let prio = reader.read_u8().await?;
439
440 let path_len = reader.read_u8().await?;
441 let path = reader.read_exact(path_len as usize).await?;
442
443 let telemetry_id_len = reader.read_u8().await?;
444 let telemetry_id = reader.read_exact(telemetry_id_len as usize).await?;
445
446 let msg_len = reader.read_u32().await?;
447 let msg = reader.read_exact(msg_len as usize).await?;
448
449 Ok(Self {
450 prio,
451 path,
452 telemetry_id,
453 msg,
454 stream: Some(reader.into_stream()),
455 order_tag: None,
456 })
457 }
458}
459
460pub(crate) struct RespEnc {
471 msg: Bytes,
472 stream: Option<ByteStream>,
473 order_tag: Option<OrderTag>,
474}
475
476impl RespEnc {
477 pub(crate) fn encode(resp: Result<Self, Error>) -> (ByteStream, Option<OrderTag>) {
478 match resp {
479 Ok(Self {
480 msg,
481 stream,
482 order_tag,
483 }) => {
484 let mut buf = BytesMut::with_capacity(4);
485 buf.put_u32(msg.len() as u32);
486 let header = buf.freeze();
487
488 let res_stream: ByteStream = if let Some(stream) = stream {
489 Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream))
490 } else {
491 Box::pin(futures::stream::iter([Ok(header), Ok(msg)]))
492 };
493 (res_stream, order_tag)
494 }
495 Err(err) => {
496 let err = std::io::Error::new(
497 std::io::ErrorKind::Other,
498 format!("netapp error: {}", err),
499 );
500 (
501 Box::pin(futures::stream::once(async move { Err(err) })),
502 None,
503 )
504 }
505 }
506 }
507
508 pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
509 Self::decode_aux(stream)
510 .await
511 .map_err(read_exact_error_to_error)
512 }
513
514 async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
515 let mut reader = ByteStreamReader::new(stream);
516
517 let msg_len = reader.read_u32().await?;
518 let msg = reader.read_exact(msg_len as usize).await?;
519
520 reader.fill_buffer().await;
526
527 Ok(Self {
528 msg,
529 stream: Some(reader.into_stream()),
530 order_tag: None,
531 })
532 }
533}
534
535fn read_exact_error_to_error(e: ReadExactError) -> Error {
536 match e {
537 ReadExactError::Stream(err) => Error::Remote(err.kind(), err.to_string()),
538 ReadExactError::UnexpectedEos => Error::Framing,
539 }
540}