//! Consumer-group protocol integration tests for samsa (v0.1.8), a
//! Rust-native Kafka/Redpanda protocol and client implementation.
//! See the crate documentation for details.
mod testsupport;

use nom::AsBytes;
use samsa::prelude::{self, ClusterMetadata};
use samsa::prelude::{
    protocol, BrokerAddress, BrokerConnection, Error, KafkaCode, TcpConnection,
    ROUND_ROBIN_PROTOCOL,
};

// Client identifier sent with every request in these tests.
const CLIENT_ID: &str = "group protocol integration test";
// Fixed correlation id; these tests issue requests serially, so one value suffices.
const CORRELATION_ID: i32 = 1;
// Consumer-group id used by the raw-protocol test.
const GROUP_ID: &str = "group integration test";
// Separate group id for the helper-function test, so the two tests don't collide.
const GROUP_ID2: &str = "group integration test 2";
// All test topics are created with a single partition; we always assign partition 0.
const PARTITION_ID: i32 = 0;

#[tokio::test]
/// Exercises the consumer-group lifecycle against a live broker using raw
/// protocol request/response types: find coordinator, join group, sync group,
/// heartbeat, leave group, then delete the test topic.
///
/// Skipped (returns `Ok(())`) when the test environment provides no brokers.
async fn it_can_join_and_sync_groups() -> Result<(), Box<Error>> {
    let (skip, brokers, topic) = testsupport::get_brokers_and_topic()?;
    if skip {
        return Ok(());
    }

    let mut metadata = ClusterMetadata::new(
        brokers.clone(),
        CORRELATION_ID,
        CLIENT_ID.to_owned(),
        vec![],
    )
    .await?;
    // Topic creation must go through the controller broker.
    let conn: &mut TcpConnection = metadata
        .broker_connections
        .get_mut(&metadata.controller_id)
        .unwrap();
    testsupport::ensure_topic_creation(conn.clone(), &topic, CORRELATION_ID, CLIENT_ID).await?;

    //
    // Get coordinator for this group
    //
    let coordinator_req =
        protocol::FindCoordinatorRequest::new(CORRELATION_ID, CLIENT_ID, GROUP_ID);
    conn.send_request(&coordinator_req).await?;
    let coordinator_res =
        protocol::FindCoordinatorResponse::try_from(conn.receive_response().await?.freeze())?;
    assert_eq!(coordinator_res.error_code, KafkaCode::None);
    let host = std::str::from_utf8(coordinator_res.host.as_bytes()).unwrap();
    let port = coordinator_res.port;
    // All group requests below must be sent to the coordinator, not an arbitrary broker.
    let mut coordinator_conn = TcpConnection::new(vec![BrokerAddress {
        host: host.to_owned(),
        port: port.try_into().map_err(|err| {
            tracing::error!(
                "Error decoding Broker connection port from metadata {:?}",
                err
            );
            Error::MetadataNeedsSync
        })?,
    }])
    .await?;

    // idk why this helps... maybe redpanda needs a second to accept for the coordinator
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

    //
    // Test join group
    //
    let protocols = [ROUND_ROBIN_PROTOCOL]
        .iter()
        .map(|protocol| protocol::join_group::request::Protocol {
            name: protocol,
            metadata: protocol::join_group::request::Metadata {
                version: 3,
                subscription: vec![&topic],
                user_data: None,
            },
        })
        .collect();
    let join_group_request = protocol::JoinGroupRequest::new(
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID,
        100000, // session timeout (ms)
        10000,  // rebalance timeout (ms)
        bytes::Bytes::from(""), // empty member id: we are a brand-new member
        "consumer",
        protocols,
    )?;

    coordinator_conn.send_request(&join_group_request).await?;
    let join_group_response =
        protocol::JoinGroupResponse::try_from(coordinator_conn.receive_response().await?.freeze())?;

    // Check the error code first so a broker-side failure surfaces directly
    // instead of as a confusing empty-members assertion.
    assert_eq!(join_group_response.error_code, KafkaCode::None);
    assert_eq!(join_group_response.members.len(), 1);
    // As the sole member, we must have been elected leader.
    assert_eq!(join_group_response.leader, join_group_response.member_id);

    //
    // Test sync group
    //
    // As leader we provide the assignments: our single member owns partition 0.
    let assignments = protocol::sync_group::request::Assignment::new(
        join_group_response.member_id.clone(),
        protocol::sync_group::request::MemberAssignment {
            version: 3,
            partition_assignments: vec![protocol::sync_group::request::PartitionAssignment {
                topic_name: &topic,
                partitions: vec![PARTITION_ID],
            }],
            user_data: None,
        },
    )?;

    let sync_req = protocol::SyncGroupRequest::new(
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID,
        join_group_response.generation_id,
        join_group_response.member_id.clone(),
        vec![assignments],
    )?;

    coordinator_conn.send_request(&sync_req).await?;
    let sync_response =
        protocol::SyncGroupResponse::try_from(coordinator_conn.receive_response().await?.freeze())?;

    assert_eq!(sync_response.error_code, KafkaCode::None);
    // The coordinator should echo back exactly the assignment we proposed.
    assert_eq!(sync_response.assignment.partition_assignments.len(), 1);
    assert_eq!(
        sync_response.assignment.partition_assignments[0],
        protocol::sync_group::response::PartitionAssignment {
            topic_name: bytes::Bytes::from(topic.clone()),
            partitions: vec![PARTITION_ID]
        }
    );

    //
    // Test heartbeat
    //
    let heartbeat_request = protocol::HeartbeatRequest::new(
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID,
        join_group_response.generation_id,
        join_group_response.member_id.clone(),
    )?;
    coordinator_conn.send_request(&heartbeat_request).await?;
    let heartbeat_response =
        protocol::HeartbeatResponse::try_from(coordinator_conn.receive_response().await?.freeze())?;

    assert_eq!(heartbeat_response.error_code, KafkaCode::None);

    //
    // Test leave group
    //
    let leave_group_request = protocol::LeaveGroupRequest::new(
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID,
        join_group_response.member_id.clone(),
    )?;
    coordinator_conn.send_request(&leave_group_request).await?;
    let leave_group_response = protocol::LeaveGroupResponse::try_from(
        coordinator_conn.receive_response().await?.freeze(),
    )?;

    assert_eq!(leave_group_response.error_code, KafkaCode::None);

    //
    // Delete topic (cleanup; still sent via the controller connection)
    //
    let delete_res = prelude::delete_topics(
        conn.clone(),
        CORRELATION_ID,
        CLIENT_ID,
        vec![topic.as_str()],
    )
    .await?;
    assert_eq!(delete_res.topics[0].error_code, KafkaCode::None);

    Ok(())
}

