scylla/client/session.rs
//! `Session` is the main object used in the driver.\
//! It manages all connections to the cluster and allows executing CQL requests.
3
4use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
5use super::pager::{PreparedPagerConfig, QueryPager};
6use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
7use crate::authentication::AuthenticatorProvider;
8use crate::client::client_routes::ClientRoutesConfig;
9use crate::cluster::node::{KnownNode, NodeRef};
10use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
11use crate::errors::{
12 BadQuery, BrokenConnectionError, ExecutionError, MetadataError, NewSessionError,
13 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, SchemaAgreementError,
14 TracingError, UseKeyspaceError,
15};
16use crate::frame::response::result;
17use crate::network::tls::TlsProvider;
18use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
19use crate::observability::driver_tracing::RequestSpan;
20use crate::observability::history::{self, HistoryListener};
21#[cfg(feature = "metrics")]
22use crate::observability::metrics::Metrics;
23use crate::observability::tracing::TracingInfo;
24use crate::policies::address_translator::AddressTranslator;
25use crate::policies::host_filter::HostFilter;
26use crate::policies::load_balancing::{self, RoutingInfo};
27use crate::policies::reconnect::ExponentialReconnectPolicy;
28#[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
29use crate::policies::reconnect::ReconnectPolicy;
30use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
31use crate::policies::speculative_execution;
32use crate::policies::timestamp_generator::TimestampGenerator;
33use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
34use crate::response::{
35 Coordinator, NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse,
36};
37use crate::routing::partitioner::PartitionerName;
38use crate::routing::{Shard, ShardAwarePortRange};
39use crate::statement::batch::batch_values;
40use crate::statement::batch::{Batch, BatchStatement};
41use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
42use crate::statement::unprepared::Statement;
43use crate::statement::{Consistency, PageSize, StatementConfig};
44use arc_swap::ArcSwapOption;
45use futures::future::join_all;
46use futures::future::try_join_all;
47use itertools::Itertools;
48use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadataV2 as NonErrorResponseWithDeserializedMetadata;
49use scylla_cql::frame::response::error::DbError;
50use scylla_cql::serialize::batch::BatchValues;
51use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
52use std::borrow::Borrow;
53use std::future::Future;
54use std::net::{IpAddr, SocketAddr};
55use std::num::NonZeroU32;
56use std::ops::ControlFlow;
57use std::sync::{Arc, OnceLock};
58use std::time::Duration;
59use thiserror::Error;
60use tokio::time::timeout;
61use tracing::{Instrument, debug, error, trace, trace_span};
62use uuid::Uuid;
63
// NOTE(review): presumably the capacity of the bounded channel that carries
// tablet information - confirm at the place where the channel is created.
/// Size of the tablet channel (8192 entries).
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8 * 1024;

