Skip to main content

scylla/client/
session.rs

//! `Session` is the main object used in the driver.\
//! It manages all connections to the cluster and allows executing CQL requests.
3
4use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
5use super::pager::{PreparedPagerConfig, QueryPager};
6use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
7use crate::authentication::AuthenticatorProvider;
8use crate::client::client_routes::ClientRoutesConfig;
9use crate::cluster::node::{KnownNode, NodeRef};
10use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
11use crate::errors::{
12    BadQuery, BrokenConnectionError, ExecutionError, MetadataError, NewSessionError,
13    PagerExecutionError, PrepareError, RequestAttemptError, RequestError, SchemaAgreementError,
14    TracingError, UseKeyspaceError,
15};
16use crate::frame::response::result;
17use crate::network::tls::TlsProvider;
18use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
19use crate::observability::driver_tracing::RequestSpan;
20use crate::observability::history::{self, HistoryListener};
21#[cfg(feature = "metrics")]
22use crate::observability::metrics::Metrics;
23use crate::observability::tracing::TracingInfo;
24use crate::policies::address_translator::AddressTranslator;
25use crate::policies::host_filter::HostFilter;
26use crate::policies::load_balancing::{self, RoutingInfo};
27use crate::policies::reconnect::ExponentialReconnectPolicy;
28#[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
29use crate::policies::reconnect::ReconnectPolicy;
30use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
31use crate::policies::speculative_execution;
32use crate::policies::timestamp_generator::TimestampGenerator;
33use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
34use crate::response::{
35    Coordinator, NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse,
36};
37use crate::routing::partitioner::PartitionerName;
38use crate::routing::{Shard, ShardAwarePortRange};
39use crate::statement::batch::batch_values;
40use crate::statement::batch::{Batch, BatchStatement};
41use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
42use crate::statement::unprepared::Statement;
43use crate::statement::{Consistency, PageSize, StatementConfig};
44use arc_swap::ArcSwapOption;
45use futures::future::join_all;
46use futures::future::try_join_all;
47use itertools::Itertools;
48use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadataV2 as NonErrorResponseWithDeserializedMetadata;
49use scylla_cql::frame::response::error::DbError;
50use scylla_cql::serialize::batch::BatchValues;
51use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
52use std::borrow::Borrow;
53use std::future::Future;
54use std::net::{IpAddr, SocketAddr};
55use std::num::NonZeroU32;
56use std::ops::ControlFlow;
57use std::sync::{Arc, OnceLock};
58use std::time::Duration;
59use thiserror::Error;
60use tokio::time::timeout;
61use tracing::{Instrument, debug, error, trace, trace_span};
62use uuid::Uuid;
63
/// Capacity of the internal channel used to propagate tablet info updates.
/// NOTE(review): usage sites are elsewhere in the crate — confirm against them.
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
65
// Query used for schema agreement checks.
// Selects this node's schema version from its single `system.local` row (key='local').
const SCHEMA_VERSION_QUERY_STR: &str = "SELECT schema_version FROM system.local WHERE key='local'";
68
/// Statements for internal driver operations.
///
/// Each statement is lazily initialized on first use (hence `OnceLock`);
/// `#[derive(Default)]` starts all of them empty.
///
/// We would like those to be prepared, but there are issues related to
/// changing connection keyspace: https://github.com/scylladb/scylla-rust-driver/issues/1561
#[derive(Default)]
struct InternalStatements {
    /// Statement for querying tracing session info from system_traces.sessions
    tracing_session: OnceLock<Statement>,
    /// Statement for querying tracing events from system_traces.events
    tracing_events: OnceLock<Statement>,
    /// Statement for fetching schema version during schema agreement checks
    schema_version: OnceLock<Statement>,
}
82
/// `Session` manages connections to the cluster and allows executing CQL requests.
pub struct Session {
    /// Cluster connections and metadata handling.
    cluster: Cluster,
    /// Execution profile applied to statements that do not set their own.
    default_execution_profile_handle: ExecutionProfileHandle,
    /// How often the driver asks whether schema is in agreement.
    schema_agreement_interval: Duration,
    #[cfg(feature = "metrics")]
    /// Driver metrics (present only with the `metrics` feature).
    metrics: Arc<Metrics>,
    /// Timeout for (manual or automatic) schema agreement waiting.
    schema_agreement_timeout: Duration,
    /// Whether schema agreement is automatically awaited after schema-altering statements.
    schema_agreement_automatic_waiting: bool,
    /// Whether full metadata is refreshed after automatic schema agreement.
    refresh_metadata_on_auto_schema_agreement: bool,
    /// Currently used keyspace name; atomically swappable, `None` when not set.
    keyspace_name: Arc<ArcSwapOption<String>>,
    /// Number of attempts when fetching [`TracingInfo`].
    tracing_info_fetch_attempts: NonZeroU32,
    /// Delay between attempts to fetch [`TracingInfo`].
    tracing_info_fetch_interval: Duration,
    /// Consistency level used when fetching [`TracingInfo`].
    tracing_info_fetch_consistency: Consistency,
    /// Lazily-initialized statements for internal driver queries.
    internal_statements: InternalStatements,
}
99
100/// This implementation deliberately omits some details from Cluster in order
101/// to avoid cluttering the print with much information of little usability.
102impl std::fmt::Debug for Session {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        let mut d = f.debug_struct("Session");
105        d.field("cluster", &ClusterNeatDebug(&self.cluster))
106            .field(
107                "default_execution_profile_handle",
108                &self.default_execution_profile_handle,
109            )
110            .field("schema_agreement_interval", &self.schema_agreement_interval);
111
112        #[cfg(feature = "metrics")]
113        d.field("metrics", &self.metrics);
114
115        d.field(
116            "auto_await_schema_agreement_timeout",
117            &self.schema_agreement_timeout,
118        )
119        .field(
120            "schema_agreement_automatic_waiting",
121            &self.schema_agreement_automatic_waiting,
122        )
123        .field(
124            "refresh_metadata_on_auto_schema_agreement",
125            &self.refresh_metadata_on_auto_schema_agreement,
126        )
127        .field("keyspace_name", &self.keyspace_name)
128        .field(
129            "tracing_info_fetch_attempts",
130            &self.tracing_info_fetch_attempts,
131        )
132        .field(
133            "tracing_info_fetch_interval",
134            &self.tracing_info_fetch_interval,
135        )
136        .field(
137            "tracing_info_fetch_consistency",
138            &self.tracing_info_fetch_consistency,
139        )
140        .finish()
141    }
142}
143
/// Represents a TLS context used to configure TLS connections to DB nodes.
/// Abstracts over various TLS implementations, such as OpenSSL and Rustls.
///
/// Marked `#[non_exhaustive]`: more TLS backends may be added in the future,
/// so downstream matches must include a wildcard arm.
#[derive(Clone)] // Cheaply clonable - reference counted.
#[non_exhaustive]
pub enum TlsContext {
    /// TLS context backed by OpenSSL 0.10.
    #[cfg(feature = "openssl-010")]
    OpenSsl010(openssl::ssl::SslContext),
    /// TLS context backed by Rustls 0.23.
    #[cfg(feature = "rustls-023")]
    Rustls023(Arc<rustls::ClientConfig>),
}
156
#[cfg(feature = "openssl-010")]
impl From<openssl::ssl::SslContext> for TlsContext {
    /// Wraps an OpenSSL 0.10 context into a [`TlsContext`].
    fn from(ctx: openssl::ssl::SslContext) -> Self {
        Self::OpenSsl010(ctx)
    }
}
163
#[cfg(feature = "rustls-023")]
impl From<Arc<rustls::ClientConfig>> for TlsContext {
    /// Wraps a Rustls 0.23 client config into a [`TlsContext`].
    fn from(config: Arc<rustls::ClientConfig>) -> Self {
        Self::Rustls023(config)
    }
}
170
/// Configuration options for [`Session`].
/// Can be created manually, but usually it's easier to use
/// [SessionBuilder](super::session_builder::SessionBuilder)
#[derive(Clone)]
#[non_exhaustive]
pub struct SessionConfig {
    /// List of database servers known on Session startup.
    /// Session will connect to these nodes to retrieve information about other nodes in the cluster.
    /// Each node can be represented as a hostname or an IP address.
    pub known_nodes: Vec<KnownNode>,

    /// A local ip address to bind all driver's TCP sockets to.
    ///
    /// By default set to None, which is equivalent to:
    /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
    /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
    pub local_ip_address: Option<IpAddr>,

    /// Specifies the local port range used for shard-aware connections.
    ///
    /// By default set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
    pub shard_aware_local_port_range: ShardAwarePortRange,

    /// Preferred compression algorithm to use on connections.
    /// If it's not supported by the database server, Session will fall back to no compression.
    pub compression: Option<Compression>,

    /// Whether to set the nodelay TCP flag.
    pub tcp_nodelay: bool,

    /// TCP keepalive interval, which means how often keepalive messages
    /// are sent **on TCP layer** when a connection is idle.
    /// If `None`, no TCP keepalive messages are sent.
    pub tcp_keepalive_interval: Option<Duration>,

    /// Size of the TCP receive buffer in bytes.
    /// If `None`, the OS default is used.
    pub tcp_recv_buffer_size: Option<usize>,

    /// Size of the TCP send buffer in bytes.
    /// If `None`, the OS default is used.
    pub tcp_send_buffer_size: Option<usize>,

    /// Whether to set the `SO_REUSEADDR` socket option.
    /// If `None`, the OS default is used (typically `false`).
    pub tcp_reuse_address: Option<bool>,

    /// Linger duration for the socket.
    /// If `None`, the OS default is used (lingering disabled).
    /// Setting this to `Some(Duration::ZERO)` causes the connection to be reset (RST) on close.
    pub tcp_linger: Option<Duration>,

    /// Handle to the default execution profile, which is used
    /// for all statements that do not specify an execution profile.
    pub default_execution_profile_handle: ExecutionProfileHandle,

    /// Keyspace to be used on all connections.
    /// Each connection will send `"USE <keyspace_name>"` before sending any requests.
    /// This can be later changed with [`Session::use_keyspace`].
    pub used_keyspace: Option<String>,

    /// Whether the keyspace name is case-sensitive.
    /// This is used to determine how the keyspace name is sent to the server:
    /// - if case-insensitive, it is sent as-is,
    /// - if case-sensitive, it is enclosed in double quotes.
    pub keyspace_case_sensitive: bool,

    /// TLS context used to configure TLS connections to DB nodes.
    pub tls_context: Option<TlsContext>,

    /// Custom authenticator provider to create an authenticator instance
    /// upon session creation.
    pub authenticator: Option<Arc<dyn AuthenticatorProvider>>,

    /// Timeout for establishing connections to a node.
    ///
    /// If it's higher than underlying os's default connection timeout, it won't have
    /// any effect.
    pub connect_timeout: Duration,

    /// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
    /// The default is `PerShard(1)`, which is the recommended setting for ScyllaDB clusters.
    pub connection_pool_size: PoolSize,

    /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
    /// Generally, this option is best left as default (false).
    pub disallow_shard_aware_port: bool,

    /// Policy that determines how long the connection pool waits between attempts
    /// to fill connections to given host.
    #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
    pub reconnect_policy: Arc<dyn ReconnectPolicy>,

    /// Timestamp generator used for generating timestamps on the client-side.
    /// If None, server-side timestamps are used.
    pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,

    /// Keyspaces whose metadata should be fetched on refresh.
    /// If empty, fetch all keyspaces.
    pub keyspaces_to_fetch: Vec<String>,

    /// If true, full schema is fetched with every metadata refresh.
    pub fetch_schema_metadata: bool,

    /// Custom timeout for requests that query metadata.
    pub metadata_request_serverside_timeout: Option<Duration>,

    /// Interval of sending keepalive requests.
    /// If `None`, keepalives are never sent, so `Self::keepalive_timeout` has no effect.
    pub keepalive_interval: Option<Duration>,

    /// Controls after what time of not receiving response to keepalives a connection is closed.
    /// If `None`, connections are never closed due to lack of response to a keepalive message.
    pub keepalive_timeout: Option<Duration>,

    /// How often the driver should ask if schema is in agreement.
    pub schema_agreement_interval: Duration,

    /// Controls the timeout for waiting for schema agreement.
    /// This works both for manual awaiting schema agreement and for
    /// automatic waiting after a schema-altering statement is sent.
    pub schema_agreement_timeout: Duration,

    /// Controls whether schema agreement is automatically awaited
    /// after sending a schema-altering statement.
    pub schema_agreement_automatic_waiting: bool,

    /// If true, full schema metadata is fetched after successfully reaching a schema agreement.
    /// It is true by default but can be disabled if successive schema-altering statements should be performed.
    pub refresh_metadata_on_auto_schema_agreement: bool,

