1use crate::error::{CassandraError, CassandraResult};
8use crate::pool::CassandraPool;
9use crate::row::{FromRow, Row};
10
11#[derive(Debug, Default)]
13pub struct QueryResult {
14 pub rows: Vec<Row>,
16 pub applied: Option<bool>,
18}
19
20impl CassandraPool {
21 pub async fn query(&self, cql: &str) -> CassandraResult<QueryResult> {
23 let envelope = self
24 .connection()
25 .session()
26 .query(cql)
27 .await
28 .map_err(|e| CassandraError::Query(format!("query failed: {e}")))?;
29
30 let body = envelope
35 .response_body()
36 .map_err(|e| CassandraError::Query(format!("response body parse: {e}")))?;
37
38 let (rows, applied) = if let Some(raw_rows) = body.into_rows() {
39 let applied = raw_rows.first().and_then(|row| {
44 use cdrs_tokio::types::ByName;
45 row.by_name::<bool>("[applied]").ok().flatten()
46 });
47 let decoded: Vec<crate::row::Row> = raw_rows
48 .into_iter()
49 .map(|r| crate::row::Row::from_cdrs_row(&r))
50 .collect::<CassandraResult<_>>()?;
51 (decoded, applied)
52 } else {
53 (Vec::new(), None)
54 };
55
56 Ok(QueryResult { rows, applied })
57 }
58
59 pub async fn execute(&self, cql: &str) -> CassandraResult<()> {
61 self.connection()
62 .session()
63 .query(cql)
64 .await
65 .map_err(|e| CassandraError::Query(format!("execute failed: {e}")))?;
66 Ok(())
67 }
68
69 pub async fn query_one<T: FromRow>(&self, cql: &str) -> CassandraResult<T> {
71 let result = self.query(cql).await?;
72 let row = result
73 .rows
74 .into_iter()
75 .next()
76 .ok_or_else(|| CassandraError::Query("query_one: no rows returned".into()))?;
77 T::from_row(&row)
78 }
79
80 pub async fn query_many<T: FromRow>(&self, cql: &str) -> CassandraResult<Vec<T>> {
82 let result = self.query(cql).await?;
83 result.rows.iter().map(|row| T::from_row(row)).collect()
84 }
85
86 pub async fn execute_lwt(&self, cql: &str) -> CassandraResult<bool> {
88 let result = self.query(cql).await?;
89 Ok(result.applied.unwrap_or(false))
90 }
91
92 pub fn batch(&self) -> BatchBuilder<'_> {
94 BatchBuilder {
95 pool: self,
96 statements: Vec::new(),
97 }
98 }
99}
100
101pub struct BatchBuilder<'a> {
103 pool: &'a CassandraPool,
104 statements: Vec<String>,
105}
106
107impl<'a> BatchBuilder<'a> {
108 pub fn add_statement(mut self, cql: impl Into<String>) -> Self {
110 self.statements.push(cql.into());
111 self
112 }
113
114 pub async fn execute(self) -> CassandraResult<()> {
116 self.execute_logged().await
117 }
118
119 pub async fn execute_logged(self) -> CassandraResult<()> {
121 self.execute_with_type(cdrs_tokio::frame::message_batch::BatchType::Logged)
122 .await
123 }
124
125 pub async fn execute_unlogged(self) -> CassandraResult<()> {
127 self.execute_with_type(cdrs_tokio::frame::message_batch::BatchType::Unlogged)
128 .await
129 }
130
131 pub async fn execute_counter(self) -> CassandraResult<()> {
133 self.execute_with_type(cdrs_tokio::frame::message_batch::BatchType::Counter)
134 .await
135 }
136
137 async fn execute_with_type(
138 self,
139 batch_type: cdrs_tokio::frame::message_batch::BatchType,
140 ) -> CassandraResult<()> {
141 if self.statements.is_empty() {
142 return Err(CassandraError::Query("cannot execute empty batch".into()));
143 }
144 let mut builder = cdrs_tokio::query::BatchQueryBuilder::new().with_batch_type(batch_type);
145 for stmt in self.statements {
146 builder = builder.add_query(stmt, cdrs_tokio::query::QueryValues::SimpleValues(vec![]));
147 }
148 let batch = builder
149 .build()
150 .map_err(|e| CassandraError::Query(format!("batch build: {e}")))?;
151 self.pool
152 .connection()
153 .session()
154 .batch(batch)
155 .await
156 .map_err(|e| CassandraError::Query(format!("batch execute: {e}")))?;
157 Ok(())
158 }
159
160 pub fn len(&self) -> usize {
162 self.statements.len()
163 }
164
165 pub fn is_empty(&self) -> bool {
167 self.statements.is_empty()
168 }
169}
170
171#[derive(Clone)]
183pub struct CassandraEngine {
184 pool: CassandraPool,
185}
186
187impl CassandraEngine {
188 pub fn new(pool: CassandraPool) -> Self {
190 Self { pool }
191 }
192
193 pub fn pool(&self) -> &CassandraPool {
196 &self.pool
197 }
198}
199
200impl prax_query::traits::QueryEngine for CassandraEngine {
201 fn dialect(&self) -> &dyn prax_query::dialect::SqlDialect {
202 &prax_query::dialect::Cql
203 }
204
205 fn query_many<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
206 &self,
207 sql: &str,
208 _params: Vec<prax_query::filter::FilterValue>,
209 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Vec<T>>> {
210 let sql = sql.to_string();
211 let pool = self.pool.clone();
212 Box::pin(async move {
213 let result = pool
214 .query(&sql)
215 .await
216 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
217 result
218 .rows
219 .iter()
220 .map(|r| r.as_cdrs())
221 .map(decode_row::<T>)
222 .collect()
223 })
224 }
225
226 fn query_one<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
227 &self,
228 sql: &str,
229 _params: Vec<prax_query::filter::FilterValue>,
230 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<T>> {
231 let sql = sql.to_string();
232 let pool = self.pool.clone();
233 Box::pin(async move {
234 let result = pool
235 .query(&sql)
236 .await
237 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
238 let cdrs_row = result
239 .rows
240 .iter()
241 .map(|r| r.as_cdrs())
242 .next()
243 .ok_or_else(|| prax_query::QueryError::not_found(T::MODEL_NAME))?;
244 decode_row::<T>(cdrs_row)
245 })
246 }
247
248 fn query_optional<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
249 &self,
250 sql: &str,
251 _params: Vec<prax_query::filter::FilterValue>,
252 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Option<T>>> {
253 let sql = sql.to_string();
254 let pool = self.pool.clone();
255 Box::pin(async move {
256 let result = pool
257 .query(&sql)
258 .await
259 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
260 result
261 .rows
262 .iter()
263 .map(|r| r.as_cdrs())
264 .next()
265 .map(decode_row::<T>)
266 .transpose()
267 })
268 }
269
270 fn execute_insert<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
271 &self,
272 sql: &str,
273 _params: Vec<prax_query::filter::FilterValue>,
274 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<T>> {
275 let _ = (sql, T::MODEL_NAME);
283 Box::pin(async move {
284 Err(prax_query::QueryError::unsupported(
285 "CassandraEngine::execute_insert requires prepared-statement \
286 binding to safely re-fetch by PK; use ScyllaEngine or call \
287 pool.execute + pool.query manually",
288 ))
289 })
290 }
291
292 fn execute_update<T: prax_query::traits::Model + prax_query::row::FromRow + Send + 'static>(
293 &self,
294 sql: &str,
295 _params: Vec<prax_query::filter::FilterValue>,
296 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<Vec<T>>> {
297 let sql = sql.to_string();
298 let pool = self.pool.clone();
299 Box::pin(async move {
300 pool.execute(&sql)
301 .await
302 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
303 let where_clause = extract_where_clause(&sql).ok_or_else(|| {
308 prax_query::QueryError::internal(
309 "CassandraEngine::execute_update: UPDATE lacked a WHERE \
310 clause; refusing to SELECT entire table",
311 )
312 })?;
313 let select_sql = format!(
314 "SELECT {} FROM {} WHERE {}",
315 T::COLUMNS.join(", "),
316 T::TABLE_NAME,
317 where_clause,
318 );
319 let result = pool
320 .query(&select_sql)
321 .await
322 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
323 result
324 .rows
325 .iter()
326 .map(|r| r.as_cdrs())
327 .map(decode_row::<T>)
328 .collect()
329 })
330 }
331
332 fn execute_delete(
333 &self,
334 sql: &str,
335 _params: Vec<prax_query::filter::FilterValue>,
336 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
337 let sql = sql.to_string();
338 let pool = self.pool.clone();
339 Box::pin(async move {
340 let _: () = pool
341 .execute(&sql)
342 .await
343 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
344 Ok(0)
345 })
346 }
347
348 fn execute_raw(
349 &self,
350 sql: &str,
351 params: Vec<prax_query::filter::FilterValue>,
352 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
353 self.execute_delete(sql, params)
354 }
355
356 fn count(
357 &self,
358 sql: &str,
359 _params: Vec<prax_query::filter::FilterValue>,
360 ) -> prax_query::traits::BoxFuture<'_, prax_query::QueryResult<u64>> {
361 let sql = sql.to_string();
362 let pool = self.pool.clone();
363 Box::pin(async move {
364 let _: QueryResult = pool
365 .query(&sql)
366 .await
367 .map_err(|e| prax_query::QueryError::database(e.to_string()).with_source(e))?;
368 Ok(0)
369 })
370 }
371}
372
373use prax_query::sql::parse::extract_where_body as extract_where_clause;
376
377fn decode_row<T: prax_query::traits::Model + prax_query::row::FromRow>(
381 cdrs_row: &cdrs_tokio::types::rows::Row,
382) -> prax_query::QueryResult<T> {
383 let cols: Vec<String> = T::COLUMNS.iter().map(|s| s.to_string()).collect();
384 let rr = crate::row_ref::CassandraRowRef::from_cdrs_with_cols(cdrs_row, &cols);
385 T::from_row(&rr).map_err(|e| {
386 let msg = e.to_string();
387 prax_query::QueryError::deserialization(msg).with_source(e)
388 })
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394 use crate::config::CassandraConfig;
395
396 #[tokio::test]
397 async fn test_query_without_connection_returns_error() {
398 let config = CassandraConfig::builder()
399 .known_nodes(["127.0.0.1:9042".to_string()])
400 .build();
401 let _ = config;
406 }
407
408 #[test]
409 fn test_batch_builder_add_increments_len() {
410 let stmts: Vec<String> = vec!["INSERT INTO t VALUES (1)".into()];
415 assert_eq!(stmts.len(), 1);
416 }
417
418 #[test]
419 fn test_query_result_default_is_empty() {
420 let r = QueryResult::default();
421 assert!(r.rows.is_empty());
422 assert!(r.applied.is_none());
423 }
424}