/// CQL text used for schema agreement checks: selects `schema_version`
/// from the `system.local` table.
const SCHEMA_VERSION_QUERY_STR: &str = "SELECT schema_version FROM system.local WHERE key='local'";
68
69/// Statements for internal driver operations.
70///
71/// We would like those to be prepared, but there are issues related to
72/// changing connection keyspace: https://github.com/scylladb/scylla-rust-driver/issues/1561
73#[derive(Default)]
74struct InternalStatements {
75 /// Statement for querying tracing session info from system_traces.sessions
76 tracing_session: OnceLock<Statement>,
77 /// Statement for querying tracing events from system_traces.events
78 tracing_events: OnceLock<Statement>,
79 /// Statement for fetching schema version during schema agreement checks
80 schema_version: OnceLock<Statement>,
81}
82
83/// `Session` manages connections to the cluster and allows to execute CQL requests.
84pub struct Session {
85 cluster: Cluster,
86 default_execution_profile_handle: ExecutionProfileHandle,
87 schema_agreement_interval: Duration,
88 #[cfg(feature = "metrics")]
89 metrics: Arc<Metrics>,
90 schema_agreement_timeout: Duration,
91 schema_agreement_automatic_waiting: bool,
92 refresh_metadata_on_auto_schema_agreement: bool,
93 keyspace_name: Arc<ArcSwapOption<String>>,
94 tracing_info_fetch_attempts: NonZeroU32,
95 tracing_info_fetch_interval: Duration,
96 tracing_info_fetch_consistency: Consistency,
97 internal_statements: InternalStatements,
98}
99
100/// This implementation deliberately omits some details from Cluster in order
101/// to avoid cluttering the print with much information of little usability.
102impl std::fmt::Debug for Session {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 let mut d = f.debug_struct("Session");
105 d.field("cluster", &ClusterNeatDebug(&self.cluster))
106 .field(
107 "default_execution_profile_handle",
108 &self.default_execution_profile_handle,
109 )
110 .field("schema_agreement_interval", &self.schema_agreement_interval);
111
112 #[cfg(feature = "metrics")]
113 d.field("metrics", &self.metrics);
114
115 d.field(
116 "auto_await_schema_agreement_timeout",
117 &self.schema_agreement_timeout,
118 )
119 .field(
120 "schema_agreement_automatic_waiting",
121 &self.schema_agreement_automatic_waiting,
122 )
123 .field(
124 "refresh_metadata_on_auto_schema_agreement",
125 &self.refresh_metadata_on_auto_schema_agreement,
126 )
127 .field("keyspace_name", &self.keyspace_name)
128 .field(
129 "tracing_info_fetch_attempts",
130 &self.tracing_info_fetch_attempts,
131 )
132 .field(
133 "tracing_info_fetch_interval",
134 &self.tracing_info_fetch_interval,
135 )
136 .field(
137 "tracing_info_fetch_consistency",
138 &self.tracing_info_fetch_consistency,
139 )
140 .finish()
141 }
142}
143
/// TLS context used to configure TLS connections to DB nodes.
///
/// The enum abstracts over the supported TLS implementations (OpenSSL, Rustls);
/// which variants exist depends on the enabled crate features.
#[non_exhaustive]
#[derive(Clone)] // Cheaply clonable - reference counted.
pub enum TlsContext {
    /// Context backed by OpenSSL 0.10.
    #[cfg(feature = "openssl-010")]
    OpenSsl010(openssl::ssl::SslContext),
    /// Context backed by Rustls 0.23.
    #[cfg(feature = "rustls-023")]
    Rustls023(Arc<rustls::ClientConfig>),
}
156
#[cfg(feature = "openssl-010")]
impl From<openssl::ssl::SslContext> for TlsContext {
    /// Wraps an OpenSSL context in [`TlsContext::OpenSsl010`].
    fn from(ssl_context: openssl::ssl::SslContext) -> Self {
        Self::OpenSsl010(ssl_context)
    }
}
163
#[cfg(feature = "rustls-023")]
impl From<Arc<rustls::ClientConfig>> for TlsContext {
    /// Wraps a shared Rustls client configuration in [`TlsContext::Rustls023`].
    fn from(client_config: Arc<rustls::ClientConfig>) -> Self {
        Self::Rustls023(client_config)
    }
}
170
171/// Configuration options for [`Session`].
172/// Can be created manually, but usually it's easier to use
173/// [SessionBuilder](super::session_builder::SessionBuilder)
174#[derive(Clone)]
175#[non_exhaustive]
176pub struct SessionConfig {
177 /// List of database servers known on Session startup.
178 /// Session will connect to these nodes to retrieve information about other nodes in the cluster.
179 /// Each node can be represented as a hostname or an IP address.
180 pub known_nodes: Vec<KnownNode>,
181
182 /// A local ip address to bind all driver's TCP sockets to.
183 ///
184 /// By default set to None, which is equivalent to:
185 /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
186 /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
187 pub local_ip_address: Option<IpAddr>,
188
189 /// Specifies the local port range used for shard-aware connections.
190 ///
191 /// By default set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
192 pub shard_aware_local_port_range: ShardAwarePortRange,
193
194 /// Preferred compression algorithm to use on connections.
195 /// If it's not supported by database server Session will fall back to no compression.
196 pub compression: Option<Compression>,
197
198 /// Whether to set the nodelay TCP flag.
199 pub tcp_nodelay: bool,
200
201 /// TCP keepalive interval, which means how often keepalive messages
202 /// are sent **on TCP layer** when a connection is idle.
203 /// If `None`, no TCP keepalive messages are sent.
204 pub tcp_keepalive_interval: Option<Duration>,
205
206 /// Size of the TCP receive buffer in bytes.
207 /// If `None`, the OS default is used.
208 pub tcp_recv_buffer_size: Option<usize>,
209
210 /// Size of the TCP send buffer in bytes.
211 /// If `None`, the OS default is used.
212 pub tcp_send_buffer_size: Option<usize>,
213
214 /// Whether to set the `SO_REUSEADDR` socket option.
215 /// If `None`, the OS default is used (typically `false`).
216 pub tcp_reuse_address: Option<bool>,
217
218 /// Linger duration for the socket.
219 /// If `None`, the OS default is used (lingering disabled).
220 /// Setting this to `Some(Duration::ZERO)` causes the connection to be reset (RST) on close.
221 pub tcp_linger: Option<Duration>,
222
223 /// Handle to the default execution profile, which is used
224 /// for all statements that do not specify an execution profile.
225 pub default_execution_profile_handle: ExecutionProfileHandle,
226
227 /// Keyspace to be used on all connections.
228 /// Each connection will send `"USE <keyspace_name>"` before sending any requests.
229 /// This can be later changed with [`Session::use_keyspace`].
230 pub used_keyspace: Option<String>,
231
232 /// Whether the keyspace name is case-sensitive.
233 /// This is used to determine how the keyspace name is sent to the server:
234 /// - if case-insensitive, it is sent as-is,
235 /// - if case-sensitive, it is enclosed in double quotes.
236 pub keyspace_case_sensitive: bool,
237
238 /// TLS context used configure TLS connections to DB nodes.
239 pub tls_context: Option<TlsContext>,
240
241 /// Custom authenticator provider to create an authenticator instance
242 /// upon session creation.
243 pub authenticator: Option<Arc<dyn AuthenticatorProvider>>,
244
245 /// Timeout for establishing connections to a node.
246 ///
247 /// If it's higher than underlying os's default connection timeout, it won't have
248 /// any effect.
249 pub connect_timeout: Duration,
250
251 /// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
252 /// The default is `PerShard(1)`, which is the recommended setting for ScyllaDB clusters.
253 pub connection_pool_size: PoolSize,
254
255 /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
256 /// Generally, this options is best left as default (false).
257 pub disallow_shard_aware_port: bool,
258
259 /// Policy that determines how long the connection pool waits between attempts
260 /// to fill connections to given host.
261 #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
262 pub reconnect_policy: Arc<dyn ReconnectPolicy>,
263
264 /// Timestamp generator used for generating timestamps on the client-side
265 /// If None, server-side timestamps are used.
266 pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
267
268 /// If empty, fetch all keyspaces
269 pub keyspaces_to_fetch: Vec<String>,
270
271 /// If true, full schema is fetched with every metadata refresh.
272 pub fetch_schema_metadata: bool,
273
274 /// Custom timeout for requests that query metadata.
275 pub metadata_request_serverside_timeout: Option<Duration>,
276
277 /// Interval of sending keepalive requests.
278 /// If `None`, keepalives are never sent, so `Self::keepalive_timeout` has no effect.
279 pub keepalive_interval: Option<Duration>,
280
281 /// Controls after what time of not receiving response to keepalives a connection is closed.
282 /// If `None`, connections are never closed due to lack of response to a keepalive message.
283 pub keepalive_timeout: Option<Duration>,
284
285 /// How often the driver should ask if schema is in agreement.
286 pub schema_agreement_interval: Duration,
287
288 /// Controls the timeout for waiting for schema agreement.
289 /// This works both for manual awaiting schema agreement and for
290 /// automatic waiting after a schema-altering statement is sent.
291 pub schema_agreement_timeout: Duration,
292
293 /// Controls whether schema agreement is automatically awaited
294 /// after sending a schema-altering statement.
295 pub schema_agreement_automatic_waiting: bool,
296
297 /// If true, full schema metadata is fetched after successfully reaching a schema agreement.
298 /// It is true by default but can be disabled if successive schema-altering statements should be performed.
299 pub refresh_metadata_on_auto_schema_agreement: bool,
300
301 /// DNS hostname resolution timeout.
302 /// If `None`, the driver will wait for hostname resolution indefinitely.
303 pub hostname_resolution_timeout: Option<Duration>,
304
305 /// The address translator is used to translate addresses received from ScyllaDB nodes
306 /// (either with cluster metadata or with an event) to addresses that can be used to
307 /// actually connect to those nodes. This may be needed e.g. when there is NAT
308 /// between the nodes and the driver.
309 pub address_translator: Option<Arc<dyn AddressTranslator>>,
310
311 /// Routing configuration for Scylla Cloud. If set, Session will connect
312 /// to Scylla Cloud clusters using custom routing based on `system.client_routes`.
313 #[cfg(feature = "unstable-client-routes")]
314 pub(crate) client_routes_config: Option<super::client_routes::ClientRoutesConfig>,
315
316 /// The host filter decides whether any connections should be opened
317 /// to the node or not. The driver will also avoid filtered out nodes when
318 /// re-establishing the control connection.
319 pub host_filter: Option<Arc<dyn HostFilter>>,
320
321 #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
322 /// Optional listener for host events (ADD, REMOVE, UP, DOWN).
323 pub host_listener: Option<Arc<dyn crate::policies::host_listener::HostListener>>,
324
325 /// If true, the driver will inject a delay controlled by [`SessionConfig::write_coalescing_delay`]
326 /// before flushing data to the socket.
327 /// This gives the driver an opportunity to collect more write requests
328 /// and write them in a single syscall, increasing the efficiency.
329 ///
330 /// However, this optimization may worsen latency if the rate of requests
331 /// issued by the application is low, but otherwise the application is
332 /// heavily loaded with other tasks on the same tokio executor.
333 /// Please do performance measurements before committing to disabling
334 /// this option.
335 pub enable_write_coalescing: bool,
336
337 /// Controls the write coalescing delay (if enabled).
338 ///
339 /// This option has no effect if [`SessionConfig::enable_write_coalescing`] is false.
340 ///
341 /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
342 pub write_coalescing_delay: WriteCoalescingDelay,
343
344 /// Number of attempts to fetch [`TracingInfo`]
345 /// in [`Session::get_tracing_info`]. Tracing info
346 /// might not be available immediately on queried node - that's why
347 /// the driver performs a few attempts with sleeps in between.
348 pub tracing_info_fetch_attempts: NonZeroU32,
349
350 /// Delay between attempts to fetch [`TracingInfo`]
351 /// in [`Session::get_tracing_info`]. Tracing info
352 /// might not be available immediately on queried node - that's why
353 /// the driver performs a few attempts with sleeps in between.
354 pub tracing_info_fetch_interval: Duration,
355
356 /// Consistency level of fetching [`TracingInfo`]
357 /// in [`Session::get_tracing_info`].
358 pub tracing_info_fetch_consistency: Consistency,
359
360 /// Interval between refreshing cluster metadata. This
361 /// can be configured according to the traffic pattern
362 /// for e.g: if they do not want unexpected traffic
363 /// or they expect the topology to change frequently.
364 pub cluster_metadata_refresh_interval: Duration,
365
366 /// Driver and application self-identifying information,
367 /// to be sent to server in STARTUP message.
368 pub identity: SelfIdentity<'static>,
369}
370
371impl SessionConfig {
372 /// Creates a [`SessionConfig`] with default configuration
373 /// # Default configuration
374 /// * Compression: None
375 /// * Load balancing policy: Token-aware Round-robin
376 ///
377 /// # Example
378 /// ```
379 /// # use scylla::client::session::SessionConfig;
380 /// let config = SessionConfig::new();
381 /// ```
382 pub fn new() -> Self {
383 SessionConfig {
384 known_nodes: Vec::new(),
385 local_ip_address: None,
386 shard_aware_local_port_range: ShardAwarePortRange::EPHEMERAL_PORT_RANGE,
387 compression: None,
388 tcp_nodelay: true,
389 tcp_keepalive_interval: None,
390 tcp_recv_buffer_size: None,
391 tcp_send_buffer_size: None,
392 tcp_reuse_address: None,
393 tcp_linger: None,
394 schema_agreement_interval: Duration::from_millis(200),
395 default_execution_profile_handle: ExecutionProfile::new_from_inner(Default::default())
396 .into_handle(),
397 used_keyspace: None,
398 keyspace_case_sensitive: false,
399 tls_context: None,
400 authenticator: None,
401 connect_timeout: Duration::from_secs(5),
402 hostname_resolution_timeout: Some(Duration::from_secs(5)),
403 connection_pool_size: Default::default(),
404 disallow_shard_aware_port: false,
405 #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
406 reconnect_policy: Arc::new(ExponentialReconnectPolicy::new()),
407 timestamp_generator: None,
408 keyspaces_to_fetch: Vec::new(),
409 fetch_schema_metadata: true,
410 metadata_request_serverside_timeout: Some(Duration::from_secs(2)),
411 keepalive_interval: Some(Duration::from_secs(30)),
412 keepalive_timeout: Some(Duration::from_secs(30)),
413 schema_agreement_timeout: Duration::from_secs(60),
414 schema_agreement_automatic_waiting: true,
415 address_translator: None,
416 host_filter: None,
417 #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
418 host_listener: None,
419 refresh_metadata_on_auto_schema_agreement: true,
420 enable_write_coalescing: true,
421 write_coalescing_delay: WriteCoalescingDelay::SmallNondeterministic,
422 tracing_info_fetch_attempts: NonZeroU32::new(10).unwrap(),
423 tracing_info_fetch_interval: Duration::from_millis(3),
424 tracing_info_fetch_consistency: Consistency::One,
425 cluster_metadata_refresh_interval: Duration::from_secs(60),
426 identity: SelfIdentity::default(),
427 #[cfg(feature = "unstable-client-routes")]
428 client_routes_config: None,
429 }
430 }
431
432 /// Adds a known database server with a hostname.
433 /// If the port is not explicitly specified, 9042 is used as default
434 /// # Example
435 /// ```
436 /// # use scylla::client::session::SessionConfig;
437 /// let mut config = SessionConfig::new();
438 /// config.add_known_node("127.0.0.1");
439 /// config.add_known_node("db1.example.com:9042");
440 /// ```
441 pub fn add_known_node(&mut self, hostname: impl AsRef<str>) {
442 self.known_nodes
443 .push(KnownNode::Hostname(hostname.as_ref().to_string()));
444 }
445
446 /// Adds a known database server with an IP address
447 /// # Example
448 /// ```
449 /// # use scylla::client::session::SessionConfig;
450 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
451 /// let mut config = SessionConfig::new();
452 /// config.add_known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042));
453 /// ```
454 pub fn add_known_node_addr(&mut self, node_addr: SocketAddr) {
455 self.known_nodes.push(KnownNode::Address(node_addr));
456 }
457
458 /// Adds a list of known database server with hostnames.
459 /// If the port is not explicitly specified, 9042 is used as default
460 /// # Example
461 /// ```
462 /// # use scylla::client::session::SessionConfig;
463 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
464 /// let mut config = SessionConfig::new();
465 /// config.add_known_nodes(&["127.0.0.1:9042", "db1.example.com"]);
466 /// ```
467 pub fn add_known_nodes(&mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) {
468 for hostname in hostnames {
469 self.add_known_node(hostname);
470 }
471 }
472
473 /// Adds a list of known database servers with IP addresses
474 /// # Example
475 /// ```
476 /// # use scylla::client::session::SessionConfig;
477 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
478 /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
479 /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
480 ///
481 /// let mut config = SessionConfig::new();
482 /// config.add_known_nodes_addr(&[addr1, addr2]);
483 /// ```
484 pub fn add_known_nodes_addr(
485 &mut self,
486 node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
487 ) {
488 for address in node_addrs {
489 self.add_known_node_addr(*address.borrow());
490 }
491 }
492}
493
494/// Creates default [`SessionConfig`], same as [`SessionConfig::new`]
495impl Default for SessionConfig {
496 fn default() -> Self {
497 Self::new()
498 }
499}
500
501impl SessionConfig {
502 /// [SessionConfig] may unfortunately represent invalid configurations. We need to rule them out
503 /// at runtime by running validation.
504 #[expect(clippy::result_large_err)] // TODO(2.0): Make NewSessionError smaller.
505 fn validate(&self) -> Result<(), NewSessionError> {
506 // Ensure there is at least one known node
507 if self.known_nodes.is_empty() {
508 return Err(NewSessionError::EmptyKnownNodesList);
509 }
510
511 // Ensure no illegal configuration with Client Routes
512 #[cfg(feature = "unstable-client-routes")]
513 if self.client_routes_config.is_some() {
514 if self.address_translator.is_some() {
515 return Err(NewSessionError::IllegalConfig(
516 "User-provided address translator is not supported if ClientRoutesConfig is provided, \
517 because the driver uses its own custom translator for client routes".into(),
518 ));
519 }
520
521 if self.tls_context.is_some() {
522 return Err(NewSessionError::IllegalConfig(
523 "TLS is not (yet) supported if ClientRoutesConfig is provided, \
524 because of architectural limitations that are out of the driver's scope"
525 .into(),
526 ));
527 }
528 }
529
530 Ok(())
531 }
532}
533
/// Outcome of running a request, generic over the type of a completed result.
pub(crate) enum RunRequestResult<T> {
    // NOTE(review): presumably produced when the retry policy decides to
    // ignore a write error - confirm at the places that construct it.
    /// A write error occurred and was ignored; there is no result value.
    IgnoredWriteError,
    /// The request completed and yielded a value.
    Completed(T),
}
538
539/// Represents a CQL session, which can be used to communicate
540/// with the database
541impl Session {
542 /// Sends a request to the database and receives a response.\
543 /// Executes an unprepared CQL statement without paging, i.e. all results are received in a single response.
544 ///
545 /// This is the easiest way to execute a CQL statement, but performance is worse than that of prepared statements.
546 ///
547 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
548 /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
549 /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_unpaged()`] instead.
550 ///
551 /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
552 /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
553 /// it is best to use paged requests:
554 /// - to receive multiple pages and transparently iterate through them, use [query_iter](Session::query_iter).
555 /// - to manually receive multiple pages and iterate through them, use [query_single_page](Session::query_single_page).
556 ///
557 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/unprepared.html) for more information
558 /// # Arguments
559 /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
560 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
561 ///
562 /// # Examples
563 /// ```rust
564 /// # use scylla::client::session::Session;
565 /// # use std::error::Error;
566 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
567 /// // Insert an int and text into a table.
568 /// session
569 /// .query_unpaged(
570 /// "INSERT INTO ks.tab (a, b) VALUES(?, ?)",
571 /// (2_i32, "some text")
572 /// )
573 /// .await?;
574 /// # Ok(())
575 /// # }
576 /// ```
577 /// ```rust
578 /// # use scylla::client::session::Session;
579 /// # use std::error::Error;
580 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
581 ///
582 /// // Read rows containing an int and text.
583 /// // Keep in mind that all results come in one response (no paging is done!),
584 /// // so the memory footprint and latency may be huge!
585 /// // To prevent that, use `Session::query_iter` or `Session::query_single_page`.
586 /// let query_rows = session
587 /// .query_unpaged("SELECT a, b FROM ks.tab", &[])
588 /// .await?
589 /// .into_rows_result()?;
590 ///
591 /// for row in query_rows.rows()? {
592 /// // Parse row as int and text.
593 /// let (int_val, text_val): (i32, &str) = row?;
594 /// }
595 /// # Ok(())
596 /// # }
597 /// ```
598 pub async fn query_unpaged(
599 &self,
600 statement: impl Into<Statement>,
601 values: impl SerializeRow,
602 ) -> Result<QueryResult, ExecutionError> {
603 self.do_query_unpaged(&statement.into(), values).await
604 }
605
606 /// Queries a single page from the database, optionally continuing from a saved point.
607 ///
608 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
609 /// trait method returns false). In such case, CQL statement first needs to be prepared (on a single connection), so
610 /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_single_page()`] instead.
611 ///
612 /// # Arguments
613 ///
614 /// * `statement` - statement to be executed
615 /// * `values` - values bound to the statement
616 /// * `paging_state` - previously received paging state or [PagingState::start()]
617 ///
618 /// # Example
619 ///
620 /// ```rust
621 /// # use scylla::client::session::Session;
622 /// # use std::error::Error;
623 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
624 /// use std::ops::ControlFlow;
625 /// use scylla::response::PagingState;
626 ///
627 /// // Manual paging in a loop, unprepared statement.
628 /// let mut paging_state = PagingState::start();
629 /// loop {
630 /// let (res, paging_state_response) = session
631 /// .query_single_page("SELECT a, b, c FROM ks.tbl", &[], paging_state)
632 /// .await?;
633 ///
634 /// // Do something with a single page of results.
635 /// for row in res
636 /// .into_rows_result()?
637 /// .rows::<(i32, &str)>()?
638 /// {
639 /// let (a, b) = row?;
640 /// }
641 ///
642 /// match paging_state_response.into_paging_control_flow() {
643 /// ControlFlow::Break(()) => {
644 /// // No more pages to be fetched.
645 /// break;
646 /// }
647 /// ControlFlow::Continue(new_paging_state) => {
648 /// // Update paging state from the response, so that query
649 /// // will be resumed from where it ended the last time.
650 /// paging_state = new_paging_state;
651 /// }
652 /// }
653 /// }
654 /// # Ok(())
655 /// # }
656 /// ```
657 pub async fn query_single_page(
658 &self,
659 statement: impl Into<Statement>,
660 values: impl SerializeRow,
661 paging_state: PagingState,
662 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
663 self.do_query_single_page(&statement.into(), values, paging_state)
664 .await
665 }
666
667 /// Execute an unprepared CQL statement with paging\
668 /// This method will query all pages of the result\
669 ///
670 /// Returns an async iterator (stream) over all received rows\
671 /// Page size can be specified in the [`Statement`] passed to the function
672 ///
673 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
674 /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
675 /// driver will initially perform 2 round trips instead of 1. Please use [`Session::execute_iter()`] instead.
676 ///
677 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
678 ///
679 /// # Arguments
680 /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
681 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
682 ///
683 /// # Example
684 ///
685 /// ```rust
686 /// # use scylla::client::session::Session;
687 /// # use std::error::Error;
688 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
689 /// use futures::stream::StreamExt;
690 ///
691 /// let mut rows_stream = session
692 /// .query_iter("SELECT a, b FROM ks.t", &[])
693 /// .await?
694 /// .rows_stream::<(i32, i32)>()?;
695 ///
696 /// while let Some(next_row_res) = rows_stream.next().await {
697 /// let (a, b): (i32, i32) = next_row_res?;
698 /// println!("a, b: {}, {}", a, b);
699 /// }
700 /// # Ok(())
701 /// # }
702 /// ```
703 pub async fn query_iter(
704 &self,
705 statement: impl Into<Statement>,
706 values: impl SerializeRow,
707 ) -> Result<QueryPager, PagerExecutionError> {
708 self.do_query_iter(statement.into(), values).await
709 }
710
711 /// Execute a prepared statement. Requires a [PreparedStatement]
712 /// generated using [`Session::prepare`](Session::prepare).\
713 /// Performs an unpaged request, i.e. all results are received in a single response.
714 ///
715 /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
716 /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
717 /// it is best to use paged requests:
718 /// - to receive multiple pages and transparently iterate through them, use [execute_iter](Session::execute_iter).
719 /// - to manually receive multiple pages and iterate through them, use [execute_single_page](Session::execute_single_page).
720 ///
721 /// Prepared statements are much faster than unprepared statements:
722 /// * Database doesn't need to parse the statement string upon each execution (only once)
723 /// * They are properly load balanced using token aware routing
724 ///
725 /// > ***Warning***\
726 /// > For token/shard aware load balancing to work properly, all partition key values
727 /// > must be sent as bound values
728 /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance)).
729 ///
730 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
731 ///
732 /// # Arguments
733 /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
734 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
735 ///
736 /// # Example
737 /// ```rust
738 /// # use scylla::client::session::Session;
739 /// # use std::error::Error;
740 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
741 /// use scylla::statement::prepared::PreparedStatement;
742 ///
743 /// // Prepare the statement for later execution
744 /// let prepared: PreparedStatement = session
745 /// .prepare("INSERT INTO ks.tab (a) VALUES(?)")
746 /// .await?;
747 ///
748 /// // Execute the prepared statement with some values, just like an unprepared statement.
749 /// let to_insert: i32 = 12345;
750 /// session.execute_unpaged(&prepared, (to_insert,)).await?;
751 /// # Ok(())
752 /// # }
753 /// ```
754 pub async fn execute_unpaged(
755 &self,
756 prepared: &PreparedStatement,
757 values: impl SerializeRow,
758 ) -> Result<QueryResult, ExecutionError> {
759 let serialized_values = prepared.serialize_values(&values)?;
760 let (result, paging_state) = self
761 .execute(prepared, &serialized_values, None, PagingState::start())
762 .await?;
763 if !paging_state.finished() {
764 error!(
765 "Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug."
766 );
767 return Err(ExecutionError::LastAttemptError(
768 RequestAttemptError::NonfinishedPagingState,
769 ));
770 }
771 Ok(result)
772 }
773
774 /// Executes a prepared statement, restricting results to single page.
775 /// Optionally continues fetching results from a saved point.
776 ///
777 /// # Arguments
778 ///
779 /// * `prepared` - a statement prepared with [prepare](crate::client::session::Session::prepare)
780 /// * `values` - values bound to the statement
781 /// * `paging_state` - continuation based on a paging state received from a previous paged query or None
782 ///
783 /// # Example
784 ///
785 /// ```rust
786 /// # use scylla::client::session::Session;
787 /// # use std::error::Error;
788 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
789 /// use std::ops::ControlFlow;
790 /// use scylla::statement::unprepared::Statement;
791 /// use scylla::response::{PagingState, PagingStateResponse};
792 ///
793 /// let paged_prepared = session
794 /// .prepare(
795 /// Statement::new("SELECT a, b FROM ks.tbl")
796 /// .with_page_size(100.try_into().unwrap()),
797 /// )
798 /// .await?;
799 ///
800 /// // Manual paging in a loop, prepared statement.
801 /// let mut paging_state = PagingState::start();
802 /// loop {
803 /// let (res, paging_state_response) = session
804 /// .execute_single_page(&paged_prepared, &[], paging_state)
805 /// .await?;
806 ///
807 /// // Do something with a single page of results.
808 /// for row in res
809 /// .into_rows_result()?
810 /// .rows::<(i32, &str)>()?
811 /// {
812 /// let (a, b) = row?;
813 /// }
814 ///
815 /// match paging_state_response.into_paging_control_flow() {
816 /// ControlFlow::Break(()) => {
817 /// // No more pages to be fetched.
818 /// break;
819 /// }
820 /// ControlFlow::Continue(new_paging_state) => {
821 /// // Update paging continuation from the paging state, so that query
822 /// // will be resumed from where it ended the last time.
823 /// paging_state = new_paging_state;
824 /// }
825 /// }
826 /// }
827 /// # Ok(())
828 /// # }
829 /// ```
830 pub async fn execute_single_page(
831 &self,
832 prepared: &PreparedStatement,
833 values: impl SerializeRow,
834 paging_state: PagingState,
835 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
836 let serialized_values = prepared.serialize_values(&values)?;
837 let page_size = prepared.get_validated_page_size();
838 self.execute(prepared, &serialized_values, Some(page_size), paging_state)
839 .await
840 }
841
842 /// Execute a prepared statement with paging.\
843 /// This method will query all pages of the result.\
844 ///
845 /// Returns an async iterator (stream) over all received rows.\
846 /// Page size can be specified in the [PreparedStatement] passed to the function.
847 ///
848 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
849 ///
850 /// # Arguments
851 /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
852 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
853 ///
854 /// # Example
855 ///
856 /// ```rust
857 /// # use scylla::client::session::Session;
858 /// # use futures::StreamExt as _;
859 /// # use std::error::Error;
860 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
861 /// use scylla::statement::prepared::PreparedStatement;
862 ///
863 /// // Prepare the statement for later execution
864 /// let prepared: PreparedStatement = session
865 /// .prepare("SELECT a, b FROM ks.t")
866 /// .await?;
867 ///
868 /// // Execute the statement and receive all pages
869 /// let mut rows_stream = session
870 /// .execute_iter(prepared, &[])
871 /// .await?
872 /// .rows_stream::<(i32, i32)>()?;
873 ///
874 /// while let Some(next_row_res) = rows_stream.next().await {
875 /// let (a, b): (i32, i32) = next_row_res?;
876 /// println!("a, b: {}, {}", a, b);
877 /// }
878 /// # Ok(())
879 /// # }
880 /// ```
881 pub async fn execute_iter(
882 &self,
883 prepared: impl Into<PreparedStatement>,
884 values: impl SerializeRow,
885 ) -> Result<QueryPager, PagerExecutionError> {
886 let prepared = prepared.into();
887 let serialized_values = prepared.serialize_values(&values)?;
888
889 self.execute_iter_nongeneric(prepared, serialized_values)
890 .await
891 }
892
893 /// Execute a batch statement\
894 /// Batch contains many `unprepared` or `prepared` statements which are executed at once\
895 /// Batch doesn't return any rows.
896 ///
897 /// Batch values must contain values for each of the statements.
898 ///
    /// Avoid using non-empty values ([`SerializeRow::is_empty()`] returns false) for unprepared statements
    /// inside the batch. Such statements will first need to be prepared, so the driver will need to
    /// send (number_of_unprepared_statements_with_values + 1) requests instead of 1 request, severely
    /// affecting performance.
