use super::{Mailbox, require_persistent_id};
use crate::{
Batch, BatchReader, Circuit, Error, Runtime, Stream,
circuit::{
GlobalNodeId, LocalStoreMarker, OwnershipPreference, RootCircuit, Scope,
circuit_builder::CircuitBase,
metadata::{BatchSizeStats, OUTPUT_BATCHES_STATS, OperatorMeta},
operator_traits::{BinarySinkOperator, Operator, SinkOperator},
},
storage::file::to_bytes,
trace::{
BatchReader as DynBatchReader, BatchReaderFactories, SpineSnapshot as DynSpineSnapshot,
},
typed_batch::{Spine, SpineSnapshot, TypedBatch},
};
use feldera_storage::{FileCommitter, StoragePath};
use std::{
borrow::Cow,
fmt::Debug,
hash::{Hash, Hasher},
marker::PhantomData,
mem::transmute,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use typedmap::TypedMapKey;
impl<T> Stream<RootCircuit, T>
where
    T: Debug + Clone + Send + 'static,
{
    /// Attaches an output handle to this stream.
    ///
    /// Shorthand for [`Self::output_persistent`] with no persistent id.
    #[track_caller]
    pub fn output(&self) -> OutputHandle<T> {
        self.output_persistent(None)
    }

    /// Attaches an output handle to this stream, registering the sink node
    /// under `persistent_id` for checkpoint/restore.
    #[track_caller]
    pub fn output_persistent(&self, persistent_id: Option<&str>) -> OutputHandle<T> {
        let (handle, _gid) = self.output_persistent_with_gid(persistent_id);
        handle
    }

    /// Like [`Self::output_persistent`], but also returns the global id of
    /// the newly created sink node.
    #[track_caller]
    pub fn output_persistent_with_gid(
        &self,
        persistent_id: Option<&str>,
    ) -> (OutputHandle<T>, GlobalNodeId) {
        let (operator, handle) = Output::new();
        let circuit = self.circuit();
        let gid = circuit.add_sink(operator, self);
        circuit.set_persistent_node_id(&gid, persistent_id);
        (handle, gid)
    }

    /// Attaches an output handle whose mailbox is only updated during steps
    /// in which the `guard` stream carries `true`.
    #[track_caller]
    pub fn output_guarded(&self, guard: &Stream<RootCircuit, bool>) -> OutputHandle<T> {
        let (operator, handle) = OutputGuarded::new();
        self.circuit().add_binary_sink(operator, self, guard);
        handle
    }
}
impl<B> Stream<RootCircuit, B>
where
    B: Batch + Send,
{
    /// Attaches an output handle that yields an accumulated snapshot of this
    /// stream's batches rather than per-step deltas.
    #[track_caller]
    pub fn accumulate_output(&self) -> OutputHandle<SpineSnapshot<B>> {
        self.accumulate_output_persistent(None)
    }

    /// Like [`Self::accumulate_output`], registering the sink under
    /// `persistent_id` for checkpoint/restore.  Enables the accumulator by
    /// incrementing its enable count.
    #[track_caller]
    pub fn accumulate_output_persistent(
        &self,
        persistent_id: Option<&str>,
    ) -> OutputHandle<SpineSnapshot<B>> {
        let (handle, enable_count, _gid) =
            self.accumulate_output_persistent_with_gid(persistent_id);
        enable_count.fetch_add(1, Ordering::AcqRel);
        handle
    }

    /// Like [`Self::accumulate_output_persistent`], but additionally returns
    /// the accumulator's enable counter (left untouched here) and the global
    /// id of the sink node.
    #[track_caller]
    pub fn accumulate_output_persistent_with_gid(
        &self,
        persistent_id: Option<&str>,
    ) -> (
        OutputHandle<SpineSnapshot<B>>,
        Arc<AtomicUsize>,
        GlobalNodeId,
    ) {
        let (sink, handle) = AccumulateOutput::<B>::new();
        let (accumulated, enable_count) = self.accumulate_with_enable_count();
        let circuit = self.circuit();
        let gid = circuit.add_sink(sink, &accumulated);
        circuit.set_persistent_node_id(&gid, persistent_id);
        (handle, enable_count, gid)
    }
}
/// Typed-map key used to look up the per-circuit [`OutputHandle<T>`] shared
/// by all workers in the runtime's local store.
struct OutputId<T> {
    // Unique id, allocated via `Runtime::sequence_next` in `OutputHandle::new`.
    id: usize,
    // Ties the key to the handle's value type without storing a `T`.
    _marker: PhantomData<T>,
}
// SAFETY: `OutputId` stores only a `usize` plus `PhantomData<T>`; no `T`
// value is ever reachable through it, so sharing the key across threads
// cannot touch a `T`.  The manual impl exists to drop the `T: Sync` bound
// that the auto trait would otherwise require via `PhantomData<T>`.
unsafe impl<T> Sync for OutputId<T> {}
// `Hash`/`PartialEq`/`Eq` are written by hand (rather than derived) so they
// do not impose the corresponding bounds on `T`; only `id` participates in
// equality and hashing.
impl<T> Hash for OutputId<T> {
    fn hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        self.id.hash(state);
    }
}
impl<T> PartialEq for OutputId<T> {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl<T> Eq for OutputId<T> {}
impl<T> OutputId<T> {
    /// Creates a key for the output handle with the given runtime-wide
    /// sequence id.
    fn new(id: usize) -> Self {
        Self {
            id,
            _marker: PhantomData,
        }
    }
}
// Registers `OutputId<T>` as a key type in the runtime's local store; each
// key maps to the `OutputHandle<T>` shared by all workers for that output.
impl<T> TypedMapKey<LocalStoreMarker> for OutputId<T>
where
    T: 'static,
{
    type Value = OutputHandle<T>;
}
/// Shared state behind an [`OutputHandle`]: one mailbox per worker, indexed
/// by worker id.
struct OutputHandleInternal<T> {
    mailbox: Vec<Mailbox<Option<T>>>,
}
impl<T: Clone> OutputHandleInternal<T> {
    /// Allocates one empty (`None`) mailbox per worker.
    ///
    /// # Panics
    ///
    /// Panics if `num_workers` is zero.
    fn new(num_workers: usize) -> Self {
        assert_ne!(num_workers, 0);
        let mailbox = (0..num_workers)
            .map(|_| Mailbox::new(Arc::new(|| None)))
            .collect();
        Self { mailbox }
    }

