// sse_gateway_gcp/lib.rs

//! Google Cloud Pub/Sub adapter for SSE Gateway
//!
//! # Example
//!
//! ```rust,ignore
//! use sse_gateway::Gateway;
//! use sse_gateway_gcp::GcpPubSubSource;
//!
//! Gateway::builder()
//!     .source(GcpPubSubSource::new("my-project", "my-subscription"))
//!     .storage(sse_gateway::MemoryStorage::default())
//!     .build()?
//!     .run()
//!     .await
//! ```

use anyhow::Context;
use async_trait::async_trait;
use google_cloud_pubsub::client::{Client, ClientConfig};
use sse_gateway::{ConnectionManager, IncomingMessage, MessageHandler, MessageSource};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Google Cloud Pub/Sub message source
///
/// Stores the project and subscription identifiers supplied at construction
/// time. `Debug` and `Clone` are derived so the source can be logged and
/// duplicated by callers.
///
/// # Message Attributes
///
/// The source reads the following attributes from Pub/Sub messages:
/// - `channel_id`: Target channel (optional, omit for broadcast)
/// - `event_type`: Event type (defaults to "message")
/// - `id`: Business message ID (optional)
#[derive(Debug, Clone)]
pub struct GcpPubSubSource {
    /// GCP project ID supplied by the caller.
    project_id: String,
    /// Pub/Sub subscription ID supplied by the caller.
    subscription_id: String,
}

36impl GcpPubSubSource {
37    /// Create a new GCP Pub/Sub source
38    pub fn new(project_id: impl Into<String>, subscription_id: impl Into<String>) -> Self {
39        Self {
40            project_id: project_id.into(),
41            subscription_id: subscription_id.into(),
42        }
43    }
44}
45
46#[async_trait]
47impl MessageSource for GcpPubSubSource {
48    async fn start(
49        &self,
50        handler: MessageHandler,
51        _connection_manager: ConnectionManager,
52        cancel: CancellationToken,
53    ) -> anyhow::Result<()> {
54        info!(
55            project = %self.project_id,
56            subscription = %self.subscription_id,
57            "Starting GCP Pub/Sub"
58        );
59
60        let config = ClientConfig::default().with_auth().await?;
61        let client = Client::new(config).await?;
62        let subscription = client.subscription(&self.subscription_id);
63
64        info!("Connected to GCP Pub/Sub");
65
66        subscription
67            .receive(
68                move |message, _cancel| {
69                    let handler = handler.clone();
70                    async move {
71                        let msg = &message.message;
72
73                        let channel_id = msg.attributes.get("channel_id").map(|s| s.to_string());
74                        let event_type = msg
75                            .attributes
76                            .get("event_type")
77                            .map(|s| s.as_str())
78                            .unwrap_or("message")
79                            .to_string();
80                        let id = msg.attributes.get("id").map(|s| s.to_string());
81                        let data = String::from_utf8_lossy(&msg.data).to_string();
82
83                        handler(IncomingMessage {
84                            channel_id,
85                            event_type,
86                            data,
87                            id,
88                        });
89
90                        if let Err(e) = message.ack().await {
91                            error!(error = %e, "Failed to ack message");
92                        }
93                    }
94                },
95                cancel,
96                None,
97            )
98            .await?;
99
100        info!("GCP Pub/Sub stopped");
101        Ok(())
102    }
103
104    fn name(&self) -> &'static str {
105        "GCP Pub/Sub"
106    }
107}