ockam_api 0.48.0

Ockam's request-response API
Documentation
#[cfg(test)]
mod test {
    use crate::kafka::inlet_controller::KafkaInletController;
    use crate::kafka::protocol_aware::utils::{encode_request, encode_response};
    use crate::kafka::protocol_aware::InletInterceptorImpl;
    use crate::kafka::protocol_aware::KafkaMessageInterceptor;
    use crate::kafka::secure_channel_map::{KafkaEncryptedContent, KafkaSecureChannelController};
    use crate::port_range::PortRange;
    use kafka_protocol::messages::ApiKey;
    use kafka_protocol::messages::BrokerId;
    use kafka_protocol::messages::{ApiVersionsRequest, MetadataRequest, MetadataResponse};
    use kafka_protocol::messages::{ApiVersionsResponse, RequestHeader, ResponseHeader};
    use kafka_protocol::protocol::{Builder, StrBytes};
    use ockam_core::compat::sync::Arc;
    use ockam_core::route;
    use ockam_core::{async_trait, Address};
    use ockam_multiaddr::MultiAddr;
    use ockam_node::Context;

    struct DummySecureChannelController;

    #[async_trait]
    impl KafkaSecureChannelController for DummySecureChannelController {
        async fn encrypt_content_for(
            &self,
            _context: &mut Context,
            _topic_name: &str,
            _partition_id: i32,
            content: Vec<u8>,
        ) -> ockam_core::Result<KafkaEncryptedContent> {
            Ok(KafkaEncryptedContent {
                content,
                consumer_decryptor_address: Address::from_string("arbitrary string"),
            })
        }

        async fn decrypt_content_for(
            &self,
            _context: &mut Context,
            _consumer_decryptor_address: &Address,
            encrypted_content: Vec<u8>,
        ) -> ockam_core::Result<Vec<u8>> {
            Ok(encrypted_content)
        }

        async fn start_relays_for(
            &self,
            _context: &mut Context,
            _topic_id: &str,
            _partitions: Vec<i32>,
        ) -> ockam_core::Result<()> {
            Ok(())
        }
    }

    #[allow(non_snake_case)]
    #[ockam_macros::test(timeout = 5_000)]
    async fn interceptor__basic_messages_with_several_api_versions__parsed_correctly(
        context: &mut Context,
    ) -> ockam::Result<()> {
        let inlet_map = KafkaInletController::new(
            MultiAddr::default(),
            route![],
            route![],
            [127, 0, 0, 1].into(),
            PortRange::new(0, 0).unwrap(),
        );

        let interceptor = InletInterceptorImpl::new(
            Arc::new(DummySecureChannelController {}),
            Default::default(),
            inlet_map,
        );

        let mut correlation_id = 0;

        for api_version in 0..14 {
            let result = interceptor
                .intercept_request(
                    context,
                    encode_request(
                        &RequestHeader::builder()
                            .request_api_version(api_version)
                            .correlation_id(correlation_id)
                            .request_api_key(ApiKey::ApiVersionsKey as i16)
                            .unknown_tagged_fields(Default::default())
                            .client_id(None)
                            .build()
                            .unwrap(),
                        &ApiVersionsRequest::builder()
                            .client_software_name(StrBytes::from_str("mr. software"))
                            .client_software_version(StrBytes::from_str("1.0.0"))
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        api_version,
                        ApiKey::ApiVersionsKey,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            let result = interceptor
                .intercept_response(
                    context,
                    encode_response(
                        &ResponseHeader::builder()
                            .correlation_id(correlation_id)
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        &ApiVersionsResponse::builder()
                            .error_code(0)
                            .api_keys(Default::default())
                            .throttle_time_ms(0)
                            .supported_features(Default::default())
                            .finalized_features_epoch(0)
                            .finalized_features(Default::default())
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        api_version,
                        ApiKey::ApiVersionsKey,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            correlation_id += 1;
        }

        for api_version in 0..14 {
            let result = interceptor
                .intercept_request(
                    context,
                    encode_request(
                        &RequestHeader::builder()
                            .request_api_version(api_version)
                            .correlation_id(correlation_id)
                            .request_api_key(ApiKey::MetadataKey as i16)
                            .unknown_tagged_fields(Default::default())
                            .client_id(None)
                            .build()
                            .unwrap(),
                        &MetadataRequest::builder()
                            .topics(None)
                            .allow_auto_topic_creation(true)
                            .include_cluster_authorized_operations(false)
                            .include_topic_authorized_operations(false)
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        api_version,
                        ApiKey::MetadataKey,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            let result = interceptor
                .intercept_response(
                    context,
                    encode_response(
                        &ResponseHeader::builder()
                            .correlation_id(correlation_id)
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        &MetadataResponse::builder()
                            .throttle_time_ms(0)
                            .brokers(Default::default())
                            .cluster_id(None)
                            .controller_id(BrokerId::from(0_i32))
                            .cluster_authorized_operations(-2147483648)
                            .topics(Default::default())
                            .unknown_tagged_fields(Default::default())
                            .build()
                            .unwrap(),
                        api_version,
                        ApiKey::MetadataKey,
                    )
                    .unwrap(),
                )
                .await;

            if let Err(error) = result {
                panic!("unexpected error: {error:?}");
            }

            correlation_id += 1;
        }

        context.stop().await
    }
}