#[cfg(test)]
pub(crate) mod tests {
use std::env;
use bolt_proto::{message::*, value::*, version::*, ServerState::*};
use tokio::io::BufStream;
use tokio_util::compat::*;
use crate::{
error::{CommunicationError, CommunicationResult, ConnectionResult, Result},
skip_if_handshake_failed, stream, Client, Metadata, Params,
};
/// Transport type shared by the test clients: the crate's `stream::Stream` wrapped in a
/// Tokio `BufStream` and adapted through `tokio_util::compat`'s `Compat`.
type Stream = Compat<BufStream<stream::Stream>>;
/// Connects to the Bolt server under test and builds a [`Client`] over the connection,
/// passing `[version, 0, 0, 0]` as the list of protocol versions.
///
/// The server address is read from the `BOLT_TEST_ADDR` environment variable and the
/// optional domain from `BOLT_TEST_DOMAIN`.
///
/// # Errors
///
/// Propagates any error from `stream::Stream::connect` or `Client::new`.
///
/// # Panics
///
/// Panics if `BOLT_TEST_ADDR` is unset or is not valid Unicode.
pub(crate) async fn new_client(version: u32) -> ConnectionResult<Client<Stream>> {
    // Name the variable in the panic message so a misconfigured environment is obvious.
    let addr = env::var("BOLT_TEST_ADDR").expect("BOLT_TEST_ADDR must be set to run these tests");
    // The domain is optional: an unset variable is simply passed along as `None`.
    let domain = env::var("BOLT_TEST_DOMAIN").ok();
    let transport = stream::Stream::connect(addr, domain).await?;
    Client::new(BufStream::new(transport).compat(), &[version, 0, 0, 0]).await
}
/// Sends the initial `hello` request on `client` using basic authentication.
///
/// The principal is read from `BOLT_TEST_USERNAME`. When `succeed` is `true` the
/// credentials come from `BOLT_TEST_PASSWORD`; otherwise the literal `"invalid"` is sent
/// so that callers can exercise the authentication-failure path.
///
/// Returns whatever message `Client::hello` yields; the caller decides whether it is
/// the expected kind.
///
/// # Panics
///
/// Panics if `BOLT_TEST_USERNAME` is unset, or if `succeed` is `true` and
/// `BOLT_TEST_PASSWORD` is unset (or either is not valid Unicode).
pub(crate) async fn initialize_client(
    client: &mut Client<Stream>,
    succeed: bool,
) -> CommunicationResult<Message> {
    let username =
        env::var("BOLT_TEST_USERNAME").expect("BOLT_TEST_USERNAME must be set to run these tests");
    // The real password is only looked up when it is actually going to be used.
    let password = if succeed {
        env::var("BOLT_TEST_PASSWORD").expect("BOLT_TEST_PASSWORD must be set to run these tests")
    } else {
        String::from("invalid")
    };
    let auth = Metadata::from_iter(vec![
        ("user_agent", "bolt-client/X.Y.Z"),
        ("scheme", "basic"),
        ("principal", username.as_str()),
        ("credentials", password.as_str()),
    ]);
    client.hello(auth).await
}
/// Convenience wrapper: creates a client via [`new_client`] and then runs
/// [`initialize_client`] with valid credentials.
///
/// Only errors are propagated; the message returned by the initialization request is
/// not inspected here.
pub(crate) async fn get_initialized_client(version: u32) -> Result<Client<Stream>> {
    let mut fresh = new_client(version).await?;
    // The reply is intentionally unused; `?` still surfaces communication errors.
    let _hello_reply = initialize_client(&mut fresh, true).await?;
    Ok(fresh)
}
/// Issues a `run` request whose statement the tests expect the server to reject,
/// and returns the server's reply.
pub(crate) async fn run_invalid_query(
    client: &mut Client<Stream>,
) -> CommunicationResult<Message> {
    let params = Params::from_iter(vec![("some_val", 25.5432)]);
    let metadata = Metadata::from_iter(vec![("some_key", true)]);
    client
        .run("RETURN invalid query oof as n;", Some(params), Some(metadata))
        .await
}
/// Issues a `run` request that returns the `$some_val` parameter, and returns the
/// server's reply.
pub(crate) async fn run_valid_query(
    client: &mut Client<Stream>,
) -> CommunicationResult<Message> {
    let params = Params::from_iter(vec![("some_val", 25.5432)]);
    let metadata = Metadata::from_iter(vec![("some_key", true)]);
    client
        .run("RETURN $some_val as n;", Some(params), Some(metadata))
        .await
}
/// A new V1 client reports `Connected`, and reports `Ready` after initialization is
/// answered with a message convertible to `Success`.
#[tokio::test]
async fn init() {
    let connection = new_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Connected);

