ockam_api 0.93.0

Ockam's request-response API
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
use crate::nodes::models::portal::{InletStatus, OutletStatus};
use crate::nodes::models::services::{
    StartKafkaInletRequest, StartKafkaOutletRequest, StartServiceRequest,
};
use crate::nodes::BackgroundNodeClient;
use crate::port_range::PortRange;
use ockam_abac::PolicyExpression;
use ockam_core::api::{Reply, Request};
use ockam_core::async_trait;
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use ockam_transport_core::HostnamePort;

#[async_trait]
pub trait KafkaPortals {
    #[allow(clippy::too_many_arguments)]
    async fn create_kafka_inlet(
        &self,
        ctx: &Context,
        name: &str,
        bind_address: HostnamePort,
        brokers_port_range: PortRange,
        kafka_outlet_route: MultiAddr,
        encrypt_content: bool,
        encrypted_fields: Vec<String>,
        consumer_resolution: ConsumerResolution,
        consumer_publishing: ConsumerPublishing,
        inlet_policy_expression: Option<PolicyExpression>,
        consumer_policy_expression: Option<PolicyExpression>,
        producer_policy_expression: Option<PolicyExpression>,
    ) -> miette::Result<Reply<InletStatus>>;

    async fn create_kafka_outlet(
        &self,
        ctx: &Context,
        name: &str,
        bootstrap_server_addr: HostnamePort,
        tls: bool,
        policy_expression: Option<PolicyExpression>,
    ) -> miette::Result<Reply<OutletStatus>>;
}

#[async_trait]
impl KafkaPortals for BackgroundNodeClient {
    async fn create_kafka_inlet(
        &self,
        ctx: &Context,
        name: &str,
        bind_address: HostnamePort,
        brokers_port_range: PortRange,
        kafka_outlet_route: MultiAddr,
        encrypt_content: bool,
        encrypted_fields: Vec<String>,
        consumer_resolution: ConsumerResolution,
        consumer_publishing: ConsumerPublishing,
        inlet_policy_expression: Option<PolicyExpression>,
        consumer_policy_expression: Option<PolicyExpression>,
        producer_policy_expression: Option<PolicyExpression>,
    ) -> miette::Result<Reply<InletStatus>> {
        let request = {
            let payload = StartKafkaInletRequest::new(
                bind_address,
                brokers_port_range,
                kafka_outlet_route,
                encrypt_content,
                encrypted_fields,
                consumer_resolution,
                consumer_publishing,
                inlet_policy_expression,
                consumer_policy_expression,
                producer_policy_expression,
            );
            let payload = StartServiceRequest::new(payload, name);
            Request::post("/node/services/kafka_inlet").body(payload)
        };
        self.ask_and_get_reply(ctx, request).await
    }

    async fn create_kafka_outlet(
        &self,
        ctx: &Context,
        name: &str,
        bootstrap_server_addr: HostnamePort,
        tls: bool,
        policy_expression: Option<PolicyExpression>,
    ) -> miette::Result<Reply<OutletStatus>> {
        let request = {
            let payload =
                StartKafkaOutletRequest::new(bootstrap_server_addr, tls, policy_expression);
            let payload = StartServiceRequest::new(payload, name);
            Request::post("/node/services/kafka_outlet").body(payload)
        };
        self.ask_and_get_reply(ctx, request).await
    }
}