use std::collections::HashMap;
use parser::Contract;
use serde::{Deserialize, Serialize};
use tonic::transport::Channel;
#[path = "engine.rs"]
pub(crate) mod engine;
use engine::{
core_client::CoreClient, handshake_client::HandshakeClient, ContractRequest, Message,
MessageBuf, MessageStatus, Ping, QueueBuf,
};
pub mod parser;
/// A payload of type `T` paired with a numeric message identifier.
///
/// This is the value handed to consumer handlers registered through
/// `Queue::consume`.
#[derive(Debug)]
pub struct XMessage<T> {
    /// Identifier supplied to [`XMessage::new`].
    id: i64,
    /// The wrapped payload.
    inner: T,
}

impl<T> XMessage<T> {
    /// Wraps `msg` together with its identifier `id`.
    pub fn new(id: i64, msg: T) -> Self {
        Self { id, inner: msg }
    }

    /// Returns the identifier this message was created with.
    ///
    /// Without this accessor the `id` field was write-only and needed a
    /// `dead_code` lint suppression.
    pub fn id(&self) -> i64 {
        self.id
    }

    /// Consumes the wrapper and returns the payload.
    pub fn into_inner(self) -> T {
        self.inner
    }
}
/// Errors reported by the connection and queue operations in this module.
#[derive(Debug)]
pub enum Error {
    /// Connecting one of the gRPC clients failed.
    HandshakeError,
    /// The ping RPC itself failed (transport or status error).
    PingError,
    /// A failure described by a free-form message, such as a reply whose
    /// `ok` flag was false.
    InternalError(String),
    /// The outgoing message could not be serialised to JSON.
    ProduceSerializeError,
    /// The RPC that sends a produced message failed.
    ProduceError,
    /// The RPC that uploads a contract failed.
    ContractSendError,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::HandshakeError => f.write_str("handshake failed"),
            Error::PingError => f.write_str("ping failed"),
            Error::InternalError(reason) => write!(f, "internal error: {}", reason),
            Error::ProduceSerializeError => f.write_str("failed to serialize message"),
            Error::ProduceError => f.write_str("failed to produce message"),
            Error::ContractSendError => f.write_str("failed to send contract"),
        }
    }
}

