1use std::{path::PathBuf, str};
2
3use anyhow::{anyhow, Result};
4use scylla::Session;
5
6pub use crate::cql_file::CqlFile;
7use crate::keyspace::*;
8pub use crate::migrate::{MigrateError, MigrateErrorState};
9use crate::queries::*;
10
11#[allow(unused)]
12mod cql;
13mod cql_file;
14pub mod keyspace;
15mod migrate;
16mod queries;
17#[cfg(test)]
18pub(crate) mod test_utils;
19
/// Node address used by [`CassandraOpts::node_address`] when neither
/// `cassandra_host` nor the `CASSANDRA_NODE` environment variable provides one.
const NODE_ADDRESS: &str = "127.0.0.1:9042";

/// Name of the history keyspace used by [`migrate_cql`] when
/// `MigrateOpts::history_keyspace` is `None`.
pub const KEYSPACE: &str = "cquill";

/// Name of the history table used by [`migrate_cql`] when
/// `MigrateOpts::history_table` is `None`.
pub const TABLE: &str = "migrated_cql";
25
26pub struct MigrateOpts {
27 pub cassandra_opts: Option<CassandraOpts>,
28 pub cql_dir: PathBuf,
29 pub history_keyspace: Option<KeyspaceOpts>,
30 pub history_table: Option<String>,
31}
32
/// Connection options for the Cassandra node to migrate.
///
/// The default value leaves `cassandra_host` unset, in which case
/// `node_address` consults the `CASSANDRA_NODE` environment variable and then
/// a built-in default address.
#[derive(Clone, Debug, Default)]
pub struct CassandraOpts {
    /// Host, optionally followed by `:port`, of the node to connect to.
    pub cassandra_host: Option<String>,
}
37
38impl CassandraOpts {
39 pub fn node_address(&self) -> String {
40 let node_address = match &self.cassandra_host {
41 None => std::env::var("CASSANDRA_NODE").unwrap_or(NODE_ADDRESS.to_string()),
42 Some(cassandra_host) => cassandra_host.clone(),
43 };
44 if node_address.contains(':') {
45 node_address
46 } else {
47 format!("{node_address}:9042")
48 }
49 }
50}
51
52pub async fn migrate_cql(opts: MigrateOpts) -> Result<Vec<CqlFile>, MigrateError> {
57 let cql_files = cql_file::files_from_dir(&opts.cql_dir)?;
58 let node_address = opts.cassandra_opts.unwrap_or_default().node_address();
59 let session = cql_session(node_address).await?;
60
61 let cquill_keyspace = opts
62 .history_keyspace
63 .unwrap_or_else(|| KeyspaceOpts::simple(String::from(KEYSPACE), 1));
64 let history_table = opts.history_table.unwrap_or_else(|| String::from(TABLE));
65 prepare_cquill_keyspace(&session, &cquill_keyspace, &history_table).await?;
66
67 migrate::perform(
68 &session,
69 &cql_files,
70 migrate::MigrateArgs {
71 cql_dir: opts.cql_dir,
72 history_keyspace: cquill_keyspace.name,
73 history_table,
74 },
75 )
76 .await
77}
78
79async fn prepare_cquill_keyspace(
81 session: &Session,
82 keyspace: &KeyspaceOpts,
83 table_name: &String,
84) -> Result<()> {
85 let create_table: bool = match table_names_from_session_metadata(session, &keyspace.name) {
86 Ok(table_names) => !table_names.contains(table_name),
87 Err(_) => {
88 queries::keyspace::create(session, keyspace).await?;
89 true
90 }
91 };
92 if create_table {
93 migrated::table::create(session, &keyspace.name, table_name).await?;
94 }
95 Ok(())
96}
97
98async fn cql_session(node_address: String) -> Result<Session> {
99 let connecting = scylla::SessionBuilder::new()
100 .known_node(&node_address)
101 .build()
102 .await;
103 match connecting {
104 Ok(session) => Ok(session),
105 Err(_) => Err(anyhow!("could not connect to {}", &node_address)),
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// `node_address` keeps an explicit port and appends the default one otherwise.
    #[test]
    fn test_cassandra_opts_provides_node_address() {
        let without_host = CassandraOpts {
            cassandra_host: None,
        };
        let with_host = CassandraOpts {
            cassandra_host: Some("localhost".to_string()),
        };
        let with_port = CassandraOpts {
            cassandra_host: Some("localhost:9043".to_string()),
        };

        // The fallback comes from the environment, which may name a host with
        // or without a port; `node_address` appends the default port in the
        // latter case, so the expectation has to account for both.
        let configured =
            std::env::var("CASSANDRA_NODE").unwrap_or_else(|_| NODE_ADDRESS.to_string());
        let resolved = without_host.node_address();
        if configured.contains(':') {
            assert!(
                resolved.contains(configured.as_str()),
                "expected {resolved} to preserve {configured}"
            );
        } else {
            assert_eq!(resolved, format!("{configured}:9042"));
        }

        assert_eq!(with_host.node_address(), "localhost:9042");
        assert_eq!(with_port.node_address(), "localhost:9043");
    }

    /// Both the keyspace and the table are created when the keyspace is absent.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_keyspace_does_not_exist() {
        let session = test_utils::cql_session().await;
        let keyspace_opts = KeyspaceOpts::simple(test_utils::keyspace_name(), 1);
        let table_name = String::from("table_name");

        prepare_cquill_keyspace(&session, &keyspace_opts, &table_name)
            .await
            .expect("prepare keyspace");
        match table_names_from_session_metadata(&session, &keyspace_opts.name) {
            Ok(table_names) => assert!(table_names.contains(&table_name)),
            Err(_) => panic!("keyspace should exist after prepare"),
        }

        queries::keyspace::drop(&session, &keyspace_opts.name)
            .await
            .expect("drop keyspace");
    }

    /// Only the table is created when the keyspace already exists.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_table_does_not_exist() {
        let session = test_utils::cql_session().await;
        let keyspace_opts = test_utils::create_keyspace(&session).await;
        let table_name = String::from("table_name");

        prepare_cquill_keyspace(&session, &keyspace_opts, &table_name)
            .await
            .expect("prepare keyspace");
        match table_names_from_session_metadata(&session, &keyspace_opts.name) {
            Ok(table_names) => assert!(table_names.contains(&table_name)),
            Err(_) => panic!("keyspace should exist after prepare"),
        }

        queries::keyspace::drop(&session, &keyspace_opts.name)
            .await
            .expect("drop keyspace");
    }

    /// Preparing succeeds and the table is still present when both already exist.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_keyspace_and_table_exist() {
        let harness = test_utils::TestHarness::builder().initialize().await;

        prepare_cquill_keyspace(
            &harness.session,
            &KeyspaceOpts::simple(harness.cquill_keyspace.clone(), 1),
            &harness.cquill_table,
        )
        .await
        .expect("prepare keyspace");
        match table_names_from_session_metadata(&harness.session, &harness.cquill_keyspace) {
            Ok(table_names) => assert!(table_names.contains(&harness.cquill_table)),
            Err(_) => panic!("keyspace should exist after prepare"),
        }

        harness.drop_keyspace().await;
    }
}