scylla 1.6.0

Async CQL driver for Rust, optimized for ScyllaDB, fully compatible with Apache Cassandraâ„¢
Documentation
use crate::cluster::node::Node;
use crate::network::Connection;
use crate::response::query_result::QueryResult;
use crate::routing::{Shard, Token};
use crate::utils::safe_format::IteratorSafeFormatExt;
use itertools::Either;
use scylla_cql::frame::response::result::{ColumnSpec, DeserializedMetadataAndRawRows};
use scylla_cql::value::deser_cql_value;
use std::borrow::Borrow;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tracing::trace_span;

pub(crate) struct RequestSpan {
    span: tracing::Span,
    speculative_executions: AtomicUsize,
}

impl RequestSpan {
    pub(crate) fn new_query(contents: &str) -> Self {
        use tracing::field::Empty;

        let span = trace_span!(
            "Request",
            kind = "unprepared",
            contents = contents,
            //
            request_size = Empty,
            result_size = Empty,
            result_rows = Empty,
            replicas = Empty,
            shard = Empty,
            speculative_executions = Empty,
        );

        Self {
            span,
            speculative_executions: 0.into(),
        }
    }

    pub(crate) fn new_prepared<'ps, 'spec: 'ps>(
        partition_key: Option<impl Iterator<Item = (&'ps [u8], &'ps ColumnSpec<'spec>)> + Clone>,
        token: Option<Token>,
        request_size: usize,
    ) -> Self {
        use tracing::field::Empty;

        let span = trace_span!(
            "Request",
            kind = "prepared",
            partition_key = Empty,
            token = Empty,
            //
            request_size = request_size,
            result_size = Empty,
            result_rows = Empty,
            replicas = Empty,
            shard = Empty,
            speculative_executions = Empty,
        );

        if let Some(partition_key) = partition_key {
            span.record(
                "partition_key",
                tracing::field::display(
                    format_args!("{}", partition_key_displayer(partition_key),),
                ),
            );
        }
        if let Some(token) = token {
            span.record("token", token.value());
        }

        Self {
            span,
            speculative_executions: 0.into(),
        }
    }

    pub(crate) fn new_batch() -> Self {
        use tracing::field::Empty;

        let span = trace_span!(
            "Request",
            kind = "batch",
            //
            request_size = Empty,
            result_size = Empty,
            result_rows = Empty,
            replicas = Empty,
            shard = Empty,
            speculative_executions = Empty,
        );

        Self {
            span,
            speculative_executions: 0.into(),
        }
    }

    pub(crate) fn record_shard_id(&self, conn: &Connection) {
        if let Some(info) = conn.get_shard_info() {
            self.span.record("shard", info.shard);
        }
    }

    pub(crate) fn record_raw_rows_fields(&self, raw_rows: &DeserializedMetadataAndRawRows) {
        self.span
            .record("raw_result_size", raw_rows.metadata_and_rows_bytes_size());
    }

    pub(crate) fn record_result_fields(&self, query_result: &QueryResult) {
        if let Some(raw_rows) = query_result.deserialized_metadata_and_rows() {
            self.record_raw_rows_fields(raw_rows);
        }
    }

    pub(crate) fn record_replicas<'a>(
        &'a self,
        replicas: impl Iterator<Item = (impl Borrow<Arc<Node>> + 'a, Shard)> + Clone,
    ) {
        struct Replica<N>(N, Shard);
        impl<N> Display for Replica<N>
        where
            N: Borrow<Arc<Node>>,
        {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                let Self(node, shard) = self;
                write!(f, "{}-shard{}", node.borrow().address.ip(), shard)
            }
        }
        self.span.record(
            "replicas",
            tracing::field::display(
                replicas
                    .map(|(node, shard)| Replica(node, shard))
                    .safe_format(", "),
            ),
        );
    }

    pub(crate) fn record_request_size(&self, size: usize) {
        self.span.record("request_size", size);
    }

    pub(crate) fn inc_speculative_executions(&self) {
        self.speculative_executions.fetch_add(1, Ordering::Relaxed);
    }

    pub(crate) fn span(&self) -> &tracing::Span {
        &self.span
    }
}

impl Drop for RequestSpan {
    fn drop(&mut self) {
        self.span.record(
            "speculative_executions",
            self.speculative_executions.load(Ordering::Relaxed),
        );
    }
}

fn partition_key_displayer<
    'ps,
    'res,
    'spec: 'ps,
    PkIter: Iterator<Item = (&'ps [u8], &'ps ColumnSpec<'spec>)> + 'res + Clone,
>(
    mut pk_values_iter: PkIter,
) -> impl Display + use<'res, PkIter> {
    std::iter::from_fn(move || {
        pk_values_iter
            .next()
            .map(|(mut cell, spec)| deser_cql_value(spec.typ(), &mut cell))
    })
    .map(|c| match c {
        Ok(c) => Either::Left(c),
        Err(_) => Either::Right("<decoding error>"),
    })
    .safe_format(", ")
}