use crate::error::{CassandraError, CassandraResult};
use crate::pool::CassandraPool;
use crate::row::{FromRow, Row};
/// Decoded outcome of a CQL statement run through [`CassandraPool::query`].
#[derive(Debug, Default)]
pub struct QueryResult {
    /// Rows converted from the response; empty when the response carried no
    /// rows.
    pub rows: Vec<Row>,
    /// Value of the boolean `[applied]` column read from the first returned
    /// row. `None` when there is no first row or that column could not be
    /// read as a `bool`.
    pub applied: Option<bool>,
}
impl CassandraPool {
pub async fn query(&self, cql: &str) -> CassandraResult<QueryResult> {
let envelope = self
.connection()
.session()
.query(cql)
.await
.map_err(|e| CassandraError::Query(format!("query failed: {e}")))?;
let body = envelope
.response_body()
.map_err(|e| CassandraError::Query(format!("response body parse: {e}")))?;
let (rows, applied) = if let Some(raw_rows) = body.into_rows() {
let applied = raw_rows.first().and_then(|row| {
use cdrs_tokio::types::ByName;
row.by_name::<bool>("[applied]").ok().flatten()
});
let decoded: Vec<crate::row::Row> = raw_rows
.into_iter()
.map(|r| crate::row::Row::from_cdrs_row(&r))
.collect::<CassandraResult<_>>()?;
(decoded, applied)
} else {
(Vec::new(), None)
};
Ok(QueryResult { rows, applied })
}
pub async fn execute(&self, cql: &str) -> CassandraResult<()> {
self.connection()
.session()
.query(cql)
.await
.map_err(|e| CassandraError::Query(format!("execute failed: {e}")))?;
Ok(())
}
pub async fn query_one<T: FromRow>(&self, cql: &str) -> CassandraResult<T> {
let result = self.query(cql).await?;
let row = result
.rows
.into_iter()
.next()
.ok_or_else(|| CassandraError::Query("query_one: no rows returned".into()))?;
T::from_row(&row)
}
pub async fn query_many<T: FromRow>(&self, cql: &str) -> CassandraResult<Vec<T>> {
let result = self.query(cql).await?;
result.rows.iter().map(|row| T::from_row(row)).collect()
}
pub async fn execute_lwt(&self, cql: &str) -> CassandraResult<bool> {
let result = self.query(cql).await?;
Ok(result.applied.unwrap_or(false))
}
pub fn batch(&self) -> BatchBuilder<'_> {
BatchBuilder {
pool: self,
statements: Vec::new(),
}
}
}
/// Collects CQL statements so they can be submitted together as one batch
/// through a borrowed [`CassandraPool`].
pub struct BatchBuilder<'a> {
    /// Pool whose session the batch is sent on.
    pool: &'a CassandraPool,
    /// Statements queued so far, in insertion order.
    statements: Vec<String>,
}
impl<'a> BatchBuilder<'a> {
    /// Appends one statement to the batch and hands the builder back for
    /// chaining.
    pub fn add_statement(mut self, cql: impl Into<String>) -> Self {
        let statement: String = cql.into();
        self.statements.push(statement);
        self
    }

    /// Submits the batch; shorthand for [`BatchBuilder::execute_logged`].
    pub async fn execute(self) -> CassandraResult<()> {
        self.execute_logged().await
    }

    /// Submits the queued statements with `BatchType::Logged`.
    pub async fn execute_logged(self) -> CassandraResult<()> {
        use cdrs_tokio::frame::message_batch::BatchType;
        self.execute_with_type(BatchType::Logged).await
    }

    /// Submits the queued statements with `BatchType::Unlogged`.
    pub async fn execute_unlogged(self) -> CassandraResult<()> {
        use cdrs_tokio::frame::message_batch::BatchType;
        self.execute_with_type(BatchType::Unlogged).await
    }

    /// Submits the queued statements with `BatchType::Counter`.
    pub async fn execute_counter(self) -> CassandraResult<()> {
        use cdrs_tokio::frame::message_batch::BatchType;
        self.execute_with_type(BatchType::Counter).await
    }

    /// Builds a batch of the given type from the queued statements (each
    /// added with an empty value list) and sends it on the pool's session.
    ///
    /// # Errors
    ///
    /// Returns [`CassandraError::Query`] when no statement was queued, when
    /// the batch cannot be built, or when the driver call fails.
    async fn execute_with_type(
        self,
        batch_type: cdrs_tokio::frame::message_batch::BatchType,
    ) -> CassandraResult<()> {
        use cdrs_tokio::query::{BatchQueryBuilder, QueryValues};

        let Self { pool, statements } = self;
        if statements.is_empty() {
            return Err(CassandraError::Query("cannot execute empty batch".into()));
        }

        let builder = statements.into_iter().fold(
            BatchQueryBuilder::new().with_batch_type(batch_type),
            |acc, stmt| acc.add_query(stmt, QueryValues::SimpleValues(vec![])),
        );
        let batch = builder
            .build()
            .map_err(|e| CassandraError::Query(format!("batch build: {e}")))?;

        pool.connection()
            .session()
            .batch(batch)
            .await
            .map_err(|e| CassandraError::Query(format!("batch execute: {e}")))?;
        Ok(())
    }

