1#![doc(
2 html_logo_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
3)]
4#![doc(
5 html_favicon_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
11
12use std::{
13 io::{self, ErrorKind, Read, Write},
14 mem,
15};
16
17use jetstream_wireformat::WireFormat;
18pub use tokio_util::codec::{Decoder, Encoder, Framed};
20
21extern crate tokio_util;
22
23pub mod client;
24pub mod server;
25
26#[cfg(not(target_arch = "wasm32"))]
28pub trait Message: WireFormat + Sync {}
29
30#[cfg(target_arch = "wasm32")]
33pub trait Message: WireFormat {}
34
/// A frame tag: a transparent newtype over the raw `u16` value.
///
/// `#[repr(transparent)]` guarantees the same layout as a bare `u16`. The
/// derives make it a plain value type that can be copied, compared, hashed
/// (e.g. used as a map key) and debug-printed.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Tag(u16);
37
38impl From<u16> for Tag {
39 fn from(tag: u16) -> Self {
40 Self(tag)
41 }
42}
43
44#[trait_variant::make(Send + Sync + Sized)]
46pub trait Protocol: Send + Sync {
47 type Request: Framer;
48 type Response: Framer;
49 type Error: std::error::Error + Send + Sync + 'static;
50 const VERSION: &'static str;
51 async fn rpc(
52 &mut self,
53 frame: Frame<Self::Request>,
54 ) -> Result<Frame<Self::Response>, Self::Error>;
55}
56
57#[derive(Debug, thiserror::Error)]
58pub enum Error {
59 #[error("io error: {0}")]
60 Io(#[from] io::Error),
61 #[error("generic error: {0}")]
62 Generic(#[from] Box<dyn std::error::Error + Send + Sync>),
63 #[error("{0}")]
64 Custom(String),
65 #[error("invalid response")]
66 InvalidResponse,
67}
68
69pub struct Frame<T: Framer> {
70 pub tag: u16,
71 pub msg: T,
72}
73
74impl<T: Framer> From<(u16, T)> for Frame<T> {
75 fn from((tag, msg): (u16, T)) -> Self {
76 Self { tag, msg }
77 }
78}
79
80impl<T: Framer> WireFormat for Frame<T> {
81 fn byte_size(&self) -> u32 {
82 let msg_size = self.msg.byte_size();
83 (mem::size_of::<u32>() + mem::size_of::<u8>() + mem::size_of::<u16>())
85 as u32
86 + msg_size
87 }
88
89 fn encode<W: Write>(&self, writer: &mut W) -> io::Result<()> {
90 self.byte_size().encode(writer)?;
91
92 let ty = self.msg.message_type();
93
94 ty.encode(writer)?;
95 self.tag.encode(writer)?;
96
97 self.msg.encode(writer)?;
98
99 Ok(())
100 }
101
102 fn decode<R: Read>(reader: &mut R) -> io::Result<Self> {
103 let byte_size: u32 = WireFormat::decode(reader)?;
104
105 if byte_size < mem::size_of::<u32>() as u32 {
109 return Err(io::Error::new(
110 ErrorKind::InvalidData,
111 format!("byte_size(= {byte_size}) is less than 4 bytes"),
112 ));
113 }
114 let reader =
115 &mut reader.take((byte_size - mem::size_of::<u32>() as u32) as u64);
116
117 let mut ty = [0u8];
118 reader.read_exact(&mut ty)?;
119
120 let tag: u16 = WireFormat::decode(reader)?;
121 let msg = T::decode(reader, ty[0])?;
122
123 Ok(Frame { tag, msg })
124 }
125}
126
/// A message that can be carried inside a [`Frame`].
///
/// `Frame` writes the value of [`message_type`](Framer::message_type) into
/// the frame header when encoding, and hands the type byte it read back to
/// [`decode`](Framer::decode) when decoding.
pub trait Framer: Sized + Send + Sync {
    /// The `u8` message type recorded in the frame header for this message.
    fn message_type(&self) -> u8;

    /// Size of the encoded message body in bytes. `Frame::byte_size` adds
    /// the fixed header size on top of this value.
    fn byte_size(&self) -> u32;

    /// Writes the message body to `writer`.
    fn encode<W>(&self, writer: &mut W) -> io::Result<()>
    where
        W: Write;

    /// Reads a message body from `reader`, given the message type byte `ty`
    /// taken from the frame header.
    fn decode<R>(reader: &mut R, ty: u8) -> io::Result<Self>
    where
        R: Read;
}