mod test;
use flatbuffers::FlatBufferBuilder;
use track_rails::message_generated::protocol::{Message, MessageArgs, OkStatus, OkStatusArgs, Payload, RegisterRequest, RegisterRequestArgs, Status, Text, TextArgs, Time, TimeArgs, Train, TrainArgs, Value, ValueWrapper, ValueWrapperArgs};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{error, info};
/// An open TCP connection to a server, created through `Connection::new`.
///
/// The type derives `Debug` so that values (and anything embedding them) can be
/// logged or used in assertion output.
#[derive(Debug)]
pub struct Connection {
    /// Host name the connection was opened for.
    host: String,
    /// TCP port the connection was opened for.
    port: u16,
    /// The underlying socket all reads and writes go through.
    stream: TcpStream,
}
impl Connection {
fn new(host: &str, port: u16, stream: TcpStream) -> Result<Self, String> {
let host = String::from(host);
let mut connection = Connection{
host,
port,
stream,
};
connection.connect()?;
Ok(connection)
}
pub(crate) fn send(&mut self, msg: &str) -> Result<(), String> {
let msg = self.msg(msg);
self.write_all(&msg)
}
pub(crate) fn receive(&mut self) -> Result<String, String> {
self.read()
}
fn write_all<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), String> {
let length: [u8; 4] = (msg.len() as u32).to_be_bytes();
self.stream.write_all(&length).map_err(|e| e.to_string())?;
println!("sending {} bytes", msg.len());
self.stream.write_all(msg).map_err(|e| e.to_string())?;
Ok(())
}
fn connect(&mut self) -> Result<(), String> {
let mut builder = FlatBufferBuilder::new();
let register = RegisterRequest::create(&mut builder, &RegisterRequestArgs { id: None, catalog: None }).as_union_value();
let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::RegisterRequest, data: Some(register), status_type: Status::OkStatus, status: Some(status) });
builder.finish(msg, None);
let msg = builder.finished_data().to_vec();
let code = self.write_all(&msg).map_err(|e| e.to_string());
match code {
Ok(_) => info!("Connected successfully"),
_ => error!("Error writing to stream"),
}
let msg = self.read()?;
println!("{:?}", msg);
Ok(())
}
fn read(&mut self) -> Result<String, String> {
let mut buf = [0u8; 4];
self.stream.read_exact(&mut buf).map_err(|e| e.to_string())?;
let length = u32::from_be_bytes(buf) as usize;
let mut buffer = vec![0u8; length];
self.stream.read_exact(&mut buffer).map_err(|err| err.to_string())?;
let msg = flatbuffers::root::<Message>(&buffer).map_err(|e| e.to_string())?;
Ok(format!("{:?}", msg))
}
fn msg(&mut self, msg: &str) -> Vec<u8> {
let mut builder = FlatBufferBuilder::new();
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
let time = Time::create(&mut builder, &TimeArgs{data: millis as u64 as i64 });
let topic = builder.create_string("");
let value = builder.create_string(msg );
let value = Text::create(&mut builder, &TextArgs{ data: Some(value) }).as_union_value();
let value = ValueWrapper::create(&mut builder, &ValueWrapperArgs{ data_type: Value::Text, data: Some(value)});
let values = builder.create_vector(&[value]);
let train = Train::create(&mut builder, &TrainArgs {
values: Some(values),
topic: Some(topic),
event_time: Some(time),
}).as_union_value();
let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::Train, data: Some(train), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
builder.finish(msg, None);
builder.finished_data().to_vec()
}
}
/// Address of a server to connect to.
///
/// Holds only the host and port, so it is cheap to clone and can be printed
/// with `{:?}`.
#[derive(Debug, Clone)]
pub struct Client {
    /// Host name or IP address of the server.
    host: String,
    /// TCP port of the server.
    port: u16,
}
impl Client {
    /// Creates a client for the server at `host`:`port`.
    ///
    /// Only stores the address; no network activity happens until `connect`.
    pub fn new(host: &str, port: u16) -> Self {
        Client {
            host: host.to_owned(),
            port,
        }
    }

    /// Opens a TCP stream to the stored address and wraps it in a
    /// `Connection` via `Connection::new`.
    ///
    /// # Errors
    ///
    /// Returns an error string if the TCP connection cannot be established
    /// (instead of panicking), or if `Connection::new` fails.
    pub fn connect(&self) -> Result<Connection, String> {
        // `(&str, u16)` implements `ToSocketAddrs`, so the host need not be cloned.
        let stream = TcpStream::connect((self.host.as_str(), self.port)).map_err(|e| {
            format!("failed to connect to {}:{}: {}", self.host, self.port, e)
        })?;
        Connection::new(&self.host, self.port, stream)
    }
}