    let hello_reply = initialize_client(&mut client, true).await.unwrap();
    let as_success = Success::try_from(hello_reply);
    assert!(as_success.is_ok());
    assert_eq!(client.server_state(), Ready);
}
/// Initialization with bad credentials yields a `Failure` and leaves the client
/// `Defunct`; a further initialization attempt must then fail with `InvalidState`.
#[tokio::test]
async fn init_fail() {
    let connection = new_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Connected);

    // Wrong password: the reply converts to `Failure` and the client becomes defunct.
    let rejected = initialize_client(&mut client, false).await.unwrap();
    let as_failure = Failure::try_from(rejected);
    assert!(as_failure.is_ok());
    assert_eq!(client.server_state(), Defunct);

    // Retrying, even with valid credentials, is refused in the `Defunct` state.
    let retry = initialize_client(&mut client, true).await;
    assert!(matches!(
        retry,
        Err(CommunicationError::InvalidState { state: Defunct, .. })
    ));
}
/// After a failed query, `ack_failure` returns the client from `Failed` to `Ready`, and
/// a subsequent valid query succeeds.
#[tokio::test]
async fn ack_failure() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    // Provoke a failure.
    let failed_run = run_invalid_query(&mut client).await.unwrap();
    assert!(Failure::try_from(failed_run).is_ok());
    assert_eq!(client.server_state(), Failed);

    // Acknowledge it.
    let ack = client.ack_failure().await.unwrap();
    assert!(Success::try_from(ack).is_ok());
    assert_eq!(client.server_state(), Ready);

    // The client is usable again.
    let good_run = run_valid_query(&mut client).await.unwrap();
    assert!(Success::try_from(good_run).is_ok());
    assert_eq!(client.server_state(), Streaming);
}
/// Same as `ack_failure`, but a request is sent (and answered with `Ignored`) while
/// the client is still `Failed`, before the failure is acknowledged.
#[tokio::test]
async fn ack_failure_after_ignored() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    // Provoke a failure.
    let failed_run = run_invalid_query(&mut client).await.unwrap();
    assert!(Failure::try_from(failed_run).is_ok());
    assert_eq!(client.server_state(), Failed);

    // While failed, an otherwise valid query is ignored and the state does not change.
    let ignored_run = run_valid_query(&mut client).await.unwrap();
    assert_eq!(ignored_run, Message::Ignored);
    assert_eq!(client.server_state(), Failed);

    // Acknowledge the failure.
    let ack = client.ack_failure().await.unwrap();
    assert!(Success::try_from(ack).is_ok());
    assert_eq!(client.server_state(), Ready);

    // The client is usable again.
    let good_run = run_valid_query(&mut client).await.unwrap();
    assert!(Success::try_from(good_run).is_ok());
    assert_eq!(client.server_state(), Streaming);
}
/// A valid query on a `Ready` client is answered with `Success` and moves the client
/// to `Streaming`.
#[tokio::test]
async fn run() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let run_reply = run_valid_query(&mut client).await.unwrap();
    let as_success = Success::try_from(run_reply);
    assert!(as_success.is_ok());
    assert_eq!(client.server_state(), Streaming);
}
/// Pipelines four RUN/PULL_ALL pairs (cleanup, two creates, one query) and checks every
/// response.
///
/// Each response must be either a `Success` or a `Record`; any record must carry `12`
/// in its first field, which is the difference between the two `born` values (2019 and
/// 2007) written by the create statements.
#[tokio::test]
async fn run_pipelined() {
    let client = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(client);
    let mut client = client.unwrap();

    // Every statement is sent without parameters, so build the RUN messages uniformly.
    let run = |statement: &str| Message::Run(Run::new(statement.to_string(), Default::default()));
    let messages = vec![
        // Remove anything left behind by a previous execution of this test.
        run("MATCH (n {test: 'v1-pipelined'}) DETACH DELETE n;"),
        Message::PullAll,
        run("CREATE (:Database {name: 'neo4j', born: 2007, test: 'v1-pipelined'});"),
        Message::PullAll,
        run(
            "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}) CREATE (:Library {name: 'bolt-client', born: 2019, test: 'v1-pipelined'})-[:CLIENT_FOR]->(neo4j);",
        ),
        Message::PullAll,
        run(
            "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v1-pipelined'}) RETURN bolt_client.born - neo4j.born;",
        ),
        Message::PullAll,
    ];

    for response in client.pipeline(messages).await.unwrap() {
        match response {
            Message::Success(_) => {}
            Message::Record(record) => {
                assert_eq!(record.fields()[0], Value::from(12_i8));
            }
            // Report the offending message instead of a bare "assertion failed".
            other => panic!("unexpected response {:?}", other),
        }
    }
}
/// Runs a constant query and pulls its result, checking the `Ready` -> `Streaming` ->
/// `Ready` state sequence and that exactly one record holding the constant comes back.
#[tokio::test]
async fn run_and_pull() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let run_reply = client
        .run("RETURN 3458376 as n;", None, None)
        .await
        .unwrap();
    assert!(Success::try_from(run_reply).is_ok());
    assert_eq!(client.server_state(), Streaming);

    let (rows, summary) = client.pull(None).await.unwrap();
    assert!(Success::try_from(summary).is_ok());
    assert_eq!(client.server_state(), Ready);
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].fields(), &[Value::from(3_458_376)]);
}
/// Creates two nodes joined by a relationship, reads them back, and checks that the
/// returned values convert to `Node` / `Relationship` with the expected labels,
/// properties, type, and endpoint identities.
#[tokio::test]
async fn node_and_rel_creation() {
    let client = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(client);
    let mut client = client.unwrap();

    // Remove anything left behind by a previous execution of this test.
    client
        .run(
            "MATCH (n {test: 'v1-node-rel'}) DETACH DELETE n;",
            None,
            None,
        )
        .await
        .unwrap();
    client.pull(None).await.unwrap();

    client.run("CREATE (:Client {name: 'bolt-client', test: 'v1-node-rel'})-[:WRITTEN_IN]->(:Language {name: 'Rust', test: 'v1-node-rel'});", None, None).await.unwrap();
    client.pull(None).await.unwrap();

    client
        .run(
            "MATCH (c {test: 'v1-node-rel'})-[r:WRITTEN_IN]->(l) RETURN c, r, l;",
            None,
            None,
        )
        .await
        .unwrap();
    let (records, _response) = client.pull(None).await.unwrap();

    // Fail with a readable message rather than an index-out-of-bounds panic.
    assert!(
        !records.is_empty(),
        "expected the MATCH query to return at least one record"
    );
    // The query returns `c, r, l`, in that order.
    let fields = records[0].fields();
    let c = Node::try_from(fields[0].clone()).unwrap();
    let r = Relationship::try_from(fields[1].clone()).unwrap();
    let l = Node::try_from(fields[2].clone()).unwrap();

    assert_eq!(c.labels(), &[String::from("Client")]);
    assert_eq!(
        c.properties().get("name"),
        Some(&Value::from("bolt-client"))
    );
    assert_eq!(l.labels(), &[String::from("Language")]);
    assert_eq!(l.properties().get("name"), Some(&Value::from("Rust")));
    assert_eq!(r.rel_type(), "WRITTEN_IN");
    assert!(r.properties().is_empty());
    // The relationship must point from the client node to the language node.
    assert_eq!(
        (r.start_node_identity(), r.end_node_identity()),
        (c.node_identity(), l.node_identity())
    );
}
/// `discard` on a client that is `Ready` is rejected with `InvalidState`.
#[tokio::test]
async fn discard_fail() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let outcome = client.discard(None).await;
    assert!(matches!(
        outcome,
        Err(CommunicationError::InvalidState { state: Ready, .. })
    ));
}
/// `discard` after a successful `run` is answered with `Success` and returns the
/// client from `Streaming` to `Ready`.
#[tokio::test]
async fn discard() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let run_reply = run_valid_query(&mut client).await.unwrap();
    assert!(Success::try_from(run_reply).is_ok());
    assert_eq!(client.server_state(), Streaming);

    let discard_reply = client.discard(None).await.unwrap();
    assert!(Success::try_from(discard_reply).is_ok());
    assert_eq!(client.server_state(), Ready);
}
/// Once a result has been discarded the client is `Ready` again, so a following `pull`
/// is rejected with `InvalidState`.
#[tokio::test]
async fn discard_and_pull() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let run_reply = run_valid_query(&mut client).await.unwrap();
    assert!(Success::try_from(run_reply).is_ok());
    assert_eq!(client.server_state(), Streaming);

    let discard_reply = client.discard(None).await.unwrap();
    assert!(Success::try_from(discard_reply).is_ok());
    assert_eq!(client.server_state(), Ready);

    // Nothing is left to pull.
    let pull_outcome = client.pull(None).await;
    assert!(matches!(
        pull_outcome,
        Err(CommunicationError::InvalidState { state: Ready, .. })
    ));
}
/// `reset` brings a `Failed` client back to `Ready`, after which a valid query
/// succeeds again.
#[tokio::test]
async fn reset() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    // Provoke a failure.
    let failed_run = run_invalid_query(&mut client).await.unwrap();
    assert!(Failure::try_from(failed_run).is_ok());
    assert_eq!(client.server_state(), Failed);

    // While failed, an otherwise valid query is ignored and the state does not change.
    let ignored_run = run_valid_query(&mut client).await.unwrap();
    assert_eq!(ignored_run, Message::Ignored);
    assert_eq!(client.server_state(), Failed);

    // Reset the connection.
    let reset_reply = client.reset().await.unwrap();
    assert!(Success::try_from(reset_reply).is_ok());
    assert_eq!(client.server_state(), Ready);

    // The client is usable again.
    let good_run = run_valid_query(&mut client).await.unwrap();
    assert!(Success::try_from(good_run).is_ok());
    assert_eq!(client.server_state(), Streaming);
}
/// Pipelines two RUN/PULL_ALL pairs followed by a RESET and checks the responses: the
/// last one must equal a `Success` built from default metadata, and all the others
/// (at least four) must be `Ignored`.
#[tokio::test]
async fn reset_internals_pipelined() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();

    let return_one = || Message::Run(Run::new(String::from("RETURN 1;"), Default::default()));
    let batch = vec![
        return_one(),
        Message::PullAll,
        return_one(),
        Message::PullAll,
        Message::Reset,
    ];
    let mut responses = client.pipeline(batch).await.unwrap();

    // The final response is checked first and removed from the list.
    let last = responses.pop();
    assert_eq!(
        last,
        Some(Message::Success(Success::new(Default::default())))
    );

    // Everything before it must have been ignored.
    assert!(responses.len() >= 4);
    responses
        .into_iter()
        .for_each(|response| assert_eq!(response, Message::Ignored));
}
/// Sends PULL_ALL and RESET by hand (without reading in between) and checks the raw
/// responses: the client reports `Interrupted`, the first message read is `Ignored`,
/// and what follows is either a `Success`, or one more `Ignored` and then a `Success`.
#[tokio::test]
async fn reset_internals() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();

    client.run("RETURN 1;", None, None).await.unwrap();
    client.send_message(Message::PullAll).await.unwrap();
    client.send_message(Message::Reset).await.unwrap();
    assert_eq!(client.server_state(), Interrupted);

    let first = client.read_message().await.unwrap();
    assert_eq!(first, Message::Ignored);

    // Both orderings below are accepted by this test.
    let second = client.read_message().await.unwrap();
    match second {
        Message::Success(_) => {}
        Message::Ignored => {
            let third = client.read_message().await.unwrap();
            Success::try_from(third).unwrap();
        }
        other => panic!("unexpected response {:?}", other),
    }
}
/// While the client is `Failed`, a valid query is answered with `Ignored` and the
/// state remains `Failed`.
#[tokio::test]
async fn ignored() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();
    assert_eq!(client.server_state(), Ready);

    let failed_run = run_invalid_query(&mut client).await.unwrap();
    assert!(Failure::try_from(failed_run).is_ok());
    assert_eq!(client.server_state(), Failed);

    let ignored_run = run_valid_query(&mut client).await.unwrap();
    assert_eq!(ignored_run, Message::Ignored);
    assert_eq!(client.server_state(), Failed);
}
/// Calling `commit` on a V1 client is rejected with `UnsupportedOperation(V1_0)`.
#[tokio::test]
async fn v3_method_with_v1_client_fails() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();

    let outcome = client.commit().await;
    assert!(matches!(
        outcome,
        Err(CommunicationError::UnsupportedOperation(V1_0))
    ));
}
/// Sending a raw `Begin` message through a V1 client goes out fine, but the following
/// read is expected to fail with a protocol-level deserialization I/O error.
#[tokio::test]
async fn v3_message_with_v1_client_fails() {
    let connection = get_initialized_client(V1_0).await;
    skip_if_handshake_failed!(connection);
    let mut client = connection.unwrap();

    // `send_message` is used directly, bypassing the typed client methods.
    let begin_message = Message::Begin(Begin::new(Default::default()));
    client.send_message(begin_message).await.unwrap();

    let read_outcome = client.read_message().await;
    assert!(matches!(
        read_outcome,
        Err(CommunicationError::ProtocolError(
            bolt_proto::error::Error::DeserializationError(
                bolt_proto::error::DeserializationError::IoError(_)
            )
        ))
    ));
}
}