standing_relations/core/context/
input.rs

1use std::{cell::RefCell, rc::Rc, slice};
2
3use crate::core::{
4    dirty::{self, DirtySend},
5    pipes::{self, Receiver},
6    CreationContext, ExecutionContext, Op_, Relation,
7};
8
9use super::{
10    handler_queue::IsInputHandler, ContextTracker, HandlerPosition, HandlerQueue, TrackingIndex,
11};
12
13struct InputHandler<T> {
14    receiver: pipes::Receiver<T>,
15    sender: pipes::Sender<Vec<T>>,
16    dirty_send: DirtySend,
17}
18
19impl<T> IsInputHandler for InputHandler<T> {
20    fn dump(&mut self) {
21        self.sender.send(self.receiver.receive());
22        self.dirty_send.set_dirty();
23    }
24}
25
26pub struct Input_<'a, T> {
27    context_tracker: ContextTracker,
28    tracking_index: TrackingIndex,
29    sender: pipes::Sender<T>,
30    handler_queue: Rc<RefCell<HandlerQueue<'a>>>,
31    self_index: HandlerPosition,
32    listeners: Rc<RefCell<Vec<Box<dyn FnMut(&[T]) + 'a>>>>,
33}
34
35impl<T> Clone for Input_<'_, T> {
36    fn clone(&self) -> Self {
37        Input_ {
38            context_tracker: self.context_tracker.clone(),
39            tracking_index: self.tracking_index,
40            sender: self.sender.clone(),
41            handler_queue: Rc::clone(&self.handler_queue),
42            self_index: self.self_index,
43            listeners: Rc::clone(&self.listeners),
44        }
45    }
46}
47
48impl<'a, T> Input_<'a, T> {
49    pub(crate) fn add_listener(
50        &mut self,
51        context: &CreationContext,
52        listener: impl FnMut(&[T]) + 'a,
53    ) {
54        assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
55        self.listeners.borrow_mut().push(Box::new(listener));
56    }
57    pub fn send(&mut self, context: &ExecutionContext, x: T) {
58        assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
59        self.handler_queue.borrow_mut().enqueue(self.self_index);
60        for listener in self.listeners.borrow_mut().iter_mut() {
61            listener(slice::from_ref(&x))
62        }
63        self.sender.send(x);
64    }
65    pub fn send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
66        assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
67        self.handler_queue.borrow_mut().enqueue(self.self_index);
68        for listener in self.listeners.borrow_mut().iter_mut() {
69            listener(&data)
70        }
71        self.sender.send_all(data);
72    }
73    pub(crate) fn silent_send_all(&mut self, context: &ExecutionContext, data: Vec<T>) {
74        assert_eq!(self.context_tracker, context.0.tracker, "Context mismatch");
75        self.handler_queue.borrow_mut().enqueue(self.self_index);
76        self.sender.send_all(data);
77    }
78    pub fn tracking_index(&self) -> TrackingIndex {
79        self.tracking_index
80    }
81}
82
83pub struct InputOp<T>(Receiver<Vec<T>>);
84
85impl<T> Op_ for InputOp<T> {
86    type T = T;
87
88    fn foreach<'a>(&'a mut self, mut continuation: impl FnMut(Self::T) + 'a) {
89        for x in self.0.receive().into_iter().flatten() {
90            continuation(x)
91        }
92    }
93    fn get_type_name() -> &'static str {
94        "input"
95    }
96}
97
98impl<'a> CreationContext<'a> {
99    pub fn new_input_<T: 'a>(&mut self) -> (Input_<'a, T>, Relation<InputOp<T>>) {
100        let (sender1, receiver1) = pipes::new();
101        let (sender2, receiver2) = pipes::new();
102        let (dirty_send, dirty_receive) = dirty::new();
103        let handler = InputHandler {
104            receiver: receiver1,
105            sender: sender2,
106            dirty_send,
107        };
108        let i = self.0.add_handler(handler);
109        let relation =
110            self.0
111                .tracker
112                .clone()
113                .add_relation(dirty_receive, InputOp(receiver2), vec![]);
114        let input_sender = Input_ {
115            context_tracker: self.0.tracker.clone(),
116            tracking_index: relation.tracking_index,
117            sender: sender1,
118            handler_queue: Rc::clone(self.0.handler_queue()),
119            self_index: i,
120            listeners: Default::default(),
121        };
122        (input_sender, relation)
123    }
124}