    /// Removes and returns the value in `worker`'s mailbox, if any.
    fn take_from_worker(&self, worker: usize) -> Option<T> {
        self.mailbox[worker].take()
    }

    /// Applies `func` to the contents of `worker`'s mailbox without
    /// consuming them.
    fn peek_from_worker<F, O: 'static>(&self, worker: usize, func: F) -> O
    where
        F: Fn(&Option<T>) -> O,
    {
        self.mailbox[worker].map(func)
    }

    /// Borrows `worker`'s mailbox.
    fn mailbox(&self, worker: usize) -> &Mailbox<Option<T>> {
        &self.mailbox[worker]
    }
}
/// Client-side handle for reading the output of a circuit: a cheaply
/// cloneable wrapper over per-worker mailboxes shared by all workers.
#[derive(Clone)]
pub struct OutputHandle<T>(Arc<OutputHandleInternal<T>>);
impl<T> OutputHandle<T>
where
    T: Send + Clone + 'static,
{
    /// Creates an output handle, or — inside a multi-worker runtime —
    /// retrieves the shared instance from the runtime's local store so that
    /// every worker constructing the same output gets a clone of one handle.
    fn new() -> Self {
        let Some(runtime) = Runtime::runtime() else {
            // No runtime: standalone single-worker mode.
            return Self(Arc::new(OutputHandleInternal::new(1)));
        };
        let output_id = runtime.sequence_next();
        runtime
            .local_store()
            .entry(OutputId::new(output_id))
            .or_insert_with(|| {
                Self(Arc::new(OutputHandleInternal::new(Runtime::num_workers())))
            })
            .value()
            .clone()
    }

    fn mailbox(&self, worker: usize) -> &Mailbox<Option<T>> {
        self.0.mailbox(worker)
    }

    /// Returns how many worker mailboxes currently hold a value.
    pub fn num_nonempty_mailboxes(&self) -> usize {
        (0..self.0.mailbox.len())
            .filter(|&worker| self.peek_from_worker(worker, Option::is_some))
            .count()
    }

    /// Applies `func` to the contents of `worker`'s mailbox without
    /// consuming them.
    pub fn peek_from_worker<F, O: 'static>(&self, worker: usize, func: F) -> O
    where
        F: Fn(&Option<T>) -> O,
    {
        self.0.peek_from_worker(worker, func)
    }

    /// Removes and returns the value in `worker`'s mailbox, if any.
    pub fn take_from_worker(&self, worker: usize) -> Option<T> {
        self.0.take_from_worker(worker)
    }

    /// Drains every worker's mailbox, collecting all non-empty values.
    pub fn take_from_all(&self) -> Vec<T> {
        (0..self.0.mailbox.len())
            .filter_map(|worker| self.take_from_worker(worker))
            .collect()
    }
}
impl<T> OutputHandle<T>
where
    T: Batch<Time = ()>,
    T::InnerBatch: Send,
{
    /// Drains all workers' mailboxes and merges their batches into a single
    /// consolidated batch.
    pub fn consolidate(&self) -> T {
        let factories = BatchReaderFactories::new::<T::Key, T::Val, T::R>();
        // SAFETY: reinterprets `&OutputHandle<T>` as `&OutputHandle<T::Inner>`
        // so the dyn-typed `dyn_consolidate` can run.  This relies on the
        // typed batch and its `Inner` dyn form having identical layout inside
        // the handle — NOTE(review): invariant maintained by the typed_batch
        // layer; confirm there if this code is touched.
        let handle: &OutputHandle<T::Inner> = unsafe { transmute(self) };
        T::from_inner(handle.dyn_consolidate(&factories))
    }
}
impl<T> OutputHandle<T>
where
    T: BatchReader<Time = ()> + Send + Clone,
    T::Inner: Send,
{
    /// Drains all workers' mailboxes and concatenates their contents into a
    /// single spine snapshot, without merging individual batches (cheaper
    /// than [`OutputHandle::consolidate`] when merging is not required).
    pub fn concat(&self) -> TypedBatch<T::Key, T::Val, T::R, DynSpineSnapshot<T::IntoBatch>> {
        TypedBatch::new(DynSpineSnapshot::concat(
            <T::IntoBatch as DynBatchReader>::Factories::new::<T::Key, T::Val, T::R>(),
            // Convert each worker's typed value to its dyn snapshot form;
            // `concat` consumes them by reference, hence the interim Vec.
            self.take_from_all()
                .into_iter()
                .map(|b| b.into_dyn_snapshot())
                .collect::<Vec<_>>()
                .iter(),
        ))
    }
}
/// Sink operator that forwards each step's value into the current worker's
/// mailbox of an [`OutputHandle`].
struct Output<T> {
    // Set by `init`; used to report errors during checkpoint/restore.
    global_id: GlobalNodeId,
    // This worker's slot in the shared output handle.
    mailbox: Mailbox<Option<T>>,
}
impl<T> Output<T>
where
    T: Clone + Send + 'static,
{
    /// Creates the sink operator together with the (shared) handle that
    /// consumers read values from.
    fn new() -> (Self, OutputHandle<T>) {
        let handle = OutputHandle::new();
        let operator = Self {
            global_id: GlobalNodeId::root(),
            mailbox: handle.mailbox(Runtime::worker_index()).clone(),
        };
        (operator, handle)
    }

    /// Path of this operator's (empty marker) checkpoint file.
    fn checkpoint_file(base: &StoragePath, persistent_id: &str) -> StoragePath {
        base.child(format!("output-{}.dat", persistent_id))
    }
}
impl<T> Operator for Output<T>
where
    T: Clone + Send + 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::from("Output")
    }
    fn init(&mut self, global_id: &GlobalNodeId) {
        self.global_id = global_id.clone();
    }
    /// Writes an empty marker file so that `restore` can verify the operator
    /// existed in the checkpoint; the operator itself carries no durable
    /// state (mailbox contents are transient).
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        pid: Option<&str>,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error> {
        let pid = require_persistent_id(pid, &self.global_id)?;
        // `()` serializes to a placeholder payload; the file's existence is
        // what matters, not its contents.
        let as_bytes = to_bytes(&()).expect("Serializing () should work.");
        files.push(
            Runtime::storage_backend()
                .unwrap()
                .write(&Self::checkpoint_file(base, pid), as_bytes)?,
        );
        Ok(())
    }
    /// Reads the marker file back, failing if it is missing; its contents
    /// are ignored.
    fn restore(&mut self, base: &StoragePath, pid: Option<&str>) -> Result<(), Error> {
        let pid = require_persistent_id(pid, &self.global_id)?;
        let path = Self::checkpoint_file(base, pid);
        let _content = Runtime::storage_backend().unwrap().read(&path)?;
        Ok(())
    }
    // A sink never delays the circuit's fixedpoint.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}
