differential_dataflow/operators/arrange/
writer.rs1use std::rc::{Rc, Weak};
7use std::cell::RefCell;
8
9use timely::progress::Antichain;
10
11use crate::trace::{Trace, Batch, BatchReader};
12use crate::trace::wrappers::rc::TraceBox;
13
14
15use super::TraceAgentQueueWriter;
16use super::TraceReplayInstruction;
17
18pub struct TraceWriter<Tr>
23where
24 Tr: Trace,
25 Tr::Batch: Batch,
26{
27 upper: Antichain<Tr::Time>,
29 trace: Weak<RefCell<TraceBox<Tr>>>,
31 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
33}
34
35impl<Tr> TraceWriter<Tr>
36where
37 Tr: Trace,
38 Tr::Batch: Batch,
39{
40 pub fn new(
42 upper: Vec<Tr::Time>,
43 trace: Weak<RefCell<TraceBox<Tr>>>,
44 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
45 ) -> Self
46 {
47 let mut temp = Antichain::new();
48 temp.extend(upper);
49 Self { upper: temp, trace, queues }
50 }
51
52 pub fn exert(&mut self) {
54 if let Some(trace) = self.trace.upgrade() {
55 trace.borrow_mut().trace.exert();
56 }
57 }
58
59 pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
65
66 if !(&self.upper == batch.lower()) {
68 println!("{:?} vs {:?}", self.upper, batch.lower());
69 }
70 assert!(&self.upper == batch.lower());
71 assert!(batch.lower() != batch.upper());
72
73 self.upper.clone_from(batch.upper());
74
75 let mut borrow = self.queues.borrow_mut();
77 for queue in borrow.iter_mut() {
78 if let Some(pair) = queue.upgrade() {
79 pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
80 pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
81 pair.0.activate();
82 }
83 }
84 borrow.retain(|w| w.upgrade().is_some());
85
86 if let Some(trace) = self.trace.upgrade() {
88 trace.borrow_mut().trace.insert(batch);
89 }
90
91 }
92
93 pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
95 if self.upper != upper {
96 self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
97 }
98 }
99}
100
101impl<Tr> Drop for TraceWriter<Tr>
102where
103 Tr: Trace,
104 Tr::Batch: Batch,
105{
106 fn drop(&mut self) {
107 self.seal(Antichain::new())
108 }
109}