Skip to main content

summer_pubsub/
lib.rs

1//! [![summer-rs](https://img.shields.io/github/stars/summer-rs/summer-rs)](https://summer-rs.github.io/)
2#![doc(html_favicon_url = "https://summer-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://summer-rs.github.io/logo.svg")]
4
5pub mod config;
6pub mod consumer;
7pub mod extractor;
8pub mod handler;
9pub mod message;
10pub mod producer;
11
12pub use config::PubSubConfig;
13pub use consumer::{resolve_subscription, resolve_topic, Consumer, ConsumerOpts, Consumers};
14pub use google_cloud_pubsub;
15pub use handler::{auto_consumers, TypedConsumer, TypedHandlerRegistrar};
16pub use message::Message;
17pub use producer::PubSubProducer;
18pub use summer_ext_macros::pubsub_listener;
19
20use config::credentials_from_file;
21use google_cloud_pubsub::client::Subscriber;
22use std::ops::Deref;
23use std::sync::Arc;
24use summer::async_trait;
25use summer::config::ConfigRegistry;
26use summer::plugin::component::ComponentRef;
27use summer::plugin::{ComponentRegistry, MutableComponentRegistry};
28use summer::{
29    app::{App, AppBuilder},
30    plugin::Plugin,
31};
32
33pub use google_cloud_pubsub::*;
34
35pub trait PubSubConfigurator {
36    fn add_consumer(&mut self, consumers: Consumers) -> &mut Self;
37}
38
39impl PubSubConfigurator for AppBuilder {
40    fn add_consumer(&mut self, new_consumers: Consumers) -> &mut Self {
41        if let Some(consumers) = self.get_component_ref::<Consumers>() {
42            unsafe {
43                let raw_ptr = ComponentRef::into_raw(consumers);
44                let consumers = &mut *(raw_ptr as *mut Consumers);
45                consumers.merge(new_consumers);
46            }
47            self
48        } else {
49            self.add_component(new_consumers)
50        }
51    }
52}
53
54pub struct PubSubPlugin;
55
56#[async_trait]
57impl Plugin for PubSubPlugin {
58    async fn build(&self, app: &mut AppBuilder) {
59        let config = app
60            .get_config::<PubSubConfig>()
61            .expect("summer-pubsub: config with prefix `pubsub` is required");
62
63        if !config.enabled {
64            tracing::info!("summer-pubsub: disabled by config (`pubsub.enabled = false`)");
65            return;
66        }
67
68        let creds_opt = config
69            .credentials
70            .as_ref()
71            .map(|path| credentials_from_file(path))
72            .transpose()
73            .expect("summer-pubsub: load credentials failed");
74
75        let mut sub_builder = Subscriber::builder();
76        if let Some(endpoint) = &config.endpoint {
77            sub_builder = sub_builder.with_endpoint(endpoint.clone());
78        }
79        if let Some(ref creds) = creds_opt {
80            sub_builder = sub_builder.with_credentials(creds.clone());
81        }
82
83        let subscriber = sub_builder
84            .build()
85            .await
86            .expect("summer-pubsub: create Subscriber failed");
87
88        let producer = PubSubProducer::new(
89            config.project_id.clone(),
90            config.endpoint.clone(),
91            creds_opt,
92        );
93        app.add_component(subscriber.clone());
94        app.add_component(producer);
95
96        if let Some(consumers) = app.get_component_ref::<Consumers>() {
97            for consumer in consumers.deref().iter() {
98                let instance = consumer.new_instance(config.project_id.as_str());
99                app.add_scheduler(|app: Arc<App>| Box::new(instance.schedule(app)));
100                tracing::info!(
101                    "summer-pubsub: register scheduler for subscription `{}`",
102                    consumer.subscription_literal
103                );
104            }
105        } else {
106            tracing::info!("summer-pubsub: no Consumers registered");
107        }
108    }
109}