1mod test;
2
3use flatbuffers::FlatBufferBuilder;
4use track_rails::message_generated::protocol::{Message, MessageArgs, OkStatus, OkStatusArgs, Payload, RegisterRequest, RegisterRequestArgs, Status, Text, TextArgs, Time, TimeArgs, Train, TrainArgs, Value, ValueWrapper, ValueWrapperArgs};
5use std::io::{Read, Write};
6use std::net::TcpStream;
7use std::time::{SystemTime, UNIX_EPOCH};
8use tracing::{error, info};
9
/// An open, registered connection to the server.
///
/// Instances are created by `Client::connect`, which opens the TCP stream and
/// hands it to `Connection::new` together with the address it dialed.
#[derive(Debug)]
pub struct Connection {
    /// Host the underlying stream was opened against.
    host: String,
    /// TCP port the underlying stream was opened against.
    port: u16,
    /// Socket carrying the length-prefixed message frames.
    stream: TcpStream,
}
15
16
17impl Connection {
18
19 fn new(host: &str, port: u16, stream: TcpStream) -> Result<Self, String> {
20 let host = String::from(host);
21
22 let mut connection = Connection{
23 host,
24 port,
25 stream,
26 };
27 connection.connect()?;
28 Ok(connection)
29 }
30
31 pub(crate) fn send(&mut self, msg: &str) -> Result<(), String> {
32 let msg = self.msg(msg);
33 self.write_all(&msg)
34 }
35
36 pub(crate) fn receive(&mut self) -> Result<String, String> {
37 self.read()
38 }
39
40 fn write_all<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), String> {
41 let length: [u8; 4] = (msg.len() as u32).to_be_bytes();
42 self.stream.write_all(&length).map_err(|e| e.to_string())?;
44 println!("sending {} bytes", msg.len());
45 self.stream.write_all(msg).map_err(|e| e.to_string())?;
47 Ok(())
48 }
49
50 fn connect(&mut self) -> Result<(), String> {
51 let mut builder = FlatBufferBuilder::new();
52 let register = RegisterRequest::create(&mut builder, &RegisterRequestArgs { id: None, catalog: None }).as_union_value();
53
54 let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
55
56 let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::RegisterRequest, data: Some(register), status_type: Status::OkStatus, status: Some(status) });
57
58 builder.finish(msg, None);
59 let msg = builder.finished_data().to_vec();
60
61
62 let code = self.write_all(&msg).map_err(|e| e.to_string());
63 match code {
64 Ok(_) => info!("Connected successfully"),
65 _ => error!("Error writing to stream"),
66 }
67
68
69 let msg = self.read()?;
70 println!("{:?}", msg);
71 Ok(())
72 }
73
74 fn read(&mut self) -> Result<String, String> {
75 let mut buf = [0u8; 4];
76 self.stream.read_exact(&mut buf).map_err(|e| e.to_string())?;
77
78
79 let length = u32::from_be_bytes(buf) as usize;
80 let mut buffer = vec![0u8; length];
81 self.stream.read_exact(&mut buffer).map_err(|err| err.to_string())?;
82 let msg = flatbuffers::root::<Message>(&buffer).map_err(|e| e.to_string())?;
83 Ok(format!("{:?}", msg))
84 }
85
86 fn msg(&mut self, msg: &str) -> Vec<u8> {
87 let mut builder = FlatBufferBuilder::new();
88
89 let millis = SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .expect("Time went backwards")
92 .as_millis();
93 let time = Time::create(&mut builder, &TimeArgs{data: millis as u64 as i64 });
94
95 let topic = builder.create_string("");
96
97 let value = builder.create_string(msg );
98 let value = Text::create(&mut builder, &TextArgs{ data: Some(value) }).as_union_value();
99
100
101 let value = ValueWrapper::create(&mut builder, &ValueWrapperArgs{ data_type: Value::Text, data: Some(value)});
102
103 let values = builder.create_vector(&[value]);
104
105 let train = Train::create(&mut builder, &TrainArgs {
106 values: Some(values),
107 topic: Some(topic),
108 event_time: Some(time),
109 }).as_union_value();
110
111 let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
112
113 let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::Train, data: Some(train), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
114
115 builder.finish(msg, None);
116 builder.finished_data().to_vec()
117 }
118
119
120}
121
122
/// Address of a server to connect to.
///
/// Holds only the target host and port; no socket is opened until
/// `Client::connect` is called.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Client {
    /// Host name or IP address of the server.
    host: String,
    /// TCP port of the server.
    port: u16,
}
127
128
129impl Client {
130 pub fn new(host: &str, port: u16) -> Self {
131 Client{ host: host.to_string(), port }
132 }
133
134 pub fn connect(&self) -> Result<Connection, String> {
135 let stream = TcpStream::connect((self.host.clone(), self.port)).unwrap();
136 Connection::new(&self.host, self.port, stream)
137 }
138}
139