    /// DNS hostname resolution timeout.
    /// If `None`, the driver will wait for hostname resolution indefinitely.
    pub hostname_resolution_timeout: Option<Duration>,

    /// The address translator is used to translate addresses received from ScyllaDB nodes
    /// (either with cluster metadata or with an event) to addresses that can be used to
    /// actually connect to those nodes. This may be needed e.g. when there is NAT
    /// between the nodes and the driver.
    pub address_translator: Option<Arc<dyn AddressTranslator>>,

    /// Routing configuration for Scylla Cloud. If set, Session will connect
    /// to Scylla Cloud clusters using custom routing based on `system.client_routes`.
    #[cfg(feature = "unstable-client-routes")]
    pub(crate) client_routes_config: Option<super::client_routes::ClientRoutesConfig>,

    /// The host filter decides whether any connections should be opened
    /// to the node or not. The driver will also avoid filtered out nodes when
    /// re-establishing the control connection.
    pub host_filter: Option<Arc<dyn HostFilter>>,

    #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
    /// Optional listener for host events (ADD, REMOVE, UP, DOWN).
    pub host_listener: Option<Arc<dyn crate::policies::host_listener::HostListener>>,

    /// If true, the driver will inject a delay controlled by [`SessionConfig::write_coalescing_delay`]
    /// before flushing data to the socket.
    /// This gives the driver an opportunity to collect more write requests
    /// and write them in a single syscall, increasing the efficiency.
    ///
    /// However, this optimization may worsen latency if the rate of requests
    /// issued by the application is low, but otherwise the application is
    /// heavily loaded with other tasks on the same tokio executor.
    /// Please do performance measurements before committing to disabling
    /// this option.
    pub enable_write_coalescing: bool,

    /// Controls the write coalescing delay (if enabled).
    ///
    /// This option has no effect if [`SessionConfig::enable_write_coalescing`] is false.
    ///
    /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
    pub write_coalescing_delay: WriteCoalescingDelay,

    /// Number of attempts to fetch [`TracingInfo`]
    /// in [`Session::get_tracing_info`]. Tracing info
    /// might not be available immediately on queried node - that's why
    /// the driver performs a few attempts with sleeps in between.
    pub tracing_info_fetch_attempts: NonZeroU32,

    /// Delay between attempts to fetch [`TracingInfo`]
    /// in [`Session::get_tracing_info`]. Tracing info
    /// might not be available immediately on queried node - that's why
    /// the driver performs a few attempts with sleeps in between.
    pub tracing_info_fetch_interval: Duration,

    /// Consistency level of fetching [`TracingInfo`]
    /// in [`Session::get_tracing_info`].
    pub tracing_info_fetch_consistency: Consistency,

    /// Interval between refreshing cluster metadata.
    /// This can be tuned to the expected traffic pattern,
    /// e.g. lengthened to avoid unexpected metadata traffic,
    /// or shortened if the topology is expected to change frequently.
    pub cluster_metadata_refresh_interval: Duration,