903 ///
904 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/batch.html) for more information
905 ///
906 /// # Arguments
907 /// * `batch` - [Batch] to be performed
908 /// * `values` - List of values for each statement, it's the easiest to use a tuple of tuples
909 ///
910 /// # Example
911 /// ```rust
912 /// # use scylla::client::session::Session;
913 /// # use std::error::Error;
914 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
915 /// use scylla::statement::batch::Batch;
916 ///
917 /// let mut batch: Batch = Default::default();
918 ///
919 /// // A statement with two bound values
920 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");
921 ///
922 /// // A statement with one bound value
923 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");
924 ///
925 /// // A statement with no bound values
926 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");
927 ///
928 /// // Batch values is a tuple of 3 tuples containing values for each statement
929 /// let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first statement
930 /// (4_i32,), // Tuple with one value for the second statement
931 /// ()); // Empty tuple/unit for the third statement
932 ///
933 /// // Run the batch
934 /// session.batch(&batch, batch_values).await?;
935 /// # Ok(())
936 /// # }
937 /// ```
    pub async fn batch(
        &self,
        batch: &Batch,
        values: impl BatchValues,
    ) -> Result<QueryResult, ExecutionError> {
        // Shard-awareness behavior for batch is to pick the shard based on the first batch
        // statement's shard. If users batch statements by shard, they will be rewarded with
        // full shard awareness.

        // Check to ensure that we don't send a batch statement with more than u16::MAX queries.
        let batch_statements_length = batch.statements.len();
        if batch_statements_length > u16::MAX as usize {
            return Err(ExecutionError::BadQuery(
                BadQuery::TooManyQueriesInBatchStatement(batch_statements_length),
            ));
        }

        // The batch's own execution profile takes precedence over the session default.
        let execution_profile = batch
            .get_execution_profile_handle()
            .unwrap_or_else(|| self.get_default_execution_profile_handle())
            .access();

        // Consistency settings set directly on the batch override those of the profile.
        let consistency = batch
            .config
            .consistency
            .unwrap_or(execution_profile.consistency);

        let serial_consistency = batch
            .config
            .serial_consistency
            .unwrap_or(execution_profile.serial_consistency);

        // `peek_first_token` consumes `values` and hands them back together with the token
        // derived from the first statement; the returned values are what is sent below.
        let (first_value_token, values) =
            batch_values::peek_first_token(values, batch.statements.first())?;
        // Borrowed so that the request closure below can be invoked without moving the values.
        let values_ref = &values;

        // The table spec is only taken from the first statement, and only if it is prepared.
        let table_spec =
            if let Some(BatchStatement::PreparedStatement(ps)) = batch.statements.first() {
                ps.get_table_spec()
            } else {
                None
            };

        // Routing information is built solely from the first statement of the batch.
        let statement_info = RoutingInfo {
            consistency,
            serial_consistency,
            token: first_value_token,
            table: table_spec,
            is_confirmed_lwt: false,
        };

        let span = RequestSpan::new_batch();

        let (run_request_result, coordinator): (
            RunRequestResult<NonErrorQueryResponse>,
            Coordinator,
        ) = self
            .run_request(
                statement_info,
                &batch.config,
                execution_profile,
                |connection: Arc<Connection>,
                 consistency: Consistency,
                 execution_profile: &ExecutionProfileInner| {
                    // Serial consistency is resolved again here, against the profile that
                    // `run_request` passes to this closure (shadowing the outer binding).
                    let serial_consistency = batch
                        .config
                        .serial_consistency
                        .unwrap_or(execution_profile.serial_consistency);
                    async move {
                        connection
                            .batch_with_consistency(
                                batch,
                                values_ref,
                                consistency,
                                serial_consistency,
                            )
                            .await
                            .and_then(QueryResponse::into_non_error_query_response)
                    }
                },
                &span,
            )
            .instrument(span.span().clone())
            .await?;

        let result = match run_request_result {
            // An ignored write error is reported to the caller as an empty result.
            RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(coordinator),
            RunRequestResult::Completed(non_error_query_response) => {
                let result = non_error_query_response.into_query_result(coordinator)?;
                span.record_result_fields(&result);
                result
            }
        };

        Ok(result)
    }
1033
    /// Establishes a CQL session with the database
