1use std::pin::Pin;
4use std::sync::Arc;
5use tonic::transport::{Channel, ClientTlsConfig};
6
7use force::auth::Authenticator;
8use force::session::Session;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use tokio::sync::OnceCell;
13use tokio_stream::Stream;
14
15use crate::config::{PubSubConfig, ReplayPreset};
16use crate::error::{PubSubError, Result};
17use crate::interceptor;
18use crate::publish_sink::{PublishSink, open_publish_stream};
19use crate::publisher::publish_unary;
20use crate::schema_cache::SchemaCache;
21use crate::subscriber::{subscribe_dynamic, subscribe_typed_dynamic};
22use crate::types::{PubSubEvent, PublishResponse};
23
24use crate::proto::eventbus_v1::{SchemaRequest, TopicRequest, pub_sub_client::PubSubClient};
25
/// Minimal view of the JSON document returned by the OAuth `userinfo`
/// endpoint; only the field this module needs is declared.
#[derive(serde::Deserialize)]
struct UserInfo {
    /// Value of the `organization_id` key in the response body.
    /// `fetch_tenant_id` returns this string as the tenant ID.
    organization_id: String,
}
31
32async fn fetch_tenant_id<A: Authenticator>(session: &Arc<Session<A>>) -> Result<String> {
36 let token = session.token_manager().token().await?;
37 let userinfo_url = format!("{}/services/oauth2/userinfo", token.instance_url());
38
39 let resp = reqwest::Client::new()
40 .get(&userinfo_url)
41 .bearer_auth(token.as_str())
42 .send()
43 .await
44 .map_err(|e| PubSubError::Config(format!("userinfo request failed: {e}")))?;
45
46 if !resp.status().is_success() {
47 return Err(PubSubError::Config(format!(
48 "userinfo returned status {}",
49 resp.status()
50 )));
51 }
52
53 let body = force::http::read_capped_body(resp, 1024 * 1024)
54 .await
55 .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
56
57 let info: UserInfo = serde_json::from_str(&body)
58 .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
59
60 Ok(info.organization_id)
61}
62
/// Topic metadata returned by `PubSubHandler::get_topic`.
///
/// Every field is copied verbatim from the same-named field of the gRPC
/// `GetTopic` response.
#[derive(Debug, Clone)]
pub struct TopicInfo {
    /// Topic name as reported by the service.
    pub topic_name: String,
    /// Topic URI as reported by the service.
    pub topic_uri: String,
    /// `can_publish` flag from the response — presumably whether the caller
    /// may publish to this topic; confirm against the service contract.
    pub can_publish: bool,
    /// `can_subscribe` flag from the response — presumably whether the caller
    /// may subscribe to this topic; confirm against the service contract.
    pub can_subscribe: bool,
    /// Schema ID associated with the topic; `PubSubHandler::publish` uses it
    /// to look up the schema before publishing.
    pub schema_id: String,
}
77
/// Schema metadata returned by `PubSubHandler::get_schema`.
///
/// Both fields are copied verbatim from the same-named fields of the gRPC
/// `GetSchema` response.
#[derive(Debug, Clone)]
pub struct SchemaInfo {
    /// Schema ID as reported by the service.
    pub schema_id: String,
    /// Schema definition as a JSON string, exactly as returned by the service.
    pub schema_json: String,
}
86
/// Entry point for Pub/Sub operations over a single gRPC channel.
///
/// Created with `PubSubHandler::connect`. Clones share the same session and
/// the same lazily-initialised tenant ID cell, because both are held behind
/// `Arc`.
// NOTE(review): `#[derive(Clone)]` adds an implicit `A: Clone` bound even
// though `A` is only stored behind `Arc` — confirm that is intended.
#[derive(Clone)]
pub struct PubSubHandler<A: Authenticator> {
    /// Shared session; its token manager supplies the token for every call.
    pub(crate) session: Arc<Session<A>>,
    /// Configuration given to `connect` (its `batch_size` and `endpoint` are
    /// validated/used there) and cloned into subscription streams.
    pub(crate) config: PubSubConfig,
    /// Schema cache, created empty in `connect` and passed to the publish and
    /// subscribe helpers.
    pub schema_cache: SchemaCache,
    /// gRPC channel established in `connect`; cloned for each client/stream.
    pub(crate) channel: Channel,
    /// Tenant ID, fetched on first use by `get_tenant_id` and reused afterwards.
    tenant_id: Arc<OnceCell<String>>,
}
104
105impl<A: Authenticator> PubSubHandler<A> {
106 pub async fn connect(session: Arc<Session<A>>, config: PubSubConfig) -> Result<Self> {
116 if config.batch_size < 1 || config.batch_size > 100 {
117 return Err(PubSubError::Config(
118 "batch_size must be between 1 and 100".to_string(),
119 ));
120 }
121
122 let endpoint = Channel::from_shared(config.endpoint.clone())
123 .map_err(|e| PubSubError::Config(format!("invalid endpoint: {e}")))?;
124 let endpoint = if endpoint.uri().scheme_str() == Some("https") {
125 endpoint.tls_config(ClientTlsConfig::new().with_webpki_roots())?
126 } else {
127 endpoint
128 };
129 let channel = endpoint.connect().await?;
130
131 Ok(Self {
132 session,
133 config,
134 schema_cache: SchemaCache::new(),
135 channel,
136 tenant_id: Arc::new(OnceCell::new()),
137 })
138 }
139
140 fn grpc_client(&self) -> PubSubClient<Channel> {
142 PubSubClient::new(self.channel.clone())
143 }
144
145 pub(crate) async fn get_tenant_id(&self) -> Result<&str> {
155 self.tenant_id
156 .get_or_try_init(|| fetch_tenant_id(&self.session))
157 .await
158 .map(String::as_str)
159 }
160
161 async fn auth_request<T>(&self, message: T) -> Result<tonic::Request<T>> {
166 let token = self.session.token_manager().token().await?;
167 let tenant_id = self.get_tenant_id().await?.to_string();
168 let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
169 let mut req = tonic::Request::new(message);
170 *req.metadata_mut() = meta;
171 Ok(req)
172 }
173
174 pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
180 let req = self
181 .auth_request(TopicRequest {
182 topic_name: topic_name.to_string(),
183 })
184 .await?;
185
186 let resp = self.grpc_client().get_topic(req).await?;
187 let info = resp.into_inner();
188 Ok(TopicInfo {
189 topic_name: info.topic_name,
190 topic_uri: info.topic_uri,
191 can_publish: info.can_publish,
192 can_subscribe: info.can_subscribe,
193 schema_id: info.schema_id,
194 })
195 }
196
197 pub async fn get_schema(&self, schema_id: &str) -> Result<SchemaInfo> {
206 let req = self
207 .auth_request(SchemaRequest {
208 schema_id: schema_id.to_string(),
209 })
210 .await?;
211
212 let resp = self.grpc_client().get_schema(req).await?;
213 let info = resp.into_inner();
214 Ok(SchemaInfo {
215 schema_id: info.schema_id,
216 schema_json: info.schema_json,
217 })
218 }
219}
220
221impl<A: Authenticator + Send + Sync + 'static> PubSubHandler<A> {
222 pub async fn publish<T: Serialize + Send>(
233 &self,
234 topic: &str,
235 events: Vec<T>,
236 ) -> Result<PublishResponse> {
237 let topic_info = self.get_topic(topic).await?;
238 let schema_id = &topic_info.schema_id;
239
240 let token = self.session.token_manager().token().await?;
242 let tenant_id = self.get_tenant_id().await?.to_string();
243 let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
244 self.schema_cache
245 .get_or_fetch(schema_id, &self.channel, meta)
246 .await?;
247
248 publish_unary(
249 &self.session,
250 &self.channel,
251 &self.schema_cache,
252 schema_id,
253 topic,
254 events,
255 &tenant_id,
256 )
257 .await
258 }
259
260 pub async fn subscribe(
269 &self,
270 topic: &str,
271 replay: ReplayPreset,
272 ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<Value>>> + Send>>> {
273 let tenant_id = self.get_tenant_id().await?.to_string();
274 Ok(subscribe_dynamic(
275 Arc::clone(&self.session),
276 self.config.clone(),
277 self.schema_cache.clone(),
278 self.channel.clone(),
279 topic.to_string(),
280 replay,
281 tenant_id,
282 ))
283 }
284
285 pub async fn subscribe_typed<T>(
291 &self,
292 topic: &str,
293 replay: ReplayPreset,
294 ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<T>>> + Send>>>
295 where
296 T: DeserializeOwned + Send + 'static,
297 {
298 let tenant_id = self.get_tenant_id().await?.to_string();
299 Ok(subscribe_typed_dynamic(
300 Arc::clone(&self.session),
301 self.config.clone(),
302 self.schema_cache.clone(),
303 self.channel.clone(),
304 topic.to_string(),
305 replay,
306 tenant_id,
307 ))
308 }
309
310 pub async fn publish_stream<T: Serialize + Send + 'static>(
322 &self,
323 topic: &str,
324 ) -> Result<PublishSink<T>> {
325 let token = self.session.token_manager().token().await?;
326 let tenant_id = self.get_tenant_id().await?.to_string();
327
328 open_publish_stream(
329 Arc::clone(&self.session),
330 self.channel.clone(),
331 self.schema_cache.clone(),
332 tenant_id,
333 topic.to_string(),
334 &token,
335 )
336 .await
337 }
338}