standing_relations/core/context/
input.rs1use std::{cell::RefCell, rc::Rc, slice};
2
3use crate::core::{
4 dirty::{self, DirtySend},
5 pipes::{self, Receiver},
6 CreationContext, ExecutionContext, Op_, Relation,
7};
8
9use super::{
10 handler_queue::IsInputHandler, ContextTracker, HandlerPosition, HandlerQueue, TrackingIndex,
11};
12
13struct InputHandler<T> {
14 receiver: pipes::Receiver<T>,
15 sender: pipes::Sender<Vec<T>>,
16 dirty_send: DirtySend,
17}
18
19impl<T> IsInputHandler for InputHandler<T> {
20 fn dump(&mut self) {
21 self.sender.send(self.receiver.receive());
22 self.dirty_send.set_dirty();
23 }
24}
25
26pub struct Input_<'a, T> {
27 context_tracker: ContextTracker,
28 tracking_index: TrackingIndex,
29 sender: pipes::Sender<T>,
30 handler_queue: Rc<RefCell<HandlerQueue<'a>>>,
31 self_index: HandlerPosition,
32 listeners: Rc<RefCell<Vec<Box<dyn FnMut(&[T]) + 'a>>>>,
33}
34
35impl<T> Clone for Input_<'_, T> {
36 fn clone(&self) -> Self {
37 Input_ {
38 context_tracker: self.context_tracker.clone(),
39 tracking_index: self.tracking_index,
40 sender: self.sender.clone(),
41 handler_queue: Rc::clone(&self.handler_queue),
42 self_index: self.self_index,
43 listeners: Rc::clone(&self.listeners),
44 }
45 }
46}
47
48impl<'a, T> Input_<'a, T> {
49 pub(crate) fn add_listener(
50 &mut self,
51 context: &CreationContext,
52 listener: impl FnMut(&[T]) + 'a,
53 ) {
54 assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
55 self.listeners.borrow_mut().push(Box::new(listener));
56 }
57 pub fn send(&mut self, context: &ExecutionContext, x: T) {
58 assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
59 self.handler_queue.borrow_mut().enqueue(self.self_index);
60 for listener in self.listeners.borrow_mut().iter_mut() {
61 listener(slice::from_ref(&x))
62 }
63 self.sender.send(x);
64 }
65 pub fn send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
66 assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
67 self.handler_queue.borrow_mut().enqueue(self.self_index);
68 for listener in self.listeners.borrow_mut().iter_mut() {
69 listener(&data)
70 }
71 self.sender.send_all(data);
72 }
73 pub(crate) fn silent_send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
74 assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
75 self.handler_queue.borrow_mut().enqueue(self.self_index);
76 self.sender.send_all(data);
77 }
78 pub fn tracking_index(&self) -> TrackingIndex {
79 self.tracking_index
80 }
81}
82
83pub struct InputOp<T>(Receiver<Vec<T>>);
84
85impl<T> Op_ for InputOp<T> {
86 type T = T;
87
88 fn foreach<'a>(&'a mut self, mut continuation: impl FnMut(Self::T) + 'a) {
89 for x in self.0.receive().into_iter().flatten() {
90 continuation(x)
91 }
92 }
93 fn get_type_name() -> &'static str {
94 "input"
95 }
96}
97
98impl<'a> CreationContext<'a> {
99 pub fn new_input_<T: 'a>(&mut self) -> (Input_<'a, T>, Relation<InputOp<T>>) {
100 let (sender1, receiver1) = pipes::new();
101 let (sender2, receiver2) = pipes::new();
102 let (dirty_send, dirty_receive) = dirty::new();
103 let handler = InputHandler {
104 receiver: receiver1,
105 sender: sender2,
106 dirty_send,
107 };
108 let i = self.0.add_handler(handler);
109 let relation =
110 self.0
111 .tracker
112 .clone()
113 .add_relation(dirty_receive, InputOp(receiver2), vec![]);
114 let input_sender = Input_ {
115 context_tracker: self.0.tracker.clone(),
116 tracking_index: relation.tracking_index,
117 sender: sender1,
118 handler_queue: Rc::clone(self.0.handler_queue()),
119 self_index: i,
120 listeners: Default::default(),
121 };
122 (input_sender, relation)
123 }
124}