1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Create Cassandra tables and CRUD CQL prepared statements
//! from Rust structs.
//!
//! This crate intends to reduce the boilerplate when create
//! and executing basic Cassandra statements. We can see that
//! the Rust struct holds the metadata for executing basic
//! operations. Before this crate, you will need to write the
//! Cassandra Table definition, create the Rust struct and
//! manually write all CQL statements for all operations.
//!
//! Worse than duplicated metadata, is refactoring code when you
//! add a new table, or change tables names and the statements
//! blows up because the table no longer exists.
//!
//! This crate is not perfect, and you are welcome to contribute.
//!
//! # Installation
//!
//! The crate `cdrs` is required since we make prepared statements
//! and the values of your struct must be mapped into Cassandra
//! datatypes and this crate does this mapping, and also, you will
//! need a driver between Rust code and your Cassandra cluster.
//!
//! ```toml
//! [dependencies]
//! cdrs = { version = "2" }
//! cassandra_macro = "0.1.2"
//! cassandra_macro_derive = "0.1.2"
//! ```
//!
//! In your `main.rs`
//!
//! ```
//! #[macro_use]
//! extern crate cdrs;
//! ```
//!
//! # Example
//! ```
//! #[macro_use]
//!extern crate cdrs;
//!
//!use std::sync::Arc;
//!
//!use cassandra_macro::{CassandraTable, DeleteQuery, Projection, UpdateQuery};
//!use cassandra_macro::StoreQuery;
//!use cassandra_macro_derive::CassandraTable;
//!use cdrs::authenticators::StaticPasswordAuthenticator;
//!use cdrs::cluster::{ClusterTcpConfig, NodeTcpConfigBuilder, TcpConnectionPool};
//!use cdrs::cluster::session::{new_lz4, Session};
//!use cdrs::Error as CassandraDriverError;
//!use cdrs::frame::TryFromRow;
//!use cdrs::load_balancing::RoundRobinSync;
//!use cdrs::query::{QueryExecutor, QueryValues};
//!use cdrs::types::ByName;
//!use cdrs::types::rows::Row;
//!use cdrs::types::value::Value;
//!use chrono::Utc;
//!use uuid::Uuid;
//!
//!#[table(keyspace = "test", options = "comment='Only for RUST users' | COMPACTION = {'class':'SizeTieredCompactionStrategy'}")]
//!#[derive(Debug, CassandraTable)]
//!pub struct User {
//!    #[column(type = "TEXT", primary_key)]
//!    username: String,
//!
//!    #[column(type = "UUID")]
//!    user_internal_id: Uuid,
//!
//!    #[column(type = "TEXT")]
//!    first_name: String,
//!
//!    #[column(type = "TIMESTAMP", cluster_key(order = "ASC", position = 1))]
//!    created: i64,
//!
//!    #[column(type = "TIMESTAMP")]
//!    updated: i64,
//!}
//!
//!impl User {
//!    fn set_first_name(&mut self, first_name: String) {
//!        self.first_name = first_name;
//!    }
//!}
//!
//!impl Default for User {
//!    fn default() -> Self {
//!        User {
//!            username: "Rust".to_string(),
//!            user_internal_id: Uuid::new_v4(),
//!            first_name: "rust".to_string(),
//!            created: Utc::now().timestamp_millis(),
//!            updated: Utc::now().timestamp_millis(),
//!        }
//!    }
//!}
//!
//!impl TryFromRow for User {
//!    fn try_from_row(row: Row) -> Result<Self, cdrs::Error> {
//!        let username = row.r_by_name::<String>("username")?;
//!        let user_internal_id = row.r_by_name::<Uuid>("user_internal_id")?;
//!        let first_name = row.r_by_name::<String>("first_name")?;
//!        let created: i64 = row.r_by_name::<i64>("created")?;
//!        let updated: i64 = row.r_by_name::<i64>("updated")?;
//!
//!        Ok(User {
//!            username,
//!            user_internal_id,
//!            first_name,
//!            created,
//!            updated,
//!        })
//!    }
//!}
//!
//!
//!pub struct CassandraConfig {
//!    nodes: Vec<String>,
//!    user: String,
//!    password: String,
//!}
//!
//!pub struct CassandraDriver {
//!    connection: Arc<Session<RoundRobinSync<TcpConnectionPool<StaticPasswordAuthenticator>>>>
//!}
//!
//!impl CassandraDriver {
//!    pub fn execute_simple_statement<Q: ToString>(&self, query: Q) -> Result<bool, CassandraDriverError> {
//!        match self.connection.query(query) {
//!            Ok(_) => Ok(true),
//!            Err(e) => {
//!                Err(e)
//!            }
//!        }
//!    }
//!
//!    pub fn execute_store_query(&self, query: &StoreQuery) -> Result<bool, CassandraDriverError> {
//!        self.execute_query(query.query(), query.values())
//!    }
//!
//!    pub fn execute_update_query(&self, query: &UpdateQuery) -> Result<bool, CassandraDriverError> {
//!        self.execute_query(query.query(), query.values())
//!    }
//!
//!    pub fn execute_delete_query(&self, query: &DeleteQuery) -> Result<bool, CassandraDriverError> {
//!        self.execute_query(query.query(), query.values())
//!    }
//!
//!    pub fn execute_query(&self, query: &String, values: &QueryValues) -> Result<bool, CassandraDriverError> {
//!        let result = self.connection
//!            .query_with_values(query, values.to_owned());
//!
//!        result.map(|_| true)
//!    }
//!
//!    pub fn find<T: TryFromRow + CassandraTable>(&self, keys: Vec<String>) -> Result<Option<T>, CassandraDriverError> {
//!        let stmt = T::select_by_primary_keys(Projection::All);
//!
//!        let values = keys.iter().map(|k| Value::from(k.to_string())).collect::<Vec<Value>>();
//!
//!        let result_frame = self.connection.query_with_values(stmt, QueryValues::SimpleValues(values))?;
//!
//!        Ok(result_frame.get_body()?.into_rows()
//!            .map(|r| { r.first().map(|r| T::try_from_row(r.to_owned()).unwrap()) }).flatten())
//!    }
//!
//!    pub fn new_from_config(cassandra_configs: &CassandraConfig) -> Self {
//!        let mut nodes = Vec::with_capacity(cassandra_configs.nodes.len());
//!
//!        for node in cassandra_configs.nodes.iter() {
//!            let authenticator: StaticPasswordAuthenticator = StaticPasswordAuthenticator::new(cassandra_configs.user.clone(),
//!                                                                                              cassandra_configs.password.clone());
//!
//!            let node_tcp = NodeTcpConfigBuilder::new(node.as_str(), authenticator).build();
//!
//!            nodes.push(node_tcp);
//!        }
//!
//!        let cluster_config = ClusterTcpConfig(nodes);
//!
//!        let cassandra_session = new_lz4(&cluster_config, RoundRobinSync::new())
//!            .expect("Cassandra session must be created");
//!
//!        CassandraDriver {
//!            connection: Arc::new(cassandra_session)
//!        }
//!    }
//!}
//!
//!fn main() {
//!    let driver_conf = CassandraConfig {
//!        nodes: vec!["aella:9042".to_string()],
//!        user: String::from("test"),
//!        password: String::from("test"),
//!    };
//!
//!    let connection = CassandraDriver::new_from_config(&driver_conf);
//!
//!    println!("Keyspace:.{}.", User::key_space());
//!    println!("Table name:.{}.", User::table_name());
//!    println!("Creating table:{}", User::create_table_cql());
//!    connection.execute_simple_statement(User::create_table_cql()).expect("Must create table");
//!
//!    println!("You can test those by yourself");
//!    println!("{}", User::select_by_primary_keys(Projection::Columns(vec!["created".to_string()])));
//!    println!("{}", User::select_by_primary_and_cluster_keys(Projection::All));
//!    println!("{}", User::update_by_primary_keys(vec!["updated".to_string()]));
//!    println!("{}", User::update_by_primary_and_cluster_keys(vec!["updated".to_string()]));
//!    println!("{}", User::delete_by_primary_keys());
//!    println!("{}", User::delete_by_primary_and_cluster_keys());
//!
//!    let mut rust_user = User::default();
//!
//!    println!("Storing rust: {}", rust_user.store_query().query());
//!    connection.execute_store_query(&rust_user.store_query()).expect("User must be stored");
//!
//!    let rust_user_from_db: Option<User> = connection.find::<User>(vec!["Rust".to_string()]).unwrap();
//!    assert!(rust_user_from_db.unwrap().username.eq(&rust_user.username), "Must be the same");
//!
//!    println!("Update rust:{}", rust_user.update_query().unwrap().query());
//!    rust_user.set_first_name(String::from("IamRoot"));
//!
//!    connection.execute_update_query(&rust_user.update_query().unwrap()).unwrap();
//!
//!    let rust_user_from_db_1 = connection.find::<User>(vec!["Rust".to_string()]).unwrap();
//!
//!    assert!(rust_user_from_db_1.unwrap().username.eq(&rust_user.username), "Must be the same");
//!
//!    println!("Delete:{}", rust_user.delete_query().query());
//!    connection.execute_delete_query(&rust_user.delete_query()).expect("Must be deleted");
//!
//!    println!("Dropping table: {}", User::drop_table_cql());
//!    connection.execute_simple_statement(User::drop_table_cql()).expect("Table must be removed");
//!}
//! ```
use cdrs::query::QueryValues;
use std::fmt::{Display, Formatter};