    /// Driver and application self-identifying information,
    /// to be sent to server in STARTUP message.
    pub identity: SelfIdentity<'static>,
}
370
371impl SessionConfig {
372    /// Creates a [`SessionConfig`] with default configuration
373    /// # Default configuration
374    /// * Compression: None
375    /// * Load balancing policy: Token-aware Round-robin
376    ///
377    /// # Example
378    /// ```
379    /// # use scylla::client::session::SessionConfig;
380    /// let config = SessionConfig::new();
381    /// ```
382    pub fn new() -> Self {
383        SessionConfig {
384            known_nodes: Vec::new(),
385            local_ip_address: None,
386            shard_aware_local_port_range: ShardAwarePortRange::EPHEMERAL_PORT_RANGE,
387            compression: None,
388            tcp_nodelay: true,
389            tcp_keepalive_interval: None,
390            tcp_recv_buffer_size: None,
391            tcp_send_buffer_size: None,
392            tcp_reuse_address: None,
393            tcp_linger: None,
394            schema_agreement_interval: Duration::from_millis(200),
395            default_execution_profile_handle: ExecutionProfile::new_from_inner(Default::default())
396                .into_handle(),
397            used_keyspace: None,
398            keyspace_case_sensitive: false,
399            tls_context: None,
400            authenticator: None,
401            connect_timeout: Duration::from_secs(5),
402            hostname_resolution_timeout: Some(Duration::from_secs(5)),
403            connection_pool_size: Default::default(),
404            disallow_shard_aware_port: false,
405            #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
406            reconnect_policy: Arc::new(ExponentialReconnectPolicy::new()),
407            timestamp_generator: None,
408            keyspaces_to_fetch: Vec::new(),
409            fetch_schema_metadata: true,
410            metadata_request_serverside_timeout: Some(Duration::from_secs(2)),
411            keepalive_interval: Some(Duration::from_secs(30)),
412            keepalive_timeout: Some(Duration::from_secs(30)),
413            schema_agreement_timeout: Duration::from_secs(60),
414            schema_agreement_automatic_waiting: true,
415            address_translator: None,
416            host_filter: None,
417            #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
418            host_listener: None,
419            refresh_metadata_on_auto_schema_agreement: true,
420            enable_write_coalescing: true,
421            write_coalescing_delay: WriteCoalescingDelay::SmallNondeterministic,
422            tracing_info_fetch_attempts: NonZeroU32::new(10).unwrap(),
423            tracing_info_fetch_interval: Duration::from_millis(3),
424            tracing_info_fetch_consistency: Consistency::One,
425            cluster_metadata_refresh_interval: Duration::from_secs(60),
426            identity: SelfIdentity::default(),
427            #[cfg(feature = "unstable-client-routes")]
428            client_routes_config: None,
429        }
430    }
431
432    /// Adds a known database server with a hostname.
433    /// If the port is not explicitly specified, 9042 is used as default
434    /// # Example
435    /// ```
436    /// # use scylla::client::session::SessionConfig;
437    /// let mut config = SessionConfig::new();
438    /// config.add_known_node("127.0.0.1");
439    /// config.add_known_node("db1.example.com:9042");
440    /// ```
441    pub fn add_known_node(&mut self, hostname: impl AsRef<str>) {
442        self.known_nodes
443            .push(KnownNode::Hostname(hostname.as_ref().to_string()));
444    }
445
446    /// Adds a known database server with an IP address
447    /// # Example
448    /// ```
449    /// # use scylla::client::session::SessionConfig;
450    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
451    /// let mut config = SessionConfig::new();
452    /// config.add_known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042));
453    /// ```
454    pub fn add_known_node_addr(&mut self, node_addr: SocketAddr) {
455        self.known_nodes.push(KnownNode::Address(node_addr));
456    }
457
458    /// Adds a list of known database server with hostnames.
459    /// If the port is not explicitly specified, 9042 is used as default
460    /// # Example
461    /// ```
462    /// # use scylla::client::session::SessionConfig;
463    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
464    /// let mut config = SessionConfig::new();
465    /// config.add_known_nodes(&["127.0.0.1:9042", "db1.example.com"]);
466    /// ```
467    pub fn add_known_nodes(&mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) {
468        for hostname in hostnames {
469            self.add_known_node(hostname);
470        }
471    }
472
473    /// Adds a list of known database servers with IP addresses
474    /// # Example
475    /// ```
476    /// # use scylla::client::session::SessionConfig;
477    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
478    /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
479    /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
480    ///
481    /// let mut config = SessionConfig::new();
482    /// config.add_known_nodes_addr(&[addr1, addr2]);
483    /// ```
484    pub fn add_known_nodes_addr(
485        &mut self,
486        node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
487    ) {
488        for address in node_addrs {
489            self.add_known_node_addr(*address.borrow());
490        }
491    }
492}
493
494/// Creates default [`SessionConfig`], same as [`SessionConfig::new`]
495impl Default for SessionConfig {
496    fn default() -> Self {
497        Self::new()
498    }
499}
500
501impl SessionConfig {
502    /// [SessionConfig] may unfortunately represent invalid configurations. We need to rule them out
503    /// at runtime by running validation.
504    #[expect(clippy::result_large_err)] // TODO(2.0): Make NewSessionError smaller.
505    fn validate(&self) -> Result<(), NewSessionError> {
506        // Ensure there is at least one known node
507        if self.known_nodes.is_empty() {
508            return Err(NewSessionError::EmptyKnownNodesList);
509        }
510
511        // Ensure no illegal configuration with Client Routes
512        #[cfg(feature = "unstable-client-routes")]
513        if self.client_routes_config.is_some() {
514            if self.address_translator.is_some() {
515                return Err(NewSessionError::IllegalConfig(
516                    "User-provided address translator is not supported if ClientRoutesConfig is provided, \
517                        because the driver uses its own custom translator for client routes".into(),
518                ));
519            }
520
521            if self.tls_context.is_some() {
522                return Err(NewSessionError::IllegalConfig(
523                    "TLS is not (yet) supported if ClientRoutesConfig is provided, \
524                        because of architectural limitations that are out of the driver's scope"
525                        .into(),
526                ));
527            }
528        }
529
530        Ok(())
531    }
532}
533
/// Internal outcome of running a request.
pub(crate) enum RunRequestResult<ResT> {
    /// A write error occurred but was ignored.
    /// NOTE(review): inferred from the name; confirm against usage sites (retry policy handling).
    IgnoredWriteError,
    /// The request completed and produced a result of type `ResT`.
    Completed(ResT),
}
538
539/// Represents a CQL session, which can be used to communicate
540/// with the database
541impl Session {
542    /// Sends a request to the database and receives a response.\
543    /// Executes an unprepared CQL statement without paging, i.e. all results are received in a single response.
544    ///
545    /// This is the easiest way to execute a CQL statement, but performance is worse than that of prepared statements.
546    ///
547    /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
548    /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
549    /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_unpaged()`] instead.
550    ///
551    /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
552    /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
553    /// it is best to use paged requests:
554    /// - to receive multiple pages and transparently iterate through them, use [query_iter](Session::query_iter).
555    /// - to manually receive multiple pages and iterate through them, use [query_single_page](Session::query_single_page).
556    ///
557    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/unprepared.html) for more information
558    /// # Arguments
559    /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
560    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
561    ///
562    /// # Examples
563    /// ```rust
564    /// # use scylla::client::session::Session;
565    /// # use std::error::Error;
566    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
567    /// // Insert an int and text into a table.
568    /// session
569    ///     .query_unpaged(
570    ///         "INSERT INTO ks.tab (a, b) VALUES(?, ?)",
571    ///         (2_i32, "some text")
572    ///     )
573    ///     .await?;
574    /// # Ok(())
575    /// # }
576    /// ```
577    /// ```rust
578    /// # use scylla::client::session::Session;
579    /// # use std::error::Error;
580    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
581    ///
582    /// // Read rows containing an int and text.
583    /// // Keep in mind that all results come in one response (no paging is done!),
584    /// // so the memory footprint and latency may be huge!
585    /// // To prevent that, use `Session::query_iter` or `Session::query_single_page`.
586    /// let query_rows = session
587    ///     .query_unpaged("SELECT a, b FROM ks.tab", &[])
588    ///     .await?
589    ///     .into_rows_result()?;
590    ///
591    /// for row in query_rows.rows()? {
592    ///     // Parse row as int and text.
593    ///     let (int_val, text_val): (i32, &str) = row?;
594    /// }
595    /// # Ok(())
596    /// # }
597    /// ```
598    pub async fn query_unpaged(
599        &self,
600        statement: impl Into<Statement>,
601        values: impl SerializeRow,
602    ) -> Result<QueryResult, ExecutionError> {
603        self.do_query_unpaged(&statement.into(), values).await
604    }
605
606    /// Queries a single page from the database, optionally continuing from a saved point.
607    ///
608    /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
609    /// trait method returns false). In such case, CQL statement first needs to be prepared (on a single connection), so
610    /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_single_page()`] instead.
611    ///
612    /// # Arguments
613    ///
614    /// * `statement` - statement to be executed
615    /// * `values` - values bound to the statement
616    /// * `paging_state` - previously received paging state or [PagingState::start()]
617    ///
618    /// # Example
619    ///
620    /// ```rust
621    /// # use scylla::client::session::Session;
622    /// # use std::error::Error;
623    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
624    /// use std::ops::ControlFlow;
625    /// use scylla::response::PagingState;
626    ///
627    /// // Manual paging in a loop, unprepared statement.
628    /// let mut paging_state = PagingState::start();
629    /// loop {
630    ///    let (res, paging_state_response) = session
631    ///        .query_single_page("SELECT a, b, c FROM ks.tbl", &[], paging_state)
632    ///        .await?;
633    ///
634    ///    // Do something with a single page of results.
635    ///    for row in res
636    ///        .into_rows_result()?
637    ///        .rows::<(i32, &str)>()?
638    ///    {
639    ///        let (a, b) = row?;
640    ///    }
641    ///
642    ///    match paging_state_response.into_paging_control_flow() {
643    ///        ControlFlow::Break(()) => {
644    ///            // No more pages to be fetched.
645    ///            break;
646    ///        }
647    ///        ControlFlow::Continue(new_paging_state) => {
648    ///            // Update paging state from the response, so that query
649    ///            // will be resumed from where it ended the last time.
650    ///            paging_state = new_paging_state;
651    ///        }
652    ///    }
653    /// }
654    /// # Ok(())
655    /// # }
656    /// ```
657    pub async fn query_single_page(
658        &self,
659        statement: impl Into<Statement>,
660        values: impl SerializeRow,
661        paging_state: PagingState,
662    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
663        self.do_query_single_page(&statement.into(), values, paging_state)
664            .await
665    }
666
667    /// Execute an unprepared CQL statement with paging\
668    /// This method will query all pages of the result\
669    ///
670    /// Returns an async iterator (stream) over all received rows\
671    /// Page size can be specified in the [`Statement`] passed to the function
672    ///
673    /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
674    /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
675    /// driver will initially perform 2 round trips instead of 1. Please use [`Session::execute_iter()`] instead.
676    ///
677    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
678    ///
679    /// # Arguments
680    /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
681    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
682    ///
683    /// # Example
684    ///
685    /// ```rust
686    /// # use scylla::client::session::Session;
687    /// # use std::error::Error;
688    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
689    /// use futures::stream::StreamExt;
690    ///
691    /// let mut rows_stream = session
692    ///    .query_iter("SELECT a, b FROM ks.t", &[])
693    ///    .await?
694    ///    .rows_stream::<(i32, i32)>()?;
695    ///
696    /// while let Some(next_row_res) = rows_stream.next().await {
697    ///     let (a, b): (i32, i32) = next_row_res?;
698    ///     println!("a, b: {}, {}", a, b);
699    /// }
700    /// # Ok(())
701    /// # }
702    /// ```
703    pub async fn query_iter(
704        &self,
705        statement: impl Into<Statement>,
706        values: impl SerializeRow,
707    ) -> Result<QueryPager, PagerExecutionError> {
708        self.do_query_iter(statement.into(), values).await
709    }
710
711    /// Execute a prepared statement. Requires a [PreparedStatement]
712    /// generated using [`Session::prepare`](Session::prepare).\
713    /// Performs an unpaged request, i.e. all results are received in a single response.
714    ///
715    /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
716    /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
717    /// it is best to use paged requests:
718    /// - to receive multiple pages and transparently iterate through them, use [execute_iter](Session::execute_iter).
719    /// - to manually receive multiple pages and iterate through them, use [execute_single_page](Session::execute_single_page).
720    ///
721    /// Prepared statements are much faster than unprepared statements:
722    /// * Database doesn't need to parse the statement string upon each execution (only once)
723    /// * They are properly load balanced using token aware routing
724    ///
725    /// > ***Warning***\
726    /// > For token/shard aware load balancing to work properly, all partition key values
727    /// > must be sent as bound values
728    /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance)).
729    ///
730    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
731    ///
732    /// # Arguments
733    /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
734    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
735    ///
736    /// # Example
737    /// ```rust
738    /// # use scylla::client::session::Session;
739    /// # use std::error::Error;
740    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
741    /// use scylla::statement::prepared::PreparedStatement;
742    ///
743    /// // Prepare the statement for later execution
744    /// let prepared: PreparedStatement = session
745    ///     .prepare("INSERT INTO ks.tab (a) VALUES(?)")
746    ///     .await?;
747    ///
748    /// // Execute the prepared statement with some values, just like an unprepared statement.
749    /// let to_insert: i32 = 12345;
750    /// session.execute_unpaged(&prepared, (to_insert,)).await?;
751    /// # Ok(())
752    /// # }
753    /// ```
754    pub async fn execute_unpaged(
755        &self,
756        prepared: &PreparedStatement,
757        values: impl SerializeRow,
758    ) -> Result<QueryResult, ExecutionError> {
759        let serialized_values = prepared.serialize_values(&values)?;
760        let (result, paging_state) = self
761            .execute(prepared, &serialized_values, None, PagingState::start())
762            .await?;
763        if !paging_state.finished() {
764            error!(
765                "Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug."
766            );
767            return Err(ExecutionError::LastAttemptError(
768                RequestAttemptError::NonfinishedPagingState,
769            ));
770        }
771        Ok(result)
772    }
773
774    /// Executes a prepared statement, restricting results to single page.
775    /// Optionally continues fetching results from a saved point.
776    ///
777    /// # Arguments
778    ///
779    /// * `prepared` - a statement prepared with [prepare](crate::client::session::Session::prepare)
780    /// * `values` - values bound to the statement
781    /// * `paging_state` - continuation based on a paging state received from a previous paged query or None
782    ///
783    /// # Example
784    ///
785    /// ```rust
786    /// # use scylla::client::session::Session;
787    /// # use std::error::Error;
788    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
789    /// use std::ops::ControlFlow;
790    /// use scylla::statement::unprepared::Statement;
791    /// use scylla::response::{PagingState, PagingStateResponse};
792    ///
793    /// let paged_prepared = session
794    ///     .prepare(
795    ///         Statement::new("SELECT a, b FROM ks.tbl")
796    ///             .with_page_size(100.try_into().unwrap()),
797    ///     )
798    ///     .await?;
799    ///
800    /// // Manual paging in a loop, prepared statement.
801    /// let mut paging_state = PagingState::start();
802    /// loop {
803    ///     let (res, paging_state_response) = session
804    ///         .execute_single_page(&paged_prepared, &[], paging_state)
805    ///         .await?;
806    ///
807    ///    // Do something with a single page of results.
808    ///    for row in res
809    ///        .into_rows_result()?
810    ///        .rows::<(i32, &str)>()?
811    ///    {
812    ///        let (a, b) = row?;
813    ///    }
814    ///
815    ///     match paging_state_response.into_paging_control_flow() {
816    ///         ControlFlow::Break(()) => {
817    ///             // No more pages to be fetched.
818    ///             break;
819    ///         }
820    ///         ControlFlow::Continue(new_paging_state) => {
821    ///             // Update paging continuation from the paging state, so that query
822    ///             // will be resumed from where it ended the last time.
823    ///             paging_state = new_paging_state;
824    ///         }
825    ///     }
826    /// }
827    /// # Ok(())
828    /// # }
829    /// ```
830    pub async fn execute_single_page(
831        &self,
832        prepared: &PreparedStatement,
833        values: impl SerializeRow,
834        paging_state: PagingState,
835    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
836        let serialized_values = prepared.serialize_values(&values)?;
837        let page_size = prepared.get_validated_page_size();
838        self.execute(prepared, &serialized_values, Some(page_size), paging_state)
839            .await
840    }
841
842    /// Execute a prepared statement with paging.\
843    /// This method will query all pages of the result.\
844    ///
845    /// Returns an async iterator (stream) over all received rows.\
846    /// Page size can be specified in the [PreparedStatement] passed to the function.
847    ///
848    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
849    ///
850    /// # Arguments
851    /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
852    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
853    ///
854    /// # Example
855    ///
856    /// ```rust
857    /// # use scylla::client::session::Session;
858    /// # use futures::StreamExt as _;
859    /// # use std::error::Error;
860    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
861    /// use scylla::statement::prepared::PreparedStatement;
862    ///
863    /// // Prepare the statement for later execution
864    /// let prepared: PreparedStatement = session
865    ///     .prepare("SELECT a, b FROM ks.t")
866    ///     .await?;
867    ///
868    /// // Execute the statement and receive all pages
869    /// let mut rows_stream = session
870    ///    .execute_iter(prepared, &[])
871    ///    .await?
872    ///    .rows_stream::<(i32, i32)>()?;
873    ///
874    /// while let Some(next_row_res) = rows_stream.next().await {
875    ///     let (a, b): (i32, i32) = next_row_res?;
876    ///     println!("a, b: {}, {}", a, b);
877    /// }
878    /// # Ok(())
879    /// # }
880    /// ```
881    pub async fn execute_iter(
882        &self,
883        prepared: impl Into<PreparedStatement>,
884        values: impl SerializeRow,
885    ) -> Result<QueryPager, PagerExecutionError> {
886        let prepared = prepared.into();
887        let serialized_values = prepared.serialize_values(&values)?;
888
889        self.execute_iter_nongeneric(prepared, serialized_values)
890            .await
891    }
892
    /// Execute a batch statement\
    /// Batch contains many `unprepared` or `prepared` statements which are executed at once\
    /// Batch doesn't return any rows.
    ///
    /// Batch values must contain values for each of the statements.
    ///
    /// Avoid using non-empty values ([`SerializeRow::is_empty()`] return false) for unprepared statements
    /// inside the batch. Such statements will first need to be prepared, so the driver will need to
    /// send (number_of_unprepared_statements_with_values + 1) requests instead of 1 request, severely
    /// affecting performance.
    ///
    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/batch.html) for more information
    ///
    /// # Arguments
    /// * `batch` - [Batch] to be performed
    /// * `values` - List of values for each statement, it's the easiest to use a tuple of tuples
    ///
    /// # Example
    /// ```rust
    /// # use scylla::client::session::Session;
    /// # use std::error::Error;
    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
    /// use scylla::statement::batch::Batch;
    ///
    /// let mut batch: Batch = Default::default();
    ///
    /// // A statement with two bound values
    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");
    ///
    /// // A statement with one bound value
    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");
    ///
    /// // A statement with no bound values
    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");
    ///
    /// // Batch values is a tuple of 3 tuples containing values for each statement
    /// let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first statement
    ///                     (4_i32,),       // Tuple with one value for the second statement
    ///                     ());            // Empty tuple/unit for the third statement
    ///
    /// // Run the batch
    /// session.batch(&batch, batch_values).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn batch(
        &self,
        batch: &Batch,
        values: impl BatchValues,
    ) -> Result<QueryResult, ExecutionError> {
        // Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
        // If users batch statements by shard, they will be rewarded with full shard awareness

        // check to ensure that we don't send a batch statement with more than u16::MAX queries
        let batch_statements_length = batch.statements.len();
        if batch_statements_length > u16::MAX as usize {
            return Err(ExecutionError::BadQuery(
                BadQuery::TooManyQueriesInBatchStatement(batch_statements_length),
            ));
        }

        // Batch-level execution profile (if any) overrides the session default.
        let execution_profile = batch
            .get_execution_profile_handle()
            .unwrap_or_else(|| self.get_default_execution_profile_handle())
            .access();

        let consistency = batch
            .config
            .consistency
            .unwrap_or(execution_profile.consistency);

        let serial_consistency = batch
            .config
            .serial_consistency
            .unwrap_or(execution_profile.serial_consistency);

        // The token used for routing is derived from the first statement and its values.
        let (first_value_token, values) =
            batch_values::peek_first_token(values, batch.statements.first())?;
        let values_ref = &values;

        // Table metadata for routing is only available when the first statement is prepared.
        let table_spec =
            if let Some(BatchStatement::PreparedStatement(ps)) = batch.statements.first() {
                ps.get_table_spec()
            } else {
                None
            };

        let statement_info = RoutingInfo {
            consistency,
            serial_consistency,
            token: first_value_token,
            table: table_spec,
            is_confirmed_lwt: false,
        };

        let span = RequestSpan::new_batch();

        let (run_request_result, coordinator): (
            RunRequestResult<NonErrorQueryResponse>,
            Coordinator,
        ) = self
            .run_request(
                statement_info,
                &batch.config,
                execution_profile,
                |connection: Arc<Connection>,
                 consistency: Consistency,
                 execution_profile: &ExecutionProfileInner| {
                    // Recompute serial consistency against the profile supplied for this attempt.
                    let serial_consistency = batch
                        .config
                        .serial_consistency
                        .unwrap_or(execution_profile.serial_consistency);
                    async move {
                        connection
                            .batch_with_consistency(
                                batch,
                                values_ref,
                                consistency,
                                serial_consistency,
                            )
                            .await
                            .and_then(QueryResponse::into_non_error_query_response)
                    }
                },
                &span,
            )
            .instrument(span.span().clone())
            .await?;

        let result = match run_request_result {
            // A write error that the retry policy chose to ignore is surfaced as an empty result.
            RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(coordinator),
            RunRequestResult::Completed(non_error_query_response) => {
                let result = non_error_query_response.into_query_result(coordinator)?;
                span.record_result_fields(&result);
                result
            }
        };

        Ok(result)
    }
