differential_dataflow/operators/arrange/
writer.rs

1//! Write endpoint for a sequence of batches.
2//!
3//! A `TraceWriter` accepts a sequence of batches and distributes them
4//! to both a shared trace and to a sequence of private queues.
5
6use std::rc::{Rc, Weak};
7use std::cell::RefCell;
8
9use timely::progress::Antichain;
10
11use crate::trace::{Trace, Batch, BatchReader};
12use crate::trace::wrappers::rc::TraceBox;
13
14
15use super::TraceAgentQueueWriter;
16use super::TraceReplayInstruction;
17
18/// Write endpoint for a sequence of batches.
19///
20/// A `TraceWriter` accepts a sequence of batches and distributes them
21/// to both a shared trace and to a sequence of private queues.
22pub struct TraceWriter<Tr>
23where
24    Tr: Trace,
25    Tr::Batch: Batch,
26{
27    /// Current upper limit.
28    upper: Antichain<Tr::Time>,
29    /// Shared trace, possibly absent (due to weakness).
30    trace: Weak<RefCell<TraceBox<Tr>>>,
31    /// A sequence of private queues into which batches are written.
32    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
33}
34
35impl<Tr> TraceWriter<Tr>
36where
37    Tr: Trace,
38    Tr::Batch: Batch,
39{
40    /// Creates a new `TraceWriter`.
41    pub fn new(
42        upper: Vec<Tr::Time>,
43        trace: Weak<RefCell<TraceBox<Tr>>>,
44        queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
45    ) -> Self
46    {
47        let mut temp = Antichain::new();
48        temp.extend(upper);
49        Self { upper: temp, trace, queues }
50    }
51
52    /// Exerts merge effort, even without additional updates.
53    pub fn exert(&mut self) {
54        if let Some(trace) = self.trace.upgrade() {
55            trace.borrow_mut().trace.exert();
56        }
57    }
58
59    /// Advances the trace by `batch`.
60    ///
61    /// The `hint` argument is either `None` in the case of an empty batch,
62    /// or is `Some(time)` for a time less or equal to all updates in the
63    /// batch and which is suitable for use as a capability.
64    pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
65
66        // Something is wrong if not a sequence.
67        if !(&self.upper == batch.lower()) {
68            println!("{:?} vs {:?}", self.upper, batch.lower());
69        }
70        assert!(&self.upper == batch.lower());
71        assert!(batch.lower() != batch.upper());
72
73        self.upper.clone_from(batch.upper());
74
75        // push information to each listener that still exists.
76        let mut borrow = self.queues.borrow_mut();
77        for queue in borrow.iter_mut() {
78            if let Some(pair) = queue.upgrade() {
79                pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
80                pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
81                pair.0.activate();
82            }
83        }
84        borrow.retain(|w| w.upgrade().is_some());
85
86        // push data to the trace, if it still exists.
87        if let Some(trace) = self.trace.upgrade() {
88            trace.borrow_mut().trace.insert(batch);
89        }
90
91    }
92
93    /// Inserts an empty batch up to `upper`.
94    pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
95        if self.upper != upper {
96            self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
97        }
98    }
99}
100
101impl<Tr> Drop for TraceWriter<Tr>
102where
103    Tr: Trace,
104    Tr::Batch: Batch,
105{
106    fn drop(&mut self) {
107        self.seal(Antichain::new())
108    }
109}