#[tokio::test]
/// Same consumer-group lifecycle as `it_can_join_and_sync_groups`, but driven
/// through the high-level `samsa::prelude` helper functions instead of raw
/// protocol request/response types. Uses a distinct group id so the two tests
/// can run against the same cluster without interfering.
///
/// Skipped (returns `Ok(())`) when the test environment provides no brokers.
async fn it_can_join_and_sync_groups_with_functions() -> Result<(), Box<Error>> {
    let (skip, brokers) = testsupport::get_brokers()?;
    if skip {
        return Ok(());
    }
    let topic = "group-integration-test".to_owned();
    let conn = TcpConnection::new(brokers).await?;
    testsupport::ensure_topic_creation(conn.clone(), &topic.clone(), CORRELATION_ID, CLIENT_ID)
        .await?;

    //
    // Get coordinator for this group
    //
    let coordinator_res =
        samsa::prelude::find_coordinator(conn.clone(), CORRELATION_ID, CLIENT_ID, GROUP_ID2)
            .await?;
    assert_eq!(coordinator_res.error_code, KafkaCode::None);
    let host = std::str::from_utf8(coordinator_res.host.as_bytes()).unwrap();
    let port = coordinator_res.port;
    // All group requests below must be sent to the coordinator, not an arbitrary broker.
    let coordinator_conn = TcpConnection::new(vec![BrokerAddress {
        host: host.to_owned(),
        port: port.try_into().map_err(|err| {
            tracing::error!(
                "Error decoding Broker connection port from metadata {:?}",
                err
            );
            Error::MetadataNeedsSync
        })?,
    }])
    .await?;

    // idk why this helps... maybe redpanda needs a second to accept for the coordinator
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

    //
    // Test join group
    //
    let protocols = [ROUND_ROBIN_PROTOCOL]
        .iter()
        .map(|protocol| protocol::join_group::request::Protocol {
            name: protocol,
            metadata: protocol::join_group::request::Metadata {
                version: 3,
                subscription: vec![&topic],
                user_data: None,
            },
        })
        .collect();

    let join_group_response = samsa::prelude::join_group(
        coordinator_conn.clone(),
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID2,
        100000, // session timeout (ms)
        10000,  // rebalance timeout (ms)
        bytes::Bytes::from(""), // empty member id: we are a brand-new member
        "consumer",
        protocols,
    )
    .await?;

    // Check the error code first so a broker-side failure surfaces directly
    // instead of as a confusing empty-members assertion.
    assert_eq!(join_group_response.error_code, KafkaCode::None);
    assert_eq!(join_group_response.members.len(), 1);
    // As the sole member, we must have been elected leader.
    assert_eq!(join_group_response.leader, join_group_response.member_id);

    //
    // Test sync group
    //
    // As leader we provide the assignments: our single member owns partition 0.
    let assignments = protocol::sync_group::request::Assignment::new(
        join_group_response.member_id.clone(),
        protocol::sync_group::request::MemberAssignment {
            version: 3,
            partition_assignments: vec![protocol::sync_group::request::PartitionAssignment {
                topic_name: &topic,
                partitions: vec![PARTITION_ID],
            }],
            user_data: None,
        },
    )?;

    let sync_response = samsa::prelude::sync_group(
        coordinator_conn.clone(),
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID2,
        join_group_response.generation_id,
        join_group_response.member_id.clone(),
        vec![assignments],
    )
    .await?;

    assert_eq!(sync_response.error_code, KafkaCode::None);
    // The coordinator should echo back exactly the assignment we proposed.
    assert_eq!(sync_response.assignment.partition_assignments.len(), 1);
    assert_eq!(
        sync_response.assignment.partition_assignments[0],
        protocol::sync_group::response::PartitionAssignment {
            topic_name: bytes::Bytes::from(topic.clone()),
            partitions: vec![PARTITION_ID]
        }
    );

    //
    // Test heartbeat
    //
    let heartbeat_response = samsa::prelude::heartbeat(
        coordinator_conn.clone(),
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID2,
        join_group_response.generation_id,
        join_group_response.member_id.clone(),
    )
    .await?;
    assert_eq!(heartbeat_response.error_code, KafkaCode::None);

    //
    // Test leave group
    //
    let leave_group_response = samsa::prelude::leave_group(
        coordinator_conn,
        CORRELATION_ID,
        CLIENT_ID,
        GROUP_ID2,
        join_group_response.member_id.clone(),
    )
    .await?;
    assert_eq!(leave_group_response.error_code, KafkaCode::None);

    //
    // Delete topic (cleanup). Assert the result for consistency with the
    // raw-protocol test, instead of silently dropping it.
    //
    let delete_res = prelude::delete_topics(
        conn.clone(),
        CORRELATION_ID,
        CLIENT_ID,
        vec![topic.as_str()],
    )
    .await?;
    assert_eq!(delete_res.topics[0].error_code, KafkaCode::None);

    Ok(())
}