use std::{collections::HashMap, iter::Iterator, marker::PhantomData, sync::Arc};
use serde::Deserialize;
use crate::dataflow::{
deadlines::{DeadlineId, DeadlineT},
operator::OperatorConfig,
stream::StreamId,
AppendableState, Data, State, Timestamp, WriteStream,
};
/// Holds a set of deadlines keyed by their `DeadlineId`, together with the read and write
/// stream IDs supplied at construction time.
///
/// `S` is the state type the stored deadlines are parameterized over.
pub struct SetupContext<S> {
// Deadlines added through `add_deadline`, indexed by the ID each deadline reports.
deadlines: HashMap<DeadlineId, Arc<dyn DeadlineT<S>>>,
// Read stream IDs exactly as passed to `new`.
read_stream_ids: Vec<StreamId>,
// Write stream IDs exactly as passed to `new`.
write_stream_ids: Vec<StreamId>,
}
// NOTE(review): the reason for silencing `dead_code` is not visible in this file; presumably
// some crate-internal accessors below are not used in every build configuration — confirm.
#[allow(dead_code)]
impl<S> SetupContext<S> {
    /// Creates a context that owns the given read and write stream IDs and has no deadlines.
    pub fn new(read_stream_ids: Vec<StreamId>, write_stream_ids: Vec<StreamId>) -> Self {
        Self {
            deadlines: HashMap::new(),
            read_stream_ids,
            write_stream_ids,
        }
    }

    /// Stores `deadline` under the ID returned by its `id()` method.
    ///
    /// A deadline already stored under the same ID is replaced.
    pub fn add_deadline(&mut self, deadline: impl DeadlineT<S> + 'static) {
        // Read the ID before the deadline is moved behind the `Arc`.
        let id = deadline.id();
        let shared: Arc<dyn DeadlineT<S>> = Arc::new(deadline);
        self.deadlines.insert(id, shared);
    }

    /// Iterates mutably over every stored deadline, in the map's (unspecified) order.
    pub(crate) fn deadlines(&mut self) -> impl Iterator<Item = &mut Arc<dyn DeadlineT<S>>> {
        self.deadlines.values_mut()
    }

    /// Returns the read stream IDs given to [`SetupContext::new`].
    pub(crate) fn get_read_stream_ids(&self) -> &Vec<StreamId> {
        &self.read_stream_ids
    }

    /// Returns the write stream IDs given to [`SetupContext::new`].
    pub(crate) fn write_stream_ids(&self) -> &Vec<StreamId> {
        &self.write_stream_ids
    }

