rusty_tracks/connection/
connection.rs1use crate::connection::admin::Admin;
2use std::io::{Read, Write};
3use std::net::TcpStream;
4use std::time::{SystemTime, UNIX_EPOCH};
5use flatbuffers::FlatBufferBuilder;
6use tracing::{debug, error, info};
7use track_rails::message_generated::protocol;
8use track_rails::message_generated::protocol::{Disconnect, DisconnectArgs, MessageArgs, OkStatus, OkStatusArgs, Payload, RegisterRequest, RegisterRequestArgs, Status, Text, TextArgs, Time, TimeArgs, Train, TrainArgs, Value as ProtoValue, ValueWrapper, ValueWrapperArgs};
9use crate::connection::Permission::AdminPermission;
10use crate::connection::permission::Permission;
11use crate::messages;
12use crate::messages::Message;
13use crate::value::Value;
14
15pub struct Connection {
16 id: Option<usize>,
17 host: String,
18 port: u16,
19 stream: TcpStream,
20 permissions: Vec<Permission>
21}
22
23impl Connection {
24
25 pub(crate) fn new(host: &str, port: u16, stream: TcpStream) -> Result<Self, String> {
26 let host = String::from(host);
27
28 let mut connection = Connection{
29 id: None,
30 host,
31 port,
32 stream,
33 permissions: vec![],
34 };
35 connection.connect()?;
36 Ok(connection)
37 }
38
39
40 pub fn send<V:Into<Value>>(&mut self, msg: V) -> Result<(), String> {
41 let msg = self.wrap_send(msg.into());
42 self.write_all(&msg)
43 }
44
45 pub fn receive(&mut self) -> Result<Message, String> {
46 self.read_msg()
47 }
48
49 pub fn admin(self) -> Result<Admin, String> {
50 if !self.permissions.contains(&AdminPermission) {
51 return Err(String::from("No admin permission"));
52 }
53 Ok(Admin::new(self))
54 }
55
56 pub(crate) fn write_all<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), String> {
57 let length: [u8; 4] = (msg.len() as u32).to_be_bytes();
58 self.stream.write_all(&length).map_err(|e| e.to_string())?;
60 debug!("sending {} bytes", msg.len());
61 self.stream.write_all(msg).map_err(|e| e.to_string())?;
63 Ok(())
64 }
65
66 fn connect(&mut self) -> Result<(), String> {
67 let mut builder = FlatBufferBuilder::new();
68 let register = RegisterRequest::create(&mut builder, &RegisterRequestArgs { id: None, catalog: None }).as_union_value();
69
70 let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
71
72 let msg = protocol::Message::create(&mut builder, &MessageArgs{ data_type: Payload::RegisterRequest, data: Some(register), status_type: Status::OkStatus, status: Some(status) });
73
74 builder.finish(msg, None);
75 let msg = builder.finished_data().to_vec();
76
77
78 let code = self.write_all(&msg).map_err(|e| e.to_string());
79 match code {
80 Ok(_) => info!("Connected successfully"),
81 _ => error!("Error writing to stream"),
82 }
83
84
85 let msg: messages::RegisterResponse = self.read_msg()?;
86 debug!("{:?}", msg);
87 self.permissions = msg.permissions;
88 Ok(())
89 }
90
91 pub fn read_msg<Msg>(&mut self) -> Result<Msg, String> where
92 Msg: for<'a> TryFrom<protocol::Message<'a>, Error = String> {
93 let mut buf = [0u8; 4];
94 self.stream.read_exact(&mut buf).map_err(|e| e.to_string())?;
95
96
97 let length = u32::from_be_bytes(buf) as usize;
98 let mut buffer = vec![0u8; length];
99 self.stream.read_exact(&mut buffer).map_err(|err| err.to_string())?;
100 let msg = flatbuffers::root::<protocol::Message>(&buffer).map_err(|e| e.to_string())?;
101 Msg::try_from(msg)
102 }
103
104 pub(crate) fn wrap_send(&mut self, msg: Value) -> Vec<u8> {
105 let mut builder = FlatBufferBuilder::new();
106
107 let millis = SystemTime::now()
108 .duration_since(UNIX_EPOCH)
109 .expect("Time went backwards")
110 .as_millis();
111 let time = Time::create(&mut builder, &TimeArgs{data: millis as u64 as i64 });
112
113 let topic = builder.create_string("");
114
115 let value = msg.flatternize(&mut builder);
116
117 let values = builder.create_vector(&[value]);
118
119 let train = Train::create(&mut builder, &TrainArgs {
120 values: Some(values),
121 topic: Some(topic),
122 event_time: Some(time),
123 }).as_union_value();
124
125 let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
126
127 let msg = protocol::Message::create(&mut builder, &MessageArgs{ data_type: Payload::Train, data: Some(train), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
128
129 builder.finish(msg, None);
130 builder.finished_data().to_vec()
131 }
132
133}
134
135impl Drop for Connection {
136 fn drop(&mut self) {
137 let mut builder = FlatBufferBuilder::new();
138
139 let diconnect = Disconnect::create(&mut builder, &DisconnectArgs { id: self.id.unwrap_or_default() as u64 }).as_union_value();
140
141 let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
142
143 let message = protocol::Message::create(&mut builder, &MessageArgs{data_type: Payload::Disconnect, data: Some(diconnect), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
144
145 builder.finish(message, None);
146 let msg = builder.finished_data().to_vec();
147 debug!("disconnecting");
148 self.write_all(&msg).unwrap()
149 }
150}