standing_relations/feedback/
op.rs1use std::hash::Hash;
2
3use crate::{core, CountMap, Input, Observable, Op, Output, Relation};
4
5use self::pipe::{OrderedPipe, Pipe};
6
7use super::context::CreationContext;
8
9mod checked_foreach;
10mod pipe;
11
12pub struct FeedbackWhile<'a, C: Op>
13where
14 C::D: Eq + Hash,
15{
16 output: Output<C::D, C>,
17 input: Input<'a, C::D>,
18}
19
20pub struct Feedback<'a, C: Op> {
21 #[allow(clippy::type_complexity)]
22 output: Output<C::D, C, Pipe<(C::D, isize)>>,
23 input: Input<'a, C::D>,
24}
25
26pub struct FeedbackOrdered<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>> {
27 output: Output<(K, V), C, OrderedPipe<K, V>>,
28 input: Input<'a, V>,
29}
30
31pub struct Interrupter<C: Op, M: CountMap<C::D>, F: Fn(&M) -> I, I> {
32 output: Output<C::D, C, M>,
33 f: F,
34}
35
36pub(crate) trait IsFeeder<'a, I> {
37 fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I>;
38}
39
40pub(crate) trait IsFeedback<'a, I>: IsFeeder<'a, I> {
41 fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static);
42}
43
/// Outcome of a single feed step.
///
/// The common traits are derived so the value can be logged, copied around
/// and compared in assertions; each derive only applies when `I` itself
/// implements the corresponding trait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Instruct<I> {
    /// The feeder found nothing to forward.
    Unchanged,
    /// The feeder forwarded data to its input.
    Changed,
    /// The feeder produced an interrupt carrying the given payload.
    Interrupt(I),
}
49
50impl<'a, C: Op, I> IsFeeder<'a, I> for FeedbackWhile<'a, C>
51where
52 C::D: Clone + Eq + Hash + 'a,
53{
54 fn feed(&mut self, context: &core::ExecutionContext) -> Instruct<I> {
55 let m = self.output.get(context);
56 if m.is_empty() {
57 Instruct::Unchanged
58 } else {
59 self.input.send_all(
60 context,
61 m.iter().map(|(x, &count)| (x.clone(), count)).collect(),
62 );
63 Instruct::Changed
64 }
65 }
66}
67
68impl<'a, C: Op, I> IsFeedback<'a, I> for FeedbackWhile<'a, C>
69where
70 C::D: Clone + Eq + Hash + 'a,
71{
72 fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
73 self.output.add_listener(context, f);
74 }
75}
76
77impl<'a, C: Op, I> IsFeeder<'a, I> for Feedback<'a, C> {
78 fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
79 let m = self.output.get(context);
80 let changes = m.receive();
81 if changes.is_empty() {
82 Instruct::Unchanged
83 } else {
84 self.input.send_all(context, changes);
85 Instruct::Changed
86 }
87 }
88}
89
90impl<'a, C: Op, I> IsFeedback<'a, I> for Feedback<'a, C> {
91 fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
92 self.output.add_listener(context, f);
93 }
94}
95
96impl<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>, I> IsFeeder<'a, I>
97 for FeedbackOrdered<'a, K, V, C>
98{
99 fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
100 let m = self.output.get(context);
101 match m.receive() {
102 None => Instruct::Unchanged,
103 Some((_, changes)) => {
104 self.input.send_all(context, changes.into_iter().collect());
105 Instruct::Changed
106 }
107 }
108 }
109}
110
111impl<'a, K: Ord, V: Eq + Hash, C: Op<D = (K, V)>, I> IsFeedback<'a, I>
112 for FeedbackOrdered<'a, K, V, C>
113{
114 fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
115 self.output.add_listener(context, f);
116 }
117}
118
119impl<'a, C: Op, M: CountMap<C::D> + Observable, F: Fn(&M) -> I, I> IsFeeder<'a, I>
120 for Interrupter<C, M, F, I>
121{
122 fn feed(&mut self, context: &core::ExecutionContext<'a>) -> Instruct<I> {
123 let m = self.output.get(context);
124 if m.is_empty() {
125 Instruct::Unchanged
126 } else {
127 Instruct::Interrupt((self.f)(&*m))
128 }
129 }
130}
131impl<'a, C: Op, M: CountMap<C::D> + Observable, F: Fn(&M) -> I, I> IsFeedback<'a, I>
132 for Interrupter<C, M, F, I>
133{
134 fn add_listener(&mut self, context: &core::CreationContext, f: impl FnMut() + 'static) {
135 self.output.add_listener(context, f);
136 }
137}
138
139impl<'a, I> CreationContext<'a, I> {
140 pub fn feed<D>(&mut self, rel: Relation<impl Op<D = D> + 'a>, input: Input<'a, D>) {
144 let edge = (rel.tracking_index(), input.tracking_index());
145 self.add_feeder(
146 Feedback {
147 output: rel.into_output_(self),
148 input,
149 },
150 Some(edge),
151 )
152 }
153
154 pub fn feed_ordered<K: Ord + 'a, V: Eq + Hash + 'a>(
160 &mut self,
161 rel: Relation<impl Op<D = (K, V)> + 'a>,
162 input: Input<'a, V>,
163 ) {
164 let edge = (rel.tracking_index(), input.tracking_index());
165 self.add_feeder(
166 FeedbackOrdered {
167 output: rel.into_output_(self),
168 input,
169 },
170 Some(edge),
171 )
172 }
173 pub fn interrupt<D, M: CountMap<D> + Observable + 'a>(
174 &mut self,
175 output: Output<D, impl Op<D = D> + 'a, M>,
176 f: impl Fn(&M) -> I + 'a,
177 ) where
178 I: 'a,
179 {
180 self.add_feeder(Interrupter { output, f }, None)
181 }
182
183 pub fn feed_while<D: Clone + Eq + Hash + 'a>(
189 &mut self,
190 output: Output<D, impl Op<D = D> + 'a>,
191 input: Input<'a, D>,
192 ) {
193 let edge = (output.tracking_index(), input.tracking_index());
194 self.add_feeder(FeedbackWhile { output, input }, Some(edge))
195 }
196}