use std::{path::PathBuf, str};
use anyhow::{anyhow, Result};
use scylla::Session;
pub use crate::cql_file::CqlFile;
use crate::keyspace::*;
pub use crate::migrate::{MigrateError, MigrateErrorState};
use crate::queries::*;
#[allow(unused)]
mod cql;
mod cql_file;
pub mod keyspace;
mod migrate;
mod queries;
#[cfg(test)]
pub(crate) mod test_utils;
const NODE_ADDRESS: &str = "127.0.0.1:9042";
pub const KEYSPACE: &str = "cquill";
pub const TABLE: &str = "migrated_cql";
pub struct MigrateOpts {
pub cassandra_opts: Option<CassandraOpts>,
pub cql_dir: PathBuf,
pub history_keyspace: Option<KeyspaceOpts>,
pub history_table: Option<String>,
}
#[derive(Default)]
pub struct CassandraOpts {
pub cassandra_host: Option<String>,
}
impl CassandraOpts {
pub fn node_address(&self) -> String {
let node_address = match &self.cassandra_host {
None => std::env::var("CASSANDRA_NODE").unwrap_or(NODE_ADDRESS.to_string()),
Some(cassandra_host) => cassandra_host.clone(),
};
if node_address.contains(':') {
node_address
} else {
format!("{node_address}:9042")
}
}
}
pub async fn migrate_cql(opts: MigrateOpts) -> Result<Vec<CqlFile>, MigrateError> {
let cql_files = cql_file::files_from_dir(&opts.cql_dir)?;
let node_address = opts.cassandra_opts.unwrap_or_default().node_address();
let session = cql_session(node_address).await?;
let cquill_keyspace = opts
.history_keyspace
.unwrap_or_else(|| KeyspaceOpts::simple(String::from(KEYSPACE), 1));
let history_table = opts.history_table.unwrap_or_else(|| String::from(TABLE));
prepare_cquill_keyspace(&session, &cquill_keyspace, &history_table).await?;
migrate::perform(
&session,
&cql_files,
migrate::MigrateArgs {
cql_dir: opts.cql_dir,
history_keyspace: cquill_keyspace.name,
history_table,
},
)
.await
}
async fn prepare_cquill_keyspace(
session: &Session,
keyspace: &KeyspaceOpts,
table_name: &String,
) -> Result<()> {
let create_table: bool = match table_names_from_session_metadata(session, &keyspace.name) {
Ok(table_names) => !table_names.contains(table_name),
Err(_) => {
queries::keyspace::create(session, keyspace).await?;
true
}
};
if create_table {
migrated::table::create(session, &keyspace.name, table_name).await?;
}
Ok(())
}
async fn cql_session(node_address: String) -> Result<Session> {
let connecting = scylla::SessionBuilder::new()
.known_node(&node_address)
.build()
.await;
match connecting {
Ok(session) => Ok(session),
Err(_) => Err(anyhow!("could not connect to {}", &node_address)),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cassandra_opts_provides_node_address() {
let without_host = CassandraOpts {
cassandra_host: None,
};
let with_host = CassandraOpts {
cassandra_host: Some("localhost".to_string()),
};
let with_port = CassandraOpts {
cassandra_host: Some("localhost:9043".to_string()),
};
assert_eq!(
without_host.node_address(),
std::env::var("CASSANDRA_NODE").unwrap_or(NODE_ADDRESS.to_string())
);
assert_eq!(with_host.node_address(), "localhost:9042");
assert_eq!(with_port.node_address(), "localhost:9043");
}
#[tokio::test]
async fn test_prepare_cquill_keyspace_when_keyspace_does_not_exist() {
let session = test_utils::cql_session().await;
let keyspace_opts = KeyspaceOpts::simple(test_utils::keyspace_name(), 1);
let table_name = String::from("table_name");
if let Err(err) = prepare_cquill_keyspace(&session, &keyspace_opts, &table_name).await {
println!("{err}");
panic!();
}
match table_names_from_session_metadata(&session, &keyspace_opts.name) {
Ok(table_names) => assert!(table_names.contains(&table_name)),
Err(_) => panic!(),
}
queries::keyspace::drop(&session, &keyspace_opts.name)
.await
.expect("drop keyspace");
}
#[tokio::test]
async fn test_prepare_cquill_keyspace_when_table_does_not_exist() {
let session = test_utils::cql_session().await;
let keyspace_opts = test_utils::create_keyspace(&session).await;
let table_name = String::from("table_name");
prepare_cquill_keyspace(&session, &keyspace_opts, &table_name)
.await
.expect("prepare keyspace");
match table_names_from_session_metadata(&session, &keyspace_opts.name) {
Ok(table_names) => assert!(table_names.contains(&table_name)),
Err(_) => panic!(),
}
queries::keyspace::drop(&session, &keyspace_opts.name)
.await
.expect("drop keyspace");
}
#[tokio::test]
async fn test_prepare_cquill_keyspace_when_keyspace_and_table_exist() {
let harness = test_utils::TestHarness::builder().initialize().await;
prepare_cquill_keyspace(
&harness.session,
&KeyspaceOpts::simple(harness.cquill_keyspace.clone(), 1),
&harness.cquill_table,
)
.await
.expect("prepare keyspace");
match table_names_from_session_metadata(&harness.session, &harness.cquill_keyspace) {
Ok(table_names) => assert!(table_names.contains(&harness.cquill_table)),
Err(_) => panic!(),
}
harness.drop_keyspace().await;
}
}