sea_streamer_stdio/
consumer.rs1use flume::{
2 r#async::{RecvFut, RecvStream},
3 unbounded, Receiver, RecvError, Sender,
4};
5use std::sync::Mutex;
6
7use sea_streamer_types::{
8 export::futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt},
9 Consumer as ConsumerTrait, ConsumerGroup, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey,
10 Timestamp,
11};
12
13use crate::{
14 consumer_group::{Cid, Consumers},
15 parse_meta,
16 util::PanicGuard,
17 PartialHeader, StdioErr, StdioResult,
18};
19
// Process-wide shared state for the stdio consumer side, lazily initialised on first use.
20lazy_static::lazy_static! {
 // Registry of consumers. Locked by `create_consumer`, `disconnect`, `dispatch`
 // and by `StdioConsumer`'s `Drop` impl.
21 static ref CONSUMERS: Mutex<Consumers> = Mutex::new(Default::default());
 // `true` while the stdin reader thread spawned by `init` is considered running;
 // the thread resets it to `false` just before it exits.
22 static ref THREAD: Mutex<bool> = Mutex::new(false);
23}
24
/// A consumer that receives messages through an in-process channel.
///
/// Instances are built by [`StdioConsumer::new`], which also hands back the
/// matching `Sender`. On drop, the consumer removes itself from the global
/// `CONSUMERS` registry using its `id`.
25#[derive(Debug)]
26pub struct StdioConsumer {
 /// Identifier passed to `Consumers::remove` when this consumer is dropped.
27 id: Cid,
 /// Stream keys this consumer was created with; `assign` checks against them.
28 streams: Vec<StreamKey>,
 /// Receiving half of the unbounded channel created in `new`.
29 receiver: Receiver<SharedMessage>,
30}
31
/// Crate-internal alias for [`StdioConsumer`].
32pub(crate) type ConsumerMember = StdioConsumer;
33
/// Future returned by `StdioConsumer::next`: a flume receive future whose
/// `RecvError` is mapped into `StreamErr<StdioErr>`.
34pub type NextFuture<'a> = MapErr<RecvFut<'a, SharedMessage>, fn(RecvError) -> StreamErr<StdioErr>>;
35
/// Stream returned by `StdioConsumer::stream`: a flume receive stream whose
/// items are mapped from `SharedMessage` into `StdioResult<SharedMessage>`.
36pub type StdioMessageStream<'a> =
37 StreamMap<RecvStream<'a, SharedMessage>, fn(SharedMessage) -> StdioResult<SharedMessage>>;
38
/// Message type yielded by the stdio consumer; an alias for `SharedMessage`.
39pub type StdioMessage = SharedMessage;
40
41pub(crate) fn create_consumer(
42 group: Option<ConsumerGroup>,
43 streams: Vec<StreamKey>,
44) -> StdioConsumer {
45 init();
46 let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
47 consumers.add(group, streams)
48}
49
50pub(crate) fn init() {
51 let mut thread = THREAD.lock().expect("Failed to lock stdin thread");
52 if !*thread {
53 let builder = std::thread::Builder::new().name("sea-streamer-stdio-stdin".into());
54 builder
55 .spawn(move || {
56 log::debug!("[{pid}] stdin thread spawned", pid = std::process::id());
57 let _guard = PanicGuard;
58 loop {
59 let mut line = String::new();
60 match std::io::stdin().read_line(&mut line) {
62 Ok(0) => break, Ok(_) => {}
64 Err(e) => {
65 panic!("{e:?}");
66 }
67 }
68 if line.ends_with('\n') {
69 line.truncate(line.len() - 1);
70 }
71 let (meta, remaining) = parse_meta(&line)
72 .unwrap_or_else(|_| panic!("Failed to parse line: {line}"));
73 let offset = remaining.as_ptr() as usize - line.as_ptr() as usize;
74 dispatch(meta, line.into_bytes(), offset);
75 }
76 log::debug!("[{pid}] stdin thread exit", pid = std::process::id());
77 {
78 let mut thread = THREAD.lock().expect("Failed to lock stdin thread");
79 *thread = false;
80 }
81 })
82 .unwrap();
83 *thread = true;
84 }
85}
86
87pub(crate) fn disconnect() {
88 let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
89 consumers.disconnect()
90}
91
92pub(crate) fn dispatch(meta: PartialHeader, bytes: Vec<u8>, offset: usize) {
93 let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
94 consumers.dispatch(meta, bytes, offset)
95}
96
97impl StdioConsumer {
98 pub(crate) fn new(id: Cid, streams: Vec<StreamKey>) -> (Self, Sender<SharedMessage>) {
99 let (sender, receiver) = unbounded();
100 (
101 Self {
102 id,
103 streams,
104 receiver,
105 },
106 sender,
107 )
108 }
109}
110
111impl Drop for StdioConsumer {
112 fn drop(&mut self) {
113 let mut consumers = CONSUMERS.lock().expect("Failed to lock Consumers");
114 consumers.remove(self.id)
115 }
116}
117
118impl ConsumerTrait for StdioConsumer {
119 type Error = StdioErr;
120 type Message<'a> = SharedMessage;
121 type NextFuture<'a> = NextFuture<'a>;
123 type Stream<'a> = StdioMessageStream<'a>;
124
125 async fn seek(&mut self, _: Timestamp) -> StdioResult<()> {
126 Err(StreamErr::Unsupported("StdioConsumer::seek".to_owned()))
127 }
128
129 async fn rewind(&mut self, _: SeqPos) -> StdioResult<()> {
130 Err(StreamErr::Unsupported("StdioConsumer::rewind".to_owned()))
131 }
132
133 fn assign(&mut self, (s, _): (StreamKey, ShardId)) -> StdioResult<()> {
135 for stream in self.streams.iter() {
136 if &s == stream {
137 return Ok(());
138 }
139 }
140 Err(StreamErr::StreamKeyNotFound)
141 }
142
143 fn unassign(&mut self, _: (StreamKey, ShardId)) -> StdioResult<()> {
145 Err(StreamErr::StreamKeyNotFound)
146 }
147
148 fn next(&self) -> Self::NextFuture<'_> {
149 self.receiver
150 .recv_async()
151 .map_err(|e| StreamErr::Backend(StdioErr::RecvError(e)))
152 }
153
154 fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
155 self.receiver.stream().map(Result::Ok)
156 }
157}