1035 ///
1036 /// Usually it's easier to use [SessionBuilder](crate::client::session_builder::SessionBuilder)
1037 /// instead of calling `Session::connect` directly, because it's more convenient.
1038 /// # Arguments
1039 /// * `config` - Connection configuration - known nodes, Compression, etc.
1040 /// Must contain at least one known node.
1041 ///
1042 /// # Example
1043 /// ```rust
1044 /// # use std::error::Error;
1045 /// # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
1046 /// use scylla::client::session::{Session, SessionConfig};
1047 /// use scylla::cluster::KnownNode;
1048 ///
1049 /// let mut config = SessionConfig::new();
1050 /// config.known_nodes.push(KnownNode::Hostname("127.0.0.1:9042".to_string()));
1051 ///
1052 /// let session: Session = Session::connect(config).await?;
1053 /// # Ok(())
1054 /// # }
1055 /// ```
    pub async fn connect(config: SessionConfig) -> Result<Self, NewSessionError> {
        // Reject an invalid configuration before doing any other work.
        config.validate()?;

        let known_nodes = config.known_nodes;

        // Channel for tablet information: the sender is stored in the connection config,
        // the receiver is handed to `Cluster::new` below.
        let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(TABLET_CHANNEL_SIZE);

        let tls_provider = if let Some(tls_context) = config.tls_context {
            // To silence warnings when TlsContext is an empty enum (tls features are disabled).
            // In such case, TlsProvider is uninhabited.
            #[cfg_attr(
                not(any(feature = "openssl-010", feature = "rustls-023")),
                // TODO: make this expect() once MSRV is 1.92+.
                allow(unreachable_code, unused_variables)
            )]
            let provider = TlsProvider::new_with_global_context(tls_context);
            #[cfg_attr(
                not(any(feature = "openssl-010", feature = "rustls-023")),
                // TODO: remove this once MSRV is 1.92+.
                allow(unreachable_code)
            )]
            Some(provider)
        } else {
            None
        };

        // Per-connection settings, mostly copied over from the session config.
        let connection_config = ConnectionConfig {
            local_ip_address: config.local_ip_address,
            shard_aware_local_port_range: config.shard_aware_local_port_range,
            compression: config.compression,
            tcp_nodelay: config.tcp_nodelay,
            tcp_keepalive_interval: config.tcp_keepalive_interval,
            tcp_recv_buffer_size: config.tcp_recv_buffer_size,
            tcp_send_buffer_size: config.tcp_send_buffer_size,
            tcp_reuse_address: config.tcp_reuse_address,
            tcp_linger: config.tcp_linger,
            timestamp_generator: config.timestamp_generator,
            tls_provider,
            authenticator: config.authenticator,
            connect_timeout: config.connect_timeout,
            event_sender: None,
            default_consistency: Default::default(),
            address_translator: config.address_translator,
            // The delay is only passed on when write coalescing is enabled; otherwise `None`.
            write_coalescing_delay: config
                .enable_write_coalescing
                .then_some(config.write_coalescing_delay),
            keepalive_interval: config.keepalive_interval,
            keepalive_timeout: config.keepalive_timeout,
            tablet_sender: Some(tablet_sender),
            identity: config.identity,
        };

        let pool_config = PoolConfig {
            connection_config,
            pool_size: config.connection_pool_size,
            can_use_shard_aware_port: !config.disallow_shard_aware_port,
            // The reconnect policy is configurable only with the unstable feature enabled;
            // otherwise an `ExponentialReconnectPolicy` is always used.
            #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
            reconnect_policy: config.reconnect_policy,
            #[cfg(not(all(scylla_unstable, feature = "unstable-reconnect-policy")))]
            reconnect_policy: Arc::new(ExponentialReconnectPolicy::new()),
        };

        // The metrics object is shared between the cluster and the session.
        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::new());

        // Taken from the config when the unstable feature is enabled, `None` otherwise.
        let host_listener = {
            #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
            {
                config.host_listener
            }
            #[cfg(not(all(scylla_unstable, feature = "unstable-host-listener")))]
            {
                None
            }
        };

        // Taken from the config when the unstable feature is enabled, `None` otherwise.
        let client_routes_config: Option<ClientRoutesConfig> = {
            #[cfg(feature = "unstable-client-routes")]
            {
                config.client_routes_config
            }
            #[cfg(not(feature = "unstable-client-routes"))]
            {
                None
            }
        };

        let cluster = Cluster::new(
            known_nodes,
            pool_config,
            config.keyspaces_to_fetch,
            config.fetch_schema_metadata,
            config.metadata_request_serverside_timeout,
            config.hostname_resolution_timeout,
            config.host_filter,
            host_listener,
            config.cluster_metadata_refresh_interval,
            tablet_receiver,
            #[cfg(feature = "metrics")]
            Arc::clone(&metrics),
            client_routes_config,
        )
        .await?;

        let default_execution_profile_handle = config.default_execution_profile_handle;

        let session = Self {
            cluster,
            default_execution_profile_handle,
            schema_agreement_interval: config.schema_agreement_interval,
            #[cfg(feature = "metrics")]
            metrics,
            schema_agreement_timeout: config.schema_agreement_timeout,
            schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
            refresh_metadata_on_auto_schema_agreement: config
                .refresh_metadata_on_auto_schema_agreement,
            keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
            tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
            tracing_info_fetch_interval: config.tracing_info_fetch_interval,
            tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
            internal_statements: InternalStatements::default(),
        };

        // If a keyspace was configured, switch to it before returning the session;
        // a failure here fails the whole connect.
        if let Some(keyspace_name) = config.used_keyspace {
            session
                .use_keyspace(keyspace_name, config.keyspace_case_sensitive)
                .await?;
        }

        Ok(session)
    }
1187
1188 /// Creates and returns ref to a shared statement for querying tracing session info.
1189 fn get_tracing_session_statement(&self) -> &Statement {
1190 self.internal_statements.tracing_session.get_or_init(|| {
1191 let mut stmt = Statement::new(crate::observability::tracing::TRACES_SESSION_QUERY_STR);
1192 stmt.set_page_size(crate::observability::tracing::TRACING_QUERY_PAGE_SIZE);
1193 stmt.set_consistency(self.tracing_info_fetch_consistency);
1194 stmt.set_is_idempotent(true);
1195 stmt
1196 })
1197 }
1198
1199 /// Creates and returns ref to a shared statement for querying tracing events.
1200 fn get_tracing_events_statement(&self) -> &Statement {
1201 self.internal_statements.tracing_events.get_or_init(|| {
1202 let mut stmt = Statement::new(crate::observability::tracing::TRACES_EVENTS_QUERY_STR);
1203 stmt.set_page_size(crate::observability::tracing::TRACING_QUERY_PAGE_SIZE);
1204 stmt.set_consistency(self.tracing_info_fetch_consistency);
1205 stmt.set_is_idempotent(true);
1206 stmt
1207 })
1208 }
1209
1210 /// Creates and returns ref to a shared statement for querying schema version.
1211 fn get_schema_version_statement(&self) -> &Statement {
1212 self.internal_statements.schema_version.get_or_init(|| {
1213 let mut statement = Statement::new(SCHEMA_VERSION_QUERY_STR);
1214 // Use ONE consistency for schema version queries - this is a local query
1215 // that reads from system.local, so ONE is appropriate.
1216 statement.set_consistency(Consistency::One);
1217 statement.set_is_idempotent(true);
1218 statement
1219 })
1220 }
1221
1222 async fn do_query_unpaged(
1223 &self,
1224 statement: &Statement,
1225 values: impl SerializeRow,
1226 ) -> Result<QueryResult, ExecutionError> {
1227 let (result, paging_state_response) = self
1228 .query(statement, values, None, PagingState::start())
1229 .await?;
1230 if !paging_state_response.finished() {
1231 error!(
1232 "Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug."
1233 );
1234 return Err(ExecutionError::LastAttemptError(
1235 RequestAttemptError::NonfinishedPagingState,
1236 ));
1237 }
1238 Ok(result)
1239 }
1240
1241 async fn do_query_single_page(
1242 &self,
1243 statement: &Statement,
1244 values: impl SerializeRow,
1245 paging_state: PagingState,
1246 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1247 self.query(
1248 statement,
1249 values,
1250 Some(statement.get_validated_page_size()),
1251 paging_state,
1252 )
1253 .await
1254 }
1255
    /// Sends a request to the database.
    /// Optionally continues fetching results from a saved point.
    ///
    /// This is now an internal method only.
    ///
    /// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
    ///
    /// The rationale is that we believe that paging is such an important concept (and it has shown to be
    /// error-prone as well) that we need to require users to make a conscious decision to use paging or not.
    /// For that, we expose the aforementioned 3 methods clearly differing in naming and API, so that
    /// no unconscious choices about paging should be made.
    ///
    /// # Arguments
    /// * `statement` - the unprepared statement to send
    /// * `values` - values bound to the statement; if non-empty, the statement is prepared
    ///   on the chosen connection before being executed
    /// * `page_size` - passed through to the connection-level request as-is
    /// * `paging_state` - the paging state to send with the request
    async fn query(
        &self,
        statement: &Statement,
        values: impl SerializeRow,
        page_size: Option<PageSize>,
        paging_state: PagingState,
    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
        // The statement's own execution profile takes precedence over the session default.
        let execution_profile = statement
            .get_execution_profile_handle()
            .unwrap_or_else(|| self.get_default_execution_profile_handle())
            .access();

        // Only the consistencies are filled in; all remaining routing fields keep their defaults.
        let statement_info = RoutingInfo {
            consistency: statement
                .config
                .consistency
                .unwrap_or(execution_profile.consistency),
            serial_consistency: statement
                .config
                .serial_consistency
                .unwrap_or(execution_profile.serial_consistency),
            ..Default::default()
        };

        let span = RequestSpan::new_query(&statement.contents);
        let span_ref = &span;
        let (run_request_result, coordinator): (
            RunRequestResult<NonErrorQueryResponse>,
            Coordinator,
        ) = self
            .run_request(
                statement_info,
                &statement.config,
                execution_profile,
                |connection: Arc<Connection>,
                 consistency: Consistency,
                 execution_profile: &ExecutionProfileInner| {
                    // Resolved against the profile that `run_request` passes to this closure.
                    let serial_consistency = statement
                        .config
                        .serial_consistency
                        .unwrap_or(execution_profile.serial_consistency);
                    // Needed to avoid moving query and values into async move block
                    let values_ref = &values;
                    let paging_state_ref = &paging_state;
                    async move {
                        if values_ref.is_empty() {
                            // No bound values: send the statement as a plain unprepared query.
                            span_ref.record_request_size(0);
                            connection
                                .query_raw_with_consistency(
                                    statement,
                                    consistency,
                                    serial_consistency,
                                    page_size,
                                    paging_state_ref.clone(),
                                )
                                .await
                                .and_then(QueryResponse::into_non_error_query_response)
                        } else {
                            // Bound values are present: prepare the statement on this connection,
                            // serialize the values with the prepared statement, then execute it.
                            let prepared = connection.prepare(statement).await?;
                            let serialized = prepared.serialize_values(values_ref)?;
                            span_ref.record_request_size(serialized.buffer_size());
                            connection
                                .execute_raw_with_consistency(
                                    &prepared,
                                    &serialized,
                                    consistency,
                                    serial_consistency,
                                    page_size,
                                    paging_state_ref.clone(),
                                )
                                .await
                                .and_then(QueryResponse::into_non_error_query_response)
                        }
                    }
                },
                &span,
            )
            .instrument(span.span().clone())
            .await?;

        let response = match run_request_result {
            // An ignored write error is turned into a synthetic `Void` response
            // with no tracing id and no warnings.
            RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
                response: NonErrorResponseWithDeserializedMetadata::Result(
                    result::ResultWithDeserializedMetadata::Void,
                ),
                tracing_id: None,
                warnings: Vec::new(),
            },
            RunRequestResult::Completed(response) => response,
        };

        let (result, paging_state_response) =
            response.into_query_result_and_paging_state(coordinator)?;
        span.record_result_fields(&result);

        Ok((result, paging_state_response))
    }
