spring_stream/
consumer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::{
    handler::{BoxedHandler, Handler},
    Streamer,
};
use anyhow::Context;
#[cfg(feature = "file")]
use sea_streamer::file::FileConsumerOptions;
#[cfg(feature = "kafka")]
use sea_streamer::kafka::KafkaConsumerOptions;
#[cfg(feature = "redis")]
use sea_streamer::redis::RedisConsumerOptions;
#[cfg(feature = "stdio")]
use sea_streamer::stdio::StdioConsumerOptions;
use sea_streamer::Consumer as _;
use sea_streamer::{ConsumerGroup, ConsumerMode, ConsumerOptions, SeaConsumer, SeaConsumerOptions};
use spring::{app::App, error::Result};
use std::{ops::Deref, sync::Arc};

#[derive(Clone, Default)]
pub struct Consumers(Vec<Consumer>);

impl Consumers {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn add_consumer(mut self, consumer: Consumer) -> Self {
        self.0.push(consumer);
        self
    }

    pub(crate) fn merge(&mut self, consumers: Self) {
        for consumer in consumers.0 {
            self.0.push(consumer);
        }
    }
}

impl Deref for Consumers {
    type Target = Vec<Consumer>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Clone)]
pub struct Consumer {
    pub(crate) stream_keys: &'static [&'static str],
    pub(crate) opts: ConsumerOpts,
    pub(crate) handler: BoxedHandler,
}

#[derive(Default, Clone)]
pub struct ConsumerOpts(pub(crate) SeaConsumerOptions);

impl Consumer {
    #[allow(clippy::should_implement_trait)]
    pub fn default() -> ConsumerOpts {
        ConsumerOpts(Default::default())
    }

    pub fn mode(mode: ConsumerMode) -> ConsumerOpts {
        ConsumerOpts(SeaConsumerOptions::new(mode))
    }

    pub(crate) async fn new_instance(&self, streamer: &Streamer) -> Result<ConsumerInstance> {
        let consumer = streamer
            .create_consumer(self.stream_keys, self.opts.clone())
            .await?;
        Ok(ConsumerInstance {
            consumer,
            handler: self.handler.clone(),
        })
    }
}

impl ConsumerOpts {
    pub fn group_id(mut self, group_id: &'static str) -> Self {
        let _ = self.0.set_consumer_group(ConsumerGroup::new(group_id));
        self
    }
    #[cfg(feature = "kafka")]
    pub fn kafka_consumer_options<F>(mut self, func: F) -> Self
    where
        F: FnOnce(&mut KafkaConsumerOptions) + Send + Sync + 'static,
    {
        self.0.set_kafka_consumer_options(func);
        self
    }
    #[cfg(feature = "redis")]
    pub fn redis_consumer_options<F>(mut self, func: F) -> Self
    where
        F: FnOnce(&mut RedisConsumerOptions) + Send + Sync + 'static,
    {
        self.0.set_redis_consumer_options(func);
        self
    }
    #[cfg(feature = "stdio")]
    pub fn stdio_consumer_options<F>(mut self, func: F) -> Self
    where
        F: FnOnce(&mut StdioConsumerOptions) + Send + Sync + 'static,
    {
        self.0.set_stdio_consumer_options(func);
        self
    }
    #[cfg(feature = "file")]
    pub fn file_consumer_options<F>(mut self, func: F) -> Self
    where
        F: FnOnce(&mut FileConsumerOptions) + Send + Sync + 'static,
    {
        self.0.set_file_consumer_options(func);
        self
    }

    pub fn consume<H, A>(self, stream_keys: &'static [&'static str], handler: H) -> Consumer
    where
        H: Handler<A> + Sync,
        A: 'static,
    {
        Consumer {
            handler: BoxedHandler::from_handler(handler),
            stream_keys,
            opts: self,
        }
    }
}

pub(crate) struct ConsumerInstance {
    consumer: SeaConsumer,
    handler: BoxedHandler,
}

impl ConsumerInstance {
    pub async fn schedule(self, app: Arc<App>) -> Result<String> {
        let ConsumerInstance { consumer, handler } = self;
        loop {
            let message = consumer.next().await.context("consumer poll msg failed")?;
            BoxedHandler::call(handler.clone(), message, app.clone()).await;
        }
    }
}