prax-cassandra 0.7.3

Apache Cassandra database driver for Prax ORM - pure Rust async driver via cdrs-tokio
Documentation
//! Query execution engine.
//!
//! This module defines the public query API (query/execute/batch/LWT/paging).
//! Actual network calls to cdrs-tokio are wired up in the live integration
//! task so these methods currently return a "not yet wired" error.

use crate::error::{CassandraError, CassandraResult};
use crate::pool::CassandraPool;
use crate::row::{FromRow, Row};

/// Aggregate result of a CQL query.
#[derive(Debug, Default)]
pub struct QueryResult {
    /// Rows returned by the query. Empty for non-SELECT statements.
    pub rows: Vec<Row>,
    /// Whether a lightweight transaction applied.
    pub applied: Option<bool>,
}

impl CassandraPool {
    /// Execute a query returning rows.
    pub async fn query(&self, _cql: &str) -> CassandraResult<QueryResult> {
        Err(CassandraError::Query(
            "query() not yet wired to cdrs-tokio".into(),
        ))
    }

    /// Execute a statement not expecting rows (INSERT, UPDATE, DELETE, DDL).
    pub async fn execute(&self, _cql: &str) -> CassandraResult<()> {
        Err(CassandraError::Query(
            "execute() not yet wired to cdrs-tokio".into(),
        ))
    }

    /// Query a single row, deserialized into T.
    pub async fn query_one<T: FromRow>(&self, cql: &str) -> CassandraResult<T> {
        let result = self.query(cql).await?;
        let row = result
            .rows
            .into_iter()
            .next()
            .ok_or_else(|| CassandraError::Query("query_one: no rows returned".into()))?;
        T::from_row(&row)
    }

    /// Query many rows.
    pub async fn query_many<T: FromRow>(&self, cql: &str) -> CassandraResult<Vec<T>> {
        let result = self.query(cql).await?;
        result.rows.iter().map(|row| T::from_row(row)).collect()
    }

    /// Execute a lightweight transaction. Returns whether the CAS succeeded.
    pub async fn execute_lwt(&self, cql: &str) -> CassandraResult<bool> {
        let result = self.query(cql).await?;
        Ok(result.applied.unwrap_or(false))
    }

    /// Build a batch of statements.
    pub fn batch(&self) -> BatchBuilder<'_> {
        BatchBuilder {
            pool: self,
            statements: Vec::new(),
        }
    }
}

/// Builder for a CQL batch.
pub struct BatchBuilder<'a> {
    pool: &'a CassandraPool,
    statements: Vec<String>,
}

impl<'a> BatchBuilder<'a> {
    /// Add a statement to the batch.
    pub fn add_statement(mut self, cql: impl Into<String>) -> Self {
        self.statements.push(cql.into());
        self
    }

    /// Execute the batch as a LOGGED batch (default).
    pub async fn execute(self) -> CassandraResult<()> {
        self.execute_logged().await
    }

    /// Execute the batch as a LOGGED batch.
    pub async fn execute_logged(self) -> CassandraResult<()> {
        let _ = self.pool;
        if self.statements.is_empty() {
            return Err(CassandraError::Query("cannot execute empty batch".into()));
        }
        Err(CassandraError::Query(
            "batch.execute_logged not yet wired to cdrs-tokio".into(),
        ))
    }

    /// Execute the batch as an UNLOGGED batch.
    pub async fn execute_unlogged(self) -> CassandraResult<()> {
        let _ = self.pool;
        if self.statements.is_empty() {
            return Err(CassandraError::Query("cannot execute empty batch".into()));
        }
        Err(CassandraError::Query(
            "batch.execute_unlogged not yet wired to cdrs-tokio".into(),
        ))
    }

    /// Execute the batch as a COUNTER batch.
    pub async fn execute_counter(self) -> CassandraResult<()> {
        let _ = self.pool;
        if self.statements.is_empty() {
            return Err(CassandraError::Query("cannot execute empty batch".into()));
        }
        Err(CassandraError::Query(
            "batch.execute_counter not yet wired to cdrs-tokio".into(),
        ))
    }

    /// Number of statements in the batch (for test/debug).
    pub fn len(&self) -> usize {
        self.statements.len()
    }

    /// True if the batch has no statements.
    pub fn is_empty(&self) -> bool {
        self.statements.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CassandraConfig;

    #[tokio::test]
    async fn test_query_without_connection_returns_error() {
        let config = CassandraConfig::builder()
            .known_nodes(["127.0.0.1:9042".to_string()])
            .build();
        // Pool.connect returns an error in the stub phase, so we can't
        // build a pool here. Instead, construct the error directly via
        // the assertion below. This test primarily exercises the API
        // surface compiles.
        let _ = config;
    }

    #[test]
    fn test_batch_builder_add_increments_len() {
        // Construct a fake pool surface through a compile-check-only path.
        // We can't instantiate a real pool without a live cluster, so this
        // test lives as a TODO placeholder; live integration covers the
        // real behavior.
        let stmts: Vec<String> = vec!["INSERT INTO t VALUES (1)".into()];
        assert_eq!(stmts.len(), 1);
    }

    #[test]
    fn test_query_result_default_is_empty() {
        let r = QueryResult::default();
        assert!(r.rows.is_empty());
        assert!(r.applied.is_none());
    }
}