ockam_api 0.93.0

Ockam's request-response API
use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl;
use crate::kafka::key_exchange::KafkaKeyExchangeController;
use crate::kafka::protocol_aware::{
    CorrelationId, KafkaMessageInterceptor, KafkaMessageInterceptorWrapper, RequestInfo,
    TopicUuidMap, MAX_KAFKA_MESSAGE_SIZE,
};
use crate::kafka::KafkaInletController;
use ockam_core::async_trait;
use ockam_core::compat::collections::HashMap;
use ockam_transport_tcp::{PortalInterceptor, PortalInterceptorFactory};
use std::sync::{Arc, Mutex};

mod request;
mod response;

#[cfg(test)]
mod tests;

#[derive(Clone)]
pub(crate) struct InletInterceptorImpl {
    request_map: Arc<Mutex<HashMap<CorrelationId, RequestInfo>>>,
    uuid_to_name: TopicUuidMap,
    key_exchange_controller: Arc<dyn KafkaKeyExchangeController>,
    inlet_map: KafkaInletController,
    encrypt_content: bool,
    encrypted_fields: Vec<String>,
}

#[async_trait]
impl KafkaMessageInterceptor for InletInterceptorImpl {}

impl InletInterceptorImpl {
    pub(crate) fn new(
        key_exchange_controller: Arc<dyn KafkaKeyExchangeController>,
        uuid_to_name: TopicUuidMap,
        inlet_map: KafkaInletController,
        encrypt_content: bool,
        encrypted_fields: Vec<String>,
    ) -> InletInterceptorImpl {
        Self {
            request_map: Arc::new(Mutex::new(Default::default())),
            uuid_to_name,
            key_exchange_controller,
            inlet_map,
            encrypt_content,
            encrypted_fields,
        }
    }

    #[cfg(test)]
    pub(crate) fn add_request(
        &self,
        correlation_id: CorrelationId,
        api_key: kafka_protocol::messages::ApiKey,
        api_version: i16,
    ) {
        self.request_map.lock().unwrap().insert(
            correlation_id,
            RequestInfo {
                request_api_key: api_key,
                request_api_version: api_version,
            },
        );
    }
}

pub(crate) struct KafkaInletInterceptorFactory {
    secure_channel_controller: KafkaKeyExchangeControllerImpl,
    uuid_to_name: TopicUuidMap,
    inlet_map: KafkaInletController,
    encrypt_content: bool,
    encrypted_fields: Vec<String>,
}

impl KafkaInletInterceptorFactory {
    pub(crate) fn new(
        secure_channel_controller: KafkaKeyExchangeControllerImpl,
        inlet_map: KafkaInletController,
        encrypt_content: bool,
        encrypted_fields: Vec<String>,
    ) -> Self {
        Self {
            secure_channel_controller,
            uuid_to_name: Default::default(),
            inlet_map,
            encrypt_content,
            encrypted_fields,
        }
    }
}

impl PortalInterceptorFactory for KafkaInletInterceptorFactory {
    fn create(&self) -> Arc<dyn PortalInterceptor> {
        Arc::new(KafkaMessageInterceptorWrapper::new(
            Arc::new(InletInterceptorImpl::new(
                Arc::new(self.secure_channel_controller.clone()),
                self.uuid_to_name.clone(),
                self.inlet_map.clone(),
                self.encrypt_content,
                self.encrypted_fields.clone(),
            )),
            MAX_KAFKA_MESSAGE_SIZE,
        ))
    }
}