shotover 0.7.2

Shotover API for building custom transforms
Documentation
#[cfg(test)]
mod test_token_aware_router {
    use super::super::node_pool::KeyspaceMetadata;
    use super::super::routing_key::calculate_routing_key;
    use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
    use crate::transforms::cassandra::sink_cluster::node_pool::{
        NodePoolBuilder, PreparedMetadata, ReplicationStrategy,
    };
    use crate::transforms::cassandra::sink_cluster::{KeyspaceChanRx, KeyspaceChanTx};
    use cassandra_protocol::consistency::Consistency::One;
    use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
    use cassandra_protocol::query::QueryParams;
    use cassandra_protocol::query::QueryValues::SimpleValues;
    use cassandra_protocol::token::Murmur3Token;
    use cassandra_protocol::types::CBytesShort;
    use cassandra_protocol::types::value::Value;
    use pretty_assertions::assert_eq;
    use rand::prelude::*;
    use std::collections::HashMap;
    use std::net::SocketAddr;
    use tokio::sync::watch;
    use uuid::Uuid;

    /// End-to-end check of token-aware routing against the fixture cluster.
    ///
    /// Sets up a node pool with the fixture nodes, the `demo_ks` keyspace
    /// (`SimpleStrategy`, replication factor 3) and one prepared statement.
    /// For every row of `test_data()` it verifies that the routing key computed
    /// from the bound value equals the expected token, and that the first node
    /// returned by `get_replica_node_in_dc` is one of the expected addresses.
    #[tokio::test]
    async fn test_router() {
        let mut rng = SmallRng::from_rng(&mut rand::rng());

        // The pool receives the node list through a watch channel.
        let mut router = NodePoolBuilder::new("chain".to_owned()).build();
        let (_nodes_tx, mut nodes_rx) = watch::channel(prepare_nodes());
        router.update_nodes(&mut nodes_rx);

        let statement_id = CBytesShort::new(vec![
            11, 241, 38, 11, 140, 72, 217, 34, 214, 128, 175, 241, 151, 73, 197, 227,
        ]);

        // The keyspace channel starts out empty; the definition is sent
        // afterwards and then handed to the pool.
        let (keyspaces_tx, mut keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) =
            watch::channel(HashMap::new());
        let demo_keyspace = KeyspaceMetadata {
            replication_factor: 3,
            replication_strategy: ReplicationStrategy::SimpleStrategy,
        };
        keyspaces_tx
            .send(HashMap::from([("demo_ks".to_string(), demo_keyspace)]))
            .unwrap();
        router.update_keyspaces(&mut keyspaces_rx);

        router
            .add_prepared_result(statement_id.clone(), prepared_metadata())
            .await;

        // The partition-key indexes are identical for every row; fetch them once.
        let pk_indexes = prepared_metadata().pk_indexes;

        for (pk, test_token, rack_replicas, dc_replicas) in test_data() {
            let bound_values = SimpleValues(vec![Value::Some(pk.as_bytes().to_vec())]);

            let token = calculate_routing_key(&pk_indexes, &bound_values).unwrap();
            assert_eq!(token, test_token);

            let query_parameters = QueryParams {
                consistency: One,
                with_names: false,
                values: Some(bound_values),
                page_size: None,
                paging_state: None,
                serial_consistency: None,
                timestamp: None,
                keyspace: None,
                now_in_seconds: None,
            };
            let execute = execute_body(statement_id.clone(), query_parameters);

            let mut candidates = router
                .get_replica_node_in_dc(&execute, "rack1", &mut rng)
                .await
                .unwrap();
            // Only the first returned node is checked.
            let node = candidates.remove(0);

            // When the row lists no rack replicas, fall back to the DC-wide list.
            if rack_replicas.is_empty() {
                assert!(dc_replicas.contains(&node.address));
            } else {
                assert!(rack_replicas.contains(&node.address));
            }
        }
    }

