Skip to main content

zrx_stream/stream/function/traits/
lift.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Lift function.
27
28use std::fmt::Display;
29
30use zrx_scheduler::action::report::IntoReport;
31use zrx_scheduler::action::Result;
32use zrx_scheduler::Value;
33
34use crate::stream::function::adapter::{WithId, WithSplat};
35use crate::stream::function::{catch, Splat};
36use crate::value::Chunk;
37
38// ----------------------------------------------------------------------------
39// Traits
40// ----------------------------------------------------------------------------
41
42/// Lift function.
43///
44/// This trait defines a function that transforms data of type `T` into a vector
45/// of items of type `U`, which creates a vector of nested items, allowing us to
46/// implement higher-order collections and relationships to manage data-centric
47/// interdependencies. Transformations are expected to be immutable, which means
48/// they can't capture mutable variables. This allows operators to move function
49/// execution to a [`Task`][], which is also why it expects owned data.
50///
51/// There's a range of different implementations of this trait, allowing you to
52/// use a variety of function shapes, including support for [`Splat`], as well
53/// as support for the [`WithId`] and [`WithSplat`] adapters. Furthermore, the
54/// trait can be implemented for custom types to add new behaviors. Note that
55/// all implementations also allow to return a [`Report`][], which makes it
56/// possible to return diagnostics from the function execution.
57///
58/// Also note that it would be more efficient to return an `impl Iterator` from
59/// the lift function, but this doesn't work due to the RPITIT (return-position
60/// impl trait in trait) limitations, as they're not yet stabilized. Once those
61/// hit the stable channel, we should consider switching to this approach. We
62/// also considered making the lift function generic over the iterator type,
63/// but it would make the trait more complex and less ergonomic.
64///
65/// The `'static` lifetimes is mandatory as closures must be moved into actions,
66/// so requiring it here allows us to reduce the verbosity of trait bounds.
67///
68/// [`Report`]: zrx_scheduler::action::Report
69/// [`Task`]: zrx_scheduler::effect::Task
70///
71/// # Examples
72///
73/// Lift data:
74///
75/// ```
76/// # use std::error::Error;
77/// # fn main() -> Result<(), Box<dyn Error>> {
78/// use zrx_stream::function::LiftFn;
79/// use zrx_stream::value::Chunk;
80///
81/// // Define and execute function
82/// let f = |data: &Vec<i32>| {
83///     data.iter()
84///         .map(|&n| (format!("id/{n}"), n))
85///         .collect::<Chunk<_, _>>()
86/// };
87/// f.execute(&"id".to_string(), &vec![1, 2])?;
88/// # Ok(())
89/// # }
90/// ```
91///
92/// Lift data with splat argument:
93///
94/// ```
95/// # use std::error::Error;
96/// # fn main() -> Result<(), Box<dyn Error>> {
97/// use zrx_stream::function::{LiftFn, Splat};
98/// use zrx_stream::value::Chunk;
99///
100/// // Define and execute function
101/// let f = |a: &i32, b: &i32| {
102///     [a, b]
103///         .into_iter()
104///         .map(|&n| (format!("id/{n}"), n))
105///         .collect::<Chunk<_, _>>()
106/// };
107/// f.execute(&"id".to_string(), &Splat::from((1, 2)))?;
108/// # Ok(())
109/// # }
110/// ```
111pub trait LiftFn<I, T, U>: Send + 'static
112where
113    T: ?Sized,
114{
115    /// Executes the lift function.
116    ///
117    /// # Errors
118    ///
119    /// This method returns an error if the function fails to execute.
120    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>>;
121}
122
123impl<F, R, I, T, U> LiftFn<I, T, U> for F
124where
125    F: Fn(&T) -> R + Send + 'static,
126    R: IntoReport<Chunk<I, U>>,
127    I: Display,
128    T: Value + ?Sized,
129{
130    #[cfg_attr(
131        feature = "tracing",
132        tracing::instrument(level = "debug", skip_all, fields(id = %id))
133    )]
134    #[inline]
135    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
136        catch(|| self(data).into_report())
137    }
138}
139
140impl<F, R, I, T, U> LiftFn<I, T, U> for WithId<F>
141where
142    F: Fn(&I, &T) -> R + Send + 'static,
143    R: IntoReport<Chunk<I, U>>,
144    I: Display,
145    T: Value + ?Sized,
146{
147    #[cfg_attr(
148        feature = "tracing",
149        tracing::instrument(level = "debug", skip_all, fields(id = %id))
150    )]
151    #[inline]
152    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
153        catch(|| self(id, data).into_report())
154    }
155}
156
157impl<F, I, T, U> LiftFn<I, T, U> for WithSplat<F>
158where
159    F: LiftFn<I, Splat<T>, U>,
160{
161    #[inline]
162    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
163        F::execute(self, id, Splat::from_ref(data))
164    }
165}
166
167// ----------------------------------------------------------------------------
168// Macros
169// ----------------------------------------------------------------------------
170
/// Implements the lift function trait for functions taking splat arguments.
///
/// Each identifier passed to the macro is used twice: as a generic type
/// parameter of the generated implementation, and as the name of the binding
/// that is destructured from the tuple inside of the [`Splat`]. Every
/// invocation generates exactly one implementation, for a function taking one
/// reference per identifier.
macro_rules! impl_lift_fn_for_splat {
    ($($Arg:ident),+) => {
        impl<F, R, I, $($Arg,)+ U> LiftFn<I, Splat<($($Arg,)+)>, U> for F
        where
            F: Fn($(&$Arg),+) -> R + Send + 'static,
            R: IntoReport<Chunk<I, U>>,
            I: Display,
        {
            #[cfg_attr(
                feature = "tracing",
                tracing::instrument(level = "debug", skip_all, fields(id = %id))
            )]
            #[inline]
            fn execute(
                &self, id: &I, data: &Splat<($($Arg,)+)>
            ) -> Result<Chunk<I, U>> {
                // Bindings reuse the names of the type parameters, which are
                // not snake case, so the lint must be silenced here
                #[allow(non_snake_case)]
                let ($($Arg,)+) = data.inner();
                catch(|| {
                    let output = self($($Arg),+);
                    output.into_report()
                })
            }
        }
    };
}
195
196// ----------------------------------------------------------------------------
197
198impl_lift_fn_for_splat!(T1);
199impl_lift_fn_for_splat!(T1, T2);
200impl_lift_fn_for_splat!(T1, T2, T3);
201impl_lift_fn_for_splat!(T1, T2, T3, T4);
202impl_lift_fn_for_splat!(T1, T2, T3, T4, T5);
203impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6);
204impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7);
205impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7, T8);