1use std::pin::Pin;
4use std::sync::Arc;
5use tonic::transport::Channel;
6
7use force::auth::Authenticator;
8use force::session::Session;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value;
12use tokio::sync::OnceCell;
13use tokio_stream::Stream;
14
15use crate::config::{PubSubConfig, ReplayPreset};
16use crate::error::{PubSubError, Result};
17use crate::interceptor;
18use crate::publish_sink::{PublishSink, open_publish_stream};
19use crate::publisher::publish_unary;
20use crate::schema_cache::SchemaCache;
21use crate::subscriber::{subscribe_dynamic, subscribe_typed_dynamic};
22use crate::types::{PubSubEvent, PublishResponse};
23
24use crate::proto::eventbus_v1::{SchemaRequest, TopicRequest, pub_sub_client::PubSubClient};
25
26#[derive(serde::Deserialize)]
28struct UserInfo {
29 organization_id: String,
30}
31
32async fn fetch_tenant_id<A: Authenticator>(session: &Arc<Session<A>>) -> Result<String> {
36 let token = session.token_manager().token().await?;
37 let userinfo_url = format!("{}/services/oauth2/userinfo", token.instance_url());
38
39 let resp = reqwest::Client::new()
40 .get(&userinfo_url)
41 .bearer_auth(token.as_str())
42 .send()
43 .await
44 .map_err(|e| PubSubError::Config(format!("userinfo request failed: {e}")))?;
45
46 if !resp.status().is_success() {
47 return Err(PubSubError::Config(format!(
48 "userinfo returned status {}",
49 resp.status()
50 )));
51 }
52
53 let info: UserInfo = resp
54 .json()
55 .await
56 .map_err(|e| PubSubError::Config(format!("userinfo parse failed: {e}")))?;
57
58 Ok(info.organization_id)
59}
60
61#[derive(Debug, Clone)]
63pub struct TopicInfo {
64 pub topic_name: String,
66 pub topic_uri: String,
68 pub can_publish: bool,
70 pub can_subscribe: bool,
72 pub schema_id: String,
74}
75
76#[derive(Debug, Clone)]
78pub struct SchemaInfo {
79 pub schema_id: String,
81 pub schema_json: String,
83}
84
85#[derive(Clone)]
90pub struct PubSubHandler<A: Authenticator> {
91 pub(crate) session: Arc<Session<A>>,
92 pub(crate) config: PubSubConfig,
94 pub schema_cache: SchemaCache,
96 pub(crate) channel: Channel,
97 tenant_id: Arc<OnceCell<String>>,
101}
102
103impl<A: Authenticator> PubSubHandler<A> {
104 pub async fn connect(session: Arc<Session<A>>, config: PubSubConfig) -> Result<Self> {
114 if config.batch_size < 1 || config.batch_size > 100 {
115 return Err(PubSubError::Config(
116 "batch_size must be between 1 and 100".to_string(),
117 ));
118 }
119
120 let channel = Channel::from_shared(config.endpoint.clone())
121 .map_err(|e| PubSubError::Config(format!("invalid endpoint: {e}")))?
122 .connect()
123 .await?;
124
125 Ok(Self {
126 session,
127 config,
128 schema_cache: SchemaCache::new(),
129 channel,
130 tenant_id: Arc::new(OnceCell::new()),
131 })
132 }
133
134 fn grpc_client(&self) -> PubSubClient<Channel> {
136 PubSubClient::new(self.channel.clone())
137 }
138
139 pub(crate) async fn get_tenant_id(&self) -> Result<&str> {
149 self.tenant_id
150 .get_or_try_init(|| fetch_tenant_id(&self.session))
151 .await
152 .map(String::as_str)
153 }
154
155 async fn auth_request<T>(&self, message: T) -> Result<tonic::Request<T>> {
160 let token = self.session.token_manager().token().await?;
161 let tenant_id = self.get_tenant_id().await?.to_string();
162 let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
163 let mut req = tonic::Request::new(message);
164 *req.metadata_mut() = meta;
165 Ok(req)
166 }
167
168 pub async fn get_topic(&self, topic_name: &str) -> Result<TopicInfo> {
174 let req = self
175 .auth_request(TopicRequest {
176 topic_name: topic_name.to_string(),
177 })
178 .await?;
179
180 let resp = self.grpc_client().get_topic(req).await?;
181 let info = resp.into_inner();
182 Ok(TopicInfo {
183 topic_name: info.topic_name,
184 topic_uri: info.topic_uri,
185 can_publish: info.can_publish,
186 can_subscribe: info.can_subscribe,
187 schema_id: info.schema_id,
188 })
189 }
190
191 pub async fn get_schema(&self, schema_id: &str) -> Result<SchemaInfo> {
200 let req = self
201 .auth_request(SchemaRequest {
202 schema_id: schema_id.to_string(),
203 })
204 .await?;
205
206 let resp = self.grpc_client().get_schema(req).await?;
207 let info = resp.into_inner();
208 Ok(SchemaInfo {
209 schema_id: info.schema_id,
210 schema_json: info.schema_json,
211 })
212 }
213}
214
215impl<A: Authenticator + Send + Sync + 'static> PubSubHandler<A> {
216 pub async fn publish<T: Serialize + Send>(
227 &self,
228 topic: &str,
229 events: Vec<T>,
230 ) -> Result<PublishResponse> {
231 let topic_info = self.get_topic(topic).await?;
232 let schema_id = &topic_info.schema_id;
233
234 let token = self.session.token_manager().token().await?;
236 let tenant_id = self.get_tenant_id().await?.to_string();
237 let meta = interceptor::build_metadata(&token, token.instance_url(), &tenant_id)?;
238 self.schema_cache
239 .get_or_fetch(schema_id, &self.channel, meta)
240 .await?;
241
242 publish_unary(
243 &self.session,
244 &self.channel,
245 &self.schema_cache,
246 schema_id,
247 topic,
248 events,
249 &tenant_id,
250 )
251 .await
252 }
253
254 pub async fn subscribe(
263 &self,
264 topic: &str,
265 replay: ReplayPreset,
266 ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<Value>>> + Send>>> {
267 let tenant_id = self.get_tenant_id().await?.to_string();
268 Ok(subscribe_dynamic(
269 Arc::clone(&self.session),
270 self.config.clone(),
271 self.schema_cache.clone(),
272 self.channel.clone(),
273 topic.to_string(),
274 replay,
275 tenant_id,
276 ))
277 }
278
279 pub async fn subscribe_typed<T>(
285 &self,
286 topic: &str,
287 replay: ReplayPreset,
288 ) -> Result<Pin<Box<dyn Stream<Item = Result<PubSubEvent<T>>> + Send>>>
289 where
290 T: DeserializeOwned + Send + 'static,
291 {
292 let tenant_id = self.get_tenant_id().await?.to_string();
293 Ok(subscribe_typed_dynamic(
294 Arc::clone(&self.session),
295 self.config.clone(),
296 self.schema_cache.clone(),
297 self.channel.clone(),
298 topic.to_string(),
299 replay,
300 tenant_id,
301 ))
302 }
303
304 pub async fn publish_stream<T: Serialize + Send + 'static>(
316 &self,
317 topic: &str,
318 ) -> Result<PublishSink<T>> {
319 let token = self.session.token_manager().token().await?;
320 let tenant_id = self.get_tenant_id().await?.to_string();
321
322 open_publish_stream(
323 Arc::clone(&self.session),
324 self.channel.clone(),
325 self.schema_cache.clone(),
326 tenant_id,
327 topic.to_string(),
328 &token,
329 )
330 .await
331 }
332}