use self::chain::TransformAndMetrics;
use crate::frame::MessageType;
use crate::message::{Message, MessageIdMap, Messages};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use futures::Future;
use std::fmt::Debug;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::slice::IterMut;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::Instant;
#[cfg(feature = "cassandra")]
pub mod cassandra;
pub mod chain;
pub mod coalesce;
pub mod debug;
pub mod filter;
#[cfg(feature = "kafka")]
pub mod kafka;
pub mod load_balance;
pub mod loopback;
pub mod null;
#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))]
pub mod opensearch;
pub mod parallel_map;
#[cfg(all(feature = "alpha-transforms", feature = "cassandra"))]
pub mod protect;
pub mod query_counter;
pub mod tee;
#[cfg(feature = "cassandra")]
pub mod throttling;
pub mod util;
#[cfg(feature = "valkey")]
pub mod valkey;
/// Context value handed (by value) to `TransformBuilder::build` each time a transform is built.
#[derive(Clone, Debug)]
pub struct TransformContextBuilder {
/// Shared `Notify` handle.
/// NOTE(review): presumably notified to make the owning chain run even when no new requests
/// have arrived — confirm against the code that awaits it.
pub force_run_chain: Arc<Notify>,
/// NOTE(review): presumably a human-readable description of the connected client — confirm
/// against the code that populates it.
pub client_details: String,
}
impl TransformContextBuilder {
    /// Builds a context for use in tests: a freshly created, unshared `Notify`
    /// and an empty `client_details` string.
    pub fn new_test() -> Self {
        let force_run_chain = Arc::new(Notify::new());
        let client_details = String::new();
        Self {
            force_run_chain,
            client_details,
        }
    }
}
/// Factory for [`Transform`] instances.
///
/// Implementors must be `Send + Sync`.
pub trait TransformBuilder: Send + Sync {
    /// Creates a new boxed [`Transform`] using the supplied context.
    fn build(&self, transform_context: TransformContextBuilder) -> Box<dyn Transform>;

    /// Returns the static name of the transform this builder produces.
    fn get_name(&self) -> &'static str;

    /// Returns a list of strings; the default implementation returns an empty list.
    ///
    /// NOTE(review): presumably each entry describes one validation problem and an
    /// empty list means "valid" — confirm against the caller.
    fn validate(&self) -> Vec<String> {
        Vec::new()
    }

