1use async_trait::async_trait;
38use cdrs_tokio::cluster::session::{Session, SessionBuilder, TcpSessionBuilder};
39use cdrs_tokio::cluster::{NodeTcpConfigBuilder, TcpConnectionManager};
40use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
41use cdrs_tokio::transport::TransportTcp;
42use cdrs_tokio::types::rows::Row;
43use cdrs_tokio::types::ByName;
44
45pub use cqlmig::{CqlMigrator, GenResult, Migration, Version};
46use cqlmig::{Db, DbRow};
47
48pub type DbSession = Session<
50 TransportTcp,
51 TcpConnectionManager,
52 RoundRobinLoadBalancingStrategy<TransportTcp, TcpConnectionManager>,
53>;
54
55impl<'a> From<&'a DbSession> for CdrsDbSession<'a> {
56 fn from(db: &'a DbSession) -> Self {
57 CdrsDbSession::new(db)
58 }
59}
60
61pub struct CdrsDbSession<'a> {
63 ses: &'a DbSession,
64}
65
66impl<'a> CdrsDbSession<'a> {
67 pub async fn connect_no_auth(addrs: Vec<String>) -> GenResult<DbSession> {
71 let cluster_config = NodeTcpConfigBuilder::new()
72 .with_contact_points(addrs.iter().map(|it| it.into()).collect())
73 .build()
74 .await?;
75 let ses = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
76 .build()?;
77 Ok(ses)
78 }
79
80 pub fn new(db: &'a DbSession) -> CdrsDbSession<'a> {
82 CdrsDbSession { ses: db }
83 }
84}
85
86#[derive(Clone, Debug)]
88pub struct ARow {
89 row: Row,
90}
91
92impl DbRow for ARow {
93 fn string_by_name(&self, name: &str, def: String) -> GenResult<String> {
94 Ok(self.row.by_name(name)?.unwrap_or(def))
95 }
96
97 fn i32_by_name(&self, name: &str, def: i32) -> GenResult<i32> {
98 Ok(self.row.by_name(name)?.unwrap_or(def))
99 }
100}
101
102#[async_trait]
103impl<'a> Db for CdrsDbSession<'a> {
104 type Row = ARow;
105
106 async fn query(&self, query: &str) -> GenResult<Vec<Self::Row>> {
107 let rows = self
108 .ses
109 .query(query)
110 .await?
111 .response_body()?
112 .into_rows()
113 .unwrap_or_default();
114
115 let mut res: Vec<Self::Row> = vec![];
116
117 for row in rows {
118 res.push(ARow { row });
119 }
120 Ok(res)
121 }
122}
123
#[cfg(test)]
mod tests {
    use std::future::Future;

    use dockertest::{Composition, DockerTest};

    use cqlmig::{CqlMigrator, GenResult, Migration};

    use crate::CdrsDbSession;

    /// Runs the migration scenario against a node expected at `localhost:9042`.
    #[tokio::test]
    async fn test_migrations_with_server() {
        let outcome = test_migrations("localhost:9042".to_string()).await;
        outcome.unwrap();
    }

    /// Connects to `addr` and runs the migrator with an empty migration list,
    /// two times in a row.
    async fn test_migrations(addr: String) -> GenResult<()> {
        /// One pass: open a fresh session and migrate with no migrations.
        async fn run(addr: String) -> GenResult<()> {
            let session = CdrsDbSession::connect_no_auth(vec![addr]).await?;
            let db = CdrsDbSession::from(&session);
            CqlMigrator::default()
                .with_logger(|s| println!("{}", s))
                .migrate(&db, Vec::<Migration>::new())
                .await
        }

        // The second pass runs against whatever the first one left behind;
        // presumably this checks that re-running is harmless (assumption).
        for _ in 0..2 {
            run(addr.clone()).await?;
        }
        Ok(())
    }

    /// Docker-backed variant of the migration scenario.
    ///
    /// NOTE(review): this function carries no test attribute, so the test
    /// harness does not pick it up.
    async fn test_migrations_with_docker() {
        run_with_docker(test_migrations).await;
    }

    /// Starts a `scylladb/scylla` container with all ports published, resolves
    /// the host binding of container port 9042, and passes `"<host>:<port>"`
    /// to `fn_test`.
    ///
    /// Panics if the port binding is missing or if `fn_test` returns an error.
    async fn run_with_docker<T, Fut>(fn_test: T)
    where
        T: FnOnce(String) -> Fut + Send + 'static,
        Fut: Future<Output = GenResult<()>> + Send + 'static,
    {
        let scylladb = Composition::with_repository("scylladb/scylla")
            .publish_all_ports()
            .clone();

        let mut docker = DockerTest::new();
        docker.add_composition(scylladb);

        let _ = docker
            .run_async(|ops| async move {
                let container = ops.handle("scylladb/scylla");
                let bind = container
                    .host_port(9042)
                    .unwrap_or_else(|| panic!("scylladb port not found"));

                fn_test(format!("{}:{}", bind.0, bind.1)).await.unwrap();
            })
            .await;
    }
}