differential_dataflow/operators/arrange/
writer.rs1use std::rc::{Rc, Weak};
7use std::cell::RefCell;
8
9use timely::progress::Antichain;
10
11use crate::trace::{Trace, Batch, BatchReader};
12
13use super::TraceAgentQueueWriter;
14use super::TraceReplayInstruction;
15use super::agent::trace_box::TraceBox;
16
17pub struct TraceWriter<Tr: Trace> {
22 upper: Antichain<Tr::Time>,
24 trace: Weak<RefCell<TraceBox<Tr>>>,
26 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
28}
29
30impl<Tr: Trace> TraceWriter<Tr> {
31 pub fn new(
33 upper: Vec<Tr::Time>,
34 trace: Weak<RefCell<TraceBox<Tr>>>,
35 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
36 ) -> Self
37 {
38 let mut temp = Antichain::new();
39 temp.extend(upper);
40 Self { upper: temp, trace, queues }
41 }
42
43 pub fn exert(&mut self) {
45 if let Some(trace) = self.trace.upgrade() {
46 trace.borrow_mut().trace.exert();
47 }
48 }
49
50 pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
56
57 if !(&self.upper == batch.lower()) {
59 println!("{:?} vs {:?}", self.upper, batch.lower());
60 }
61 assert!(&self.upper == batch.lower());
62 assert!(batch.lower() != batch.upper());
63
64 self.upper.clone_from(batch.upper());
65
66 let mut borrow = self.queues.borrow_mut();
68 for queue in borrow.iter_mut() {
69 if let Some(pair) = queue.upgrade() {
70 pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
71 pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
72 pair.0.activate();
73 }
74 }
75 borrow.retain(|w| w.upgrade().is_some());
76
77 if let Some(trace) = self.trace.upgrade() {
79 trace.borrow_mut().trace.insert(batch);
80 }
81
82 }
83
84 pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
86 if self.upper != upper {
87 self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
88 }
89 }
90}
91
92impl<Tr: Trace> Drop for TraceWriter<Tr> {
93 fn drop(&mut self) {
94 self.seal(Antichain::new())
95 }
96}