sqlx_scylladb_core/
connection.rs

1mod establish;
2mod executor;
3mod transaction;
4
5use std::fmt::Debug;
6
7use futures_core::future::BoxFuture;
8use scylla::client::caching_session::CachingSession;
9use sqlx::{Connection, Transaction};
10
11use crate::{ScyllaDB, ScyllaDBConnectOptions, connection::transaction::ScyllaDBTransaction};
12
13/// Implementation of [sqlx::Connection] for ScyllaDB.
14pub struct ScyllaDBConnection {
15    pub(crate) caching_session: CachingSession,
16    pub(crate) page_size: i32,
17    pub(crate) transaction: Option<ScyllaDBTransaction>,
18}
19
20impl Debug for ScyllaDBConnection {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("ScyllaDBConnection")
23            .field("caching_session", &self.caching_session)
24            .field("page_size", &self.page_size)
25            .finish()
26    }
27}
28
29impl Connection for ScyllaDBConnection {
30    type Database = ScyllaDB;
31
32    type Options = ScyllaDBConnectOptions;
33
34    fn close(self) -> BoxFuture<'static, Result<(), sqlx::Error>> {
35        Box::pin(async move { Ok(()) })
36    }
37
38    fn close_hard(self) -> BoxFuture<'static, Result<(), sqlx::Error>> {
39        Box::pin(async move { Ok(()) })
40    }
41
42    fn ping(&mut self) -> BoxFuture<'_, Result<(), sqlx::Error>> {
43        Box::pin(async move {
44            let state = self.caching_session.get_session().get_cluster_state();
45            let nodes = state.get_nodes_info();
46            for node in nodes {
47                if !node.is_connected() {
48                    return Err(sqlx::Error::PoolClosed);
49                }
50            }
51            Ok(())
52        })
53    }
54
55    fn begin(&mut self) -> BoxFuture<'_, Result<sqlx::Transaction<'_, Self::Database>, sqlx::Error>>
56    where
57        Self: Sized,
58    {
59        Transaction::begin(self, None)
60    }
61
62    fn shrink_buffers(&mut self) {
63        ()
64    }
65
66    fn flush(&mut self) -> BoxFuture<'_, Result<(), sqlx::Error>> {
67        Box::pin(async move { Ok(()) })
68    }
69
70    fn should_flush(&self) -> bool {
71        false
72    }
73}
74
75impl ScyllaDBConnection {
76    #[cfg(feature = "migrate")]
77    pub(crate) fn get_keyspace(&self) -> Option<String> {
78        self.caching_session
79            .get_session()
80            .get_keyspace()
81            .as_deref()
82            .cloned()
83    }
84}