pub enum Projection {
    Count,
    All,
    Columns(Vec<String>),
}

pub trait CassandraTable {
    /// key space
    fn key_space() -> &'static str;

    /// Table name
    fn table_name() -> &'static str;

    /// CQL for table creation
    fn create_table_cql() -> &'static str;

    /// CQL for drop table
    fn drop_table_cql() -> &'static str;

    /// Prepared statement for selection by primary keys
    fn select_by_primary_keys(projection: Projection) -> String;

    /// Prepared statement for selection by primary keys and cluster keys
    fn select_by_primary_and_cluster_keys(projection: Projection) -> String;

    /// Prepared statement for update by primary keys
    fn update_by_primary_keys(columns: Vec<String>) -> String;

    /// Prepared statement for update by primary keys and cluster keys
    fn update_by_primary_and_cluster_keys(columns: Vec<String>) -> String;

    /// Prepared statement for delete by primary keys
    fn delete_by_primary_keys() -> String;

    /// Prepared statement for delete by primary keys and cluster key
    fn delete_by_primary_and_cluster_keys() -> String;

    /// Create `StoreQuery` containing the prepared statement
    /// to store this entity
    fn store_query(&self) -> StoreQuery;

    /// Create `UpdateQuery` containing the prepared statement
    /// to update this entity
    ///
    /// The statement only can update columns that are not
    /// part of the primary keys.
    fn update_query(&self) -> Result<UpdateQuery, TableWithNoUpdatableColumnsError>;

