spring_stream/
lib.rs

1//! [![spring-rs](https://img.shields.io/github/stars/spring-rs/spring-rs)](https://spring-rs.github.io/docs/plugins/spring-stream)
2#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod config;
6pub mod consumer;
7pub mod extractor;
8pub mod handler;
9
10pub use consumer::{ConsumerOpts, Consumers};
11#[cfg(feature = "file")]
12pub use sea_streamer::file;
13#[cfg(feature = "kafka")]
14pub use sea_streamer::kafka;
15#[cfg(feature = "redis")]
16pub use sea_streamer::redis;
17#[cfg(feature = "stdio")]
18pub use sea_streamer::stdio;
19pub use sea_streamer::ConsumerMode;
20/////////////////stream-macros/////////////////////
21pub use spring_macros::stream_listener;
22
23use anyhow::Context;
24use config::StreamConfig;
25use sea_streamer::{
26    Buffer, MessageHeader, Producer as _, SeaConsumer, SeaProducer, SeaStreamer, StreamKey,
27    Streamer as _, StreamerUri,
28};
29#[cfg(feature = "json")]
30use serde::Serialize;
31use spring::async_trait;
32use spring::config::ConfigRegistry;
33use spring::error::Result;
34use spring::plugin::component::ComponentRef;
35use spring::plugin::{ComponentRegistry, MutableComponentRegistry};
36use spring::{
37    app::{App, AppBuilder},
38    plugin::Plugin,
39};
40use std::ops::Deref;
41use std::{str::FromStr, sync::Arc};
42
43pub trait StreamConfigurator {
44    fn add_consumer(&mut self, consumers: Consumers) -> &mut Self;
45}
46
47impl StreamConfigurator for AppBuilder {
48    fn add_consumer(&mut self, new_consumers: Consumers) -> &mut Self {
49        if let Some(consumers) = self.get_component_ref::<Consumers>() {
50            unsafe {
51                let raw_ptr = ComponentRef::into_raw(consumers);
52                let consumers = &mut *(raw_ptr as *mut Consumers);
53                consumers.merge(new_consumers);
54            }
55            self
56        } else {
57            self.add_component(new_consumers)
58        }
59    }
60}
61
62pub struct StreamPlugin;
63
64#[async_trait]
65impl Plugin for StreamPlugin {
66    async fn build(&self, app: &mut AppBuilder) {
67        let config = app
68            .get_config::<StreamConfig>()
69            .expect("sea-streamer plugin config load failed");
70
71        let streamer = Streamer::new(config).await.expect("create streamer failed");
72
73        if let Some(consumers) = app.get_component_ref::<Consumers>() {
74            for consumer in consumers.deref().iter() {
75                let consumer_instance = consumer
76                    .new_instance(&streamer)
77                    .await
78                    .expect("create customer instance failed");
79                app.add_scheduler(|app: Arc<App>| Box::new(consumer_instance.schedule(app)));
80                tracing::info!(
81                    "register scheduler for \"{:?}\" stream consumer",
82                    consumer.stream_keys
83                );
84            }
85        } else {
86            tracing::info!("not consumer be registry");
87        }
88        let producer = streamer
89            .create_generic_producer()
90            .await
91            .expect("create producer failed");
92
93        app.add_component(producer);
94    }
95}
96
97pub struct Streamer {
98    streamer: SeaStreamer,
99    config: StreamConfig,
100}
101
102impl Streamer {
103    async fn new(config: StreamConfig) -> Result<Self> {
104        let uri = StreamerUri::from_str(config.uri.as_str())
105            .with_context(|| format!("parse stream server \"{}\" failed", config.uri))?;
106
107        let streamer = SeaStreamer::connect(uri, config.connect_options())
108            .await
109            .with_context(|| format!("connect stream server \"{}\" failed", config.uri))?;
110
111        Ok(Self { streamer, config })
112    }
113
114    async fn create_consumer(
115        &self,
116        stream_keys: &'static [&'static str],
117        opts: ConsumerOpts,
118    ) -> Result<SeaConsumer> {
119        let consumer_options = self.config.new_consumer_options(opts);
120        let mut consumer_stream_keys = Vec::with_capacity(stream_keys.len());
121        for key in stream_keys {
122            consumer_stream_keys.push(
123                StreamKey::new(*key)
124                    .with_context(|| format!("consumer stream key \"{}\" is valid", key))?,
125            );
126        }
127        Ok(self
128            .streamer
129            .create_consumer(&consumer_stream_keys, consumer_options)
130            .await
131            .with_context(|| format!("create stream consumer failed: {:?}", stream_keys))?)
132    }
133
134    async fn create_generic_producer(&self) -> Result<Producer> {
135        let producer_options = self.config.new_producer_options();
136        let producer = self
137            .streamer
138            .create_generic_producer(producer_options)
139            .await
140            .context("create stream generic producer failed")?;
141        Ok(Producer::new(producer))
142    }
143}
144
145#[derive(Clone)]
146pub struct Producer(Arc<SeaProducer>);
147
148impl Producer {
149    fn new(producer: SeaProducer) -> Self {
150        Self(Arc::new(producer))
151    }
152
153    #[cfg(feature = "json")]
154    pub async fn send_json<T: Serialize>(
155        &self,
156        stream_key: &str,
157        payload: T,
158    ) -> Result<MessageHeader> {
159        let json = serde_json::to_string(&payload).context("json serialize failed")?;
160        self.send_to(stream_key, json.as_str()).await
161    }
162
163    pub async fn send_to<S: Buffer>(&self, stream_key: &str, payload: S) -> Result<MessageHeader> {
164        let producer_stream_key = StreamKey::new(stream_key)
165            .with_context(|| format!("producer stream key \"{}\" is valid", stream_key))?;
166
167        let header = self
168            .0
169            .send_to(&producer_stream_key, payload)
170            .with_context(|| format!("send to stream key failed:{stream_key}"))?
171            .await
172            .with_context(|| {
173                format!("await response for sending stream key failed:{stream_key}")
174            })?;
175
176        Ok(header)
177    }
178}