1364
1365 pub(crate) async fn handle_set_keyspace_response(
1366 &self,
1367 response: &NonErrorQueryResponse,
1368 ) -> Result<(), UseKeyspaceError> {
1369 if let Some(set_keyspace) = response.as_set_keyspace() {
1370 debug!(
1371 "Detected USE KEYSPACE query, setting session's keyspace to {}",
1372 set_keyspace.keyspace_name
1373 );
1374 self.use_keyspace(set_keyspace.keyspace_name.clone(), true)
1375 .await?;
1376 }
1377
1378 Ok(())
1379 }
1380}
1381
#[derive(Debug, Error)]
/// Errors that can occur during automatic awaiting of schema agreement after a schema change.
///
/// Each variant wraps its underlying error and is marked `#[from]`.
pub(crate) enum AutoSchemaAwaitingError {
    /// Schema agreement could not be reached.
    #[error("Schema agreement could not be reached: {0}")]
    SchemaAgreement(#[from] SchemaAgreementError),
    /// Metadata refresh after reaching schema agreement failed.
    #[error("Metadata refresh after reaching schema agreement failed: {0}")]
    MetadataRefresh(#[from] MetadataError),
}
1392
1393impl From<AutoSchemaAwaitingError> for ExecutionError {
1394 fn from(err: AutoSchemaAwaitingError) -> Self {
1395 match err {
1396 AutoSchemaAwaitingError::SchemaAgreement(e) => e.into(),
1397 AutoSchemaAwaitingError::MetadataRefresh(e) => e.into(),
1398 }
1399 }
1400}
1401
1402impl Session {
1403 pub(crate) async fn handle_auto_await_schema_agreement(
1404 &self,
1405 response: &NonErrorQueryResponse,
1406 coordinator_id: Uuid,
1407 ) -> Result<(), AutoSchemaAwaitingError> {
1408 if self.schema_agreement_automatic_waiting && response.as_schema_change().is_some() {
1409 debug!("Detected schema change, so awaiting schema agreement automatically...");
1410 self.await_schema_agreement_with_required_node(Some(coordinator_id))
1411 .await?;
1412 debug!("Auto schema agreement awaiting: schema agreement reached.",);
1413
1414 if self.refresh_metadata_on_auto_schema_agreement {
1415 self.refresh_metadata().await?;
1416 }
1417 }
1418
1419 Ok(())
1420 }
1421
1422 async fn do_query_iter(
1423 &self,
1424 statement: Statement,
1425 values: impl SerializeRow,
1426 ) -> Result<QueryPager, PagerExecutionError> {
1427 if values.is_empty() {
1428 self.do_query_iter_without_values(statement).await
1429 } else {
1430 // Making QueryPager::new_for_query work with values is too hard (if even possible)
1431 // so instead of sending one prepare to a specific connection on each iterator query,
1432 // we fully prepare a statement beforehand.
1433 let prepared = self.prepare_nongeneric(&statement).await?;
1434 let values = prepared.serialize_values(&values)?;
1435 self.execute_iter_nongeneric(prepared, values).await
1436 }
1437 }
1438
1439 /// `Session::query_iter` specialization for empty values.
1440 async fn do_query_iter_without_values(
1441 &self,
1442 statement: Statement,
1443 ) -> Result<QueryPager, PagerExecutionError> {
1444 let execution_profile = statement
1445 .get_execution_profile_handle()
1446 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1447 .access();
1448
1449 QueryPager::new_for_query(
1450 self,
1451 statement,
1452 execution_profile,
1453 self.cluster.get_state(),
1454 #[cfg(feature = "metrics")]
1455 Arc::clone(&self.metrics),
1456 )
1457 .await
1458 }
1459
1460 /// Prepares a statement on the server side and returns a prepared statement,
1461 /// which can later be used to perform more efficient requests.
1462 ///
1463 /// The statement is prepared on all nodes. This function finishes once all nodes respond
1464 /// with either success or an error.
1465 ///
1466 /// Prepared statements are much faster than unprepared statements:
1467 /// * Database doesn't need to parse the statement string upon each execution (only once)
1468 /// * They are properly load balanced using token aware routing
1469 ///
1470 /// <div class="warning">
1471 ///
1472 /// **Warning!**
1473 ///
1474 /// **Prepare a statement once** (e.g., store it in a variable, static, or struct field),
1475 /// then **execute it multiple times** with different values.
1476 /// Do **NOT** call `prepare()` repeatedly for the same statement before each execution -
1477 /// this defeats the purpose of prepared statements and significantly degrades performance.
1478 ///
1479 /// </div>
1480 ///
1481 /// <div class="warning">
1482 ///
1483 /// **Warning!**
1484 ///
1485 /// For token/shard aware load balancing to work properly, all partition key values
1486 /// must be sent as bound values
1487 /// (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance))
1488 ///
1489 /// </div>
1490 ///
1491 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
1492 /// See the documentation of [`PreparedStatement`].
1493 ///
1494 /// # Arguments
1495 /// * `statement` - statement to prepare, can be just a `&str` or the [`Statement`] struct.
1496 ///
1497 /// # Example
1498 /// ```rust
1499 /// # use scylla::client::session::Session;
1500 /// # use std::error::Error;
1501 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1502 /// use scylla::statement::prepared::PreparedStatement;
1503 ///
1504 /// // Prepare the statement ONCE for later execution
1505 /// let prepared: PreparedStatement = session
1506 /// .prepare("INSERT INTO ks.tab (a) VALUES(?)")
1507 /// .await?;
1508 ///
1509 /// // Execute the prepared statement multiple times
1510 /// let to_insert: i32 = 12345;
1511 /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1512 ///
1513 /// let to_insert: i32 = 67890;
1514 /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1515 /// # Ok(())
1516 /// # }
1517 /// ```
1518 pub async fn prepare(
1519 &self,
1520 statement: impl Into<Statement>,
1521 ) -> Result<PreparedStatement, PrepareError> {
1522 let statement = statement.into();
1523 self.prepare_nongeneric(&statement).await
1524 }
1525
1526 // Introduced to avoid monomorphisation of this large function.
1527 async fn prepare_nongeneric(
1528 &self,
1529 statement: &Statement,
1530 ) -> Result<PreparedStatement, PrepareError> {
1531 let cluster_state = self.get_cluster_state();
1532
1533 // Start by attempting preparation on a single (random) connection to every node.
1534 {
1535 let mut connections_to_nodes = cluster_state.iter_working_connections_to_nodes()?;
1536 let on_all_nodes_result =
1537 Self::prepare_on_all(statement, &cluster_state, &mut connections_to_nodes).await;
1538 if let Ok(prepared) = on_all_nodes_result {
1539 // We succeeded in preparing the statement on at least one node. We're done.
1540 // Other nodes could have failed to prepare the statement, but this will be handled
1541 // as `DbError::Unprepared` upon execution, followed by a repreparation attempt.
1542 return Ok(prepared);
1543 }
1544 }
1545
1546 // We could have been just unlucky: we could have possibly chosen random connections all of which were defunct
1547 // (one possibility is that we targeted overloaded shards).
1548 // Let's try again, this time on connections to every shard. This is a "last call" fallback.
1549 {
1550 let mut connections_to_shards = cluster_state.iter_working_connections_to_shards()?;
1551
1552 Self::prepare_on_all(statement, &cluster_state, &mut connections_to_shards).await
1553 }
1554 }
1555
    /// Prepares the statement on all given connections.
    /// These are intended to be connections to either all nodes or all shards.
    ///
    /// ASSUMPTION: the `working_connections` Iterator is nonempty.
    /// The function panics (via `unwrap`) if this assumption is violated.
    ///
    /// Returns:
    /// - `Ok(PreparedStatement)`, if preparation succeeded on at least one connection;
    /// - `Err(PrepareError::AllAttemptsFailed)`, carrying the first attempt's error,
    ///   if preparation failed on all attempted connections;
    /// - `Err(PrepareError::PreparedStatementIdsMismatch)`, if a later successful
    ///   preparation returned an id different from the first successful one.
    // TODO: There are no timeouts here. So, just one stuck node freezes the driver here, potentially indefinitely long.
    // Also, what the driver requires to get from the cluster is the prepared statement metadata.
    // It suffices that it gets only one copy of it, just from one success response. Therefore, it's a possible
    // optimisation that the function only waits for one preparation to finish successfully, and then it returns.
    // For it to be done, other preparations must continue in the background, on a separate tokio task.
    // Describing issue: #1332.
    async fn prepare_on_all(
        statement: &Statement,
        cluster_state: &ClusterState,
        working_connections: &mut (dyn Iterator<Item = Arc<Connection>> + Send),
    ) -> Result<PreparedStatement, PrepareError> {
        // Issue one raw preparation per connection and wait for all of them to finish.
        let preparations =
            working_connections.map(|c| async move { c.prepare_raw(statement).await });
        let raw_prepared_statements_results = join_all(preparations).await;

        let mut raw_prepared_statements_results_iter = raw_prepared_statements_results.into_iter();

        // Find the first result that is Ok, or fall back to the first result if all failed.
        // `by_ref()` keeps the iterator usable afterwards, so that the results following
        // the chosen one can still be inspected below.
        // Invariant: we are passed a nonempty iterator, so there is at least one result.
        let first_ok_or_error = raw_prepared_statements_results_iter
            .by_ref()
            .find_or_first(|res| res.is_ok())
            .unwrap(); // Invariant: there is at least one connection.

        // If even the chosen result is an error, every attempt failed.
        let mut prepared: PreparedStatement = first_ok_or_error
            .map_err(|first_attempt| PrepareError::AllAttemptsFailed { first_attempt })?
            .into_prepared_statement();

        // Validate prepared ids equality.
        // `flatten()` skips the failed results, so only successful preparations are compared.
        for another_raw_prepared in raw_prepared_statements_results_iter.flatten() {
            if prepared.get_id() != another_raw_prepared.get_id() {
                tracing::error!(
                    "Got differing ids upon statement preparation: statement \"{}\", id1: {:?}, id2: {:?}",
                    prepared.get_statement(),
                    prepared.get_id(),
                    another_raw_prepared.get_id()
                );
                return Err(PrepareError::PreparedStatementIdsMismatch);
            }

            // Collect all tracing ids from prepare() queries in the final result
            prepared
                .prepare_tracing_ids
                .extend(another_raw_prepared.tracing_id());
        }

        // `prepared` comes from the first preparation that succeeded.
        // Set its partitioner from the cluster metadata; if the table's partitioner is
        // unknown or its name is not recognised, the default partitioner is used.
        prepared.set_partitioner_name(
            Self::extract_partitioner_name(&prepared, cluster_state)
                .and_then(PartitionerName::from_str)
                .unwrap_or_default(),
        );

        Ok(prepared)
    }
1620
1621 fn extract_partitioner_name<'a>(
1622 prepared: &PreparedStatement,
1623 cluster_state: &'a ClusterState,
1624 ) -> Option<&'a str> {
1625 let table_spec = prepared.get_table_spec()?;
1626 cluster_state
1627 .keyspaces
1628 .get(table_spec.ks_name())?
1629 .tables
1630 .get(table_spec.table_name())?
1631 .partitioner
1632 .as_deref()
1633 }
1634
/// Sends a prepared request to the database, optionally continuing from a saved point.
///
/// This is now an internal method only.
///
/// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
///
/// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
/// should be made.
///
/// # Arguments
///
/// * `prepared` - the prepared statement to execute; its per-statement config
///   (consistency, serial consistency, execution profile) takes precedence over
///   the execution profile defaults.
/// * `serialized_values` - already serialized bound values.
/// * `page_size` - forwarded unchanged to the connection-level call.
/// * `paging_state` - the point to continue from; it is cloned for every attempt.
///
/// # Returns
///
/// The [`QueryResult`] together with the [`PagingStateResponse`] extracted from
/// the response. If the retry policy decided to ignore a write error, an empty
/// (`Void`) result is synthesized instead.
async fn execute(
    &self,
    prepared: &PreparedStatement,
    serialized_values: &SerializedValues,
    page_size: Option<PageSize>,
    paging_state: PagingState,
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
    // The per-attempt closure below is `Fn` and may be invoked several times
    // (retries), so it only borrows the paging state and clones it on each call.
    let paging_state_ref = &paging_state;

    // `unzip` turns the optional (partition key, token) pair into two options.
    let (partition_key, token) = prepared
        .extract_partition_key_and_calculate_token(
            prepared.get_partitioner_name(),
            serialized_values,
        )
        .map_err(PartitionKeyError::into_execution_error)?
        .unzip();

    // Execution profile set on the statement wins over the session default.
    let execution_profile = prepared
        .get_execution_profile_handle()
        .unwrap_or_else(|| self.get_default_execution_profile_handle())
        .access();

    let table_spec = prepared.get_table_spec();

    // Routing information handed to the load balancing policy.
    let statement_info = RoutingInfo {
        consistency: prepared
            .config
            .consistency
            .unwrap_or(execution_profile.consistency),
        serial_consistency: prepared
            .config
            .serial_consistency
            .unwrap_or(execution_profile.serial_consistency),
        token,
        table: table_spec,
        is_confirmed_lwt: prepared.is_confirmed_lwt(),
    };

    let span = RequestSpan::new_prepared(
        partition_key.as_ref().map(|pk| pk.iter()),
        token,
        serialized_values.buffer_size(),
    );

    // Replicas are looked up only for the sake of the tracing span, so skip
    // the lookup entirely when the span is disabled.
    if !span.span().is_disabled()
        && let (Some(table_spec), Some(token)) = (statement_info.table, token)
    {
        let cluster_state = self.get_cluster_state();
        let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
        span.record_replicas(replicas)
    }

    let (run_request_result, coordinator): (
        RunRequestResult<NonErrorQueryResponse>,
        Coordinator,
    ) = self
        .run_request(
            statement_info,
            &prepared.config,
            execution_profile,
            // Executes a single attempt on the connection chosen by `run_request`.
            |connection: Arc<Connection>,
             consistency: Consistency,
             execution_profile: &ExecutionProfileInner| {
                // Resolved outside of the `async move` block so that the future
                // does not need to hold the `execution_profile` reference.
                let serial_consistency = prepared
                    .config
                    .serial_consistency
                    .unwrap_or(execution_profile.serial_consistency);
                async move {
                    connection
                        .execute_raw_with_consistency(
                            prepared,
                            serialized_values,
                            consistency,
                            serial_consistency,
                            page_size,
                            paging_state_ref.clone(),
                        )
                        .await
                        .and_then(QueryResponse::into_non_error_query_response)
                }
            },
            &span,
        )
        .instrument(span.span().clone())
        .await?;

    let response = match run_request_result {
        // The retry policy chose to ignore a write error: report an empty
        // (`Void`) result with no tracing id and no warnings.
        RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
            response: NonErrorResponseWithDeserializedMetadata::Result(
                result::ResultWithDeserializedMetadata::Void,
            ),
            tracing_id: None,
            warnings: Vec::new(),
        },
        RunRequestResult::Completed(response) => response,
    };