    /// Create `DeleteQuery` containing the prepared statement
    /// to delete this entity
    fn delete_query(&self) -> DeleteQuery;
}

#[derive(Debug)]
pub struct StoreQuery {
    query: String,
    values: QueryValues,
}

impl StoreQuery {
    /// New instance
    pub fn new(query: String, values: QueryValues) -> Self {
        StoreQuery { query, values }
    }

    /// Prepared statement for insertion
    pub fn query(&self) -> &String {
        &self.query
    }

    /// Values for executing prepared statement
    pub fn values(&self) -> &QueryValues {
        &self.values
    }
}

impl Display for StoreQuery {
    /// Only display the prepared statement
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "query:{}", self.query)
    }
}

#[derive(Debug)]
pub struct UpdateQuery {
    query: String,
    values: QueryValues,
}

impl UpdateQuery {
    /// New instance
    pub fn new(query: String, values: QueryValues) -> Self {
        UpdateQuery { query, values }
    }
    /// Prepared statement for update
    pub fn query(&self) -> &String {
        &self.query
    }
    /// Values for executing prepared statement
    pub fn values(&self) -> &QueryValues {
        &self.values
    }
}

impl Display for UpdateQuery {
    /// Only display the prepared statement
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "query:{}", self.query)
    }
}

#[derive(Debug)]
pub struct DeleteQuery {
    query: String,
    values: QueryValues,
}

impl DeleteQuery {
    /// New instance
    pub fn new(query: String, values: QueryValues) -> Self {
        DeleteQuery { query, values }
    }

    /// Prepared statement for deletion
    pub fn query(&self) -> &String {
        &self.query
    }

    /// Values for executing prepared statement
    pub fn values(&self) -> &QueryValues {
        &self.values
    }
}

impl Display for DeleteQuery {
    /// Only display the prepared statement
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "query:{}", self.query)
    }
}

/// Error if user tries to create
/// invalid update statement
#[derive(Debug)]
pub struct TableWithNoUpdatableColumnsError {
    message: String
}

impl TableWithNoUpdatableColumnsError {
    pub fn new(message: String) -> Self {
        TableWithNoUpdatableColumnsError { message }
    }
}

impl std::error::Error for TableWithNoUpdatableColumnsError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self)
    }
}

impl std::fmt::Display for TableWithNoUpdatableColumnsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message.as_str())
    }
}