use std::{ops::DerefMut, sync::Mutex};
use once_cell::sync::Lazy;
use serde::Deserialize;
use crate::{
dataflow::{
stream::{ExtractStream, IngestStream, LoopStream, OperatorStream, Stream, StreamId},
Data,
},
OperatorConfig,
};
use super::{AbstractGraph, OperatorRunner};
/// The default [`AbstractGraph`] shared by every free function in this module.
///
/// The graph is created with `AbstractGraph::new()` the first time the static
/// is dereferenced, and all access goes through the surrounding [`Mutex`].
static DEFAULT_GRAPH: Lazy<Mutex<AbstractGraph>> = Lazy::new(|| {
    let empty_graph = AbstractGraph::new();
    Mutex::new(empty_graph)
});
pub(crate) fn add_operator<F, T, U, V, W>(
config: OperatorConfig,
runner: F,
left_read_stream: Option<&dyn Stream<T>>,
right_read_stream: Option<&dyn Stream<U>>,
left_write_stream: Option<&OperatorStream<V>>,
right_write_stream: Option<&OperatorStream<W>>,
) where
F: OperatorRunner,
for<'a> T: Data + Deserialize<'a>,
for<'a> U: Data + Deserialize<'a>,
for<'a> V: Data + Deserialize<'a>,
for<'a> W: Data + Deserialize<'a>,
{
DEFAULT_GRAPH.lock().unwrap().add_operator(
config,
runner,
left_read_stream,
right_read_stream,
left_write_stream,
right_write_stream,
);
}
pub(crate) fn add_ingest_stream<D>(ingest_stream: &IngestStream<D>)
where
for<'a> D: Data + Deserialize<'a>,
{
DEFAULT_GRAPH
.lock()
.unwrap()
.add_ingest_stream(ingest_stream);
}
pub(crate) fn add_extract_stream<D>(extract_stream: &ExtractStream<D>)
where
for<'a> D: Data + Deserialize<'a>,
{
DEFAULT_GRAPH
.lock()
.unwrap()
.add_extract_stream(extract_stream);
}
pub(crate) fn add_loop_stream<D>(loop_stream: &LoopStream<D>)
where
for<'a> D: Data + Deserialize<'a>,
{
DEFAULT_GRAPH.lock().unwrap().add_loop_stream(loop_stream);
}
pub(crate) fn connect_loop<D>(loop_stream: &LoopStream<D>, stream: &OperatorStream<D>)
where
for<'a> D: Data + Deserialize<'a>,
{
DEFAULT_GRAPH
.lock()
.unwrap()
.connect_loop(loop_stream, stream);
}
pub(crate) fn set_stream_name(stream_id: &StreamId, name: &str) {
DEFAULT_GRAPH
.lock()
.unwrap()
.set_stream_name(stream_id, name.to_string());
}
pub(crate) fn get_stream_name(stream_id: &StreamId) -> String {
DEFAULT_GRAPH.lock().unwrap().get_stream_name(stream_id)
}
pub(crate) fn resolve_stream_id(stream_id: &StreamId) -> Option<StreamId> {
DEFAULT_GRAPH.lock().unwrap().resolve_stream_id(stream_id)
}
/// Returns a clone of the default graph, taken while holding its lock.
///
/// # Panics
///
/// Panics if the mutex guarding the default graph is poisoned.
pub(crate) fn clone() -> AbstractGraph {
    let guard = DEFAULT_GRAPH.lock().unwrap();
    guard.clone()
}
/// Installs `graph` as the default graph and returns the graph it replaced.
///
/// The swap happens under the default graph's lock.
///
/// # Panics
///
/// Panics if the mutex guarding the default graph is poisoned.
pub(crate) fn set(graph: AbstractGraph) -> AbstractGraph {
    let mut guard = DEFAULT_GRAPH.lock().unwrap();
    let slot: &mut AbstractGraph = guard.deref_mut();
    std::mem::replace(slot, graph)
}