1use crate::env_property_reader::{database_url, keyspace, password, username};
2use once_cell::sync::Lazy;
3use scylla::_macro_internal::SerializeRow;
4use scylla::execution_profile::ExecutionProfileBuilder;
5use scylla::frame::types::Consistency;
6use scylla::query::Query;
7use scylla::{FromRow, IntoTypedRows, Session, SessionBuilder};
8use tokio::runtime::{self, Runtime};
9
10pub const TEST_TABLE: &str = "test_table";
11pub const ANOTHER_TEST_TABLE: &str = "another_test_table";
12
13pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
15 runtime::Builder::new_multi_thread()
16 .enable_time()
17 .enable_io()
18 .build()
19 .expect("failed to initialize Tokio runtime")
20});
21
22pub fn query_collect_to_vec<Entity: FromRow>(
23 query: impl Into<Query>,
24 values: impl SerializeRow,
25) -> Vec<Entity> {
26 touch_global_connection();
27
28 block_on(async move {
29 GLOBAL_CONNECTION
30 .query_unpaged(query, values)
31 .await
32 .unwrap()
33 .rows
34 .unwrap()
35 .into_typed::<_>()
36 .map(|r| r.unwrap())
37 .collect()
38 })
39}
40
41pub fn query(query: impl Into<Query>, values: impl SerializeRow) {
42 touch_global_connection();
43
44 block_on(async move {
45 GLOBAL_CONNECTION
46 .query_unpaged(query, values)
47 .await
48 .unwrap()
49 });
50}
51
52pub fn use_keyspace(keyspace: &str) {
53 touch_global_connection();
54
55 block_on(async {
56 GLOBAL_CONNECTION
57 .use_keyspace(keyspace, false)
58 .await
59 .unwrap();
60 });
61}
62
63pub fn touch_global_connection() {
65 GLOBAL_CONNECTION.get_metrics();
66}
67
68pub fn set_keyspace() {
69 use_keyspace(&keyspace());
70}
71
72pub fn create_test_tables() {
73 touch_global_connection();
74
75 block_on(async {
76 GLOBAL_CONNECTION.query_unpaged(format!("create table if not exists {} (a int, b int, c int, d int, e int, primary key((b, c), d, a))", TEST_TABLE), []).await.unwrap();
77 GLOBAL_CONNECTION.query_unpaged(format!("create table if not exists {} (a int, b text, c text, d int, primary key((a), b, c))", ANOTHER_TEST_TABLE), []).await.unwrap();
78 })
79}
80
81pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
82 RUNTIME.block_on(future)
83}
84
85pub async fn create_connection() -> Session {
86 dotenvy::dotenv().unwrap();
87
88 let session = SessionBuilder::new()
89 .known_node(database_url())
90 .user(username(), password())
91 .default_execution_profile_handle(
92 ExecutionProfileBuilder::default()
93 .consistency(Consistency::One)
94 .build()
95 .into_handle(),
96 )
97 .build()
98 .await
99 .unwrap();
100
101 session
102 .query_unpaged(format!(
103 "create keyspace if not exists {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }} and durable_writes = false",
104 keyspace()
105 ), [])
106 .await
107 .unwrap();
108
109 session.use_keyspace(keyspace(), false).await.unwrap();
110
111 session
112}
113
114pub static GLOBAL_CONNECTION: Lazy<Session> = Lazy::new(|| block_on(create_connection()));
115
116#[cfg(test)]
117mod tests {
118 use crate::runtime::{query, TEST_TABLE};
119
120 #[test]
121 fn it_works() {
122 query(format!("select * from {}", TEST_TABLE), []);
123 }
124}