    /// Reports whether the produced transform is a terminating one.
    ///
    /// Defaults to `false`; builders of terminating transforms are expected to override it.
    fn is_terminating(&self) -> bool {
        false
    }
}
/// Configuration-side description of a transform, from which a `TransformBuilder` is obtained.
///
/// `#[typetag::serde]` makes trait objects of this trait (de)serializable, and
/// `#[async_trait(?Send)]` means the future returned by `get_builder` is not required to be `Send`.
#[typetag::serde]
#[async_trait(?Send)]
pub trait TransformConfig: Debug {
/// Asynchronously creates the builder for this transform from the given context,
/// or returns an error if the builder cannot be created.
async fn get_builder(
&self,
transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>>;
/// Returns this transform's `UpChainProtocol`.
/// NOTE(review): presumably the protocol(s) it accepts from the preceding part of the chain — confirm.
fn up_chain_protocol(&self) -> UpChainProtocol;
/// Returns this transform's `DownChainProtocol`.
/// NOTE(review): presumably the protocol it hands to the following part of the chain — confirm.
fn down_chain_protocol(&self) -> DownChainProtocol;
}
/// Value returned by `TransformConfig::up_chain_protocol`.
///
/// NOTE(review): the variant descriptions below are read off the variant names only —
/// confirm against the code that consumes this enum.
pub enum UpChainProtocol {
/// Carries a list of `MessageType`s; presumably the up-chain protocol must be one of them.
MustBeOneOf(Vec<MessageType>),
/// Presumably places no restriction on the up-chain protocol.
Any,
}
/// Value returned by `TransformConfig::down_chain_protocol`.
///
/// NOTE(review): the variant descriptions below are read off the variant names only —
/// confirm against the code that consumes this enum.
pub enum DownChainProtocol {
/// Carries a single `MessageType`; presumably the protocol the messages are converted to.
TransformedTo(MessageType),
/// Presumably the down-chain protocol equals whatever the up-chain protocol is.
SameAsUpChain,
/// Presumably marks a transform that ends the chain, so nothing follows it.
Terminating,
}
/// Context value handed (by value) to `TransformConfig::get_builder`.
#[derive(Clone)]
pub struct TransformContextConfig {
/// A chain name.
/// NOTE(review): presumably the name of the chain the transform is being built for — confirm.
pub chain_name: String,
/// A `MessageType`.
/// NOTE(review): presumably the protocol arriving from the up-chain side — confirm.
pub up_chain_protocol: MessageType,
}
/// State threaded through a chain of transforms: every `Transform::transform` call receives
/// a `&mut ChainState`.
pub struct ChainState<'a> {
/// The request messages carried through the chain.
pub requests: Messages,
/// Iterator over the transforms that have not been called yet;
/// `call_next_transform` advances it by one and `reset` replaces it.
transforms: IterMut<'a, TransformAndMetrics>,
/// Socket address supplied through `new_with_addr`; the other constructors in this file
/// fill in the placeholder `0.0.0.0:0`.
pub local_addr: SocketAddr,
/// Initialised to `true` by `ChainState::flush()` and to `false` by the other constructors
/// in this file.
/// NOTE(review): presumably asks transforms to flush buffered work — confirm.
pub flush: bool,
/// Initialised to `false` by every constructor in this file.
/// NOTE(review): presumably set by a transform to request that the client connection be
/// closed — confirm against the code that reads it.
pub close_client_connection: bool,
}
impl Clone for ChainState<'_> {
fn clone(&self) -> Self {
ChainState {
requests: self.requests.clone(),
transforms: [].iter_mut(),
local_addr: self.local_addr,
flush: self.flush,
close_client_connection: self.close_client_connection,
}
}
}
impl<'shorter, 'longer: 'shorter> ChainState<'longer> {
fn take(&mut self) -> Self {
ChainState {
requests: std::mem::take(&mut self.requests),
transforms: std::mem::take(&mut self.transforms),
local_addr: self.local_addr,
flush: self.flush,
close_client_connection: self.close_client_connection,
}
}
pub async fn call_next_transform(&'shorter mut self) -> Result<Messages> {
let TransformAndMetrics {
transform,
transform_total,
transform_failures,
transform_latency,
..
} = match self.transforms.next() {
Some(transform) => transform,
None => panic!(
"The transform chain does not end with a terminating transform. If you want to throw the messages away use a NullSink transform, otherwise use a terminating sink transform to send the messages somewhere."
),
};
let transform_name = transform.get_name();
let start = Instant::now();
let result = transform
.transform(self)
.await
.map_err(|e| e.context(anyhow!("{transform_name} transform failed")));
transform_total.increment(1);
if result.is_err() {
transform_failures.increment(1);
}
transform_latency.record(start.elapsed());
result
}
pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap<Message>) {
for request in &self.requests {
destination.insert(request.id(), request.clone());
}
}
#[cfg(test)]
pub fn new_test(requests: Messages) -> Self {
ChainState {
requests,
transforms: [].iter_mut(),
local_addr: DUMMY_ADDRESS,
flush: false,
close_client_connection: false,
}
}
pub fn new_with_addr(requests: Messages, local_addr: SocketAddr) -> Self {
ChainState {
requests,
transforms: [].iter_mut(),
local_addr,
flush: false,
close_client_connection: false,
}
}
pub fn flush() -> Self {
ChainState {
requests: vec![],
transforms: [].iter_mut(),
local_addr: DUMMY_ADDRESS,
flush: true,
close_client_connection: false,
}
}
#[cfg(feature = "alpha-transforms")]
pub fn messages_to_high_level_string(&mut self) -> String {
let messages = self
.requests
.iter_mut()
.map(|x| x.to_high_level_string())
.collect::<Vec<_>>();
format!("{:?}", messages)
}
pub fn reset(&mut self, transforms: &'longer mut [TransformAndMetrics]) {
self.transforms = transforms.iter_mut();
}
}
/// Placeholder address `0.0.0.0:0`, used by the `ChainState` constructors that are not
/// given a real address (`new_test` and `flush`).
const DUMMY_ADDRESS: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
/// A single stage of a transform chain. Implementors must be `Send`.
#[async_trait]
pub trait Transform: Send {
/// Processes `chain_state` and returns the resulting messages, or an error.
///
/// `chain_state` exposes `call_next_transform`, through which an implementation can
/// invoke the rest of the chain.
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
chain_state: &'shorter mut ChainState<'longer>,
) -> Result<Messages>;
/// Returns the static name of this transform; `ChainState::call_next_transform` includes
/// it in the error context when `transform` fails.
fn get_name(&self) -> &'static str;
}
/// A pinned, boxed, `Send + Sync` future that resolves to `Result<util::Response>`.
type ResponseFuture = Pin<Box<dyn Future<Output = Result<util::Response>> + Send + Sync>>;