standing_relations/feedback/
op.rs

1use std::hash::Hash;
2
3use crate::{core, CountMap, Input, Observable, Op, Output, Relation};
4
5use self::pipe::{OrderedPipe, Pipe};
6
7use super::context::CreationContext;
8
9mod checked_foreach;
10mod pipe;
11
12pub struct FeedbackWhile<'a, C: Op>
13where
14    C::D: Eq + Hash,
15{
16    output: Output<C::D, C>,
17    input: Input<'a, C::D>,
18}
19
20pub struct Feedback<'a, C: Op> {
21    #[allow(clippy::type_complexity)]
22    output: Output<C::D, C, Pipe<(C::D, isize)>>,
23    input: Input<'a, C::D>,
24}
25
26pub struct FeedbackOrdered<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>> {
27    output: Output<(K, V), C, OrderedPipe<K, V>>,
28    input: Input<'a, V>,
29}
30
31pub struct Interrupter<C: Op, M: CountMap<C::D>, F: Fn(&M) -> I, I> {
32    output: Output<C::D, C, M>,
33    f: F,
34}
35
36pub(crate) trait IsFeeder<'a, I> {
37    fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I>;
38}
39
40pub(crate) trait IsFeedback<'a, I>: IsFeeder<'a, I> {
41    fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static);
42}
43
/// Outcome of a single `IsFeeder::feed` step.
///
/// `I` is the payload type produced when a feeder requests an interrupt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Instruct<I> {
    /// The feeder had nothing to forward.
    Unchanged,
    /// The feeder sent data to its input.
    Changed,
    /// The feeder requests an interrupt, carrying the value computed by its callback.
    Interrupt(I),
}
49
50impl<'a, C: Op, I> IsFeeder<'a, I> for FeedbackWhile<'a, C>
51where
52    C::D: Clone + Eq + Hash + 'a,
53{
54    fn feed(&mut self, context: &core::ExecutionContext) -> Instruct<I> {
55        let m = self.output.get(context);
56        if m.is_empty() {
57            Instruct::Unchanged
58        } else {
59            self.input.send_all(
60                context,
61                m.iter().map(|(x, &count)| (x.clone(), count)).collect(),
62            );
63            Instruct::Changed
64        }
65    }
66}
67
68impl<'a, C: Op, I> IsFeedback<'a, I> for FeedbackWhile<'a, C>
69where
70    C::D: Clone + Eq + Hash + 'a,
71{
72    fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
73        self.output.add_listener(context, f);
74    }
75}
76
77impl<'a, C: Op, I> IsFeeder<'a, I> for Feedback<'a, C> {
78    fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
79        let m = self.output.get(context);
80        let changes = m.receive();
81        if changes.is_empty() {
82            Instruct::Unchanged
83        } else {
84            self.input.send_all(context, changes);
85            Instruct::Changed
86        }
87    }
88}
89
90impl<'a, C: Op, I> IsFeedback<'a, I> for Feedback<'a, C> {
91    fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
92        self.output.add_listener(context, f);
93    }
94}
95
96impl<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>, I> IsFeeder<'a, I>
97    for FeedbackOrdered<'a, K, V, C>
98{
99    fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
100        let m = self.output.get(context);
101        match m.receive() {
102            None => Instruct::Unchanged,
103            Some((_, changes)) => {
104                self.input.send_all(context, changes.into_iter().collect());
105                Instruct::Changed
106            }
107        }
108    }
109}
110
111impl<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>, I> IsFeedback<'a, I>
112    for FeedbackOrdered<'a, K, V, C>
113{
114    fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
115        self.output.add_listener(context, f);
116    }
117}
118
119impl<'a, C: Op, M: CountMap<C::D> + Observable, F: Fn(&M) -> I, I> IsFeeder<'a, I>
120    for Interrupter<C, M, F, I>
121{
122    fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
123        let m = self.output.get(context);
124        if m.is_empty() {
125            Instruct::Unchanged
126        } else {
127            Instruct::Interrupt((self.f)(&*m))
128        }
129    }
130}
131impl<'a, C: Op, M: CountMap<C::D> + Observable, F: Fn(&M) -> I, I> IsFeedback<'a, I>
132    for Interrupter<C, M, F, I>
133{
134    fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
135        self.output.add_listener(context, f);
136    }
137}
138
139impl<'a, I> CreationContext<'a, I> {
140    /// Connects a `Relation` to an `Input` such that whenever `ExecutionContext::commit` is called,
141    /// any changes to the collection represented by the `Relation` argument are fed back into the
142    /// `Input` argument. This repeats until the collection stops changing.
143    pub fn feed<D>(&mut self, rel: Relation<impl Op<D = D> + 'a>, input: Input<'a, D>) {
144        let edge = (rel.tracking_index(), input.tracking_index());
145        self.add_feeder(
146            Feedback {
147                output: rel.into_output_(self),
148                input,
149            },
150            Some(edge),
151        )
152    }
153
154    /// Similar to `feed` except that the `Relation` argument
155    /// additionally has an ordering key. Rather than feeding _all_ changes back into the `Input`, only
156    /// those with the minimum present ordering key are fed back in. If any later changes are cancelled
157    /// out as a result of this (if their count goes to zero), then they will not be fed in at all.
158    /// This can be handy in situations where using `feed` naively can cause an infinite loop.
159    pub fn feed_ordered<K: Ord + 'a, V: Eq + Hash + 'a>(
160        &mut self,
161        rel: Relation<impl Op<D = (K, V)> + 'a>,
162        input: Input<'a, V>,
163    ) {
164        let edge = (rel.tracking_index(), input.tracking_index());
165        self.add_feeder(
166            FeedbackOrdered {
167                output: rel.into_output_(self),
168                input,
169            },
170            Some(edge),
171        )
172    }
173    pub fn interrupt<D, M: CountMap<D> + Observable + 'a>(
174        &mut self,
175        output: Output<D, impl Op<D = D> + 'a, M>,
176        f: impl Fn(&M) -> I + 'a,
177    ) where
178        I: 'a,
179    {
180        self.add_feeder(Interrupter { output, f }, None)
181    }
182
183    /// Takes an `Output` as an argument rather than a `Relation` and rather
184    /// than propagating _changes_ to it's argument through will instead send the entire contents of that
185    /// `Output` on every visit. `feed_while` is intended to be used in circumstances where there exists
186    /// a negative feedback loop between the arguments and the caller wants to retain any visited values
187    /// rather than have them be immediately deleted.
188    pub fn feed_while<D: Clone + Eq + Hash + 'a>(
189        &mut self,
190        output: Output<D, impl Op<D = D> + 'a>,
191        input: Input<'a, D>,
192    ) {
193        let edge = (output.tracking_index(), input.tracking_index());
194        self.add_feeder(FeedbackWhile { output, input }, Some(edge))
195    }
196}