discord_cassandra_cpp/cassandra/
batch.rs

1use crate::cassandra::consistency::Consistency;
2use crate::cassandra::error::*;
3use crate::cassandra::future::CassFuture;
4use crate::cassandra::policy::retry::RetryPolicy;
5use crate::cassandra::statement::Statement;
6use crate::cassandra::util::{Protected, ProtectedInner, ProtectedWithSession};
7use crate::{session_scope, CassResult, Session};
8
9use crate::cassandra_sys::cass_batch_add_statement;
10use crate::cassandra_sys::cass_batch_free;
11use crate::cassandra_sys::cass_batch_new;
12use crate::cassandra_sys::cass_batch_set_consistency;
13use crate::cassandra_sys::cass_batch_set_custom_payload;
14use crate::cassandra_sys::cass_batch_set_is_idempotent;
15use crate::cassandra_sys::cass_batch_set_retry_policy;
16use crate::cassandra_sys::cass_batch_set_serial_consistency;
17use crate::cassandra_sys::cass_batch_set_timestamp;
18use crate::cassandra_sys::cass_bool_t::cass_false;
19use crate::cassandra_sys::cass_bool_t::cass_true;
20use crate::cassandra_sys::cass_custom_payload_free;
21use crate::cassandra_sys::cass_custom_payload_new;
22use crate::cassandra_sys::cass_custom_payload_set_n;
23use crate::cassandra_sys::cass_session_execute_batch;
24use crate::cassandra_sys::CassBatch as _Batch;
25use crate::cassandra_sys::CassBatchType_;
26use crate::cassandra_sys::CassConsistency;
27use crate::cassandra_sys::CassCustomPayload as _CassCustomPayload;
28use std::ffi::NulError;
29use std::os::raw::c_char;
30
31#[derive(Debug)]
32struct BatchInner(*mut _Batch);
33
34/// A group of statements that are executed as a single batch.
35/// <b>Note:</b> Batches are not supported by the binary protocol version 1.
36#[derive(Debug)]
37pub struct Batch<T = session_scope::Bound>(BatchInner, Session<T>);
38
39// The underlying C type has no thread-local state, but does not support access
40// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
41unsafe impl Send for BatchInner {}
42
43impl ProtectedInner<*mut _Batch> for BatchInner {
44    #[inline(always)]
45    fn inner(&self) -> *mut _Batch {
46        self.0
47    }
48}
49
50impl Protected<*mut _Batch> for BatchInner {
51    #[inline(always)]
52    fn build(inner: *mut _Batch) -> Self {
53        if inner.is_null() {
54            panic!("Unexpected null pointer")
55        };
56        Self(inner)
57    }
58}
59
60impl<T> ProtectedInner<*mut _Batch> for Batch<T> {
61    #[inline(always)]
62    fn inner(&self) -> *mut _Batch {
63        self.0.inner()
64    }
65}
66
67impl<S> ProtectedWithSession<*mut _Batch, S> for Batch<S> {
68    #[inline(always)]
69    fn build(inner: *mut _Batch, session: Session<S>) -> Self {
70        Self(BatchInner::build(inner), session)
71    }
72
73    #[inline(always)]
74    fn session(&self) -> &Session<S> {
75        &self.1
76    }
77}
78
79/// Custom payloads not fully supported yet
80#[derive(Debug)]
81pub struct CustomPayload(*mut _CassCustomPayload);
82
83impl ProtectedInner<*mut _CassCustomPayload> for CustomPayload {
84    fn inner(&self) -> *mut _CassCustomPayload {
85        self.0
86    }
87}
88
89impl Protected<*mut _CassCustomPayload> for CustomPayload {
90    fn build(inner: *mut _CassCustomPayload) -> Self {
91        if inner.is_null() {
92            panic!("Unexpected null pointer")
93        };
94        CustomPayload(inner)
95    }
96}
97
98impl Default for CustomPayload {
99    /// creates a new custom payload
100    fn default() -> Self {
101        unsafe { CustomPayload(cass_custom_payload_new()) }
102    }
103}
104impl CustomPayload {
105    /// Sets an item to the custom payload.
106    pub fn set(&self, name: String, value: &[u8]) -> Result<()> {
107        unsafe {
108            let name_ptr = name.as_ptr() as *const c_char;
109            Ok(cass_custom_payload_set_n(
110                self.0,
111                name_ptr,
112                name.len(),
113                value.as_ptr(),
114                value.len(),
115            ))
116        }
117    }
118}
119
120impl Drop for CustomPayload {
121    fn drop(&mut self) {
122        unsafe { cass_custom_payload_free(self.0) }
123    }
124}
125
126impl Drop for BatchInner {
127    /// Frees a batch instance. Batches can be immediately freed after being
128    /// executed.
129    fn drop(&mut self) {
130        unsafe { cass_batch_free(self.0) }
131    }
132}
133
134impl<T> Batch<T> {
135    /// Creates a new batch statement with batch type.
136    pub(crate) fn new(batch_type: BatchType, session: Session<T>) -> Self {
137        unsafe { Batch(BatchInner(cass_batch_new(batch_type.inner())), session) }
138    }
139
140    /// Returns the session of which this batch is bound to.
141    pub fn session(&self) -> &Session<T> {
142        ProtectedWithSession::session(self)
143    }
144
145    /// Executes this batch.
146    pub async fn execute(self) -> Result<CassResult> {
147        let (batch, session) = (self.0, self.1);
148        let execute_future = {
149            let execute_batch =
150                unsafe { cass_session_execute_batch(session.inner(), batch.inner()) };
151            CassFuture::build(session, execute_batch)
152        };
153        execute_future.await
154    }
155
156    /// Sets the batch's consistency level
157    pub fn set_consistency(&mut self, consistency: Consistency) -> Result<&mut Self> {
158        unsafe { cass_batch_set_consistency(self.inner(), consistency.inner()).to_result(self) }
159    }
160
161    /// Sets the batch's serial consistency level.
162    ///
163    /// <b>Default:</b> Not set
164    pub fn set_serial_consistency(&mut self, consistency: Consistency) -> Result<&mut Self> {
165        unsafe {
166            cass_batch_set_serial_consistency(self.inner(), consistency.inner()).to_result(self)
167        }
168    }
169
170    /// Sets the batch's timestamp.
171    pub fn set_timestamp(&mut self, timestamp: i64) -> Result<&mut Self> {
172        unsafe { cass_batch_set_timestamp(self.inner(), timestamp).to_result(self) }
173    }
174
175    /// Sets the batch's retry policy.
176    pub fn set_retry_policy(&mut self, retry_policy: RetryPolicy) -> Result<&mut Self> {
177        unsafe { cass_batch_set_retry_policy(self.inner(), retry_policy.inner()).to_result(self) }
178    }
179
180    /// Sets the batch's custom payload.
181    pub fn set_custom_payload(&mut self, custom_payload: CustomPayload) -> Result<&mut Self> {
182        unsafe { cass_batch_set_custom_payload(self.inner(), custom_payload.0).to_result(self) }
183    }
184
185    /// Adds a statement to a batch.
186    pub fn add_statement(&mut self, statement: Statement<T>) -> Result<&mut Self> {
187        // If their sessions are not the same, we can reject at this level.
188        if self.session() != statement.session() {
189            return Err(ErrorKind::BatchSessionMismatch.into());
190        }
191        unsafe { cass_batch_add_statement(self.inner(), statement.inner()).to_result(self) }
192    }
193
194    /// Sets whether the statements in a batch are idempotent. Idempotent batches are able
195    /// to be automatically retried after timeouts/errors and can be speculatively executed.
196    pub fn set_is_idempotent(&mut self, is_idempotent: bool) -> Result<&mut Self> {
197        unsafe {
198            cass_batch_set_is_idempotent(
199                self.inner(),
200                if is_idempotent { cass_true } else { cass_false },
201            )
202            .to_result(self)
203        }
204    }
205}
206
207/// A type of batch.
208#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)]
209#[allow(missing_docs)] // Meanings are defined in CQL documentation.
210#[allow(non_camel_case_types)] // Names are traditional.
211pub enum BatchType {
212    LOGGED,
213    UNLOGGED,
214    COUNTER,
215}
216
217enhance_nullary_enum!(BatchType, CassBatchType_, {
218    (LOGGED, CASS_BATCH_TYPE_LOGGED, "LOGGED"),
219    (UNLOGGED, CASS_BATCH_TYPE_UNLOGGED, "UNLOGGED"),
220    (COUNTER, CASS_BATCH_TYPE_COUNTER, "COUNTER"),
221});