use flume::{bounded, r#async::RecvFut, unbounded, Sender};
use std::{collections::HashMap, fmt::Debug, future::Future, sync::Mutex};
use sea_streamer_types::{
export::futures::FutureExt, Buffer, Message, MessageHeader, Producer as ProducerTrait, Receipt,
SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, StreamResult, Timestamp,
};
use crate::{PartialHeader, StdioErr, StdioResult, BROADCAST, TIMESTAMP_FORMAT};
lazy_static::lazy_static! {
static ref PRODUCERS: Mutex<Producers> = Default::default();
static ref THREAD: Mutex<Option<Sender<Signal>>> = Mutex::new(None);
}
#[derive(Debug, Default)]
struct Producers {
sequences: HashMap<StreamKey, SeqNo>,
}
enum Signal {
SendRequest {
message: SharedMessage,
receipt: Sender<Receipt>,
loopback: bool,
},
Shutdown,
}
#[derive(Debug, Clone)]
pub struct StdioProducer {
stream: Option<StreamKey>,
request: Sender<Signal>,
loopback: bool,
}
pub struct SendFuture {
fut: RecvFut<'static, Receipt>,
}
const ZERO: u64 = 0;
pub(crate) fn init() {
let mut thread = THREAD.lock().expect("Failed to lock stdout thread");
if thread.is_none() {
let (sender, receiver) = unbounded();
let builder = std::thread::Builder::new().name("sea-streamer-stdio-stdout".into());
builder
.spawn(move || {
log::debug!("[{pid}] stdout thread spawned", pid = std::process::id());
let mut producers = PRODUCERS
.try_lock()
.expect("Should have no other thread trying to access Producers");
while let Ok(signal) = receiver.recv() {
match signal {
Signal::SendRequest {
mut message,
receipt,
loopback,
} => {
message.touch();
if message.message().size() != 0 {
let stream_key = message.stream_key();
let seq = producers.append(&stream_key);
println!(
"[{timestamp} | {stream} | {seq}] {payload}",
timestamp = message
.timestamp()
.format(TIMESTAMP_FORMAT)
.expect("Timestamp format error"),
stream = stream_key,
seq = seq,
payload = message
.message()
.as_str()
.expect("Should have already checked is valid string"),
);
if loopback {
let payload = message.message();
super::consumer::dispatch(
PartialHeader {
timestamp: Some(message.timestamp()),
stream_key: Some(stream_key),
sequence: Some(seq),
shard_id: Some(message.shard_id()),
},
payload.into_bytes(),
0,
);
}
}
let meta = message.take_header();
receipt.send(meta).ok();
}
Signal::Shutdown => break,
}
}
log::debug!("[{pid}] stdout thread exit", pid = std::process::id());
{
let mut thread = THREAD.lock().expect("Failed to lock stdout thread");
thread.take(); }
})
.unwrap();
thread.replace(sender);
}
}
pub(crate) fn shutdown() {
let thread = THREAD.lock().expect("Failed to lock stdout thread");
if let Some(sender) = thread.as_ref() {
sender
.send(Signal::Shutdown)
.expect("stdout thread might have been shutdown already");
}
}
pub(crate) fn shutdown_already() -> bool {
let thread = THREAD.lock().expect("Failed to lock stdout thread");
thread.is_none()
}
impl Producers {
fn append(&mut self, stream: &StreamKey) -> SeqNo {
if let Some(val) = self.sequences.get_mut(stream) {
let seq = *val;
*val += 1;
seq
} else {
self.sequences.insert(stream.to_owned(), 1);
0
}
}
}
impl Future for SendFuture {
type Output = StreamResult<MessageHeader, StdioErr>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.fut.poll_unpin(cx) {
std::task::Poll::Ready(res) => std::task::Poll::Ready(match res {
Ok(res) => Ok(res),
Err(err) => Err(StreamErr::Backend(StdioErr::RecvError(err))),
}),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
impl Debug for SendFuture {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendFuture").finish()
}
}
impl ProducerTrait for StdioProducer {
type Error = StdioErr;
type SendFuture = SendFuture;
fn send_to<S: Buffer>(&self, stream: &StreamKey, payload: S) -> StdioResult<Self::SendFuture> {
let payload = payload.as_str().map_err(StreamErr::Utf8Error)?.to_owned();
let (sender, receiver) = bounded(1);
let size = payload.len();
self.request
.send(Signal::SendRequest {
message: SharedMessage::new(
MessageHeader::new(
stream.to_owned(),
ShardId::new(ZERO),
ZERO,
Timestamp::now_utc(),
),
payload.into_bytes(),
0,
size,
),
receipt: sender,
loopback: self.loopback,
})
.map_err(|_| StreamErr::Backend(StdioErr::Disconnected))?;
Ok(SendFuture {
fut: receiver.into_recv_async(),
})
}
#[inline]
async fn end(mut self) -> StdioResult<()> {
self.flush().await
}
#[inline]
async fn flush(&mut self) -> StdioResult<()> {
self.send_to(&StreamKey::new(BROADCAST)?, "")?.await?;
Ok(())
}
fn anchor(&mut self, stream: StreamKey) -> StdioResult<()> {
if self.stream.is_none() {
self.stream = Some(stream);
Ok(())
} else {
Err(StreamErr::AlreadyAnchored)
}
}
fn anchored(&self) -> StdioResult<&StreamKey> {
if let Some(stream) = &self.stream {
Ok(stream)
} else {
Err(StreamErr::NotAnchored)
}
}
}
impl StdioProducer {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self::new_with(false)
}
pub(crate) fn new_with(loopback: bool) -> Self {
init();
let request = {
let thread = THREAD.lock().expect("Failed to lock stdout thread");
thread.as_ref().expect("Should have initialized").to_owned()
};
Self {
stream: None,
request,
loopback,
}
}
}