1#[cfg(test)]
2pub(crate) mod tests {
3 use std::env;
4
5 use bolt_proto::{message::*, value::*, version::*, ServerState::*};
6 use tokio::io::BufStream;
7 use tokio_util::compat::*;
8
9 use crate::{
10 error::{CommunicationError, CommunicationResult, ConnectionResult, Result},
11 skip_if_handshake_failed, stream, Client, Metadata, Params,
12 };
13
14 type Stream = Compat<BufStream<stream::Stream>>;
15
16 pub(crate) async fn new_client(version: u32) -> ConnectionResult<Client<Stream>> {
17 Client::new(
18 BufStream::new(
19 stream::Stream::connect(
20 env::var("BOLT_TEST_ADDR").unwrap(),
21 env::var("BOLT_TEST_DOMAIN").ok(),
22 )
23 .await?,
24 )
25 .compat(),
26 &[version, 0, 0, 0],
27 )
28 .await
29 }
30
31 pub(crate) async fn initialize_client(
32 client: &mut Client<Stream>,
33 succeed: bool,
34 ) -> CommunicationResult<Message> {
35 let username = env::var("BOLT_TEST_USERNAME").unwrap();
36 let password = if succeed {
37 env::var("BOLT_TEST_PASSWORD").unwrap()
38 } else {
39 String::from("invalid")
40 };
41
42 client
43 .hello(Metadata::from_iter(vec![
44 ("user_agent", "bolt-client/X.Y.Z"),
45 ("scheme", "basic"),
46 ("principal", &username),
47 ("credentials", &password),
48 ]))
49 .await
50 }
51
52 pub(crate) async fn get_initialized_client(version: u32) -> Result<Client<Stream>> {
53 let mut client = new_client(version).await?;
54 initialize_client(&mut client, true).await?;
55 Ok(client)
56 }
57
58 pub(crate) async fn run_invalid_query(
59 client: &mut Client<Stream>,
60 ) -> CommunicationResult<Message> {
61 client
62 .run(
63 "RETURN invalid query oof as n;",
64 Some(Params::from_iter(vec![("some_val", 25.5432)])),
65 Some(Metadata::from_iter(vec![("some_key", true)])),
66 )
67 .await
68 }
69
70 pub(crate) async fn run_valid_query(
71 client: &mut Client<Stream>,
72 ) -> CommunicationResult<Message> {
73 client
74 .run(
75 "RETURN $some_val as n;",
76 Some(Params::from_iter(vec![("some_val", 25.5432)])),
77 Some(Metadata::from_iter(vec![("some_key", true)])),
78 )
79 .await
80 }
81
82 #[tokio::test]
83 async fn init() {
84 let client = new_client(V1_0).await;
85 skip_if_handshake_failed!(client);
86 let mut client = client.unwrap();
87 assert_eq!(client.server_state(), Connected);
88 let response = initialize_client(&mut client, true).await.unwrap();
89 assert!(Success::try_from(response).is_ok());
90 assert_eq!(client.server_state(), Ready);
91 }
92
93 #[tokio::test]
94 async fn init_fail() {
95 let client = new_client(V1_0).await;
96 skip_if_handshake_failed!(client);
97 let mut client = client.unwrap();
98 assert_eq!(client.server_state(), Connected);
99 let response = initialize_client(&mut client, false).await.unwrap();
100 assert!(Failure::try_from(response).is_ok());
101 assert_eq!(client.server_state(), Defunct);
102
103 let response = initialize_client(&mut client, true).await;
105 assert!(matches!(
106 response,
107 Err(CommunicationError::InvalidState { state: Defunct, .. })
108 ));
109 }
110
111 #[tokio::test]
112 async fn ack_failure() {
113 let client = get_initialized_client(V1_0).await;
114 skip_if_handshake_failed!(client);
115 let mut client = client.unwrap();
116 assert_eq!(client.server_state(), Ready);
117 let response = run_invalid_query(&mut client).await.unwrap();
118 assert!(Failure::try_from(response).is_ok());
119 assert_eq!(client.server_state(), Failed);
120 let response = client.ack_failure().await.unwrap();
121 assert!(Success::try_from(response).is_ok());
122 assert_eq!(client.server_state(), Ready);
123 let response = run_valid_query(&mut client).await.unwrap();
124 assert!(Success::try_from(response).is_ok());
125 assert_eq!(client.server_state(), Streaming);
126 }
127
128 #[tokio::test]
129 async fn ack_failure_after_ignored() {
130 let client = get_initialized_client(V1_0).await;
131 skip_if_handshake_failed!(client);
132 let mut client = client.unwrap();
133 assert_eq!(client.server_state(), Ready);
134 let response = run_invalid_query(&mut client).await.unwrap();
135 assert!(Failure::try_from(response).is_ok());
136 assert_eq!(client.server_state(), Failed);
137 let response = run_valid_query(&mut client).await.unwrap();
138 assert!(matches!(response, Message::Ignored));
139 assert_eq!(client.server_state(), Failed);
140 let response = client.ack_failure().await.unwrap();
141 assert!(Success::try_from(response).is_ok());
142 assert_eq!(client.server_state(), Ready);
143 let response = run_valid_query(&mut client).await.unwrap();
144 assert!(Success::try_from(response).is_ok());
145 assert_eq!(client.server_state(), Streaming);
146 }
147
148 #[tokio::test]
149 async fn run() {
150 let client = get_initialized_client(V1_0).await;
151 skip_if_handshake_failed!(client);
152 let mut client = client.unwrap();
153 assert_eq!(client.server_state(), Ready);
154 let response = run_valid_query(&mut client).await.unwrap();
155 assert!(Success::try_from(response).is_ok());
156 assert_eq!(client.server_state(), Streaming);
157 }
158
159 #[tokio::test]
160 async fn run_pipelined() {
161 let client = get_initialized_client(V1_0).await;
162 skip_if_handshake_failed!(client);
163 let mut client = client.unwrap();
164 let messages = vec![
165 Message::Run(Run::new("MATCH (n {test: 'v1-pipelined'}) DETACH DELETE n;".to_string(), Default::default())),
166 Message::PullAll,
167 Message::Run(Run::new("CREATE (:Database {name: 'neo4j', born: 2007, test: 'v1-pipelined'});".to_string(), Default::default())),
168 Message::PullAll,
169 Message::Run(Run::new(
170 "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}) CREATE (:Library {name: 'bolt-client', born: 2019, test: 'v1-pipelined'})-[:CLIENT_FOR]->(neo4j);".to_string(),
171 Default::default())),
172 Message::PullAll,
173 Message::Run(Run::new(
174 "MATCH (neo4j:Database {name: 'neo4j', test: 'v1-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v1-pipelined'}) RETURN bolt_client.born - neo4j.born;".to_string(),
175 Default::default())),
176 Message::PullAll,
177 ];
178 for response in client.pipeline(messages).await.unwrap() {
179 assert!(match response {
180 Message::Success(_) => true,
181 Message::Record(record) => {
182 assert_eq!(record.fields()[0], Value::from(12_i8));
183 true
184 }
185 _ => false,
186 });
187 }
188 }
189
190 #[tokio::test]
191 async fn run_and_pull() {
192 let client = get_initialized_client(V1_0).await;
193 skip_if_handshake_failed!(client);
194 let mut client = client.unwrap();
195 assert_eq!(client.server_state(), Ready);
196 let response = client
197 .run("RETURN 3458376 as n;", None, None)
198 .await
199 .unwrap();
200 assert!(Success::try_from(response).is_ok());
201 assert_eq!(client.server_state(), Streaming);
202
203 let (records, response) = client.pull(None).await.unwrap();
204 assert!(Success::try_from(response).is_ok());
205 assert_eq!(client.server_state(), Ready);
206 assert_eq!(records.len(), 1);
207 assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
208 }
209
210 #[tokio::test]
211 async fn node_and_rel_creation() {
212 let client = get_initialized_client(V1_0).await;
213 skip_if_handshake_failed!(client);
214 let mut client = client.unwrap();
215 client
216 .run(
217 "MATCH (n {test: 'v1-node-rel'}) DETACH DELETE n;",
218 None,
219 None,
220 )
221 .await
222 .unwrap();
223 client.pull(None).await.unwrap();
224
225 client.run("CREATE (:Client {name: 'bolt-client', test: 'v1-node-rel'})-[:WRITTEN_IN]->(:Language {name: 'Rust', test: 'v1-node-rel'});", None, None).await.unwrap();
226 client.pull(None).await.unwrap();
227 client
228 .run(
229 "MATCH (c {test: 'v1-node-rel'})-[r:WRITTEN_IN]->(l) RETURN c, r, l;",
230 None,
231 None,
232 )
233 .await
234 .unwrap();
235 let (records, _response) = client.pull(None).await.unwrap();
236
237 let c = Node::try_from(records[0].fields()[0].clone()).unwrap();
238 let r = Relationship::try_from(records[0].fields()[1].clone()).unwrap();
239 let l = Node::try_from(records[0].fields()[2].clone()).unwrap();
240
241 assert_eq!(c.labels(), &[String::from("Client")]);
242 assert_eq!(
243 c.properties().get("name"),
244 Some(&Value::from("bolt-client"))
245 );
246 assert_eq!(l.labels(), &[String::from("Language")]);
247 assert_eq!(l.properties().get("name"), Some(&Value::from("Rust")));
248 assert_eq!(r.rel_type(), "WRITTEN_IN");
249 assert!(r.properties().is_empty());
250 assert_eq!(
251 (r.start_node_identity(), r.end_node_identity()),
252 (c.node_identity(), l.node_identity())
253 );
254 }
255
256 #[tokio::test]
257 async fn discard_fail() {
258 let client = get_initialized_client(V1_0).await;
259 skip_if_handshake_failed!(client);
260 let mut client = client.unwrap();
261 assert_eq!(client.server_state(), Ready);
262 assert!(matches!(
263 client.discard(None).await,
264 Err(CommunicationError::InvalidState { state: Ready, .. })
265 ));
266 }
267
268 #[tokio::test]
269 async fn discard() {
270 let client = get_initialized_client(V1_0).await;
271 skip_if_handshake_failed!(client);
272 let mut client = client.unwrap();
273 assert_eq!(client.server_state(), Ready);
274 let response = run_valid_query(&mut client).await.unwrap();
275 assert!(Success::try_from(response).is_ok());
276 assert_eq!(client.server_state(), Streaming);
277 let response = client.discard(None).await.unwrap();
278 assert!(Success::try_from(response).is_ok());
279 assert_eq!(client.server_state(), Ready);
280 }
281
282 #[tokio::test]
283 async fn discard_and_pull() {
284 let client = get_initialized_client(V1_0).await;
285 skip_if_handshake_failed!(client);
286 let mut client = client.unwrap();
287 assert_eq!(client.server_state(), Ready);
288 let response = run_valid_query(&mut client).await.unwrap();
289 assert!(Success::try_from(response).is_ok());
290 assert_eq!(client.server_state(), Streaming);
291 let response = client.discard(None).await.unwrap();
292 assert!(Success::try_from(response).is_ok());
293 assert_eq!(client.server_state(), Ready);
294 assert!(matches!(
295 client.pull(None).await,
296 Err(CommunicationError::InvalidState { state: Ready, .. })
297 ));
298 }
299
300 #[tokio::test]
301 async fn reset() {
302 let client = get_initialized_client(V1_0).await;
303 skip_if_handshake_failed!(client);
304 let mut client = client.unwrap();
305 assert_eq!(client.server_state(), Ready);
306 let response = run_invalid_query(&mut client).await.unwrap();
307 assert!(Failure::try_from(response).is_ok());
308 assert_eq!(client.server_state(), Failed);
309 let response = run_valid_query(&mut client).await.unwrap();
310 assert!(matches!(response, Message::Ignored));
311 assert_eq!(client.server_state(), Failed);
312 let response = client.reset().await.unwrap();
313 assert!(Success::try_from(response).is_ok());
314 assert_eq!(client.server_state(), Ready);
315 let response = run_valid_query(&mut client).await.unwrap();
316 assert!(Success::try_from(response).is_ok());
317 assert_eq!(client.server_state(), Streaming);
318 }
319
320 #[tokio::test]
321 async fn reset_internals_pipelined() {
322 let client = get_initialized_client(V1_0).await;
323 skip_if_handshake_failed!(client);
324 let mut client = client.unwrap();
325
326 let mut messages = client
327 .pipeline(vec![
328 Message::Run(Run::new(String::from("RETURN 1;"), Default::default())),
329 Message::PullAll,
330 Message::Run(Run::new(String::from("RETURN 1;"), Default::default())),
331 Message::PullAll,
332 Message::Reset,
333 ])
334 .await
335 .unwrap();
336
337 assert_eq!(
339 messages.pop(),
340 Some(Message::Success(Success::new(Default::default())))
341 );
342
343 assert!(messages.len() >= 4);
345 for message in messages {
346 assert_eq!(message, Message::Ignored);
347 }
348 }
349
350 #[tokio::test]
351 async fn reset_internals() {
352 let client = get_initialized_client(V1_0).await;
353 skip_if_handshake_failed!(client);
354 let mut client = client.unwrap();
355
356 client.run("RETURN 1;", None, None).await.unwrap();
357 client.send_message(Message::PullAll).await.unwrap();
358 client.send_message(Message::Reset).await.unwrap();
359 assert_eq!(client.server_state(), Interrupted);
360
361 assert_eq!(client.read_message().await.unwrap(), Message::Ignored);
367
368 match client.read_message().await.unwrap() {
369 Message::Ignored => {
371 Success::try_from(client.read_message().await.unwrap()).unwrap();
373 }
374 Message::Success(_) => {}
376 other => panic!("unexpected response {:?}", other),
377 }
378 }
379
380 #[tokio::test]
381 async fn ignored() {
382 let client = get_initialized_client(V1_0).await;
383 skip_if_handshake_failed!(client);
384 let mut client = client.unwrap();
385 assert_eq!(client.server_state(), Ready);
386 let response = run_invalid_query(&mut client).await.unwrap();
387 assert!(Failure::try_from(response).is_ok());
388 assert_eq!(client.server_state(), Failed);
389 let response = run_valid_query(&mut client).await.unwrap();
390 assert!(matches!(response, Message::Ignored));
391 assert_eq!(client.server_state(), Failed);
392 }
393
394 #[tokio::test]
395 async fn v3_method_with_v1_client_fails() {
396 let client = get_initialized_client(V1_0).await;
397 skip_if_handshake_failed!(client);
398 let mut client = client.unwrap();
399 assert!(matches!(
400 client.commit().await,
401 Err(CommunicationError::UnsupportedOperation(V1_0))
402 ));
403 }
404
405 #[tokio::test]
406 async fn v3_message_with_v1_client_fails() {
407 let client = get_initialized_client(V1_0).await;
408 skip_if_handshake_failed!(client);
409 let mut client = client.unwrap();
410 let begin = Begin::new(Default::default());
411 client.send_message(Message::Begin(begin)).await.unwrap();
412 assert!(matches!(
413 client.read_message().await,
414 Err(CommunicationError::ProtocolError(
415 bolt_proto::error::Error::DeserializationError(
416 bolt_proto::error::DeserializationError::IoError(_)
417 )
418 ))
419 ));
420 }
421}