1#[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::env;
5use std::fmt;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, LazyLock, Mutex};
8
9use async_lock::OnceCell;
10use serde_json::Value;
11
12use crate::app;
13use crate::app::FirebaseApp;
14use crate::app_check::FirebaseAppCheckInternal;
15use crate::auth::Auth;
16use crate::component::types::{
17 ComponentError, DynService, InstanceFactoryOptions, InstantiationMode,
18};
19use crate::component::{Component, ComponentType, Provider};
20use crate::data_connect::config::{
21 parse_transport_options, ConnectorConfig, DataConnectOptions, TransportOptions,
22};
23use crate::data_connect::constants::DATA_CONNECT_COMPONENT_NAME;
24use crate::data_connect::error::{
25 internal_error, DataConnectError, DataConnectErrorCode, DataConnectResult,
26};
27use crate::data_connect::mutation::MutationManager;
28use crate::data_connect::query::{
29 cache_from_serialized, QueryManager, QuerySubscriptionHandle, QuerySubscriptionHandlers,
30};
31use crate::data_connect::reference::{
32 MutationRef, OperationRef, OperationType, QueryRef, QueryResult, SerializedQuerySnapshot,
33};
34use crate::data_connect::transport::{
35 AppCheckHeaders, CallerSdkType, DataConnectTransport, RequestTokenProvider, RestTransport,
36};
37
38const EMULATOR_ENV: &str = "FIREBASE_DATA_CONNECT_EMULATOR_HOST";
39
40static DATA_CONNECT_CACHE: LazyLock<Mutex<HashMap<(String, String), Arc<DataConnectService>>>> =
41 LazyLock::new(|| Mutex::new(HashMap::new()));
42
43#[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
44thread_local! {
45 static QUERY_MANAGER_CACHE: RefCell<HashMap<usize, QueryManager>> = RefCell::new(HashMap::new());
46}
47
48#[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
49static QUERY_MANAGER_CACHE: LazyLock<Mutex<HashMap<usize, QueryManager>>> =
50 LazyLock::new(|| Mutex::new(HashMap::new()));
51
52#[derive(Clone)]
54pub struct DataConnectService {
55 inner: Arc<DataConnectInner>,
56}
57
58pub struct DataConnectQueryRuntime {
61 service: Arc<DataConnectService>,
62 manager: QueryManager,
63 released: bool,
64}
65
66impl fmt::Debug for DataConnectService {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct("DataConnectService")
69 .field("app", &self.app().name())
70 .field("connector", &self.inner.options.connector.identifier())
71 .finish()
72 }
73}
74
75struct DataConnectInner {
76 app: FirebaseApp,
77 options: DataConnectOptions,
78 auth_provider: Provider,
79 app_check_provider: Provider,
80 transport: OnceCell<Arc<dyn DataConnectTransport>>,
81 mutation_manager: OnceCell<MutationManager>,
82 transport_override: Mutex<Option<TransportOptions>>,
83 generated_sdk: AtomicBool,
84 caller_sdk_type: Mutex<CallerSdkType>,
85}
86
87impl DataConnectService {
88 fn new(
89 app: FirebaseApp,
90 options: DataConnectOptions,
91 auth_provider: Provider,
92 app_check_provider: Provider,
93 env_override: Option<TransportOptions>,
94 ) -> Self {
95 Self {
96 inner: Arc::new(DataConnectInner {
97 app,
98 options,
99 auth_provider,
100 app_check_provider,
101 transport: OnceCell::new(),
102 mutation_manager: OnceCell::new(),
103 transport_override: Mutex::new(env_override),
104 generated_sdk: AtomicBool::new(false),
105 caller_sdk_type: Mutex::new(CallerSdkType::Base),
106 }),
107 }
108 }
109
110 pub fn app(&self) -> &FirebaseApp {
112 &self.inner.app
113 }
114
115 pub fn options(&self) -> DataConnectOptions {
117 self.inner.options.clone()
118 }
119
120 pub fn set_generated_sdk_mode(&self, enabled: bool) {
122 self.inner.generated_sdk.store(enabled, Ordering::SeqCst);
123 if let Some(transport) = self.inner.transport.get() {
124 transport.set_generated_sdk(enabled);
125 }
126 }
127
128 pub fn set_caller_sdk_type(&self, caller: CallerSdkType) {
130 *self.inner.caller_sdk_type.lock().unwrap() = caller.clone();
131 if let Some(transport) = self.inner.transport.get() {
132 transport.set_caller_sdk_type(caller);
133 }
134 }
135
136 pub async fn query_runtime(&self) -> DataConnectResult<DataConnectQueryRuntime> {
140 let service = Arc::new(self.clone());
141 DataConnectQueryRuntime::new(service).await
142 }
143
144 pub fn connect_emulator(
146 &self,
147 host: &str,
148 port: Option<u16>,
149 ssl_enabled: bool,
150 ) -> DataConnectResult<()> {
151 let override_options = TransportOptions::new(host, port, ssl_enabled);
152 {
153 let mut guard = self.inner.transport_override.lock().unwrap();
154 if let Some(existing) = guard.as_ref() {
155 if existing != &override_options {
156 return Err(DataConnectError::new(
157 DataConnectErrorCode::AlreadyInitialized,
158 "Data Connect instance already initialized",
159 ));
160 }
161 return Ok(());
162 }
163 *guard = Some(override_options.clone());
164 }
165
166 if let Some(transport) = self.inner.transport.get() {
167 transport.use_emulator(override_options);
168 }
169 Ok(())
170 }
171
172 async fn transport(&self) -> DataConnectResult<Arc<dyn DataConnectTransport>> {
173 self.inner
174 .transport
175 .get_or_try_init(|| async {
176 let token_provider = Arc::new(TokenBroker::new(
177 self.inner.auth_provider.clone(),
178 self.inner.app_check_provider.clone(),
179 ));
180 let firebase_options = self.inner.app.options();
181 let transport = RestTransport::new(
182 self.inner.options.clone(),
183 firebase_options.api_key,
184 firebase_options.app_id,
185 token_provider,
186 )?;
187 if let Some(override_options) =
188 self.inner.transport_override.lock().unwrap().clone()
189 {
190 transport.use_emulator(override_options);
191 }
192 transport.set_generated_sdk(self.inner.generated_sdk.load(Ordering::SeqCst));
193 transport.set_caller_sdk_type(self.inner.caller_sdk_type.lock().unwrap().clone());
194 Ok(Arc::new(transport) as Arc<dyn DataConnectTransport>)
195 })
196 .await
197 .cloned()
198 }
199
200 async fn mutation_manager(&self) -> DataConnectResult<MutationManager> {
201 let transport = self.transport().await?;
202 self.inner
203 .mutation_manager
204 .get_or_try_init(|| async { Ok(MutationManager::new(transport.clone())) })
205 .await
206 .cloned()
207 }
208}
209
210impl DataConnectQueryRuntime {
211 async fn new(service: Arc<DataConnectService>) -> DataConnectResult<Self> {
212 let manager = query_manager_for_service(&service).await?;
213 Ok(Self {
214 service,
215 manager,
216 released: false,
217 })
218 }
219
220 pub fn service(&self) -> &Arc<DataConnectService> {
222 &self.service
223 }
224
225 pub async fn execute_query(&self, query_ref: &QueryRef) -> DataConnectResult<QueryResult> {
227 self.manager.execute_query(query_ref.clone()).await
228 }
229
230 pub async fn subscribe(
232 &self,
233 query_ref: QueryRef,
234 handlers: QuerySubscriptionHandlers,
235 initial_cache: Option<SerializedQuerySnapshot>,
236 ) -> DataConnectResult<QuerySubscriptionHandle> {
237 let cache = initial_cache
238 .as_ref()
239 .and_then(|snapshot| cache_from_serialized(snapshot));
240 self.manager.subscribe(query_ref, handlers, cache)
241 }
242
243 pub fn close(mut self) {
245 self.release();
246 }
247
248 fn release(&mut self) {
249 if !self.released {
250 release_query_manager(&self.service);
251 self.released = true;
252 }
253 }
254}
255
256impl Drop for DataConnectQueryRuntime {
257 fn drop(&mut self) {
258 self.release();
259 }
260}
261
262fn service_cache_key(service: &Arc<DataConnectService>) -> usize {
263 Arc::as_ptr(&service.inner) as usize
264}
265
266async fn query_manager_for_service(
267 service: &Arc<DataConnectService>,
268) -> DataConnectResult<QueryManager> {
269 let key = service_cache_key(service);
270
271 #[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
272 {
273 if let Some(manager) = QUERY_MANAGER_CACHE.with(|cache| cache.borrow().get(&key).cloned()) {
274 return Ok(manager);
275 }
276
277 let transport = service.transport().await?;
278 let manager = QueryManager::new(transport);
279 QUERY_MANAGER_CACHE.with(|cache| {
280 cache.borrow_mut().insert(key, manager.clone());
281 });
282 Ok(manager)
283 }
284
285 #[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
286 {
287 if let Some(manager) = QUERY_MANAGER_CACHE.lock().unwrap().get(&key).cloned() {
288 return Ok(manager);
289 }
290
291 let transport = service.transport().await?;
292 let manager = QueryManager::new(transport);
293 QUERY_MANAGER_CACHE
294 .lock()
295 .unwrap()
296 .insert(key, manager.clone());
297 Ok(manager)
298 }
299}
300
301fn release_query_manager(service: &Arc<DataConnectService>) {
302 let key = service_cache_key(service);
303
304 #[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
305 QUERY_MANAGER_CACHE.with(|cache| {
306 cache.borrow_mut().remove(&key);
307 });
308
309 #[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
310 {
311 QUERY_MANAGER_CACHE.lock().unwrap().remove(&key);
312 }
313}
314
315pub fn query_ref(
317 service: Arc<DataConnectService>,
318 operation_name: impl Into<String>,
319 variables: Value,
320) -> QueryRef {
321 QueryRef(OperationRef {
322 service,
323 name: Arc::from(operation_name.into()),
324 variables,
325 op_type: OperationType::Query,
326 })
327}
328
329pub fn mutation_ref(
331 service: Arc<DataConnectService>,
332 operation_name: impl Into<String>,
333 variables: Value,
334) -> MutationRef {
335 MutationRef(OperationRef {
336 service,
337 name: Arc::from(operation_name.into()),
338 variables,
339 op_type: OperationType::Mutation,
340 })
341}
342
343pub async fn execute_query(query_ref: &QueryRef) -> DataConnectResult<QueryResult> {
345 query_manager_for_service(query_ref.service())
346 .await?
347 .execute_query(query_ref.clone())
348 .await
349}
350
351pub async fn execute_mutation(
353 mutation_ref: &MutationRef,
354) -> DataConnectResult<crate::data_connect::reference::MutationResult> {
355 mutation_ref
356 .service()
357 .mutation_manager()
358 .await?
359 .execute_mutation(mutation_ref.clone())
360 .await
361}
362
363pub async fn subscribe(
365 query_ref: QueryRef,
366 handlers: QuerySubscriptionHandlers,
367 initial_cache: Option<SerializedQuerySnapshot>,
368) -> DataConnectResult<QuerySubscriptionHandle> {
369 let cache = initial_cache
370 .as_ref()
371 .and_then(|snapshot| cache_from_serialized(snapshot));
372 query_manager_for_service(query_ref.service())
373 .await?
374 .subscribe(query_ref, handlers, cache)
375}
376
377pub async fn to_query_ref(snapshot: SerializedQuerySnapshot) -> DataConnectResult<QueryRef> {
379 let service =
380 get_data_connect_service(None, snapshot.ref_info.connector_config.connector.clone())
381 .await?;
382 Ok(query_ref(
383 service,
384 snapshot.ref_info.name,
385 snapshot.ref_info.variables,
386 ))
387}
388
389pub fn connect_data_connect_emulator(
391 service: &DataConnectService,
392 host: &str,
393 port: Option<u16>,
394 ssl_enabled: bool,
395) -> DataConnectResult<()> {
396 service.connect_emulator(host, port, ssl_enabled)
397}
398
399pub fn register_data_connect_component() {
400 ensure_registered();
401}
402
403fn ensure_registered() {
404 static REGISTERED: LazyLock<()> = LazyLock::new(|| {
405 let component = Component::new(
406 DATA_CONNECT_COMPONENT_NAME,
407 Arc::new(data_connect_factory),
408 ComponentType::Public,
409 )
410 .with_instantiation_mode(InstantiationMode::Lazy)
411 .with_multiple_instances(true);
412 let _ = app::register_component(component);
413 });
414 LazyLock::force(®ISTERED);
415}
416
417fn data_connect_factory(
418 container: &crate::component::ComponentContainer,
419 options: InstanceFactoryOptions,
420) -> Result<DynService, ComponentError> {
421 let app = container.root_service::<FirebaseApp>().ok_or_else(|| {
422 ComponentError::InitializationFailed {
423 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
424 reason: "Firebase app not attached to component container".to_string(),
425 }
426 })?;
427
428 let connector_config = if !options.options.is_null() {
429 serde_json::from_value::<ConnectorConfig>(options.options.clone()).map_err(|err| {
430 ComponentError::InitializationFailed {
431 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
432 reason: format!("invalid connector config: {err}"),
433 }
434 })?
435 } else if let Some(identifier) = options.instance_identifier.as_deref() {
436 serde_json::from_str::<ConnectorConfig>(identifier).map_err(|err| {
437 ComponentError::InitializationFailed {
438 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
439 reason: format!("invalid connector identifier: {err}"),
440 }
441 })?
442 } else {
443 return Err(ComponentError::InitializationFailed {
444 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
445 reason: "connector config required".to_string(),
446 });
447 };
448
449 let project_id =
450 app.options()
451 .project_id
452 .clone()
453 .ok_or_else(|| ComponentError::InitializationFailed {
454 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
455 reason: "project ID must be configured on Firebase options".to_string(),
456 })?;
457 let options = DataConnectOptions::new(connector_config.clone(), project_id).map_err(|err| {
458 ComponentError::InitializationFailed {
459 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
460 reason: err.to_string(),
461 }
462 })?;
463
464 let env_override =
465 emulator_override_from_env().map_err(|err| ComponentError::InitializationFailed {
466 name: DATA_CONNECT_COMPONENT_NAME.to_string(),
467 reason: err.to_string(),
468 })?;
469
470 let auth_provider = container.get_provider("auth-internal");
471 let app_check_provider = container.get_provider("app-check-internal");
472 let service = Arc::new(DataConnectService::new(
473 (*app).clone(),
474 options,
475 auth_provider,
476 app_check_provider,
477 env_override,
478 ));
479 Ok(service as DynService)
480}
481
482fn emulator_override_from_env() -> DataConnectResult<Option<TransportOptions>> {
483 match env::var(EMULATOR_ENV) {
484 Ok(value) => parse_transport_options(&value).map(Some),
485 Err(_) => Ok(None),
486 }
487}
488
489pub async fn get_data_connect_service(
491 app: Option<FirebaseApp>,
492 config: ConnectorConfig,
493) -> DataConnectResult<Arc<DataConnectService>> {
494 ensure_registered();
495 let app = match app {
496 Some(app) => app,
497 None => crate::app::get_app(None)
498 .await
499 .map_err(|err| internal_error(err.to_string()))?,
500 };
501
502 let cache_key = (app.name().to_string(), config.identifier());
503 if let Some(service) = DATA_CONNECT_CACHE.lock().unwrap().get(&cache_key).cloned() {
504 return Ok(service);
505 }
506
507 let provider = app::get_provider(&app, DATA_CONNECT_COMPONENT_NAME);
508 let identifier = config.identifier();
509 if let Some(service) = provider
510 .get_immediate_with_options::<DataConnectService>(Some(&identifier), true)
511 .unwrap_or(None)
512 {
513 DATA_CONNECT_CACHE
514 .lock()
515 .unwrap()
516 .insert(cache_key, service.clone());
517 return Ok(service);
518 }
519
520 let options_value =
521 serde_json::to_value(&config).map_err(|err| internal_error(err.to_string()))?;
522 match provider.initialize::<DataConnectService>(options_value, Some(&identifier)) {
523 Ok(service) => {
524 DATA_CONNECT_CACHE
525 .lock()
526 .unwrap()
527 .insert(cache_key, service.clone());
528 Ok(service)
529 }
530 Err(ComponentError::InstanceUnavailable { .. }) => provider
531 .get_immediate_with_options::<DataConnectService>(Some(&identifier), true)
532 .unwrap_or(None)
533 .ok_or_else(|| internal_error("Data Connect instance unavailable"))
534 .map(|service| {
535 DATA_CONNECT_CACHE
536 .lock()
537 .unwrap()
538 .insert(cache_key, service.clone());
539 service
540 }),
541 Err(err) => Err(internal_error(err.to_string())),
542 }
543}
544
545struct TokenBroker {
546 auth_provider: Provider,
547 app_check_provider: Provider,
548}
549
550impl TokenBroker {
551 fn new(auth_provider: Provider, app_check_provider: Provider) -> Self {
552 Self {
553 auth_provider,
554 app_check_provider,
555 }
556 }
557}
558
559#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
560#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
561impl RequestTokenProvider for TokenBroker {
562 async fn auth_token(&self) -> DataConnectResult<Option<String>> {
563 let auth = match self
564 .auth_provider
565 .get_immediate_with_options::<Auth>(None, true)
566 {
567 Ok(Some(auth)) => auth,
568 Ok(None) => return Ok(None),
569 Err(err) => {
570 return Err(internal_error(format!(
571 "failed to resolve auth provider: {err}"
572 )))
573 }
574 };
575
576 auth.get_token(false)
577 .await
578 .map_err(|err| internal_error(err.to_string()))
579 }
580
581 async fn app_check_headers(&self) -> DataConnectResult<Option<AppCheckHeaders>> {
582 let app_check = match self
583 .app_check_provider
584 .get_immediate_with_options::<FirebaseAppCheckInternal>(None, true)
585 {
586 Ok(Some(app_check)) => app_check,
587 Ok(None) => return Ok(None),
588 Err(err) => {
589 return Err(internal_error(format!(
590 "failed to resolve app check provider: {err}"
591 )))
592 }
593 };
594
595 let token = match app_check.get_token(false).await {
596 Ok(result) => result.token,
597 Err(err) => {
598 if let Some(cached) = err.cached_token() {
599 cached.token.clone()
600 } else {
601 return Err(internal_error(format!(
602 "failed to obtain App Check token: {err}"
603 )));
604 }
605 }
606 };
607
608 if token.is_empty() {
609 return Ok(None);
610 }
611
612 let heartbeat = app_check
613 .heartbeat_header()
614 .await
615 .map_err(|err| internal_error(err.to_string()))?;
616 Ok(Some(AppCheckHeaders { token, heartbeat }))
617 }
618}
619
620#[cfg(all(test, not(target_arch = "wasm32")))]
621mod tests {
622 use super::*;
623 use crate::app::initialize_app;
624 use crate::app::{FirebaseAppSettings, FirebaseOptions};
625 use httpmock::prelude::*;
626 use serde_json::json;
627 use std::sync::atomic::{AtomicUsize, Ordering};
628 use std::sync::{Arc, Mutex as StdMutex};
629 use tokio::sync::oneshot;
630
631 fn unique_settings(prefix: &str) -> FirebaseAppSettings {
632 static COUNTER: AtomicUsize = AtomicUsize::new(0);
633 FirebaseAppSettings {
634 name: Some(format!(
635 "{prefix}-{}",
636 COUNTER.fetch_add(1, Ordering::SeqCst)
637 )),
638 ..Default::default()
639 }
640 }
641
642 fn base_options() -> FirebaseOptions {
643 FirebaseOptions {
644 project_id: Some("demo-project".into()),
645 api_key: Some("demo-key".into()),
646 ..Default::default()
647 }
648 }
649
650 fn clear_caches() {
651 DATA_CONNECT_CACHE.lock().unwrap().clear();
652 QUERY_MANAGER_CACHE.lock().unwrap().clear();
653 }
654
655 #[tokio::test(flavor = "current_thread")]
656 async fn execute_query_hits_emulator() {
657 clear_caches();
658 let app = initialize_app(base_options(), Some(unique_settings("dc-query")))
659 .await
660 .unwrap();
661 let server = MockServer::start();
662 let mock = server.mock(|when, then| {
663 when.method(POST).path(
664 "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeQuery",
665 );
666 then.status(200)
667 .header("content-type", "application/json")
668 .json_body(json!({
669 "data": {"items": [{"id": "123"}]}
670 }));
671 });
672
673 let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
674 let service = get_data_connect_service(Some(app.clone()), config)
675 .await
676 .unwrap();
677 let host = server.host();
678 service
679 .connect_emulator(&host, Some(server.port()), false)
680 .unwrap();
681
682 let query = query_ref(service, "ListItems", Value::Null);
683 let result = execute_query(&query).await.unwrap();
684 assert_eq!(result.data["items"][0]["id"], "123");
685 mock.assert();
686 }
687
688 #[tokio::test(flavor = "current_thread")]
689 async fn execute_mutation_hits_emulator() {
690 clear_caches();
691 let app = initialize_app(base_options(), Some(unique_settings("dc-mutation")))
692 .await
693 .unwrap();
694 let server = MockServer::start();
695 let mock = server.mock(|when, then| {
696 when.method(POST).path(
697 "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeMutation",
698 );
699 then.status(200)
700 .header("content-type", "application/json")
701 .json_body(json!({
702 "data": {"insertBook": {"id": "321"}}
703 }));
704 });
705
706 let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
707 let service = get_data_connect_service(Some(app.clone()), config)
708 .await
709 .unwrap();
710 let host = server.host();
711 service
712 .connect_emulator(&host, Some(server.port()), false)
713 .unwrap();
714
715 let mutation = mutation_ref(service, "InsertBook", json!({"id": "321"}));
716 let result = execute_mutation(&mutation).await.unwrap();
717 assert_eq!(result.data["insertBook"]["id"], "321");
718 mock.assert();
719 }
720
721 #[tokio::test(flavor = "current_thread")]
722 async fn subscribe_with_initial_cache_invokes_handler() {
723 clear_caches();
724 let app = initialize_app(base_options(), Some(unique_settings("dc-subscribe")))
725 .await
726 .unwrap();
727 let server = MockServer::start();
728 let mock = server.mock(|when, then| {
729 when.method(POST).path(
730 "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeQuery",
731 );
732 then.status(200)
733 .header("content-type", "application/json")
734 .json_body(json!({
735 "data": {"items": [{"id": "cached"}]}
736 }));
737 });
738
739 let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
740 let service = get_data_connect_service(Some(app.clone()), config)
741 .await
742 .unwrap();
743 let host = server.host();
744 service
745 .connect_emulator(&host, Some(server.port()), false)
746 .unwrap();
747
748 let query = query_ref(service.clone(), "ListItems", Value::Null);
749 let snapshot = execute_query(&query).await.unwrap().to_serialized();
750 mock.assert();
751
752 let query = query_ref(service, "ListItems", Value::Null);
753 let (tx, rx) = oneshot::channel();
754 let sender = Arc::new(StdMutex::new(Some(tx)));
755 let handlers = QuerySubscriptionHandlers::new(Arc::new(move |result: &QueryResult| {
756 if let Some(tx) = sender.lock().unwrap().take() {
757 let _ = tx.send(result.data.clone());
758 }
759 }));
760
761 let handle = subscribe(query, handlers, Some(snapshot)).await.unwrap();
762 let data = rx.await.unwrap();
763 assert_eq!(data["items"][0]["id"], "cached");
764 drop(handle);
765 }
766
767 #[tokio::test(flavor = "current_thread")]
768 async fn emitter_cannot_change_after_initialization() {
769 clear_caches();
770 let app = initialize_app(base_options(), Some(unique_settings("dc-emulator")))
771 .await
772 .unwrap();
773 let server = MockServer::start();
774 let mock = server.mock(|when, then| {
775 when.method(POST).path(
776 "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeQuery",
777 );
778 then.status(200)
779 .header("content-type", "application/json")
780 .json_body(json!({ "data": {"ok": true} }));
781 });
782
783 let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
784 let service = get_data_connect_service(Some(app.clone()), config)
785 .await
786 .unwrap();
787 let host = server.host();
788 service
789 .connect_emulator(&host, Some(server.port()), false)
790 .unwrap();
791
792 let query = query_ref(service.clone(), "ListItems", Value::Null);
793 let _ = execute_query(&query).await.unwrap();
794 mock.assert();
795
796 let err = service
797 .connect_emulator("127.0.0.1", Some(9000), false)
798 .unwrap_err();
799 assert_eq!(err.code(), DataConnectErrorCode::AlreadyInitialized);
800 }
801
802 #[tokio::test(flavor = "current_thread")]
803 async fn query_runtime_executes_and_releases() {
804 clear_caches();
805 let app = initialize_app(base_options(), Some(unique_settings("dc-runtime")))
806 .await
807 .unwrap();
808 let server = MockServer::start();
809 let mock = server.mock(|when, then| {
810 when.method(POST).path(
811 "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeQuery",
812 );
813 then.status(200)
814 .header("content-type", "application/json")
815 .json_body(json!({
816 "data": {"items": [{"id": "from-runtime"}]}
817 }));
818 });
819
820 let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
821 let service = get_data_connect_service(Some(app.clone()), config)
822 .await
823 .unwrap();
824 let host = server.host();
825 service
826 .connect_emulator(&host, Some(server.port()), false)
827 .unwrap();
828
829 let runtime = service.query_runtime().await.unwrap();
830 let query = query_ref(runtime.service().clone(), "ListItems", Value::Null);
831 let result = runtime.execute_query(&query).await.unwrap();
832 assert_eq!(result.data["items"][0]["id"], "from-runtime");
833 mock.assert();
834
835 let key = service_cache_key(runtime.service());
836 assert!(QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key));
837 runtime.close();
838 assert!(!QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key));
839
840 let runtime2 = service.query_runtime().await.unwrap();
842 let key2 = service_cache_key(runtime2.service());
843 assert!(QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key2));
844 drop(runtime2);
845 }
846}