rusty_tracks/
lib.rs

1mod test;
2
3use flatbuffers::FlatBufferBuilder;
4use track_rails::message_generated::protocol::{Message, MessageArgs, OkStatus, OkStatusArgs, Payload, RegisterRequest, RegisterRequestArgs, Status, Text, TextArgs, Time, TimeArgs, Train, TrainArgs, Value, ValueWrapper, ValueWrapperArgs};
5use std::io::{Read, Write};
6use std::net::TcpStream;
7use std::time::{SystemTime, UNIX_EPOCH};
8use tracing::{error, info};
9
/// An established, registered connection to a server.
///
/// Instances are created through `Connection::new`, which performs the
/// registration handshake on the supplied stream before returning.
#[derive(Debug)]
pub struct Connection {
    /// Host name or address this connection was created for.
    host: String,
    /// TCP port this connection was created for.
    port: u16,
    /// Underlying socket used for all framed reads and writes.
    stream: TcpStream,
}
15
16
17impl Connection {
18
19    fn new(host: &str, port: u16, stream: TcpStream) -> Result<Self, String> {
20        let host = String::from(host);
21
22        let mut connection = Connection{
23            host,
24            port,
25            stream,
26        };
27        connection.connect()?;
28        Ok(connection)
29    }
30
31    pub(crate) fn send(&mut self, msg: &str) -> Result<(), String> {
32        let msg = self.msg(msg);
33        self.write_all(&msg)
34    }
35
36    pub(crate) fn receive(&mut self) -> Result<String, String> {
37        self.read()
38    }
39
40    fn write_all<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), String> {
41        let length: [u8; 4] = (msg.len() as u32).to_be_bytes();
42        // we write length first
43        self.stream.write_all(&length).map_err(|e| e.to_string())?;
44        println!("sending {} bytes", msg.len());
45        // then msg
46        self.stream.write_all(msg).map_err(|e| e.to_string())?;
47        Ok(())
48    }
49
50    fn connect(&mut self) -> Result<(), String> {
51        let mut builder = FlatBufferBuilder::new();
52        let register = RegisterRequest::create(&mut builder, &RegisterRequestArgs { id: None, catalog: None }).as_union_value();
53
54        let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
55        
56        let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::RegisterRequest, data: Some(register), status_type: Status::OkStatus, status: Some(status) });
57
58        builder.finish(msg, None);
59        let msg = builder.finished_data().to_vec();
60
61
62        let code = self.write_all(&msg).map_err(|e| e.to_string());
63        match code {
64            Ok(_) => info!("Connected successfully"),
65            _ => error!("Error writing to stream"),
66        }
67
68
69        let msg = self.read()?;
70        println!("{:?}", msg);
71        Ok(())
72    }
73
74    fn read(&mut self) -> Result<String, String> {
75        let mut buf = [0u8; 4];
76        self.stream.read_exact(&mut buf).map_err(|e| e.to_string())?;
77
78
79        let length = u32::from_be_bytes(buf) as usize;
80        let mut buffer = vec![0u8; length];
81        self.stream.read_exact(&mut buffer).map_err(|err| err.to_string())?;
82        let msg = flatbuffers::root::<Message>(&buffer).map_err(|e| e.to_string())?;
83        Ok(format!("{:?}", msg))
84    }
85
86    fn msg(&mut self, msg: &str) -> Vec<u8> {
87        let mut builder = FlatBufferBuilder::new();
88
89        let millis = SystemTime::now()
90            .duration_since(UNIX_EPOCH)
91            .expect("Time went backwards")
92            .as_millis();
93        let time = Time::create(&mut builder, &TimeArgs{data: millis as u64 as i64 });
94
95        let topic = builder.create_string("");
96
97        let value = builder.create_string(msg );
98        let value = Text::create(&mut builder, &TextArgs{ data: Some(value) }).as_union_value();
99
100
101        let value = ValueWrapper::create(&mut builder, &ValueWrapperArgs{ data_type: Value::Text, data: Some(value)});
102
103        let values = builder.create_vector(&[value]);
104
105        let train = Train::create(&mut builder, &TrainArgs {
106            values: Some(values),
107            topic: Some(topic),
108            event_time: Some(time),
109        }).as_union_value();
110        
111        let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
112        
113        let msg = Message::create(&mut builder, &MessageArgs{ data_type: Payload::Train, data: Some(train), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
114
115        builder.finish(msg, None);
116        builder.finished_data().to_vec()
117    }
118
119
120}
121
122
/// Address of a server; call `connect` to open a connection to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Client {
    /// Host name or address of the server to connect to.
    host: String,
    /// TCP port of the server to connect to.
    port: u16,
}
127
128
129impl Client {
130    pub fn new(host: &str, port: u16) -> Self {
131        Client{ host: host.to_string(), port }
132    }
133
134    pub fn connect(&self) -> Result<Connection, String> {
135        let stream = TcpStream::connect((self.host.clone(), self.port)).unwrap();
136        Connection::new(&self.host, self.port, stream)
137    }
138}
139