// cdrs-tokio 9.0.2
//
// Async Cassandra DB driver written in Rust
// Documentation
use cdrs_tokio::authenticators::NoneAuthenticatorProvider;
use cdrs_tokio::cluster::session::{Session, SessionBuilder, TcpSessionBuilder};
use cdrs_tokio::cluster::{NodeTcpConfigBuilder, TcpConnectionManager};
use cdrs_tokio::frame::TryFromRow;
use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
use cdrs_tokio::query::*;
use cdrs_tokio::query_values;
use cdrs_tokio::transport::TransportTcp;
use cdrs_tokio::{IntoCdrsValue, TryFromRow, TryFromUdt};
use std::sync::Arc;

/// Concrete session type shared by every function in this example: a
/// `Session` parameterized with `TransportTcp`, `TcpConnectionManager` and a
/// `RoundRobinLoadBalancingStrategy` over that same transport/manager pair.
type CurrentSession = Session<
    TransportTcp,
    TcpConnectionManager,
    RoundRobinLoadBalancingStrategy<TransportTcp, TcpConnectionManager>,
>;

/// Connects to the node at `127.0.0.1:9042`, prepares the keyspace and table,
/// inserts keys `0..20` from 20 concurrently spawned tasks, and finally reads
/// every row back and prints it.
///
/// # Panics
///
/// Panics if the cluster configuration or session cannot be built, if any of
/// the schema/insert/select helpers panics, or if a spawned insert task fails
/// to run to completion.
#[tokio::main]
async fn main() {
    let cluster_config = NodeTcpConfigBuilder::new()
        .with_contact_point("127.0.0.1:9042".into())
        .with_authenticator_provider(Arc::new(NoneAuthenticatorProvider))
        .build()
        .await
        .unwrap();
    let lb = RoundRobinLoadBalancingStrategy::new();
    let session: Arc<CurrentSession> = Arc::new(
        TcpSessionBuilder::new(lb, cluster_config)
            .build()
            .await
            .unwrap(),
    );

    create_keyspace(Arc::clone(&session)).await;
    create_table(Arc::clone(&session)).await;

    // Each task gets its own handle to the shared session.
    let handles: Vec<tokio::task::JoinHandle<()>> = (0..20)
        .map(|i| tokio::spawn(insert_struct(Arc::clone(&session), i)))
        .collect();

    // `join_all` only constructs a future; without `.await` the select below
    // could run before the inserts have finished and task failures would be
    // silently discarded.
    for outcome in futures::future::join_all(handles).await {
        outcome.expect("insert task did not run to completion");
    }

    select_struct(session).await;
}

/// One row of `test_ks.multi_thread_table`, as written by `insert_struct`
/// and read back by `select_struct`.
#[derive(Debug, Clone, PartialEq, IntoCdrsValue, TryFromRow)]
struct RowStruct {
    /// Value of the table's `key` column (declared `int PRIMARY KEY`).
    key: i32,
}

impl RowStruct {
    /// Consumes the row and builds the query values for it, pairing the
    /// `key` field with the name `"key"` through `query_values!`.
    fn into_query_values(self) -> QueryValues {
        let Self { key } = self;
        query_values!("key" => key)
    }
}

/// User-defined-type example carrying a single `username`.
///
/// Derives `IntoCdrsValue` and `TryFromUdt`; no other code visible in this
/// file references it.
#[derive(Clone, Debug, IntoCdrsValue, TryFromUdt, PartialEq)]
struct User {
    /// The user's name.
    username: String,
}

/// Creates the `test_ks` keyspace (if it does not exist yet) using
/// `SimpleStrategy` with a replication factor of 1.
///
/// # Panics
///
/// Panics with "Keyspace creation error" when the query fails.
async fn create_keyspace(session: Arc<CurrentSession>) {
    const KEYSPACE_DDL: &str = "CREATE KEYSPACE IF NOT EXISTS test_ks \
                                WITH REPLICATION = { 'class' : 'SimpleStrategy', \
                                'replication_factor' : 1 };";

    let outcome = session.query(KEYSPACE_DDL).await;
    outcome.expect("Keyspace creation error");
}

/// Creates `test_ks.multi_thread_table` (if it does not exist yet) with a
/// single `key int PRIMARY KEY` column.
///
/// # Panics
///
/// Panics with "Table creation error" when the query fails.
async fn create_table(session: Arc<CurrentSession>) {
    const TABLE_DDL: &str =
        "CREATE TABLE IF NOT EXISTS test_ks.multi_thread_table (key int PRIMARY KEY);";

    let outcome = session.query(TABLE_DDL).await;
    outcome.expect("Table creation error");
}

/// Inserts a single row with the given `key` into
/// `test_ks.multi_thread_table`.
///
/// # Panics
///
/// Panics with "insert" when the query fails.
async fn insert_struct(session: Arc<CurrentSession>, key: i32) {
    const INSERT_CQL: &str = "INSERT INTO test_ks.multi_thread_table (key) VALUES (?)";

    // Build the bound values first, then run the statement.
    let bound_values = RowStruct { key }.into_query_values();
    let outcome = session.query_with_values(INSERT_CQL, bound_values).await;
    outcome.expect("insert");
}

/// Reads every row of `test_ks.multi_thread_table`, converts each one into a
/// `RowStruct`, and prints it to stdout.
///
/// # Panics
///
/// Panics if the query fails ("query"), the response body cannot be obtained
/// ("get body"), the body yields no rows ("into rows"), or a row cannot be
/// converted ("into RowStruct").
async fn select_struct(session: Arc<CurrentSession>) {
    let envelope = session
        .query("SELECT * FROM test_ks.multi_thread_table")
        .await
        .expect("query");
    let body = envelope.response_body().expect("get body");
    let fetched = body.into_rows().expect("into rows");

    // Convert and print one row at a time, in the order received.
    fetched
        .into_iter()
        .map(|raw| RowStruct::try_from_row(raw).expect("into RowStruct"))
        .for_each(|my_row| println!("struct got: {my_row:?}"));
}