Skip to main content

summer_pubsub/
consumer.rs

1use crate::handler::{BoxedHandler, HandlerArgs, PubSubEnvelope};
2use google_cloud_pubsub::client::Subscriber;
3use std::ops::Deref;
4use std::sync::Arc;
5use summer::app::App;
6use summer::error::Result;
7use summer::plugin::ComponentRegistry;
8use summer::signal;
9
10#[derive(Clone, Default)]
11pub struct Consumers(Vec<Consumer>);
12
13impl Consumers {
14    pub fn new() -> Self {
15        Self::default()
16    }
17
18    pub fn add_consumer(mut self, consumer: Consumer) -> Self {
19        self.0.push(consumer);
20        self
21    }
22
23    pub(crate) fn merge(&mut self, consumers: Self) {
24        for consumer in consumers.0 {
25            self.0.push(consumer);
26        }
27    }
28}
29
30impl Deref for Consumers {
31    type Target = Vec<Consumer>;
32
33    fn deref(&self) -> &Self::Target {
34        &self.0
35    }
36}
37
38#[derive(Clone)]
39pub struct Consumer {
40    pub(crate) subscription_literal: &'static str,
41    pub(crate) handler: BoxedHandler,
42}
43
/// Field-less builder value whose `consume` method produces a [`Consumer`].
#[derive(Clone, Default)]
pub struct ConsumerOpts;
46
47impl Consumer {
48    pub(crate) fn new_instance(&self, project_id: &str) -> ConsumerInstance {
49        ConsumerInstance {
50            subscription: resolve_subscription(project_id, self.subscription_literal),
51            handler: self.handler.clone(),
52        }
53    }
54}
55
56impl ConsumerOpts {
57    pub fn consume<H, A>(self, subscription: &'static str, handler: H) -> Consumer
58    where
59        H: HandlerArgs<A> + Sync,
60        A: 'static,
61    {
62        Consumer {
63            handler: BoxedHandler::from_handler(handler),
64            subscription_literal: subscription,
65        }
66    }
67}
68
69pub(crate) struct ConsumerInstance {
70    subscription: String,
71    handler: BoxedHandler,
72}
73
74impl ConsumerInstance {
75    pub async fn schedule(self, app: Arc<App>) -> Result<String> {
76        let ConsumerInstance {
77            subscription,
78            handler,
79        } = self;
80        let subscriber = app.get_component::<Subscriber>().expect(
81            "summer-pubsub: Subscriber component missing; add PubSubPlugin before consumers run",
82        );
83        let mut stream = subscriber.subscribe(subscription.as_str()).build();
84        let shutdown = signal::shutdown_signal("pubsub consumer");
85        tokio::pin!(shutdown);
86
87        loop {
88            let next = tokio::select! {
89                biased;
90                _ = &mut shutdown => {
91                    tracing::info!(
92                        "pubsub subscription {subscription}: shutdown signal received, stopping consumer"
93                    );
94                    break;
95                }
96                n = stream.next() => n,
97            };
98
99            let Some(result) = next else {
100                tracing::warn!("pubsub subscription {subscription}: stream closed");
101                break;
102            };
103            let (grpc, h) = match result {
104                Ok(v) => v,
105                Err(e) => {
106                    tracing::error!(?e, "pubsub subscription {subscription}: stream error");
107                    break;
108                }
109            };
110            let env = PubSubEnvelope::new(h);
111            BoxedHandler::call(handler.clone(), grpc, env, app.clone()).await;
112        }
113        Ok(format!("pubsub consumer {subscription} stopped"))
114    }
115}
116
/// Returns the subscription resource name for `literal`.
///
/// A `literal` that starts with `projects/` and contains `/subscriptions/`
/// is returned unchanged. Anything else is treated as a bare id and expanded
/// to `projects/{project_id}/subscriptions/{literal}`.
pub fn resolve_subscription(project_id: &str, literal: &str) -> String {
    let already_qualified =
        literal.starts_with("projects/") && literal.contains("/subscriptions/");
    if !already_qualified {
        return format!("projects/{project_id}/subscriptions/{literal}");
    }
    literal.to_owned()
}
124
/// Returns the topic resource name for `literal`.
///
/// A `literal` that starts with `projects/` and contains `/topics/` is
/// returned unchanged. Anything else is treated as a bare id and expanded to
/// `projects/{project_id}/topics/{literal}`.
pub fn resolve_topic(project_id: &str, literal: &str) -> String {
    let already_qualified = literal.starts_with("projects/") && literal.contains("/topics/");
    match already_qualified {
        true => literal.to_owned(),
        false => format!("projects/{project_id}/topics/{literal}"),
    }
}