summer_pubsub/
consumer.rs1use crate::handler::{BoxedHandler, HandlerArgs, PubSubEnvelope};
2use google_cloud_pubsub::client::Subscriber;
3use std::ops::Deref;
4use std::sync::Arc;
5use summer::app::App;
6use summer::error::Result;
7use summer::plugin::ComponentRegistry;
8use summer::signal;
9
10#[derive(Clone, Default)]
11pub struct Consumers(Vec<Consumer>);
12
13impl Consumers {
14 pub fn new() -> Self {
15 Self::default()
16 }
17
18 pub fn add_consumer(mut self, consumer: Consumer) -> Self {
19 self.0.push(consumer);
20 self
21 }
22
23 pub(crate) fn merge(&mut self, consumers: Self) {
24 for consumer in consumers.0 {
25 self.0.push(consumer);
26 }
27 }
28}
29
30impl Deref for Consumers {
31 type Target = Vec<Consumer>;
32
33 fn deref(&self) -> &Self::Target {
34 &self.0
35 }
36}
37
38#[derive(Clone)]
39pub struct Consumer {
40 pub(crate) subscription_literal: &'static str,
41 pub(crate) handler: BoxedHandler,
42}
43
44#[derive(Clone, Default)]
45pub struct ConsumerOpts;
46
47impl Consumer {
48 pub(crate) fn new_instance(&self, project_id: &str) -> ConsumerInstance {
49 ConsumerInstance {
50 subscription: resolve_subscription(project_id, self.subscription_literal),
51 handler: self.handler.clone(),
52 }
53 }
54}
55
56impl ConsumerOpts {
57 pub fn consume<H, A>(self, subscription: &'static str, handler: H) -> Consumer
58 where
59 H: HandlerArgs<A> + Sync,
60 A: 'static,
61 {
62 Consumer {
63 handler: BoxedHandler::from_handler(handler),
64 subscription_literal: subscription,
65 }
66 }
67}
68
69pub(crate) struct ConsumerInstance {
70 subscription: String,
71 handler: BoxedHandler,
72}
73
74impl ConsumerInstance {
75 pub async fn schedule(self, app: Arc<App>) -> Result<String> {
76 let ConsumerInstance {
77 subscription,
78 handler,
79 } = self;
80 let subscriber = app.get_component::<Subscriber>().expect(
81 "summer-pubsub: Subscriber component missing; add PubSubPlugin before consumers run",
82 );
83 let mut stream = subscriber.subscribe(subscription.as_str()).build();
84 let shutdown = signal::shutdown_signal("pubsub consumer");
85 tokio::pin!(shutdown);
86
87 loop {
88 let next = tokio::select! {
89 biased;
90 _ = &mut shutdown => {
91 tracing::info!(
92 "pubsub subscription {subscription}: shutdown signal received, stopping consumer"
93 );
94 break;
95 }
96 n = stream.next() => n,
97 };
98
99 let Some(result) = next else {
100 tracing::warn!("pubsub subscription {subscription}: stream closed");
101 break;
102 };
103 let (grpc, h) = match result {
104 Ok(v) => v,
105 Err(e) => {
106 tracing::error!(?e, "pubsub subscription {subscription}: stream error");
107 break;
108 }
109 };
110 let env = PubSubEnvelope::new(h);
111 BoxedHandler::call(handler.clone(), grpc, env, app.clone()).await;
112 }
113 Ok(format!("pubsub consumer {subscription} stopped"))
114 }
115}
116
/// Expands a subscription name into its full resource path.
///
/// A `literal` that starts with `projects/` and contains `/subscriptions/` is
/// treated as already qualified and returned unchanged. Anything else is used
/// as the subscription id under `project_id`, producing
/// `projects/{project_id}/subscriptions/{literal}`.
pub fn resolve_subscription(project_id: &str, literal: &str) -> String {
    let fully_qualified = literal.starts_with("projects/") && literal.contains("/subscriptions/");
    if !fully_qualified {
        return format!("projects/{project_id}/subscriptions/{literal}");
    }
    literal.to_owned()
}
124
/// Expands a topic name into its full resource path.
///
/// A `literal` that starts with `projects/` and contains `/topics/` is treated
/// as already qualified and returned unchanged. Anything else is used as the
/// topic id under `project_id`, producing
/// `projects/{project_id}/topics/{literal}`.
pub fn resolve_topic(project_id: &str, literal: &str) -> String {
    let has_project_prefix = literal.starts_with("projects/");
    let has_topic_segment = literal.contains("/topics/");
    if has_project_prefix && has_topic_segment {
        return literal.to_owned();
    }
    format!("projects/{project_id}/topics/{literal}")
}