1pub mod aeron;
2pub mod client;
3mod data;
4pub mod err;
5pub mod multiplexer;
6mod protocol;
7pub mod server;
8
9type RequestId = u64;
10type BusinessId = u64;
11
12pub trait ToBusinessId {
13 fn to_business_id(&self) -> BusinessId;
14}
15
/// Named interfaces, each carrying a fixed numeric discriminant.
///
/// The discriminants are the same values that the `ToBusinessId` implementation for this
/// type returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Interface {
    Ping = 1,
    Echo = 2,
    Stream = 3,
}
21
22impl ToBusinessId for Interface {
23 fn to_business_id(&self) -> u64 {
24 match self {
25 Interface::Ping => 1,
26 Interface::Echo => 2,
27 Interface::Stream => 3,
28 }
29 }
30}
31
32pub trait FromBytes {
33 fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError>
34 where
35 Self: Sized;
36}
37
38impl FromBytes for String {
39 fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError> {
40 String::from_utf8(data).map_err(|e| FromBytesError::ParseError(e.to_string()))
41 }
42}
43
44impl FromBytes for Vec<u8> {
45 fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError> {
46 Ok(data)
47 }
48}
49
/// Conversion of a value into an owned byte buffer.
pub trait ToBytes {
    /// Consumes `self` and returns its byte representation.
    fn to_bytes(self) -> Vec<u8>;
}
53
54impl<T, E> ToBytes for Result<T, E>
55where
56 T: ToBytes,
57 E: ToBytes,
58{
59 fn to_bytes(self) -> Vec<u8> {
60 match self {
61 Ok(data) => data.to_bytes(),
62 Err(err) => err.to_bytes(),
63 }
64 }
65}
66
67impl ToBytes for () {
68 fn to_bytes(self) -> Vec<u8> {
69 Vec::new()
70 }
71}
72
73impl ToBytes for Vec<u8> {
74 fn to_bytes(self) -> Vec<u8> {
75 self
76 }
77}
78
79impl ToBytes for String {
80 fn to_bytes(self) -> Vec<u8> {
81 self.into_bytes()
82 }
83}
84
85impl ToBytes for &str {
86 fn to_bytes(self) -> Vec<u8> {
87 self.as_bytes().to_vec()
88 }
89}
90
91macro_rules! impl_to_bytes_for_numbers {
92 ($($t:ty),*) => {
93 $(
94 impl ToBytes for $t {
95 fn to_bytes(self) -> Vec<u8> {
96 self.to_le_bytes().to_vec()
97 }
98 }
99 )*
100 };
101}
102
103impl_to_bytes_for_numbers!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64);
104
105use std::collections::HashMap;
106use std::sync::{Arc, Mutex};
107
108use aeron_rs::publication::Publication;
109use aeron_rs::subscription::Subscription;
110use client::RpcClient;
111use err::FromBytesError;
112use multiplexer::Multiplexer;
113use protocol::{Client2MultiplexerSender, Multiplexer2ServerReceiver, Server2MultiplexerSender};
114use server::{Handler, HandlerWrapper, IntoHandlerWrapper, RpcServer};
115use tokio::sync::mpsc;
116
117pub struct RpcContext {
118 multiplexer: Multiplexer,
119 rx: Option<Multiplexer2ServerReceiver>,
120 tx2: Option<Server2MultiplexerSender>,
121 tx3: Option<Client2MultiplexerSender>,
122 handlers: Option<HashMap<u64, Arc<dyn Handler + Send + Sync>>>,
123 is_taken_client: bool,
124 is_taken_server: bool,
125}
126
127impl RpcContext {
128 pub fn new(
129 publication: Arc<Mutex<Publication>>,
130 subscription: Arc<Mutex<Subscription>>,
131 ) -> Self {
132 let (tx, rx) = mpsc::channel(1 << 10);
133 let (tx2, rx2) = mpsc::channel(1 << 10);
134 let (tx3, rx3) = mpsc::channel(1 << 10);
135
136 Self {
137 multiplexer: Multiplexer::new(publication, subscription, tx, rx2, rx3),
138 rx: Some(rx),
139 tx2: Some(tx2),
140 tx3: Some(tx3),
141 handlers: Some(HashMap::new()),
142 is_taken_client: false,
143 is_taken_server: false,
144 }
145 }
146
147 pub fn add_handler<F, Args, Res>(
148 &mut self,
149 business_id: &impl ToBusinessId,
150 func: F,
151 ) -> &mut Self
152 where
153 F: IntoHandlerWrapper<Args, Res> + 'static,
154 HandlerWrapper<F, Args, Res>: Handler + 'static,
155 Res: ToBytes + 'static,
156 {
157 let business_id = business_id.to_business_id();
158 let wrapper = func.into_handler_wrapper();
159 self.handlers
160 .as_mut()
161 .unwrap()
162 .insert(business_id, Arc::new(wrapper));
163 self
164 }
165
166 pub fn get_rpc_client(&mut self) -> RpcClient {
167 if self.is_taken_client {
168 panic!("RpcClient has been taken");
169 }
170 self.is_taken_client = true;
171 RpcClient::new(self.tx3.take().unwrap())
172 }
173
174 pub fn get_rpc_server(&mut self) -> RpcServer {
175 if self.is_taken_server {
176 panic!("RpcServer has been taken");
177 }
178 self.is_taken_server = true;
179 RpcServer::new(
180 self.handlers.take().unwrap().into(),
181 self.rx.take().unwrap(),
182 self.tx2.take().unwrap(),
183 )
184 }
185
186 pub fn run(&mut self) {
187 self.multiplexer.run();
188 }
189}