sea_streamer_stdio/
streamer.rs1use std::time::Duration;
2
3use crate::{
4 consumer, create_consumer, producer, StdioConsumer, StdioErr, StdioProducer, StdioResult,
5};
6use sea_streamer_types::{
7 ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
8 ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
9 StreamKey, Streamer as StreamerTrait, StreamerUri,
10};
11
12#[derive(Debug, Default, Clone)]
13pub struct StdioStreamer {
14 loopback: bool,
15}
16
17#[derive(Debug, Default, Clone)]
18pub struct StdioConnectOptions {
19 loopback: bool,
20}
21
22#[derive(Debug, Clone)]
23pub struct StdioConsumerOptions {
24 mode: ConsumerMode,
25 group: Option<ConsumerGroup>,
26}
27
28#[derive(Debug, Default, Clone)]
29pub struct StdioProducerOptions {}
30
31impl StreamerTrait for StdioStreamer {
32 type Error = StdioErr;
33 type Producer = StdioProducer;
34 type Consumer = StdioConsumer;
35 type ConnectOptions = StdioConnectOptions;
36 type ConsumerOptions = StdioConsumerOptions;
37 type ProducerOptions = StdioProducerOptions;
38
39 async fn connect(_: StreamerUri, options: Self::ConnectOptions) -> StdioResult<Self> {
41 let StdioConnectOptions { loopback } = options;
42 Ok(StdioStreamer { loopback })
43 }
44
45 async fn disconnect(self) -> StdioResult<()> {
50 consumer::disconnect();
52 producer::shutdown();
53 while !producer::shutdown_already() {
54 sea_streamer_runtime::sleep(Duration::from_millis(1)).await;
55 }
56 Ok(())
57 }
58
59 async fn create_generic_producer(
60 &self,
61 _: Self::ProducerOptions,
62 ) -> StdioResult<Self::Producer> {
63 Ok(StdioProducer::new_with(self.loopback))
64 }
65
66 async fn create_consumer(
69 &self,
70 streams: &[StreamKey],
71 options: Self::ConsumerOptions,
72 ) -> StdioResult<Self::Consumer> {
73 match options.mode {
74 ConsumerMode::RealTime => {
75 if options.group.is_some() {
76 log::warn!("Consumer group is set and thus will be load-balanced.");
77 }
78 Ok(create_consumer(options.group, streams.to_vec()))
79 }
80 ConsumerMode::Resumable => Err(StreamErr::Unsupported(
81 "stdio does not support Resumable".to_owned(),
82 )),
83 ConsumerMode::LoadBalanced => {
84 if options.group.is_some() {
85 Ok(create_consumer(options.group, streams.to_vec()))
86 } else {
87 Err(StreamErr::ConsumerGroupNotSet)
88 }
89 }
90 }
91 }
92}
93
94impl StdioConnectOptions {
95 pub fn loopback(&self) -> bool {
96 self.loopback
97 }
98
99 pub fn set_loopback(&mut self, b: bool) {
107 self.loopback = b;
108 }
109}
110
111impl ConnectOptionsTrait for StdioConnectOptions {
112 type Error = StdioErr;
113
114 fn timeout(&self) -> StdioResult<Duration> {
115 Err(StreamErr::TimeoutNotSet)
116 }
117
118 fn set_timeout(&mut self, _: Duration) -> StdioResult<&mut Self> {
120 Ok(self)
121 }
122}
123
124impl ConsumerOptionsTrait for StdioConsumerOptions {
125 type Error = StdioErr;
126
127 fn new(mode: ConsumerMode) -> Self {
128 Self { mode, group: None }
129 }
130
131 fn mode(&self) -> StdioResult<&ConsumerMode> {
132 Ok(&self.mode)
133 }
134
135 fn consumer_group(&self) -> StdioResult<&ConsumerGroup> {
136 self.group.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
137 }
138
139 fn set_consumer_group(&mut self, group: ConsumerGroup) -> StdioResult<&mut Self> {
142 self.group = Some(group);
143 Ok(self)
144 }
145}
146
147impl Default for StdioConsumerOptions {
148 fn default() -> Self {
149 Self::new(ConsumerMode::RealTime)
150 }
151}
152
153impl ProducerOptionsTrait for StdioProducerOptions {}