Skip to main content

force_pubsub/
handler.rs

1//! Pub/Sub API handler.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use tonic::transport::{Channel, ClientTlsConfig};
6
7use force::auth::Authenticator;
8use force::session::Session;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use tokio::sync::OnceCell;
13use tokio_stream::Stream;
14
15use crate::config::{PubSubConfig, ReplayPreset};
16use crate::error::{PubSubError, Result};
17use crate::interceptor;
18use crate::publish_sink::{PublishSink, open_publish_stream};
19use crate::publisher::publish_unary;
20use crate::schema_cache::SchemaCache;
21use crate::subscriber::{subscribe_dynamic, subscribe_typed_dynamic};
22use crate::types::{PubSubEvent, PublishResponse};
23
24use crate::proto::eventbus_v1::{SchemaRequest, TopicRequest, pub_sub_client::PubSubClient};
25
/// JSON structure of Salesforce's `/services/oauth2/userinfo` response (relevant fields only).
///
/// Deserialized from the response body in `fetch_tenant_id`; only the single
/// field needed for the tenant ID is declared here.
#[derive(serde::Deserialize)]
struct UserInfo {
    /// Org ID as reported by the userinfo endpoint; `fetch_tenant_id` returns it unchanged.
    organization_id: String,
}
31
32/// Fetch the 18-char org ID from the Salesforce userinfo endpoint.
33///
34/// Used by [`PubSubHandler::get_tenant_id`] as the initialiser for its [`OnceCell`].
35async fn fetch_tenant_id<A: Authenticator>(session: &Arc<Session<A>>) -> Result<String> {
36    let token = session.token_manager().token().await?;
37    let userinfo_url = format!("{}/services/oauth2/userinfo", token.instance_url());
38
39    let resp = reqwest::Client::new()
40        .get(&userinfo_url)
41        .bearer_auth(token.as_str())
42        .send()
43        .await
44        .map_err(|e| PubSubError::Config(format!("userinfo request failed: {e}")))?;
45
46    if !resp.status().is_success() {
47        return Err(PubSubError::Config(format!(
48            "userinfo returned status {}",
49            resp.status()
50        )));
51    }
52
53    let body = force::http::read_capped_body(resp, 1024 * 1024)
54        .await
55        .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
56
57    let info: UserInfo = serde_json::from_str(&body)
58        .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
59
60    Ok(info.organization_id)
61}
62
/// Topic metadata exposed to callers.
///
/// Plain-data copy of the fields returned by the `GetTopic` RPC, so the
/// generated proto type does not appear in the public API.
#[derive(Clone, Debug)]
pub struct TopicInfo {
    /// Name of the topic, e.g. `/event/MyEvent__e`.
    pub topic_name: String,
    /// URI of the topic.
    pub topic_uri: String,
    /// `true` when publishing to this topic is permitted.
    pub can_publish: bool,
    /// `true` when subscribing to this topic is permitted.
    pub can_subscribe: bool,
    /// ID of the Avro schema currently associated with the topic.
    pub schema_id: String,
}
77
/// Schema metadata exposed to callers.
///
/// Plain-data copy of the fields returned by the `GetSchema` RPC.
#[derive(Clone, Debug)]
pub struct SchemaInfo {
    /// ID of the schema.
    pub schema_id: String,
    /// The Avro schema, as a JSON string.
    pub schema_json: String,
}
86
87/// Entry point for all Salesforce Pub/Sub operations.
88///
89/// Obtained by calling [`PubSubHandler::connect`] with a [`Session`] and [`PubSubConfig`].
90/// The handler is cheaply cloneable — clones share the same gRPC channel and schema cache.
91#[derive(Clone)]
92pub struct PubSubHandler<A: Authenticator> {
93    pub(crate) session: Arc<Session<A>>,
94    /// Configuration for this handler, used by subscribe/publish operations in later tasks.
95    pub(crate) config: PubSubConfig,
96    /// Shared schema cache, populated during subscribe/publish operations in later tasks.
97    pub schema_cache: SchemaCache,
98    pub(crate) channel: Channel,
99    /// Lazily fetched org ID (18-char) from `/services/oauth2/userinfo`.
100    ///
101    /// Populated on the first call to [`Self::get_tenant_id`] and reused thereafter.
102    tenant_id: Arc<OnceCell<String>>,
103}
104
105impl<A: Authenticator> PubSubHandler<A> {
106    /// Connect to the Pub/Sub gRPC endpoint and return a handler.
107    ///
108    /// Validates configuration and establishes the gRPC channel. This is async
109    /// because channel creation involves a DNS lookup and TLS handshake.
110    ///
111    /// # Errors
112    ///
113    /// Returns `PubSubError::Config` if `batch_size` is out of range (1–100).
114    /// Returns `PubSubError::Connect` if the gRPC channel cannot be established.
115    pub async fn connect(session: Arc<Session<A>>, config: PubSubConfig) -> Result<Self> {
116        if config.batch_size < 1 || config.batch_size > 100 {
117            return Err(PubSubError::Config(
118                "batch_size must be between 1 and 100".to_string(),
119            ));
120        }
121
122        let endpoint = Channel::from_shared(config.endpoint.clone())
123            .map_err(|e| PubSubError::Config(format!("invalid endpoint: {e}")))?;
124        let endpoint = if endpoint.uri().scheme_str() == Some("https") {
125            endpoint.tls_config(ClientTlsConfig::new().with_webpki_roots())?
126        } else {
127            endpoint
128        };
129        let channel = endpoint.connect().await?;
130
131        Ok(Self {
132            session,
133            config,
134            schema_cache: SchemaCache::new(),
135            channel,
136            tenant_id: Arc::new(OnceCell::new()),
137        })
138    }
139
140    /// Build a gRPC client for each call (channels are cheap to clone).
141    fn grpc_client(&self) -> PubSubClient<Channel> {
142        PubSubClient::new(self.channel.clone())
143    }
144
145    /// Fetch the org's 18-char tenant ID from `/services/oauth2/userinfo`, caching the result.
146    ///
147    /// Salesforce's userinfo response includes `organization_id` which is the 18-char org ID
148    /// required as the `tenantid` gRPC metadata header.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`PubSubError::Config`] if the userinfo endpoint cannot be reached or the
153    /// response does not contain a valid `organization_id` field.
154    pub(crate) async fn get_tenant_id(&self) -> Result<&str> {
155        self.tenant_id
156            .get_or_try_init(|| fetch_tenant_id(&self.session))
157            .await
158            .map(String::as_str)
159    }
160
161    /// Build a tonic request with all three required Pub/Sub auth headers.
162    ///
163    /// Fetches a fresh token and (lazily) the tenant ID, then delegates to
164    /// [`crate::interceptor::build_metadata`].
165    async fn auth_request<T>(&self, message: T) -> Result<tonic::Request<T>> {
166        let token = self.session.token_manager().token().await?;
167        let tenant_id = self.get_tenant_id().await?.to_string();
168        let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
169        let mut req = tonic::Request::new(message);
170        *req.metadata_mut() = meta;
171        Ok(req)
172    }
173
174    /// Fetch metadata about a Pub/Sub topic.
175    ///
176    /// # Errors
177    ///
178    /// Returns `PubSubError::Transport` if the gRPC call fails.
179    pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
180        let req = self
181            .auth_request(TopicRequest {
182                topic_name: topic_name.to_string(),
183            })
184            .await?;
185
186        let resp = self.grpc_client().get_topic(req).await?;
187        let info = resp.into_inner();
188        Ok(TopicInfo {
189            topic_name: info.topic_name,
190            topic_uri: info.topic_uri,
191            can_publish: info.can_publish,
192            can_subscribe: info.can_subscribe,
193            schema_id: info.schema_id,
194        })
195    }
196
197    /// Fetch an Avro schema by its ID.
198    ///
199    /// Results are **not** automatically cached here — call [`SchemaCache::parse_and_insert`]
200    /// with the returned `schema_json` to cache it.
201    ///
202    /// # Errors
203    ///
204    /// Returns `PubSubError::Transport` if the gRPC call fails (including schema not found).
205    pub async fn get_schema(&self, schema_id: &str) -> Result<SchemaInfo> {
206        let req = self
207            .auth_request(SchemaRequest {
208                schema_id: schema_id.to_string(),
209            })
210            .await?;
211
212        let resp = self.grpc_client().get_schema(req).await?;
213        let info = resp.into_inner();
214        Ok(SchemaInfo {
215            schema_id: info.schema_id,
216            schema_json: info.schema_json,
217        })
218    }
219}
220
221impl<A: Authenticator + Send + Sync + 'static> PubSubHandler<A> {
222    /// Publish events to a topic via the unary Publish RPC.
223    ///
224    /// Automatically resolves the Avro schema for `topic` by calling `GetTopic`
225    /// to obtain the schema ID, then fetching the schema via `GetSchema` if it
226    /// is not already cached.
227    ///
228    /// # Errors
229    ///
230    /// Returns `PubSubError::Transport` if the `GetTopic` or `GetSchema` RPC fails.
231    /// Returns `PubSubError::Avro` if encoding fails.
232    pub async fn publish<T: Serialize + Send>(
233        &self,
234        topic: &str,
235        events: Vec<T>,
236    ) -> Result<PublishResponse> {
237        let topic_info = self.get_topic(topic).await?;
238        let schema_id = &topic_info.schema_id;
239
240        // Ensure the schema is in the cache (fetches from GetSchema if not).
241        let token = self.session.token_manager().token().await?;
242        let tenant_id = self.get_tenant_id().await?.to_string();
243        let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
244        self.schema_cache
245            .get_or_fetch(schema_id, &self.channel, meta)
246            .await?;
247
248        publish_unary(
249            &self.session,
250            &self.channel,
251            &self.schema_cache,
252            schema_id,
253            topic,
254            events,
255            &tenant_id,
256        )
257        .await
258    }
259
260    /// Subscribe to a topic, yielding decoded events as [`serde_json::Value`].
261    ///
262    /// The returned stream emits [`PubSubEvent<Value>`] items. Use [`ReplayPreset`]
263    /// to control where playback starts.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`PubSubError::Config`] if the tenant ID cannot be fetched from userinfo.
268    pub async fn subscribe(
269        &self,
270        topic: &str,
271        replay: ReplayPreset,
272    ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<Value>>> + Send>>> {
273        let tenant_id = self.get_tenant_id().await?.to_string();
274        Ok(subscribe_dynamic(
275            Arc::clone(&self.session),
276            self.config.clone(),
277            self.schema_cache.clone(),
278            self.channel.clone(),
279            topic.to_string(),
280            replay,
281            tenant_id,
282        ))
283    }
284
285    /// Subscribe to a topic, yielding typed events deserialized as `T`.
286    ///
287    /// # Errors
288    ///
289    /// Returns [`PubSubError::Config`] if the tenant ID cannot be fetched from userinfo.
290    pub async fn subscribe_typed<T>(
291        &self,
292        topic: &str,
293        replay: ReplayPreset,
294    ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<T>>> + Send>>>
295    where
296        T: DeserializeOwned + Send + 'static,
297    {
298        let tenant_id = self.get_tenant_id().await?.to_string();
299        Ok(subscribe_typed_dynamic(
300            Arc::clone(&self.session),
301            self.config.clone(),
302            self.schema_cache.clone(),
303            self.channel.clone(),
304            topic.to_string(),
305            replay,
306            tenant_id,
307        ))
308    }
309
310    /// Open a bidirectional streaming `PublishStream` RPC and return a [`PublishSink`].
311    ///
312    /// The returned sink allows callers to send multiple batches of events to
313    /// `topic` without the per-call overhead of the unary [`Self::publish`] RPC.
314    /// The server streams back [`PublishResponse`] acknowledgements, which are
315    /// accessible via [`PublishSink::responses`].
316    ///
317    /// # Errors
318    ///
319    /// - [`PubSubError::Config`] if the tenant ID cannot be fetched.
320    /// - [`PubSubError::Transport`] if the gRPC stream cannot be opened.
321    pub async fn publish_stream<T: Serialize + Send + 'static>(
322        &self,
323        topic: &str,
324    ) -> Result<PublishSink<T>> {
325        let token = self.session.token_manager().token().await?;
326        let tenant_id = self.get_tenant_id().await?.to_string();
327
328        open_publish_stream(
329            Arc::clone(&self.session),
330            self.channel.clone(),
331            self.schema_cache.clone(),
332            tenant_id,
333            topic.to_string(),
334            &token,
335        )
336        .await
337    }
338}