1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
//! Create Cassandra tables and CRUD CQL prepared statements
//! from Rust structs.
//!
//! This crate intends to reduce the boilerplate when creating
//! and executing basic Cassandra statements. We can see that
//! the Rust struct holds the metadata for executing basic
//! operations. Before this crate, you needed to write the
//! Cassandra table definition, create the Rust struct and
//! manually write all CQL statements for all operations.
//!
//! Worse than duplicated metadata is refactoring code when you
//! add a new table or change table names, and the statements
//! blow up because the table no longer exists.
//!
//! This crate is not perfect, and you are welcome to contribute.
//!
//! # Installation
//!
//! The crate `cdrs` is required since we make prepared statements
//! and the values of your struct must be mapped into Cassandra
//! data types, and this crate does this mapping; you will also
//! need a driver between Rust code and your Cassandra cluster.
//!
//! ```toml
//! [dependencies]
//! cdrs = { version = "2" }
//! cassandra_macro = "0.1.2"
//! cassandra_macro_derive = "0.1.2"
//! ```
//!
//! In your `main.rs`
//!
//! ```
//! #[macro_use]
//! extern crate cdrs;
//! ```
//!
//! # Example
//! ```
//! #[macro_use]
//!extern crate cdrs;
//!
//!use std::sync::Arc;
//!
//!use cassandra_macro::{CassandraTable, DeleteQuery, Projection, UpdateQuery};
//!use cassandra_macro::StoreQuery;
//!use cassandra_macro_derive::CassandraTable;
//!use cdrs::authenticators::StaticPasswordAuthenticator;
//!use cdrs::cluster::{ClusterTcpConfig, NodeTcpConfigBuilder, TcpConnectionPool};
//!use cdrs::cluster::session::{new_lz4, Session};
//!use cdrs::Error as CassandraDriverError;
//!use cdrs::frame::TryFromRow;
//!use cdrs::load_balancing::RoundRobinSync;
//!use cdrs::query::{QueryExecutor, QueryValues};
//!use cdrs::types::ByName;
//!use cdrs::types::rows::Row;
//!use cdrs::types::value::Value;
//!use chrono::Utc;
//!use uuid::Uuid;
//!
//!#[table(keyspace = "test", options = "comment='Only for RUST users' | COMPACTION = {'class':'SizeTieredCompactionStrategy'}")] //!#[derive(Debug, CassandraTable)] //!pub struct User { //! #[column(type = "TEXT", primary_key)] //! username: String, //! //! #[column(type = "UUID")] //! user_internal_id: Uuid, //! //! #[column(type = "TEXT")] //! first_name: String, //! //! #[column(type = "TIMESTAMP", cluster_key(order = "ASC", position = 1))] //! created: i64, //! //! #[column(type = "TIMESTAMP")] //! updated: i64, //!} //! //!impl User { //! fn set_first_name(&mut self, first_name: String) { //! self.first_name = first_name; //! } //!} //! //!impl Default for User { //! fn default() -> Self { //! User { //! username: "Rust".to_string(), //! user_internal_id: Uuid::new_v4(), //! first_name: "rust".to_string(), //! created: Utc::now().timestamp_millis(), //! updated: Utc::now().timestamp_millis(), //! } //! } //!} //! //!impl TryFromRow for User { //! fn try_from_row(row: Row) -> Result<Self, cdrs::Error> { //! let username = row.r_by_name::<String>("username")?; //! let user_internal_id = row.r_by_name::<Uuid>("user_internal_id")?; //! let first_name = row.r_by_name::<String>("first_name")?; //! let created: i64 = row.r_by_name::<i64>("created")?; //! let updated: i64 = row.r_by_name::<i64>("updated")?; //! //! Ok(User { //! username, //! user_internal_id, //! first_name, //! created, //! updated, //! }) //! } //!} //! //! //!pub struct CassandraConfig { //! nodes: Vec<String>, //! user: String, //! password: String, //!} //! //!pub struct CassandraDriver { //! connection: Arc<Session<RoundRobinSync<TcpConnectionPool<StaticPasswordAuthenticator>>>> //!} //! //!impl CassandraDriver { //! pub fn execute_simple_statement<Q: ToString>(&self, query: Q) -> Result<bool, CassandraDriverError> { //! match self.connection.query(query) { //! Ok(_) => Ok(true), //! Err(e) => { //! Err(e) //! } //! } //! } //! //! 
pub fn execute_store_query(&self, query: &StoreQuery) -> Result<bool, CassandraDriverError> { //! self.execute_query(query.query(), query.values()) //! } //! //! pub fn execute_update_query(&self, query: &UpdateQuery) -> Result<bool, CassandraDriverError> { //! self.execute_query(query.query(), query.values()) //! } //! //! pub fn execute_delete_query(&self, query: &DeleteQuery) -> Result<bool, CassandraDriverError> { //! self.execute_query(query.query(), query.values()) //! } //! //! pub fn execute_query(&self, query: &String, values: &QueryValues) -> Result<bool, CassandraDriverError> { //! let result = self.connection //! .query_with_values(query, values.to_owned()); //! //! result.map(|_| true) //! } //! //! pub fn find<T: TryFromRow + CassandraTable>(&self, keys: Vec<String>) -> Result<Option<T>, CassandraDriverError> { //! let stmt = T::select_by_primary_keys(Projection::All); //! //! let values = keys.iter().map(|k| Value::from(k.to_string())).collect::<Vec<Value>>(); //! //! let result_frame = self.connection.query_with_values(stmt, QueryValues::SimpleValues(values))?; //! //! Ok(result_frame.get_body()?.into_rows() //! .map(|r| { r.first().map(|r| T::try_from_row(r.to_owned()).unwrap()) }).flatten()) //! } //! //! pub fn new_from_config(cassandra_configs: &CassandraConfig) -> Self { //! let mut nodes = Vec::with_capacity(cassandra_configs.nodes.len()); //! //! for node in cassandra_configs.nodes.iter() { //! let authenticator: StaticPasswordAuthenticator = StaticPasswordAuthenticator::new(cassandra_configs.user.clone(), //! cassandra_configs.password.clone()); //! //! let node_tcp = NodeTcpConfigBuilder::new(node.as_str(), authenticator).build(); //! //! nodes.push(node_tcp); //! } //! //! let cluster_config = ClusterTcpConfig(nodes); //! //! let cassandra_session = new_lz4(&cluster_config, RoundRobinSync::new()) //! .expect("Cassandra session must be created"); //! //! CassandraDriver { //! connection: Arc::new(cassandra_session) //! } //! } //!} //! 
//!fn main() { //! let driver_conf = CassandraConfig { //! nodes: vec!["aella:9042".to_string()], //! user: String::from("test"), //! password: String::from("test"), //! }; //! //! let connection = CassandraDriver::new_from_config(&driver_conf); //! //! println!("Keyspace:.{}.", User::key_space()); //! println!("Table name:.{}.", User::table_name()); //! println!("Creating table:{}", User::create_table_cql()); //! connection.execute_simple_statement(User::create_table_cql()).expect("Must create table"); //! //! println!("You can test those by yourself"); //! println!("{}", User::select_by_primary_keys(Projection::Columns(vec!["created".to_string()]))); //! println!("{}", User::select_by_primary_and_cluster_keys(Projection::All)); //! println!("{}", User::update_by_primary_keys(vec!["updated".to_string()])); //! println!("{}", User::update_by_primary_and_cluster_keys(vec!["updated".to_string()])); //! println!("{}", User::delete_by_primary_keys()); //! println!("{}", User::delete_by_primary_and_cluster_keys()); //! //! let mut rust_user = User::default(); //! //! println!("Storing rust: {}", rust_user.store_query().query()); //! connection.execute_store_query(&rust_user.store_query()).expect("User must be stored"); //! //! let rust_user_from_db: Option<User> = connection.find::<User>(vec!["Rust".to_string()]).unwrap(); //! assert!(rust_user_from_db.unwrap().username.eq(&rust_user.username), "Must be the same"); //! //! println!("Update rust:{}", rust_user.update_query().unwrap().query()); //! rust_user.set_first_name(String::from("IamRoot")); //! //! connection.execute_update_query(&rust_user.update_query().unwrap()).unwrap(); //! //! let rust_user_from_db_1 = connection.find::<User>(vec!["Rust".to_string()]).unwrap(); //! //! assert!(rust_user_from_db_1.unwrap().username.eq(&rust_user.username), "Must be the same"); //! //! println!("Delete:{}", rust_user.delete_query().query()); //! 
connection.execute_delete_query(&rust_user.delete_query()).expect("Must be deleted"); //! //! println!("Dropping table: {}", User::drop_table_cql()); //! connection.execute_simple_statement(User::drop_table_cql()).expect("Table must be removed"); //!} //! ``` use cdrs::query::QueryValues; use std::fmt::{Display, Formatter}; pub enum Projection { Count, All, Columns(Vec<String>), } pub trait CassandraTable { /// key space fn key_space() -> &'static str; /// Table name fn table_name() -> &'static str; /// CQL for table creation fn create_table_cql() -> &'static str; /// CQL for drop table fn drop_table_cql() -> &'static str; /// Prepared statement for selection by primary keys fn select_by_primary_keys(projection: Projection) -> String; /// Prepared statement for selection by primary keys and cluster keys fn select_by_primary_and_cluster_keys(projection: Projection) -> String; /// Prepared statement for update by primary keys fn update_by_primary_keys(columns: Vec<String>) -> String; /// Prepared statement for update by primary keys and cluster keys fn update_by_primary_and_cluster_keys(columns: Vec<String>) -> String; /// Prepared statement for delete by primary keys fn delete_by_primary_keys() -> String; /// Prepared statement for delete by primary keys and cluster key fn delete_by_primary_and_cluster_keys() -> String; /// Create `StoreQuery` containing the prepared statement /// to store this entity fn store_query(&self) -> StoreQuery; /// Create `UpdateQuery` containing the prepared statement /// to update this entity /// /// The statement only can update columns that are not /// part of the primary keys. 
fn update_query(&self) -> Result<UpdateQuery, TableWithNoUpdatableColumnsError>; /// Create `DeleteQuery` containing the prepared statement /// to delete this entity fn delete_query(&self) -> DeleteQuery; } #[derive(Debug)] pub struct StoreQuery { query: String, values: QueryValues, } impl StoreQuery { /// New instance pub fn new(query: String, values: QueryValues) -> Self { StoreQuery { query, values } } /// Prepared statement for insertion pub fn query(&self) -> &String { &self.query } /// Values for executing prepared statement pub fn values(&self) -> &QueryValues { &self.values } } impl Display for StoreQuery { /// Only display the prepared statement fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "query:{}", self.query) } } #[derive(Debug)] pub struct UpdateQuery { query: String, values: QueryValues, } impl UpdateQuery { /// New instance pub fn new(query: String, values: QueryValues) -> Self { UpdateQuery { query, values } } /// Prepared statement for update pub fn query(&self) -> &String { &self.query } /// Values for executing prepared statement pub fn values(&self) -> &QueryValues { &self.values } } impl Display for UpdateQuery { /// Only display the prepared statement fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "query:{}", self.query) } } #[derive(Debug)] pub struct DeleteQuery { query: String, values: QueryValues, } impl DeleteQuery { /// New instance pub fn new(query: String, values: QueryValues) -> Self { DeleteQuery { query, values } } /// Prepared statement for deletion pub fn query(&self) -> &String { &self.query } /// Values for executing prepared statement pub fn values(&self) -> &QueryValues { &self.values } } impl Display for DeleteQuery { /// Only display the prepared statement fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "query:{}", self.query) } } /// Error if user tries to create /// invalid update statement #[derive(Debug)] pub struct TableWithNoUpdatableColumnsError { 
message: String } impl TableWithNoUpdatableColumnsError { pub fn new(message: String) -> Self { TableWithNoUpdatableColumnsError { message } } } impl std::error::Error for TableWithNoUpdatableColumnsError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { Some(self) } } impl std::fmt::Display for TableWithNoUpdatableColumnsError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.message.as_str()) } }