    /// Fixture rows consumed by `test_router`.
    ///
    /// Each tuple holds, in order: the partition key string, the token expected
    /// for that key, the addresses accepted when this list is non-empty
    /// (destructured as `rack_replicas` by the test), and the addresses accepted
    /// otherwise (destructured as `dc_replicas`).
    fn test_data() -> Vec<(&'static str, Murmur3Token, Vec<SocketAddr>, Vec<SocketAddr>)> {
        /// Parses every literal into a `SocketAddr`, panicking on a malformed one.
        fn addrs(raw: &[&str]) -> Vec<SocketAddr> {
            raw.iter()
                .map(|entry| entry.parse::<SocketAddr>().unwrap())
                .collect()
        }

        vec![
            (
                "J.R.R.Tolkien",
                Murmur3Token::new(33977059392662464),
                addrs(&["172.16.1.4:9042", "172.16.1.3:9042"]),
                addrs(&["172.16.1.10:9042"]),
            ),
            (
                "LeoTolstoy",
                Murmur3Token::new(1121202131577552268),
                addrs(&[]),
                addrs(&["172.16.1.7:9042", "172.16.1.8:9042", "172.16.1.9:9042"]),
            ),
            (
                "MarkTwain",
                Murmur3Token::new(5440223371079860612),
                addrs(&["172.16.1.4:9042"]),
                addrs(&["172.16.1.10:9042", "172.16.1.7:9042"]),
            ),
            (
                "J.K.Rowling",
                Murmur3Token::new(8929012386469871705),
                addrs(&["172.16.1.2:9042"]),
                addrs(&["172.16.1.10:9042", "172.16.1.7:9042"]),
            ),
        ]
    }

    /// Wraps a prepared-statement id and its query parameters into an execute
    /// request body, leaving `result_metadata_id` unset.
    fn execute_body(id: CBytesShort, query_parameters: QueryParams) -> BodyReqExecuteOwned {
        BodyReqExecuteOwned {
            query_parameters,
            result_metadata_id: None,
            id,
        }
    }

    /// Metadata for the fixture prepared statement: the value at index 0 is the
    /// partition key, and the statement targets the `demo_ks` keyspace.
    fn prepared_metadata() -> PreparedMetadata {
        let pk_indexes = vec![0];
        let keyspace = Some("demo_ks".into());

        PreparedMetadata {
            pk_indexes,
            keyspace,
        }
    }

    /// Builds the nine fixture nodes, each with its rack, the token list loaded
    /// by `prepare_tokens()`, and a freshly generated random host id.
    ///
    /// Panics if an address fails to parse or a node has no entry in the token
    /// fixture.
    fn prepare_nodes() -> Vec<CassandraNode> {
        // (socket address, rack) for every node, in the order they are returned.
        const LAYOUT: [(&str, &str); 9] = [
            ("172.16.1.10:9042", "rack3"),
            ("172.16.1.3:9042", "rack1"),
            ("172.16.1.8:9042", "rack3"),
            ("172.16.1.6:9042", "rack2"),
            ("172.16.1.5:9042", "rack2"),
            ("172.16.1.2:9042", "rack1"),
            ("172.16.1.9:9042", "rack3"),
            ("172.16.1.4:9042", "rack1"),
            ("172.16.1.7:9042", "rack2"),
        ];

        let tokens = prepare_tokens();

        LAYOUT
            .into_iter()
            .map(|(address, rack)| {
                // The token fixture is keyed by the bare IP, without the port.
                let (ip, _port) = address.rsplit_once(':').unwrap();
                CassandraNode::new(
                    address.parse().unwrap(),
                    rack.into(),
                    tokens.get(ip).unwrap().clone(),
                    Uuid::new_v4(),
                )
            })
            .collect()
    }

    /// Loads the token fixture embedded from `./test_cluster_data.json`.
    ///
    /// The file is parsed as a JSON object mapping a string key to a list of
    /// `i64` values; each value is wrapped in a `Murmur3Token`. `prepare_nodes`
    /// looks entries up by bare IP string. Panics if the JSON does not match
    /// that shape.
    fn prepare_tokens() -> HashMap<String, Vec<Murmur3Token>> {
        let raw: HashMap<String, Vec<i64>> =
            serde_json::from_str(include_str!("./test_cluster_data.json")).unwrap();

        raw.into_iter()
            .map(|(address, values)| {
                let tokens: Vec<Murmur3Token> =
                    values.into_iter().map(Murmur3Token::new).collect();
                (address, tokens)
            })
            .collect()
    }
}