spring_stream/
consumer.rs1use crate::{
2 handler::{BoxedHandler, Handler},
3 Streamer,
4};
5use anyhow::Context;
6#[cfg(feature = "file")]
7use sea_streamer::file::FileConsumerOptions;
8#[cfg(feature = "kafka")]
9use sea_streamer::kafka::KafkaConsumerOptions;
10#[cfg(feature = "redis")]
11use sea_streamer::redis::RedisConsumerOptions;
12#[cfg(feature = "stdio")]
13use sea_streamer::stdio::StdioConsumerOptions;
14use sea_streamer::Consumer as _;
15use sea_streamer::{ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConsumer, SeaConsumerOptions};
16use spring::{app::App, error::Result};
17use std::{ops::Deref, sync::Arc};
18
19#[derive(Clone, Default)]
20pub struct Consumers(Vec<Consumer>);
21
22impl Consumers {
23 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn add_consumer(mut self, consumer: Consumer) -> Self {
28 self.0.push(consumer);
29 self
30 }
31
32 pub(crate) fn merge(&mut self, consumers: Self) {
33 for consumer in consumers.0 {
34 self.0.push(consumer);
35 }
36 }
37}
38
39impl Deref for Consumers {
40 type Target = Vec<Consumer>;
41
42 fn deref(&self) -> &Self::Target {
43 &self.0
44 }
45}
46
47#[derive(Clone)]
48pub struct Consumer {
49 pub(crate) stream_keys: &'static [&'static str],
50 pub(crate) opts: ConsumerOpts,
51 pub(crate) handler: BoxedHandler,
52}
53
54#[derive(Default, Clone)]
55pub struct ConsumerOpts(pub(crate) SeaConsumerOptions);
56
57impl Consumer {
58 #[allow(clippy::should_implement_trait)]
59 pub fn default() -> ConsumerOpts {
60 ConsumerOpts(Default::default())
61 }
62
63 pub fn mode(mode: ConsumerMode) -> ConsumerOpts {
64 ConsumerOpts(SeaConsumerOptions::new(mode))
65 }
66
67 pub(crate) async fn new_instance(&self, streamer: &Streamer) -> Result<ConsumerInstance> {
68 let consumer = streamer
69 .create_consumer(self.stream_keys, self.opts.clone())
70 .await?;
71 Ok(ConsumerInstance {
72 consumer,
73 handler: self.handler.clone(),
74 })
75 }
76}
77
78impl ConsumerOpts {
79 pub fn group_id(mut self, group_id: &'static str) -> Self {
80 let _ = self.0.set_consumer_group(ConsumerGroup::new(group_id));
81 self
82 }
83 #[cfg(feature = "kafka")]
84 pub fn kafka_consumer_options<F>(mut self, func: F) -> Self
85 where
86 F: FnOnce(&mut KafkaConsumerOptions) + Send + Sync + 'static,
87 {
88 self.0.set_kafka_consumer_options(func);
89 self
90 }
91 #[cfg(feature = "redis")]
92 pub fn redis_consumer_options<F>(mut self, func: F) -> Self
93 where
94 F: FnOnce(&mut RedisConsumerOptions) + Send + Sync + 'static,
95 {
96 self.0.set_redis_consumer_options(func);
97 self
98 }
99 #[cfg(feature = "stdio")]
100 pub fn stdio_consumer_options<F>(mut self, func: F) -> Self
101 where
102 F: FnOnce(&mut StdioConsumerOptions) + Send + Sync + 'static,
103 {
104 self.0.set_stdio_consumer_options(func);
105 self
106 }
107 #[cfg(feature = "file")]
108 pub fn file_consumer_options<F>(mut self, func: F) -> Self
109 where
110 F: FnOnce(&mut FileConsumerOptions) + Send + Sync + 'static,
111 {
112 self.0.set_file_consumer_options(func);
113 self
114 }
115
116 pub fn consume<H, A>(self, stream_keys: &'static [&'static str], handler: H) -> Consumer
117 where
118 H: Handler<A> + Sync,
119 A: 'static,
120 {
121 Consumer {
122 handler: BoxedHandler::from_handler(handler),
123 stream_keys,
124 opts: self,
125 }
126 }
127}
128
129pub(crate) struct ConsumerInstance {
130 consumer: SeaConsumer,
131 handler: BoxedHandler,
132}
133
134impl ConsumerInstance {
135 pub async fn schedule(self, app: Arc<App>) -> Result<String> {
136 let ConsumerInstance { consumer, handler } = self;
137 loop {
138 let message = consumer.next().await.context("consumer poll msg failed")?;
139 BoxedHandler::call(handler.clone(), message, app.clone()).await;
140 }
141 }
142}