catalytic/
runtime.rs

1use crate::env_property_reader::{database_url, keyspace, password, username};
2use once_cell::sync::Lazy;
3use scylla::_macro_internal::SerializeRow;
4use scylla::execution_profile::ExecutionProfileBuilder;
5use scylla::frame::types::Consistency;
6use scylla::query::Query;
7use scylla::{FromRow, IntoTypedRows, Session, SessionBuilder};
8use tokio::runtime::{self, Runtime};
9
/// Name of the first table created by `create_test_tables`.
pub const TEST_TABLE: &str = "test_table";

/// Name of the second table created by `create_test_tables`.
pub const ANOTHER_TEST_TABLE: &str = "another_test_table";
12
13/// The runtime can be used to use the scylla driver in non-async context (proc macro's e.g.)
14pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
15    runtime::Builder::new_multi_thread()
16        .enable_time()
17        .enable_io()
18        .build()
19        .expect("failed to initialize Tokio runtime")
20});
21
22pub fn query_collect_to_vec<Entity: FromRow>(
23    query: impl Into<Query>,
24    values: impl SerializeRow,
25) -> Vec<Entity> {
26    touch_global_connection();
27
28    block_on(async move {
29        GLOBAL_CONNECTION
30            .query_unpaged(query, values)
31            .await
32            .unwrap()
33            .rows
34            .unwrap()
35            .into_typed::<_>()
36            .map(|r| r.unwrap())
37            .collect()
38    })
39}
40
41pub fn query(query: impl Into<Query>, values: impl SerializeRow) {
42    touch_global_connection();
43
44    block_on(async move {
45        GLOBAL_CONNECTION
46            .query_unpaged(query, values)
47            .await
48            .unwrap()
49    });
50}
51
52pub fn use_keyspace(keyspace: &str) {
53    touch_global_connection();
54
55    block_on(async {
56        GLOBAL_CONNECTION
57            .use_keyspace(keyspace, false)
58            .await
59            .unwrap();
60    });
61}
62
63/// Touch the global state so it gets initialized
64pub fn touch_global_connection() {
65    GLOBAL_CONNECTION.get_metrics();
66}
67
68pub fn set_keyspace() {
69    use_keyspace(&keyspace());
70}
71
72pub fn create_test_tables() {
73    touch_global_connection();
74
75    block_on(async {
76        GLOBAL_CONNECTION.query_unpaged(format!("create table if not exists {} (a int, b int, c int, d int, e int, primary key((b, c), d, a))", TEST_TABLE), []).await.unwrap();
77        GLOBAL_CONNECTION.query_unpaged(format!("create table if not exists {}  (a int, b text, c text, d int, primary key((a), b, c))", ANOTHER_TEST_TABLE), []).await.unwrap();
78    })
79}
80
81pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
82    RUNTIME.block_on(future)
83}
84
85pub async fn create_connection() -> Session {
86    dotenvy::dotenv().unwrap();
87
88    let session = SessionBuilder::new()
89        .known_node(database_url())
90        .user(username(), password())
91        .default_execution_profile_handle(
92            ExecutionProfileBuilder::default()
93                .consistency(Consistency::One)
94                .build()
95                .into_handle(),
96        )
97        .build()
98        .await
99        .unwrap();
100
101    session
102        .query_unpaged(format!(
103            "create keyspace if not exists {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }} and durable_writes = false",
104            keyspace()
105        ), [])
106        .await
107        .unwrap();
108
109    session.use_keyspace(keyspace(), false).await.unwrap();
110
111    session
112}
113
114pub static GLOBAL_CONNECTION: Lazy<Session> = Lazy::new(|| block_on(create_connection()));
115
#[cfg(test)]
mod tests {
    use crate::runtime::{create_test_tables, query, TEST_TABLE};

    /// Smoke test: a plain select against the test table succeeds.
    ///
    /// Requires a reachable database configured through the environment.
    #[test]
    fn it_works() {
        // Selecting from a missing table makes `query` panic, so make sure the
        // table exists first; `create table if not exists` is safe to repeat.
        create_test_tables();

        query(format!("select * from {}", TEST_TABLE), []);
    }
}