quee/
lib.rs

1use std::collections::HashMap;
2
3use parser::Contract;
4use serde::{Deserialize, Serialize};
5
6use tonic::transport::Channel;
7
8#[path = "engine.rs"]
9pub(crate) mod engine;
10
11use engine::{
12    core_client::CoreClient, handshake_client::HandshakeClient, ContractRequest, Message,
13    MessageBuf, MessageStatus, Ping, QueueBuf,
14};
15
16pub mod parser;
17
/// A payload of type `T` paired with the numeric identifier it was created with.
///
/// This is the value handed to a consume handler registered on a queue.
#[derive(Debug, Clone, PartialEq)]
pub struct XMessage<T> {
    id: i64,
    inner: T,
}

impl<T> XMessage<T> {
    /// Wraps `msg` together with its identifier `id`.
    pub fn new(id: i64, msg: T) -> Self {
        Self { id, inner: msg }
    }

    /// Returns the identifier this message was constructed with.
    pub fn id(&self) -> i64 {
        self.id
    }

    /// Consumes the wrapper and returns the payload, discarding the identifier.
    pub fn into_inner(self) -> T {
        self.inner
    }
}
33
/// Errors returned by the client API in this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// Opening one of the gRPC clients failed.
    HandshakeError,
    /// The ping RPC itself failed.
    PingError,
    /// The server answered but reported a failure; carries the server's message.
    InternalError(String),
    /// The message could not be serialized before being produced.
    ProduceSerializeError,
    /// The send RPC failed.
    ProduceError,
    /// The load-contract RPC failed.
    ContractSendError,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::HandshakeError => f.write_str("handshake with the server failed"),
            Error::PingError => f.write_str("ping to the server failed"),
            Error::InternalError(msg) => write!(f, "internal error: {}", msg),
            Error::ProduceSerializeError => f.write_str("failed to serialize the message"),
            Error::ProduceError => f.write_str("failed to produce the message"),
            Error::ContractSendError => f.write_str("failed to send the contract"),
        }
    }
}

