Skip to main content

sea_streamer_stdio/
consumer.rs

1use flume::{
2    r#async::{RecvFut, RecvStream},
3    unbounded, Receiver, RecvError, Sender,
4};
5use std::sync::Mutex;
6
7use sea_streamer_types::{
8    export::futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt},
9    Consumer as ConsumerTrait, ConsumerGroup, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey,
10    Timestamp,
11};
12
13use crate::{
14    consumer_group::{Cid, Consumers},
15    parse_meta,
16    util::PanicGuard,
17    PartialHeader, StdioErr, StdioResult,
18};
19
// Process-wide state shared by every stdio consumer in this module.
// Both values sit behind a `Mutex` and are locked by the free functions below.
20lazy_static::lazy_static! {
    // Registry locked by `create_consumer`, `disconnect`, `dispatch` and by
    // `StdioConsumer`'s `Drop` impl to add, disconnect, feed and remove consumers.
21    static ref CONSUMERS: Mutex<Consumers> = Mutex::new(Default::default());
    // Set to `true` by `init` after it spawns the stdin reader thread, and reset to
    // `false` by that thread once its read loop ends normally.
22    static ref THREAD: Mutex<bool> = Mutex::new(false);
23}
24
/// Consumer handle whose messages arrive through an in-process channel.
///
/// Dropping the handle removes its `id` from the shared `CONSUMERS` registry.
25#[derive(Debug)]
26pub struct StdioConsumer {
    /// Identifier handed back to the registry when this consumer is dropped.
27    id: Cid,
    /// Stream keys this consumer was created with; `assign` checks against them.
28    streams: Vec<StreamKey>,
    /// Receiving half of the unbounded channel created in `StdioConsumer::new`.
29    receiver: Receiver<SharedMessage>,
30}
31
/// Crate-internal alias for `StdioConsumer`.
32pub(crate) type ConsumerMember = StdioConsumer;
33
/// Future returned by `StdioConsumer::next`: a flume receive future whose
/// `RecvError` is mapped into `StreamErr<StdioErr>` through a plain `fn` pointer.
34pub type NextFuture<'a> = MapErr<RecvFut<'a, SharedMessage>, fn(RecvError) -> StreamErr<StdioErr>>;
35
/// Stream returned by `StdioConsumer::stream`: a flume receive stream whose items
/// are mapped into `StdioResult<SharedMessage>` through a plain `fn` pointer.
36pub type StdioMessageStream<'a> =
37    StreamMap<RecvStream<'a, SharedMessage>, fn(SharedMessage) -> StdioResult<SharedMessage>>;
38
/// Message type yielded by this backend; an alias for `SharedMessage`.
39pub type StdioMessage = SharedMessage;
40
41pub(crate) fn create_consumer(
42    group: Option<ConsumerGroup>,
43    streams: Vec<StreamKey>,
44) -> StdioConsumer {
45    init();
46    let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
47    consumers.add(group, streams)
48}
49
50pub(crate) fn init() {
51    let mut thread = THREAD.lock().expect("Failed to lock stdin thread");
52    if !*thread {
53        let builder = std::thread::Builder::new().name("sea-streamer-stdio-stdin".into());
54        builder
55            .spawn(move || {
56                log::debug!("[{pid}] stdin thread spawned", pid = std::process::id());
57                let _guard = PanicGuard;
58                loop {
59                    let mut line = String::new();
60                    // this has the potential to block forever
61                    match std::io::stdin().read_line(&mut line) {
62                        Ok(0) => break, // this means stdin is closed
63                        Ok(_) => {}
64                        Err(e) => {
65                            panic!("{e:?}");
66                        }
67                    }
68                    if line.ends_with('\n') {
69                        line.truncate(line.len() - 1);
70                    }
71                    let (meta, remaining) = parse_meta(&line)
72                        .unwrap_or_else(|_| panic!("Failed to parse line: {line}"));
73                    let offset = remaining.as_ptr() as usize - line.as_ptr() as usize;
74                    dispatch(meta, line.into_bytes(), offset);
75                }
76                log::debug!("[{pid}] stdin thread exit", pid = std::process::id());
77                {
78                    let mut thread = THREAD.lock().expect("Failed to lock stdin thread");
79                    *thread = false;
80                }
81            })
82            .unwrap();
83        *thread = true;
84    }
85}
86
87pub(crate) fn disconnect() {
88    let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
89    consumers.disconnect()
90}
91
92pub(crate) fn dispatch(meta: PartialHeader, bytes: Vec<u8>, offset: usize) {
93    let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
94    consumers.dispatch(meta, bytes, offset)
95}
96
97impl StdioConsumer {
98    pub(crate) fn new(id: Cid, streams: Vec<StreamKey>) -> (Self, Sender<SharedMessage>) {
99        let (sender, receiver) = unbounded();
100        (
101            Self {
102                id,
103                streams,
104                receiver,
105            },
106            sender,
107        )
108    }
109}
110
111impl Drop for StdioConsumer {
112    fn drop(&mut self) {
113        let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
114        consumers.remove(self.id)
115    }
116}
117
118impl ConsumerTrait for StdioConsumer {
119    type Error = StdioErr;
120    type Message<'a> = SharedMessage;
121    // See we don't actually have to Box these! Looking forward to `type_alias_impl_trait`
122    type NextFuture<'a> = NextFuture<'a>;
123    type Stream<'a> = StdioMessageStream<'a>;
124
125    async fn seek(&mut self, _: Timestamp) -> StdioResult<()> {
126        Err(StreamErr::Unsupported("StdioConsumer::seek".to_owned()))
127    }
128
129    async fn rewind(&mut self, _: SeqPos) -> StdioResult<()> {
130        Err(StreamErr::Unsupported("StdioConsumer::rewind".to_owned()))
131    }
132
133    /// Always succeed if the stream exists. There is only shard ZERO anyway.
134    fn assign(&mut self, (s, _): (StreamKey, ShardId)) -> StdioResult<()> {
135        for stream in self.streams.iter() {
136            if &s == stream {
137                return Ok(());
138            }
139        }
140        Err(StreamErr::StreamKeyNotFound)
141    }
142
143    /// Always fail. There is only shard ZERO anyway.
144    fn unassign(&mut self, _: (StreamKey, ShardId)) -> StdioResult<()> {
145        Err(StreamErr::StreamKeyNotFound)
146    }
147
148    fn next(&self) -> Self::NextFuture<'_> {
149        self.receiver
150            .recv_async()
151            .map_err(|e| StreamErr::Backend(StdioErr::RecvError(e)))
152    }
153
154    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
155        self.receiver.stream().map(Result::Ok)
156    }
157}