use bolt_client_macros::*;
use bolt_proto::message::*;
use bolt_proto::Message;
use futures_util::io::{AsyncRead, AsyncWrite};
use crate::error::*;
use crate::{Client, Metadata, Params};
impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
#[bolt_version(1, 2)]
pub async fn init(
&mut self,
client_name: impl Into<String>,
auth_token: Metadata,
) -> Result<Message> {
let init_msg = Init::new(client_name.into(), auth_token.value);
self.send_message(Message::Init(init_msg)).await?;
self.read_message().await
}
#[bolt_version(1, 2)]
pub async fn run(
&mut self,
statement: impl Into<String>,
parameters: Option<Params>,
) -> Result<Message> {
let run_msg = Run::new(statement.into(), parameters.unwrap_or_default().value);
self.send_message(Message::Run(run_msg)).await?;
self.read_message().await
}
#[bolt_version(1, 2, 3)]
pub async fn discard_all(&mut self) -> Result<Message> {
self.send_message(Message::DiscardAll).await?;
self.read_message().await
}
#[bolt_version(1, 2, 3)]
pub async fn pull_all(&mut self) -> Result<(Message, Vec<Record>)> {
self.send_message(Message::PullAll).await?;
let mut records = vec![];
loop {
match self.read_message().await? {
Message::Record(record) => records.push(record),
other => return Ok((other, records)),
}
}
}
#[bolt_version(1, 2)]
pub async fn ack_failure(&mut self) -> Result<Message> {
self.send_message(Message::AckFailure).await?;
self.read_message().await
}
#[bolt_version(1, 2, 3, 4, 4.1)]
pub async fn reset(&mut self) -> Result<Message> {
self.send_message(Message::Reset).await?;
self.read_message().await
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::{convert::TryFrom, env, iter::FromIterator};
use bolt_proto::{message::*, value::*, version::*};
use tokio::io::BufStream;
use tokio_util::compat::*;
use crate::{skip_if_handshake_failed, stream, Metadata};
use super::*;
type Stream = Compat<BufStream<stream::Stream>>;
pub(crate) async fn new_client(version: u32) -> Result<Client<Stream>> {
Client::new(
BufStream::new(
stream::Stream::connect(
env::var("BOLT_TEST_ADDR").unwrap(),
env::var("BOLT_TEST_DOMAIN").ok(),
)
.await?,
)
.compat(),
&[version, 0, 0, 0],
)
.await
}
pub(crate) async fn initialize_client(
client: &mut Client<Stream>,
succeed: bool,
) -> Result<Message> {
let username = env::var("BOLT_TEST_USERNAME").unwrap();
let password = if succeed {
env::var("BOLT_TEST_PASSWORD").unwrap()
} else {
String::from("invalid")
};
let version = client.version();
if [V1_0, V2_0].contains(&version) {
client
.init(
"bolt-client/X.Y.Z",
Metadata::from_iter(vec![
("scheme", "basic"),
("principal", &username),
("credentials", &password),
]),
)
.await
} else {
client
.hello(Some(Metadata::from_iter(vec![
("user_agent", "bolt-client/X.Y.Z"),
("scheme", "basic"),
("principal", &username),
("credentials", &password),
])))
.await
}
}
pub(crate) async fn get_initialized_client(version: u32) -> Result<Client<Stream>> {
let mut client = new_client(version).await?;
initialize_client(&mut client, true).await?;
Ok(client)
}
pub(crate) async fn run_invalid_query(client: &mut Client<Stream>) -> Result<Message> {
if client.version() > V2_0 {
client
.run_with_metadata(
"RETURN invalid query oof as n;",
Some(Params::from_iter(vec![("some_val", 25.5432)])),
Some(Metadata::from_iter(vec![("some_key", true)])),
)
.await
} else {
client.run("", None).await
}
}
pub(crate) async fn run_valid_query(client: &mut Client<Stream>) -> Result<Message> {
if client.version() > V2_0 {
client
.run_with_metadata(
"RETURN $some_val as n;",
Some(Params::from_iter(vec![("some_val", 25.5432)])),
Some(Metadata::from_iter(vec![("some_key", true)])),
)
.await
} else {
client.run("RETURN 1 as n;", None).await
}
}
#[tokio::test]
async fn init() {
let client = new_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = initialize_client(&mut client, true).await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn init_fail() {
let client = new_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = initialize_client(&mut client, false).await.unwrap();
assert!(Failure::try_from(response).is_ok());
let response = initialize_client(&mut client, true).await;
assert!(match response {
Err(Error::ProtocolError(bolt_proto::error::Error::IOError(_))) => true,
_ => false,
})
}
#[tokio::test]
async fn ack_failure() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_invalid_query(&mut client).await.unwrap();
assert!(Failure::try_from(response).is_ok());
let response = client.ack_failure().await.unwrap();
assert!(Success::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn ack_failure_after_ignored() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_invalid_query(&mut client).await.unwrap();
assert!(Failure::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(match response {
Message::Ignored => true,
_ => false,
});
let response = client.ack_failure().await.unwrap();
assert!(Success::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn run() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn run_pipelined() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let messages = vec![
Message::Run(Run::new("MATCH (n {test: 'v1-pipelined'}) DETACH DELETE n;".to_string(), Default::default())),
Message::PullAll,
Message::Run(Run::new("CREATE (:Database {name: 'neo4j', born: 2007, test: 'v1-pipelined'});".to_string(), Default::default())),
Message::PullAll,
Message::Run(Run::new(
"MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}) CREATE (:Library {name: 'bolt-client', born: 2019, test: 'v1-pipelined'})-[:CLIENT_FOR]->(neo4j);".to_string(),
Default::default())),
Message::PullAll,
Message::Run(Run::new(
"MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v1-pipelined'}) RETURN bolt_client.born - neo4j.born;".to_string(),
Default::default())),
Message::PullAll,
];
for response in client.pipeline(messages).await.unwrap() {
assert!(match response {
Message::Success(_) => true,
Message::Record(record) => {
assert_eq!(record.fields()[0], Value::from(12_i8));
true
}
_ => false,
});
}
}
#[tokio::test]
async fn run_and_pull() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = client.run("RETURN 3458376 as n;", None).await.unwrap();
assert!(Success::try_from(response).is_ok());
let (response, records) = client.pull_all().await.unwrap();
assert!(Success::try_from(response).is_ok());
assert_eq!(records.len(), 1);
assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
}
#[tokio::test]
async fn node_and_rel_creation() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
client
.run("MATCH (n {test: 'v1-node-rel'}) DETACH DELETE n;", None)
.await
.unwrap();
client.pull_all().await.unwrap();
client.run("CREATE (:Client {name: 'bolt-client', test: 'v1-node-rel'})-[:WRITTEN_IN]->(:Language {name: 'Rust', test: 'v1-node-rel'});", None).await.unwrap();
client.pull_all().await.unwrap();
client
.run(
"MATCH (c {test: 'v1-node-rel'})-[r:WRITTEN_IN]->(l) RETURN c, r, l;",
None,
)
.await
.unwrap();
let (_response, records) = client.pull_all().await.unwrap();
let c = Node::try_from(records[0].fields()[0].clone()).unwrap();
let r = Relationship::try_from(records[0].fields()[1].clone()).unwrap();
let l = Node::try_from(records[0].fields()[2].clone()).unwrap();
assert_eq!(c.labels(), &[String::from("Client")]);
assert_eq!(
c.properties().get("name"),
Some(&Value::from("bolt-client"))
);
assert_eq!(l.labels(), &[String::from("Language")]);
assert_eq!(l.properties().get("name"), Some(&Value::from("Rust")));
assert_eq!(r.rel_type(), "WRITTEN_IN");
assert!(r.properties().is_empty());
assert_eq!(
(r.start_node_identity(), r.end_node_identity()),
(c.node_identity(), l.node_identity())
);
}
#[tokio::test]
async fn discard_all_fail() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = client.discard_all().await.unwrap();
assert!(Failure::try_from(response).is_ok());
}
#[tokio::test]
async fn discard_all() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
let response = client.discard_all().await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn discard_all_and_pull() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
let response = client.discard_all().await.unwrap();
assert!(Success::try_from(response).is_ok());
let (response, records) = client.pull_all().await.unwrap();
assert!(Failure::try_from(response).is_ok());
assert!(records.is_empty());
}
#[tokio::test]
async fn reset() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_invalid_query(&mut client).await.unwrap();
assert!(Failure::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(match response {
Message::Ignored => true,
_ => false,
});
let response = client.reset().await.unwrap();
assert!(Success::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(Success::try_from(response).is_ok());
}
#[tokio::test]
async fn ignored() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
let response = run_invalid_query(&mut client).await.unwrap();
assert!(Failure::try_from(response).is_ok());
let response = run_valid_query(&mut client).await.unwrap();
assert!(match response {
Message::Ignored => true,
_ => false,
});
}
#[tokio::test]
async fn v3_method_with_v1_client_fails() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
assert!(match client.commit().await {
Err(Error::UnsupportedOperation(V1_0)) => true,
_ => false,
});
}
#[tokio::test]
async fn v3_message_with_v1_client_fails() {
let client = get_initialized_client(V1_0).await;
skip_if_handshake_failed!(client);
let mut client = client.unwrap();
client.send_message(Message::Commit).await.unwrap();
assert!(match client.read_message().await {
Err(Error::ProtocolError(_)) | Ok(Message::Failure(_)) => true,
_ => false,
});
}
}