// Lets callers use `?` to convert into `Box<dyn std::error::Error>` and
// similar error containers.
impl std::error::Error for Error {}
/// Health of a `Connection` as tracked by its ping/heartbeat logic.
///
/// `Debug`, `Clone`, `Copy` and `Eq` are derived so the state can be logged,
/// compared with `assert_eq!`, and copied out of a borrowed connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state; also set when a ping reply comes back with `ok == false`.
    Idle,
    /// The most recent ping succeeded.
    Ready,
    /// The ping RPC itself failed.
    Lost,
}
/// Client-side handle holding the handshake and core gRPC clients plus the
/// connection state and user-replaceable event hooks.
pub struct Connection {
    /// Address passed to `Connection::create`; stored but not read in this file.
    #[allow(dead_code)]
    address: String,
    /// Token taken from the latest successful ping reply (empty until then).
    token: String,
    /// Client used for the ping RPC.
    hs_client: HandshakeClient<Channel>,
    /// Client used for the contract-upload and message-send RPCs.
    core_client: CoreClient<Channel>,
    /// Current connection health, updated by the heartbeat.
    state: ConnectionState,
    /// Invoked by `connect` once the heartbeat has succeeded.
    pub on_connect: fn(),
    /// Reconnect hook; not invoked anywhere in this file.
    pub on_reconnect: fn(),
    /// Error hook receiving a message; not invoked anywhere in this file.
    pub on_error: fn(String),
    /// Disconnect hook receiving a message; not invoked anywhere in this file.
    pub on_disconnect: fn(String),
}
/// Writes `msg` followed by a newline to standard output.
///
/// Matches the `fn(String)` hook signature, so it can serve as a default
/// event handler.
fn sys_out(msg: String) {
    println!("{}", msg);
}
impl Connection {
pub async fn create(addr: String) -> Result<Self, Error> {
let client = HandshakeClient::connect(addr.clone()).await.map_err(|e| {
println!("Error while handshake: {}", e);
return Error::HandshakeError;
})?;
let core_client = CoreClient::connect(addr.clone()).await.map_err(|e| {
println!("Error while core handshake: {}", e);
return Error::HandshakeError;
})?;
let conn = Self {
address: addr,
token: "".into(),
hs_client: client,
core_client,
state: ConnectionState::Idle,
on_connect: || {},
on_reconnect: || {},
on_error: sys_out,
on_disconnect: sys_out,
};
Ok(conn)
}
pub async fn connect(&mut self) -> Result<(), Error> {
self.heartbeat().await?;
(self.on_connect)();
Ok(())
}
async fn heartbeat(&mut self) -> Result<(), Error> {
let reply = self
.hs_client
.ping(Ping {
version: "1".into(),
})
.await
.map_err(|e| {
self.state = ConnectionState::Lost;
println!("Error while pinging for channel: {}", e);
return Error::PingError;
})?;
let resp = reply.into_inner();
if !resp.ok {
self.state = ConnectionState::Idle;
return Err(Error::InternalError(resp.message));
}
self.state = ConnectionState::Ready;
self.token = resp.token;
Ok(())
}
pub fn get_state(&self) -> &ConnectionState {
&self.state
}
pub async fn load_contract(&mut self, contract: Contract) -> Result<(), Error> {
let mut mbufs: HashMap<String, MessageBuf> = HashMap::new();
let mut qbufs: HashMap<String, QueueBuf> = HashMap::new();
for k in contract.queue.keys() {
let v = contract.queue.get(k).unwrap();
let msg = contract.message.get(&v.message).unwrap();
mbufs.insert(v.message.clone(), MessageBuf { inner: msg.clone() });
qbufs.insert(
k.clone(),
QueueBuf {
ns: v.ns.clone(),
fork: v.fork.clone(),
push_back: v.push_back,
message: v.message.clone(),
},
);
}
let contract_req = ContractRequest {
message: mbufs,
queues: qbufs,
};
let reply = self
.core_client
.load_contract(contract_req)
.await
.map_err(|e| {
eprintln!("Error while loading contract: {}", e);
Error::ContractSendError
})?;
let reply = reply.into_inner();
if !reply.ok {
return Err(Error::InternalError(reply.message.unwrap()));
}
Ok(())
}
}
/// A named queue bound to a mutably borrowed [`Connection`], carrying
/// messages of type `M`.
pub struct Queue<'a, M: Serialize + Deserialize<'a>> {
    /// Queue name; used as the topic of produced messages.
    name: String,
    /// Connection whose core client performs the RPCs.
    conn: &'a mut Connection,
    /// Handler registered via `consume`, if any.
    consume_fn: Option<fn(XMessage<M>) -> MessageStatus>,
}
impl<'a, M> Queue<'a, M>
where
M: Serialize + Deserialize<'a>,
{
pub fn new(conn: &'a mut Connection, name: String) -> Self {
Self {
name,
conn,
consume_fn: None,
}
}
pub fn consume(&mut self, handler: fn(XMessage<M>) -> MessageStatus) {
self.consume_fn = Some(handler);
}
pub async fn produce(&mut self, message: &M) -> Result<bool, Error> {
let j: String = serde_json::to_string(message).map_err(|e| {
eprintln!("Error while converting into json string: {}", e);
return Error::ProduceSerializeError;
})?;
let payload = j.into_bytes();
let msg = Message {
id: 1,
payload: payload.clone(), length: payload.len() as i64,
topic: self.name.clone(),
};
let reply = self.conn.core_client.send(msg).await.map_err(|e| {
eprintln!("Error: e : {}", e);
return Error::ProduceError;
})?;
let reply = reply.into_inner();
if !reply.ok {
return Err(Error::InternalError(reply.message.unwrap()));
}
Ok(true)
}
}