// Lets callers use `?` into `Box<dyn std::error::Error>` and similar error wrappers.
impl std::error::Error for Error {}
43
/// Lifecycle state of a [`Connection`], as tracked by its heartbeat.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state; also entered when the server answers a ping with a failure.
    Idle,
    /// The last ping succeeded and a token was received.
    Ready,
    /// The ping RPC itself failed.
    Lost,
}
50
51pub struct Connection {
52    #[allow(dead_code)]
53    address: String,
54    token: String,
55    hs_client: HandshakeClient<Channel>,
56    core_client: CoreClient<Channel>,
57    state: ConnectionState,
58
59    pub on_connect: fn() -> (),
60    pub on_reconnect: fn() -> (),
61    pub on_error: fn(String) -> (),
62    pub on_disconnect: fn(String) -> (),
63}
64
/// Default string callback: writes `msg` followed by a newline to standard output.
fn sys_out(msg: String) {
    println!("{msg}");
}
68
69impl Connection {
70    pub async fn create(addr: String) -> Result<Self, Error> {
71        let client = HandshakeClient::connect(addr.clone()).await.map_err(|e| {
72            println!("Error while handshake: {}", e);
73            return Error::HandshakeError;
74        })?;
75
76        let core_client = CoreClient::connect(addr.clone()).await.map_err(|e| {
77            println!("Error while core handshake: {}", e);
78            return Error::HandshakeError;
79        })?;
80
81        let conn = Self {
82            address: addr,
83            token: "".into(),
84            hs_client: client,
85            core_client,
86            state: ConnectionState::Idle,
87            on_connect: || {},
88            on_reconnect: || {},
89            on_error: sys_out,
90            on_disconnect: sys_out,
91        };
92
93        Ok(conn)
94    }
95
96    pub async fn connect(&mut self) -> Result<(), Error> {
97        self.heartbeat().await?;
98
99        (self.on_connect)();
100        // TODO: start calling the server to check for its
101        // liveliness
102        Ok(())
103    }
104
105    async fn heartbeat(&mut self) -> Result<(), Error> {
106        let reply = self
107            .hs_client
108            .ping(Ping {
109                version: "1".into(),
110            })
111            .await
112            .map_err(|e| {
113                self.state = ConnectionState::Lost;
114                println!("Error while pinging for channel: {}", e);
115                return Error::PingError;
116            })?;
117
118        let resp = reply.into_inner();
119        if !resp.ok {
120            self.state = ConnectionState::Idle;
121            return Err(Error::InternalError(resp.message));
122        }
123
124        self.state = ConnectionState::Ready;
125        self.token = resp.token;
126
127        Ok(())
128    }
129
130    pub fn get_state(&self) -> &ConnectionState {
131        &self.state
132    }
133
134    // TODO: for now let it be here ..but need to think of some place
135    // better suited for this job
136    // Here we clone a SHIT TON of things, find a better way
137    pub async fn load_contract(&mut self, contract: Contract) -> Result<(), Error> {
138        let mut mbufs: HashMap<String, MessageBuf> = HashMap::new();
139        let mut qbufs: HashMap<String, QueueBuf> = HashMap::new();
140        for k in contract.queue.keys() {
141            let v = contract.queue.get(k).unwrap();
142            let msg = contract.message.get(&v.message).unwrap();
143            mbufs.insert(v.message.clone(), MessageBuf { inner: msg.clone() });
144            qbufs.insert(
145                k.clone(),
146                QueueBuf {
147                    ns: v.ns.clone(),
148                    fork: v.fork.clone(),
149                    push_back: v.push_back,
150                    message: v.message.clone(),
151                },
152            );
153        }
154        let contract_req = ContractRequest {
155            message: mbufs,
156            queues: qbufs,
157        };
158
159        // TODO: take in token here!!
160        let reply = self
161            .core_client
162            .load_contract(contract_req)
163            .await
164            .map_err(|e| {
165                eprintln!("Error while loading contract: {}", e);
166                Error::ContractSendError
167            })?;
168
169        let reply = reply.into_inner();
170        if !reply.ok {
171            return Err(Error::InternalError(reply.message.unwrap()));
172        }
173        Ok(())
174    }
175}
176
177pub struct Queue<'a, M>
178where
179    M: Serialize + Deserialize<'a>,
180{
181    name: String,
182    conn: &'a mut Connection,
183    consume_fn: Option<fn(XMessage<M>) -> MessageStatus>,
184}
185
186impl<'a, M> Queue<'a, M>
187where
188    M: Serialize + Deserialize<'a>,
189{
190    pub fn new(conn: &'a mut Connection, name: String) -> Self {
191        Self {
192            name,
193            conn,
194            consume_fn: None,
195        }
196    }
197
198    pub fn consume(&mut self, handler: fn(XMessage<M>) -> MessageStatus) {
199        self.consume_fn = Some(handler);
200    }
201
202    // TODO: maybe create our own errors instead of dyn Error
203    pub async fn produce(&mut self, message: &M) -> Result<bool, Error> {
204        let j: String = serde_json::to_string(message).map_err(|e| {
205            eprintln!("Error while converting into json string: {}", e);
206            return Error::ProduceSerializeError;
207        })?;
208
209        // TODO: create id here
210
211        // send this message
212        let payload = j.into_bytes();
213        let msg = Message {
214            id: 1,
215            payload: payload.clone(), // See something here about not cloning and using
216            length: payload.len() as i64,
217            topic: self.name.clone(),
218        };
219
220        let reply = self.conn.core_client.send(msg).await.map_err(|e| {
221            eprintln!("Error: e : {}", e);
222            return Error::ProduceError;
223        })?;
224
225        let reply = reply.into_inner();
226        if !reply.ok {
227            return Err(Error::InternalError(reply.message.unwrap()));
228        }
229
230        Ok(true)
231    }
232}