1use async_trait::async_trait;
18use google_cloud_pubsub::client::{Client, ClientConfig};
19use sse_gateway::{ConnectionManager, IncomingMessage, MessageHandler, MessageSource};
20use tokio_util::sync::CancellationToken;
21use tracing::{error, info};
22
23pub struct GcpPubSubSource {
32 project_id: String,
33 subscription_id: String,
34}
35
36impl GcpPubSubSource {
37 pub fn new(project_id: impl Into<String>, subscription_id: impl Into<String>) -> Self {
39 Self {
40 project_id: project_id.into(),
41 subscription_id: subscription_id.into(),
42 }
43 }
44}
45
46#[async_trait]
47impl MessageSource for GcpPubSubSource {
48 async fn start(
49 &self,
50 handler: MessageHandler,
51 _connection_manager: ConnectionManager,
52 cancel: CancellationToken,
53 ) -> anyhow::Result<()> {
54 info!(
55 project = %self.project_id,
56 subscription = %self.subscription_id,
57 "Starting GCP Pub/Sub"
58 );
59
60 let config = ClientConfig::default().with_auth().await?;
61 let client = Client::new(config).await?;
62 let subscription = client.subscription(&self.subscription_id);
63
64 info!("Connected to GCP Pub/Sub");
65
66 subscription
67 .receive(
68 move |message, _cancel| {
69 let handler = handler.clone();
70 async move {
71 let msg = &message.message;
72
73 let channel_id = msg.attributes.get("channel_id").map(|s| s.to_string());
74 let event_type = msg
75 .attributes
76 .get("event_type")
77 .map(|s| s.as_str())
78 .unwrap_or("message")
79 .to_string();
80 let id = msg.attributes.get("id").map(|s| s.to_string());
81 let data = String::from_utf8_lossy(&msg.data).to_string();
82
83 handler(IncomingMessage {
84 channel_id,
85 event_type,
86 data,
87 id,
88 });
89
90 if let Err(e) = message.ack().await {
91 error!(error = %e, "Failed to ack message");
92 }
93 }
94 },
95 cancel,
96 None,
97 )
98 .await?;
99
100 info!("GCP Pub/Sub stopped");
101 Ok(())
102 }
103
104 fn name(&self) -> &'static str {
105 "GCP Pub/Sub"
106 }
107}