    /// Invokes the handler of the deadline stored under `deadline_id`, forwarding `state` and
    /// `timestamp` to it.
    ///
    /// # Panics
    ///
    /// Panics if no deadline is stored under `deadline_id`. Deadlines only enter the map via
    /// [`SetupContext::add_deadline`], so an unknown ID is treated as a caller bug.
    pub(crate) fn invoke_handler(&self, deadline_id: DeadlineId, state: &S, timestamp: &Timestamp) {
        self.deadlines
            .get(&deadline_id)
            .expect("invoke_handler: no deadline was added under the given DeadlineId")
            .invoke_handler(state, timestamp);
    }
}
/// Bundles a timestamp, an operator configuration and a shared reference to a state of type
/// `S`.
///
/// `T` appears only in the `AppendableState<T>` bound on `S`; no value of type `T` is stored.
pub struct ParallelSinkContext<'a, S: AppendableState<T>, T> {
// Timestamp supplied to `new`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Shared (read-only) borrow of the state.
state: &'a S,
// Zero-sized marker so that the otherwise-unused parameter `T` is accepted by the compiler.
phantomdata_t: PhantomData<T>,
}
impl<'a, S, T> ParallelSinkContext<'a, S, T>
where
S: 'static + AppendableState<T>,
{
pub fn new(timestamp: Timestamp, config: OperatorConfig, state: &'a S) -> Self {
Self {
timestamp,
config,
state,
phantomdata_t: PhantomData,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn state(&self) -> &S {
self.state
}
}
/// Bundles a timestamp, an operator configuration and an exclusive reference to a state of
/// type `S`.
pub struct SinkContext<'a, S: State> {
// Timestamp supplied to `new`; also the key used by `current_state`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Exclusive borrow of the state.
state: &'a mut S,
}
impl<'a, S> SinkContext<'a, S>
where
S: State,
{
pub fn new(timestamp: Timestamp, config: OperatorConfig, state: &'a mut S) -> Self {
Self {
timestamp,
config,
state,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn current_state(&mut self) -> Option<&mut S::Item> {
self.state.at(&self.timestamp)
}
pub fn past_state(&mut self, time: &Timestamp) -> Option<&S::Item> {
if *time <= self.state.last_committed_timestamp() {
match self.state.at(time) {
Some(state_val) => Some(state_val),
None => None,
}
} else {
None
}
}
pub fn last_committed_timestamp(&self) -> Timestamp {
self.state.last_committed_timestamp()
}
}
/// Bundles a timestamp, an operator configuration, a shared reference to a state of type `S`
/// and one write stream carrying `T`.
///
/// `U` appears only in the `AppendableState<U>` bound on `S`; no value of type `U` is stored.
pub struct ParallelOneInOneOutContext<'a, S, T, U>
where
S: AppendableState<U>,
T: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Shared (read-only) borrow of the state.
state: &'a S,
// Write stream owned by this context.
write_stream: WriteStream<T>,
// Zero-sized marker so that the otherwise-unused parameter `U` is accepted by the compiler.
phantom_u: PhantomData<U>,
}
impl<'a, S, T, U> ParallelOneInOneOutContext<'a, S, T, U>
where
S: AppendableState<U>,
T: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a S,
write_stream: WriteStream<T>,
) -> Self {
Self {
timestamp,
config,
state,
write_stream,
phantom_u: PhantomData,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn state(&self) -> &S {
self.state
}
pub fn write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.write_stream
}
}
/// Bundles a timestamp, an operator configuration, an exclusive reference to a state of type
/// `S` and one write stream carrying `T`.
pub struct OneInOneOutContext<'a, S, T>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`; also the key used by `current_state`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Exclusive borrow of the state.
state: &'a mut S,
// Write stream owned by this context.
write_stream: WriteStream<T>,
}
impl<'a, S, T> OneInOneOutContext<'a, S, T>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a mut S,
write_stream: WriteStream<T>,
) -> Self {
Self {
timestamp,
config,
state,
write_stream,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn current_state(&mut self) -> Option<&mut S::Item> {
self.state.at(&self.timestamp)
}
pub fn past_state(&mut self, time: &Timestamp) -> Option<&S::Item> {
if *time <= self.state.last_committed_timestamp() {
match self.state.at(time) {
Some(state_val) => Some(state_val),
None => None,
}
} else {
None
}
}
pub fn last_committed_timestamp(&self) -> Timestamp {
self.state.last_committed_timestamp()
}
pub fn write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.write_stream
}
}
/// Bundles a timestamp, an operator configuration, a shared reference to a state of type `S`
/// and one write stream carrying `T`.
///
/// `U` appears only in the `AppendableState<U>` bound on `S`; no value of type `U` is stored.
pub struct ParallelTwoInOneOutContext<'a, S, T, U>
where
S: AppendableState<U>,
T: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Shared (read-only) borrow of the state.
state: &'a S,
// Write stream owned by this context.
write_stream: WriteStream<T>,
// Zero-sized marker so that the otherwise-unused parameter `U` is accepted by the compiler.
phantom_u: PhantomData<U>,
}
impl<'a, S, T, U> ParallelTwoInOneOutContext<'a, S, T, U>
where
S: AppendableState<U>,
T: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a S,
write_stream: WriteStream<T>,
) -> Self {
Self {
timestamp,
config,
state,
write_stream,
phantom_u: PhantomData,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn state(&self) -> &S {
self.state
}
pub fn write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.write_stream
}
}
/// Bundles a timestamp, an operator configuration, an exclusive reference to a state of type
/// `S` and one write stream carrying `T`.
pub struct TwoInOneOutContext<'a, S, T>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`; also the key used by `current_state`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Exclusive borrow of the state.
state: &'a mut S,
// Write stream owned by this context.
write_stream: WriteStream<T>,
}
impl<'a, S, T> TwoInOneOutContext<'a, S, T>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a mut S,
write_stream: WriteStream<T>,
) -> Self {
Self {
timestamp,
config,
state,
write_stream,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn current_state(&mut self) -> Option<&mut S::Item> {
self.state.at(&self.timestamp)
}
pub fn past_state(&mut self, time: &Timestamp) -> Option<&S::Item> {
if *time <= self.state.last_committed_timestamp() {
match self.state.at(time) {
Some(state_val) => Some(state_val),
None => None,
}
} else {
None
}
}
pub(crate) fn state_mut(&mut self) -> &mut S {
self.state
}
pub fn last_committed_timestamp(&self) -> Timestamp {
self.state.last_committed_timestamp()
}
pub fn write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.write_stream
}
}
/// Bundles a timestamp, an operator configuration, a shared reference to a state of type `S`
/// and two write streams: a "left" one carrying `T` and a "right" one carrying `U`.
///
/// `V` appears only in the `AppendableState<V>` bound on `S`; no value of type `V` is stored.
pub struct ParallelOneInTwoOutContext<'a, S, T, U, V>
where
S: AppendableState<V>,
T: Data + for<'b> Deserialize<'b>,
U: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Shared (read-only) borrow of the state.
state: &'a S,
// Write stream for `T` values.
left_write_stream: WriteStream<T>,
// Write stream for `U` values.
right_write_stream: WriteStream<U>,
// Zero-sized marker so that the otherwise-unused parameter `V` is accepted by the compiler.
phantom_v: PhantomData<V>,
}
impl<'a, S, T, U, V> ParallelOneInTwoOutContext<'a, S, T, U, V>
where
S: AppendableState<V>,
T: Data + for<'b> Deserialize<'b>,
U: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a S,
left_write_stream: WriteStream<T>,
right_write_stream: WriteStream<U>,
) -> Self {
Self {
timestamp,
config,
state,
left_write_stream,
right_write_stream,
phantom_v: PhantomData,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn state(&self) -> &S {
self.state
}
pub fn left_write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.left_write_stream
}
pub fn right_write_stream(&mut self) -> &mut WriteStream<U> {
&mut self.right_write_stream
}
}
/// Bundles a timestamp, an operator configuration, an exclusive reference to a state of type
/// `S` and two write streams: a "left" one carrying `T` and a "right" one carrying `U`.
pub struct OneInTwoOutContext<'a, S, T, U>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
U: Data + for<'b> Deserialize<'b>,
{
// Timestamp supplied to `new`; also the key used by `current_state`.
timestamp: Timestamp,
// Operator configuration supplied to `new`.
config: OperatorConfig,
// Exclusive borrow of the state.
state: &'a mut S,
// Write stream for `T` values.
left_write_stream: WriteStream<T>,
// Write stream for `U` values.
right_write_stream: WriteStream<U>,
}
impl<'a, S, T, U> OneInTwoOutContext<'a, S, T, U>
where
S: State,
T: Data + for<'b> Deserialize<'b>,
U: Data + for<'b> Deserialize<'b>,
{
pub fn new(
timestamp: Timestamp,
config: OperatorConfig,
state: &'a mut S,
left_write_stream: WriteStream<T>,
right_write_stream: WriteStream<U>,
) -> Self {
Self {
timestamp,
config,
state,
left_write_stream,
right_write_stream,
}
}
pub fn timestamp(&self) -> &Timestamp {
&self.timestamp
}
pub fn operator_config(&self) -> &OperatorConfig {
&self.config
}
pub fn current_state(&mut self) -> Option<&mut S::Item> {
self.state.at(&self.timestamp)
}
pub fn past_state(&mut self, time: &Timestamp) -> Option<&S::Item> {
if *time <= self.state.last_committed_timestamp() {
match self.state.at(time) {
Some(state_val) => Some(state_val),
None => None,
}
} else {
None
}
}
pub fn last_committed_timestamp(&self) -> Timestamp {
self.state.last_committed_timestamp()
}
pub fn left_write_stream(&mut self) -> &mut WriteStream<T> {
&mut self.left_write_stream
}
pub fn right_write_stream(&mut self) -> &mut WriteStream<U> {
&mut self.right_write_stream
}
}