1#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod config;
6pub mod consumer;
7pub mod extractor;
8pub mod handler;
9
10pub use consumer::{ConsumerOpts, Consumers};
11#[cfg(feature = "file")]
12pub use sea_streamer::file;
13#[cfg(feature = "kafka")]
14pub use sea_streamer::kafka;
15#[cfg(feature = "redis")]
16pub use sea_streamer::redis;
17#[cfg(feature = "stdio")]
18pub use sea_streamer::stdio;
19pub use sea_streamer::ConsumerMode;
20pub use spring_macros::stream_listener;
22
23use anyhow::Context;
24use config::StreamConfig;
25use sea_streamer::{
26 Buffer, MessageHeader, Producer as _, SeaConsumer, SeaProducer, SeaStreamer, StreamKey,
27 Streamer as _, StreamerUri,
28};
29#[cfg(feature = "json")]
30use serde::Serialize;
31use spring::async_trait;
32use spring::config::ConfigRegistry;
33use spring::error::Result;
34use spring::plugin::component::ComponentRef;
35use spring::plugin::{ComponentRegistry, MutableComponentRegistry};
36use spring::{
37 app::{App, AppBuilder},
38 plugin::Plugin,
39};
40use std::ops::Deref;
41use std::{str::FromStr, sync::Arc};
42
/// Extension trait for registering stream consumers on an application builder.
pub trait StreamConfigurator {
    /// Registers `consumers` and returns `&mut Self` so calls can be chained.
    fn add_consumer(&mut self, consumers: Consumers) -> &mut Self;
}
46
47impl StreamConfigurator for AppBuilder {
48 fn add_consumer(&mut self, new_consumers: Consumers) -> &mut Self {
49 if let Some(consumers) = self.get_component_ref::<Consumers>() {
50 unsafe {
51 let raw_ptr = ComponentRef::into_raw(consumers);
52 let consumers = &mut *(raw_ptr as *mut Consumers);
53 consumers.merge(new_consumers);
54 }
55 self
56 } else {
57 self.add_component(new_consumers)
58 }
59 }
60}
61
/// Plugin that connects to the configured stream server, schedules the
/// registered stream consumers and registers a [`Producer`] component.
pub struct StreamPlugin;
63
64#[async_trait]
65impl Plugin for StreamPlugin {
66 async fn build(&self, app: &mut AppBuilder) {
67 let config = app
68 .get_config::<StreamConfig>()
69 .expect("sea-streamer plugin config load failed");
70
71 let streamer = Streamer::new(config).await.expect("create streamer failed");
72
73 if let Some(consumers) = app.get_component_ref::<Consumers>() {
74 for consumer in consumers.deref().iter() {
75 let consumer_instance = consumer
76 .new_instance(&streamer)
77 .await
78 .expect("create customer instance failed");
79 app.add_scheduler(|app: Arc<App>| Box::new(consumer_instance.schedule(app)));
80 tracing::info!(
81 "register scheduler for \"{:?}\" stream consumer",
82 consumer.stream_keys
83 );
84 }
85 } else {
86 tracing::info!("not consumer be registry");
87 }
88 let producer = streamer
89 .create_generic_producer()
90 .await
91 .expect("create producer failed");
92
93 app.add_component(producer);
94 }
95}
96
/// Pairs a connected [`SeaStreamer`] with the [`StreamConfig`] it was created
/// from.
pub struct Streamer {
    /// Connected streamer used to create consumers and producers.
    streamer: SeaStreamer,
    /// Configuration used to derive consumer and producer options.
    config: StreamConfig,
}
101
102impl Streamer {
103 async fn new(config: StreamConfig) -> Result<Self> {
104 let uri = StreamerUri::from_str(config.uri.as_str())
105 .with_context(|| format!("parse stream server \"{}\" failed", config.uri))?;
106
107 let streamer = SeaStreamer::connect(uri, config.connect_options())
108 .await
109 .with_context(|| format!("connect stream server \"{}\" failed", config.uri))?;
110
111 Ok(Self { streamer, config })
112 }
113
114 async fn create_consumer(
115 &self,
116 stream_keys: &'static [&'static str],
117 opts: ConsumerOpts,
118 ) -> Result<SeaConsumer> {
119 let consumer_options = self.config.new_consumer_options(opts);
120 let mut consumer_stream_keys = Vec::with_capacity(stream_keys.len());
121 for key in stream_keys {
122 consumer_stream_keys.push(
123 StreamKey::new(*key)
124 .with_context(|| format!("consumer stream key \"{}\" is valid", key))?,
125 );
126 }
127 Ok(self
128 .streamer
129 .create_consumer(&consumer_stream_keys, consumer_options)
130 .await
131 .with_context(|| format!("create stream consumer failed: {:?}", stream_keys))?)
132 }
133
134 async fn create_generic_producer(&self) -> Result<Producer> {
135 let producer_options = self.config.new_producer_options();
136 let producer = self
137 .streamer
138 .create_generic_producer(producer_options)
139 .await
140 .context("create stream generic producer failed")?;
141 Ok(Producer::new(producer))
142 }
143}
144
/// Cloneable handle to a [`SeaProducer`] held behind an [`Arc`]; clones share
/// the same underlying producer.
#[derive(Clone)]
pub struct Producer(Arc<SeaProducer>);
147
148impl Producer {
149 fn new(producer: SeaProducer) -> Self {
150 Self(Arc::new(producer))
151 }
152
153 #[cfg(feature = "json")]
154 pub async fn send_json<T: Serialize>(
155 &self,
156 stream_key: &str,
157 payload: T,
158 ) -> Result<MessageHeader> {
159 let json = serde_json::to_string(&payload).context("json serialize failed")?;
160 self.send_to(stream_key, json.as_str()).await
161 }
162
163 pub async fn send_to<S: Buffer>(&self, stream_key: &str, payload: S) -> Result<MessageHeader> {
164 let producer_stream_key = StreamKey::new(stream_key)
165 .with_context(|| format!("producer stream key \"{}\" is valid", stream_key))?;
166
167 let header = self
168 .0
169 .send_to(&producer_stream_key, payload)
170 .with_context(|| format!("send to stream key failed:{stream_key}"))?
171 .await
172 .with_context(|| {
173 format!("await response for sending stream key failed:{stream_key}")
174 })?;
175
176 Ok(header)
177 }
178}