1033
    /// Establishes a CQL session with the database
    ///
    /// Usually it's easier to use [SessionBuilder](crate::client::session_builder::SessionBuilder)
    /// instead of calling `Session::connect` directly, because it's more convenient.
    /// # Arguments
    /// * `config` - Connection configuration - known nodes, Compression, etc.
    ///   Must contain at least one known node.
    ///
    /// # Example
    /// ```rust
    /// # use std::error::Error;
    /// # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
    /// use scylla::client::session::{Session, SessionConfig};
    /// use scylla::cluster::KnownNode;
    ///
    /// let mut config = SessionConfig::new();
    /// config.known_nodes.push(KnownNode::Hostname("127.0.0.1:9042".to_string()));
    ///
    /// let session: Session = Session::connect(config).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn connect(config: SessionConfig) -> Result<Self, NewSessionError> {
        config.validate()?;

        let known_nodes = config.known_nodes;

        // Channel through which connections report tablet info; the cluster worker receives it.
        let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(TABLET_CHANNEL_SIZE);

        let tls_provider = if let Some(tls_context) = config.tls_context {
            // To silence warnings when TlsContext is an empty enum (tls features are disabled).
            // In such case, TlsProvider is uninhabited.
            #[cfg_attr(
                not(any(feature = "openssl-010", feature = "rustls-023")),
                // TODO: make this expect() once MSRV is 1.92+.
                allow(unreachable_code, unused_variables)
            )]
            let provider = TlsProvider::new_with_global_context(tls_context);
            #[cfg_attr(
                not(any(feature = "openssl-010", feature = "rustls-023")),
                // TODO: remove this once MSRV is 1.92+.
                allow(unreachable_code)
            )]
            Some(provider)
        } else {
            None
        };

        // Per-connection settings, mostly copied straight from the session config.
        let connection_config = ConnectionConfig {
            local_ip_address: config.local_ip_address,
            shard_aware_local_port_range: config.shard_aware_local_port_range,
            compression: config.compression,
            tcp_nodelay: config.tcp_nodelay,
            tcp_keepalive_interval: config.tcp_keepalive_interval,
            tcp_recv_buffer_size: config.tcp_recv_buffer_size,
            tcp_send_buffer_size: config.tcp_send_buffer_size,
            tcp_reuse_address: config.tcp_reuse_address,
            tcp_linger: config.tcp_linger,
            timestamp_generator: config.timestamp_generator,
            tls_provider,
            authenticator: config.authenticator,
            connect_timeout: config.connect_timeout,
            event_sender: None,
            default_consistency: Default::default(),
            address_translator: config.address_translator,
            // Write coalescing delay only applies when coalescing is enabled at all.
            write_coalescing_delay: config
                .enable_write_coalescing
                .then_some(config.write_coalescing_delay),
            keepalive_interval: config.keepalive_interval,
            keepalive_timeout: config.keepalive_timeout,
            tablet_sender: Some(tablet_sender),
            identity: config.identity,
        };

        let pool_config = PoolConfig {
            connection_config,
            pool_size: config.connection_pool_size,
            can_use_shard_aware_port: !config.disallow_shard_aware_port,
            #[cfg(all(scylla_unstable, feature = "unstable-reconnect-policy"))]
            reconnect_policy: config.reconnect_policy,
            // Without the unstable feature, the default exponential policy is used.
            #[cfg(not(all(scylla_unstable, feature = "unstable-reconnect-policy")))]
            reconnect_policy: Arc::new(ExponentialReconnectPolicy::new()),
        };

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::new());

        // Host listener is only configurable under the unstable feature; otherwise absent.
        let host_listener = {
            #[cfg(all(scylla_unstable, feature = "unstable-host-listener"))]
            {
                config.host_listener
            }
            #[cfg(not(all(scylla_unstable, feature = "unstable-host-listener")))]
            {
                None
            }
        };

        // Client routes config is only configurable under its unstable feature; otherwise absent.
        let client_routes_config: Option<ClientRoutesConfig> = {
            #[cfg(feature = "unstable-client-routes")]
            {
                config.client_routes_config
            }
            #[cfg(not(feature = "unstable-client-routes"))]
            {
                None
            }
        };

        let cluster = Cluster::new(
            known_nodes,
            pool_config,
            config.keyspaces_to_fetch,
            config.fetch_schema_metadata,
            config.metadata_request_serverside_timeout,
            config.hostname_resolution_timeout,
            config.host_filter,
            host_listener,
            config.cluster_metadata_refresh_interval,
            tablet_receiver,
            #[cfg(feature = "metrics")]
            Arc::clone(&metrics),
            client_routes_config,
        )
        .await?;

        let default_execution_profile_handle = config.default_execution_profile_handle;

        let session = Self {
            cluster,
            default_execution_profile_handle,
            schema_agreement_interval: config.schema_agreement_interval,
            #[cfg(feature = "metrics")]
            metrics,
            schema_agreement_timeout: config.schema_agreement_timeout,
            schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
            refresh_metadata_on_auto_schema_agreement: config
                .refresh_metadata_on_auto_schema_agreement,
            keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
            tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
            tracing_info_fetch_interval: config.tracing_info_fetch_interval,
            tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
            internal_statements: InternalStatements::default(),
        };

        // Switch to the configured keyspace (if any) before handing the session to the user.
        if let Some(keyspace_name) = config.used_keyspace {
            session
                .use_keyspace(keyspace_name, config.keyspace_case_sensitive)
                .await?;
        }

        Ok(session)
    }
1187
1188    /// Creates and returns ref to a shared statement for querying tracing session info.
1189    fn get_tracing_session_statement(&self) -> &Statement {
1190        self.internal_statements.tracing_session.get_or_init(|| {
1191            let mut stmt = Statement::new(crate::observability::tracing::TRACES_SESSION_QUERY_STR);
1192            stmt.set_page_size(crate::observability::tracing::TRACING_QUERY_PAGE_SIZE);
1193            stmt.set_consistency(self.tracing_info_fetch_consistency);
1194            stmt.set_is_idempotent(true);
1195            stmt
1196        })
1197    }
1198
1199    /// Creates and returns ref to a shared statement for querying tracing events.
1200    fn get_tracing_events_statement(&self) -> &Statement {
1201        self.internal_statements.tracing_events.get_or_init(|| {
1202            let mut stmt = Statement::new(crate::observability::tracing::TRACES_EVENTS_QUERY_STR);
1203            stmt.set_page_size(crate::observability::tracing::TRACING_QUERY_PAGE_SIZE);
1204            stmt.set_consistency(self.tracing_info_fetch_consistency);
1205            stmt.set_is_idempotent(true);
1206            stmt
1207        })
1208    }
1209
1210    /// Creates and returns ref to a shared statement for querying schema version.
1211    fn get_schema_version_statement(&self) -> &Statement {
1212        self.internal_statements.schema_version.get_or_init(|| {
1213            let mut statement = Statement::new(SCHEMA_VERSION_QUERY_STR);
1214            // Use ONE consistency for schema version queries - this is a local query
1215            // that reads from system.local, so ONE is appropriate.
1216            statement.set_consistency(Consistency::One);
1217            statement.set_is_idempotent(true);
1218            statement
1219        })
1220    }
1221
1222    async fn do_query_unpaged(
1223        &self,
1224        statement: &Statement,
1225        values: impl SerializeRow,
1226    ) -> Result<QueryResult, ExecutionError> {
1227        let (result, paging_state_response) = self
1228            .query(statement, values, None, PagingState::start())
1229            .await?;
1230        if !paging_state_response.finished() {
1231            error!(
1232                "Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug."
1233            );
1234            return Err(ExecutionError::LastAttemptError(
1235                RequestAttemptError::NonfinishedPagingState,
1236            ));
1237        }
1238        Ok(result)
1239    }
1240
1241    async fn do_query_single_page(
1242        &self,
1243        statement: &Statement,
1244        values: impl SerializeRow,
1245        paging_state: PagingState,
1246    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1247        self.query(
1248            statement,
1249            values,
1250            Some(statement.get_validated_page_size()),
1251            paging_state,
1252        )
1253        .await
1254    }
1255
    /// Sends a request to the database.
    /// Optionally continues fetching results from a saved point.
    ///
    /// This is now an internal method only.
    ///
    /// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
    ///
    /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
    /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
    /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
    /// should be made.
    async fn query(
        &self,
        statement: &Statement,
        values: impl SerializeRow,
        page_size: Option<PageSize>,
        paging_state: PagingState,
    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
        // Statement-level execution profile (if any) overrides the session default.
        let execution_profile = statement
            .get_execution_profile_handle()
            .unwrap_or_else(|| self.get_default_execution_profile_handle())
            .access();

        // Unprepared statements carry no token/table info, so the remaining routing
        // fields are left at their defaults.
        let statement_info = RoutingInfo {
            consistency: statement
                .config
                .consistency
                .unwrap_or(execution_profile.consistency),
            serial_consistency: statement
                .config
                .serial_consistency
                .unwrap_or(execution_profile.serial_consistency),
            ..Default::default()
        };

        let span = RequestSpan::new_query(&statement.contents);
        let span_ref = &span;
        let (run_request_result, coordinator): (
            RunRequestResult<NonErrorQueryResponse>,
            Coordinator,
        ) = self
            .run_request(
                statement_info,
                &statement.config,
                execution_profile,
                |connection: Arc<Connection>,
                 consistency: Consistency,
                 execution_profile: &ExecutionProfileInner| {
                    // Recompute serial consistency against the profile supplied for this attempt.
                    let serial_consistency = statement
                        .config
                        .serial_consistency
                        .unwrap_or(execution_profile.serial_consistency);
                    // Needed to avoid moving query and values into async move block
                    let values_ref = &values;
                    let paging_state_ref = &paging_state;
                    async move {
                        if values_ref.is_empty() {
                            // No bound values: the statement can be sent unprepared as-is.
                            span_ref.record_request_size(0);
                            connection
                                .query_raw_with_consistency(
                                    statement,
                                    consistency,
                                    serial_consistency,
                                    page_size,
                                    paging_state_ref.clone(),
                                )
                                .await
                                .and_then(QueryResponse::into_non_error_query_response)
                        } else {
                            // Bound values require preparing the statement first, so that
                            // the values can be serialized against its metadata.
                            let prepared = connection.prepare(statement).await?;
                            let serialized = prepared.serialize_values(values_ref)?;
                            span_ref.record_request_size(serialized.buffer_size());
                            connection
                                .execute_raw_with_consistency(
                                    &prepared,
                                    &serialized,
                                    consistency,
                                    serial_consistency,
                                    page_size,
                                    paging_state_ref.clone(),
                                )
                                .await
                                .and_then(QueryResponse::into_non_error_query_response)
                        }
                    }
                },
                &span,
            )
            .instrument(span.span().clone())
            .await?;

        let response = match run_request_result {
            // A write error that the retry policy chose to ignore is presented
            // as a successful Void response.
            RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
                response: NonErrorResponseWithDeserializedMetadata::Result(
                    result::ResultWithDeserializedMetadata::Void,
                ),
                tracing_id: None,
                warnings: Vec::new(),
            },
            RunRequestResult::Completed(response) => response,
        };

        let (result, paging_state_response) =
            response.into_query_result_and_paging_state(coordinator)?;
        span.record_result_fields(&result);

        Ok((result, paging_state_response))
    }
