ockam_api 0.93.0

Ockam's request-response API
#[cfg(test)]
mod test {
    use crate::kafka::inlet_controller::KafkaInletController;
    use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl;
    use crate::kafka::protocol_aware::inlet::InletInterceptorImpl;
    use crate::kafka::protocol_aware::utils::{encode_request, encode_response};
    use crate::kafka::protocol_aware::{
        KafkaMessageRequestInterceptor, KafkaMessageResponseInterceptor,
    };
    use crate::kafka::{ConsumerPublishing, ConsumerResolution};
    use crate::port_range::PortRange;
    use crate::test_utils::TestNode;
    use kafka_protocol::messages::ApiKey;
    use kafka_protocol::messages::BrokerId;
    use kafka_protocol::messages::{ApiVersionsRequest, MetadataRequest, MetadataResponse};
    use kafka_protocol::messages::{ApiVersionsResponse, RequestHeader, ResponseHeader};
    use kafka_protocol::protocol::StrBytes;
    use ockam_abac::{Action, Env, Resource, ResourceType};
    use ockam_core::route;
    use ockam_multiaddr::MultiAddr;
    use ockam_node::Context;
    use std::sync::Arc;

    #[allow(non_snake_case)]
    #[ockam_macros::test(timeout = 5_000)]
    async fn interceptor__basic_messages_with_several_api_versions__parsed_correctly(
        context: &mut Context,
    ) -> ockam::Result<()> {
        TestNode::clean().await?;
        let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;

        let inlet_map = KafkaInletController::new(
            (*handle.node_manager).clone(),
            MultiAddr::default(),
            route![],
            route![],
            "127.0.0.1".to_string(),
            PortRange::new(0, 0).unwrap(),
            None,
        );

        let secure_channels = handle.node_manager.secure_channels();
        let policies = handle.node_manager.policies();

        let consumer_policy_access_control = policies.make_policy_access_control(
            secure_channels.identities().identities_attributes(),
            Resource::new("arbitrary-resource-name", ResourceType::KafkaConsumer),
            Action::HandleMessage,
            Env::new(),
            Some(handle.node_manager.identifier()),
        );

        let producer_policy_access_control = policies.make_policy_access_control(
            secure_channels.identities().identities_attributes(),
            Resource::new("arbitrary-resource-name", ResourceType::KafkaProducer),
            Action::HandleMessage,
            Env::new(),
            Some(handle.node_manager.identifier()),
        );

        let secure_channel_controller = KafkaKeyExchangeControllerImpl::new(
            (*handle.node_manager).clone(),
            secure_channels,
            ConsumerResolution::None,
            ConsumerPublishing::None,
            consumer_policy_access_control,
            producer_policy_access_control,
        );

        let interceptor = InletInterceptorImpl::new(
            Arc::new(secure_channel_controller),
            Default::default(),
            inlet_map,
            true,
            vec![],
        );

        let mut correlation_id = 0;

        for api_version in 0..14 {
            let result = interceptor
                .intercept_request(
                    context,
                    encode_request(
                        &RequestHeader::default()
                            .with_request_api_version(api_version)
                            .with_correlation_id(correlation_id)
                            .with_request_api_key(ApiKey::ApiVersions as i16),
                        &ApiVersionsRequest::default()
                            .with_client_software_name(StrBytes::from_static_str("mr. software"))
                            .with_client_software_version(StrBytes::from_static_str("1.0.0")),
                        api_version,
                        ApiKey::ApiVersions,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            let result = interceptor
                .intercept_response(
                    context,
                    encode_response(
                        &ResponseHeader::default().with_correlation_id(correlation_id),
                        &ApiVersionsResponse::default(),
                        api_version,
                        ApiKey::ApiVersions,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            correlation_id += 1;
        }

        for api_version in 0..14 {
            let result = interceptor
                .intercept_request(
                    context,
                    encode_request(
                        &RequestHeader::default()
                            .with_request_api_version(api_version)
                            .with_correlation_id(correlation_id)
                            .with_request_api_key(ApiKey::Metadata as i16),
                        &MetadataRequest::default(),
                        api_version,
                        ApiKey::Metadata,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            let result = interceptor
                .intercept_response(
                    context,
                    encode_response(
                        &ResponseHeader::default().with_correlation_id(correlation_id),
                        &MetadataResponse::default().with_controller_id(BrokerId::from(0_i32)),
                        api_version,
                        ApiKey::Metadata,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            correlation_id += 1;
        }
        Ok(())
    }
}