dbsp 0.287.0

Continuous streaming analytics engine
Documentation
//! Operator that consolidates a trace into a single batch.

use std::{borrow::Cow, marker::PhantomData};

use crate::{
    circuit::{
        Circuit, OwnershipPreference, Scope, Stream,
        circuit_builder::StreamId,
        operator_traits::{Operator, UnaryOperator},
    },
    circuit_cache_key,
    trace::{Batch, BatchReader, Trace},
};

circuit_cache_key!(ConsolidateId<C, D>(StreamId => Stream<C, D>));

impl<C, T> Stream<C, T>
where
    C: Circuit,
    T: Trace<Time = ()> + Clone,
{
    /// See [`Stream::consolidate`].
    pub fn dyn_consolidate(
        &self,
        factories: &<T::Batch as BatchReader>::Factories,
    ) -> Stream<C, T::Batch> {
        self.circuit()
            .cache_get_or_insert_with(ConsolidateId::new(self.stream_id()), || {
                let consolidated = self.circuit().add_unary_operator_with_preference(
                    Consolidate::new(factories),
                    &self.try_sharded_version(),
                    OwnershipPreference::STRONGLY_PREFER_OWNED,
                );
                consolidated.mark_sharded_if(self);

                consolidated
            })
            .clone()
    }
}

pub struct Consolidate<T: Trace> {
    factories: <T::Batch as BatchReader>::Factories,
    _type: PhantomData<T>,
}

impl<T: Trace> Consolidate<T> {
    pub fn new(factories: &<T::Batch as BatchReader>::Factories) -> Self {
        Self {
            factories: factories.clone(),
            _type: PhantomData,
        }
    }
}

impl<T> Operator for Consolidate<T>
where
    T: Trace + 'static,
{
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("Consolidate")
    }

    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}

impl<T> UnaryOperator<T, T::Batch> for Consolidate<T>
where
    T: Trace<Time = ()>,
{
    async fn eval(&mut self, _i: &T) -> T::Batch {
        unimplemented!()
    }

    async fn eval_owned(&mut self, i: T) -> T::Batch {
        i.consolidate()
            .unwrap_or_else(|| T::Batch::dyn_empty(&self.factories))
    }
}