1364
1365    pub(crate) async fn handle_set_keyspace_response(
1366        &self,
1367        response: &NonErrorQueryResponse,
1368    ) -> Result<(), UseKeyspaceError> {
1369        if let Some(set_keyspace) = response.as_set_keyspace() {
1370            debug!(
1371                "Detected USE KEYSPACE query, setting session's keyspace to {}",
1372                set_keyspace.keyspace_name
1373            );
1374            self.use_keyspace(set_keyspace.keyspace_name.clone(), true)
1375                .await?;
1376        }
1377
1378        Ok(())
1379    }
1380}
1381
#[derive(Debug, Error)]
/// Errors that can occur during automatic awaiting of schema agreement after a schema change.
///
/// Crate-internal: flattened into [`ExecutionError`] (via `From`) before reaching the user.
pub(crate) enum AutoSchemaAwaitingError {
    /// Schema agreement could not be reached.
    #[error("Schema agreement could not be reached: {0}")]
    SchemaAgreement(#[from] SchemaAgreementError),
    /// Metadata refresh after reaching schema agreement failed.
    #[error("Metadata refresh after reaching schema agreement failed: {0}")]
    MetadataRefresh(#[from] MetadataError),
}
1392
1393impl From<AutoSchemaAwaitingError> for ExecutionError {
1394    fn from(err: AutoSchemaAwaitingError) -> Self {
1395        match err {
1396            AutoSchemaAwaitingError::SchemaAgreement(e) => e.into(),
1397            AutoSchemaAwaitingError::MetadataRefresh(e) => e.into(),
1398        }
1399    }
1400}
1401
1402impl Session {
1403    pub(crate) async fn handle_auto_await_schema_agreement(
1404        &self,
1405        response: &NonErrorQueryResponse,
1406        coordinator_id: Uuid,
1407    ) -> Result<(), AutoSchemaAwaitingError> {
1408        if self.schema_agreement_automatic_waiting && response.as_schema_change().is_some() {
1409            debug!("Detected schema change, so awaiting schema agreement automatically...");
1410            self.await_schema_agreement_with_required_node(Some(coordinator_id))
1411                .await?;
1412            debug!("Auto schema agreement awaiting: schema agreement reached.",);
1413
1414            if self.refresh_metadata_on_auto_schema_agreement {
1415                self.refresh_metadata().await?;
1416            }
1417        }
1418
1419        Ok(())
1420    }
1421
1422    async fn do_query_iter(
1423        &self,
1424        statement: Statement,
1425        values: impl SerializeRow,
1426    ) -> Result<QueryPager, PagerExecutionError> {
1427        if values.is_empty() {
1428            self.do_query_iter_without_values(statement).await
1429        } else {
1430            // Making QueryPager::new_for_query work with values is too hard (if even possible)
1431            // so instead of sending one prepare to a specific connection on each iterator query,
1432            // we fully prepare a statement beforehand.
1433            let prepared = self.prepare_nongeneric(&statement).await?;
1434            let values = prepared.serialize_values(&values)?;
1435            self.execute_iter_nongeneric(prepared, values).await
1436        }
1437    }
1438
1439    /// `Session::query_iter` specialization for empty values.
1440    async fn do_query_iter_without_values(
1441        &self,
1442        statement: Statement,
1443    ) -> Result<QueryPager, PagerExecutionError> {
1444        let execution_profile = statement
1445            .get_execution_profile_handle()
1446            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1447            .access();
1448
1449        QueryPager::new_for_query(
1450            self,
1451            statement,
1452            execution_profile,
1453            self.cluster.get_state(),
1454            #[cfg(feature = "metrics")]
1455            Arc::clone(&self.metrics),
1456        )
1457        .await
1458    }
1459
1460    /// Prepares a statement on the server side and returns a prepared statement,
1461    /// which can later be used to perform more efficient requests.
1462    ///
1463    /// The statement is prepared on all nodes. This function finishes once all nodes respond
1464    /// with either success or an error.
1465    ///
1466    /// Prepared statements are much faster than unprepared statements:
1467    /// * Database doesn't need to parse the statement string upon each execution (only once)
1468    /// * They are properly load balanced using token aware routing
1469    ///
1470    /// <div class="warning">
1471    ///
1472    /// **Warning!**
1473    ///
1474    /// **Prepare a statement once** (e.g., store it in a variable, static, or struct field),
1475    /// then **execute it multiple times** with different values.
1476    /// Do **NOT** call `prepare()` repeatedly for the same statement before each execution -
1477    /// this defeats the purpose of prepared statements and significantly degrades performance.
1478    ///
1479    /// </div>
1480    ///
1481    /// <div class="warning">
1482    ///
1483    /// **Warning!**
1484    ///
1485    /// For token/shard aware load balancing to work properly, all partition key values
1486    /// must be sent as bound values
1487    /// (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance))
1488    ///
1489    /// </div>
1490    ///
1491    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
1492    /// See the documentation of [`PreparedStatement`].
1493    ///
1494    /// # Arguments
1495    /// * `statement` - statement to prepare, can be just a `&str` or the [`Statement`] struct.
1496    ///
1497    /// # Example
1498    /// ```rust
1499    /// # use scylla::client::session::Session;
1500    /// # use std::error::Error;
1501    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1502    /// use scylla::statement::prepared::PreparedStatement;
1503    ///
1504    /// // Prepare the statement ONCE for later execution
1505    /// let prepared: PreparedStatement = session
1506    ///     .prepare("INSERT INTO ks.tab (a) VALUES(?)")
1507    ///     .await?;
1508    ///
1509    /// // Execute the prepared statement multiple times
1510    /// let to_insert: i32 = 12345;
1511    /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1512    ///
1513    /// let to_insert: i32 = 67890;
1514    /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1515    /// # Ok(())
1516    /// # }
1517    /// ```
1518    pub async fn prepare(
1519        &self,
1520        statement: impl Into<Statement>,
1521    ) -> Result<PreparedStatement, PrepareError> {
1522        let statement = statement.into();
1523        self.prepare_nongeneric(&statement).await
1524    }
1525
1526    // Introduced to avoid monomorphisation of this large function.
1527    async fn prepare_nongeneric(
1528        &self,
1529        statement: &Statement,
1530    ) -> Result<PreparedStatement, PrepareError> {
1531        let cluster_state = self.get_cluster_state();
1532
1533        // Start by attempting preparation on a single (random) connection to every node.
1534        {
1535            let mut connections_to_nodes = cluster_state.iter_working_connections_to_nodes()?;
1536            let on_all_nodes_result =
1537                Self::prepare_on_all(statement, &cluster_state, &mut connections_to_nodes).await;
1538            if let Ok(prepared) = on_all_nodes_result {
1539                // We succeeded in preparing the statement on at least one node. We're done.
1540                // Other nodes could have failed to prepare the statement, but this will be handled
1541                // as `DbError::Unprepared` upon execution, followed by a repreparation attempt.
1542                return Ok(prepared);
1543            }
1544        }
1545
1546        // We could have been just unlucky: we could have possibly chosen random connections all of which were defunct
1547        // (one possibility is that we targeted overloaded shards).
1548        // Let's try again, this time on connections to every shard. This is a "last call" fallback.
1549        {
1550            let mut connections_to_shards = cluster_state.iter_working_connections_to_shards()?;
1551
1552            Self::prepare_on_all(statement, &cluster_state, &mut connections_to_shards).await
1553        }
1554    }
1555
    /// Prepares the statement on all given connections.
    /// These are intended to be connections to either all nodes or all shards.
    ///
    /// ASSUMPTION: the `working_connections` Iterator is nonempty.
    ///
    /// Returns:
    /// - `Ok(PreparedStatement)`, if preparation succeeded on at least one connection;
    /// - `Err(PrepareError)`, if no connection is working or preparation failed on all attempted connections.
    // TODO: There are no timeouts here. So, just one stuck node freezes the driver here, potentially indefinitely long.
    // Also, what the driver requires to get from the cluster is the prepared statement metadata.
    // It suffices that it gets only one copy of it, just from one success response. Therefore, it's a possible
    // optimisation that the function only waits for one preparation to finish successfully, and then it returns.
    // For it to be done, other preparations must continue in the background, on a separate tokio task.
    // Describing issue: #1332.
    async fn prepare_on_all(
        statement: &Statement,
        cluster_state: &ClusterState,
        working_connections: &mut (dyn Iterator<Item = Arc<Connection>> + Send),
    ) -> Result<PreparedStatement, PrepareError> {
        // Run all preparations concurrently and wait for every one to finish.
        // Find the first result that is Ok, or Err if all failed.
        let preparations =
            working_connections.map(|c| async move { c.prepare_raw(statement).await });
        let raw_prepared_statements_results = join_all(preparations).await;

        let mut raw_prepared_statements_results_iter = raw_prepared_statements_results.into_iter();

        // `find_or_first` yields the first Ok result if there is one,
        // otherwise the very first (Err) result.
        // Safety: We pass a nonempty iterator, so there will be at least one result.
        let first_ok_or_error = raw_prepared_statements_results_iter
            .by_ref()
            .find_or_first(|res| res.is_ok())
            .unwrap(); // Safety: there is at least one connection.

        // If even the chosen result is an error, every attempt failed.
        let mut prepared: PreparedStatement = first_ok_or_error
            .map_err(|first_attempt| PrepareError::AllAttemptsFailed { first_attempt })?
            .into_prepared_statement();

        // Validate prepared ids equality.
        // NOTE: thanks to `by_ref` above, this continues AFTER the result found by
        // `find_or_first`; the results before it (all of which were errors) are skipped.
        for another_raw_prepared in raw_prepared_statements_results_iter.flatten() {
            if prepared.get_id() != another_raw_prepared.get_id() {
                tracing::error!(
                    "Got differing ids upon statement preparation: statement \"{}\", id1: {:?}, id2: {:?}",
                    prepared.get_statement(),
                    prepared.get_id(),
                    another_raw_prepared.get_id()
                );
                return Err(PrepareError::PreparedStatementIdsMismatch);
            }

            // Collect all tracing ids from prepare() queries in the final result
            prepared
                .prepare_tracing_ids
                .extend(another_raw_prepared.tracing_id());
        }

        // This is the first preparation that succeeded.
        // Let's return the PreparedStatement.
        // The partitioner name is taken from cluster metadata for the target table;
        // if it is absent or unrecognized, the default partitioner is used.
        prepared.set_partitioner_name(
            Self::extract_partitioner_name(&prepared, cluster_state)
                .and_then(PartitionerName::from_str)
                .unwrap_or_default(),
        );

        Ok(prepared)
    }
1620
1621    fn extract_partitioner_name<'a>(
1622        prepared: &PreparedStatement,
1623        cluster_state: &'a ClusterState,
1624    ) -> Option<&'a str> {
1625        let table_spec = prepared.get_table_spec()?;
1626        cluster_state
1627            .keyspaces
1628            .get(table_spec.ks_name())?
1629            .tables
1630            .get(table_spec.table_name())?
1631            .partitioner
1632            .as_deref()
1633    }
1634
    /// Sends a prepared request to the database, optionally continuing from a saved point.
    ///
    /// This is now an internal method only.
    ///
    /// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
    ///
    /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
    /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
    /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
    /// should be made.
    async fn execute(
        &self,
        prepared: &PreparedStatement,
        serialized_values: &SerializedValues,
        page_size: Option<PageSize>,
        paging_state: PagingState,
    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
        let paging_state_ref = &paging_state;

        // Compute the routing token (and keep the partition key for span recording).
        // `unzip` splits the returned `Option<(pk, token)>` into two `Option`s.
        let (partition_key, token) = prepared
            .extract_partition_key_and_calculate_token(
                prepared.get_partitioner_name(),
                serialized_values,
            )
            .map_err(PartitionKeyError::into_execution_error)?
            .unzip();

        // The statement's own profile (if any) takes precedence over the session default.
        let execution_profile = prepared
            .get_execution_profile_handle()
            .unwrap_or_else(|| self.get_default_execution_profile_handle())
            .access();

        let table_spec = prepared.get_table_spec();

        // Statement-level consistency settings override the execution profile's.
        let statement_info = RoutingInfo {
            consistency: prepared
                .config
                .consistency
                .unwrap_or(execution_profile.consistency),
            serial_consistency: prepared
                .config
                .serial_consistency
                .unwrap_or(execution_profile.serial_consistency),
            token,
            table: table_spec,
            is_confirmed_lwt: prepared.is_confirmed_lwt(),
        };

        let span = RequestSpan::new_prepared(
            partition_key.as_ref().map(|pk| pk.iter()),
            token,
            serialized_values.buffer_size(),
        );

        // Record replicas only when the span is enabled, to avoid computing
        // the replica set needlessly.
        if !span.span().is_disabled()
            && let (Some(table_spec), Some(token)) = (statement_info.table, token)
        {
            let cluster_state = self.get_cluster_state();
            let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
            span.record_replicas(replicas)
        }

        // Run the request through load balancing / retry / speculative execution
        // machinery; the closure performs a single attempt on a chosen connection.
        let (run_request_result, coordinator): (
            RunRequestResult<NonErrorQueryResponse>,
            Coordinator,
        ) = self
            .run_request(
                statement_info,
                &prepared.config,
                execution_profile,
                |connection: Arc<Connection>,
                 consistency: Consistency,
                 execution_profile: &ExecutionProfileInner| {
                    // Serial consistency may be overridden per attempt by the
                    // (possibly statement-overridden) profile in effect.
                    let serial_consistency = prepared
                        .config
                        .serial_consistency
                        .unwrap_or(execution_profile.serial_consistency);
                    async move {
                        connection
                            .execute_raw_with_consistency(
                                prepared,
                                serialized_values,
                                consistency,
                                serial_consistency,
                                page_size,
                                paging_state_ref.clone(),
                            )
                            .await
                            .and_then(QueryResponse::into_non_error_query_response)
                    }
                },
                &span,
            )
            .instrument(span.span().clone())
            .await?;

        // A write error that the retry policy decided to ignore is surfaced
        // to the caller as a synthetic successful Void result.
        let response = match run_request_result {
            RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
                response: NonErrorResponseWithDeserializedMetadata::Result(
                    result::ResultWithDeserializedMetadata::Void,
                ),
                tracing_id: None,
                warnings: Vec::new(),
            },
            RunRequestResult::Completed(response) => response,
        };

        let (result, paging_state_response) =
            response.into_query_result_and_paging_state(coordinator)?;
        span.record_result_fields(&result);

        Ok((result, paging_state_response))
    }
