voltdb_client_rust/
node.rs

1use std::collections::HashMap;
2use std::fmt::{Debug, Formatter};
3use std::io::{Read, Write};
4use std::net::{Ipv4Addr, Shutdown, TcpStream};
5use std::str::{from_utf8, FromStr};
6use std::sync::{Arc, mpsc, Mutex, RwLock};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::mpsc::{Receiver, Sender};
9use std::thread;
10
11use bytebuffer::ByteBuffer;
12use byteorder::{BigEndian, ReadBytesExt};
13use sha2::{Digest, Sha256};
14
15use crate::encode::{Value, VoltError};
16use crate::procedure_invocation::new_procedure_invocation;
17use crate::response::VoltResponseInfo;
18use crate::table::{new_volt_table, VoltTable};
19use crate::volt_param;
20
21const PING_HANDLE: i64 = 1 << 63 - 1;
22
23
24#[derive(Clone, Eq, PartialEq, Debug)]
25pub struct Opts(pub(crate) Box<InnerOpts>);
26
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct IpPort {
29    ip_host: String,
30    port: u16,
31}
32
33impl IpPort {
34    pub fn new(ip_host: String,
35               port: u16) -> Self {
36        return IpPort {
37            ip_host,
38            port,
39        };
40    }
41}
42
43impl Opts {
44    pub fn new(hosts: Vec<IpPort>) -> Opts {
45        let opt = Opts {
46            0: Box::new(InnerOpts {
47                ip_ports: hosts,
48                user: None,
49                pass: None,
50            })
51        };
52        opt
53    }
54}
55
56#[derive(Debug, Clone, Eq, PartialEq)]
57pub(crate) struct InnerOpts {
58    pub(crate) ip_ports: Vec<IpPort>,
59    pub(crate) user: Option<String>,
60    pub(crate) pass: Option<String>,
61}
62
63
64pub struct NodeOpt {
65    pub ip_port: IpPort,
66    pub user: Option<String>,
67    pub pass: Option<String>,
68
69}
70
71
72#[derive(Debug)]
73pub(crate) struct NetworkRequest {
74    handle: i64,
75    query: bool,
76    sync: bool,
77    num_bytes: i32,
78    channel: Mutex<Sender<VoltTable>>,
79}
80
81pub trait Connection: Sync + Send + 'static {}
82
83#[allow(dead_code)]
84pub struct Node {
85    tcp_stream: Box<Option<TcpStream>>,
86    info: ConnInfo,
87    requests: Arc<RwLock<HashMap<i64, NetworkRequest>>>,
88    stop: Arc<Mutex<bool>>,
89    counter: Mutex<AtomicI64>,
90}
91
92impl Debug for Node {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        return write!(f, "Pending request: {}", 1);
95    }
96}
97
98impl Drop for Node {
99    fn drop(&mut self) {
100        let res = self.shutdown();
101        match res {
102            Ok(_) => {}
103            Err(e) => {
104                eprintln!("{:?}", e);
105            }
106        }
107    }
108}
109
110
111impl Connection for Node {}
112
113
114impl Node {
115    pub fn new(opt: NodeOpt) -> Result<Node, VoltError> {
116        let ip_host = opt.ip_port;
117        let addr = format!("{}:{}", ip_host.ip_host, ip_host.port);
118        let mut buffer = ByteBuffer::new();
119        let result = [1; 1];
120        buffer.write_u32(0);
121        buffer.write_bytes(&result);
122        buffer.write_bytes(&result);
123        buffer.write_string("database");
124        match opt.user {
125            None => {
126                buffer.write_string("");
127            }
128            Some(user) => {
129                buffer.write_string(user.as_str());
130            }
131        }
132        match opt.pass {
133            None => {
134                let password = [];
135                let mut hasher: Sha256 = Sha256::new();
136                Digest::update(&mut hasher, password);
137                buffer.write(&hasher.finalize())?;
138            }
139            Some(password) => {
140                let password = password.as_bytes();
141                let mut hasher: Sha256 = Sha256::new();
142                Digest::update(&mut hasher, password);
143                buffer.write(&hasher.finalize())?;
144            }
145        }
146
147        buffer.set_wpos(0);
148        buffer.write_u32((buffer.len() - 4) as u32);
149        let bs = buffer.to_bytes();
150        let mut stream: TcpStream = TcpStream::connect(addr)?;
151        stream.write(&bs)?;
152        stream.flush()?;
153        let read = stream.read_u32::<BigEndian>()?;
154        let mut all = vec![0; read as usize];
155        stream.read_exact(&mut all)?;
156        let mut res = ByteBuffer::from_bytes(&*all);
157        let _version = res.read_u8()?;
158        let auth = res.read_u8()?;
159        if auth != 0 {
160            return Err(VoltError::AuthFailed);
161        }
162        let host_id = res.read_i32()?;
163        let connection = res.read_i64()?;
164        let _ = res.read_i64()?;
165        let leader = res.read_i32()?;
166        let bs = (leader as u32).to_be_bytes();
167        let leader_addr = Ipv4Addr::from(bs);
168        // TODO check IP
169        let length = res.read_i32()?;
170        let mut build = vec![0; length as usize];
171        res.read_exact(&mut build)?;
172        let b = from_utf8(&build)?;
173        let info = ConnInfo {
174            host_id,
175            connection,
176            leader_addr,
177            build: String::from(b),
178        };
179        let data = Arc::new(RwLock::new(HashMap::new()));
180        let mut res = Node {
181            stop: Arc::new(Mutex::new(false)),
182            tcp_stream: Box::new(Option::Some(stream)),
183            info,
184            requests: data,
185            counter: Mutex::new(AtomicI64::new(1)),
186        };
187        res.listen()?;
188        return Ok(res);
189    }
190    pub fn get_sequence(&self) -> i64 {
191        let lock = self.counter.lock();
192        let seq = lock.unwrap();
193        let i = seq.fetch_add(1, Ordering::Relaxed);
194        return i;
195    }
196
197    pub fn list_procedures(&mut self) -> Result<Receiver<VoltTable>, VoltError> {
198        self.call_sp("@SystemCatalog", volt_param!("PROCEDURES"))
199    }
200
201    pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<Receiver<VoltTable>, VoltError> {
202        let req = self.get_sequence();
203        let mut proc = new_procedure_invocation(
204            req,
205            false,
206            &param,
207            query);
208        let (tx, rx): (Sender<VoltTable>, Receiver<VoltTable>) = mpsc::channel();
209        let shared_sender = Mutex::new(tx);
210        let seq = NetworkRequest {
211            query: true,
212            handle: req,
213            num_bytes: proc.slen,
214            sync: true,
215            channel: shared_sender,
216        };
217        self.requests.write()?.insert(req, seq);
218        let bs = proc.bytes();
219        let tcp_stream = self.tcp_stream.as_mut();
220        match tcp_stream {
221            None => {
222                return Err(VoltError::ConnectionNotAvailable);
223            }
224            Some(stream) => {
225                stream.write_all(&*bs)?;
226            }
227        }
228        return Ok(rx);
229    }
230
231    pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<Receiver<VoltTable>, VoltError> {
232        self.call_sp("@UpdateClasses", volt_param!(bs,""))
233    }
234    /// Use `@AdHoc` proc to query .
235    pub fn query(&mut self, sql: &str) -> Result<Receiver<VoltTable>, VoltError> {
236        let mut zero_vec: Vec<&dyn Value> = Vec::new();
237        zero_vec.push(&sql);
238        return Ok(self.call_sp("@AdHoc", zero_vec)?);
239    }
240
241    pub fn ping(&mut self) -> Result<(), VoltError> {
242        let zero_vec: Vec<&dyn Value> = Vec::new();
243        let mut proc = new_procedure_invocation(PING_HANDLE, false, &zero_vec, "@Ping");
244        let bs = proc.bytes();
245        let res = self.tcp_stream.as_mut();
246        match res {
247            None => {
248                return Err(VoltError::ConnectionNotAvailable);
249            }
250            Some(stream) => {
251                stream.write_all(&*bs)?;
252            }
253        }
254        Ok({})
255    }
256
257
258    fn job(mut tcp: &TcpStream, requests: &Arc<RwLock<HashMap<i64, NetworkRequest>>>) -> Result<(), VoltError> {
259        let read_res = tcp.read_u32::<BigEndian>();
260        match read_res {
261            Ok(read) => {
262                if read > 0 {
263                    let mut all = vec![0; read as usize];
264                    tcp.read_exact(&mut all)?;
265                    let mut res = ByteBuffer::from_bytes(&*all);
266                    let _ = res.read_u8()?;
267                    let handle = res.read_i64()?;
268                    if handle == PING_HANDLE {
269                        return Ok({});
270                    }
271                    if let Some(t) = requests.write()?.remove(&handle) {
272                        let info = VoltResponseInfo::new(&mut res, handle)?;
273                        let table = new_volt_table(&mut res, info)?;
274                        let sender = t.channel.lock()?;
275                        sender.send(table).unwrap();
276                    }
277                }
278            }
279            Err(e) => {
280                return Err(VoltError::Io(e));
281            }
282        }
283        Ok({})
284    }
285    pub fn shutdown(&mut self) -> Result<(), VoltError> {
286        let mut stop = self.stop.lock().unwrap();
287        *stop = true;
288        let res = self.tcp_stream.as_mut();
289        match res {
290            None => {}
291            Some(stream) => {
292                stream.shutdown(Shutdown::Both)?;
293            }
294        }
295        self.tcp_stream = Box::new(Option::None);
296        return Ok({});
297    }
298    /// Listen on new message come in .
299    fn listen(&mut self) -> Result<(), VoltError>
300    {
301        let requests = Arc::clone(&self.requests);
302
303        let res = self.tcp_stream.as_mut();
304        return match res {
305            None => {
306                Ok(())
307            }
308            Some(res) => {
309                let tcp = res.try_clone()?;
310                let stopping = Arc::clone(&self.stop);
311                thread::spawn(move || {
312                    loop {
313                        if *stopping.lock().unwrap() {
314                            break;
315                        } else {
316                            let res = crate::node::Node::job(&tcp, &requests);
317                            match res {
318                                Ok(_) => {}
319                                Err(err) => {
320                                    if !*stopping.lock().unwrap() {
321                                        eprintln!("{} ", err)
322                                    }
323                                }
324                            }
325                        }
326                    }
327                });
328                Ok(())
329            }
330        };
331    }
332}
333
334#[derive(Debug, Clone)]
335pub struct ConnInfo {
336    host_id: i32,
337    connection: i64,
338    leader_addr: Ipv4Addr,
339    build: String,
340}
341
342/// Wait for response, convert response error from volt error to `VoltError`.
343pub fn block_for_result(res: &Receiver<VoltTable>) -> Result<VoltTable, VoltError> {
344    let mut table = res.recv()?;
345    let err = table.has_error();
346    return match err {
347        None => { Ok(table) }
348        Some(err) => { Err(err) }
349    };
350}
351
352pub fn reset() {}
353
354
355/// Create new connection to server .
356pub fn get_node(addr: &str) -> Result<Node, VoltError> {
357    let url = addr.split(":").collect::<Vec<&str>>();
358    let host = url.get(0).unwrap().to_string();
359    let port = u16::from_str(url.get(1).unwrap()).unwrap();
360    let ip_port = IpPort::new(host, port);
361    let opt = NodeOpt {
362        ip_port,
363        user: None,
364        pass: None,
365    };
366    return Node::new(opt);
367}