jetstream_rpc/
lib.rs

1#![doc(
2    html_logo_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
3)]
4#![doc(
5    html_favicon_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
6)]
7//! # JetStream Rpc
8//! Defines Rpc primitives for JetStream.
9//! Of note is the `Protocol` trait which is meant to be used with the `service` attribute macro.
10#![cfg_attr(docsrs, feature(doc_cfg))]
11
12use std::{
13    io::{self, ErrorKind, Read, Write},
14    mem,
15};
16
17use jetstream_wireformat::WireFormat;
18// Re-export codecs
19pub use tokio_util::codec::{Decoder, Encoder, Framed};
20
21extern crate tokio_util;
22
23pub mod client;
24pub mod server;
25
26/// A trait representing a message that can be encoded and decoded.
27#[cfg(not(target_arch = "wasm32"))]
28pub trait Message: WireFormat + Sync {}
29
/// Marker trait for messages that can be encoded to and decoded from the wire.
///
/// This is the `wasm32` variant: WebAssembly doesn't fully support
/// `Send`/`Sync`, so only [`WireFormat`] is required here and the thread-safety
/// bound used on other targets is dropped.
#[cfg(target_arch = "wasm32")]
pub trait Message: WireFormat {}
34
/// A 16-bit tag wrapped in a dedicated type.
///
/// The wrapper is `#[repr(transparent)]`, so it has exactly the layout of the
/// `u16` it contains. It is cheap to copy and can be compared, hashed and
/// printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct Tag(u16);

impl From<u16> for Tag {
    /// Wraps a raw `u16` value in a [`Tag`].
    fn from(tag: u16) -> Self {
        Self(tag)
    }
}
43
44/// Defines the request and response types for the JetStream protocol.
45#[trait_variant::make(Send + Sync + Sized)]
46pub trait Protocol: Send + Sync {
47    type Request: Framer;
48    type Response: Framer;
49    type Error: std::error::Error + Send + Sync + 'static;
50    const VERSION: &'static str;
51    async fn rpc(
52        &mut self,
53        frame: Frame<Self::Request>,
54    ) -> Result<Frame<Self::Response>, Self::Error>;
55}
56
57#[derive(Debug, thiserror::Error)]
58pub enum Error {
59    #[error("io error: {0}")]
60    Io(#[from] io::Error),
61    #[error("generic error: {0}")]
62    Generic(#[from] Box<dyn std::error::Error + Send + Sync>),
63    #[error("{0}")]
64    Custom(String),
65    #[error("invalid response")]
66    InvalidResponse,
67}
68
69pub struct Frame<T: Framer> {
70    pub tag: u16,
71    pub msg: T,
72}
73
74impl<T: Framer> From<(u16, T)> for Frame<T> {
75    fn from((tag, msg): (u16, T)) -> Self {
76        Self { tag, msg }
77    }
78}
79
80impl<T: Framer> WireFormat for Frame<T> {
81    fn byte_size(&self) -> u32 {
82        let msg_size = self.msg.byte_size();
83        // size + type + tag + message size
84        (mem::size_of::<u32>() + mem::size_of::<u8>() + mem::size_of::<u16>())
85            as u32
86            + msg_size
87    }
88
89    fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
90        self.byte_size().encode(writer)?;
91
92        let ty = self.msg.message_type();
93
94        ty.encode(writer)?;
95        self.tag.encode(writer)?;
96
97        self.msg.encode(writer)?;
98
99        Ok(())
100    }
101
102    fn decode<R: Read>(reader: &mut R) -> io::Result<Self> {
103        let byte_size: u32 = WireFormat::decode(reader)?;
104
105        // byte_size includes the size of byte_size so remove that from the
106        // expected length of the message.  Also make sure that byte_size is at least
107        // that long to begin with.
108        if byte_size < mem::size_of::<u32>() as u32 {
109            return Err(io::Error::new(
110                ErrorKind::InvalidData,
111                format!("byte_size(= {byte_size}) is less than 4 bytes"),
112            ));
113        }
114        let reader =
115            &mut reader.take((byte_size - mem::size_of::<u32>() as u32) as u64);
116
117        let mut ty = [0u8];
118        reader.read_exact(&mut ty)?;
119
120        let tag: u16 = WireFormat::decode(reader)?;
121        let msg = T::decode(reader, ty[0])?;
122
123        Ok(Frame { tag, msg })
124    }
125}
126
/// A message body that can be placed inside a frame.
///
/// Implementors report a one-byte message type and know how to write
/// themselves to a writer and how to read themselves back given that type
/// byte.
pub trait Framer: Send + Sync + Sized {
    /// Returns the one-byte message type of `self`.
    fn message_type(&self) -> u8;

    /// Returns the number of bytes necessary to fully encode `self`.
    fn byte_size(&self) -> u32;

    /// Encodes `self` into `writer`.
    fn encode<W>(&self, writer: &mut W) -> io::Result<()>
    where
        W: Write;

    /// Decodes `Self` from `reader`, where `ty` is the message type byte
    /// supplied by the caller.
    fn decode<R>(reader: &mut R, ty: u8) -> io::Result<Self>
    where
        R: Read;
}