discord_cassandra_cpp/cassandra/
session.rs

1#![allow(non_camel_case_types)]
2#![allow(dead_code)]
3#![allow(missing_copy_implementations)]
4
5use crate::cassandra::cluster::Cluster;
6use crate::cassandra::error::*;
7use crate::cassandra::future::CassFuture;
8use crate::cassandra::metrics::SessionMetrics;
9use crate::cassandra::prepared::PreparedStatement;
10use crate::cassandra::result::CassResult;
11use crate::cassandra::schema::schema_meta::SchemaMeta;
12use crate::cassandra::statement::Statement;
13use crate::cassandra::util::{Protected, ProtectedInner};
14use crate::{cassandra::batch::Batch, BatchType};
15
16use crate::cassandra_sys::cass_session_close;
17use crate::cassandra_sys::cass_session_connect;
18use crate::cassandra_sys::cass_session_connect_keyspace_n;
19use crate::cassandra_sys::cass_session_execute;
20use crate::cassandra_sys::cass_session_execute_batch;
21use crate::cassandra_sys::cass_session_free;
22use crate::cassandra_sys::cass_session_get_metrics;
23use crate::cassandra_sys::cass_session_get_schema_meta;
24use crate::cassandra_sys::cass_session_new;
25use crate::cassandra_sys::cass_session_prepare_n;
26use crate::cassandra_sys::CassSession as _Session;
27
28use std::ffi::NulError;
29use std::mem;
30use std::os::raw::c_char;
31use std::sync::Arc;
32
/// Pairs the raw driver session pointer with the scope value of the session.
///
/// Field `0` holds the scope value (for example a bound keyspace) and field `1`
/// holds the `*mut` session pointer, which is freed when this value is dropped.
#[derive(Debug, Eq, PartialEq)]
struct SessionInner<T>(T, *mut _Session);
35
// SAFETY: The underlying C type has no thread-local state, and explicitly supports access
// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
// The raw pointer field is what prevents these traits from being derived
// automatically, so they are implemented by hand and still depend on the scope
// value `T` being `Send` / `Sync` itself.
unsafe impl<T: Send> Send for SessionInner<T> {}
unsafe impl<T: Sync> Sync for SessionInner<T> {}
40
41impl<T> SessionInner<T> {
42    fn new(bound: T, inner: *mut _Session) -> Arc<Self> {
43        Arc::new(Self(bound, inner))
44    }
45}
46
/// The session scope defines whether a session is bound ([`session_scope::Bound`]), meaning it is
/// associated with a keyspace, or unbound ([`session_scope::Unbound`]), meaning it is not
/// associated with any keyspace.
///
/// Keyspace association is done by the `connect` and `connect_lazy` methods in [`Cluster`].
///
/// The trait requires the `session_scope::sealed::Sealed` supertrait, which is declared in a
/// module that is only visible to this parent module.
pub trait SessionScope: session_scope::sealed::Sealed + Clone + Send + Sync + 'static {}
52
53/// See [`SessionScope`] for more information.
54pub mod session_scope {
55    pub use super::SessionScope;
56
57    pub(super) mod sealed {
58        pub trait Sealed {}
59    }
60
61    /// The session is bound to a given keyspace.
62    #[derive(Debug, Clone)]
63    pub struct Bound {
64        pub(crate) keyspace: String,
65    }
66
67    impl Bound {
68        pub(crate) fn new(keyspace: impl Into<String>) -> Self {
69            Self {
70                keyspace: keyspace.into(),
71            }
72        }
73    }
74
75    /// The session is not bound to any keyspace in particular.
76    #[derive(Debug, Clone)]
77    pub struct Unbound;
78
79    impl sealed::Sealed for Bound {}
80    impl sealed::Sealed for Unbound {}
81
82    impl super::SessionScope for Bound {}
83    impl super::SessionScope for Unbound {}
84}
/// A session object is used to execute queries and maintains cluster state through
/// the control connection. The control connection is used to auto-discover nodes and
/// monitor cluster changes (topology and schema). Each session also maintains multiple
/// pools of connections to cluster nodes which are used to query the cluster.
///
/// Instances of the session object are thread-safe to execute queries.
///
/// Cloning a `Session` does not create a new driver session: clones share the
/// same inner handle through an [`Arc`].
#[derive(Debug, Clone)]
pub struct Session<T = session_scope::Bound>(Arc<SessionInner<T>>);
93
94impl<T> Eq for Session<T> {}
95
96impl<T> PartialEq for Session<T> {
97    fn eq(&self, other: &Self) -> bool {
98        (self.0).1 == (other.0).1
99    }
100}
101
102// The underlying C type has no thread-local state, and explicitly supports access
103// from multiple threads: https://datastax.github.io/cpp-driver/topics/#thread-safety
104unsafe impl<T: Send> Send for Session<T> {}
105unsafe impl<T: Sync> Sync for Session<T> {}
106
107impl<T> ProtectedInner<*mut _Session> for SessionInner<T> {
108    fn inner(&self) -> *mut _Session {
109        self.1
110    }
111}
112
113impl<T> ProtectedInner<*mut _Session> for Session<T> {
114    fn inner(&self) -> *mut _Session {
115        self.0.inner()
116    }
117}
118
119impl Protected<*mut _Session> for Session<session_scope::Unbound> {
120    fn build(inner: *mut _Session) -> Self {
121        if inner.is_null() {
122            panic!("Unexpected null pointer")
123        };
124        Session(SessionInner::new(session_scope::Unbound, inner))
125    }
126}
127
128impl<T> Drop for SessionInner<T> {
129    /// Frees a session instance. If the session is still connected it will be synchronously
130    /// closed before being deallocated.
131    fn drop(&mut self) {
132        unsafe { cass_session_free(self.1) }
133    }
134}
135
136impl Default for Session<session_scope::Unbound> {
137    fn default() -> Session<session_scope::Unbound> {
138        Session::new(session_scope::Unbound)
139    }
140}
141
142impl<S> Session<S>
143where
144    S: SessionScope,
145{
146    pub(crate) fn new(bound: S) -> Self {
147        unsafe { Session(SessionInner::new(bound, cass_session_new())) }
148    }
149
150    /// Creates a statement with the given query.
151    pub fn statement(&self, query: impl AsRef<str>) -> Statement<S> {
152        let query = query.as_ref();
153        let param_count = query.matches("?").count();
154        Statement::new(self.clone(), query, param_count)
155    }
156
157    /// Executes a given query.
158    pub async fn execute(&self, query: impl AsRef<str>) -> Result<CassResult> {
159        let statement = self.statement(query);
160        statement.execute().await
161    }
162
163    /// Creates a new batch that is bound to this session.
164    pub fn batch(&self, batch_type: BatchType) -> Batch<S> {
165        Batch::new(batch_type, self.clone())
166    }
167
168    /// Gets a snapshot of this session's schema metadata. The returned
169    /// snapshot of the schema metadata is not updated. This function
170    /// must be called again to retrieve any schema changes since the
171    /// previous call.
172    pub fn get_schema_meta(&self) -> SchemaMeta {
173        unsafe { SchemaMeta::build(cass_session_get_schema_meta(self.inner())) }
174    }
175
176    /// Gets a copy of this session's performance/diagnostic metrics.
177    pub fn get_metrics(&self) -> SessionMetrics {
178        unsafe {
179            let mut metrics = mem::zeroed();
180            cass_session_get_metrics(self.inner(), &mut metrics);
181            SessionMetrics::build(&metrics)
182        }
183    }
184
185    /// Create a prepared statement with the given query.
186    pub async fn prepare(&self, query: impl AsRef<str>) -> Result<PreparedStatement<S>> {
187        let query = query.as_ref();
188        let prepare_future = {
189            let query_ptr = query.as_ptr() as *const c_char;
190            CassFuture::build(self.clone(), unsafe {
191                cass_session_prepare_n(self.inner(), query_ptr, query.len())
192            })
193        };
194        prepare_future.await
195    }
196
197    //    pub fn get_schema(&self) -> Schema {
198    //        unsafe { Schema(cass_session_get_schema(self.0)) }
199    //    }
200}
201
202impl Session<session_scope::Bound> {
203    /// Returns the keyspace that the session is bound to.
204    pub fn keyspace(&self) -> &str {
205        &(self.0).0.keyspace
206    }
207}