use crate::kafka::KAFKA_OUTLET_CONSUMERS;
use crate::DefaultAddress;
use core::str::from_utf8;
use ockam::{Context, Result, Routed, Worker};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::flow_control::FlowControlId;
use ockam_core::{Address, AllowAll, AllowOnwardAddress};
pub struct PrefixRelayService {
prefix: String,
secure_channel_listener_flow_control_id: FlowControlId,
}
impl PrefixRelayService {
pub async fn create(
context: &Context,
secure_channel_listener_flow_control_id: FlowControlId,
) -> Result<()> {
let worker_address = Address::from_string(KAFKA_OUTLET_CONSUMERS);
context.flow_controls().add_consumer(
worker_address.clone(),
&secure_channel_listener_flow_control_id,
);
let worker = Self {
prefix: "consumer_".to_string(),
secure_channel_listener_flow_control_id,
};
context
.start_worker_with_access_control(
worker_address,
worker,
AllowAll,
AllowOnwardAddress(DefaultAddress::RELAY_SERVICE.into()),
)
.await
}
}
#[ockam::worker]
impl Worker for PrefixRelayService {
type Message = Vec<u8>;
type Context = Context;
async fn handle_message(
&mut self,
ctx: &mut Context,
msg: Routed<Self::Message>,
) -> Result<()> {
let address = match msg.payload().get(1..) {
Some(address) => match from_utf8(address) {
Ok(v) => v.to_string(),
Err(_e) => {
return Err(ockam_core::Error::new(
Origin::Application,
Kind::Invalid,
"invalid address",
));
}
},
None => {
return Err(ockam_core::Error::new(
Origin::Application,
Kind::Invalid,
"invalid address",
));
}
};
let new_address = format!("{}_{}", &self.prefix, address);
debug!("prefix relay, renamed from {} to {}", address, new_address);
let mut bytes = new_address.clone().into_bytes();
let mut new_payload: Vec<u8> = vec![bytes.len() as u8];
new_payload.append(&mut bytes);
let mut message = msg.into_local_message();
let transport_message = message.transport_mut();
transport_message.onward_route.step()?;
transport_message.payload = new_payload;
ctx.forward(message).await?;
ctx.flow_controls().add_consumer(
Address::from_string(new_address),
&self.secure_channel_listener_flow_control_id,
);
Ok(())
}
}