1748
1749    /// Does the same as [`Session::execute_iter`], but without generics.
1750    /// This is to reduce code bloat.
1751    ///
1752    /// Additionally, this function accepts only `SerializedValues`,
1753    /// so the caller is responsible for serializing the values beforehand.
1754    /// This:
1755    /// - on one hand introduces a risk of misusing the API (passing values
1756    ///   that don't match the prepared statement) - therefore this API
1757    ///   must not be leaked to end users;
1758    /// - on the other hand allows manual preserialization of values, which
1759    ///   we need in wrapper drivers (e.g. C#-RS Driver) - for them we expose
1760    ///   a dedicated API endpoint using this function, conditionally compiled
1761    ///   if special compile flag is passed.
1762    async fn execute_iter_nongeneric(
1763        &self,
1764        prepared: PreparedStatement,
1765        values: SerializedValues,
1766    ) -> Result<QueryPager, PagerExecutionError> {
1767        let execution_profile = prepared
1768            .get_execution_profile_handle()
1769            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1770            .access();
1771
1772        QueryPager::new_for_prepared_statement(
1773            self,
1774            PreparedPagerConfig {
1775                prepared,
1776                values,
1777                execution_profile,
1778                cluster_state: self.cluster.get_state(),
1779                #[cfg(feature = "metrics")]
1780                metrics: Arc::clone(&self.metrics),
1781            },
1782        )
1783        .await
1784    }
1785
1786    /// Prepares all statements within the batch and returns a new batch where every
1787    /// statement is prepared.
1788    ///
1789    /// # Example
1790    /// ```rust
1791    /// # extern crate scylla;
1792    /// # use scylla::client::session::Session;
1793    /// # use std::error::Error;
1794    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1795    /// use scylla::statement::batch::Batch;
1796    ///
1797    /// // Create a batch statement with unprepared statements
1798    /// let mut batch: Batch = Default::default();
1799    /// batch.append_statement("INSERT INTO ks.simple_unprepared1 VALUES(?, ?)");
1800    /// batch.append_statement("INSERT INTO ks.simple_unprepared2 VALUES(?, ?)");
1801    ///
1802    /// // Prepare all statements in the batch at once
1803    /// let prepared_batch: Batch = session.prepare_batch(&batch).await?;
1804    ///
1805    /// // Specify bound values to use with each statement
1806    /// let batch_values = ((1_i32, 2_i32),
1807    ///                     (3_i32, 4_i32));
1808    ///
1809    /// // Run the prepared batch
1810    /// session.batch(&prepared_batch, batch_values).await?;
1811    /// # Ok(())
1812    /// # }
1813    /// ```
1814    pub async fn prepare_batch(&self, batch: &Batch) -> Result<Batch, PrepareError> {
1815        let mut prepared_batch = batch.clone();
1816
1817        try_join_all(
1818            prepared_batch
1819                .statements
1820                .iter_mut()
1821                .map(|statement| async move {
1822                    if let BatchStatement::Query(query) = statement {
1823                        let prepared = self.prepare_nongeneric(query).await?;
1824                        *statement = BatchStatement::PreparedStatement(prepared);
1825                    }
1826                    Ok::<(), PrepareError>(())
1827                }),
1828        )
1829        .await?;
1830
1831        Ok(prepared_batch)
1832    }
1833
1834    /// Sends `USE <keyspace_name>` request on all connections\
1835    /// This allows to write `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`\
1836    ///
1837    /// Note that even failed `use_keyspace` can change currently used keyspace - the request is sent on all connections and
1838    /// can overwrite previously used keyspace.
1839    ///
1840    /// Call only one `use_keyspace` at a time.\
1841    /// Trying to do two `use_keyspace` requests simultaneously with different names
1842    /// can end with some connections using one keyspace and the rest using the other.
1843    ///
1844    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/usekeyspace.html) for more information
1845    ///
1846    /// # Arguments
1847    ///
1848    /// * `keyspace_name` - keyspace name to use,
1849    ///   keyspace names can have up to 48 alphanumeric characters and contain underscores
1850    /// * `case_sensitive` - if set to true the generated statement will put keyspace name in quotes
1851    /// # Example
1852    /// ```rust
1853    /// # use scylla::client::session::Session;
1854    /// # use scylla::client::session_builder::SessionBuilder;
1855    /// # use scylla::client::Compression;
1856    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1857    /// # let session = SessionBuilder::new().known_node("127.0.0.1:9042").build().await?;
1858    /// session
1859    ///     .query_unpaged("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
1860    ///     .await?;
1861    ///
1862    /// session.use_keyspace("my_keyspace", false).await?;
1863    ///
1864    /// // Now we can omit keyspace name in the statement
1865    /// session
1866    ///     .query_unpaged("INSERT INTO tab (a) VALUES ('test2')", &[])
1867    ///     .await?;
1868    /// # Ok(())
1869    /// # }
1870    /// ```
1871    pub async fn use_keyspace(
1872        &self,
1873        keyspace_name: impl Into<String>,
1874        case_sensitive: bool,
1875    ) -> Result<(), UseKeyspaceError> {
1876        let keyspace_name = keyspace_name.into();
1877        self.keyspace_name
1878            .store(Some(Arc::new(keyspace_name.clone())));
1879
1880        // Trying to pass keyspace as bound value in "USE ?" doesn't work
1881        // So we have to create a string for query: "USE " + new_keyspace
1882        // To avoid any possible CQL injections it's good to verify that the name is valid
1883        let verified_ks_name = VerifiedKeyspaceName::new(keyspace_name, case_sensitive)?;
1884
1885        self.cluster.use_keyspace(verified_ks_name).await
1886    }
1887
1888    /// Manually trigger a metadata refresh\
1889    /// The driver will fetch current nodes in the cluster and update its metadata
1890    ///
1891    /// Normally this is not needed,
1892    /// the driver should automatically detect all metadata changes in the cluster
1893    pub async fn refresh_metadata(&self) -> Result<(), MetadataError> {
1894        debug!("Session: requested metadata refresh");
1895        let res = self.cluster.refresh_metadata().await;
1896        debug!("Session: finished metadata refresh");
1897        res
1898    }
1899
    /// Access metrics collected by the driver\
    /// Driver collects various metrics like number of queries or query latencies.
    /// They can be read using this method
    #[cfg(feature = "metrics")]
    pub fn get_metrics(&self) -> Arc<Metrics> {
        // Cheap: clones only the Arc handle, not the metrics themselves.
        Arc::clone(&self.metrics)
    }
1907
    /// Access cluster state visible by the driver.
    ///
    /// Driver collects various information about network topology or schema.
    /// It can be read using this method.
    pub fn get_cluster_state(&self) -> Arc<ClusterState> {
        // Returns a shared handle to the current state snapshot.
        self.cluster.get_state()
    }
1915
1916    /// Get [`TracingInfo`] of a traced query performed earlier
1917    ///
1918    /// See [the book](https://rust-driver.docs.scylladb.com/stable/tracing/tracing.html)
1919    /// for more information about query tracing
1920    pub async fn get_tracing_info(&self, tracing_id: &Uuid) -> Result<TracingInfo, TracingError> {
1921        // tracing_info_fetch_attempts is NonZeroU32 so at least one attempt will be made
1922        for _ in 0..self.tracing_info_fetch_attempts.get() {
1923            let current_try: Option<TracingInfo> =
1924                self.try_getting_tracing_info(tracing_id).await?;
1925
1926            match current_try {
1927                Some(tracing_info) => return Ok(tracing_info),
1928                None => tokio::time::sleep(self.tracing_info_fetch_interval).await,
1929            };
1930        }
1931
1932        Err(TracingError::EmptyResults)
1933    }
1934
    /// Gets the name of the keyspace that is currently set, or `None` if no
    /// keyspace was set.
    ///
    /// It will initially return the name of the keyspace that was set
    /// in the session configuration, but calling `use_keyspace` will update
    /// it.
    ///
    /// Note: the return value might be wrong if `use_keyspace` was called
    /// concurrently or it previously failed. It is also unspecified
    /// if `get_keyspace` is called concurrently with `use_keyspace`.
    #[inline]
    pub fn get_keyspace(&self) -> Option<Arc<String>> {
        // Reads the atomically-stored keyspace name (written by `use_keyspace`).
        self.keyspace_name.load_full()
    }
1949
    // Tries getting the tracing info
    // If the queries return 0 rows then returns None - the information didn't reach this node yet
    // If there is some other error returns this error
    async fn try_getting_tracing_info(
        &self,
        tracing_id: &Uuid,
    ) -> Result<Option<TracingInfo>, TracingError> {
        // Get statements for tracing queries.
        // Consistency is set during construction based on session's tracing_info_fetch_consistency.
        let traces_session_stmt = self.get_tracing_session_statement();
        let traces_events_stmt = self.get_tracing_events_statement();

        // Fetch the session row and the event rows concurrently;
        // `try_join!` short-circuits on the first query error.
        // Using non-public do_query_unpaged allows us to avoid cloning.
        let (traces_session_res, traces_events_res) = tokio::try_join!(
            self.do_query_unpaged(traces_session_stmt, (tracing_id,)),
            self.do_query_unpaged(traces_events_stmt, (tracing_id,))
        )?;

        // Get tracing info: at most one row is expected; each deserialization
        // failure mode is mapped to a distinct TracingError variant.
        let maybe_tracing_info: Option<TracingInfo> = traces_session_res
            .into_rows_result()
            .map_err(TracingError::TracesSessionIntoRowsResultError)?
            .maybe_first_row()
            .map_err(|err| match err {
                MaybeFirstRowError::TypeCheckFailed(e) => {
                    TracingError::TracesSessionInvalidColumnType(e)
                }
                MaybeFirstRowError::DeserializationFailed(e) => {
                    TracingError::TracesSessionDeserializationFailed(e)
                }
            })?;

        // No session row yet: the tracing info has not reached this node.
        let mut tracing_info = match maybe_tracing_info {
            None => return Ok(None),
            Some(tracing_info) => tracing_info,
        };

        // Get tracing events
        let tracing_event_rows_result = traces_events_res
            .into_rows_result()
            .map_err(TracingError::TracesEventsIntoRowsResultError)?;
        let tracing_event_rows = tracing_event_rows_result.rows().map_err(|err| match err {
            RowsError::TypeCheckFailed(err) => TracingError::TracesEventsInvalidColumnType(err),
        })?;

        tracing_info.events = tracing_event_rows
            .collect::<Result<_, _>>()
            .map_err(TracingError::TracesEventsDeserializationFailed)?;

        // A session row with no events means propagation is still incomplete;
        // report "not there yet" so the caller may retry.
        if tracing_info.events.is_empty() {
            return Ok(None);
        }

        Ok(Some(tracing_info))
    }
