zrx_stream/stream/operator/
delta_map.rs

1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Delta map operator.
27
28use std::marker::PhantomData;
29use zrx_scheduler::action::descriptor::Property;
30use zrx_scheduler::action::output::IntoOutputs;
31use zrx_scheduler::action::{Descriptor, Report};
32use zrx_scheduler::effect::{Item, Task};
33use zrx_scheduler::{Id, Value};
34
35use crate::stream::function::MapFn;
36use crate::stream::value::Delta;
37use crate::stream::Stream;
38
39use super::Operator;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
/// Delta map operator.
///
/// Holds the function registered through [`Stream::delta_map`]. The function
/// is applied to every entry of an incoming delta that carries data, while
/// entries without data are passed along unchanged.
struct DeltaMap<F, U> {
    /// Operator function, cloned into every task created by the operator.
    function: F,
    /// Type marker for the output type `U`, which no other field mentions.
    marker: PhantomData<U>,
}
52
53// ----------------------------------------------------------------------------
54// Implementations
55// ----------------------------------------------------------------------------
56
57impl<I, T> Stream<I, Delta<I, T>>
58where
59    I: Id,
60    T: Value + Clone,
61{
62    pub fn delta_map<F, U>(&self, f: F) -> Stream<I, Delta<I, U>>
63    where
64        F: MapFn<I, T, U> + Clone,
65        U: Value,
66    {
67        self.workflow.add_operator(
68            [self.id],
69            DeltaMap {
70                function: f,
71                marker: PhantomData,
72            },
73        )
74    }
75}
76
77// ----------------------------------------------------------------------------
78// Trait implementations
79// ----------------------------------------------------------------------------
80
81impl<I, T, F, U> Operator<I, Delta<I, T>> for DeltaMap<F, U>
82where
83    I: Id,
84    T: Value + Clone,
85    F: MapFn<I, T, U> + Clone,
86    U: Value,
87{
88    type Item<'a> = Item<&'a I, &'a Delta<I, T>>;
89
90    /// Handles the given item.
91    ///
92    /// This operator returns a task that applies the operator function to each
93    /// item that is part of the incoming delta. Thus, it is similar to the map
94    /// operator, but works on the deltas of items. While every insertion is
95    /// mapped to a new type, deletions are just forwarded.
96    #[cfg_attr(
97        feature = "tracing",
98        tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
99    )]
100    fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
101        let item = item.into_owned();
102        Task::new({
103            let function = self.function.clone();
104            move || {
105                // Since the operator function is invoked for each delta, we
106                // consolidate each returned report into this top-level report,
107                // which allows us to collect diagnostics from all invocations
108                // and return them together with the item
109                let mut report = Report::new(());
110                let iter = item.data.into_iter().map(|part| {
111                    if let Some(data) = part.data {
112                        // When new data arrives, we interpret the incoming item
113                        // as an insertion or update, so we pass it to the given
114                        // operator function, and return an item with the new
115                        // data. If the function errors, we abort here.
116                        let temp = function.execute(&part.id, data)?;
117                        Ok(Item::new(part.id, Some(report.merge(temp))))
118                    } else {
119                        // If the incoming item has no data, we interpret this
120                        // as a deletion, just forwarding the item
121                        Ok(Item::new(part.id, None))
122                    }
123                });
124
125                // Collect and return delta of items
126                iter.collect::<Result<Delta<I, U>, _>>().map(|delta| {
127                    report.map(|()| Item::new(item.id, Some(delta)))
128                })
129            }
130        })
131    }
132
133    /// Returns the descriptor.
134    #[inline]
135    fn descriptor(&self) -> Descriptor {
136        Descriptor::builder()
137            .property(Property::Pure)
138            .property(Property::Stable)
139            .build()
140    }
141}