spring_stream/
consumer.rs

1use crate::{
2    handler::{BoxedHandler, Handler},
3    Streamer,
4};
5use anyhow::Context;
6#[cfg(feature = "file")]
7use sea_streamer::file::FileConsumerOptions;
8#[cfg(feature = "kafka")]
9use sea_streamer::kafka::KafkaConsumerOptions;
10#[cfg(feature = "redis")]
11use sea_streamer::redis::RedisConsumerOptions;
12#[cfg(feature = "stdio")]
13use sea_streamer::stdio::StdioConsumerOptions;
14use sea_streamer::Consumer as _;
15use sea_streamer::{ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConsumer, SeaConsumerOptions};
16use spring::{app::App, error::Result};
17use std::{ops::Deref, sync::Arc};
18
19#[derive(Clone, Default)]
20pub struct Consumers(Vec<Consumer>);
21
22impl Consumers {
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    pub fn add_consumer(mut self, consumer: Consumer) -> Self {
28        self.0.push(consumer);
29        self
30    }
31
32    pub(crate) fn merge(&mut self, consumers: Self) {
33        for consumer in consumers.0 {
34            self.0.push(consumer);
35        }
36    }
37}
38
39impl Deref for Consumers {
40    type Target = Vec<Consumer>;
41
42    fn deref(&self) -> &Self::Target {
43        &self.0
44    }
45}
46
47#[derive(Clone)]
48pub struct Consumer {
49    pub(crate) stream_keys: &'static [&'static str],
50    pub(crate) opts: ConsumerOpts,
51    pub(crate) handler: BoxedHandler,
52}
53
54#[derive(Default, Clone)]
55pub struct ConsumerOpts(pub(crate) SeaConsumerOptions);
56
57impl Consumer {
58    #[allow(clippy::should_implement_trait)]
59    pub fn default() -> ConsumerOpts {
60        ConsumerOpts(Default::default())
61    }
62
63    pub fn mode(mode: ConsumerMode) -> ConsumerOpts {
64        ConsumerOpts(SeaConsumerOptions::new(mode))
65    }
66
67    pub(crate) async fn new_instance(&self, streamer: &Streamer) -> Result<ConsumerInstance> {
68        let consumer = streamer
69            .create_consumer(self.stream_keys, self.opts.clone())
70            .await?;
71        Ok(ConsumerInstance {
72            consumer,
73            handler: self.handler.clone(),
74        })
75    }
76}
77
78impl ConsumerOpts {
79    pub fn group_id(mut self, group_id: &'static str) -> Self {
80        let _ = self.0.set_consumer_group(ConsumerGroup::new(group_id));
81        self
82    }
83    #[cfg(feature = "kafka")]
84    pub fn kafka_consumer_options<F>(mut self, func: F) -> Self
85    where
86        F: FnOnce(&mut KafkaConsumerOptions) + Send + Sync + 'static,
87    {
88        self.0.set_kafka_consumer_options(func);
89        self
90    }
91    #[cfg(feature = "redis")]
92    pub fn redis_consumer_options<F>(mut self, func: F) -> Self
93    where
94        F: FnOnce(&mut RedisConsumerOptions) + Send + Sync + 'static,
95    {
96        self.0.set_redis_consumer_options(func);
97        self
98    }
99    #[cfg(feature = "stdio")]
100    pub fn stdio_consumer_options<F>(mut self, func: F) -> Self
101    where
102        F: FnOnce(&mut StdioConsumerOptions) + Send + Sync + 'static,
103    {
104        self.0.set_stdio_consumer_options(func);
105        self
106    }
107    #[cfg(feature = "file")]
108    pub fn file_consumer_options<F>(mut self, func: F) -> Self
109    where
110        F: FnOnce(&mut FileConsumerOptions) + Send + Sync + 'static,
111    {
112        self.0.set_file_consumer_options(func);
113        self
114    }
115
116    pub fn consume<H, A>(self, stream_keys: &'static [&'static str], handler: H) -> Consumer
117    where
118        H: Handler<A> + Sync,
119        A: 'static,
120    {
121        Consumer {
122            handler: BoxedHandler::from_handler(handler),
123            stream_keys,
124            opts: self,
125        }
126    }
127}
128
129pub(crate) struct ConsumerInstance {
130    consumer: SeaConsumer,
131    handler: BoxedHandler,
132}
133
134impl ConsumerInstance {
135    pub async fn schedule(self, app: Arc<App>) -> Result<String> {
136        let ConsumerInstance { consumer, handler } = self;
137        loop {
138            let message = consumer.next().await.context("consumer poll msg failed")?;
139            BoxedHandler::call(handler.clone(), message, app.clone()).await;
140        }
141    }
142}