    /// Number of statements queued so far.
    pub fn len(&self) -> usize {
        let queued = &self.statements;
        queued.len()
    }

    /// `true` when no statement has been queued yet.
    pub fn is_empty(&self) -> bool {
        let queued = &self.statements;
        queued.is_empty()
    }
}
/// Adapter that exposes a [`CassandraPool`] through the
/// `prax_query::traits::QueryEngine` trait.
#[derive(Clone)]
pub struct CassandraEngine {
    /// Pool every statement is run on; cloned into each returned future.
    pool: CassandraPool,
}
impl CassandraEngine {
pub fn new(pool: CassandraPool) -> Self {
Self { pool }
}
pub fn pool(&self) -> &CassandraPool {
&self.pool
}
}
impl prax_query::traits::QueryEngine for CassandraEngine {
fn dialect(&self) -> &dyn prax_query::dialect::SqlDialect {
&prax_query::dialect::Cql
}
fn query_many<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Vec<T>>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
let result = pool
.query(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
result
.rows
.iter()
.map(|r| r.as_cdrs())
.map(decode_row::<T>)
.collect()
})
}
fn query_one<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<T>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
let result = pool
.query(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
let cdrs_row = result
.rows
.iter()
.map(|r| r.as_cdrs())
.next()
.ok_or_else(|| prax_query::QueryError::not_found(T::MODEL_NAME))?;
decode_row::<T>(cdrs_row)
})
}
fn query_optional<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Option<T>>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
let result = pool
.query(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
result
.rows
.iter()
.map(|r| r.as_cdrs())
.next()
.map(decode_row::<T>)
.transpose()
})
}
fn execute_insert<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<T>> {
let _ = (sql, T::MODEL_NAME);
Box::pin(async move {
Err(prax_query::QueryError::unsupported(
"CassandraEngine::execute_insert requires prepared-statement \
binding to safely re-fetch by PK; use ScyllaEngine or call \
pool.execute + pool.query manually",
))
})
}
fn execute_update<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Vec<T>>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
pool.execute(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
let where_clause = extract_where_clause(&sql).ok_or_else(|| {
prax_query::QueryError::internal(
"CassandraEngine::execute_update: UPDATE lacked a WHERE \
clause; refusing to SELECT entire table",
)
})?;
let select_sql = format!(
"SELECT {} FROM {} WHERE {}",
T::COLUMNS.join(", "),
T::TABLE_NAME,
where_clause,
);
let result = pool
.query(&select_sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
result
.rows
.iter()
.map(|r| r.as_cdrs())
.map(decode_row::<T>)
.collect()
})
}
fn execute_delete(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
let _: () = pool
.execute(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
Ok(0)
})
}
fn execute_raw(
&self,
sql: &str,
params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
self.execute_delete(sql, params)
}
fn count(
&self,
sql: &str,
_params: Vec<prax_query::filter::FilterValue>,
) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
let sql = sql.to_string();
let pool = self.pool.clone();
Box::pin(async move {
let _: QueryResult = pool
.query(&sql)
.await
.map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
Ok(0)
})
}
}
use prax_query::sql::parse::extract_where_body as extract_where_clause;
/// Decodes one driver row into the model `T`.
///
/// The row is wrapped in a `CassandraRowRef` together with the model's
/// column names (`T::COLUMNS`) and handed to `T::from_row`.
///
/// # Errors
///
/// A failure from `T::from_row` is reported as a `deserialization` error
/// carrying the original error as its source.
fn decode_row<T: prax_query::traits::Model + prax_query::row::FromRow>(
    cdrs_row: &cdrs_tokio::types::rows::Row,
) -> prax_query::QueryResult<T> {
    let column_names: Vec<String> = T::COLUMNS.iter().map(|name| name.to_string()).collect();
    let row_ref = crate::row_ref::CassandraRowRef::from_cdrs_with_cols(cdrs_row, &column_names);
    match T::from_row(&row_ref) {
        Ok(model) => Ok(model),
        Err(err) => {
            // Capture the message before `err` is moved into `with_source`.
            let message = err.to_string();
            Err(prax_query::QueryError::deserialization(message).with_source(err))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CassandraConfig;

    // Placeholder: only checks that a config with one known node can be
    // built. No pool is created and no query is issued.
    #[tokio::test]
    async fn test_query_without_connection_returns_error() {
        let nodes = ["127.0.0.1:9042".to_string()];
        let _config = CassandraConfig::builder().known_nodes(nodes).build();
    }

    // Placeholder: exercises a plain `Vec<String>` rather than a
    // `BatchBuilder`, which cannot be constructed here without a pool.
    #[test]
    fn test_batch_builder_add_increments_len() {
        let stmts = vec![String::from("INSERT INTO t VALUES (1)")];
        assert_eq!(stmts.len(), 1);
    }

    // A default `QueryResult` has no rows and no `[applied]` flag.
    #[test]
    fn test_query_result_default_is_empty() {
        let QueryResult { rows, applied } = QueryResult::default();
        assert!(rows.is_empty());
        assert_eq!(applied, None);
    }
}