use std::{
sync::{Arc, Mutex},
thread,
time::Duration,
};
use serde::Deserialize;
use crate::{
dataflow::{
graph::{default_graph, StreamSetupHook},
Data, Message,
},
scheduler::channel_manager::ChannelManager,
};
use super::{errors::SendError, Stream, StreamId, WriteStream, WriteStreamT};
/// A stream handle that lets the caller push `Message<D>` values into the dataflow.
///
/// The handle starts out without an underlying `WriteStream`; the closure returned by
/// `get_setup_hook` installs one once send endpoints are available. Until then
/// `is_closed` reports `true` and `send` returns `SendError::Closed`.
pub struct IngestStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
// Identifier of this stream; also used to look up its send endpoints and its name.
id: StreamId,
// Slot for the underlying write stream. It is `None` after `new()` and is filled in by
// the setup hook, which holds a clone of this `Arc`.
write_stream_option: Arc<Mutex<Option<WriteStream<D>>>>,
}
impl<D> IngestStream<D>
where
    for<'a> D: Data + Deserialize<'a>,
{
    /// Creates a new `IngestStream` and registers it with the default graph.
    ///
    /// The stream receives a deterministic `StreamId` and is named
    /// `ingest_stream_<id>`. No write stream is attached at this point; see
    /// `get_setup_hook`.
    pub fn new() -> Self {
        tracing::debug!("Initializing an IngestStream");

        let stream_id = StreamId::new_deterministic();
        let stream = Self {
            id: stream_id,
            write_stream_option: Arc::new(Mutex::new(None)),
        };

        // Register first, then name the stream.
        default_graph::add_ingest_stream(&stream);
        let stream_name = format!("ingest_stream_{}", stream_id);
        default_graph::set_stream_name(&stream_id, &stream_name);

        stream
    }

    /// Reports whether messages can currently NOT be sent on this stream.
    ///
    /// Returns `true` when no write stream has been installed yet, or when the
    /// installed write stream itself reports being closed.
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting the write stream is poisoned.
    pub fn is_closed(&self) -> bool {
        let slot = self.write_stream_option.lock().unwrap();
        match slot.as_ref() {
            Some(write_stream) => WriteStream::is_closed(write_stream),
            None => true,
        }
    }

    /// Sends `msg` on the underlying write stream.
    ///
    /// # Errors
    ///
    /// Returns `SendError::Closed` (after logging a warning) when `is_closed`
    /// reports `true`. Otherwise returns whatever the underlying write stream's
    /// `send` returns.
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting the write stream is poisoned.
    pub fn send(&mut self, msg: Message<D>) -> Result<(), SendError> {
        if self.is_closed() {
            tracing::warn!(
                "Trying to send messages on a closed IngestStream {} (ID: {})",
                default_graph::get_stream_name(&self.id()),
                self.id(),
            );
            return Err(SendError::Closed);
        }

        // NOTE(review): `is_closed` returns `true` while no write stream is installed,
        // so by the time we get here a write stream has already been observed. The
        // sleep-and-retry path below only runs if the slot is found empty afterwards.
        // Confirm whether waiting for setup was the intended behaviour of `send`.
        loop {
            // The lock guard is a temporary of the `if let` and is released before
            // the thread goes to sleep.
            if let Some(write_stream) = self.write_stream_option.lock().unwrap().as_mut() {
                return write_stream.send(msg);
            }
            thread::sleep(Duration::from_millis(100));
        }
    }

    /// Builds the hook that attaches a write stream to this `IngestStream`.
    ///
    /// When invoked with the channel manager, the returned closure requests the
    /// send endpoints for this stream's ID, wraps them in a `WriteStream`, and
    /// stores it in the slot shared with this handle.
    ///
    /// # Panics
    ///
    /// The returned closure panics if the channel manager cannot provide the send
    /// endpoints, or if either mutex it locks is poisoned.
    pub(crate) fn get_setup_hook(&self) -> impl StreamSetupHook {
        let stream_id = self.id();
        // The closure may outlive `self`, so it keeps its own handle to the slot.
        let shared_slot = Arc::clone(&self.write_stream_option);

        move |channel_manager: Arc<Mutex<ChannelManager>>| match channel_manager
            .lock()
            .unwrap()
            .get_send_endpoints(stream_id)
        {
            Err(reason) => panic!("Unable to set up IngestStream {}: {}", stream_id, reason),
            Ok(endpoints) => {
                let write_stream = WriteStream::from_endpoints(endpoints, stream_id);
                shared_slot.lock().unwrap().replace(write_stream);
            }
        }
    }
}
impl<D> Default for IngestStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
fn default() -> Self {
Self::new()
}
}
impl<D> Stream<D> for IngestStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
/// Returns this stream's `StreamId` (a copy of the `id` field).
fn id(&self) -> StreamId {
self.id
}
}
impl<D> WriteStreamT<D> for IngestStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
fn send(&mut self, msg: Message<D>) -> Result<(), SendError> {
self.send(msg)
}
}