#[macro_export]
macro_rules! impl_record_registrar_ext {
(
$trait_name:ident,
$runtime:ty,
$buffer:ty,
$feature:literal,
$buffer_new:expr
) => {
pub trait $trait_name<'a, T>
where
T: Send + Sync + Clone + core::fmt::Debug + 'static,
{
fn buffer(
&'a mut self,
cfg: $crate::buffer::BufferCfg,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>;
fn source<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer<T, $runtime>) -> Fut
+ Send
+ Sync
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static;
fn tap<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer<T, $runtime>) -> Fut
+ Send
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static;
fn transform<I, F>(
&'a mut self,
input_key: impl $crate::RecordKey,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
I: Send + Sync + Clone + core::fmt::Debug + 'static,
F: FnOnce(
$crate::transform::TransformBuilder<I, T, $runtime>,
) -> $crate::transform::TransformPipeline<I, T, $runtime>;
#[cfg(feature = "std")]
fn transform_join<F>(
&'a mut self,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce(
$crate::transform::JoinBuilder<T, $runtime>,
) -> $crate::transform::JoinPipeline<T, $runtime>;
}
#[cfg(feature = $feature)]
impl<'a, T> $trait_name<'a, T> for $crate::RecordRegistrar<'a, T, $runtime>
where
T: Send + Sync + Clone + core::fmt::Debug + 'static,
{
fn buffer(
&'a mut self,
cfg: $crate::buffer::BufferCfg,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> {
use $crate::buffer::Buffer;
#[cfg(feature = "std")]
{
let buffer = Box::new($buffer_new(&cfg));
self.buffer_with_cfg(buffer, cfg)
}
#[cfg(not(feature = "std"))]
{
extern crate alloc;
let buffer = alloc::boxed::Box::new($buffer_new(&cfg));
self.buffer_raw(buffer)
}
}
fn source<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer<T, $runtime>) -> Fut
+ Send
+ Sync
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static,
{
self.source_raw(|producer, ctx_any| {
let ctx = $crate::RuntimeContext::extract_from_any(ctx_any);
f(ctx, producer)
})
}
fn tap<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer<T, $runtime>) -> Fut
+ Send
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static,
{
self.tap_raw(|consumer, ctx_any| {
let ctx = $crate::RuntimeContext::extract_from_any(ctx_any);
f(ctx, consumer)
})
}
fn transform<I, F>(
&'a mut self,
input_key: impl $crate::RecordKey,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
I: Send + Sync + Clone + core::fmt::Debug + 'static,
F: FnOnce(
$crate::transform::TransformBuilder<I, T, $runtime>,
) -> $crate::transform::TransformPipeline<I, T, $runtime>,
{
self.transform_raw::<I, F>(input_key, build_fn)
}
#[cfg(feature = "std")]
fn transform_join<F>(
&'a mut self,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce(
$crate::transform::JoinBuilder<T, $runtime>,
) -> $crate::transform::JoinPipeline<T, $runtime>,
{
self.transform_join_raw(build_fn)
}
}
};
(
$trait_name:ident,
$runtime:ty,
$buffer:ty,
[$($feature:literal),+],
$buffer_new:expr
) => {
pub trait $trait_name<'a, T>
where
T: Send + Sync + Clone + core::fmt::Debug + 'static,
{
fn buffer(
&'a mut self,
cfg: $crate::buffer::BufferCfg,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>;
fn source<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer<T, $runtime>) -> Fut
+ Send
+ Sync
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static;
fn tap<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer<T, $runtime>) -> Fut
+ Send
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static;
fn transform<I, F>(
&'a mut self,
input_key: impl $crate::RecordKey,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
I: Send + Sync + Clone + core::fmt::Debug + 'static,
F: FnOnce(
$crate::transform::TransformBuilder<I, T, $runtime>,
) -> $crate::transform::TransformPipeline<I, T, $runtime>;
}
#[cfg(all($(feature = $feature),+))]
impl<'a, T> $trait_name<'a, T> for $crate::RecordRegistrar<'a, T, $runtime>
where
T: Send + Sync + Clone + core::fmt::Debug + 'static,
{
fn buffer(
&'a mut self,
cfg: $crate::buffer::BufferCfg,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> {
use $crate::buffer::Buffer;
#[cfg(feature = "std")]
{
let buffer = Box::new($buffer_new(&cfg));
self.buffer_with_cfg(buffer, cfg)
}
#[cfg(not(feature = "std"))]
{
extern crate alloc;
let buffer = alloc::boxed::Box::new($buffer_new(&cfg));
self.buffer_raw(buffer)
}
}
fn source<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Producer<T, $runtime>) -> Fut
+ Send
+ Sync
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static,
{
self.source_raw(|producer, ctx_any| {
let ctx = $crate::RuntimeContext::extract_from_any(ctx_any);
f(ctx, producer)
})
}
fn tap<F, Fut>(
&'a mut self,
f: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
F: FnOnce($crate::RuntimeContext<$runtime>, $crate::Consumer<T, $runtime>) -> Fut
+ Send
+ 'static,
Fut: core::future::Future<Output = ()> + Send + 'static,
{
self.tap_raw(|consumer, ctx_any| {
let ctx = $crate::RuntimeContext::extract_from_any(ctx_any);
f(ctx, consumer)
})
}
fn transform<I, F>(
&'a mut self,
input_key: impl $crate::RecordKey,
build_fn: F,
) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime>
where
I: Send + Sync + Clone + core::fmt::Debug + 'static,
F: FnOnce(
$crate::transform::TransformBuilder<I, T, $runtime>,
) -> $crate::transform::TransformPipeline<I, T, $runtime>,
{
self.transform_raw::<I, F>(input_key, build_fn)
}
}
};
}