1#![doc(html_favicon_url = "https://summer-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://summer-rs.github.io/logo.svg")]
4
5pub mod config;
6pub mod consumer;
7pub mod extractor;
8pub mod handler;
9pub mod message;
10pub mod producer;
11
12pub use config::PubSubConfig;
13pub use consumer::{resolve_subscription, resolve_topic, Consumer, ConsumerOpts, Consumers};
14pub use google_cloud_pubsub;
15pub use handler::{auto_consumers, TypedConsumer, TypedHandlerRegistrar};
16pub use message::Message;
17pub use producer::PubSubProducer;
18pub use summer_ext_macros::pubsub_listener;
19
20use config::credentials_from_file;
21use google_cloud_pubsub::client::Subscriber;
22use std::ops::Deref;
23use std::sync::Arc;
24use summer::async_trait;
25use summer::config::ConfigRegistry;
26use summer::plugin::component::ComponentRef;
27use summer::plugin::{ComponentRegistry, MutableComponentRegistry};
28use summer::{
29 app::{App, AppBuilder},
30 plugin::Plugin,
31};
32
33pub use google_cloud_pubsub::*;
34
35pub trait PubSubConfigurator {
36 fn add_consumer(&mut self, consumers: Consumers) -> &mut Self;
37}
38
39impl PubSubConfigurator for AppBuilder {
40 fn add_consumer(&mut self, new_consumers: Consumers) -> &mut Self {
41 if let Some(consumers) = self.get_component_ref::<Consumers>() {
42 unsafe {
43 let raw_ptr = ComponentRef::into_raw(consumers);
44 let consumers = &mut *(raw_ptr as *mut Consumers);
45 consumers.merge(new_consumers);
46 }
47 self
48 } else {
49 self.add_component(new_consumers)
50 }
51 }
52}
53
54pub struct PubSubPlugin;
55
56#[async_trait]
57impl Plugin for PubSubPlugin {
58 async fn build(&self, app: &mut AppBuilder) {
59 let config = app
60 .get_config::<PubSubConfig>()
61 .expect("summer-pubsub: config with prefix `pubsub` is required");
62
63 if !config.enabled {
64 tracing::info!("summer-pubsub: disabled by config (`pubsub.enabled = false`)");
65 return;
66 }
67
68 let creds_opt = config
69 .credentials
70 .as_ref()
71 .map(|path| credentials_from_file(path))
72 .transpose()
73 .expect("summer-pubsub: load credentials failed");
74
75 let mut sub_builder = Subscriber::builder();
76 if let Some(endpoint) = &config.endpoint {
77 sub_builder = sub_builder.with_endpoint(endpoint.clone());
78 }
79 if let Some(ref creds) = creds_opt {
80 sub_builder = sub_builder.with_credentials(creds.clone());
81 }
82
83 let subscriber = sub_builder
84 .build()
85 .await
86 .expect("summer-pubsub: create Subscriber failed");
87
88 let producer = PubSubProducer::new(
89 config.project_id.clone(),
90 config.endpoint.clone(),
91 creds_opt,
92 );
93 app.add_component(subscriber.clone());
94 app.add_component(producer);
95
96 if let Some(consumers) = app.get_component_ref::<Consumers>() {
97 for consumer in consumers.deref().iter() {
98 let instance = consumer.new_instance(config.project_id.as_str());
99 app.add_scheduler(|app: Arc<App>| Box::new(instance.schedule(app)));
100 tracing::info!(
101 "summer-pubsub: register scheduler for subscription `{}`",
102 consumer.subscription_literal
103 );
104 }
105 } else {
106 tracing::info!("summer-pubsub: no Consumers registered");
107 }
108 }
109}