impl<T> SinkOperator<T> for Output<T>
where
    T: Debug + Clone + Send + 'static,
{
    /// Stores a clone of the incoming value in this worker's mailbox,
    /// replacing any previous value.
    async fn eval(&mut self, value: &T) {
        let owned = value.clone();
        self.mailbox.set(Some(owned));
    }

    /// Stores the incoming value in this worker's mailbox without cloning.
    async fn eval_owned(&mut self, value: T) {
        self.mailbox.set(Some(value));
    }

    /// Prefer owned inputs so the clone in `eval` can be skipped.
    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::PREFER_OWNED
    }
}
/// Sink operator backing `accumulate_output*`: publishes read-only snapshots
/// of an accumulated [`Spine`] into the current worker's mailbox.
pub struct AccumulateOutput<B>
where
    B: Batch,
{
    // Set by `init`; used to report errors during checkpoint/restore.
    global_id: GlobalNodeId,
    // This worker's slot in the shared output handle.
    mailbox: Mailbox<Option<SpineSnapshot<B>>>,
    // Sizes of batches pushed to the output; reported via `metadata`.
    output_batch_stats: BatchSizeStats,
}
impl<B> AccumulateOutput<B>
where
    B: Batch + Send,
{
    /// Creates the accumulating sink operator together with the (shared)
    /// handle that consumers read snapshots from.
    pub fn new() -> (Self, OutputHandle<SpineSnapshot<B>>) {
        let handle = OutputHandle::new();
        let operator = Self {
            global_id: GlobalNodeId::root(),
            mailbox: handle.mailbox(Runtime::worker_index()).clone(),
            output_batch_stats: BatchSizeStats::new(),
        };
        (operator, handle)
    }

    /// Path of this operator's (empty marker) checkpoint file.
    fn checkpoint_file(base: &StoragePath, persistent_id: &str) -> StoragePath {
        base.child(format!("accumulate-output-{}.dat", persistent_id))
    }
}
impl<B> Operator for AccumulateOutput<B>
where
    B: Batch + Send,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::from("AccumulateOutput")
    }
    fn init(&mut self, global_id: &GlobalNodeId) {
        self.global_id = global_id.clone();
    }
    /// Writes an empty marker file so that `restore` can verify the operator
    /// existed in the checkpoint; the operator itself carries no durable
    /// state (the accumulated spine is checkpointed upstream).
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        pid: Option<&str>,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error> {
        let pid = require_persistent_id(pid, &self.global_id)?;
        // `()` serializes to a placeholder payload; the file's existence is
        // what matters, not its contents.
        let as_bytes = to_bytes(&()).expect("Serializing () should work.");
        files.push(
            Runtime::storage_backend()
                .unwrap()
                .write(&Self::checkpoint_file(base, pid), as_bytes)?,
        );
        Ok(())
    }
    /// Reads the marker file back, failing if it is missing; its contents
    /// are ignored.
    fn restore(&mut self, base: &StoragePath, pid: Option<&str>) -> Result<(), Error> {
        let pid = require_persistent_id(pid, &self.global_id)?;
        let path = Self::checkpoint_file(base, pid);
        let _content = Runtime::storage_backend().unwrap().read(&path)?;
        Ok(())
    }
    /// Reports output batch-size statistics to the circuit profiler.
    fn metadata(&self, meta: &mut OperatorMeta) {
        meta.extend(metadata! {
            OUTPUT_BATCHES_STATS => self.output_batch_stats.metadata(),
        });
    }
    // A sink never delays the circuit's fixedpoint.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}
