cassandra_cpp/cassandra/
batch.rs

1use crate::cassandra::consistency::Consistency;
2use crate::cassandra::custom_payload::CustomPayload;
3use crate::cassandra::error::*;
4use crate::cassandra::future::CassFuture;
5use crate::cassandra::policy::retry::RetryPolicy;
6use crate::cassandra::statement::Statement;
7use crate::cassandra::util::{Protected, ProtectedInner, ProtectedWithSession};
8use crate::cassandra_sys::cass_session_execute_batch;
9use crate::{CassResult, Session};
10
11use crate::cassandra_sys::cass_batch_add_statement;
12use crate::cassandra_sys::cass_batch_free;
13use crate::cassandra_sys::cass_batch_new;
14use crate::cassandra_sys::cass_batch_set_consistency;
15use crate::cassandra_sys::cass_batch_set_custom_payload;
16use crate::cassandra_sys::cass_batch_set_retry_policy;
17use crate::cassandra_sys::cass_batch_set_serial_consistency;
18use crate::cassandra_sys::cass_batch_set_timestamp;
19use crate::cassandra_sys::CassBatch as _Batch;
20use crate::cassandra_sys::CassBatchType_;
21
22#[derive(Debug)]
23struct BatchInner(*mut _Batch);
24
25/// A group of statements that are executed as a single batch.
26/// <b>Note:</b> Batches are not supported by the binary protocol version 1.
27#[derive(Debug)]
28pub struct Batch(BatchInner, Session);
29
30// The underlying C type has no thread-local state, and forbids only concurrent
31// mutation/free: https://datastax.github.io/cpp-driver/topics/#thread-safety
32unsafe impl Send for BatchInner {}
33unsafe impl Sync for BatchInner {}
34
35impl ProtectedInner<*mut _Batch> for BatchInner {
36    #[inline(always)]
37    fn inner(&self) -> *mut _Batch {
38        self.0
39    }
40}
41
42impl Protected<*mut _Batch> for BatchInner {
43    #[inline(always)]
44    fn build(inner: *mut _Batch) -> Self {
45        if inner.is_null() {
46            panic!("Unexpected null pointer")
47        };
48        Self(inner)
49    }
50}
51
52impl ProtectedInner<*mut _Batch> for Batch {
53    #[inline(always)]
54    fn inner(&self) -> *mut _Batch {
55        self.0.inner()
56    }
57}
58
59impl ProtectedWithSession<*mut _Batch> for Batch {
60    #[inline(always)]
61    fn build(inner: *mut _Batch, session: Session) -> Self {
62        Self(BatchInner::build(inner), session)
63    }
64
65    #[inline(always)]
66    fn session(&self) -> &Session {
67        &self.1
68    }
69}
70
71impl Drop for BatchInner {
72    /// Frees a batch instance. Batches can be immediately freed after being
73    /// executed.
74    fn drop(&mut self) {
75        unsafe { cass_batch_free(self.0) }
76    }
77}
78
79impl Batch {
80    /// Creates a new batch statement with batch type.
81    pub(crate) fn new(batch_type: BatchType, session: Session) -> Batch {
82        unsafe { Batch(BatchInner(cass_batch_new(batch_type.inner())), session) }
83    }
84
85    /// Returns the session of which this batch is bound to.
86    pub fn session(&self) -> &Session {
87        ProtectedWithSession::session(self)
88    }
89
90    /// Executes this batch.
91    pub async fn execute(self) -> Result<CassResult> {
92        let (batch, session) = (self.0, self.1);
93        let execute_future = {
94            let execute_batch =
95                unsafe { cass_session_execute_batch(session.inner(), batch.inner()) };
96            CassFuture::build(session, execute_batch)
97        };
98        execute_future.await
99    }
100
101    /// Sets the batch's consistency level
102    pub fn set_consistency(&mut self, consistency: Consistency) -> Result<&mut Self> {
103        unsafe { cass_batch_set_consistency(self.inner(), consistency.inner()).to_result(self) }
104    }
105
106    /// Sets the batch's serial consistency level.
107    ///
108    /// <b>Default:</b> Not set
109    pub fn set_serial_consistency(&mut self, consistency: Consistency) -> Result<&mut Self> {
110        unsafe {
111            cass_batch_set_serial_consistency(self.inner(), consistency.inner()).to_result(self)
112        }
113    }
114
115    /// Sets the batch's timestamp.
116    pub fn set_timestamp(&mut self, timestamp: i64) -> Result<&Self> {
117        unsafe { cass_batch_set_timestamp(self.inner(), timestamp).to_result(self) }
118    }
119
120    /// Sets the batch's retry policy.
121    pub fn set_retry_policy(&mut self, retry_policy: RetryPolicy) -> Result<&mut Self> {
122        unsafe { cass_batch_set_retry_policy(self.inner(), retry_policy.inner()).to_result(self) }
123    }
124
125    /// Sets the batch's custom payload.
126    pub fn set_custom_payload(&mut self, custom_payload: CustomPayload) -> Result<&mut Self> {
127        unsafe {
128            cass_batch_set_custom_payload(self.inner(), custom_payload.inner()).to_result(self)
129        }
130    }
131
132    /// Adds a statement to a batch.
133    pub fn add_statement(&mut self, statement: Statement) -> Result<&Self> {
134        // If their sessions are not the same, we can reject at this level.
135        if self.session() != statement.session() {
136            return Err(ErrorKind::BatchSessionMismatch(
137                self.session().clone(),
138                statement.session().clone(),
139            )
140            .into());
141        }
142        unsafe { cass_batch_add_statement(self.inner(), statement.inner()).to_result(self) }
143    }
144}
145
146/// A type of batch.
147#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)]
148#[allow(missing_docs)] // Meanings are defined in CQL documentation.
149#[allow(non_camel_case_types)] // Names are traditional.
150pub enum BatchType {
151    LOGGED,
152    UNLOGGED,
153    COUNTER,
154}
155
156enhance_nullary_enum!(BatchType, CassBatchType_, {
157    (LOGGED, CASS_BATCH_TYPE_LOGGED, "LOGGED"),
158    (UNLOGGED, CASS_BATCH_TYPE_UNLOGGED, "UNLOGGED"),
159    (COUNTER, CASS_BATCH_TYPE_COUNTER, "COUNTER"),
160});