zrx_stream/stream/operator/delta_map.rs
1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Delta map operator.
27
28use std::marker::PhantomData;
29use zrx_scheduler::action::descriptor::Property;
30use zrx_scheduler::action::output::IntoOutputs;
31use zrx_scheduler::action::{Descriptor, Report};
32use zrx_scheduler::effect::{Item, Task};
33use zrx_scheduler::{Id, Value};
34
35use crate::stream::function::MapFn;
36use crate::stream::value::Delta;
37use crate::stream::Stream;
38
39use super::Operator;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
/// Operator that maps the items contained in a delta.
///
/// The struct only stores the operator function. The second type parameter
/// `U` is never stored as a value, and is solely tracked through a marker.
struct DeltaMap<F, U> {
    /// Function applied by the operator.
    function: F,
    /// Zero-sized marker tying the type parameter `U` to the operator.
    marker: PhantomData<U>,
}
52
53// ----------------------------------------------------------------------------
54// Implementations
55// ----------------------------------------------------------------------------
56
57impl<I, T> Stream<I, Delta<I, T>>
58where
59 I: Id,
60 T: Value + Clone,
61{
62 pub fn delta_map<F, U>(&self, f: F) -> Stream<I, Delta<I, U>>
63 where
64 F: MapFn<I, T, U> + Clone,
65 U: Value,
66 {
67 self.workflow.add_operator(
68 [self.id],
69 DeltaMap {
70 function: f,
71 marker: PhantomData,
72 },
73 )
74 }
75}
76
77// ----------------------------------------------------------------------------
78// Trait implementations
79// ----------------------------------------------------------------------------
80
81impl<I, T, F, U> Operator<I, Delta<I, T>> for DeltaMap<F, U>
82where
83 I: Id,
84 T: Value + Clone,
85 F: MapFn<I, T, U> + Clone,
86 U: Value,
87{
88 type Item<'a> = Item<&'a I, &'a Delta<I, T>>;
89
90 /// Handles the given item.
91 ///
92 /// This operator returns a task that applies the operator function to each
93 /// item that is part of the incoming delta. Thus, it is similar to the map
94 /// operator, but works on the deltas of items. While every insertion is
95 /// mapped to a new type, deletions are just forwarded.
96 #[cfg_attr(
97 feature = "tracing",
98 tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
99 )]
100 fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
101 let item = item.into_owned();
102 Task::new({
103 let function = self.function.clone();
104 move || {
105 // Since the operator function is invoked for each delta, we
106 // consolidate each returned report into this top-level report,
107 // which allows us to collect diagnostics from all invocations
108 // and return them together with the item
109 let mut report = Report::new(());
110 let iter = item.data.into_iter().map(|part| {
111 if let Some(data) = part.data {
112 // When new data arrives, we interpret the incoming item
113 // as an insertion or update, so we pass it to the given
114 // operator function, and return an item with the new
115 // data. If the function errors, we abort here.
116 let temp = function.execute(&part.id, data)?;
117 Ok(Item::new(part.id, Some(report.merge(temp))))
118 } else {
119 // If the incoming item has no data, we interpret this
120 // as a deletion, just forwarding the item
121 Ok(Item::new(part.id, None))
122 }
123 });
124
125 // Collect and return delta of items
126 iter.collect::<Result<Delta<I, U>, _>>().map(|delta| {
127 report.map(|()| Item::new(item.id, Some(delta)))
128 })
129 }
130 })
131 }
132
133 /// Returns the descriptor.
134 #[inline]
135 fn descriptor(&self) -> Descriptor {
136 Descriptor::builder()
137 .property(Property::Pure)
138 .property(Property::Stable)
139 .build()
140 }
141}