cquill/
lib.rs

1use std::{path::PathBuf, str};
2
3use anyhow::{anyhow, Result};
4use scylla::Session;
5
6pub use crate::cql_file::CqlFile;
7use crate::keyspace::*;
8pub use crate::migrate::{MigrateError, MigrateErrorState};
9use crate::queries::*;
10
11#[allow(unused)]
12mod cql;
13mod cql_file;
14pub mod keyspace;
15mod migrate;
16mod queries;
17#[cfg(test)]
18pub(crate) mod test_utils;
19
/// Node address used when neither [CassandraOpts::cassandra_host] nor the `CASSANDRA_NODE`
/// environment variable provides one.
const NODE_ADDRESS: &str = "127.0.0.1:9042";

/// Name of the history keyspace used when [MigrateOpts::history_keyspace] is `None`.
pub const KEYSPACE: &str = "cquill";

/// Name of the history table used when [MigrateOpts::history_table] is `None`.
pub const TABLE: &str = "migrated_cql";
25
/// Options accepted by [migrate_cql].
pub struct MigrateOpts {
    /// Connection settings; when `None`, [CassandraOpts::default] is used.
    pub cassandra_opts: Option<CassandraOpts>,
    /// Directory the cql files to migrate are read from.
    pub cql_dir: PathBuf,
    /// Keyspace holding the migration history; when `None`, a keyspace named [KEYSPACE] built
    /// with `KeyspaceOpts::simple(.., 1)` is used.
    pub history_keyspace: Option<KeyspaceOpts>,
    /// Name of the migration history table; when `None`, [TABLE] is used.
    pub history_table: Option<String>,
}
32
/// Connection options for the cassandra node migrations are run against.
///
/// The default value has no host set, in which case [CassandraOpts::node_address] falls back to
/// the `CASSANDRA_NODE` environment variable and then to a built-in local address.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CassandraOpts {
    /// Host of the node, optionally followed by `:port`.
    pub cassandra_host: Option<String>,
}
37
38impl CassandraOpts {
39    pub fn node_address(&self) -> String {
40        let node_address = match &self.cassandra_host {
41            None => std::env::var("CASSANDRA_NODE").unwrap_or(NODE_ADDRESS.to_string()),
42            Some(cassandra_host) => cassandra_host.clone(),
43        };
44        if node_address.contains(':') {
45            node_address
46        } else {
47            format!("{node_address}:9042")
48        }
49    }
50}
51
52/// `migrate_cql` performs a migration of all newly added cql scripts in [MigrateOpts::cql_dir]
53/// since its last invocation. Migrated scripts are tracked in a cquill keyspace and history table
54/// specified with [MigrateOpts::history_keyspace] and [MigrateOpts::history_table]. A successful
55/// method result contains a vec of the cql script paths executed during this invocation.
56pub async fn migrate_cql(opts: MigrateOpts) -> Result<Vec<CqlFile>, MigrateError> {
57    let cql_files = cql_file::files_from_dir(&opts.cql_dir)?;
58    let node_address = opts.cassandra_opts.unwrap_or_default().node_address();
59    let session = cql_session(node_address).await?;
60
61    let cquill_keyspace = opts
62        .history_keyspace
63        .unwrap_or_else(|| KeyspaceOpts::simple(String::from(KEYSPACE), 1));
64    let history_table = opts.history_table.unwrap_or_else(|| String::from(TABLE));
65    prepare_cquill_keyspace(&session, &cquill_keyspace, &history_table).await?;
66
67    migrate::perform(
68        &session,
69        &cql_files,
70        migrate::MigrateArgs {
71            cql_dir: opts.cql_dir,
72            history_keyspace: cquill_keyspace.name,
73            history_table,
74        },
75    )
76    .await
77}
78
79// todo drop and recreate dev mode
80async fn prepare_cquill_keyspace(
81    session: &Session,
82    keyspace: &KeyspaceOpts,
83    table_name: &String,
84) -> Result<()> {
85    let create_table: bool = match table_names_from_session_metadata(session, &keyspace.name) {
86        Ok(table_names) => !table_names.contains(table_name),
87        Err(_) => {
88            queries::keyspace::create(session, keyspace).await?;
89            true
90        }
91    };
92    if create_table {
93        migrated::table::create(session, &keyspace.name, table_name).await?;
94    }
95    Ok(())
96}
97
98async fn cql_session(node_address: String) -> Result<Session> {
99    let connecting = scylla::SessionBuilder::new()
100        .known_node(&node_address)
101        .build()
102        .await;
103    match connecting {
104        Ok(session) => Ok(session),
105        Err(_) => Err(anyhow!("could not connect to {}", &node_address)),
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks how the node address is derived with and without an explicit host.
    #[test]
    fn test_cassandra_opts_provides_node_address() {
        let without_host = CassandraOpts {
            cassandra_host: None,
        };
        let with_host = CassandraOpts {
            cassandra_host: Some("localhost".to_string()),
        };
        let with_port = CassandraOpts {
            cassandra_host: Some("localhost:9043".to_string()),
        };
        // A value from the environment goes through the same port defaulting as an explicit
        // host, so comparing against the raw variable would fail when it names no port.
        let expected_without_host = match std::env::var("CASSANDRA_NODE") {
            Ok(node) => CassandraOpts {
                cassandra_host: Some(node),
            }
            .node_address(),
            Err(_) => NODE_ADDRESS.to_string(),
        };
        assert_eq!(without_host.node_address(), expected_without_host);
        assert_eq!(with_host.node_address(), "localhost:9042");
        assert_eq!(with_port.node_address(), "localhost:9043");
    }

    /// The keyspace does not exist yet: both keyspace and table are expected to be created.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_keyspace_does_not_exist() {
        let session = test_utils::cql_session().await;
        let keyspace_opts = KeyspaceOpts::simple(test_utils::keyspace_name(), 1);
        let table_name = String::from("table_name");

        prepare_cquill_keyspace(&session, &keyspace_opts, &table_name)
            .await
            .expect("prepare keyspace");
        match table_names_from_session_metadata(&session, &keyspace_opts.name) {
            Ok(table_names) => assert!(table_names.contains(&table_name)),
            Err(_) => panic!("table names should be readable after preparing the keyspace"),
        }

        queries::keyspace::drop(&session, &keyspace_opts.name)
            .await
            .expect("drop keyspace");
    }

    /// The keyspace exists without the table: the table is expected to be created.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_table_does_not_exist() {
        let session = test_utils::cql_session().await;
        let keyspace_opts = test_utils::create_keyspace(&session).await;
        let table_name = String::from("table_name");

        prepare_cquill_keyspace(&session, &keyspace_opts, &table_name)
            .await
            .expect("prepare keyspace");
        match table_names_from_session_metadata(&session, &keyspace_opts.name) {
            Ok(table_names) => assert!(table_names.contains(&table_name)),
            Err(_) => panic!("table names should be readable after preparing the keyspace"),
        }

        queries::keyspace::drop(&session, &keyspace_opts.name)
            .await
            .expect("drop keyspace");
    }

    /// Keyspace and table both exist already: preparing again must succeed and keep the table.
    #[tokio::test]
    async fn test_prepare_cquill_keyspace_when_keyspace_and_table_exist() {
        let harness = test_utils::TestHarness::builder().initialize().await;

        prepare_cquill_keyspace(
            &harness.session,
            &KeyspaceOpts::simple(harness.cquill_keyspace.clone(), 1),
            &harness.cquill_table,
        )
        .await
        .expect("prepare keyspace");
        match table_names_from_session_metadata(&harness.session, &harness.cquill_keyspace) {
            Ok(table_names) => assert!(table_names.contains(&harness.cquill_table)),
            Err(_) => panic!("table names should be readable after preparing the keyspace"),
        }

        harness.drop_keyspace().await;
    }
}
189}