2005
2006    /// This method allows to easily run a request using load balancing, retry policy etc.
2007    /// Requires some information about the request and a closure.
2008    /// The closure is used to execute the request once on a chosen connection.
2009    /// - query will use connection.query()
2010    /// - execute will use connection.execute()
2011    ///
2012    /// If this closure fails with some errors, retry policy is used to perform retries.
2013    /// On success, this request's result is returned.
2014    // I tried to make this closures take a reference instead of an Arc but failed
2015    // maybe once async closures get stabilized this can be fixed
2016    async fn run_request<'a, QueryFut>(
2017        &'a self,
2018        statement_info: RoutingInfo<'a>,
2019        statement_config: &'a StatementConfig,
2020        execution_profile: Arc<ExecutionProfileInner>,
2021        run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
2022        request_span: &'a RequestSpan,
2023    ) -> Result<(RunRequestResult<NonErrorQueryResponse>, Coordinator), ExecutionError>
2024    where
2025        QueryFut: Future<Output = Result<NonErrorQueryResponse, RequestAttemptError>>,
2026    {
2027        let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
2028            statement_config
2029                .history_listener
2030                .as_ref()
2031                .map(|hl| (&**hl, hl.log_request_start()));
2032
2033        let load_balancer = statement_config
2034            .load_balancing_policy
2035            .as_deref()
2036            .unwrap_or(execution_profile.load_balancing_policy.as_ref());
2037
2038        let runner = async {
2039            let cluster_state = self.cluster.get_state();
2040            let request_plan =
2041                load_balancing::Plan::new(load_balancer, &statement_info, &cluster_state);
2042
2043            // If a speculative execution policy is used to run request, request_plan has to be shared
2044            // between different async functions. This struct helps to wrap request_plan in mutex so it
2045            // can be shared safely.
2046            struct SharedPlan<'a, I>
2047            where
2048                I: Iterator<Item = (NodeRef<'a>, Shard)>,
2049            {
2050                iter: std::sync::Mutex<I>,
2051            }
2052
2053            impl<'a, I> Iterator for &SharedPlan<'a, I>
2054            where
2055                I: Iterator<Item = (NodeRef<'a>, Shard)>,
2056            {
2057                type Item = (NodeRef<'a>, Shard);
2058
2059                fn next(&mut self) -> Option<Self::Item> {
2060                    self.iter.lock().unwrap().next()
2061                }
2062            }
2063
2064            let retry_policy = statement_config
2065                .retry_policy
2066                .as_deref()
2067                .unwrap_or(&*execution_profile.retry_policy);
2068
2069            let speculative_policy = execution_profile.speculative_execution_policy.as_ref();
2070
2071            match speculative_policy {
2072                Some(speculative) if statement_config.is_idempotent => {
2073                    let shared_request_plan = SharedPlan {
2074                        iter: std::sync::Mutex::new(request_plan),
2075                    };
2076
2077                    let request_runner_generator = |is_speculative: bool| {
2078                        let history_data: Option<HistoryData> = history_listener_and_id
2079                            .as_ref()
2080                            .map(|(history_listener, request_id)| {
2081                                let speculative_id: Option<history::SpeculativeId> =
2082                                    if is_speculative {
2083                                        Some(
2084                                            history_listener.log_new_speculative_fiber(*request_id),
2085                                        )
2086                                    } else {
2087                                        None
2088                                    };
2089                                HistoryData {
2090                                    listener: *history_listener,
2091                                    request_id: *request_id,
2092                                    speculative_id,
2093                                }
2094                            });
2095
2096                        if is_speculative {
2097                            request_span.inc_speculative_executions();
2098                        }
2099
2100                        self.run_request_speculative_fiber(
2101                            &shared_request_plan,
2102                            &run_request_once,
2103                            &execution_profile,
2104                            ExecuteRequestContext {
2105                                is_idempotent: statement_config.is_idempotent,
2106                                consistency_set_on_statement: statement_config.consistency,
2107                                retry_session: retry_policy.new_session(),
2108                                history_data,
2109                                load_balancing_policy: load_balancer,
2110                                query_info: &statement_info,
2111                                request_span,
2112                            },
2113                        )
2114                    };
2115
2116                    let context = speculative_execution::Context {
2117                        #[cfg(feature = "metrics")]
2118                        metrics: Arc::clone(&self.metrics),
2119                    };
2120
2121                    speculative_execution::execute(
2122                        speculative.as_ref(),
2123                        &context,
2124                        request_runner_generator,
2125                    )
2126                    .await
2127                }
2128                _ => {
2129                    let history_data: Option<HistoryData> =
2130                        history_listener_and_id
2131                            .as_ref()
2132                            .map(|(history_listener, request_id)| HistoryData {
2133                                listener: *history_listener,
2134                                request_id: *request_id,
2135                                speculative_id: None,
2136                            });
2137                    self.run_request_speculative_fiber(
2138                        request_plan,
2139                        &run_request_once,
2140                        &execution_profile,
2141                        ExecuteRequestContext {
2142                            is_idempotent: statement_config.is_idempotent,
2143                            consistency_set_on_statement: statement_config.consistency,
2144                            retry_session: retry_policy.new_session(),
2145                            history_data,
2146                            load_balancing_policy: load_balancer,
2147                            query_info: &statement_info,
2148                            request_span,
2149                        },
2150                    )
2151                    .await
2152                    .unwrap_or(Err(RequestError::EmptyPlan))
2153                }
2154            }
2155        };
2156
2157        let effective_timeout = statement_config
2158            .request_timeout
2159            .or(execution_profile.request_timeout);
2160        let result = match effective_timeout {
2161            Some(timeout) => tokio::time::timeout(timeout, runner).await.unwrap_or_else(
2162                |_: tokio::time::error::Elapsed| {
2163                    #[cfg(feature = "metrics")]
2164                    self.metrics.inc_request_timeouts();
2165
2166                    let timeout_error = RequestError::RequestTimeout(timeout);
2167                    trace!(
2168                        parent: request_span.span(),
2169                        error = %timeout_error,
2170                        "Request timed out"
2171                    );
2172                    Err(timeout_error)
2173                },
2174            ),
2175            None => runner.await,
2176        };
2177
2178        if let Some((history_listener, request_id)) = history_listener_and_id {
2179            match &result {
2180                Ok(_) => history_listener.log_request_success(request_id),
2181                Err(e) => history_listener.log_request_error(request_id, e),
2182            }
2183        }
2184
2185        // Automatically handle meaningful responses.
2186        if let Ok((RunRequestResult::Completed(ref response), ref coordinator)) = result {
2187            self.handle_set_keyspace_response(response).await?;
2188            self.handle_auto_await_schema_agreement(response, coordinator.node().host_id)
2189                .await?;
2190        }
2191
2192        result.map_err(RequestError::into_execution_error)
2193    }
2194
    /// Executes the closure `run_request_once`, provided the load balancing plan and some information
    /// about the request, including retry session.
    /// If request fails, retry session is used to perform retries.
    ///
    /// Returns None, if provided plan is empty.
    async fn run_request_speculative_fiber<'a, QueryFut>(
        &'a self,
        request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
        run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
        execution_profile: &ExecutionProfileInner,
        mut context: ExecuteRequestContext<'a>,
    ) -> Option<Result<(RunRequestResult<NonErrorQueryResponse>, Coordinator), RequestError>>
    where
        QueryFut: Future<Output = Result<NonErrorQueryResponse, RequestAttemptError>>,
    {
        // Most recent error, returned if the whole plan is exhausted without success.
        let mut last_error: Option<RequestError> = None;
        // Consistency may be changed by the retry policy (see `RetryDecision` handling
        // below), so it is tracked mutably across attempts.
        let mut current_consistency: Consistency = context
            .consistency_set_on_statement
            .unwrap_or(execution_profile.consistency);

        'nodes_in_plan: for (node, shard) in request_plan {
            let span = trace_span!("Executing request", node = %node.address, shard = %shard);
            'same_node_retries: loop {
                trace!(parent: &span, "Execution started");
                // Pick a connection to the chosen shard of this node.
                let connection = match node.connection_for_shard(shard).await {
                    Ok(connection) => connection,
                    Err(e) => {
                        trace!(
                            parent: &span,
                            error = %e,
                            "Choosing connection failed"
                        );
                        last_error = Some(e.into());
                        // Broken connection doesn't count as a failed request, don't log in metrics
                        continue 'nodes_in_plan;
                    }
                };
                context.request_span.record_shard_id(&connection);

                #[cfg(feature = "metrics")]
                self.metrics.inc_total_nonpaged_queries();
                let request_start = std::time::Instant::now();

                let connect_address = connection.get_connect_address();
                trace!(
                    parent: &span,
                    connection = %connect_address,
                    "Sending"
                );
                // The shard is only attached to the coordinator for sharded
                // (i.e. sharder-equipped) nodes.
                let coordinator =
                    Coordinator::new(node, node.sharder().is_some().then_some(shard), &connection);

                // `None` when no history listener is configured.
                let attempt_id: Option<history::AttemptId> =
                    context.log_attempt_start(connect_address);
                let request_result: Result<NonErrorQueryResponse, RequestAttemptError> =
                    run_request_once(connection, current_consistency, execution_profile)
                        .instrument(span.clone())
                        .await;

                let elapsed = request_start.elapsed();
                let request_error: RequestAttemptError = match request_result {
                    Ok(response) => {
                        trace!(parent: &span, "Request succeeded");
                        #[cfg(feature = "metrics")]
                        let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
                        context.log_attempt_success(&attempt_id);
                        // Feed the attempt outcome back to the load balancing policy.
                        context.load_balancing_policy.on_request_success(
                            context.query_info,
                            elapsed,
                            node,
                        );
                        return Some(Ok((RunRequestResult::Completed(response), coordinator)));
                    }
                    Err(e) => {
                        trace!(
                            parent: &span,
                            last_error = %e,
                            "Request failed"
                        );
                        #[cfg(feature = "metrics")]
                        self.metrics.inc_failed_nonpaged_queries();
                        context.load_balancing_policy.on_request_failure(
                            context.query_info,
                            elapsed,
                            node,
                            &e,
                        );
                        e
                    }
                };

                // Use retry policy to decide what to do next
                let request_info = RequestInfo {
                    error: &request_error,
                    is_idempotent: context.is_idempotent,
                    consistency: context
                        .consistency_set_on_statement
                        .unwrap_or(execution_profile.consistency),
                };

                let retry_decision = context.retry_session.decide_should_retry(request_info);
                trace!(
                    parent: &span,
                    retry_decision = ?retry_decision
                );

                context.log_attempt_error(&attempt_id, &request_error, &retry_decision);

                last_error = Some(request_error.into());

                match retry_decision {
                    RetryDecision::RetrySameTarget(new_cl) => {
                        #[cfg(feature = "metrics")]
                        self.metrics.inc_retries_num();
                        // The retry policy may override the consistency for the retry.
                        current_consistency = new_cl.unwrap_or(current_consistency);
                        continue 'same_node_retries;
                    }
                    RetryDecision::RetryNextTarget(new_cl) => {
                        #[cfg(feature = "metrics")]
                        self.metrics.inc_retries_num();
                        current_consistency = new_cl.unwrap_or(current_consistency);
                        continue 'nodes_in_plan;
                    }
                    // Give up: fall through to return the last recorded error.
                    RetryDecision::DontRetry => break 'nodes_in_plan,

                    RetryDecision::IgnoreWriteError => {
                        return Some(Ok((RunRequestResult::IgnoredWriteError, coordinator)));
                    }
                };
            }
        }

        // `None` if and only if the plan was empty, i.e. no attempt was ever made.
        last_error.map(Result::Err)
    }
2329
2330    /// Awaits schema agreement among all reachable nodes.
2331    ///
2332    /// Issues an agreement check each `Session::schema_agreement_interval`.
2333    /// If agreement is not reached in `Session::schema_agreement_timeout`,
2334    /// `SchemaAgreementError::Timeout` is returned.
2335    pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
2336        self.await_schema_agreement_with_required_node(None).await
2337    }
2338
    /// Decides if an error should result in await_schema_agreement stopping immediately,
    /// or if it's fine to try again (after schema agreement interval).
    ///
    /// The errors that should stop immediately (`ControlFlow::Break`) are non-transient
    /// ones, for which there is little or no hope that a retry will succeed;
    /// `ControlFlow::Continue` means the agreement loop may try the check again.
    fn classify_schema_check_error(error: &SchemaAgreementError) -> ControlFlow<()> {
        // Shared classification of a single request attempt's error — used both
        // for direct request errors and for per-attempt preparation errors below.
        let classify_attempt_error = |request_attempt_error: &RequestAttemptError| {
            #[deny(clippy::wildcard_enum_match_arm)]
            match request_attempt_error {
                // It may be possible to recover from those errors.
                RequestAttemptError::UnableToAllocStreamId
                | RequestAttemptError::BrokenConnectionError(_)
                | RequestAttemptError::BodyExtensionsParseError(_)
                | RequestAttemptError::CqlResultParseError(_)
                | RequestAttemptError::CqlErrorParseError(_) => ControlFlow::Continue(()),

                // Those errors should not happen, but if they did, something is
                // really wrong. Let's return early.
                RequestAttemptError::SerializationError(_)
                | RequestAttemptError::CqlRequestSerialization(_)
                | RequestAttemptError::UnexpectedResponse(_)
                | RequestAttemptError::RepreparedIdChanged { .. }
                | RequestAttemptError::RepreparedIdMissingInBatch
                | RequestAttemptError::NonfinishedPagingState => ControlFlow::Break(()),

                #[deny(clippy::wildcard_enum_match_arm)]
                RequestAttemptError::DbError(db_error, _) => match db_error {
                    // Those errors should not happen, but if they did, something is
                    // really wrong. Let's return early.
                    DbError::SyntaxError
                    | DbError::Invalid
                    | DbError::AlreadyExists { .. }
                    | DbError::FunctionFailure { .. }
                    | DbError::AuthenticationError
                    | DbError::Unauthorized
                    | DbError::ConfigError
                    | DbError::TruncateError
                    | DbError::ProtocolError => ControlFlow::Break(()),

                    // Those errors likely won't go away on retry.
                    DbError::Unavailable { .. }
                    | DbError::ReadFailure { .. }
                    | DbError::WriteFailure { .. }
                    | DbError::ServerError
                    | DbError::Other(_) => ControlFlow::Break(()),

                    // Transient overload / timeout / reprepare conditions — worth
                    // retrying. NOTE(review): the trailing `_` also classifies any
                    // variants not listed here (presumably future additions to a
                    // non-exhaustive `DbError`) as retryable — confirm intended.
                    DbError::Overloaded
                    | DbError::ReadTimeout { .. }
                    | DbError::WriteTimeout { .. }
                    | DbError::Unprepared { .. }
                    | DbError::RateLimitReached { .. }
                    | DbError::IsBootstrapping
                    | _ => ControlFlow::Continue(()),
                },
            }
        };
        #[deny(clippy::wildcard_enum_match_arm)]
        match error {
            // Unexpected format (type, row count, frame type etc) or deserialization error of response.
            // It should not happen, but if it did it indicates a serious issue.
            SchemaAgreementError::SingleRowError(_)
            | SchemaAgreementError::TracesEventsIntoRowsResultError(_) => ControlFlow::Break(()),

            // Should not be possible - we create this error only after returning here.
            // Let's not panic, but log an error so that it gets noticed.
            SchemaAgreementError::Timeout(_) => {
                error!("Unexpected schema agreement error type: {}", error);
                ControlFlow::Break(())
            }

            // Definitely a transient error.
            SchemaAgreementError::ConnectionPoolError(_)
            | SchemaAgreementError::RequiredHostAbsent(_) => ControlFlow::Continue(()),

            SchemaAgreementError::RequestError(request_attempt_error) => {
                classify_attempt_error(request_attempt_error)
            }
            // Preparation failures are classified by their underlying attempt error.
            SchemaAgreementError::PrepareError(err) => match err {
                PrepareError::ConnectionPoolError(_) => ControlFlow::Continue(()),
                PrepareError::AllAttemptsFailed { first_attempt } => {
                    classify_attempt_error(first_attempt)
                }
                PrepareError::PreparedStatementIdsMismatch => ControlFlow::Break(()),
            },
        }
    }
