Skip to main content

force_pubsub/
handler.rs

1//! Pub/Sub API handler.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use tonic::transport::Channel;
6
7use force::auth::Authenticator;
8use force::session::Session;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use tokio::sync::OnceCell;
13use tokio_stream::Stream;
14
15use crate::config::{PubSubConfig, ReplayPreset};
16use crate::error::{PubSubError, Result};
17use crate::interceptor;
18use crate::publish_sink::{PublishSink, open_publish_stream};
19use crate::publisher::publish_unary;
20use crate::schema_cache::SchemaCache;
21use crate::subscriber::{subscribe_dynamic, subscribe_typed_dynamic};
22use crate::types::{PubSubEvent, PublishResponse};
23
24use crate::proto::eventbus_v1::{SchemaRequest, TopicRequest, pub_sub_client::PubSubClient};
25
26/// JSON structure of Salesforce's `/services/oauth2/userinfo` response (relevant fields only).
27#[derive(serde::Deserialize)]
28struct UserInfo {
29    organization_id: String,
30}
31
32/// Fetch the 18-char org ID from the Salesforce userinfo endpoint.
33///
34/// Used by [`PubSubHandler::get_tenant_id`] as the initialiser for its [`OnceCell`].
35async fn fetch_tenant_id<A: Authenticator>(session: &Arc<Session<A>>) -> Result<String> {
36    let token = session.token_manager().token().await?;
37    let userinfo_url = format!("{}/services/oauth2/userinfo", token.instance_url());
38
39    let resp = reqwest::Client::new()
40        .get(&userinfo_url)
41        .bearer_auth(token.as_str())
42        .send()
43        .await
44        .map_err(|e| PubSubError::Config(format!("userinfo request failed: {e}")))?;
45
46    if !resp.status().is_success() {
47        return Err(PubSubError::Config(format!(
48            "userinfo returned status {}",
49            resp.status()
50        )));
51    }
52
53    let info: UserInfo = resp
54        .json()
55        .await
56        .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
57
58    Ok(info.organization_id)
59}
60
/// Description of a Pub/Sub topic, exposed without leaking the generated proto type.
///
/// Built by `PubSubHandler::get_topic` from the fields of the `GetTopic` response.
#[derive(Clone, Debug)]
pub struct TopicInfo {
    /// Name of the topic, for example `/event/MyEvent__e`.
    pub topic_name: String,
    /// URI of the topic.
    pub topic_uri: String,
    /// `true` when events may be published to the topic.
    pub can_publish: bool,
    /// `true` when the topic may be subscribed to.
    pub can_subscribe: bool,
    /// ID of the Avro schema currently associated with the topic.
    pub schema_id: String,
}
75
/// Description of an Avro schema, exposed without leaking the generated proto type.
///
/// Built by `PubSubHandler::get_schema` from the fields of the `GetSchema` response.
#[derive(Clone, Debug)]
pub struct SchemaInfo {
    /// ID of the schema.
    pub schema_id: String,
    /// The Avro schema itself, as a JSON string.
    pub schema_json: String,
}
84
85/// Entry point for all Salesforce Pub/Sub operations.
86///
87/// Obtained by calling [`PubSubHandler::connect`] with a [`Session`] and [`PubSubConfig`].
88/// The handler is cheaply cloneable — clones share the same gRPC channel and schema cache.
89#[derive(Clone)]
90pub struct PubSubHandler<A: Authenticator> {
91    pub(crate) session: Arc<Session<A>>,
92    /// Configuration for this handler, used by subscribe/publish operations in later tasks.
93    pub(crate) config: PubSubConfig,
94    /// Shared schema cache, populated during subscribe/publish operations in later tasks.
95    pub schema_cache: SchemaCache,
96    pub(crate) channel: Channel,
97    /// Lazily fetched org ID (18-char) from `/services/oauth2/userinfo`.
98    ///
99    /// Populated on the first call to [`Self::get_tenant_id`] and reused thereafter.
100    tenant_id: Arc<OnceCell<String>>,
101}
102
103impl<A: Authenticator> PubSubHandler<A> {
104    /// Connect to the Pub/Sub gRPC endpoint and return a handler.
105    ///
106    /// Validates configuration and establishes the gRPC channel. This is async
107    /// because channel creation involves a DNS lookup and TLS handshake.
108    ///
109    /// # Errors
110    ///
111    /// Returns `PubSubError::Config` if `batch_size` is out of range (1–100).
112    /// Returns `PubSubError::Connect` if the gRPC channel cannot be established.
113    pub async fn connect(session: Arc<Session<A>>, config: PubSubConfig) -> Result<Self> {
114        if config.batch_size < 1 || config.batch_size > 100 {
115            return Err(PubSubError::Config(
116                "batch_size must be between 1 and 100".to_string(),
117            ));
118        }
119
120        let channel = Channel::from_shared(config.endpoint.clone())
121            .map_err(|e| PubSubError::Config(format!("invalid endpoint: {e}")))?
122            .connect()
123            .await?;
124
125        Ok(Self {
126            session,
127            config,
128            schema_cache: SchemaCache::new(),
129            channel,
130            tenant_id: Arc::new(OnceCell::new()),
131        })
132    }
133
134    /// Build a gRPC client for each call (channels are cheap to clone).
135    fn grpc_client(&self) -> PubSubClient<Channel> {
136        PubSubClient::new(self.channel.clone())
137    }
138
139    /// Fetch the org's 18-char tenant ID from `/services/oauth2/userinfo`, caching the result.
140    ///
141    /// Salesforce's userinfo response includes `organization_id` which is the 18-char org ID
142    /// required as the `tenantid` gRPC metadata header.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`PubSubError::Config`] if the userinfo endpoint cannot be reached or the
147    /// response does not contain a valid `organization_id` field.
148    pub(crate) async fn get_tenant_id(&self) -> Result<&str> {
149        self.tenant_id
150            .get_or_try_init(|| fetch_tenant_id(&self.session))
151            .await
152            .map(String::as_str)
153    }
154
155    /// Build a tonic request with all three required Pub/Sub auth headers.
156    ///
157    /// Fetches a fresh token and (lazily) the tenant ID, then delegates to
158    /// [`crate::interceptor::build_metadata`].
159    async fn auth_request<T>(&self, message: T) -> Result<tonic::Request<T>> {
160        let token = self.session.token_manager().token().await?;
161        let tenant_id = self.get_tenant_id().await?.to_string();
162        let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
163        let mut req = tonic::Request::new(message);
164        *req.metadata_mut() = meta;
165        Ok(req)
166    }
167
168    /// Fetch metadata about a Pub/Sub topic.
169    ///
170    /// # Errors
171    ///
172    /// Returns `PubSubError::Transport` if the gRPC call fails.
173    pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
174        let req = self
175            .auth_request(TopicRequest {
176                topic_name: topic_name.to_string(),
177            })
178            .await?;
179
180        let resp = self.grpc_client().get_topic(req).await?;
181        let info = resp.into_inner();
182        Ok(TopicInfo {
183            topic_name: info.topic_name,
184            topic_uri: info.topic_uri,
185            can_publish: info.can_publish,
186            can_subscribe: info.can_subscribe,
187            schema_id: info.schema_id,
188        })
189    }
190
191    /// Fetch an Avro schema by its ID.
192    ///
193    /// Results are **not** automatically cached here — call [`SchemaCache::parse_and_insert`]
194    /// with the returned `schema_json` to cache it.
195    ///
196    /// # Errors
197    ///
198    /// Returns `PubSubError::Transport` if the gRPC call fails (including schema not found).
199    pub async fn get_schema(&self, schema_id: &str) -> Result<SchemaInfo> {
200        let req = self
201            .auth_request(SchemaRequest {
202                schema_id: schema_id.to_string(),
203            })
204            .await?;
205
206        let resp = self.grpc_client().get_schema(req).await?;
207        let info = resp.into_inner();
208        Ok(SchemaInfo {
209            schema_id: info.schema_id,
210            schema_json: info.schema_json,
211        })
212    }
213}
214
215impl<A: Authenticator + Send + Sync + 'static> PubSubHandler<A> {
216    /// Publish events to a topic via the unary Publish RPC.
217    ///
218    /// Automatically resolves the Avro schema for `topic` by calling `GetTopic`
219    /// to obtain the schema ID, then fetching the schema via `GetSchema` if it
220    /// is not already cached.
221    ///
222    /// # Errors
223    ///
224    /// Returns `PubSubError::Transport` if the `GetTopic` or `GetSchema` RPC fails.
225    /// Returns `PubSubError::Avro` if encoding fails.
226    pub async fn publish<T: Serialize + Send>(
227        &self,
228        topic: &str,
229        events: Vec<T>,
230    ) -> Result<PublishResponse> {
231        let topic_info = self.get_topic(topic).await?;
232        let schema_id = &topic_info.schema_id;
233
234        // Ensure the schema is in the cache (fetches from GetSchema if not).
235        let token = self.session.token_manager().token().await?;
236        let tenant_id = self.get_tenant_id().await?.to_string();
237        let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
238        self.schema_cache
239            .get_or_fetch(schema_id, &self.channel, meta)
240            .await?;
241
242        publish_unary(
243            &self.session,
244            &self.channel,
245            &self.schema_cache,
246            schema_id,
247            topic,
248            events,
249            &tenant_id,
250        )
251        .await
252    }
253
254    /// Subscribe to a topic, yielding decoded events as [`serde_json::Value`].
255    ///
256    /// The returned stream emits [`PubSubEvent<Value>`] items. Use [`ReplayPreset`]
257    /// to control where playback starts.
258    ///
259    /// # Errors
260    ///
261    /// Returns [`PubSubError::Config`] if the tenant ID cannot be fetched from userinfo.
262    pub async fn subscribe(
263        &self,
264        topic: &str,
265        replay: ReplayPreset,
266    ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<Value>>> + Send>>> {
267        let tenant_id = self.get_tenant_id().await?.to_string();
268        Ok(subscribe_dynamic(
269            Arc::clone(&self.session),
270            self.config.clone(),
271            self.schema_cache.clone(),
272            self.channel.clone(),
273            topic.to_string(),
274            replay,
275            tenant_id,
276        ))
277    }
278
279    /// Subscribe to a topic, yielding typed events deserialized as `T`.
280    ///
281    /// # Errors
282    ///
283    /// Returns [`PubSubError::Config`] if the tenant ID cannot be fetched from userinfo.
284    pub async fn subscribe_typed<T>(
285        &self,
286        topic: &str,
287        replay: ReplayPreset,
288    ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<T>>> + Send>>>
289    where
290        T: DeserializeOwned + Send + 'static,
291    {
292        let tenant_id = self.get_tenant_id().await?.to_string();
293        Ok(subscribe_typed_dynamic(
294            Arc::clone(&self.session),
295            self.config.clone(),
296            self.schema_cache.clone(),
297            self.channel.clone(),
298            topic.to_string(),
299            replay,
300            tenant_id,
301        ))
302    }
303
304    /// Open a bidirectional streaming `PublishStream` RPC and return a [`PublishSink`].
305    ///
306    /// The returned sink allows callers to send multiple batches of events to
307    /// `topic` without the per-call overhead of the unary [`Self::publish`] RPC.
308    /// The server streams back [`PublishResponse`] acknowledgements, which are
309    /// accessible via [`PublishSink::responses`].
310    ///
311    /// # Errors
312    ///
313    /// - [`PubSubError::Config`] if the tenant ID cannot be fetched.
314    /// - [`PubSubError::Transport`] if the gRPC stream cannot be opened.
315    pub async fn publish_stream<T: Serialize + Send + 'static>(
316        &self,
317        topic: &str,
318    ) -> Result<PublishSink<T>> {
319        let token = self.session.token_manager().token().await?;
320        let tenant_id = self.get_tenant_id().await?.to_string();
321
322        open_publish_stream(
323            Arc::clone(&self.session),
324            self.channel.clone(),
325            self.schema_cache.clone(),
326            tenant_id,
327            topic.to_string(),
328            &token,
329        )
330        .await
331    }
332}