1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::{cell::RefCell, rc::Rc, slice};
use crate::core::{
dirty::{self, DirtySend},
pipes::{self, Receiver},
CreationContext, ExecutionContext, Op_, Relation,
};
use super::{
handler_queue::IsInputHandler, ContextTracker, HandlerPosition, HandlerQueue, TrackingIndex,
};
struct InputHandler<T> {
receiver: pipes::Receiver<T>,
sender: pipes::Sender<Vec<T>>,
dirty_send: DirtySend,
}
impl<T> IsInputHandler for InputHandler<T> {
fn dump(&mut self) {
self.sender.send(self.receiver.receive());
self.dirty_send.set_dirty();
}
}
pub struct Input_<'a, T> {
context_tracker: ContextTracker,
tracking_index: TrackingIndex,
sender: pipes::Sender<T>,
handler_queue: Rc<RefCell<HandlerQueue<'a>>>,
self_index: HandlerPosition,
listeners: Rc<RefCell<Vec<Box<dyn FnMut(&[T]) + 'a>>>>,
}
impl<T> Clone for Input_<'_, T> {
fn clone(&self) -> Self {
Input_ {
context_tracker: self.context_tracker.clone(),
tracking_index: self.tracking_index,
sender: self.sender.clone(),
handler_queue: Rc::clone(&self.handler_queue),
self_index: self.self_index,
listeners: Rc::clone(&self.listeners),
}
}
}
impl<'a, T> Input_<'a, T> {
pub(crate) fn add_listener(
&mut self,
context: &CreationContext,
listener: impl FnMut(&[T]) + 'a,
) {
assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
self.listeners.borrow_mut().push(Box::new(listener));
}
pub fn send(&mut self, context: &ExecutionContext, x: T) {
assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
self.handler_queue.borrow_mut().enqueue(self.self_index);
for listener in self.listeners.borrow_mut().iter_mut() {
listener(slice::from_ref(&x))
}
self.sender.send(x);
}
pub fn send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
self.handler_queue.borrow_mut().enqueue(self.self_index);
for listener in self.listeners.borrow_mut().iter_mut() {
listener(&data)
}
self.sender.send_all(data);
}
pub(crate) fn silent_send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
self.handler_queue.borrow_mut().enqueue(self.self_index);
self.sender.send_all(data);
}
pub fn tracking_index(&self) -> TrackingIndex {
self.tracking_index
}
}
pub struct InputOp<T>(Receiver<Vec<T>>);
impl<T> Op_ for InputOp<T> {
type T = T;
fn foreach<'a>(&'a mut self, mut continuation: impl FnMut(Self::T) + 'a) {
for x in self.0.receive().into_iter().flatten() {
continuation(x)
}
}
fn get_type_name() -> &'static str {
"input"
}
}
impl<'a> CreationContext<'a> {
pub fn new_input_<T: 'a>(&mut self) -> (Input_<'a, T>, Relation<InputOp<T>>) {
let (sender1, receiver1) = pipes::new();
let (sender2, receiver2) = pipes::new();
let (dirty_send, dirty_receive) = dirty::new();
let handler = InputHandler {
receiver: receiver1,
sender: sender2,
dirty_send,
};
let i = self.0.add_handler(handler);
let relation =
self.0
.tracker
.clone()
.add_relation(dirty_receive, InputOp(receiver2), vec![]);
let input_sender = Input_ {
context_tracker: self.0.tracker.clone(),
tracking_index: relation.tracking_index,
sender: sender1,
handler_queue: Rc::clone(self.0.handler_queue()),
self_index: i,
listeners: Default::default(),
};
(input_sender, relation)
}
}