2424
    /// Awaits schema agreement among all reachable nodes.
    ///
    /// Issues an agreement check each `Session::schema_agreement_interval`.
    /// If agreement is not reached in `Session::schema_agreement_timeout`,
    /// `SchemaAgreementError::Timeout` is returned.
    ///
    /// If `required_node` is Some, only returns Ok if this node successfully
    /// returned its schema version during the agreement process.
    pub(crate) async fn await_schema_agreement_with_required_node(
        &self,
        required_node: Option<Uuid>,
    ) -> Result<Uuid, SchemaAgreementError> {
        // Outcome of the last *finished* check attempt:
        // None: no finished attempt recorded
        // Some(Ok(())): Last attempt successful, without agreement
        // Some(Err(_)): Last attempt failed (with a transient error)
        let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
        // The future passed to timeout returns either Ok(Uuid) if agreement was
        // reached, or Err(SchemaAgreementError) if there was an error that should
        // stop the waiting before timeout.
        let agreement_result = timeout(self.schema_agreement_timeout, async {
            loop {
                let result = self
                    .check_schema_agreement_with_required_node(required_node)
                    .await;
                debug!("Schema agreement check result: {:?}", result);
                match result {
                    Ok(Some(agreed_version)) => return Ok(agreed_version),
                    Ok(None) => last_agreement_failure = Some(Ok(())),
                    Err(err) => {
                        // Transient errors keep the loop going (to retry after the
                        // interval); non-transient ones abort the wait immediately.
                        let decision = Self::classify_schema_check_error(&err);
                        match decision {
                            ControlFlow::Continue(_) => {
                                last_agreement_failure = Some(Err(err));
                            }
                            ControlFlow::Break(_) => return Err(err),
                        }
                    }
                }
                tokio::time::sleep(self.schema_agreement_interval).await;
            }
        })
        .await;
        match agreement_result {
            Err(_timeout) => {
                // Timeout occurred. Either all attempts returned possibly-transient errors,
                // or just did not reach agreement in time.
                let effective_error = match last_agreement_failure {
                    // There were no finished attempts - the only error we can return is Timeout.
                    None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
                    // If the last finished attempt resulted in an error, this error will be more informative than Timeout.
                    Some(Err(err)) => err,
                    // This is the canonical case for timeout - last attempt finished successfully, but without agreement.
                    Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
                };
                Err(effective_error)
            }
            // Agreement encountered a non-transient error, we must return it.
            Ok(Err(inner_error)) => Err(inner_error),
            // Agreement successful
            Ok(Ok(uuid)) => Ok(uuid),
        }
    }
2487
2488    /// Checks if all reachable nodes have the same schema version.
2489    ///
2490    /// If so, returns that agreed upon version.
2491    pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2492        self.check_schema_agreement_with_required_node(None).await
2493    }
2494
    /// Checks if all reachable nodes have the same schema version.
    /// If so, returns that agreed upon version.
    ///
    /// If `required_node` is Some, only returns Ok if this node successfully
    /// returned its schema version.
    async fn check_schema_agreement_with_required_node(
        &self,
        required_node: Option<Uuid>,
    ) -> Result<Option<Uuid>, SchemaAgreementError> {
        let cluster_state = self.get_cluster_state();
        // The iterator is guaranteed to be nonempty.
        let per_node_connections = cluster_state.iter_working_connections_per_node()?;

        // Therefore, this iterator is guaranteed to be nonempty, too.
        // One future per node, each reading that node's schema version.
        let handles = per_node_connections.map(|(host_id, pool)| async move {
            (host_id, self.read_node_schema_version(pool).await)
        });
        // Hence, this is nonempty, too. All nodes are queried concurrently.
        let versions_results = join_all(handles).await;

        // Verify that required host is present, and returned success.
        if let Some(required_node) = required_node {
            match versions_results
                .iter()
                .find(|(host_id, _)| *host_id == required_node)
            {
                Some((_, Ok(SchemaNodeResult::Success(_version)))) => (),
                // For other connections we can ignore Broken error, but for required
                // host we need an actual schema version.
                Some((_, Ok(SchemaNodeResult::BrokenConnection(e)))) => {
                    return Err(SchemaAgreementError::RequestError(
                        RequestAttemptError::BrokenConnectionError(e.clone()),
                    ));
                }
                Some((_, Err(e))) => return Err(e.clone()),
                None => return Err(SchemaAgreementError::RequiredHostAbsent(required_node)),
            }
        }

        // Now we no longer need all the errors. We can return if there is
        // irrecoverable one, and collect the Ok values otherwise.
        // TODO(2.0): This expect can be avoided in next major release
        #[expect(clippy::result_large_err)]
        let versions_results: Vec<_> = versions_results
            .into_iter()
            .map(|(_, result)| result)
            .try_collect()?;

        // Pick a reference version to compare all others against: the first
        // Success if any, otherwise the first element (which must then be a
        // BrokenConnection).
        // unwrap is safe because iterator is still not empty.
        let local_version = match versions_results
            .iter()
            .find_or_first(|r| matches!(r, SchemaNodeResult::Success(_)))
            .unwrap()
        {
            SchemaNodeResult::Success(v) => *v,
            SchemaNodeResult::BrokenConnection(err) => {
                // There are only broken connection errors. Nothing better to do
                // than to return an error.
                return Err(SchemaAgreementError::RequestError(
                    RequestAttemptError::BrokenConnectionError(err.clone()),
                ));
            }
        };

        // Agreement holds iff every successfully fetched version equals the
        // reference one; nodes with only broken connections are ignored here.
        let in_agreement = versions_results
            .into_iter()
            .filter_map(|v_r| match v_r {
                SchemaNodeResult::Success(v) => Some(v),
                SchemaNodeResult::BrokenConnection(_) => None,
            })
            .all(|v| v == local_version);
        Ok(in_agreement.then_some(local_version))
    }
2568
2569    /// Iterate over connections to the node.
2570    /// If fetching succeeds on some connection, return first fetched schema version,
2571    /// as Ok(SchemaNodeResult::Success(...)).
2572    /// Otherwise it means there are only errors:
2573    /// - If, and only if, all connections returned ConnectionBrokenError, first such error will be returned,
2574    ///   as Ok(SchemaNodeResult::BrokenConnection(...)).
2575    /// - Otherwise there is some other type of error on some connection. First such error will be returned as Err(...).
2576    ///
2577    /// `connections_to_node` iterator must be non-empty!
2578    async fn read_node_schema_version(
2579        &self,
2580        connections_to_node: impl Iterator<Item = Arc<Connection>>,
2581    ) -> Result<SchemaNodeResult, SchemaAgreementError> {
2582        let mut first_broken_connection_err: Option<BrokenConnectionError> = None;
2583        let mut first_unignorable_err: Option<SchemaAgreementError> = None;
2584        for connection in connections_to_node {
2585            match self.fetch_connection_schema_version(&connection).await {
2586                Ok(schema_version) => return Ok(SchemaNodeResult::Success(schema_version)),
2587                Err(SchemaAgreementError::RequestError(
2588                    RequestAttemptError::BrokenConnectionError(conn_err),
2589                )) => {
2590                    if first_broken_connection_err.is_none() {
2591                        first_broken_connection_err = Some(conn_err);
2592                    }
2593                }
2594                Err(err) => {
2595                    if first_unignorable_err.is_none() {
2596                        first_unignorable_err = Some(err);
2597                    }
2598                }
2599            }
2600        }
2601        // The iterator was guaranteed to be nonempty, so there must have been at least one error.
2602        // It means at least one of `first_broken_connection_err` and `first_unrecoverable_err` is Some.
2603        if let Some(err) = first_unignorable_err {
2604            return Err(err);
2605        }
2606
2607        Ok(SchemaNodeResult::BrokenConnection(
2608            first_broken_connection_err.unwrap(),
2609        ))
2610    }
2611
2612    /// Fetches the schema version from a single connection.
2613    async fn fetch_connection_schema_version(
2614        &self,
2615        connection: &Connection,
2616    ) -> Result<Uuid, SchemaAgreementError> {
2617        let result = connection
2618            .query_unpaged(self.get_schema_version_statement())
2619            .await?;
2620
2621        let (version_id,) = result
2622            .into_rows_result()
2623            .map_err(SchemaAgreementError::TracesEventsIntoRowsResultError)?
2624            .single_row::<(Uuid,)>()
2625            .map_err(SchemaAgreementError::SingleRowError)?;
2626
2627        Ok(version_id)
2628    }
2629
    /// Retrieves the handle to execution profile that is used by this session
    /// by default, i.e. when an executed statement does not define its own handle.
    ///
    /// The handle borrows from the session; clone it to keep it independently.
    pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
        &self.default_execution_profile_handle
    }
2635}
2636
/// Per-request state handed to `run_request_speculative_fiber`.
struct ExecuteRequestContext<'a> {
    // Whether the statement is marked idempotent; passed to the retry policy.
    is_idempotent: bool,
    // Consistency explicitly set on the statement; when `None`, the execution
    // profile's consistency is used as the fallback.
    consistency_set_on_statement: Option<Consistency>,
    // Per-request retry state machine, freshly created from the retry policy.
    retry_session: Box<dyn RetrySession>,
    // History listener bookkeeping; `None` when no listener is configured.
    history_data: Option<HistoryData<'a>>,
    // Policy notified about each attempt's success or failure.
    load_balancing_policy: &'a dyn load_balancing::LoadBalancingPolicy,
    // Routing information describing the request, passed to the policy callbacks.
    query_info: &'a load_balancing::RoutingInfo<'a>,
    // Tracing span recording request details (e.g. shard id).
    request_span: &'a RequestSpan,
}
2646
/// Identifiers needed to report request history events to a listener.
struct HistoryData<'a> {
    // Listener that receives the history events.
    listener: &'a dyn HistoryListener,
    // Id of the request these events pertain to.
    request_id: history::RequestId,
    // Present only for speculative execution fibers.
    speculative_id: Option<history::SpeculativeId>,
}
2652
2653impl ExecuteRequestContext<'_> {
2654    fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
2655        self.history_data.as_ref().map(|hd| {
2656            hd.listener
2657                .log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
2658        })
2659    }
2660
2661    fn log_attempt_success(&self, attempt_id_opt: &Option<history::AttemptId>) {
2662        let attempt_id: &history::AttemptId = match attempt_id_opt {
2663            Some(id) => id,
2664            None => return,
2665        };
2666
2667        let history_data: &HistoryData = match &self.history_data {
2668            Some(data) => data,
2669            None => return,
2670        };
2671
2672        history_data.listener.log_attempt_success(*attempt_id);
2673    }
2674
2675    fn log_attempt_error(
2676        &self,
2677        attempt_id_opt: &Option<history::AttemptId>,
2678        error: &RequestAttemptError,
2679        retry_decision: &RetryDecision,
2680    ) {
2681        let attempt_id: &history::AttemptId = match attempt_id_opt {
2682            Some(id) => id,
2683            None => return,
2684        };
2685
2686        let history_data: &HistoryData = match &self.history_data {
2687            Some(data) => data,
2688            None => return,
2689        };
2690
2691        history_data
2692            .listener
2693            .log_attempt_error(*attempt_id, error, retry_decision);
2694    }
2695}
2696
/// Outcome of reading the schema version from a single node
/// (see `read_node_schema_version`).
#[derive(Debug)]
enum SchemaNodeResult {
    /// The node reported this schema version.
    Success(Uuid),
    /// Every connection to the node failed with a broken-connection error;
    /// holds the first such error.
    BrokenConnection(BrokenConnectionError),
}