1use std::collections::HashMap;
2
3use parser::Contract;
4use serde::{Deserialize, Serialize};
5
6use tonic::transport::Channel;
7
8#[path = "engine.rs"]
9pub(crate) mod engine;
10
11use engine::{
12 core_client::CoreClient, handshake_client::HandshakeClient, ContractRequest, Message,
13 MessageBuf, MessageStatus, Ping, QueueBuf,
14};
15
16pub mod parser;
17
18pub struct XMessage<T> {
19 #[allow(dead_code)]
20 id: i64,
21 inner: T,
22}
23
24impl<T> XMessage<T> {
25 pub fn new(id: i64, msg: T) -> Self {
26 Self { id, inner: msg }
27 }
28
29 pub fn into_inner(self) -> T {
30 self.inner
31 }
32}
33
34#[derive(Debug)]
35pub enum Error {
36 HandshakeError,
37 PingError,
38 InternalError(String),
39 ProduceSerializeError,
40 ProduceError,
41 ContractSendError,
42}
43
44#[derive(PartialEq)]
45pub enum ConnectionState {
46 Idle,
47 Ready,
48 Lost,
49}
50
51pub struct Connection {
52 #[allow(dead_code)]
53 address: String,
54 token: String,
55 hs_client: HandshakeClient<Channel>,
56 core_client: CoreClient<Channel>,
57 state: ConnectionState,
58
59 pub on_connect: fn() -> (),
60 pub on_reconnect: fn() -> (),
61 pub on_error: fn(String) -> (),
62 pub on_disconnect: fn(String) -> (),
63}
64
65fn sys_out(msg: String) {
66 println!("{}", msg)
67}
68
69impl Connection {
70 pub async fn create(addr: String) -> Result<Self, Error> {
71 let client = HandshakeClient::connect(addr.clone()).await.map_err(|e| {
72 println!("Error while handshake: {}", e);
73 return Error::HandshakeError;
74 })?;
75
76 let core_client = CoreClient::connect(addr.clone()).await.map_err(|e| {
77 println!("Error while core handshake: {}", e);
78 return Error::HandshakeError;
79 })?;
80
81 let conn = Self {
82 address: addr,
83 token: "".into(),
84 hs_client: client,
85 core_client,
86 state: ConnectionState::Idle,
87 on_connect: || {},
88 on_reconnect: || {},
89 on_error: sys_out,
90 on_disconnect: sys_out,
91 };
92
93 Ok(conn)
94 }
95
96 pub async fn connect(&mut self) -> Result<(), Error> {
97 self.heartbeat().await?;
98
99 (self.on_connect)();
100 Ok(())
103 }
104
105 async fn heartbeat(&mut self) -> Result<(), Error> {
106 let reply = self
107 .hs_client
108 .ping(Ping {
109 version: "1".into(),
110 })
111 .await
112 .map_err(|e| {
113 self.state = ConnectionState::Lost;
114 println!("Error while pinging for channel: {}", e);
115 return Error::PingError;
116 })?;
117
118 let resp = reply.into_inner();
119 if !resp.ok {
120 self.state = ConnectionState::Idle;
121 return Err(Error::InternalError(resp.message));
122 }
123
124 self.state = ConnectionState::Ready;
125 self.token = resp.token;
126
127 Ok(())
128 }
129
130 pub fn get_state(&self) -> &ConnectionState {
131 &self.state
132 }
133
134 pub async fn load_contract(&mut self, contract: Contract) -> Result<(), Error> {
138 let mut mbufs: HashMap<String, MessageBuf> = HashMap::new();
139 let mut qbufs: HashMap<String, QueueBuf> = HashMap::new();
140 for k in contract.queue.keys() {
141 let v = contract.queue.get(k).unwrap();
142 let msg = contract.message.get(&v.message).unwrap();
143 mbufs.insert(v.message.clone(), MessageBuf { inner: msg.clone() });
144 qbufs.insert(
145 k.clone(),
146 QueueBuf {
147 ns: v.ns.clone(),
148 fork: v.fork.clone(),
149 push_back: v.push_back,
150 message: v.message.clone(),
151 },
152 );
153 }
154 let contract_req = ContractRequest {
155 message: mbufs,
156 queues: qbufs,
157 };
158
159 let reply = self
161 .core_client
162 .load_contract(contract_req)
163 .await
164 .map_err(|e| {
165 eprintln!("Error while loading contract: {}", e);
166 Error::ContractSendError
167 })?;
168
169 let reply = reply.into_inner();
170 if !reply.ok {
171 return Err(Error::InternalError(reply.message.unwrap()));
172 }
173 Ok(())
174 }
175}
176
177pub struct Queue<'a, M>
178where
179 M: Serialize + Deserialize<'a>,
180{
181 name: String,
182 conn: &'a mut Connection,
183 consume_fn: Option<fn(XMessage<M>) -> MessageStatus>,
184}
185
186impl<'a, M> Queue<'a, M>
187where
188 M: Serialize + Deserialize<'a>,
189{
190 pub fn new(conn: &'a mut Connection, name: String) -> Self {
191 Self {
192 name,
193 conn,
194 consume_fn: None,
195 }
196 }
197
198 pub fn consume(&mut self, handler: fn(XMessage<M>) -> MessageStatus) {
199 self.consume_fn = Some(handler);
200 }
201
202 pub async fn produce(&mut self, message: &M) -> Result<bool, Error> {
204 let j: String = serde_json::to_string(message).map_err(|e| {
205 eprintln!("Error while converting into json string: {}", e);
206 return Error::ProduceSerializeError;
207 })?;
208
209 let payload = j.into_bytes();
213 let msg = Message {
214 id: 1,
215 payload: payload.clone(), length: payload.len() as i64,
217 topic: self.name.clone(),
218 };
219
220 let reply = self.conn.core_client.send(msg).await.map_err(|e| {
221 eprintln!("Error: e : {}", e);
222 return Error::ProduceError;
223 })?;
224
225 let reply = reply.into_inner();
226 if !reply.ok {
227 return Err(Error::InternalError(reply.message.unwrap()));
228 }
229
230 Ok(true)
231 }
232}