    let (result, paging_state_response) =
        response.into_query_result_and_paging_state(coordinator)?;
    span.record_result_fields(&result);

    Ok((result, paging_state_response))
}
1748
1749 /// Does the same as [`Session::execute_iter`], but without generics.
1750 /// This is to reduce code bloat.
1751 ///
1752 /// Additionally, this function accepts only `SerializedValues`,
1753 /// so the caller is responsible for serializing the values beforehand.
1754 /// This:
1755 /// - on one hand introduces a risk of misusing the API (passing values
1756 /// that don't match the prepared statement) - therefore this API
1757 /// must not be leaked to end users;
1758 /// - on the other hand allows manual preserialization of values, which
1759 /// we need in wrapper drivers (e.g. C#-RS Driver) - for them we expose
1760 /// a dedicated API endpoint using this function, conditionally compiled
1761 /// if special compile flag is passed.
1762 async fn execute_iter_nongeneric(
1763 &self,
1764 prepared: PreparedStatement,
1765 values: SerializedValues,
1766 ) -> Result<QueryPager, PagerExecutionError> {
1767 let execution_profile = prepared
1768 .get_execution_profile_handle()
1769 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1770 .access();
1771
1772 QueryPager::new_for_prepared_statement(
1773 self,
1774 PreparedPagerConfig {
1775 prepared,
1776 values,
1777 execution_profile,
1778 cluster_state: self.cluster.get_state(),
1779 #[cfg(feature = "metrics")]
1780 metrics: Arc::clone(&self.metrics),
1781 },
1782 )
1783 .await
1784 }
1785
1786 /// Prepares all statements within the batch and returns a new batch where every
1787 /// statement is prepared.
1788 ///
1789 /// # Example
1790 /// ```rust
1791 /// # extern crate scylla;
1792 /// # use scylla::client::session::Session;
1793 /// # use std::error::Error;
1794 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1795 /// use scylla::statement::batch::Batch;
1796 ///
1797 /// // Create a batch statement with unprepared statements
1798 /// let mut batch: Batch = Default::default();
1799 /// batch.append_statement("INSERT INTO ks.simple_unprepared1 VALUES(?, ?)");
1800 /// batch.append_statement("INSERT INTO ks.simple_unprepared2 VALUES(?, ?)");
1801 ///
1802 /// // Prepare all statements in the batch at once
1803 /// let prepared_batch: Batch = session.prepare_batch(&batch).await?;
1804 ///
1805 /// // Specify bound values to use with each statement
1806 /// let batch_values = ((1_i32, 2_i32),
1807 /// (3_i32, 4_i32));
1808 ///
1809 /// // Run the prepared batch
1810 /// session.batch(&prepared_batch, batch_values).await?;
1811 /// # Ok(())
1812 /// # }
1813 /// ```
1814 pub async fn prepare_batch(&self, batch: &Batch) -> Result<Batch, PrepareError> {
1815 let mut prepared_batch = batch.clone();
1816
1817 try_join_all(
1818 prepared_batch
1819 .statements
1820 .iter_mut()
1821 .map(|statement| async move {
1822 if let BatchStatement::Query(query) = statement {
1823 let prepared = self.prepare_nongeneric(query).await?;
1824 *statement = BatchStatement::PreparedStatement(prepared);
1825 }
1826 Ok::<(), PrepareError>(())
1827 }),
1828 )
1829 .await?;
1830
1831 Ok(prepared_batch)
1832 }
1833
1834 /// Sends `USE <keyspace_name>` request on all connections\
1835 /// This allows to write `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`\
1836 ///
1837 /// Note that even failed `use_keyspace` can change currently used keyspace - the request is sent on all connections and
1838 /// can overwrite previously used keyspace.
1839 ///
1840 /// Call only one `use_keyspace` at a time.\
1841 /// Trying to do two `use_keyspace` requests simultaneously with different names
1842 /// can end with some connections using one keyspace and the rest using the other.
1843 ///
1844 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/usekeyspace.html) for more information
1845 ///
1846 /// # Arguments
1847 ///
1848 /// * `keyspace_name` - keyspace name to use,
1849 /// keyspace names can have up to 48 alphanumeric characters and contain underscores
1850 /// * `case_sensitive` - if set to true the generated statement will put keyspace name in quotes
1851 /// # Example
1852 /// ```rust
1853 /// # use scylla::client::session::Session;
1854 /// # use scylla::client::session_builder::SessionBuilder;
1855 /// # use scylla::client::Compression;
1856 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1857 /// # let session = SessionBuilder::new().known_node("127.0.0.1:9042").build().await?;
1858 /// session
1859 /// .query_unpaged("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
1860 /// .await?;
1861 ///
1862 /// session.use_keyspace("my_keyspace", false).await?;
1863 ///
1864 /// // Now we can omit keyspace name in the statement
1865 /// session
1866 /// .query_unpaged("INSERT INTO tab (a) VALUES ('test2')", &[])
1867 /// .await?;
1868 /// # Ok(())
1869 /// # }
1870 /// ```
1871 pub async fn use_keyspace(
1872 &self,
1873 keyspace_name: impl Into<String>,
1874 case_sensitive: bool,
1875 ) -> Result<(), UseKeyspaceError> {
1876 let keyspace_name = keyspace_name.into();
1877 self.keyspace_name
1878 .store(Some(Arc::new(keyspace_name.clone())));
1879
1880 // Trying to pass keyspace as bound value in "USE ?" doesn't work
1881 // So we have to create a string for query: "USE " + new_keyspace
1882 // To avoid any possible CQL injections it's good to verify that the name is valid
1883 let verified_ks_name = VerifiedKeyspaceName::new(keyspace_name, case_sensitive)?;
1884
1885 self.cluster.use_keyspace(verified_ks_name).await
1886 }
1887
1888 /// Manually trigger a metadata refresh\
1889 /// The driver will fetch current nodes in the cluster and update its metadata
1890 ///
1891 /// Normally this is not needed,
1892 /// the driver should automatically detect all metadata changes in the cluster
1893 pub async fn refresh_metadata(&self) -> Result<(), MetadataError> {
1894 debug!("Session: requested metadata refresh");
1895 let res = self.cluster.refresh_metadata().await;
1896 debug!("Session: finished metadata refresh");
1897 res
1898 }
1899
/// Access metrics collected by the driver.
///
/// Driver collects various metrics like number of queries or query latencies.
/// They can be read using this method.
///
/// Returns a new `Arc` handle to the session's metrics object (the metrics
/// themselves are not copied). Available only with the `metrics` feature enabled.
#[cfg(feature = "metrics")]
pub fn get_metrics(&self) -> Arc<Metrics> {
    Arc::clone(&self.metrics)
}
1907
/// Access cluster state visible by the driver.
///
/// Driver collects various information about network topology or schema.
/// It can be read using this method.
///
/// The state is obtained from the underlying cluster handle on every call
/// and returned behind an `Arc`.
pub fn get_cluster_state(&self) -> Arc<ClusterState> {
    self.cluster.get_state()
}
1915
1916 /// Get [`TracingInfo`] of a traced query performed earlier
1917 ///
1918 /// See [the book](https://rust-driver.docs.scylladb.com/stable/tracing/tracing.html)
1919 /// for more information about query tracing
1920 pub async fn get_tracing_info(&self, tracing_id: &Uuid) -> Result<TracingInfo, TracingError> {
1921 // tracing_info_fetch_attempts is NonZeroU32 so at least one attempt will be made
1922 for _ in 0..self.tracing_info_fetch_attempts.get() {
1923 let current_try: Option<TracingInfo> =
1924 self.try_getting_tracing_info(tracing_id).await?;
1925
1926 match current_try {
1927 Some(tracing_info) => return Ok(tracing_info),
1928 None => tokio::time::sleep(self.tracing_info_fetch_interval).await,
1929 };
1930 }
1931
1932 Err(TracingError::EmptyResults)
1933 }
1934
/// Gets the name of the keyspace that is currently set, or `None` if no
/// keyspace was set.
///
/// It will initially return the name of the keyspace that was set
/// in the session configuration, but calling `use_keyspace` will update
/// it.
///
/// Note: the return value might be wrong if `use_keyspace` was called
/// concurrently or it previously failed. It is also unspecified
/// if `get_keyspace` is called concurrently with `use_keyspace`.
///
/// The value is the one most recently stored in the session's
/// `keyspace_name` field; no request is sent to the cluster.
#[inline]
pub fn get_keyspace(&self) -> Option<Arc<String>> {
    self.keyspace_name.load_full()
}
1949
1950 // Tries getting the tracing info
1951 // If the queries return 0 rows then returns None - the information didn't reach this node yet
1952 // If there is some other error returns this error
1953 async fn try_getting_tracing_info(
1954 &self,
1955 tracing_id: &Uuid,
1956 ) -> Result<Option<TracingInfo>, TracingError> {
1957 // Get statements for tracing queries.
1958 // Consistency is set during construction based on session's tracing_info_fetch_consistency.
1959 let traces_session_stmt = self.get_tracing_session_statement();
1960 let traces_events_stmt = self.get_tracing_events_statement();
1961
1962 // Using non-public do_query_unpaged allows us to avoid cloning.
1963 let (traces_session_res, traces_events_res) = tokio::try_join!(
1964 self.do_query_unpaged(traces_session_stmt, (tracing_id,)),
1965 self.do_query_unpaged(traces_events_stmt, (tracing_id,))
1966 )?;
1967
1968 // Get tracing info
1969 let maybe_tracing_info: Option<TracingInfo> = traces_session_res
1970 .into_rows_result()
1971 .map_err(TracingError::TracesSessionIntoRowsResultError)?
1972 .maybe_first_row()
1973 .map_err(|err| match err {
1974 MaybeFirstRowError::TypeCheckFailed(e) => {
1975 TracingError::TracesSessionInvalidColumnType(e)
1976 }
1977 MaybeFirstRowError::DeserializationFailed(e) => {
1978 TracingError::TracesSessionDeserializationFailed(e)
1979 }
1980 })?;
1981
1982 let mut tracing_info = match maybe_tracing_info {
1983 None => return Ok(None),
1984 Some(tracing_info) => tracing_info,
1985 };
1986
1987 // Get tracing events
1988 let tracing_event_rows_result = traces_events_res
1989 .into_rows_result()
1990 .map_err(TracingError::TracesEventsIntoRowsResultError)?;
1991 let tracing_event_rows = tracing_event_rows_result.rows().map_err(|err| match err {
1992 RowsError::TypeCheckFailed(err) => TracingError::TracesEventsInvalidColumnType(err),
1993 })?;
1994
1995 tracing_info.events = tracing_event_rows
1996 .collect::<Result<_, _>>()
1997 .map_err(TracingError::TracesEventsDeserializationFailed)?;
1998
1999 if tracing_info.events.is_empty() {
2000 return Ok(None);
2001 }
2002
2003 Ok(Some(tracing_info))
2004 }
2005
/// This method allows to easily run a request using load balancing, retry policy etc.
/// Requires some information about the request and a closure.
/// The closure is used to execute the request once on a chosen connection.
/// - query will use connection.query()
/// - execute will use connection.execute()
///
/// If this closure fails with some errors, retry policy is used to perform retries.
/// On success, this request's result is returned.
///
/// Settings present in `statement_config` (load balancing policy, retry policy,
/// request timeout) take precedence over the ones from `execution_profile`.
/// Speculative execution is used only if the execution profile has a speculative
/// execution policy and the statement is marked as idempotent.
///
/// The effective request timeout, if any, covers the whole run (all attempts
/// and retries), not a single attempt.
///
/// If the statement has a history listener, the request start and its final
/// outcome (success or error) are reported to it.
///
/// After a completed request, the response is additionally passed to
/// `handle_set_keyspace_response` and `handle_auto_await_schema_agreement`;
/// errors from those are returned to the caller.
// I tried to make this closures take a reference instead of an Arc but failed
// maybe once async closures get stabilized this can be fixed
async fn run_request<'a, QueryFut>(
    &'a self,
    statement_info: RoutingInfo<'a>,
    statement_config: &'a StatementConfig,
    execution_profile: Arc<ExecutionProfileInner>,
    run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
    request_span: &'a RequestSpan,
) -> Result<(RunRequestResult<NonErrorQueryResponse>, Coordinator), ExecutionError>
where
    QueryFut: Future<Output = Result<NonErrorQueryResponse, RequestAttemptError>>,
{
    // Registers the request start with the listener (if any) and keeps the
    // returned request id for all subsequent history events.
    let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
        statement_config
            .history_listener
            .as_ref()
            .map(|hl| (&**hl, hl.log_request_start()));

    // Per-statement load balancing policy wins over the profile's one.
    let load_balancer = statement_config
        .load_balancing_policy
        .as_deref()
        .unwrap_or(execution_profile.load_balancing_policy.as_ref());

    // The whole request execution is one future, so that a single timeout
    // can be applied to it below.
    let runner = async {
        let cluster_state = self.cluster.get_state();
        let request_plan =
            load_balancing::Plan::new(load_balancer, &statement_info, &cluster_state);

        // If a speculative execution policy is used to run request, request_plan has to be shared
        // between different async functions. This struct helps to wrap request_plan in mutex so it
        // can be shared safely.
        struct SharedPlan<'a, I>
        where
            I: Iterator<Item = (NodeRef<'a>, Shard)>,
        {
            iter: std::sync::Mutex<I>,
        }

        // Implemented for a shared reference, so that every fiber can pull the
        // next target from the same underlying plan.
        impl<'a, I> Iterator for &SharedPlan<'a, I>
        where
            I: Iterator<Item = (NodeRef<'a>, Shard)>,
        {
            type Item = (NodeRef<'a>, Shard);

            fn next(&mut self) -> Option<Self::Item> {
                // The lock is held only for the duration of this single `next` call.
                self.iter.lock().unwrap().next()
            }
        }

        // Per-statement retry policy wins over the profile's one.
        let retry_policy = statement_config
            .retry_policy
            .as_deref()
            .unwrap_or(&*execution_profile.retry_policy);

        let speculative_policy = execution_profile.speculative_execution_policy.as_ref();

        match speculative_policy {
            // Speculative execution is attempted only for idempotent statements.
            Some(speculative) if statement_config.is_idempotent => {
                let shared_request_plan = SharedPlan {
                    iter: std::sync::Mutex::new(request_plan),
                };

                // Creates a future for one fiber; `is_speculative` tells whether
                // the fiber is a speculative one.
                let request_runner_generator = |is_speculative: bool| {
                    let history_data: Option<HistoryData> = history_listener_and_id
                        .as_ref()
                        .map(|(history_listener, request_id)| {
                            // Only speculative fibers get a speculative id.
                            let speculative_id: Option<history::SpeculativeId> =
                                if is_speculative {
                                    Some(
                                        history_listener.log_new_speculative_fiber(*request_id),
                                    )
                                } else {
                                    None
                                };
                            HistoryData {
                                listener: *history_listener,
                                request_id: *request_id,
                                speculative_id,
                            }
                        });

                    if is_speculative {
                        request_span.inc_speculative_executions();
                    }

                    // Every fiber gets its own retry session, but all of them
                    // draw targets from the same shared plan.
                    self.run_request_speculative_fiber(
                        &shared_request_plan,
                        &run_request_once,
                        &execution_profile,
                        ExecuteRequestContext {
                            is_idempotent: statement_config.is_idempotent,
                            consistency_set_on_statement: statement_config.consistency,
                            retry_session: retry_policy.new_session(),
                            history_data,
                            load_balancing_policy: load_balancer,
                            query_info: &statement_info,
                            request_span,
                        },
                    )
                };

                let context = speculative_execution::Context {
                    #[cfg(feature = "metrics")]
                    metrics: Arc::clone(&self.metrics),
                };

                speculative_execution::execute(
                    speculative.as_ref(),
                    &context,
                    request_runner_generator,
                )
                .await
            }
            // No speculative execution: run a single fiber over the plan.
            _ => {
                let history_data: Option<HistoryData> =
                    history_listener_and_id
                        .as_ref()
                        .map(|(history_listener, request_id)| HistoryData {
                            listener: *history_listener,
                            request_id: *request_id,
                            speculative_id: None,
                        });
                self.run_request_speculative_fiber(
                    request_plan,
                    &run_request_once,
                    &execution_profile,
                    ExecuteRequestContext {
                        is_idempotent: statement_config.is_idempotent,
                        consistency_set_on_statement: statement_config.consistency,
                        retry_session: retry_policy.new_session(),
                        history_data,
                        load_balancing_policy: load_balancer,
                        query_info: &statement_info,
                        request_span,
                    },
                )
                .await
                // The fiber returns `None` only if the plan was empty.
                .unwrap_or(Err(RequestError::EmptyPlan))
            }
        }
    };

    // Per-statement timeout wins over the profile's one; `None` means no timeout.
    let effective_timeout = statement_config
        .request_timeout
        .or(execution_profile.request_timeout);
    let result = match effective_timeout {
        Some(timeout) => tokio::time::timeout(timeout, runner).await.unwrap_or_else(
            |_: tokio::time::error::Elapsed| {
                #[cfg(feature = "metrics")]
                self.metrics.inc_request_timeouts();

                let timeout_error = RequestError::RequestTimeout(timeout);
                trace!(
                    parent: request_span.span(),
                    error = %timeout_error,
                    "Request timed out"
                );
                Err(timeout_error)
            },
        ),
        None => runner.await,
    };

    // Report the final outcome of the request to the history listener.
    if let Some((history_listener, request_id)) = history_listener_and_id {
        match &result {
            Ok(_) => history_listener.log_request_success(request_id),
            Err(e) => history_listener.log_request_error(request_id, e),
        }
    }

    // Automatically handle meaningful responses.
    // Skipped for `RunRequestResult::IgnoredWriteError`, as there is no response then.
    if let Ok((RunRequestResult::Completed(ref response), ref coordinator)) = result {
        self.handle_set_keyspace_response(response).await?;
        self.handle_auto_await_schema_agreement(response, coordinator.node().host_id)
            .await?;
    }

    result.map_err(RequestError::into_execution_error)
}
2194
/// Executes the closure `run_request_once`, provided the load balancing plan and some information
/// about the request, including retry session.
/// If request fails, retry session is used to perform retries.
///
/// Targets are taken from `request_plan` one by one. For each failed attempt the
/// retry session decides whether to retry on the same target, move on to the next
/// target, give up, or ignore the (write) error.
///
/// Returns None, if provided plan is empty.
/// Otherwise returns the first successful response (or the "ignored write error"
/// marker) together with the coordinator of that attempt, or the last
/// encountered error.
async fn run_request_speculative_fiber<'a, QueryFut>(
    &'a self,
    request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
    run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
    execution_profile: &ExecutionProfileInner,
    mut context: ExecuteRequestContext<'a>,
) -> Option<Result<(RunRequestResult<NonErrorQueryResponse>, Coordinator), RequestError>>
where
    QueryFut: Future<Output = Result<NonErrorQueryResponse, RequestAttemptError>>,
{
    // Stays `None` only if the plan yields no targets at all.
    let mut last_error: Option<RequestError> = None;
    // Consistency used for the next attempt; a retry decision may replace it.
    let mut current_consistency: Consistency = context
        .consistency_set_on_statement
        .unwrap_or(execution_profile.consistency);

    'nodes_in_plan: for (node, shard) in request_plan {
        let span = trace_span!("Executing request", node = %node.address, shard = %shard);
        'same_node_retries: loop {
            trace!(parent: &span, "Execution started");
            let connection = match node.connection_for_shard(shard).await {
                Ok(connection) => connection,
                Err(e) => {
                    trace!(
                        parent: &span,
                        error = %e,
                        "Choosing connection failed"
                    );
                    last_error = Some(e.into());
                    // Broken connection doesn't count as a failed request, don't log in metrics
                    continue 'nodes_in_plan;
                }
            };
            context.request_span.record_shard_id(&connection);

            #[cfg(feature = "metrics")]
            self.metrics.inc_total_nonpaged_queries();
            // Measures the duration of this single attempt.
            let request_start = std::time::Instant::now();

            let connect_address = connection.get_connect_address();
            trace!(
                parent: &span,
                connection = %connect_address,
                "Sending"
            );
            // Built before the attempt, because `connection` is moved into
            // `run_request_once` below. The shard is included only if the
            // node has a sharder.
            let coordinator =
                Coordinator::new(node, node.sharder().is_some().then_some(shard), &connection);

            let attempt_id: Option<history::AttemptId> =
                context.log_attempt_start(connect_address);
            let request_result: Result<NonErrorQueryResponse, RequestAttemptError> =
                run_request_once(connection, current_consistency, execution_profile)
                    .instrument(span.clone())
                    .await;

            let elapsed = request_start.elapsed();
            // On success we return right away; only the error falls through.
            let request_error: RequestAttemptError = match request_result {
                Ok(response) => {
                    trace!(parent: &span, "Request succeeded");
                    #[cfg(feature = "metrics")]
                    let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
                    context.log_attempt_success(&attempt_id);
                    context.load_balancing_policy.on_request_success(
                        context.query_info,
                        elapsed,
                        node,
                    );
                    return Some(Ok((RunRequestResult::Completed(response), coordinator)));
                }
                Err(e) => {
                    trace!(
                        parent: &span,
                        last_error = %e,
                        "Request failed"
                    );
                    #[cfg(feature = "metrics")]
                    self.metrics.inc_failed_nonpaged_queries();
                    context.load_balancing_policy.on_request_failure(
                        context.query_info,
                        elapsed,
                        node,
                        &e,
                    );
                    e
                }
            };

            // Use retry policy to decide what to do next
            // NOTE(review): `consistency` below is always the statement/profile
            // consistency, not `current_consistency`, even after an earlier retry
            // decision changed the latter - confirm that this is intended.
            let request_info = RequestInfo {
                error: &request_error,
                is_idempotent: context.is_idempotent,
                consistency: context
                    .consistency_set_on_statement
                    .unwrap_or(execution_profile.consistency),
            };

            let retry_decision = context.retry_session.decide_should_retry(request_info);
            trace!(
                parent: &span,
                retry_decision = ?retry_decision
            );

            context.log_attempt_error(&attempt_id, &request_error, &retry_decision);

            last_error = Some(request_error.into());

            match retry_decision {
                // Retry on the same node and shard, optionally with a new consistency.
                RetryDecision::RetrySameTarget(new_cl) => {
                    #[cfg(feature = "metrics")]
                    self.metrics.inc_retries_num();
                    current_consistency = new_cl.unwrap_or(current_consistency);
                    continue 'same_node_retries;
                }
                // Move on to the next target in the plan, optionally with a new consistency.
                RetryDecision::RetryNextTarget(new_cl) => {
                    #[cfg(feature = "metrics")]
                    self.metrics.inc_retries_num();
                    current_consistency = new_cl.unwrap_or(current_consistency);
                    continue 'nodes_in_plan;
                }
                // Give up: `last_error` is returned below.
                RetryDecision::DontRetry => break 'nodes_in_plan,

                // Report the failed write as if it had completed without a response.
                RetryDecision::IgnoreWriteError => {
                    return Some(Ok((RunRequestResult::IgnoredWriteError, coordinator)));
                }
            };
        }
    }

    // `None` if no target was ever tried (empty plan), the last error otherwise.
    last_error.map(Result::Err)
}
2329
2330 /// Awaits schema agreement among all reachable nodes.
2331 ///
2332 /// Issues an agreement check each `Session::schema_agreement_interval`.
2333 /// If agreement is not reached in `Session::schema_agreement_timeout`,
2334 /// `SchemaAgreementError::Timeout` is returned.
2335 pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
2336 self.await_schema_agreement_with_required_node(None).await
2337 }
2338
/// Decides if an error should result in await_schema_agreement stopping immediately,
/// or if it's fine to try again (after schema agreement interval).
/// The errors that should stop immediately are non-transient ones, for which
/// there is little or no hope that a retry will succeed.
///
/// Returns `ControlFlow::Continue(())` if another agreement check may be attempted,
/// and `ControlFlow::Break(())` if waiting should stop and the error be returned.
fn classify_schema_check_error(error: &SchemaAgreementError) -> ControlFlow<()> {
    // Shared by the `RequestError` and `PrepareError::AllAttemptsFailed` cases below.
    let classify_attempt_error = |request_attempt_error: &RequestAttemptError| {
        // The lint forces every variant to be listed explicitly, so that a newly
        // added variant has to be classified consciously.
        #[deny(clippy::wildcard_enum_match_arm)]
        match request_attempt_error {
            // It may be possible to recover from those errors.
            RequestAttemptError::UnableToAllocStreamId
            | RequestAttemptError::BrokenConnectionError(_)
            | RequestAttemptError::BodyExtensionsParseError(_)
            | RequestAttemptError::CqlResultParseError(_)
            | RequestAttemptError::CqlErrorParseError(_) => ControlFlow::Continue(()),

            // Those errors should not happen, but if they did, something is
            // really wrong. Let's return early.
            RequestAttemptError::SerializationError(_)
            | RequestAttemptError::CqlRequestSerialization(_)
            | RequestAttemptError::UnexpectedResponse(_)
            | RequestAttemptError::RepreparedIdChanged { .. }
            | RequestAttemptError::RepreparedIdMissingInBatch
            | RequestAttemptError::NonfinishedPagingState => ControlFlow::Break(()),

            #[deny(clippy::wildcard_enum_match_arm)]
            RequestAttemptError::DbError(db_error, _) => match db_error {
                // Those errors should not happen, but if they did, something is
                // really wrong. Let's return early.
                DbError::SyntaxError
                | DbError::Invalid
                | DbError::AlreadyExists { .. }
                | DbError::FunctionFailure { .. }
                | DbError::AuthenticationError
                | DbError::Unauthorized
                | DbError::ConfigError
                | DbError::TruncateError
                | DbError::ProtocolError => ControlFlow::Break(()),

                // Those errors likely won't go away on retry
                DbError::Unavailable { .. }
                | DbError::ReadFailure { .. }
                | DbError::WriteFailure { .. }
                | DbError::ServerError
                | DbError::Other(_) => ControlFlow::Break(()),

                // The listed errors, as well as any variant not named in this
                // match (`_`), are treated as worth another attempt.
                DbError::Overloaded
                | DbError::ReadTimeout { .. }
                | DbError::WriteTimeout { .. }
                | DbError::Unprepared { .. }
                | DbError::RateLimitReached { .. }
                | DbError::IsBootstrapping
                | _ => ControlFlow::Continue(()),
            },
        }
    };
    #[deny(clippy::wildcard_enum_match_arm)]
    match error {
        // Unexpected format (type, row count, frame type etc) or deserialization error of response.
        // It should not happen, but if it did it indicates a serious issue.
        SchemaAgreementError::SingleRowError(_)
        | SchemaAgreementError::TracesEventsIntoRowsResultError(_) => ControlFlow::Break(()),

        // Should not be possible - we create this error only after returning here.
        // Let's not panic, but log an error so that it gets noticed.
        SchemaAgreementError::Timeout(_) => {
            error!("Unexpected schema agreement error type: {}", error);
            ControlFlow::Break(())
        }

        // Definitely a transient error.
        SchemaAgreementError::ConnectionPoolError(_)
        | SchemaAgreementError::RequiredHostAbsent(_) => ControlFlow::Continue(()),

        // The decision depends on the underlying request attempt error.
        SchemaAgreementError::RequestError(request_attempt_error) => {
            classify_attempt_error(request_attempt_error)
        }
        SchemaAgreementError::PrepareError(err) => match err {
            PrepareError::ConnectionPoolError(_) => ControlFlow::Continue(()),
            // Classified based on the error of the first preparation attempt.
            PrepareError::AllAttemptsFailed { first_attempt } => {
                classify_attempt_error(first_attempt)
            }
            PrepareError::PreparedStatementIdsMismatch => ControlFlow::Break(()),
        },
    }
}
2424
2425 /// Awaits schema agreement among all reachable nodes.
2426 ///
2427 /// Issues an agreement check each `Session::schema_agreement_interval`.
2428 /// If agreement is not reached in `Session::schema_agreement_timeout`,
2429 /// `SchemaAgreementError::Timeout` is returned.
2430 ///
2431 /// If `required_node` is Some, only returns Ok if this node successfully
2432 /// returned its schema version during the agreement process.
2433 pub(crate) async fn await_schema_agreement_with_required_node(
2434 &self,
2435 required_node: Option<Uuid>,
2436 ) -> Result<Uuid, SchemaAgreementError> {
2437 // None: no finished attempt recorded
2438 // Some(Ok(())): Last attempt successful, without agreement
2439 // Some(Err(_)): Last attempt failed
2440 let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
2441 // The future passed to timeout returns either Ok(Uuid) if agreement was
2442 // reached, or Err(SchemaAgreementError) if there was an error that should
2443 // stop the waiting before timeout.
2444 let agreement_result = timeout(self.schema_agreement_timeout, async {
2445 loop {
2446 let result = self
2447 .check_schema_agreement_with_required_node(required_node)
2448 .await;
2449 debug!("Schema agreement check result: {:?}", result);
2450 match result {
2451 Ok(Some(agreed_version)) => return Ok(agreed_version),
2452 Ok(None) => last_agreement_failure = Some(Ok(())),
2453 Err(err) => {
2454 let decision = Self::classify_schema_check_error(&err);
2455 match decision {
2456 ControlFlow::Continue(_) => {
2457 last_agreement_failure = Some(Err(err));
2458 }
2459 ControlFlow::Break(_) => return Err(err),
2460 }
2461 }
2462 }
2463 tokio::time::sleep(self.schema_agreement_interval).await;
2464 }
2465 })
2466 .await;
2467 match agreement_result {
2468 Err(_timeout) => {
2469 // Timeout occurred. Either all attempts returned possibly-transient errors,
2470 // or just did not reach agreement in time.
2471 let effective_error = match last_agreement_failure {
2472 // There were no finished attempts - the only error we can return is Timeout.
2473 None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2474 // If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2475 Some(Err(err)) => err,
2476 // This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2477 Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2478 };
2479 Err(effective_error)
2480 }
2481 // Agreement encountered a non-transient error, we must return it.
2482 Ok(Err(inner_error)) => Err(inner_error),
2483 // Agreement successful
2484 Ok(Ok(uuid)) => Ok(uuid),
2485 }
2486 }
2487
2488 /// Checks if all reachable nodes have the same schema version.
2489 ///
2490 /// If so, returns that agreed upon version.
2491 pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2492 self.check_schema_agreement_with_required_node(None).await
2493 }
2494
2495 /// Checks if all reachable nodes have the same schema version.
2496 /// If so, returns that agreed upon version.
2497 ///
2498 /// If `required_node` is Some, only returns Ok if this node successfully
2499 /// returned its schema version.
2500 async fn check_schema_agreement_with_required_node(
2501 &self,
2502 required_node: Option<Uuid>,
2503 ) -> Result<Option<Uuid>, SchemaAgreementError> {
2504 let cluster_state = self.get_cluster_state();
2505 // The iterator is guaranteed to be nonempty.
2506 let per_node_connections = cluster_state.iter_working_connections_per_node()?;
2507
2508 // Therefore, this iterator is guaranteed to be nonempty, too.
2509 let handles = per_node_connections.map(|(host_id, pool)| async move {
2510 (host_id, self.read_node_schema_version(pool).await)
2511 });
2512 // Hence, this is nonempty, too.
2513 let versions_results = join_all(handles).await;
2514
2515 // Verify that required host is present, and returned success.
2516 if let Some(required_node) = required_node {
2517 match versions_results
2518 .iter()
2519 .find(|(host_id, _)| *host_id == required_node)
2520 {
2521 Some((_, Ok(SchemaNodeResult::Success(_version)))) => (),
2522 // For other connections we can ignore Broken error, but for required
2523 // host we need an actual schema version.
2524 Some((_, Ok(SchemaNodeResult::BrokenConnection(e)))) => {
2525 return Err(SchemaAgreementError::RequestError(
2526 RequestAttemptError::BrokenConnectionError(e.clone()),
2527 ));
2528 }
2529 Some((_, Err(e))) => return Err(e.clone()),
2530 None => return Err(SchemaAgreementError::RequiredHostAbsent(required_node)),
2531 }
2532 }
2533
2534 // Now we no longer need all the errors. We can return if there is
2535 // irrecoverable one, and collect the Ok values otherwise.
2536 // TODO(2.0): This expect can be avoided in next major release
2537 #[expect(clippy::result_large_err)]
2538 let versions_results: Vec<_> = versions_results
2539 .into_iter()
2540 .map(|(_, result)| result)
2541 .try_collect()?;
2542
2543 // unwrap is safe because iterator is still not empty.
2544 let local_version = match versions_results
2545 .iter()
2546 .find_or_first(|r| matches!(r, SchemaNodeResult::Success(_)))
2547 .unwrap()
2548 {
2549 SchemaNodeResult::Success(v) => *v,
2550 SchemaNodeResult::BrokenConnection(err) => {
2551 // There are only broken connection errors. Nothing better to do
2552 // than to return an error.
2553 return Err(SchemaAgreementError::RequestError(
2554 RequestAttemptError::BrokenConnectionError(err.clone()),
2555 ));
2556 }
2557 };
2558
2559 let in_agreement = versions_results
2560 .into_iter()
2561 .filter_map(|v_r| match v_r {
2562 SchemaNodeResult::Success(v) => Some(v),
2563 SchemaNodeResult::BrokenConnection(_) => None,
2564 })
2565 .all(|v| v == local_version);
2566 Ok(in_agreement.then_some(local_version))
2567 }
2568
2569 /// Iterate over connections to the node.
2570 /// If fetching succeeds on some connection, return first fetched schema version,
2571 /// as Ok(SchemaNodeResult::Success(...)).
2572 /// Otherwise it means there are only errors:
2573 /// - If, and only if, all connections returned ConnectionBrokenError, first such error will be returned,
2574 /// as Ok(SchemaNodeResult::BrokenConnection(...)).
2575 /// - Otherwise there is some other type of error on some connection. First such error will be returned as Err(...).
2576 ///
2577 /// `connections_to_node` iterator must be non-empty!
2578 async fn read_node_schema_version(
2579 &self,
2580 connections_to_node: impl Iterator<Item = Arc<Connection>>,
2581 ) -> Result<SchemaNodeResult, SchemaAgreementError> {
2582 let mut first_broken_connection_err: Option<BrokenConnectionError> = None;
2583 let mut first_unignorable_err: Option<SchemaAgreementError> = None;
2584 for connection in connections_to_node {
2585 match self.fetch_connection_schema_version(&connection).await {
2586 Ok(schema_version) => return Ok(SchemaNodeResult::Success(schema_version)),
2587 Err(SchemaAgreementError::RequestError(
2588 RequestAttemptError::BrokenConnectionError(conn_err),
2589 )) => {
2590 if first_broken_connection_err.is_none() {
2591 first_broken_connection_err = Some(conn_err);
2592 }
2593 }
2594 Err(err) => {
2595 if first_unignorable_err.is_none() {
2596 first_unignorable_err = Some(err);
2597 }
2598 }
2599 }
2600 }
2601 // The iterator was guaranteed to be nonempty, so there must have been at least one error.
2602 // It means at least one of `first_broken_connection_err` and `first_unrecoverable_err` is Some.
2603 if let Some(err) = first_unignorable_err {
2604 return Err(err);
2605 }
2606
2607 Ok(SchemaNodeResult::BrokenConnection(
2608 first_broken_connection_err.unwrap(),
2609 ))
2610 }
2611
2612 /// Fetches the schema version from a single connection.
2613 async fn fetch_connection_schema_version(
2614 &self,
2615 connection: &Connection,
2616 ) -> Result<Uuid, SchemaAgreementError> {
2617 let result = connection
2618 .query_unpaged(self.get_schema_version_statement())
2619 .await?;
2620
2621 let (version_id,) = result
2622 .into_rows_result()
2623 .map_err(SchemaAgreementError::TracesEventsIntoRowsResultError)?
2624 .single_row::<(Uuid,)>()
2625 .map_err(SchemaAgreementError::SingleRowError)?;
2626
2627 Ok(version_id)
2628 }
2629
2630 /// Retrieves the handle to execution profile that is used by this session
2631 /// by default, i.e. when an executed statement does not define its own handle.
2632 pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
2633 &self.default_execution_profile_handle
2634 }
2635}
2636
2637struct ExecuteRequestContext<'a> {
2638 is_idempotent: bool,
2639 consistency_set_on_statement: Option<Consistency>,
2640 retry_session: Box<dyn RetrySession>,
2641 history_data: Option<HistoryData<'a>>,
2642 load_balancing_policy: &'a dyn load_balancing::LoadBalancingPolicy,
2643 query_info: &'a load_balancing::RoutingInfo<'a>,
2644 request_span: &'a RequestSpan,
2645}
2646
2647struct HistoryData<'a> {
2648 listener: &'a dyn HistoryListener,
2649 request_id: history::RequestId,
2650 speculative_id: Option<history::SpeculativeId>,
2651}
2652
2653impl ExecuteRequestContext<'_> {
2654 fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
2655 self.history_data.as_ref().map(|hd| {
2656 hd.listener
2657 .log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
2658 })
2659 }
2660
2661 fn log_attempt_success(&self, attempt_id_opt: &Option<history::AttemptId>) {
2662 let attempt_id: &history::AttemptId = match attempt_id_opt {
2663 Some(id) => id,
2664 None => return,
2665 };
2666
2667 let history_data: &HistoryData = match &self.history_data {
2668 Some(data) => data,
2669 None => return,
2670 };
2671
2672 history_data.listener.log_attempt_success(*attempt_id);
2673 }
2674
2675 fn log_attempt_error(
2676 &self,
2677 attempt_id_opt: &Option<history::AttemptId>,
2678 error: &RequestAttemptError,
2679 retry_decision: &RetryDecision,
2680 ) {
2681 let attempt_id: &history::AttemptId = match attempt_id_opt {
2682 Some(id) => id,
2683 None => return,
2684 };
2685
2686 let history_data: &HistoryData = match &self.history_data {
2687 Some(data) => data,
2688 None => return,
2689 };
2690
2691 history_data
2692 .listener
2693 .log_attempt_error(*attempt_id, error, retry_decision);
2694 }
2695}
2696
2697#[derive(Debug)]
2698enum SchemaNodeResult {
2699 Success(Uuid),
2700 BrokenConnection(BrokenConnectionError),
2701}