rusty_tracks/connection/
connection.rs

1use crate::connection::admin::Admin;
2use std::io::{Read, Write};
3use std::net::TcpStream;
4use std::time::{SystemTime, UNIX_EPOCH};
5use flatbuffers::FlatBufferBuilder;
6use tracing::{debug, error, info};
7use track_rails::message_generated::protocol;
8use track_rails::message_generated::protocol::{Disconnect, DisconnectArgs, MessageArgs, OkStatus, OkStatusArgs, Payload, RegisterRequest, RegisterRequestArgs, Status, Time, TimeArgs, Train, TrainArgs};
9use crate::connection::Permission::AdminPermission;
10use crate::connection::permission::Permission;
11use crate::messages;
12use crate::messages::Message;
13use crate::value::Value;
14
15pub struct Connection {
16    id: Option<usize>,
17    host: String,
18    port: u16,
19    stream: TcpStream,
20    permissions: Vec<Permission>
21}
22
23impl Connection {
24
25    pub(crate) fn new(host: &str, port: u16, stream: TcpStream) -> Result<Self, String> {
26        let host = String::from(host);
27
28        let mut connection = Connection{
29            id: None,
30            host,
31            port,
32            stream,
33            permissions: vec![],
34        };
35        connection.connect()?;
36        Ok(connection)
37    }
38
39
40    pub fn send<V:Into<Value>>(&mut self, msg: V) -> Result<(), String> {
41        let msg = self.wrap(msg.into());
42        self.write_all(&msg)
43    }
44
45    pub fn receive_msg(&mut self) -> Result<Message, String> {
46        self.receive()
47    }
48
49    pub fn admin(self) -> Result<Admin, String> {
50        if !self.permissions.contains(&AdminPermission) {
51            return Err(String::from("No admin permission"));
52        }
53        Ok(Admin::new(self))
54    }
55
56    pub(crate) fn write_all<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), String> {
57        let length: [u8; 4] = (msg.len() as u32).to_be_bytes();
58        // we write length first
59        self.stream.write_all(&length).map_err(|e| e.to_string())?;
60        debug!("sending {} bytes", msg.len());
61        // then msg
62        self.stream.write_all(msg).map_err(|e| e.to_string())?;
63        Ok(())
64    }
65
66    fn connect(&mut self) -> Result<(), String> {
67        let mut builder = FlatBufferBuilder::new();
68        let register = RegisterRequest::create(&mut builder, &RegisterRequestArgs { id: None, catalog: None }).as_union_value();
69
70        let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
71
72        let msg = protocol::Message::create(&mut builder, &MessageArgs{ data_type: Payload::RegisterRequest, data: Some(register), status_type: Status::OkStatus, status: Some(status) });
73
74        builder.finish(msg, None);
75        let msg = builder.finished_data().to_vec();
76
77
78        let code = self.write_all(&msg).map_err(|e| e.to_string());
79        match code {
80            Ok(_) => info!("Connected successfully"),
81            _ => error!("Error writing to stream"),
82        }
83
84
85        let msg: messages::RegisterResponse = self.receive()?;
86        debug!("{:?}", msg);
87        self.permissions = msg.permissions;
88        Ok(())
89    }
90
91    pub fn receive<Msg>(&mut self) -> Result<Msg, String> where
92        Msg: for<'a> TryFrom<protocol::Message<'a>, Error = String> {
93        let mut buf = [0u8; 4];
94        self.stream.read_exact(&mut buf).map_err(|e| e.to_string())?;
95
96
97        let length = u32::from_be_bytes(buf) as usize;
98        let mut buffer = vec![0u8; length];
99        self.stream.read_exact(&mut buffer).map_err(|err| err.to_string())?;
100        let msg = flatbuffers::root::<protocol::Message>(&buffer).map_err(|e| e.to_string())?;
101        Msg::try_from(msg)
102    }
103
104    pub(crate) fn wrap(&mut self, msg: Value) -> Vec<u8> {
105        let mut builder = FlatBufferBuilder::new();
106
107        let millis = SystemTime::now()
108            .duration_since(UNIX_EPOCH)
109            .expect("Time went backwards")
110            .as_millis();
111        let time = Time::create(&mut builder, &TimeArgs{data: millis as u64 as i64 });
112
113        let topic = builder.create_string("");
114
115        let value = msg.flatternize(&mut builder);
116
117        let values = builder.create_vector(&[value]);
118
119        let train = Train::create(&mut builder, &TrainArgs {
120            values: Some(values),
121            topic: Some(topic),
122            event_time: Some(time),
123        }).as_union_value();
124
125        let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
126
127        let msg = protocol::Message::create(&mut builder, &MessageArgs{ data_type: Payload::Train, data: Some(train), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
128
129        builder.finish(msg, None);
130        builder.finished_data().to_vec()
131    }
132
133}
134
135impl Drop for Connection {
136    fn drop(&mut self) {
137        let mut builder = FlatBufferBuilder::new();
138
139        let diconnect = Disconnect::create(&mut builder, &DisconnectArgs { id: self.id.unwrap_or_default() as u64 }).as_union_value();
140
141        let status = OkStatus::create(&mut builder, &OkStatusArgs{}).as_union_value();
142
143        let message = protocol::Message::create(&mut builder, &MessageArgs{data_type: Payload::Disconnect, data: Some(diconnect), status_type: Status::OkStatus, status: Some(status) }).as_union_value();
144
145        builder.finish(message, None);
146        let msg = builder.finished_data().to_vec();
147        debug!("disconnecting");
148        self.write_all(&msg).unwrap()
149    }
150}