cassandra_cpp/cassandra/
session.rs

1#![allow(non_camel_case_types)]
2#![allow(dead_code)]
3#![allow(missing_copy_implementations)]
4
5use crate::cassandra::custom_payload::CustomPayloadResponse;
6use crate::cassandra::error::*;
7use crate::cassandra::future::CassFuture;
8use crate::cassandra::metrics::SessionMetrics;
9use crate::cassandra::prepared::PreparedStatement;
10use crate::cassandra::result::CassResult;
11use crate::cassandra::schema::schema_meta::SchemaMeta;
12use crate::cassandra::statement::Statement;
13use crate::cassandra::util::{Protected, ProtectedInner};
14use crate::{cassandra::batch::Batch, BatchType};
15
16use crate::cassandra_sys::cass_session_execute;
17use crate::cassandra_sys::cass_session_execute_batch;
18use crate::cassandra_sys::cass_session_free;
19use crate::cassandra_sys::cass_session_get_metrics;
20use crate::cassandra_sys::cass_session_get_schema_meta;
21use crate::cassandra_sys::cass_session_new;
22use crate::cassandra_sys::cass_session_prepare_n;
23use crate::cassandra_sys::CassSession as _Session;
24
25use std::mem;
26use std::os::raw::c_char;
27use std::sync::Arc;
28
29#[derive(Debug, Eq, PartialEq)]
30pub struct SessionInner(*mut _Session);
31
32// The underlying C type has no thread-local state, and explicitly supports access
33// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
34unsafe impl Send for SessionInner {}
35unsafe impl Sync for SessionInner {}
36
37impl SessionInner {
38    fn new(inner: *mut _Session) -> Arc<Self> {
39        Arc::new(Self(inner))
40    }
41}
42
43/// A session object is used to execute queries and maintains cluster state through
44/// the control connection. The control connection is used to auto-discover nodes and
45/// monitor cluster changes (topology and schema). Each session also maintains multiple
46/// pools of connections to cluster nodes which are used to query the cluster.
47///
48/// Instances of the session object are thread-safe to execute queries.
49#[derive(Debug, Clone, Eq, PartialEq)]
50pub struct Session(pub Arc<SessionInner>);
51
52impl ProtectedInner<*mut _Session> for SessionInner {
53    fn inner(&self) -> *mut _Session {
54        self.0
55    }
56}
57
58impl ProtectedInner<*mut _Session> for Session {
59    fn inner(&self) -> *mut _Session {
60        self.0.inner()
61    }
62}
63
64impl Protected<*mut _Session> for Session {
65    fn build(inner: *mut _Session) -> Self {
66        if inner.is_null() {
67            panic!("Unexpected null pointer")
68        };
69        Session(SessionInner::new(inner))
70    }
71}
72
73impl Drop for SessionInner {
74    /// Frees a session instance. If the session is still connected it will be synchronously
75    /// closed before being deallocated.
76    fn drop(&mut self) {
77        unsafe { cass_session_free(self.0) }
78    }
79}
80
81impl Default for Session {
82    fn default() -> Session {
83        Session::new()
84    }
85}
86
87impl Session {
88    pub(crate) fn new() -> Session {
89        unsafe { Session(SessionInner::new(cass_session_new())) }
90    }
91
92    /// Create a prepared statement with the given query.
93    pub async fn prepare(&self, query: impl AsRef<str>) -> Result<PreparedStatement> {
94        let query = query.as_ref();
95        let prepare_future = {
96            let query_ptr = query.as_ptr() as *const c_char;
97            CassFuture::build(self.clone(), unsafe {
98                cass_session_prepare_n(self.inner(), query_ptr, query.len())
99            })
100        };
101        prepare_future.await
102    }
103
104    /// Creates a statement with the given query.
105    pub fn statement(&self, query: impl AsRef<str>) -> Statement {
106        let query = query.as_ref();
107        let param_count = query.matches('?').count();
108        Statement::new(self.clone(), query, param_count)
109    }
110
111    /// Execute a batch statement.
112    pub fn execute_batch(&self, batch: &Batch) -> CassFuture<CassResult> {
113        let inner_future = unsafe { cass_session_execute_batch(self.inner(), batch.inner()) };
114        <CassFuture<CassResult>>::build(self.clone(), inner_future)
115    }
116
117    /// Execute a batch statement and get any custom payloads from the response.
118    pub fn execute_batch_with_payloads(
119        &self,
120        batch: &Batch,
121    ) -> CassFuture<(CassResult, CustomPayloadResponse)> {
122        let inner_future = unsafe { cass_session_execute_batch(self.inner(), batch.inner()) };
123        <CassFuture<(CassResult, CustomPayloadResponse)>>::build(self.clone(), inner_future)
124    }
125
126    /// Executes a given query.
127    pub async fn execute(&self, query: impl AsRef<str>) -> Result<CassResult> {
128        let statement = self.statement(query);
129        statement.execute().await
130    }
131
132    /// Creates a new batch that is bound to this session.
133    pub fn batch(&self, batch_type: BatchType) -> Batch {
134        Batch::new(batch_type, self.clone())
135    }
136
137    /// Execute a statement and get any custom payloads from the response.
138    pub fn execute_with_payloads(
139        &self,
140        statement: &Statement,
141    ) -> CassFuture<(CassResult, CustomPayloadResponse)> {
142        let inner_future = unsafe { cass_session_execute(self.inner(), statement.inner()) };
143        <CassFuture<(CassResult, CustomPayloadResponse)>>::build(self.clone(), inner_future)
144    }
145
146    /// Gets a snapshot of this session's schema metadata. The returned
147    /// snapshot of the schema metadata is not updated. This function
148    /// must be called again to retrieve any schema changes since the
149    /// previous call.
150    pub fn get_schema_meta(&self) -> SchemaMeta {
151        unsafe { SchemaMeta::build(cass_session_get_schema_meta(self.inner())) }
152    }
153
154    /// Gets a copy of this session's performance/diagnostic metrics.
155    pub fn get_metrics(&self) -> SessionMetrics {
156        unsafe {
157            let mut metrics = mem::zeroed();
158            cass_session_get_metrics(self.inner(), &mut metrics);
159            SessionMetrics::build(&metrics)
160        }
161    }
162
163    //    pub fn get_schema(&self) -> Schema {
164    //        unsafe { Schema(cass_session_get_schema(self.0)) }
165    //    }
166}