zrx_stream/stream/function/traits/lift.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Lift function.
27
28use std::fmt::Display;
29
30use zrx_scheduler::action::report::IntoReport;
31use zrx_scheduler::action::Result;
32use zrx_scheduler::Value;
33
34use crate::stream::function::adapter::{WithId, WithSplat};
35use crate::stream::function::{catch, Splat};
36use crate::value::Chunk;
37
38// ----------------------------------------------------------------------------
39// Traits
40// ----------------------------------------------------------------------------
41
42/// Lift function.
43///
44/// This trait defines a function that transforms data of type `T` into a vector
45/// of items of type `U`, which creates a vector of nested items, allowing us to
46/// implement higher-order collections and relationships to manage data-centric
47/// interdependencies. Transformations are expected to be immutable, which means
48/// they can't capture mutable variables. This allows operators to move function
49/// execution to a [`Task`][], which is also why it expects owned data.
50///
51/// There's a range of different implementations of this trait, allowing you to
52/// use a variety of function shapes, including support for [`Splat`], as well
53/// as support for the [`WithId`] and [`WithSplat`] adapters. Furthermore, the
54/// trait can be implemented for custom types to add new behaviors. Note that
55/// all implementations also allow to return a [`Report`][], which makes it
56/// possible to return diagnostics from the function execution.
57///
58/// Also note that it would be more efficient to return an `impl Iterator` from
59/// the lift function, but this doesn't work due to the RPITIT (return-position
60/// impl trait in trait) limitations, as they're not yet stabilized. Once those
61/// hit the stable channel, we should consider switching to this approach. We
62/// also considered making the lift function generic over the iterator type,
63/// but it would make the trait more complex and less ergonomic.
64///
65/// The `'static` lifetimes is mandatory as closures must be moved into actions,
66/// so requiring it here allows us to reduce the verbosity of trait bounds.
67///
68/// [`Report`]: zrx_scheduler::action::Report
69/// [`Task`]: zrx_scheduler::effect::Task
70///
71/// # Examples
72///
73/// Lift data:
74///
75/// ```
76/// # use std::error::Error;
77/// # fn main() -> Result<(), Box<dyn Error>> {
78/// use zrx_stream::function::LiftFn;
79/// use zrx_stream::value::Chunk;
80///
81/// // Define and execute function
82/// let f = |data: &Vec<i32>| {
83/// data.iter()
84/// .map(|&n| (format!("id/{n}"), n))
85/// .collect::<Chunk<_, _>>()
86/// };
87/// f.execute(&"id".to_string(), &vec![1, 2])?;
88/// # Ok(())
89/// # }
90/// ```
91///
92/// Lift data with splat argument:
93///
94/// ```
95/// # use std::error::Error;
96/// # fn main() -> Result<(), Box<dyn Error>> {
97/// use zrx_stream::function::{LiftFn, Splat};
98/// use zrx_stream::value::Chunk;
99///
100/// // Define and execute function
101/// let f = |a: &i32, b: &i32| {
102/// [a, b]
103/// .into_iter()
104/// .map(|&n| (format!("id/{n}"), n))
105/// .collect::<Chunk<_, _>>()
106/// };
107/// f.execute(&"id".to_string(), &Splat::from((1, 2)))?;
108/// # Ok(())
109/// # }
110/// ```
111pub trait LiftFn<I, T, U>: Send + 'static
112where
113 T: ?Sized,
114{
115 /// Executes the lift function.
116 ///
117 /// # Errors
118 ///
119 /// This method returns an error if the function fails to execute.
120 fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>>;
121}
122
123impl<F, R, I, T, U> LiftFn<I, T, U> for F
124where
125 F: Fn(&T) -> R + Send + 'static,
126 R: IntoReport<Chunk<I, U>>,
127 I: Display,
128 T: Value + ?Sized,
129{
130 #[cfg_attr(
131 feature = "tracing",
132 tracing::instrument(level = "debug", skip_all, fields(id = %id))
133 )]
134 #[inline]
135 fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
136 catch(|| self(data).into_report())
137 }
138}
139
140impl<F, R, I, T, U> LiftFn<I, T, U> for WithId<F>
141where
142 F: Fn(&I, &T) -> R + Send + 'static,
143 R: IntoReport<Chunk<I, U>>,
144 I: Display,
145 T: Value + ?Sized,
146{
147 #[cfg_attr(
148 feature = "tracing",
149 tracing::instrument(level = "debug", skip_all, fields(id = %id))
150 )]
151 #[inline]
152 fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
153 catch(|| self(id, data).into_report())
154 }
155}
156
157impl<F, I, T, U> LiftFn<I, T, U> for WithSplat<F>
158where
159 F: LiftFn<I, Splat<T>, U>,
160{
161 #[inline]
162 fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
163 F::execute(self, id, Splat::from_ref(data))
164 }
165}
166
167// ----------------------------------------------------------------------------
168// Macros
169// ----------------------------------------------------------------------------
170
171/// Implements lift function trait for splat arguments.
172macro_rules! impl_lift_fn_for_splat {
173 ($($T:ident),+) => {
174 impl<F, R, I, $($T,)+ U> LiftFn<I, Splat<($($T,)+)>, U> for F
175 where
176 F: Fn($(&$T),+) -> R + Send + 'static,
177 R: IntoReport<Chunk<I, U>>,
178 I: Display,
179 {
180 #[cfg_attr(
181 feature = "tracing",
182 tracing::instrument(level = "debug", skip_all, fields(id = %id))
183 )]
184 #[inline]
185 fn execute(
186 &self, id: &I, data: &Splat<($($T,)+)>
187 ) -> Result<Chunk<I, U>> {
188 #[allow(non_snake_case)]
189 let ($($T,)+) = data.inner();
190 catch(|| self($($T),+).into_report())
191 }
192 }
193 };
194}
195
196// ----------------------------------------------------------------------------
197
198impl_lift_fn_for_splat!(T1);
199impl_lift_fn_for_splat!(T1, T2);
200impl_lift_fn_for_splat!(T1, T2, T3);
201impl_lift_fn_for_splat!(T1, T2, T3, T4);
202impl_lift_fn_for_splat!(T1, T2, T3, T4, T5);
203impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6);
204impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7);
205impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7, T8);