zrx_stream/stream/function/traits/
lift.rs

1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Lift function.
27
28use std::fmt::Display;
29use zrx_scheduler::action::report::IntoReport;
30use zrx_scheduler::action::Result;
31use zrx_scheduler::Value;
32
33use crate::stream::function::adapter::{WithId, WithSplat};
34use crate::stream::function::{catch, Splat};
35use crate::value::Chunk;
36
37// ----------------------------------------------------------------------------
38// Traits
39// ----------------------------------------------------------------------------
40
41/// Lift function.
42///
43/// This trait defines a function that transforms data of type `T` into a vector
44/// of items of type `U`, which creates a vector of nested items, allowing us to
45/// implement higher-order collections and relationships to manage data-centric
46/// interdependencies. Transformations are expected to be immutable, which means
47/// they can't capture mutable variables. This allows operators to move function
48/// execution to a [`Task`][], which is also why it expects owned data.
49///
50/// There's a range of different implementations of this trait, allowing you to
51/// use a variety of function shapes, including support for [`Splat`], as well
52/// as support for the [`WithId`] and [`WithSplat`] adapters. Furthermore, the
53/// trait can be implemented for custom types to add new behaviors. Note that
54/// all implementations also allow to return a [`Report`][], which makes it
55/// possible to return diagnostics from the function execution.
56///
57/// Also note that it would be more efficient to return an `impl Iterator` from
58/// the lift function, but this doesn't work due to the RPITIT (return-position
59/// impl trait in trait) limitations, as they're not yet stabilized. Once those
60/// hit the stable channel, we might should switching to this approach. We also
61/// considered making the lift function generic over the iterator type, but it
62/// would make the trait more complex and less ergonomic.
63///
64/// The `'static` lifetimes is mandatory as closures must be moved into actions,
65/// so requiring it here allows us to reduce the verbosity of trait bounds.
66///
67/// [`Report`]: zrx_scheduler::action::Report
68/// [`Task`]: zrx_scheduler::effect::Task
69///
70/// # Examples
71///
72/// Lift data:
73///
74/// ```
75/// # use std::error::Error;
76/// # fn main() -> Result<(), Box<dyn Error>> {
77/// use zrx_stream::function::LiftFn;
78/// use zrx_stream::value::Chunk;
79///
80/// // Define and execute function
81/// let f = |data: &Vec<i32>| {
82///     data.iter()
83///         .map(|&n| (format!("id/{n}"), n))
84///         .collect::<Chunk<_, _>>()
85/// };
86/// f.execute(&"id".to_string(), &vec![1, 2])?;
87/// # Ok(())
88/// # }
89/// ```
90///
91/// Lift data with splat argument:
92///
93/// ```
94/// # use std::error::Error;
95/// # fn main() -> Result<(), Box<dyn Error>> {
96/// use zrx_stream::function::{LiftFn, Splat};
97/// use zrx_stream::value::Chunk;
98///
99/// // Define and execute function
100/// let f = |a: &i32, b: &i32| {
101///     [a, b]
102///         .into_iter()
103///         .map(|&n| (format!("id/{n}"), n))
104///         .collect::<Chunk<_, _>>()
105/// };
106/// f.execute(&"id".to_string(), &Splat::from((1, 2)))?;
107/// # Ok(())
108/// # }
109/// ```
110pub trait LiftFn<I, T, U>: Send + 'static
111where
112    T: ?Sized,
113{
114    /// Executes the lift function.
115    ///
116    /// # Errors
117    ///
118    /// This method returns an error if the function fails to execute.
119    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>>;
120}
121
122impl<F, R, I, T, U> LiftFn<I, T, U> for F
123where
124    F: Fn(&T) -> R + Send + 'static,
125    R: IntoReport<Chunk<I, U>>,
126    I: Display,
127    T: Value + ?Sized,
128{
129    #[cfg_attr(
130        feature = "tracing",
131        tracing::instrument(level = "debug", skip_all, fields(id = %id))
132    )]
133    #[inline]
134    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
135        catch(|| self(data).into_report())
136    }
137}
138
139impl<F, R, I, T, U> LiftFn<I, T, U> for WithId<F>
140where
141    F: Fn(&I, &T) -> R + Send + 'static,
142    R: IntoReport<Chunk<I, U>>,
143    I: Display,
144    T: Value + ?Sized,
145{
146    #[cfg_attr(
147        feature = "tracing",
148        tracing::instrument(level = "debug", skip_all, fields(id = %id))
149    )]
150    #[inline]
151    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
152        catch(|| self(id, data).into_report())
153    }
154}
155
156impl<F, I, T, U> LiftFn<I, T, U> for WithSplat<F>
157where
158    F: LiftFn<I, Splat<T>, U>,
159{
160    #[inline]
161    fn execute(&self, id: &I, data: &T) -> Result<Chunk<I, U>> {
162        F::execute(self, id, Splat::from_ref(data))
163    }
164}
165
166// ----------------------------------------------------------------------------
167// Macros
168// ----------------------------------------------------------------------------
169
170/// Implements lift function trait for splat arguments.
171macro_rules! impl_lift_fn_for_splat {
172    ($($T:ident),+) => {
173        impl<F, R, I, $($T,)+ U> LiftFn<I, Splat<($($T,)+)>, U> for F
174        where
175            F: Fn($(&$T),+) -> R + Send + 'static,
176            R: IntoReport<Chunk<I, U>>,
177            I: Display,
178        {
179            #[cfg_attr(
180                feature = "tracing",
181                tracing::instrument(level = "debug", skip_all, fields(id = %id))
182            )]
183            #[inline]
184            fn execute(
185                &self, id: &I, data: &Splat<($($T,)+)>
186            ) -> Result<Chunk<I, U>> {
187                #[allow(non_snake_case)]
188                let ($($T,)+) = data.inner();
189                catch(|| self($($T),+).into_report())
190            }
191        }
192    };
193}
194
195// ----------------------------------------------------------------------------
196
197impl_lift_fn_for_splat!(T1);
198impl_lift_fn_for_splat!(T1, T2);
199impl_lift_fn_for_splat!(T1, T2, T3);
200impl_lift_fn_for_splat!(T1, T2, T3, T4);
201impl_lift_fn_for_splat!(T1, T2, T3, T4, T5);
202impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6);
203impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7);
204impl_lift_fn_for_splat!(T1, T2, T3, T4, T5, T6, T7, T8);