bolt_client/client.rs
1// Much of the documentation comments in this module are copied from the descriptions on
2// https://neo4j.com/docs/bolt/current, with minor modifications.
3//
4// Original copyright and license information for these descriptions:
5// Copyright © Neo4j Sweden AB [http://neo4j.com]
6// CC BY-NC-SA 4.0 (https://creativecommons.org/licenses/by-nc-sa/4.0/)
7//
8// The aforementioned documentation comments are thus licensed under CC BY-NC-SA 4.0.
9
10use std::{collections::VecDeque, io};
11
12use bytes::*;
13use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
14
15use bolt_client_macros::*;
16use bolt_proto::{
17 error::Error as ProtocolError, message::*, version::*, Message, ServerState, ServerState::*,
18 Value,
19};
20
21use crate::{
22 error::{CommunicationError, CommunicationResult, ConnectionError, ConnectionResult},
23 Metadata, Params, RoutingContext,
24};
25
26mod v1;
27mod v2;
28mod v3;
29mod v4;
30mod v4_1;
31mod v4_2;
32mod v4_3;
33mod v4_4;
34
35const PREAMBLE: [u8; 4] = [0x60, 0x60, 0xB0, 0x17];
36
/// Check whether `version` satisfies the version `specifier`.
///
/// Both values are decoded byte-wise: the lowest byte is the major version and the next byte is
/// the minor version. The third byte of `specifier` is a range, i.e. how many minor versions
/// below the specified minor version are accepted as well. Higher bytes of `version` are ignored.
///
/// Returns `true` when the major versions are equal and the minor version of `version` lies in
/// the inclusive range `specified_minor - range ..= specified_minor` (the lower bound saturates
/// at zero).
fn is_compatible(version: u32, specifier: u32) -> bool {
    // Extract the `index`-th byte, counting from the least significant one
    let byte_at = |value: u32, index: u32| (value >> (8 * index)) & 0xff;

    if byte_at(version, 0) != byte_at(specifier, 0) {
        return false;
    }

    let minor = byte_at(version, 1);
    let newest_minor = byte_at(specifier, 1);
    let oldest_minor = newest_minor.saturating_sub(byte_at(specifier, 2));

    oldest_minor <= minor && minor <= newest_minor
}
49
50/// An asynchronous client for Bolt servers.
51#[derive(Debug)]
52pub struct Client<S: AsyncRead + AsyncWrite + Unpin> {
53 stream: S,
54 version: u32,
55 server_state: ServerState,
56 sent_queue: VecDeque<Message>,
57 open_tx_streams: usize,
58}
59
60impl<S: AsyncRead + AsyncWrite + Unpin> Client<S> {
61 /// Attempt to create a new client from an asynchronous stream. A handshake will be performed
62 /// with the provided protocol version specifiers, and, if this succeeds, a Client will be
63 /// returned.
64 pub async fn new(mut stream: S, version_specifiers: &[u32; 4]) -> ConnectionResult<Self> {
65 let mut version_specifiers_bytes = BytesMut::with_capacity(16);
66 version_specifiers
67 .iter()
68 .for_each(|&v| version_specifiers_bytes.put_u32(v));
69 stream.write_all(&PREAMBLE).await?;
70 stream.write_all(&version_specifiers_bytes).await?;
71 stream.flush().await?;
72
73 let mut u32_bytes = [0, 0, 0, 0];
74 stream.read_exact(&mut u32_bytes).await?;
75 let version = u32::from_be_bytes(u32_bytes);
76
77 if version > 0 {
78 for &specifier in version_specifiers {
79 if is_compatible(version, specifier) {
80 return Ok(Self {
81 stream,
82 version,
83 server_state: Connected,
84 sent_queue: VecDeque::default(),
85 open_tx_streams: 0,
86 });
87 }
88 }
89 }
90 Err(ConnectionError::HandshakeFailed(*version_specifiers))
91 }
92
93 /// Get the current version of this client.
94 pub fn version(&self) -> u32 {
95 self.version
96 }
97
98 /// Get the current server state for this client.
99 pub fn server_state(&self) -> ServerState {
100 self.server_state
101 }
102
103 pub(crate) async fn read_message(&mut self) -> CommunicationResult<Message> {
104 let message = Message::from_stream(&mut self.stream)
105 .await
106 .map_err(ProtocolError::from)?;
107
108 #[cfg(test)]
109 println!("<<< {:?}\n", message);
110
111 match (self.server_state, self.sent_queue.pop_front(), message) {
112 // CONNECTED
113 (Connected, Some(Message::Init(_)), Message::Success(success)) => {
114 self.server_state = Ready;
115 Ok(Message::Success(success))
116 }
117 (Connected, Some(Message::Init(_)), Message::Failure(failure)) => {
118 self.server_state = Defunct;
119 Ok(Message::Failure(failure))
120 }
121 (Connected, Some(Message::Hello(_)), Message::Success(success)) => {
122 self.server_state = Ready;
123 Ok(Message::Success(success))
124 }
125 (Connected, Some(Message::Hello(_)), Message::Failure(failure)) => {
126 self.server_state = Defunct;
127 Ok(Message::Failure(failure))
128 }
129
130 // READY
131 (Ready, Some(Message::Run(_)), Message::Success(success)) => {
132 self.server_state = Streaming;
133 Ok(Message::Success(success))
134 }
135 (Ready, Some(Message::Run(_)), Message::Failure(failure)) => {
136 self.server_state = Failed;
137 Ok(Message::Failure(failure))
138 }
139 (Ready, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
140 self.server_state = Streaming;
141 Ok(Message::Success(success))
142 }
143 (Ready, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
144 self.server_state = Failed;
145 Ok(Message::Failure(failure))
146 }
147 (Ready, Some(Message::Begin(_)), Message::Success(success)) => {
148 self.server_state = TxReady;
149 Ok(Message::Success(success))
150 }
151 (Ready, Some(Message::Begin(_)), Message::Failure(failure)) => {
152 self.server_state = Failed;
153 Ok(Message::Failure(failure))
154 }
155 (Ready, Some(Message::Route(_)), Message::Success(success)) => {
156 self.server_state = Ready;
157 Ok(Message::Success(success))
158 }
159 (Ready, Some(Message::Route(_)), Message::Failure(failure)) => {
160 self.server_state = Failed;
161 Ok(Message::Failure(failure))
162 }
163 (Ready, Some(Message::RouteWithMetadata(_)), Message::Success(success)) => {
164 self.server_state = Ready;
165 Ok(Message::Success(success))
166 }
167 (Ready, Some(Message::RouteWithMetadata(_)), Message::Failure(failure)) => {
168 self.server_state = Failed;
169 Ok(Message::Failure(failure))
170 }
171
172 // STREAMING
173 (Streaming, Some(Message::PullAll), Message::Success(success)) => {
174 self.server_state = Ready;
175 Ok(Message::Success(success))
176 }
177 (Streaming, Some(Message::PullAll), Message::Record(record)) => {
178 self.server_state = Streaming;
179 // Put the PULL_ALL message back so we can keep consuming records
180 self.sent_queue.push_front(Message::PullAll);
181 Ok(Message::Record(record))
182 }
183 (Streaming, Some(Message::PullAll), Message::Failure(failure)) => {
184 self.server_state = Failed;
185 Ok(Message::Failure(failure))
186 }
187 (Streaming, Some(Message::Pull(_)), Message::Success(success)) => {
188 self.server_state = match success.metadata().get("has_more") {
189 Some(&Value::Boolean(true)) => Streaming,
190 _ => Ready,
191 };
192 Ok(Message::Success(success))
193 }
194 (Streaming, Some(Message::Pull(pull)), Message::Record(record)) => {
195 self.server_state = Streaming;
196 // Put the PULL message back so we can keep consuming records
197 self.sent_queue.push_front(Message::Pull(pull));
198 Ok(Message::Record(record))
199 }
200 (Streaming, Some(Message::Pull(_)), Message::Failure(failure)) => {
201 self.server_state = Failed;
202 Ok(Message::Failure(failure))
203 }
204 (Streaming, Some(Message::DiscardAll), Message::Success(success)) => {
205 self.server_state = Ready;
206 Ok(Message::Success(success))
207 }
208 (Streaming, Some(Message::DiscardAll), Message::Failure(failure)) => {
209 self.server_state = Failed;
210 Ok(Message::Failure(failure))
211 }
212 (Streaming, Some(Message::Discard(_)), Message::Success(success)) => {
213 self.server_state = match success.metadata().get("has_more") {
214 Some(&Value::Boolean(true)) => Streaming,
215 _ => Ready,
216 };
217 Ok(Message::Success(success))
218 }
219 (Streaming, Some(Message::Discard(_)), Message::Failure(failure)) => {
220 self.server_state = Failed;
221 Ok(Message::Failure(failure))
222 }
223
224 // TX_READY
225 (TxReady, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
226 self.open_tx_streams += 1;
227 self.server_state = TxStreaming;
228 Ok(Message::Success(success))
229 }
230 (TxReady, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
231 self.server_state = Failed;
232 Ok(Message::Failure(failure))
233 }
234 (TxReady, Some(Message::Commit), Message::Success(success)) => {
235 self.server_state = Ready;
236 Ok(Message::Success(success))
237 }
238 (TxReady, Some(Message::Commit), Message::Failure(failure)) => {
239 self.server_state = Failed;
240 Ok(Message::Failure(failure))
241 }
242 (TxReady, Some(Message::Rollback), Message::Success(success)) => {
243 self.server_state = Ready;
244 Ok(Message::Success(success))
245 }
246 (TxReady, Some(Message::Rollback), Message::Failure(failure)) => {
247 self.server_state = Failed;
248 Ok(Message::Failure(failure))
249 }
250
251 // TX_STREAMING
252 (TxStreaming, Some(Message::RunWithMetadata(_)), Message::Success(success)) => {
253 self.open_tx_streams += 1;
254 self.server_state = TxStreaming;
255 Ok(Message::Success(success))
256 }
257 (TxStreaming, Some(Message::RunWithMetadata(_)), Message::Failure(failure)) => {
258 self.server_state = Failed;
259 Ok(Message::Failure(failure))
260 }
261 (TxStreaming, Some(Message::PullAll), Message::Success(success)) => {
262 self.open_tx_streams -= 1;
263 self.server_state = TxReady;
264 Ok(Message::Success(success))
265 }
266 (TxStreaming, Some(Message::PullAll), Message::Record(record)) => {
267 self.server_state = TxStreaming;
268 // Put the PULL_ALL message back so we can keep consuming records
269 self.sent_queue.push_front(Message::PullAll);
270 Ok(Message::Record(record))
271 }
272 (TxStreaming, Some(Message::PullAll), Message::Failure(failure)) => {
273 self.server_state = Failed;
274 Ok(Message::Failure(failure))
275 }
276 (TxStreaming, Some(Message::Pull(_)), Message::Success(success)) => {
277 self.server_state = match success.metadata().get("has_more") {
278 Some(&Value::Boolean(true)) => TxStreaming,
279 _ => {
280 self.open_tx_streams -= 1;
281 if self.open_tx_streams > 0 {
282 TxStreaming
283 } else {
284 TxReady
285 }
286 }
287 };
288 Ok(Message::Success(success))
289 }
290 (TxStreaming, Some(Message::Pull(pull)), Message::Record(record)) => {
291 self.server_state = TxStreaming;
292 // Put the PULL message back so we can keep consuming records
293 self.sent_queue.push_front(Message::Pull(pull));
294 Ok(Message::Record(record))
295 }
296 (TxStreaming, Some(Message::Pull(_)), Message::Failure(failure)) => {
297 self.server_state = Failed;
298 Ok(Message::Failure(failure))
299 }
300 (TxStreaming, Some(Message::DiscardAll), Message::Success(success)) => {
301 self.open_tx_streams -= 1;
302 self.server_state = TxReady;
303 Ok(Message::Success(success))
304 }
305 (TxStreaming, Some(Message::DiscardAll), Message::Failure(failure)) => {
306 self.server_state = Failed;
307 Ok(Message::Failure(failure))
308 }
309 (TxStreaming, Some(Message::Discard(_)), Message::Success(success)) => {
310 self.server_state = match success.metadata().get("has_more") {
311 Some(&Value::Boolean(true)) => TxStreaming,
312 _ => {
313 self.open_tx_streams -= 1;
314 if self.open_tx_streams > 0 {
315 TxStreaming
316 } else {
317 TxReady
318 }
319 }
320 };
321 Ok(Message::Success(success))
322 }
323 (TxStreaming, Some(Message::Discard(_)), Message::Failure(failure)) => {
324 self.server_state = Failed;
325 Ok(Message::Failure(failure))
326 }
327
328 // FAILED
329 (Failed, Some(Message::Run(_)), Message::Ignored) => {
330 self.server_state = Failed;
331 Ok(Message::Ignored)
332 }
333 (Failed, Some(Message::RunWithMetadata(_)), Message::Ignored) => {
334 self.server_state = Failed;
335 Ok(Message::Ignored)
336 }
337 (Failed, Some(Message::PullAll), Message::Ignored) => {
338 self.server_state = Failed;
339 Ok(Message::Ignored)
340 }
341 (Failed, Some(Message::Pull(_)), Message::Ignored) => {
342 self.server_state = Failed;
343 Ok(Message::Ignored)
344 }
345 (Failed, Some(Message::DiscardAll), Message::Ignored) => {
346 self.server_state = Failed;
347 Ok(Message::Ignored)
348 }
349 (Failed, Some(Message::Discard(_)), Message::Ignored) => {
350 self.server_state = Failed;
351 Ok(Message::Ignored)
352 }
353 (Failed, Some(Message::Route(_)), Message::Ignored) => {
354 self.server_state = Failed;
355 Ok(Message::Ignored)
356 }
357 (Failed, Some(Message::RouteWithMetadata(_)), Message::Ignored) => {
358 self.server_state = Failed;
359 Ok(Message::Ignored)
360 }
361 (Failed, Some(Message::AckFailure), Message::Success(success)) => {
362 self.server_state = Ready;
363 Ok(Message::Success(success))
364 }
365 (Failed, Some(Message::AckFailure), Message::Failure(failure)) => {
366 self.server_state = Defunct;
367 Ok(Message::Failure(failure))
368 }
369
370 // INTERRUPTED
371 (Interrupted, Some(Message::Run(_)), _) => {
372 self.server_state = Interrupted;
373 Ok(Message::Ignored)
374 }
375 (Interrupted, Some(Message::RunWithMetadata(_)), _) => {
376 self.server_state = Interrupted;
377 Ok(Message::Ignored)
378 }
379 (Interrupted, Some(Message::PullAll), Message::Record(_)) => {
380 self.server_state = Interrupted;
381 // Put the PULL_ALL message back so we can keep consuming records
382 self.sent_queue.push_front(Message::PullAll);
383 Ok(Message::Ignored)
384 }
385 (Interrupted, Some(Message::PullAll), _) => {
386 self.server_state = Interrupted;
387 Ok(Message::Ignored)
388 }
389 (Interrupted, Some(Message::Pull(pull)), Message::Record(_)) => {
390 self.server_state = Interrupted;
391 // Put the PULL message back so we can keep consuming records
392 self.sent_queue.push_front(Message::Pull(pull));
393 Ok(Message::Ignored)
394 }
395 (Interrupted, Some(Message::Pull(_)), _) => {
396 self.server_state = Interrupted;
397 Ok(Message::Ignored)
398 }
399 (Interrupted, Some(Message::DiscardAll), _) => {
400 self.server_state = Interrupted;
401 Ok(Message::Ignored)
402 }
403 (Interrupted, Some(Message::Discard(_)), _) => {
404 self.server_state = Interrupted;
405 Ok(Message::Ignored)
406 }
407 (Interrupted, Some(Message::Begin(_)), _) => {
408 self.server_state = Interrupted;
409 Ok(Message::Ignored)
410 }
411 (Interrupted, Some(Message::Commit), _) => {
412 self.server_state = Interrupted;
413 Ok(Message::Ignored)
414 }
415 (Interrupted, Some(Message::Rollback), _) => {
416 self.server_state = Interrupted;
417 Ok(Message::Ignored)
418 }
419 (Interrupted, Some(Message::AckFailure), _) => {
420 self.server_state = Interrupted;
421 Ok(Message::Ignored)
422 }
423 (Interrupted, Some(Message::Route(_)), _) => {
424 self.server_state = Interrupted;
425 Ok(Message::Ignored)
426 }
427 (Interrupted, Some(Message::RouteWithMetadata(_)), _) => {
428 self.server_state = Interrupted;
429 Ok(Message::Ignored)
430 }
431 (Interrupted, Some(Message::Reset), Message::Success(success)) => {
432 self.open_tx_streams = 0;
433 self.server_state = Ready;
434 Ok(Message::Success(success))
435 }
436 (Interrupted, Some(Message::Reset), Message::Failure(failure)) => {
437 self.server_state = Defunct;
438 Ok(Message::Failure(failure))
439 }
440 (state, request, response) => {
441 self.server_state = Defunct;
442 Err(CommunicationError::InvalidResponse {
443 state,
444 request,
445 response,
446 })
447 }
448 }
449 }
450
451 pub(crate) async fn send_message(&mut self, message: Message) -> CommunicationResult<()> {
452 match (self.server_state, &message) {
453 (Connected, Message::Init(_)) => {}
454 (Connected, Message::Hello(_)) => {}
455 (Ready, Message::Run(_)) => {}
456 (Ready, Message::RunWithMetadata(_)) => {}
457 (Ready, Message::Begin(_)) => {}
458 (Ready, Message::Route(_)) => {}
459 (Ready, Message::RouteWithMetadata(_)) => {}
460 (Ready, Message::Reset) => {}
461 (Ready, Message::Goodbye) => {}
462 (Streaming, Message::PullAll) => {}
463 (Streaming, Message::Pull(_)) => {}
464 (Streaming, Message::DiscardAll) => {}
465 (Streaming, Message::Discard(_)) => {}
466 (Streaming, Message::Reset) => {}
467 (Streaming, Message::Goodbye) => {}
468 (TxReady, Message::RunWithMetadata(_)) => {}
469 (TxReady, Message::Commit) => {}
470 (TxReady, Message::Rollback) => {}
471 (TxReady, Message::Reset) => {}
472 (TxReady, Message::Goodbye) => {}
473 (TxStreaming, Message::RunWithMetadata(_)) => {}
474 (TxStreaming, Message::PullAll) => {}
475 (TxStreaming, Message::Pull(_)) => {}
476 (TxStreaming, Message::DiscardAll) => {}
477 (TxStreaming, Message::Discard(_)) => {}
478 (TxStreaming, Message::Reset) => {}
479 (TxStreaming, Message::Goodbye) => {}
480 (Failed, Message::Run(_)) => {}
481 (Failed, Message::RunWithMetadata(_)) => {}
482 (Failed, Message::PullAll) => {}
483 (Failed, Message::Pull(_)) => {}
484 (Failed, Message::DiscardAll) => {}
485 (Failed, Message::Discard(_)) => {}
486 (Failed, Message::AckFailure) => {}
487 (Failed, Message::Reset) => {}
488 (Failed, Message::Goodbye) => {}
489 (Interrupted, Message::Run(_)) => {}
490 (Interrupted, Message::RunWithMetadata(_)) => {}
491 (Interrupted, Message::PullAll) => {}
492 (Interrupted, Message::Pull(_)) => {}
493 (Interrupted, Message::DiscardAll) => {}
494 (Interrupted, Message::Discard(_)) => {}
495 (Interrupted, Message::AckFailure) => {}
496 (Interrupted, Message::Begin(_)) => {}
497 (Interrupted, Message::Commit) => {}
498 (Interrupted, Message::Rollback) => {}
499 (Interrupted, Message::Reset) => {}
500 (Interrupted, Message::Goodbye) => {}
501 (state, message) => {
502 self.server_state = Defunct;
503 return Err(CommunicationError::InvalidState {
504 state,
505 message: message.clone(),
506 });
507 }
508 }
509
510 #[cfg(test)]
511 println!(">>> {:?}", message);
512
513 let chunks = message.clone().into_chunks().map_err(ProtocolError::from)?;
514
515 for chunk in chunks {
516 self.stream.write_all(&chunk).await?;
517 }
518 self.stream.flush().await?;
519
520 // Immediate state changes
521 match message {
522 Message::Reset => self.server_state = Interrupted,
523 Message::Goodbye => self.server_state = Disconnected,
524 _ => {}
525 }
526
527 self.sent_queue.push_back(message);
528 Ok(())
529 }
530
531 /// Send a [`HELLO`](Message::Hello) (or [`INIT`](Message::Init)) message to the server.
532 /// _(Sends `INIT` for Bolt v1 - v2, and `HELLO` for Bolt v3+.)_
533 ///
534 /// # Description
535 /// The `HELLO` message requests the connection to be authorized for use with the remote
536 /// database. Clients should send a `HELLO` message to the server immediately after connection
537 /// and process the response before using that connection in any other way.
538 ///
539 /// The server must be in the [`Connected`](ServerState::Connected) state to be able to process
540 /// a `HELLO` message. For any other states, receipt of a `HELLO` message is considered a
541 /// protocol violation and leads to connection closure.
542 ///
543 /// If authentication fails, the server will respond with a [`FAILURE`](Message::Failure)
544 /// message and immediately close the connection. Clients wishing to retry initialization
545 /// should establish a new connection.
546 ///
547 /// # Fields
548 /// `metadata` should contain at least two entries:
549 /// - `user_agent`, which should conform to the format `"Name/Version"`, for example
550 /// `"Example/1.0.0"` (see
551 /// [here](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)).
552 /// - `scheme` is the authentication scheme. Predefined schemes are `"none"`, `"basic"`, or
553 /// `"kerberos"`.
554 ///
555 /// If using Bolt v4.1 or later, the following additional `metadata` entries can be specified:
556 /// - `routing`, a map which should contain routing context information as well as an `address`
557 /// field indicating to which address the client should initially connect. Leaving this
558 /// unspecified indicates that the server should not carry out any routing.
559 /// _(Bolt v4.1+ only.)_
560 ///
561 /// Further entries in `metadata` are passed to the implementation of the chosen authentication
562 /// scheme. Their names, types, and defaults depend on that choice. For example, the scheme
563 /// `"basic"` requires `metadata` to contain the username and password in the form
564 /// `{"principal": "<username>", "credentials": "<password>"}`.
565 ///
566 /// # Response
567 /// - [`Message::Success`] - initialization has completed successfully and the server has
568 /// entered the [`Ready`](ServerState::Ready) state. The server may include metadata that
569 /// describes details of the server environment and/or the connection. The following fields
570 /// are defined for inclusion in the `SUCCESS` metadata:
571 /// - `server`, the server agent string (e.g. `"Neo4j/4.3.0"`)
572 /// - `connection_id`, a unique identifier for the connection (e.g. `"bolt-61"`)
573 /// _(Bolt v3+ only.)_
574 /// - `hints`, a map of configuration hints (e.g. `{"connection.recv_timeout_seconds": 120}`)
575 /// These hints may be interpreted or ignored by drivers at their own discretion in order
576 /// to augment operations where applicable. Hints remain valid throughout the lifetime of a
577 /// given connection and cannot be changed. As such, newly established connections may
578 /// observe different hints as the server configuration is adjusted.
579 /// _(Bolt v4.3+ only.)_
580 /// - [`Message::Failure`] - initialization has failed and the server has entered the
581 /// [`Defunct`](ServerState::Defunct) state. The server may choose to include metadata
582 /// describing the nature of the failure but will immediately close the connection after the
583 /// failure has been sent.
584 #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
585 pub async fn hello(&mut self, mut metadata: Metadata) -> CommunicationResult<Message> {
586 let message = match self.version() {
587 V1_0 | V2_0 => {
588 let user_agent: String = metadata
589 .value
590 .remove("user_agent")
591 .ok_or_else(|| {
592 io::Error::new(io::ErrorKind::InvalidInput, "missing user_agent")
593 })?
594 .try_into()
595 .map_err(|_| {
596 io::Error::new(io::ErrorKind::InvalidInput, "user_agent must be a string")
597 })?;
598 let auth_token = metadata.value;
599
600 Message::Init(Init::new(user_agent, auth_token))
601 }
602 _ => Message::Hello(Hello::new(metadata.value)),
603 };
604
605 self.send_message(message).await?;
606 self.read_message().await
607 }
608
609 /// Send a [`ROUTE`](Message::RouteWithMetadata) message to the server.
610 /// _(Bolt v4.3+ only. For Bolt v4.3, an [alternate version](Message::Route) of the message is
611 /// sent.)_
612 ///
613 /// # Description
614 /// The `ROUTE` message instructs the server to return the current routing table.
615 ///
616 /// The server must be in the [`Ready`](ServerState::Ready) state to be able to successfully
617 /// process a `ROUTE` request. If the server is in the [`Failed`](ServerState::Failed) or
618 /// [`Interrupted`](ServerState::Interrupted) state, the response will be
619 /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `ROUTE` request will be
620 /// considered a protocol violation and will lead to connection closure.
621 ///
622 /// # Fields
623 /// - `context`, which should contain routing context information as well as an `address` field
624 /// indicating to which address the client should initially connect.
    /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g.
626 /// `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
627 /// - `metadata`, a map which can contain the following optional entries:
628 /// - `db`, a string containing the name of the database for which this command should be
629 /// run. [`null`](Value::Null) denotes the server-side configured default database.
630 /// - `imp_user`, a string specifying the impersonated user for the purposes of resolving
631 /// their home database. [`null`](Value::Null) denotes no impersonation (i.e., execution
632 /// takes place as the current user). _(Bolt v4.4+ only.)_
633 ///
634 /// # Response
635 /// - [`Message::Success`] - the routing table has been successfully retrieved and the server
636 /// has entered the [`Ready`](ServerState::Ready) state. The server sends the following
637 /// metadata fields in the response:
638 /// - `rt`, a map with the following fields:
639 /// - `ttl`, an integer denoting the number of seconds this routing table should be
640 /// considered valid
641 /// - `servers`, a list of maps representing roles for one or more addresses. Each element
642 /// will have the following fields:
643 /// - `role`, a server role. Possible values are `"READ"`, `"WRITE"`, and `"ROUTE"`.
644 /// - `addresses`, a list of strings representing the servers with the specified role
645 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
646 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
647 /// being processed. No server state change has occurred.
648 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
649 /// entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
650 /// message to provide more detail on the nature of the failure.
651 #[bolt_version(4.3, 4.4)]
652 pub async fn route(
653 &mut self,
654 context: RoutingContext,
655 bookmarks: impl Into<Vec<String>>,
656 metadata: Option<Metadata>,
657 ) -> CommunicationResult<Message> {
658 let mut metadata = metadata.unwrap_or_default().value;
659 let message = match self.version() {
660 V4_3 => {
661 let database = match metadata.remove("db") {
662 Some(value) => match value {
663 Value::String(string) => Some(string),
664 Value::Null => None,
665 _ => {
666 return Err(io::Error::new(
667 io::ErrorKind::InvalidInput,
668 "db must be either a string or null",
669 )
670 .into())
671 }
672 },
673 None => None,
674 };
675
676 Message::Route(Route::new(context.value, bookmarks.into(), database))
677 }
678 _ => Message::RouteWithMetadata(RouteWithMetadata::new(
679 context.value,
680 bookmarks.into(),
681 metadata,
682 )),
683 };
684
685 self.send_message(message).await?;
686 self.read_message().await
687 }
688
689 /// Send a [`RUN`](Message::Run) message to the server.
690 /// _(Bolt v1+. For Bolt v1 - v2, the `metadata` parameter is ignored.)_
691 ///
692 /// # Description
693 /// A `RUN` message submits a new query for execution, the result of which will be consumed by
694 /// a subsequent message, such as [`PULL`](Message::Pull).
695 ///
696 /// The server must be in either the [`Ready`](ServerState::Ready) state, the
697 /// [`TxReady`](ServerState::TxReady) state (Bolt v3+), or the
698 /// [`TxStreaming`](ServerState::TxStreaming) state (Bolt v4+) to be able to successfully
699 /// process a `RUN` request. If the server is in the [`Failed`](ServerState::Failed) or
700 /// [`Interrupted`](ServerState::Interrupted) state, the response will be
701 /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `RUN` request will be
702 /// considered a protocol violation and will lead to connection closure.
703 ///
704 /// # Fields
705 /// - `query` contains a database query or remote procedure call.
706 /// - `parameters` contains variable fields for `query`.
707 ///
708 /// If using Bolt v3 or later, the following `metadata` entries can be specified:
    /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g.
710 /// `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
711 /// - `tx_timeout`, an integer specifying a transaction timeout in milliseconds. Default is the
712 /// server-side configured timeout.
713 /// - `tx_metadata`, a map containing some metadata information, mainly used for logging.
714 /// - `mode`, a string which specifies what kind of server should be used for this transaction.
715 /// For write access, use `"w"` and for read access use `"r"`. Default is `"w"`.
716 /// - `db`, a string containing the name of the database where the transaction should take
717 /// place. [`null`](Value::Null) and `""` denote the server-side configured default database.
718 /// _(Bolt v4+ only.)_
719 /// - `imp_user`, a string specifying the impersonated user which executes this transaction.
720 /// [`null`](Value::Null) denotes no impersonation (i.e., execution takes place as the
721 /// current user). _(Bolt v4.4+ only.)_
722 ///
723 /// # Response
724 /// - [`Message::Success`] - the request has been successfully received and the server has
725 /// entered the [`Streaming`](ServerState::Streaming) state. Clients should not consider a
726 /// `SUCCESS` response to indicate completion of the execution of the query, merely
727 /// acceptance of it. The server may attach metadata to the message to provide header detail
728 /// for the results that follow. The following fields are defined for inclusion in the
729 /// metadata:
730 /// - `fields`, the fields included in the result (e.g. `["name", "age"]`)
731 /// - `result_available_after`, the time in milliseconds after which the first record in the
732 /// result stream is available. _(Bolt v1 - v2 only.)_
    /// - `t_first`, supersedes `result_available_after`. _(Bolt v3+ only.)_
734 /// - `qid`, an integer that specifies the server-assigned query ID. This is sent for queries
735 /// submitted within an explicit transaction. _(Bolt v4+ only.)_
736 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
737 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
738 /// being processed. No server state change has occurred.
739 /// - [`Message::Failure`] - the request could not be processed successfully or is invalid, and
740 /// the server has entered the [`Failed`](ServerState::Failed) state. The server may attach
741 /// metadata to the message to provide more detail on the nature of the failure.
742 #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
743 pub async fn run(
744 &mut self,
745 query: impl Into<String>,
746 parameters: Option<Params>,
747 metadata: Option<Metadata>,
748 ) -> CommunicationResult<Message> {
749 let message = match self.version() {
750 V1_0 | V2_0 => {
751 Message::Run(Run::new(query.into(), parameters.unwrap_or_default().value))
752 }
753 _ => Message::RunWithMetadata(RunWithMetadata::new(
754 query.into(),
755 parameters.unwrap_or_default().value,
756 metadata.unwrap_or_default().value,
757 )),
758 };
759
760 self.send_message(message).await?;
761 self.read_message().await
762 }
763
764 /// Send a [`PULL`](Message::Pull) (or [`PULL_ALL`](Message::PullAll)) message to the server.
765 /// _(Sends `PULL_ALL` for Bolt v1 - v3, and `PULL` for Bolt v4+. For Bolt v1 - v3, the
766 /// `metadata` parameter is ignored.)_
767 ///
768 /// # Description
769 /// The `PULL` message issues a request to stream outstanding results back to the client,
770 /// before returning to the [`Ready`](ServerState::Ready) state.
771 ///
772 /// Result details consist of zero or more [`RECORD`](Message::Record) messages and a summary
773 /// message. Each record carries with it a list of values which form the data content of the
774 /// record. The order of the values within that list should be meaningful to the client,
775 /// perhaps based on a requested ordering for that result, but no guarantees are made around
776 /// the order of records within the result. A record should only be considered valid if
777 /// accompanied by a [`SUCCESS`](Message::Success) summary message.
778 ///
779 /// The server must be in the [`Streaming`](ServerState::Streaming) or
780 /// [`TxStreaming`](ServerState::TxStreaming) state to be able to successfully process a `PULL`
781 /// request. If the server is in the [`Failed`](ServerState::Failed) state or
782 /// [`Interrupted`](ServerState::Interrupted) state, the response will be
783 /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `PULL` request will be
784 /// considered a protocol violation and will lead to connection closure.
785 ///
786 /// # Fields
787 /// For Bolt v4+, additional metadata is passed along with this message:
788 /// - `n` is an integer specifying how many records to fetch. `-1` will fetch all records. `n`
789 /// has no default and must be present.
790 /// - `qid` is an integer that specifies for which statement the `PULL` operation should be
791 /// carried out within an explicit transaction. `-1` is the default, which denotes the last
792 /// executed statement.
793 ///
794 /// # Response
795 /// - `(_, `[`Message::Success`]`)` - results have been successfully pulled and the server has
796 /// entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
797 /// `SUCCESS` message to provide footer detail for the results. The following fields are
798 /// defined for inclusion in the metadata:
799 /// - `type`, the type of query: read-only (`"r"`), write-only (`"w"`), read-write (`"rw"`),
800 /// or schema (`"s"`)
801 /// - `result_consumed_after`, the time in milliseconds after which the last record in the
802 /// result stream is consumed. _(Bolt v1 - v2 only.)_
/// - `t_last`, supersedes `result_consumed_after`. _(Bolt v3+ only.)_
804 /// - `bookmark` (e.g. `"bookmark:1234"`). _(Bolt v3+ only.)_
805 /// - `stats`, a map containing counter information, such as DB hits, etc. _(Bolt v3+ only.)_
806 /// - `plan`, a map containing the query plan result. _(Bolt v3+ only.)_
807 /// - `profile`, a map containing the query profile result. _(Bolt v3+ only.)_
808 /// - `notifications`, a map containing any notifications generated during execution of the
809 /// query. _(Bolt v3+ only.)_
810 /// - `db`, a string containing the name of the database where the query was executed.
811 /// _(Bolt v4+ only.)_
812 /// - `has_more`, a boolean indicating whether there are still records left in the result
813 /// stream. Default is `false`. _(Bolt v4+ only.)_
814 /// - `(_, `[`Message::Ignored`]`)` - the server is in the [`Failed`](ServerState::Failed) or
815 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
816 /// being processed. No server state change has occurred.
817 /// - `(_, `[`Message::Failure`]`)` - the request could not be processed successfully and the
818 /// server has entered the [`Failed`](ServerState::Failed) state. The server may attach
819 /// metadata to the message to provide more detail on the nature of the failure. Failure may
820 /// occur at any time during result streaming, so any records returned in the response should
821 /// be considered invalid.
822 #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
823 pub async fn pull(
824 &mut self,
825 metadata: Option<Metadata>,
826 ) -> CommunicationResult<(Vec<Record>, Message)> {
827 match self.version() {
828 V1_0 | V2_0 | V3_0 => self.send_message(Message::PullAll).await?,
829 _ => {
830 self.send_message(Message::Pull(Pull::new(metadata.unwrap_or_default().value)))
831 .await?
832 }
833 }
834 let mut records = vec![];
835 loop {
836 match self.read_message().await? {
837 Message::Record(record) => records.push(record),
838 Message::Success(success) => return Ok((records, Message::Success(success))),
839 Message::Failure(failure) => return Ok((records, Message::Failure(failure))),
840 Message::Ignored => return Ok((vec![], Message::Ignored)),
841 _ => unreachable!(),
842 }
843 }
844 }
845
846 /// Send a [`DISCARD`](Message::Discard) (or [`DISCARD_ALL`](Message::DiscardAll)) message to
847 /// the server.
/// _(Sends a `DISCARD_ALL` for Bolt v1 - v3, and `DISCARD` for Bolt v4+. For Bolt v1 - v3, the
849 /// `metadata` parameter is ignored.)_
850 ///
851 /// # Description
852 /// The `DISCARD` message issues a request to discard the outstanding result and return to the
853 /// [`Ready`](ServerState::Ready) state. A receiving server will not abort the request but
854 /// continue to process it without streaming any detail messages to the client.
855 ///
856 /// The server must be in the [`Streaming`](ServerState::Streaming) or
857 /// [`TxStreaming`](ServerState::TxStreaming) state to be able to successfully process a
858 /// `DISCARD` request. If the server is in the [`Failed`](ServerState::Failed) state or
859 /// [`Interrupted`](ServerState::Interrupted) state, the response will be
860 /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `DISCARD` request will be
861 /// considered a protocol violation and will lead to connection closure.
862 ///
863 /// # Fields
864 /// For Bolt v4+, additional metadata is passed along with this message:
865 /// - `n` is an integer specifying how many records to discard. `-1` will discard all records.
866 /// `n` has no default and must be present.
867 /// - `qid` is an integer that specifies for which statement the `DISCARD` operation should be
868 /// carried out within an explicit transaction. `-1` is the default, which denotes the last
869 /// executed statement.
870 ///
871 /// # Response
872 /// - [`Message::Success`] - results have been successfully discarded and the server has
873 /// entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
874 /// message to provide footer detail for the discarded results. The following fields are
875 /// defined for inclusion in the metadata:
876 /// - `type`, the type of query: read-only (`"r"`), write-only (`"w"`), read-write (`"rw"`),
877 /// or schema (`"s"`)
878 /// - `result_consumed_after`, the time in milliseconds after which the last record in the
879 /// result stream is consumed. _(Bolt v1 - v2 only.)_
/// - `t_last`, supersedes `result_consumed_after`. _(Bolt v3+ only.)_
881 /// - `bookmark` (e.g. `"bookmark:1234"`). _(Bolt v3+ only.)_
882 /// - `db`, a string containing the name of the database where the query was executed.
883 /// _(Bolt v4+ only.)_
884 /// - `has_more`, a boolean indicating whether there are still records left in the result
885 /// stream. Default is `false`. _(Bolt v4+ only.)_
886 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
887 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
888 /// being processed. No server state change has occurred.
889 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
890 /// entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
891 /// message to provide more detail on the nature of the failure.
892 #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
893 pub async fn discard(&mut self, metadata: Option<Metadata>) -> CommunicationResult<Message> {
894 let message = match self.version() {
895 V1_0 | V2_0 | V3_0 => Message::DiscardAll,
896 _ => Message::Discard(Discard::new(metadata.unwrap_or_default().value)),
897 };
898 self.send_message(message).await?;
899 self.read_message().await
900 }
901
902 /// Send a [`BEGIN`](Message::Begin) message to the server.
903 /// _(Bolt v3+ only.)_
904 ///
905 /// # Description
906 /// The `BEGIN` message starts a new explicit transaction and transitions the server to the
907 /// [`TxReady`](ServerState::TxReady) state. The explicit transaction is closed with either the
908 /// [`COMMIT`](Message::Commit) message or [`ROLLBACK`](Message::Rollback) message.
909 ///
910 /// The server must be in the [`Ready`](ServerState::Ready) state to be able to successfully
911 /// process a `BEGIN` request. If the server is in the [`Failed`](ServerState::Failed) or
912 /// [`Interrupted`](ServerState::Interrupted) state, the response will be
913 /// [`IGNORED`](Message::Ignored). For any other states, receipt of a `BEGIN` request will be
914 /// considered a protocol violation and will lead to connection closure.
915 ///
916 /// # Fields
917 /// `metadata` may contain the following optional fields:
918 /// - `bookmarks`, a list of strings containing some kind of bookmark identification, e.g
919 /// `["bkmk-transaction:1", "bkmk-transaction:2"]`. Default is `[]`.
920 /// - `tx_timeout`, an integer specifying a transaction timeout in milliseconds. Default is the
921 /// server-side configured timeout.
922 /// - `tx_metadata`, a map containing some metadata information, mainly used for logging.
923 /// - `mode`, a string which specifies what kind of server should be used for this transaction.
924 /// For write access, use `"w"` and for read access use `"r"`. Default is `"w"`.
925 /// - `db`, a string containing the name of the database where the transaction should take
926 /// place. [`null`](Value::Null) and `""` denote the server-side configured default database.
927 /// _(Bolt v4+ only.)_
928 /// - `imp_user`, a string specifying the impersonated user which executes this transaction.
929 /// [`null`](Value::Null) denotes no impersonation (i.e., execution takes place as the
930 /// current user). _(Bolt v4.4+ only.)_
931 ///
932 /// # Response
933 /// - [`Message::Success`] - the transaction has been successfully started and the server has
/// entered the [`TxReady`](ServerState::TxReady) state.
935 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
936 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
937 /// being processed. No server state change has occurred.
938 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
939 /// entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
940 /// message to provide more detail on the nature of the failure.
941 #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
942 pub async fn begin(&mut self, metadata: Option<Metadata>) -> CommunicationResult<Message> {
943 let begin_msg = Begin::new(metadata.unwrap_or_default().value);
944 self.send_message(Message::Begin(begin_msg)).await?;
945 self.read_message().await
946 }
947
948 /// Send a [`COMMIT`](Message::Commit) message to the server.
949 /// _(Bolt v3+ only.)_
950 ///
951 /// # Description
952 /// The `COMMIT` message requests to commit the results of an explicit transaction and
953 /// transition the server back to the [`Ready`](ServerState::Ready) state.
954 ///
955 /// The server must be in the [`TxReady`](ServerState::TxReady) state to be able to
956 /// successfully process a `COMMIT` request, which means that any outstanding results in the
957 /// result stream must be consumed via [`Client::pull`]. If the server is in the
958 /// [`Failed`](ServerState::Failed) or [`Interrupted`](ServerState::Interrupted) state, the
959 /// response will be [`IGNORED`](Message::Ignored). For any other states, receipt of a `COMMIT`
960 /// request will be considered a protocol violation and will lead to connection closure.
961 ///
962 /// To instead cancel pending changes, send a [`ROLLBACK`](Message::Rollback) message.
963 ///
964 /// # Response
965 /// - [`Message::Success`] - the transaction has been successfully committed and the server has
966 /// entered the [`Ready`](ServerState::Ready) state. The server sends the following metadata
967 /// fields in the response:
968 /// - `bookmark` (e.g. `"bookmark:1234"`)
969 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
970 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
971 /// being processed. No server state change has occurred.
972 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
973 /// entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
974 /// message to provide more detail on the nature of the failure.
975 #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
976 pub async fn commit(&mut self) -> CommunicationResult<Message> {
977 self.send_message(Message::Commit).await?;
978 self.read_message().await
979 }
980
981 /// Send a [`ROLLBACK`](Message::Rollback) message to the server.
982 /// _(Bolt v3+ only.)_
983 ///
984 /// # Description
985 /// The `ROLLBACK` message requests to cancel a transaction and transition the server back to
986 /// the [`Ready`](ServerState::Ready) state. Any changes made since the transaction was started
987 /// will be undone.
988 ///
989 /// The server must be in the [`TxReady`](ServerState::TxReady) state to be able to
990 /// successfully process a `ROLLBACK` request, which means that any outstanding results in the
991 /// result stream must be consumed via [`Client::pull`]. If the server is in the
992 /// [`Failed`](ServerState::Failed) or [`Interrupted`](ServerState::Interrupted) state, the
993 /// response will be [`IGNORED`](Message::Ignored). For any other states, receipt of a
994 /// `ROLLBACK` request will be considered a protocol violation and will lead to connection
995 /// closure.
996 ///
997 /// To instead persist pending changes, send a [`COMMIT`](Message::Commit) message.
998 ///
999 /// # Response
1000 /// - [`Message::Success`] - the transaction has been successfully reverted and the server has
1001 /// entered the [`Ready`](ServerState::Ready) state.
1002 /// - [`Message::Ignored`] - the server is in the [`Failed`](ServerState::Failed) or
1003 /// [`Interrupted`](ServerState::Interrupted) state, and the request was discarded without
1004 /// being processed. No server state change has occurred.
1005 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1006 /// entered the [`Failed`](ServerState::Failed) state. The server may attach metadata to the
1007 /// message to provide more detail on the nature of the failure.
1008 #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
1009 pub async fn rollback(&mut self) -> CommunicationResult<Message> {
1010 self.send_message(Message::Rollback).await?;
1011 self.read_message().await
1012 }
1013
1014 /// Send an [`ACK_FAILURE`](Message::AckFailure) message to the server.
1015 /// _(Bolt v1 - v2 only. For Bolt v3+, see [`Client::reset`].)_
1016 ///
1017 /// # Description
1018 /// `ACK_FAILURE` signals to the server that the client has acknowledged a previous failure and
1019 /// should return to the [`Ready`](ServerState::Ready) state.
1020 ///
1021 /// The server must be in the [`Failed`](ServerState::Failed) state to be able to successfully
1022 /// process an `ACK_FAILURE` request. For any other states, receipt of an `ACK_FAILURE` request
1023 /// will be considered a protocol violation and will lead to connection closure.
1024 ///
1025 /// # Response
1026 /// - [`Message::Success`] - failure has been successfully acknowledged and the server has
1027 /// entered the [`Ready`](ServerState::Ready) state. The server may attach metadata to the
1028 /// `SUCCESS` message.
1029 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1030 /// entered the [`Defunct`](ServerState::Defunct) state. The server may choose to include
1031 /// metadata describing the nature of the failure but will immediately close the connection
1032 /// after the failure has been sent.
1033 #[bolt_version(1, 2)]
1034 pub async fn ack_failure(&mut self) -> CommunicationResult<Message> {
1035 self.send_message(Message::AckFailure).await?;
1036 self.read_message().await
1037 }
1038
1039 /// Send a [`RESET`](Message::Reset) message to the server.
1040 /// _(Bolt v1+. For Bolt v1 - v2, see [`Client::ack_failure`] for just clearing the
1041 /// [`Failed`](ServerState::Failed) state.)_
1042 ///
1043 /// # Description
1044 /// The `RESET` message requests that the connection be set back to its initial state, as if
1045 /// initialization had just been successfully completed. The `RESET` message is unique in that
/// on arrival at the server, it jumps ahead in the message queue, stopping any unit of work
1047 /// that happens to be executing. All the queued messages originally in front of the `RESET`
1048 /// message will then be [`IGNORED`](Message::Ignored) until the `RESET` position is reached,
1049 /// at which point the server will be ready for a new session.
1050 ///
1051 /// Specifically, `RESET` will:
1052 /// - force any currently processing message to abort with [`IGNORED`](Message::Ignored)
1053 /// - force any pending messages that have not yet started processing to be
1054 /// [`IGNORED`](Message::Ignored)
1055 /// - clear any outstanding [`Failed`](ServerState::Failed) state
1056 /// - dispose of any outstanding result records
1057 /// - cancel the current transaction, if any
1058 ///
1059 /// # Response
1060 /// - [`Message::Success`] - the session has been successfully reset and the server has entered
1061 /// the [`Ready`](ServerState::Ready) state.
1062 /// - [`Message::Failure`] - the request could not be processed successfully and the server has
1063 /// entered the [`Defunct`](ServerState::Defunct) state. The server may choose to include
1064 /// metadata describing the nature of the failure but will immediately close the connection
1065 /// after the failure has been sent.
1066 #[bolt_version(1, 2, 3, 4, 4.1, 4.2, 4.3, 4.4)]
1067 pub async fn reset(&mut self) -> CommunicationResult<Message> {
1068 self.send_message(Message::Reset).await?;
1069 loop {
1070 match self.read_message().await? {
1071 Message::Success(success) => return Ok(Message::Success(success)),
1072 Message::Failure(failure) => return Ok(Message::Failure(failure)),
1073 Message::Ignored => {}
1074 _ => unreachable!(),
1075 }
1076 }
1077 }
1078
1079 /// Send a [`GOODBYE`](Message::Goodbye) message to the server.
1080 /// _(Bolt v3+ only.)_
1081 ///
1082 /// # Description
1083 /// The `GOODBYE` message notifies the server that the connection is terminating gracefully. On
1084 /// receipt of this message, the server will immediately shut down the socket on its side
1085 /// without sending a response. A client may shut down the socket at any time after sending the
1086 /// `GOODBYE` message. This message interrupts the server's current work, if any.
1087 #[bolt_version(3, 4, 4.1, 4.2, 4.3, 4.4)]
1088 pub async fn goodbye(&mut self) -> CommunicationResult<()> {
1089 self.send_message(Message::Goodbye).await?;
1090 self.server_state = Defunct;
1091 Ok(self.stream.close().await?)
1092 }
1093
1094 /// Send multiple messages to the server without waiting for a response. Returns a [`Vec`]
1095 /// containing the server's response messages for each of the sent messages, in the order they
1096 /// were provided.
1097 ///
1098 /// # Description
1099 /// The client is not required to wait for a response before sending more messages. Sending
1100 /// multiple messages together like this is called _pipelining_. For performance reasons, it is
1101 /// recommended that clients use pipelining wherever possible. Using pipelining, multiple
/// messages can be transmitted together in the same network packet, significantly reducing
1103 /// latency and increasing throughput.
1104 ///
1105 /// A common technique is to buffer outgoing messages on the client until the last possible
1106 /// moment, such as when a commit is issued or a result is read by the application, and then
1107 /// sending all messages in the buffer together.
1108 ///
1109 /// # Failure Handling
1110 /// Because the protocol leverages pipelining, the client and the server need to agree on what
1111 /// happens when a failure occurs, otherwise messages that were sent assuming no failure would
1112 /// occur might have unintended effects.
1113 ///
1114 /// When requests fail on the server, the server will send the client a
1115 /// [`FAILURE`](Message::Failure) message. The client must clear the failure state by sending a
1116 /// [`RESET`](Message::Reset) (Bolt v3+) or [`ACK_FAILURE`](Message::AckFailure) (Bolt v1 - v2)
1117 /// message to the server. Until the server receives the `RESET`/`ACK_FAILURE` message, it will
1118 /// send an [`IGNORED`](Message::Ignored) message in response to any other message from the
1119 /// client, including messages that were sent in a pipeline.
1120 pub async fn pipeline(&mut self, messages: Vec<Message>) -> CommunicationResult<Vec<Message>> {
1121 // This Vec is too small if we're expecting some RECORD messages, so there's no "good" size
1122 let mut responses = Vec::with_capacity(messages.len());
1123
1124 for message in &messages {
1125 #[cfg(test)]
1126 println!(">>> {:?}", message);
1127
1128 let chunks = message.clone().into_chunks().map_err(ProtocolError::from)?;
1129
1130 for chunk in chunks {
1131 self.stream.write_all(&chunk).await?;
1132 }
1133
1134 // Immediate state changes
1135 match message {
1136 Message::Reset => self.server_state = Interrupted,
1137 Message::Goodbye => self.server_state = Disconnected,
1138 _ => {}
1139 }
1140 }
1141 self.stream.flush().await?;
1142 self.sent_queue.extend(messages);
1143
1144 while !self.sent_queue.is_empty() {
1145 responses.push(self.read_message().await?);
1146 }
1147 Ok(responses)
1148 }
1149}