Skip to main content

cqlmig_cdrs_tokio/
lib.rs

//! Run migrations on a CQL database (Cassandra or ScyllaDB).
//!
//! This package is the cdrs-tokio implementation of cqlmig.
//!
//! # Examples
//!
//! ```
//! # use std::error::Error;
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn Error>> {
//! use std::borrow::Borrow;
//! use cdrs_tokio::cluster::NodeTcpConfigBuilder;
//! use cdrs_tokio::cluster::session::{SessionBuilder, TcpSessionBuilder};
//! use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
//! use cqlmig::CqlMigrator;
//! use cqlmig_cdrs_tokio::CdrsDbSession;
//!
//! let cluster_config = NodeTcpConfigBuilder::new()
//!     .with_contact_points(vec!["localhost:9042".to_string().into()])
//!     .build()
//!     .await
//!     .unwrap();
//! let tcp_ses = TcpSessionBuilder::new(
//!      RoundRobinLoadBalancingStrategy::new(),
//!      cluster_config)
//!  .build()
//!  .unwrap();
//!
//! let ses: CdrsDbSession = tcp_ses.borrow().into();
//!
//! // Use: CqlMigrator::default().migrate_files(&ses, Path::new("path/to/migrations").into()).await.unwrap();
//! // Or provide a vec of migrations
//! CqlMigrator::default().migrate(&ses, vec![]).await.unwrap();
//! # Ok(())
//! # }
//! ```
36
37use async_trait::async_trait;
38use cdrs_tokio::cluster::session::{Session, SessionBuilder, TcpSessionBuilder};
39use cdrs_tokio::cluster::{NodeTcpConfigBuilder, TcpConnectionManager};
40use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
41use cdrs_tokio::transport::TransportTcp;
42use cdrs_tokio::types::rows::Row;
43use cdrs_tokio::types::ByName;
44
45pub use cqlmig::{CqlMigrator, GenResult, Migration, Version};
46use cqlmig::{Db, DbRow};
47
/// The concrete cdrs-tokio session type this crate works with.
///
/// It is a [`Session`] parameterised with the [`TransportTcp`] transport, the
/// [`TcpConnectionManager`] connection manager and a
/// [`RoundRobinLoadBalancingStrategy`] over that same transport and manager.
pub type DbSession = Session<
    TransportTcp,
    TcpConnectionManager,
    RoundRobinLoadBalancingStrategy<TransportTcp, TcpConnectionManager>,
>;
54
55impl<'a> From<&'a DbSession> for CdrsDbSession<'a> {
56    fn from(db: &'a DbSession) -> Self {
57        CdrsDbSession::new(db)
58    }
59}
60
/// Borrowing wrapper around a [`DbSession`].
///
/// The wrapper implements the `cqlmig` [`Db`] trait, which is what allows a
/// cdrs-tokio session to be handed to [`CqlMigrator`].
pub struct CdrsDbSession<'a> {
    /// The borrowed session that queries are sent through.
    ses: &'a DbSession,
}
65
66impl<'a> CdrsDbSession<'a> {
67    /// Helper function to create an un-authenticated connection to an DB.
68    ///
69    /// See [`CdrsDbSession`] for an example of creating a custom connection.
70    pub async fn connect_no_auth(addrs: Vec<String>) -> GenResult<DbSession> {
71        let cluster_config = NodeTcpConfigBuilder::new()
72            .with_contact_points(addrs.iter().map(|it| it.into()).collect())
73            .build()
74            .await?;
75        let ses = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
76            .build()?;
77        Ok(ses)
78    }
79
80    /// Create a [`CdrsDbSession`].
81    pub fn new(db: &'a DbSession) -> CdrsDbSession<'a> {
82        CdrsDbSession { ses: db }
83    }
84}
85
/// Wrapper around a cdrs-tokio [`Row`].
///
/// It implements the `cqlmig` [`DbRow`] trait so that column values can be
/// read by name.
#[derive(Clone, Debug)]
pub struct ARow {
    /// The wrapped result row.
    row: Row,
}
91
92impl DbRow for ARow {
93    fn string_by_name(&self, name: &str, def: String) -> GenResult<String> {
94        Ok(self.row.by_name(name)?.unwrap_or(def))
95    }
96
97    fn i32_by_name(&self, name: &str, def: i32) -> GenResult<i32> {
98        Ok(self.row.by_name(name)?.unwrap_or(def))
99    }
100}
101
102#[async_trait]
103impl<'a> Db for CdrsDbSession<'a> {
104    type Row = ARow;
105
106    async fn query(&self, query: &str) -> GenResult<Vec<Self::Row>> {
107        let rows = self
108            .ses
109            .query(query)
110            .await?
111            .response_body()?
112            .into_rows()
113            .unwrap_or_default();
114
115        let mut res: Vec<Self::Row> = vec![];
116
117        for row in rows {
118            res.push(ARow { row });
119        }
120        Ok(res)
121    }
122}
123
#[cfg(test)]
mod tests {
    use std::future::Future;

    use dockertest::Composition;
    use dockertest::DockerTest;

    use cqlmig::{CqlMigrator, GenResult, Migration};

    use crate::CdrsDbSession;

    /// Runs the migrator against a database expected to listen on `localhost:9042`.
    #[tokio::test]
    async fn test_migrations_with_server() {
        test_migrations(String::from("localhost:9042"))
            .await
            .unwrap();
    }

    /// Connects to `addr` and runs the migrator twice with an empty migration list.
    async fn test_migrations(addr: String) -> GenResult<()> {
        /// Opens a fresh session to `addr` and performs a single migrator run.
        async fn run(addr: String) -> GenResult<()> {
            let ses = CdrsDbSession::connect_no_auth(vec![addr]).await?;
            let db = CdrsDbSession::from(&ses);
            CqlMigrator::default()
                .with_logger(|s| println!("{}", s))
                .migrate(&db, Vec::<Migration>::new())
                .await
        }

        run(addr.clone()).await?;
        // Run twice to make sure it does not apply the same migrations twice
        run(addr).await
    }

    /// Same as [`test_migrations_with_server`], but against a ScyllaDB container.
    ///
    /// Marked `#[ignore]` so it only runs on request (`cargo test -- --ignored`),
    /// which keeps it compiled and reachable instead of being dead code.
    #[tokio::test]
    #[ignore = "for local testing: needs a Docker daemon"]
    async fn test_migrations_with_docker() {
        run_with_docker(test_migrations).await;
    }

    /// Starts a `scylladb/scylla` container and calls `fn_test` with the
    /// `host:port` address that container port 9042 is published on.
    ///
    /// # Panics
    ///
    /// Panics if port 9042 is not published or if `fn_test` returns an error.
    async fn run_with_docker<T, Fut>(fn_test: T)
    where
        T: FnOnce(String) -> Fut + Send + 'static,
        Fut: Future<Output = GenResult<()>> + Send + 'static,
    {
        let mut dt = DockerTest::new();
        // The builder call is followed by `clone()` to obtain an owned
        // `Composition` for `add_composition`.
        let scylladb = Composition::with_repository("scylladb/scylla")
            .publish_all_ports()
            .clone();

        dt.add_composition(scylladb);
        let _ = dt
            .run_async(|ops| async move {
                // A handle to operate on the Container.
                let container = ops.handle("scylladb/scylla");
                // The container is in a running state at this point.
                let bind = container
                    .host_port(9042)
                    .expect("scylladb port not found");

                fn_test(format!("{}:{}", bind.0, bind.1)).await.unwrap();
            })
            .await;
    }
}