1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
//! Write endpoint for a sequence of batches.
//!
//! A `TraceWriter` accepts a sequence of batches and distributes them
//! to both a shared trace and to a sequence of private queues.

use std::rc::{Rc, Weak};
use std::cell::RefCell;

use lattice::Lattice;
use trace::{Trace, Batch, BatchReader};

use trace::wrappers::rc::TraceBox;

use super::TraceAgentQueueWriter;
use super::TraceReplayInstruction;

/// Write endpoint for a sequence of batches.
///
/// A `TraceWriter` accepts a sequence of batches and distributes them
/// to both a shared trace and to a sequence of private queues.
///
/// Batches must be contiguous: each inserted batch has to start at the
/// writer's current `upper` frontier, and the writer then adopts the upper
/// frontier of that batch as its own.
pub struct TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Current upper limit.
    ///
    /// The lower frontier of the next inserted batch must equal this value;
    /// it is overwritten with that batch's upper frontier on insertion.
    upper: Vec<Tr::Time>,
    /// Shared trace, possibly absent (due to weakness).
    ///
    /// Held weakly: batches are only inserted into the trace while some
    /// other owner still keeps it alive.
    trace: Weak<RefCell<TraceBox<Tr>>>,
    /// A sequence of private queues into which batches are written.
    ///
    /// Each entry is upgraded before use; entries that can no longer be
    /// upgraded are removed from the list.
    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
}

impl<Tr> TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Creates a new `TraceWriter`.
    ///
    /// * `upper` is the frontier from which the first inserted batch must start.
    /// * `trace` is a weak handle to the shared trace that receives each batch.
    /// * `queues` is the shared list of private queues that are notified of
    ///   each batch.
    pub fn new(
        upper: Vec<Tr::Time>,
        trace: Weak<RefCell<TraceBox<Tr>>>,
        queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
    ) -> Self
    {
        Self { upper, trace, queues }
    }

    /// Advances the trace by `batch`.
    ///
    /// The `hint` argument is either `None` in the case of an empty batch,
    /// or is `Some(time)` for a time less or equal to all updates in the
    /// batch and which is suitable for use as a capability.
    ///
    /// Every queue that is still alive receives a `Batch` instruction followed
    /// by a `Frontier` instruction carrying the batch's upper frontier, and is
    /// then activated. Queues that can no longer be upgraded are dropped from
    /// the list. Finally the batch is inserted into the shared trace, if the
    /// trace still exists.
    ///
    /// # Panics
    ///
    /// Panics if the lower frontier of `batch` differs from the writer's
    /// current upper frontier, or if the lower and upper frontiers of `batch`
    /// are equal (the batch would not advance the writer).
    pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {

        // Something is wrong if not a sequence; report both frontiers in the
        // panic message rather than printing them to stdout beforehand.
        assert!(
            &self.upper[..] == batch.lower(),
            "batch does not continue the sequence: {:?} vs {:?}",
            self.upper,
            batch.lower()
        );
        assert!(
            batch.lower() != batch.upper(),
            "batch must advance the frontier, but its lower and upper frontiers are equal"
        );

        self.upper.clear();
        self.upper.extend(batch.upper().iter().cloned());

        // Push information to each listener that still exists, and forget the
        // ones that are gone. `retain` visits every entry exactly once, in
        // order, so each queue is upgraded a single time.
        self.queues.borrow_mut().retain(|queue| {
            if let Some(pair) = queue.upgrade() {
                {
                    // Borrow the instruction queue once for both pushes, and
                    // release it before activating the listener.
                    let mut instructions = pair.1.borrow_mut();
                    instructions.push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
                    instructions.push_back(TraceReplayInstruction::Frontier(batch.upper().to_vec()));
                }
                pair.0.activate();
                true
            } else {
                false
            }
        });

        // Push data to the trace, if it still exists.
        if let Some(trace) = self.trace.upgrade() {
            trace.borrow_mut().trace.insert(batch);
        }

    }

    /// Inserts an empty batch up to `upper`.
    ///
    /// Does nothing when the writer's current upper frontier already equals
    /// `upper`. Otherwise an empty batch spanning from the current upper
    /// frontier to `upper` is built and passed to `insert` with no hint.
    pub fn seal(&mut self, upper: &[Tr::Time]) {
        // Already sealed up to `upper`: an empty-interval batch would trip the
        // "must advance" assertion in `insert`.
        if &self.upper[..] == upper {
            return;
        }

        use trace::Builder;
        let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
        // The current upper frontier serves both as the batch's lower bound and
        // as the builder's third argument (presumably the `since` frontier —
        // confirm against `Builder::done`).
        let batch = builder.done(&self.upper[..], upper, &self.upper[..]);
        self.insert(batch, None);
    }
}

/// Closes out the batch sequence when the writer goes away.
impl<Tr> Drop for TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Seals the writer up to the empty frontier.
    ///
    /// If the writer's upper frontier is not already empty, `seal` inserts one
    /// final empty batch whose upper frontier is empty.
    fn drop(&mut self) {
        let empty_frontier: &[Tr::Time] = &[];
        self.seal(empty_frontier)
    }
}