impl<B> SinkOperator<Option<Spine<B>>> for AccumulateOutput<B>
where
    B: Batch + Send,
{
    /// If this step produced a spine, records its size and publishes a
    /// read-only snapshot of it to this worker's mailbox; otherwise the
    /// mailbox is left untouched.
    async fn eval(&mut self, spine: &Option<Spine<B>>) {
        let Some(spine) = spine else { return };
        self.output_batch_stats.add_batch(spine.len());
        self.mailbox.set(Some(spine.ro_snapshot()));
    }

    /// Owned variant of [`Self::eval`]; taking a snapshot only needs a
    /// reference, so this simply delegates.
    async fn eval_owned(&mut self, spine: Option<Spine<B>>) {
        self.eval(&spine).await;
    }

    fn input_preference(&self) -> OwnershipPreference {
        OwnershipPreference::PREFER_OWNED
    }
}
/// Sink operator behind `output_guarded`: forwards values to the mailbox
/// only when the companion guard stream carries `true`.
struct OutputGuarded<T> {
    mailbox: Mailbox<Option<T>>,
}
impl<T> OutputGuarded<T>
where
    T: Clone + Send + 'static,
{
    /// Creates the guarded sink operator together with the (shared) handle
    /// that consumers read values from.
    fn new() -> (Self, OutputHandle<T>) {
        let handle = OutputHandle::new();
        let mailbox = handle.mailbox(Runtime::worker_index()).clone();
        (Self { mailbox }, handle)
    }
}
impl<T> Operator for OutputGuarded<T>
where
    T: 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::from("OutputGuarded")
    }
    // A sink never delays the circuit's fixedpoint.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}
