1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::convert::TryInto;
use std::net::IpAddr;

use bytes::*;
use failure::Error;
use tokio::io::BufStream;
use tokio::net::TcpStream;
use tokio::prelude::*;

use bolt_proto::Message;

const PREAMBLE: [u8; 4] = [0x60, 0x60, 0xB0, 0x17];
const SUPPORTED_VERSIONS: [u32; 4] = [1, 0, 0, 0];

pub struct Client {
    pub(crate) stream: BufStream<TcpStream>,
    pub(crate) version: u8,
}

impl Client {
    pub async fn new(host: IpAddr, port: usize) -> Result<Self, Error> {
        let mut client = Client {
            stream: BufStream::new(TcpStream::connect(format!("{}:{}", host, port)).await?),
            version: 0,
        };
        client.version = client.handshake().await? as u8;
        Ok(client)
    }

    pub async fn handshake(&mut self) -> Result<u32, Error> {
        let mut allowed_versions = BytesMut::with_capacity(16);
        SUPPORTED_VERSIONS
            .iter()
            .for_each(|&v| allowed_versions.put_u32(v));
        self.stream.write(&PREAMBLE).await?;
        self.stream.write_buf(&mut allowed_versions).await?;
        self.stream.flush().await?;
        Ok(self.stream.read_u32().await?)
    }

    pub async fn read_message(&mut self) -> Result<Message, Error> {
        Message::from_stream(&mut self.stream).await
    }

    pub async fn send_message(&mut self, message: Message) -> Result<(), Error> {
        let chunks: Vec<Bytes> = message.try_into()?;
        for mut chunk in chunks {
            self.stream.write_buf(&mut chunk).await?;
        }
        self.stream.flush().await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::convert::TryFrom;
    use std::iter::FromIterator;

    use bolt_proto::message::{Init, Success};
    use bolt_proto::Value;

    use super::*;

    async fn new_client() -> Result<Client, Error> {
        Client::new("127.0.0.1".parse().unwrap(), 7687).await
    }

    #[tokio::test]
    async fn handshake() {
        let client = new_client().await.unwrap();
        assert_eq!(client.version, 1);
    }

    #[tokio::test]
    async fn init() {
        let mut client = new_client().await.unwrap();
        assert!(client
            .send_message(Message::from(Init::new(
                "bolt-client/0.2.0".to_string(),
                HashMap::from_iter(vec![
                    (String::from("scheme"), Value::from("basic")),
                    (String::from("principal"), Value::from("neo4j")),
                    (String::from("credentials"), Value::from("test")),
                ]),
            )))
            .await
            .is_ok());
        let response = client.read_message().await.unwrap();
        // println!("{:?}", response);
        assert!(Success::try_from(response).is_ok())
    }
}