sea_streamer_stdio/
streamer.rs

1use std::time::Duration;
2
3use crate::{
4    consumer, create_consumer, producer, StdioConsumer, StdioErr, StdioProducer, StdioResult,
5};
6use sea_streamer_types::{
7    ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
8    ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
9    StreamKey, Streamer as StreamerTrait, StreamerUri,
10};
11
/// Streamer handle for the stdio backend.
///
/// It only remembers whether loopback was requested at connect time; the flag is
/// forwarded to every producer created from this streamer.
#[derive(Clone, Debug, Default)]
pub struct StdioStreamer {
    /// Loopback setting copied from the connect options.
    loopback: bool,
}
16
/// Options accepted when connecting a stdio streamer.
#[derive(Clone, Debug, Default)]
pub struct StdioConnectOptions {
    /// When true, produced messages are fed back to consumers in the same process.
    loopback: bool,
}
21
22#[derive(Debug, Clone)]
23pub struct StdioConsumerOptions {
24    mode: ConsumerMode,
25    group: Option<ConsumerGroup>,
26}
27
/// Options used when creating a stdio producer. There is currently nothing to configure.
#[derive(Clone, Debug, Default)]
pub struct StdioProducerOptions {}
30
31impl StreamerTrait for StdioStreamer {
32    type Error = StdioErr;
33    type Producer = StdioProducer;
34    type Consumer = StdioConsumer;
35    type ConnectOptions = StdioConnectOptions;
36    type ConsumerOptions = StdioConsumerOptions;
37    type ProducerOptions = StdioProducerOptions;
38
39    /// Nothing will happen until you create a producer/consumer
40    async fn connect(_: StreamerUri, options: Self::ConnectOptions) -> StdioResult<Self> {
41        let StdioConnectOptions { loopback } = options;
42        Ok(StdioStreamer { loopback })
43    }
44
45    /// Call this method if you want to exit gracefully. This waits asynchronously until all pending messages
46    /// are sent.
47    ///
48    /// The side effects is global: all existing consumers and producers will become unusable, until you connect again.
49    async fn disconnect(self) -> StdioResult<()> {
50        // we can't reliably shutdown consumers
51        consumer::disconnect();
52        producer::shutdown();
53        while !producer::shutdown_already() {
54            sea_streamer_runtime::sleep(Duration::from_millis(1)).await;
55        }
56        Ok(())
57    }
58
59    async fn create_generic_producer(
60        &self,
61        _: Self::ProducerOptions,
62    ) -> StdioResult<Self::Producer> {
63        Ok(StdioProducer::new_with(self.loopback))
64    }
65
66    /// A background thread will be spawned to read stdin dedicatedly.
67    /// It is safe to spawn multiple consumers.
68    async fn create_consumer(
69        &self,
70        streams: &[StreamKey],
71        options: Self::ConsumerOptions,
72    ) -> StdioResult<Self::Consumer> {
73        match options.mode {
74            ConsumerMode::RealTime => {
75                if options.group.is_some() {
76                    log::warn!("Consumer group is set and thus will be load-balanced.");
77                }
78                Ok(create_consumer(options.group, streams.to_vec()))
79            }
80            ConsumerMode::Resumable => Err(StreamErr::Unsupported(
81                "stdio does not support Resumable".to_owned(),
82            )),
83            ConsumerMode::LoadBalanced => {
84                if options.group.is_some() {
85                    Ok(create_consumer(options.group, streams.to_vec()))
86                } else {
87                    Err(StreamErr::ConsumerGroupNotSet)
88                }
89            }
90        }
91    }
92}
93
94impl StdioConnectOptions {
95    pub fn loopback(&self) -> bool {
96        self.loopback
97    }
98
99    /// If set to true, messages produced will be feed back to consumers in the same process.
100    ///
101    /// Be careful, if your stream processor consume and produce the same stream key,
102    /// it will result in an infinite loop.
103    ///
104    /// This option is meant for testing only.
105    /// Enabling loopback might create overhead where the producer and consumer threads compete for the same Mutex.
106    pub fn set_loopback(&mut self, b: bool) {
107        self.loopback = b;
108    }
109}
110
111impl ConnectOptionsTrait for StdioConnectOptions {
112    type Error = StdioErr;
113
114    fn timeout(&self) -> StdioResult<Duration> {
115        Err(StreamErr::TimeoutNotSet)
116    }
117
118    /// This parameter is ignored because connection can never fail
119    fn set_timeout(&mut self, _: Duration) -> StdioResult<&mut Self> {
120        Ok(self)
121    }
122}
123
124impl ConsumerOptionsTrait for StdioConsumerOptions {
125    type Error = StdioErr;
126
127    fn new(mode: ConsumerMode) -> Self {
128        Self { mode, group: None }
129    }
130
131    fn mode(&self) -> StdioResult<&ConsumerMode> {
132        Ok(&self.mode)
133    }
134
135    fn consumer_group(&self) -> StdioResult<&ConsumerGroup> {
136        self.group.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
137    }
138
139    /// If multiple consumers share the same group, only one in the group will receive a message.
140    /// This is load-balanced in a round-robin fashion.
141    fn set_consumer_group(&mut self, group: ConsumerGroup) -> StdioResult<&mut Self> {
142        self.group = Some(group);
143        Ok(self)
144    }
145}
146
147impl Default for StdioConsumerOptions {
148    fn default() -> Self {
149        Self::new(ConsumerMode::RealTime)
150    }
151}
152
153impl ProducerOptionsTrait for StdioProducerOptions {}