aeron_rpc/
lib.rs

1pub mod aeron;
2pub mod client;
3mod data;
4pub mod err;
5pub mod multiplexer;
6mod protocol;
7pub mod server;
8
/// Numeric identifier type; judging by its name it tags individual requests
/// (its uses are outside this file — confirm there).
type RequestId = u64;
/// Numeric identifier of an RPC interface, as returned by
/// `ToBusinessId::to_business_id` and used as the handler-map key.
type BusinessId = u64;
11
12pub trait ToBusinessId {
13    fn to_business_id(&self) -> BusinessId;
14}
15
16pub enum Interface {
17    Ping = 1,
18    Echo = 2,
19    Stream = 3,
20}
21
22impl ToBusinessId for Interface {
23    fn to_business_id(&self) -> u64 {
24        match self {
25            Interface::Ping => 1,
26            Interface::Echo => 2,
27            Interface::Stream => 3,
28        }
29    }
30}
31
32pub trait FromBytes {
33    fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError>
34    where
35        Self: Sized;
36}
37
38impl FromBytes for String {
39    fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError> {
40        String::from_utf8(data).map_err(|e| FromBytesError::ParseError(e.to_string()))
41    }
42}
43
44impl FromBytes for Vec<u8> {
45    fn from_bytes(data: Vec<u8>) -> Result<Self, FromBytesError> {
46        Ok(data)
47    }
48}
49
/// Infallible conversion of a value into an owned byte buffer.
pub trait ToBytes {
    /// Consumes `self` and returns its byte representation.
    fn to_bytes(self) -> Vec<u8>;
}
53
54impl<T, E> ToBytes for Result<T, E>
55where
56    T: ToBytes,
57    E: ToBytes,
58{
59    fn to_bytes(self) -> Vec<u8> {
60        match self {
61            Ok(data) => data.to_bytes(),
62            Err(err) => err.to_bytes(),
63        }
64    }
65}
66
67impl ToBytes for () {
68    fn to_bytes(self) -> Vec<u8> {
69        Vec::new()
70    }
71}
72
73impl ToBytes for Vec<u8> {
74    fn to_bytes(self) -> Vec<u8> {
75        self
76    }
77}
78
79impl ToBytes for String {
80    fn to_bytes(self) -> Vec<u8> {
81        self.into_bytes()
82    }
83}
84
85impl ToBytes for &str {
86    fn to_bytes(self) -> Vec<u8> {
87        self.as_bytes().to_vec()
88    }
89}
90
91macro_rules! impl_to_bytes_for_numbers {
92    ($($t:ty),*) => {
93        $(
94            impl ToBytes for $t {
95                fn to_bytes(self) -> Vec<u8> {
96                    self.to_le_bytes().to_vec()
97                }
98            }
99        )*
100    };
101}
102
103impl_to_bytes_for_numbers!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64);
104
105use std::collections::HashMap;
106use std::sync::{Arc, Mutex};
107
108use aeron_rs::publication::Publication;
109use aeron_rs::subscription::Subscription;
110use client::RpcClient;
111use err::FromBytesError;
112use multiplexer::Multiplexer;
113use protocol::{Client2MultiplexerSender, Multiplexer2ServerReceiver, Server2MultiplexerSender};
114use server::{Handler, HandlerWrapper, IntoHandlerWrapper, RpcServer};
115use tokio::sync::mpsc;
116
117pub struct RpcContext {
118    multiplexer: Multiplexer,
119    rx: Option<Multiplexer2ServerReceiver>,
120    tx2: Option<Server2MultiplexerSender>,
121    tx3: Option<Client2MultiplexerSender>,
122    handlers: Option<HashMap<u64, Arc<dyn Handler + Send + Sync>>>,
123    is_taken_client: bool,
124    is_taken_server: bool,
125}
126
127impl RpcContext {
128    pub fn new(
129        publication: Arc<Mutex<Publication>>,
130        subscription: Arc<Mutex<Subscription>>,
131    ) -> Self {
132        let (tx, rx) = mpsc::channel(1 << 10);
133        let (tx2, rx2) = mpsc::channel(1 << 10);
134        let (tx3, rx3) = mpsc::channel(1 << 10);
135
136        Self {
137            multiplexer: Multiplexer::new(publication, subscription, tx, rx2, rx3),
138            rx: Some(rx),
139            tx2: Some(tx2),
140            tx3: Some(tx3),
141            handlers: Some(HashMap::new()),
142            is_taken_client: false,
143            is_taken_server: false,
144        }
145    }
146
147    pub fn add_handler<F, Args, Res>(
148        &mut self,
149        business_id: &impl ToBusinessId,
150        func: F,
151    ) -> &mut Self
152    where
153        F: IntoHandlerWrapper<Args, Res> + 'static,
154        HandlerWrapper<F, Args, Res>: Handler + 'static,
155        Res: ToBytes + 'static,
156    {
157        let business_id = business_id.to_business_id();
158        let wrapper = func.into_handler_wrapper();
159        self.handlers
160            .as_mut()
161            .unwrap()
162            .insert(business_id, Arc::new(wrapper));
163        self
164    }
165
166    pub fn get_rpc_client(&mut self) -> RpcClient {
167        if self.is_taken_client {
168            panic!("RpcClient has been taken");
169        }
170        self.is_taken_client = true;
171        RpcClient::new(self.tx3.take().unwrap())
172    }
173
174    pub fn get_rpc_server(&mut self) -> RpcServer {
175        if self.is_taken_server {
176            panic!("RpcServer has been taken");
177        }
178        self.is_taken_server = true;
179        RpcServer::new(
180            self.handlers.take().unwrap().into(),
181            self.rx.take().unwrap(),
182            self.tx2.take().unwrap(),
183        )
184    }
185
186    pub fn run(&mut self) {
187        self.multiplexer.run();
188    }
189}