impl<T> BinarySinkOperator<T, bool> for OutputGuarded<T>
where
    T: Clone + 'static,
{
    /// Writes `value` into this worker's mailbox only when `guard` is
    /// `true`; otherwise the step leaves the mailbox untouched.
    async fn eval<'a>(&mut self, value: Cow<'a, T>, guard: Cow<'a, bool>) {
        if !*guard {
            return;
        }
        self.mailbox.set(Some(value.into_owned()));
    }

    /// Prefer owning the value (avoids a clone when storing it); the guard
    /// is a plain `bool`, so ownership of it is irrelevant.
    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
        (
            OwnershipPreference::PREFER_OWNED,
            OwnershipPreference::INDIFFERENT,
        )
    }
}
#[cfg(test)]
mod test {
    use crate::{Runtime, typed_batch::OrdZSet, utils::Tup2};
    /// Round-trips two input z-sets through a 4-worker circuit with a plain
    /// `output()` sink and checks that `consolidate` returns exactly the
    /// input of the corresponding step.
    #[test]
    fn test_output_handle() {
        let (mut dbsp, (input, output)) = Runtime::init_circuit(4, |circuit| {
            let (zset, zset_handle) = circuit.add_input_zset::<u64>();
            let zset_output = zset.output();
            Ok((zset_handle, zset_output))
        })
        .unwrap();
        let inputs = vec![
            vec![Tup2(1, 1), Tup2(2, 1), Tup2(3, 1), Tup2(4, 1), Tup2(5, 1)],
            vec![
                Tup2(1, -1),
                Tup2(2, -1),
                Tup2(3, -1),
                Tup2(4, -1),
                Tup2(5, -1),
            ],
        ];
        for mut input_vec in inputs {
            // Build the expected z-set `{(k, ()) -> w}` from the raw tuples.
            let input_tuples = input_vec
                .iter()
                .map(|Tup2(k, w)| Tup2(Tup2(*k, ()), *w))
                .collect::<Vec<_>>();
            let expected_output = OrdZSet::from_tuples((), input_tuples);
            input.append(&mut input_vec);
            dbsp.transaction().unwrap();
            let output = output.consolidate();
            assert_eq!(output, expected_output);
        }
        dbsp.kill().unwrap();
    }
    /// Exercises `output_guarded`: with the guard stream set to `false` the
    /// output must stay empty; re-feeding the same input with the guard set
    /// to `true` must produce the expected z-set.
    #[test]
    fn test_guarded_output_handle() {
        let (mut dbsp, (input, guard, output)) = Runtime::init_circuit(4, |circuit| {
            let (zset, zset_handle) = circuit.add_input_zset::<u64>();
            let (guard, guard_handle) = circuit.add_input_stream::<bool>();
            let zset_output = zset.output_guarded(&guard);
            Ok((zset_handle, guard_handle, zset_output))
        })
        .unwrap();
        let inputs = vec![
            vec![Tup2(1, 1), Tup2(2, 1), Tup2(3, 1), Tup2(4, 1), Tup2(5, 1)],
            vec![
                Tup2(1, -1),
                Tup2(2, -1),
                Tup2(3, -1),
                Tup2(4, -1),
                Tup2(5, -1),
            ],
        ];
        for mut input_vec in inputs {
            let input_tuples = input_vec
                .iter()
                .map(|Tup2(k, w)| Tup2(Tup2(*k, ()), *w))
                .collect::<Vec<_>>();
            let expected_output = OrdZSet::from_tuples((), input_tuples);
            // Guard disabled: the sink must drop the step's output.
            input.append(&mut input_vec.clone());
            guard.set_for_all(false);
            dbsp.transaction().unwrap();
            let output1 = output.consolidate();
            assert_eq!(output1, OrdZSet::empty());
            // Guard enabled: the same input must now reach the output.
            input.append(&mut input_vec);
            guard.set_for_all(true);
            dbsp.transaction().unwrap();
            let output2 = output.consolidate();
            assert_eq!(output2, expected_output);
        }
        dbsp.kill().unwrap();
    }
}