1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::io::{Read, Write};
4use std::net::{Ipv4Addr, Shutdown, TcpStream};
5use std::str::{from_utf8, FromStr};
6use std::sync::{Arc, mpsc, Mutex, RwLock};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::mpsc::{Receiver, Sender};
9use std::thread;
10
11use bytebuffer::ByteBuffer;
12use byteorder::{BigEndian, ReadBytesExt};
13use sha2::{Digest, Sha256};
14
15use crate::encode::{Value, VoltError};
16use crate::procedure_invocation::new_procedure_invocation;
17use crate::response::VoltResponseInfo;
18use crate::table::{new_volt_table, VoltTable};
19use crate::volt_param;
20
21const PING_HANDLE: i64 = 1 << 63 - 1;
22
23
24#[derive(Clone, Eq, PartialEq, Debug)]
25pub struct Opts(pub(crate) Box<InnerOpts>);
26
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct IpPort {
29 ip_host: String,
30 port: u16,
31}
32
33impl IpPort {
34 pub fn new(ip_host: String,
35 port: u16) -> Self {
36 return IpPort {
37 ip_host,
38 port,
39 };
40 }
41}
42
43impl Opts {
44 pub fn new(hosts: Vec<IpPort>) -> Opts {
45 let opt = Opts {
46 0: Box::new(InnerOpts {
47 ip_ports: hosts,
48 user: None,
49 pass: None,
50 })
51 };
52 opt
53 }
54}
55
56#[derive(Debug, Clone, Eq, PartialEq)]
57pub(crate) struct InnerOpts {
58 pub(crate) ip_ports: Vec<IpPort>,
59 pub(crate) user: Option<String>,
60 pub(crate) pass: Option<String>,
61}
62
63
64pub struct NodeOpt {
65 pub ip_port: IpPort,
66 pub user: Option<String>,
67 pub pass: Option<String>,
68
69}
70
71
72#[derive(Debug)]
73pub(crate) struct NetworkRequest {
74 handle: i64,
75 query: bool,
76 sync: bool,
77 num_bytes: i32,
78 channel: Mutex<Sender<VoltTable>>,
79}
80
81pub trait Connection: Sync + Send + 'static {}
82
83#[allow(dead_code)]
84pub struct Node {
85 tcp_stream: Box<Option<TcpStream>>,
86 info: ConnInfo,
87 requests: Arc<RwLock<HashMap<i64, NetworkRequest>>>,
88 stop: Arc<Mutex<bool>>,
89 counter: Mutex<AtomicI64>,
90}
91
92impl Debug for Node {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 return write!(f, "Pending request: {}", 1);
95 }
96}
97
98impl Drop for Node {
99 fn drop(&mut self) {
100 let res = self.shutdown();
101 match res {
102 Ok(_) => {}
103 Err(e) => {
104 eprintln!("{:?}", e);
105 }
106 }
107 }
108}
109
110
111impl Connection for Node {}
112
113
114impl Node {
115 pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
116 let ip_host = opt.ip_port;
117 let addr = format!("{}:{}", ip_host.ip_host, ip_host.port);
118 let mut buffer = ByteBuffer::new();
119 let result = [1; 1];
120 buffer.write_u32(0);
121 buffer.write_bytes(&result);
122 buffer.write_bytes(&result);
123 buffer.write_string("database");
124 match opt.user {
125 None => {
126 buffer.write_string("");
127 }
128 Some(user) => {
129 buffer.write_string(user.as_str());
130 }
131 }
132 match opt.pass {
133 None => {
134 let password = [];
135 let mut hasher: Sha256 = Sha256::new();
136 Digest::update(&mut hasher, password);
137 buffer.write(&hasher.finalize())?;
138 }
139 Some(password) => {
140 let password = password.as_bytes();
141 let mut hasher: Sha256 = Sha256::new();
142 Digest::update(&mut hasher, password);
143 buffer.write(&hasher.finalize())?;
144 }
145 }
146
147 buffer.set_wpos(0);
148 buffer.write_u32((buffer.len() - 4) as u32);
149 let bs = buffer.to_bytes();
150 let mut stream: TcpStream = TcpStream::connect(addr)?;
151 stream.write(&bs)?;
152 stream.flush()?;
153 let read = stream.read_u32::<BigEndian>()?;
154 let mut all = vec![0; read as usize];
155 stream.read_exact(&mut all)?;
156 let mut res = ByteBuffer::from_bytes(&*all);
157 let _version = res.read_u8()?;
158 let auth = res.read_u8()?;
159 if auth != 0 {
160 return Err(VoltError::AuthFailed);
161 }
162 let host_id = res.read_i32()?;
163 let connection = res.read_i64()?;
164 let _ = res.read_i64()?;
165 let leader = res.read_i32()?;
166 let bs = (leader as u32).to_be_bytes();
167 let leader_addr = Ipv4Addr::from(bs);
168 let length = res.read_i32()?;
170 let mut build = vec![0; length as usize];
171 res.read_exact(&mut build)?;
172 let b = from_utf8(&build)?;
173 let info = ConnInfo {
174 host_id,
175 connection,
176 leader_addr,
177 build: String::from(b),
178 };
179 let data = Arc::new(RwLock::new(HashMap::new()));
180 let mut res = Node {
181 stop: Arc::new(Mutex::new(false)),
182 tcp_stream: Box::new(Option::Some(stream)),
183 info,
184 requests: data,
185 counter: Mutex::new(AtomicI64::new(1)),
186 };
187 res.listen()?;
188 return Ok(res);
189 }
190 pub fn get_sequence(&self) -> i64 {
191 let lock = self.counter.lock();
192 let seq = lock.unwrap();
193 let i = seq.fetch_add(1, Ordering::Relaxed);
194 return i;
195 }
196
197 pub fn list_procedures(&mut self) -> Result<Receiver<VoltTable>, VoltError> {
198 self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
199 }
200
201 pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<Receiver<VoltTable>, VoltError> {
202 let req = self.get_sequence();
203 let mut proc = new_procedure_invocation(
204 req,
205 false,
206 ¶m,
207 query);
208 let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
209 let shared_sender = Mutex::new(tx);
210 let seq = NetworkRequest {
211 query: true,
212 handle: req,
213 num_bytes: proc.slen,
214 sync: true,
215 channel: shared_sender,
216 };
217 self.requests.write()?.insert(req, seq);
218 let bs = proc.bytes();
219 let tcp_stream = self.tcp_stream.as_mut();
220 match tcp_stream {
221 None => {
222 return Err(VoltError::ConnectionNotAvailable);
223 }
224 Some(stream) => {
225 stream.write_all(&*bs)?;
226 }
227 }
228 return Ok(rx);
229 }
230
231 pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
232 self.call_sp("@UpdateClasses", volt_param!(bs,""))
233 }
234 pub fn query(&mut self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
236 let mut zero_vec: Vec<&dyn Value> = Vec::new();
237 zero_vec.push(&sql);
238 return Ok(self.call_sp("@AdHoc", zero_vec)?);
239 }
240
241 pub fn ping(&mut self) -> Result<(), VoltError> {
242 let zero_vec: Vec<&dyn Value> = Vec::new();
243 let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
244 let bs = proc.bytes();
245 let res = self.tcp_stream.as_mut();
246 match res {
247 None => {
248 return Err(VoltError::ConnectionNotAvailable);
249 }
250 Some(stream) => {
251 stream.write_all(&*bs)?;
252 }
253 }
254 Ok({})
255 }
256
257
258 fn job(mut tcp: &TcpStream, requests: &Arc<RwLock<HashMap<i64, NetworkRequest>>>) -> Result<(), VoltError> {
259 let read_res = tcp.read_u32::<BigEndian>();
260 match read_res {
261 Ok(read) => {
262 if read > 0 {
263 let mut all = vec![0; read as usize];
264 tcp.read_exact(&mut all)?;
265 let mut res = ByteBuffer::from_bytes(&*all);
266 let _ = res.read_u8()?;
267 let handle = res.read_i64()?;
268 if handle == PING_HANDLE {
269 return Ok({});
270 }
271 if let Some(t) = requests.write()?.remove(&handle) {
272 let info = VoltResponseInfo::new(&mut res, handle)?;
273 let table = new_volt_table(&mut res, info)?;
274 let sender = t.channel.lock()?;
275 sender.send(table).unwrap();
276 }
277 }
278 }
279 Err(e) => {
280 return Err(VoltError::Io(e));
281 }
282 }
283 Ok({})
284 }
285 pub fn shutdown(&mut self) -> Result<(), VoltError> {
286 let mut stop = self.stop.lock().unwrap();
287 *stop = true;
288 let res = self.tcp_stream.as_mut();
289 match res {
290 None => {}
291 Some(stream) => {
292 stream.shutdown(Shutdown::Both)?;
293 }
294 }
295 self.tcp_stream = Box::new(Option::None);
296 return Ok({});
297 }
298 fn listen(&mut self) -> Result<(), VoltError>
300 {
301 let requests = Arc::clone(&self.requests);
302
303 let res = self.tcp_stream.as_mut();
304 return match res {
305 None => {
306 Ok(())
307 }
308 Some(res) => {
309 let tcp = res.try_clone()?;
310 let stopping = Arc::clone(&self.stop);
311 thread::spawn(move || {
312 loop {
313 if *stopping.lock().unwrap() {
314 break;
315 } else {
316 let res = crate::node::Node::job(&tcp, &requests);
317 match res {
318 Ok(_) => {}
319 Err(err) => {
320 if !*stopping.lock().unwrap() {
321 eprintln!("{} ", err)
322 }
323 }
324 }
325 }
326 }
327 });
328 Ok(())
329 }
330 };
331 }
332}
333
334#[derive(Debug, Clone)]
335pub struct ConnInfo {
336 host_id: i32,
337 connection: i64,
338 leader_addr: Ipv4Addr,
339 build: String,
340}
341
342pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
344 let mut table = res.recv()?;
345 let err = table.has_error();
346 return match err {
347 None => { Ok(table) }
348 Some(err) => { Err(err) }
349 };
350}
351
352pub fn reset() {}
353
354
355pub fn get_node(addr: &str) -> Result<Node, VoltError> {
357 let url = addr.split(":").collect::<Vec<&str>>();
358 let host = url.get(0).unwrap().to_string();
359 let port = u16::from_str(url.get(1).unwrap()).unwrap();
360 let ip_port = IpPort::new(host, port);
361 let opt = NodeOpt {
362 ip_port,
363 user: None,
364 pass: None,
365 };
366 return Node::new(opt);
367}