1#[cfg(test)]
2mod tests {
3 use std::collections::HashMap;
4
5 use bolt_proto::{message::*, value::*, version::*, ServerState::*};
6
7 use crate::{
8 client::v1::tests::*, error::CommunicationError, skip_if_handshake_failed, Metadata,
9 };
10
11 #[tokio::test]
12 async fn hello() {
13 let client = new_client(V4_0).await;
14 skip_if_handshake_failed!(client);
15 let mut client = client.unwrap();
16 assert_eq!(client.server_state(), Connected);
17 let response = initialize_client(&mut client, true).await.unwrap();
18 assert!(Success::try_from(response).is_ok());
19 assert_eq!(client.server_state(), Ready);
20 }
21
22 #[tokio::test]
23 async fn hello_fail() {
24 let client = new_client(V4_0).await;
25 skip_if_handshake_failed!(client);
26 let mut client = client.unwrap();
27 assert_eq!(client.server_state(), Connected);
28 let response = initialize_client(&mut client, false).await.unwrap();
29 assert!(Failure::try_from(response).is_ok());
30 assert_eq!(client.server_state(), Defunct);
31 }
32
33 #[tokio::test]
34 async fn goodbye() {
35 let client = get_initialized_client(V4_0).await;
36 skip_if_handshake_failed!(client);
37 let mut client = client.unwrap();
38 assert_eq!(client.server_state(), Ready);
39 assert!(client.goodbye().await.is_ok());
40 assert_eq!(client.server_state(), Defunct);
41 }
42
43 #[tokio::test]
44 async fn run() {
45 let client = get_initialized_client(V4_0).await;
46 skip_if_handshake_failed!(client);
47 let mut client = client.unwrap();
48 assert_eq!(client.server_state(), Ready);
49 let response = run_valid_query(&mut client).await.unwrap();
50 assert!(Success::try_from(response).is_ok());
51 assert_eq!(client.server_state(), Streaming);
52 }
53
54 #[tokio::test]
55 async fn run_pipelined() {
56 let client = get_initialized_client(V4_0).await;
57 skip_if_handshake_failed!(client);
58 let mut client = client.unwrap();
59 let messages = vec![
60 Message::RunWithMetadata(RunWithMetadata::new(
61 "MATCH (n {test: 'v4-pipelined'}) DETACH DELETE n;".to_string(),
62 Default::default(), Default::default())),
63 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
64 Message::RunWithMetadata(RunWithMetadata::new(
65 "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-pipelined'});".to_string(),
66 Default::default(), Default::default())),
67 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
68 Message::RunWithMetadata(RunWithMetadata::new(
69 "MATCH (neo4j:Database {name: 'neo4j', test: 'v4-pipelined'}) CREATE (:Library {name: 'bolt-client', v1_release: date('2019-12-23'), test: 'v4-pipelined'})-[:CLIENT_FOR]->(neo4j);".to_string(),
70 Default::default(), Default::default())),
71 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
72 Message::RunWithMetadata(RunWithMetadata::new(
73 "MATCH (neo4j:Database {name: 'neo4j', test: 'v4-pipelined'}), (bolt_client:Library {name: 'bolt-client', test: 'v4-pipelined'}) RETURN duration.between(neo4j.v1_release, bolt_client.v1_release);".to_string(),
74 Default::default(), Default::default())),
75 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
76 ];
77 for response in client.pipeline(messages).await.unwrap() {
78 assert!(match response {
79 Message::Success(_) => true,
80 Message::Record(record) => {
81 assert_eq!(record.fields()[0], Value::from(Duration::new(118, 7, 0, 0)));
82 true
83 }
84 _ => false,
85 });
86 }
87 }
88
89 #[tokio::test]
99 async fn discard() {
100 let client = get_initialized_client(V4_0).await;
101 skip_if_handshake_failed!(client);
102 let mut client = client.unwrap();
103 assert_eq!(client.server_state(), Ready);
104
105 let response = run_valid_query(&mut client).await.unwrap();
106 assert!(Success::try_from(response).is_ok());
107 assert_eq!(client.server_state(), Streaming);
108 let response = client.discard(None).await.unwrap();
109 assert!(Failure::try_from(response).is_ok());
110 assert_eq!(client.server_state(), Failed);
111
112 let response = client.reset().await.unwrap();
113 assert!(Success::try_from(response).is_ok());
114 assert_eq!(client.server_state(), Ready);
115
116 let response = run_valid_query(&mut client).await.unwrap();
117 assert!(Success::try_from(response).is_ok());
118 assert_eq!(client.server_state(), Streaming);
119 let response = client
120 .discard(Some(Metadata::from_iter(vec![("n", 1)])))
121 .await
122 .unwrap();
123 assert!(Failure::try_from(response).is_ok());
124 assert_eq!(client.server_state(), Failed);
125
126 let response = client.reset().await.unwrap();
127 assert!(Success::try_from(response).is_ok());
128 assert_eq!(client.server_state(), Ready);
129
130 let response = run_valid_query(&mut client).await.unwrap();
131 assert!(Success::try_from(response).is_ok());
132 assert_eq!(client.server_state(), Streaming);
133 let response = client
134 .discard(Some(Metadata::from_iter(vec![("n", -1)])))
135 .await
136 .unwrap();
137 assert!(Success::try_from(response).is_ok());
138 assert_eq!(client.server_state(), Ready);
139 }
140
141 #[tokio::test]
145 async fn run_and_pull() {
146 let client = get_initialized_client(V4_0).await;
147 skip_if_handshake_failed!(client);
148 let mut client = client.unwrap();
149 assert_eq!(client.server_state(), Ready);
150
151 let response = client
153 .run("RETURN 3458376 as n;", None, None)
154 .await
155 .unwrap();
156 assert!(Success::try_from(response).is_ok());
157 assert_eq!(client.server_state(), Streaming);
158
159 let (records, response) = client
160 .pull(Some(Metadata::from_iter(vec![("n", 1)])))
161 .await
162 .unwrap();
163 assert!(Success::try_from(response).is_ok());
164 assert_eq!(client.server_state(), Ready);
165 assert_eq!(records.len(), 1);
166 assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
167
168 let response = client
170 .run("RETURN 3458376 as n;", None, None)
171 .await
172 .unwrap();
173 assert!(Success::try_from(response).is_ok());
174 assert_eq!(client.server_state(), Streaming);
175
176 let (records, response) = client
177 .pull(Some(Metadata::from_iter(vec![("n", -1)])))
178 .await
179 .unwrap();
180 assert!(Success::try_from(response).is_ok());
181 assert_eq!(client.server_state(), Ready);
182 assert_eq!(records.len(), 1);
183 assert_eq!(records[0].fields(), &[Value::from(3_458_376)]);
184 }
185
186 #[tokio::test]
187 async fn begin() {
188 let client = get_initialized_client(V4_0).await;
189 skip_if_handshake_failed!(client);
190 let mut client = client.unwrap();
191 assert_eq!(client.server_state(), Ready);
192 let response = client.begin(None).await.unwrap();
193 assert!(Success::try_from(response).is_ok());
194 assert_eq!(client.server_state(), TxReady);
195 }
196
197 #[tokio::test]
198 async fn commit_empty_transaction() {
199 let client = get_initialized_client(V4_0).await;
200 skip_if_handshake_failed!(client);
201 let mut client = client.unwrap();
202 assert_eq!(client.server_state(), Ready);
203 client.begin(None).await.unwrap();
204 assert_eq!(client.server_state(), TxReady);
205 let response = client.commit().await.unwrap();
206 assert!(Success::try_from(response).is_ok());
207 assert_eq!(client.server_state(), Ready);
208 }
209
210 #[tokio::test]
211 async fn commit() {
212 let client = get_initialized_client(V4_0).await;
213 skip_if_handshake_failed!(client);
214 let mut client = client.unwrap();
215 assert_eq!(client.server_state(), Ready);
216 client.begin(None).await.unwrap();
217 assert_eq!(client.server_state(), TxReady);
218
219 let messages = vec![
220 Message::RunWithMetadata(RunWithMetadata::new(
221 "MATCH (n {test: 'v4-commit'}) DETACH DELETE n;".to_string(),
222 Default::default(), Default::default())),
223 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
224 Message::RunWithMetadata(RunWithMetadata::new(
225 "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-commit'});".to_string(),
226 Default::default(), Default::default())),
227 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
228 ];
229 client.pipeline(messages).await.unwrap();
230 assert_eq!(client.server_state(), TxReady);
231 let response = client.commit().await.unwrap();
232 assert!(Success::try_from(response).is_ok());
233 assert_eq!(client.server_state(), Ready);
234
235 let messages = vec![
236 Message::RunWithMetadata(RunWithMetadata::new(
237 "MATCH (n {test: 'v4-commit'}) RETURN n;".to_string(),
238 Default::default(),
239 Default::default(),
240 )),
241 Message::Pull(Pull::new(HashMap::from_iter(vec![(
242 "n".to_string(),
243 Value::from(1),
244 )]))),
245 ];
246 let mut node_exists = false;
247 for response in client.pipeline(messages).await.unwrap() {
248 if let Message::Record(record) = response {
249 let node = Node::try_from(record.fields()[0].clone()).unwrap();
250 assert_eq!(node.labels(), &["Database"]);
251 node_exists = true;
252 break;
253 }
254 }
255 assert!(node_exists);
256 }
257
258 #[tokio::test]
259 async fn commit_with_no_begin_fails() {
260 let client = get_initialized_client(V4_0).await;
261 skip_if_handshake_failed!(client);
262 let mut client = client.unwrap();
263 assert!(matches!(
264 client.commit().await,
265 Err(CommunicationError::InvalidState { state: Ready, .. })
266 ));
267 }
268
269 #[tokio::test]
270 async fn rollback_empty_transaction() {
271 let client = get_initialized_client(V4_0).await;
272 skip_if_handshake_failed!(client);
273 let mut client = client.unwrap();
274 assert_eq!(client.server_state(), Ready);
275 client.begin(None).await.unwrap();
276 assert_eq!(client.server_state(), TxReady);
277 let response = client.rollback().await.unwrap();
278 assert!(Success::try_from(response).is_ok());
279 assert_eq!(client.server_state(), Ready);
280 }
281
282 #[tokio::test]
283 async fn rollback() {
284 let client = get_initialized_client(V4_0).await;
285 skip_if_handshake_failed!(client);
286 let mut client = client.unwrap();
287 assert_eq!(client.server_state(), Ready);
288 client.begin(None).await.unwrap();
289 assert_eq!(client.server_state(), TxReady);
290 let messages = vec![
291 Message::RunWithMetadata(RunWithMetadata::new(
292 "MATCH (n {test: 'v4-rollback'}) DETACH DELETE n;".to_string(),
293 Default::default(), Default::default())),
294 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
295 Message::RunWithMetadata(RunWithMetadata::new(
296 "CREATE (:Database {name: 'neo4j', v1_release: date('2010-02-16'), test: 'v4-rollback'});".to_string(),
297 Default::default(), Default::default())),
298 Message::Pull(Pull::new(HashMap::from_iter(vec![("n".to_string(), Value::from(1))]))),
299 ];
300 client.pipeline(messages).await.unwrap();
301 assert_eq!(client.server_state(), TxReady);
302 let response = client.rollback().await.unwrap();
303 assert!(Success::try_from(response).is_ok());
304 assert_eq!(client.server_state(), Ready);
305
306 let messages = vec![
307 Message::RunWithMetadata(RunWithMetadata::new(
308 "MATCH (n {test: 'v4-rollback'}) RETURN n;".to_string(),
309 Default::default(),
310 Default::default(),
311 )),
312 Message::Pull(Pull::new(HashMap::from_iter(vec![(
313 "n".to_string(),
314 Value::from(1),
315 )]))),
316 ];
317 for response in client.pipeline(messages).await.unwrap() {
318 assert!(matches!(response, Message::Success(_)));
320 }
321 }
322
323 #[tokio::test]
324 async fn rollback_with_no_begin_fails() {
325 let client = get_initialized_client(V4_0).await;
326 skip_if_handshake_failed!(client);
327 let mut client = client.unwrap();
328 assert!(matches!(
329 client.rollback().await,
330 Err(CommunicationError::InvalidState { state: Ready, .. })
331 ));
332 }
333
334 #[tokio::test]
335 async fn multiple_open_streams() {
336 let client = get_initialized_client(V4_0).await;
337 skip_if_handshake_failed!(client);
338 let mut client = client.unwrap();
339 assert_eq!(client.server_state(), Ready);
340 client.begin(None).await.unwrap();
341 assert_eq!(client.server_state(), TxReady);
342
343 client
344 .run(
345 "MATCH (n {test: 'v4-multi-stream'}) DETACH DELETE n;",
346 None,
347 None,
348 )
349 .await
350 .unwrap();
351 client
352 .pull(Some(Metadata::from_iter(vec![("n", -1)])))
353 .await
354 .unwrap();
355
356 const NUM_STREAMS: usize = 5;
357 let mut qids: HashMap<i32, i64> = HashMap::with_capacity(NUM_STREAMS);
358 for n in 1..=NUM_STREAMS {
359 let response = client
360 .run(
361 format!(
362 "CREATE (s:Stream {{number: {}, test: 'v4-multi-stream'}}) RETURN s",
363 n
364 ),
365 None,
366 None,
367 )
368 .await
369 .unwrap();
370 let success = Success::try_from(response).unwrap();
371 match success.metadata().get("qid").unwrap() {
372 Value::Integer(qid) => {
373 qids.insert(n as i32, *qid);
374 }
375 _ => panic!("qid not returned"),
376 }
377 }
378
379 assert_eq!(client.open_tx_streams, NUM_STREAMS);
380
381 for (n, qid) in qids {
382 assert_eq!(client.server_state(), TxStreaming);
383
384 let (records, response) = client
385 .pull(Some(Metadata::from_iter(vec![("n", -1), ("qid", qid)])))
386 .await
387 .unwrap();
388
389 assert!(Success::try_from(response).is_ok());
390 let node = Node::try_from(records[0].fields()[0].clone()).unwrap();
391 assert_eq!(node.properties().get("number").unwrap(), &Value::from(n));
392 }
393
394 assert_eq!(client.server_state(), TxReady);
395 assert_eq!(client.open_tx_streams, 0);
396 }
397
398 #[tokio::test]
399 async fn reset_internals_pipelined() {
400 let client = get_initialized_client(V4_0).await;
401 skip_if_handshake_failed!(client);
402 let mut client = client.unwrap();
403
404 let mut messages = client
405 .pipeline(vec![
406 Message::RunWithMetadata(RunWithMetadata::new(
407 String::from("RETURN 1;"),
408 Default::default(),
409 Default::default(),
410 )),
411 Message::Pull(Pull::new(HashMap::from_iter(vec![(
412 String::from("n"),
413 Value::from(1),
414 )]))),
415 Message::RunWithMetadata(RunWithMetadata::new(
416 String::from("RETURN 1;"),
417 Default::default(),
418 Default::default(),
419 )),
420 Message::Pull(Pull::new(HashMap::from_iter(vec![(
421 String::from("n"),
422 Value::from(1),
423 )]))),
424 Message::Reset,
425 ])
426 .await
427 .unwrap();
428
429 assert_eq!(
431 messages.pop(),
432 Some(Message::Success(Success::new(Default::default())))
433 );
434
435 assert!(messages.len() >= 4);
437 for message in messages {
438 assert_eq!(message, Message::Ignored);
439 }
440 }
441
442 #[tokio::test]
443 async fn reset_internals() {
444 let client = get_initialized_client(V4_0).await;
445 skip_if_handshake_failed!(client);
446 let mut client = client.unwrap();
447
448 client.run("RETURN 1;", None, None).await.unwrap();
449 client
450 .send_message(Message::Pull(Pull::new(HashMap::from_iter(vec![(
451 String::from("n"),
452 Value::from(1),
453 )]))))
454 .await
455 .unwrap();
456 client.send_message(Message::Reset).await.unwrap();
457 assert_eq!(client.server_state(), Interrupted);
458
459 assert_eq!(client.read_message().await.unwrap(), Message::Ignored);
465
466 match client.read_message().await.unwrap() {
467 Message::Ignored => {
469 Success::try_from(client.read_message().await.unwrap()).unwrap();
471 }
472 Message::Success(_) => {}
474 other => panic!("unexpected response {:?}", other),
475 }
476 }
477}