summer-pubsub 0.7.0

Integrate Google Cloud Pub/Sub (google-cloud-pubsub) with summer-rs
Documentation
use crate::handler::{BoxedHandler, HandlerArgs, PubSubEnvelope};
use google_cloud_pubsub::client::Subscriber;
use std::ops::Deref;
use std::sync::Arc;
use summer::app::App;
use summer::error::Result;
use summer::plugin::ComponentRegistry;
use summer::signal;

#[derive(Clone, Default)]
pub struct Consumers(Vec<Consumer>);

impl Consumers {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn add_consumer(mut self, consumer: Consumer) -> Self {
        self.0.push(consumer);
        self
    }

    pub(crate) fn merge(&mut self, consumers: Self) {
        for consumer in consumers.0 {
            self.0.push(consumer);
        }
    }
}

impl Deref for Consumers {
    type Target = Vec<Consumer>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Clone)]
pub struct Consumer {
    pub(crate) subscription_literal: &'static str,
    pub(crate) handler: BoxedHandler,
}

#[derive(Clone, Default)]
pub struct ConsumerOpts;

impl Consumer {
    pub(crate) fn new_instance(&self, project_id: &str) -> ConsumerInstance {
        ConsumerInstance {
            subscription: resolve_subscription(project_id, self.subscription_literal),
            handler: self.handler.clone(),
        }
    }
}

impl ConsumerOpts {
    pub fn consume<H, A>(self, subscription: &'static str, handler: H) -> Consumer
    where
        H: HandlerArgs<A> + Sync,
        A: 'static,
    {
        Consumer {
            handler: BoxedHandler::from_handler(handler),
            subscription_literal: subscription,
        }
    }
}

pub(crate) struct ConsumerInstance {
    subscription: String,
    handler: BoxedHandler,
}

impl ConsumerInstance {
    pub async fn schedule(self, app: Arc<App>) -> Result<String> {
        let ConsumerInstance {
            subscription,
            handler,
        } = self;
        let subscriber = app.get_component::<Subscriber>().expect(
            "summer-pubsub: Subscriber component missing; add PubSubPlugin before consumers run",
        );
        let mut stream = subscriber.subscribe(subscription.as_str()).build();
        let shutdown = signal::shutdown_signal("pubsub consumer");
        tokio::pin!(shutdown);

        loop {
            let next = tokio::select! {
                biased;
                _ = &mut shutdown => {
                    tracing::info!(
                        "pubsub subscription {subscription}: shutdown signal received, stopping consumer"
                    );
                    break;
                }
                n = stream.next() => n,
            };

            let Some(result) = next else {
                tracing::warn!("pubsub subscription {subscription}: stream closed");
                break;
            };
            let (grpc, h) = match result {
                Ok(v) => v,
                Err(e) => {
                    tracing::error!(?e, "pubsub subscription {subscription}: stream error");
                    break;
                }
            };
            let env = PubSubEnvelope::new(h);
            BoxedHandler::call(handler.clone(), grpc, env, app.clone()).await;
        }
        Ok(format!("pubsub consumer {subscription} stopped"))
    }
}

pub fn resolve_subscription(project_id: &str, literal: &str) -> String {
    if literal.starts_with("projects/") && literal.contains("/subscriptions/") {
        literal.to_string()
    } else {
        format!("projects/{project_id}/subscriptions/{literal}")
    }
}

pub fn resolve_topic(project_id: &str, literal: &str) -> String {
    if literal.starts_with("projects/") && literal.contains("/topics/") {
        literal.to_string()
    } else {
        format!("projects/{project_id}/topics/{literal}")
    }
}