Skip to main content

zrx_stream/stream/operator/
delta_map.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Delta map operator.
27
28use std::marker::PhantomData;
29
30use zrx_scheduler::action::descriptor::Property;
31use zrx_scheduler::action::output::IntoOutputs;
32use zrx_scheduler::action::{Descriptor, Report};
33use zrx_scheduler::effect::{Item, Task};
34use zrx_scheduler::{Id, Value};
35
36use crate::stream::function::MapFn;
37use crate::stream::value::Delta;
38use crate::stream::Stream;
39
40use super::Operator;
41
42// ----------------------------------------------------------------------------
43// Structs
44// ----------------------------------------------------------------------------
45
/// Operator that maps the insertions of a delta through a function.
///
/// The operator stores the function applied to the data of every inserted
/// item, yielding values of type `U`. Items without data are passed along
/// without invoking the function.
struct DeltaMap<F, U> {
    /// Function applied to the data of each inserted item.
    function: F,
    /// Zero-sized marker for the output type `U`, which is not stored in any
    /// field and otherwise only shows up in the bounds placed on `F`.
    marker: PhantomData<U>,
}
53
54// ----------------------------------------------------------------------------
55// Implementations
56// ----------------------------------------------------------------------------
57
58impl<I, T> Stream<I, Delta<I, T>>
59where
60    I: Id,
61    T: Value + Clone,
62{
63    pub fn delta_map<F, U>(&self, f: F) -> Stream<I, Delta<I, U>>
64    where
65        F: MapFn<I, T, U> + Clone,
66        U: Value,
67    {
68        self.workflow.add_operator(
69            [self.id],
70            DeltaMap {
71                function: f,
72                marker: PhantomData,
73            },
74        )
75    }
76}
77
78// ----------------------------------------------------------------------------
79// Trait implementations
80// ----------------------------------------------------------------------------
81
82impl<I, T, F, U> Operator<I, Delta<I, T>> for DeltaMap<F, U>
83where
84    I: Id,
85    T: Value + Clone,
86    F: MapFn<I, T, U> + Clone,
87    U: Value,
88{
89    type Item<'a> = Item<&'a I, &'a Delta<I, T>>;
90
91    /// Handles the given item.
92    ///
93    /// This operator returns a task that applies the operator function to each
94    /// item that is part of the incoming delta. Thus, it is similar to the map
95    /// operator, but works on the deltas of items. While every insertion is
96    /// mapped to a new type, deletions are just forwarded.
97    #[cfg_attr(
98        feature = "tracing",
99        tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
100    )]
101    fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
102        let item = item.into_owned();
103        Task::new({
104            let function = self.function.clone();
105            move || {
106                // Since the operator function is invoked for each delta, we
107                // consolidate each returned report into this top-level report,
108                // which allows us to collect diagnostics from all invocations
109                // and return them together with the item
110                let mut report = Report::new(());
111                let iter = item.data.into_iter().map(|part| {
112                    if let Some(data) = part.data {
113                        // When new data arrives, we interpret the incoming item
114                        // as an insertion or update, so we pass it to the given
115                        // operator function, and return an item with the new
116                        // data. If the function errors, we abort here.
117                        let temp = function.execute(&part.id, data)?;
118                        Ok(Item::new(part.id, Some(report.merge(temp))))
119                    } else {
120                        // If the incoming item has no data, we interpret this
121                        // as a deletion, just forwarding the item
122                        Ok(Item::new(part.id, None))
123                    }
124                });
125
126                // Collect and return delta of items
127                iter.collect::<Result<Delta<I, U>, _>>().map(|delta| {
128                    report.map(|()| Item::new(item.id, Some(delta)))
129                })
130            }
131        })
132    }
133
134    /// Returns the descriptor.
135    #[inline]
136    fn descriptor(&self) -> Descriptor {
137        Descriptor::builder()
138            .property(Property::Pure)
139            .property(Property::Stable)
140            .build()
141    }
142}