Skip to main content

differential_dataflow/operators/arrange/
writer.rs

1//! Write endpoint for a sequence of batches.
2//!
3//! A `TraceWriter` accepts a sequence of batches and distributes them
4//! to both a shared trace and to a sequence of private queues.
5
6use std::rc::{Rc, Weak};
7use std::cell::RefCell;
8
9use timely::progress::Antichain;
10
11use crate::trace::{Trace, Batch, BatchReader};
12
13use super::TraceAgentQueueWriter;
14use super::TraceReplayInstruction;
15use super::agent::trace_box::TraceBox;
16
17/// Write endpoint for a sequence of batches.
18///
19/// A `TraceWriter` accepts a sequence of batches and distributes them
20/// to both a shared trace and to a sequence of private queues.
21pub struct TraceWriter<Tr: Trace> {
22    /// Current upper limit.
23    upper: Antichain<Tr::Time>,
24    /// Shared trace, possibly absent (due to weakness).
25    trace: Weak<RefCell<TraceBox<Tr>>>,
26    /// A sequence of private queues into which batches are written.
27    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
28}
29
30impl<Tr: Trace> TraceWriter<Tr> {
31    /// Creates a new `TraceWriter`.
32    pub fn new(
33        upper: Vec<Tr::Time>,
34        trace: Weak<RefCell<TraceBox<Tr>>>,
35        queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
36    ) -> Self
37    {
38        let mut temp = Antichain::new();
39        temp.extend(upper);
40        Self { upper: temp, trace, queues }
41    }
42
43    /// Exerts merge effort, even without additional updates.
44    pub fn exert(&mut self) {
45        if let Some(trace) = self.trace.upgrade() {
46            trace.borrow_mut().trace.exert();
47        }
48    }
49
50    /// Advances the trace by `batch`.
51    ///
52    /// The `hint` argument is either `None` in the case of an empty batch,
53    /// or is `Some(time)` for a time less or equal to all updates in the
54    /// batch and which is suitable for use as a capability.
55    pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
56
57        // Something is wrong if not a sequence.
58        if !(&self.upper == batch.lower()) {
59            println!("{:?} vs {:?}", self.upper, batch.lower());
60        }
61        assert!(&self.upper == batch.lower());
62        assert!(batch.lower() != batch.upper());
63
64        self.upper.clone_from(batch.upper());
65
66        // push information to each listener that still exists.
67        let mut borrow = self.queues.borrow_mut();
68        for queue in borrow.iter_mut() {
69            if let Some(pair) = queue.upgrade() {
70                pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
71                pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
72                pair.0.activate();
73            }
74        }
75        borrow.retain(|w| w.upgrade().is_some());
76
77        // push data to the trace, if it still exists.
78        if let Some(trace) = self.trace.upgrade() {
79            trace.borrow_mut().trace.insert(batch);
80        }
81
82    }
83
84    /// Inserts an empty batch up to `upper`.
85    pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
86        if self.upper != upper {
87            self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
88        }
89    }
90}
91
92impl<Tr: Trace> Drop for TraceWriter<Tr> {
93    fn drop(&mut self) {
94        self.seal(Antichain::new())
95    }
96}