firebase_rs_sdk/data_connect/
api.rs

1#[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::env;
5use std::fmt;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, LazyLock, Mutex};
8
9use async_lock::OnceCell;
10use serde_json::Value;
11
12use crate::app;
13use crate::app::FirebaseApp;
14use crate::app_check::FirebaseAppCheckInternal;
15use crate::auth::Auth;
16use crate::component::types::{
17    ComponentError, DynService, InstanceFactoryOptions, InstantiationMode,
18};
19use crate::component::{Component, ComponentType, Provider};
20use crate::data_connect::config::{
21    parse_transport_options, ConnectorConfig, DataConnectOptions, TransportOptions,
22};
23use crate::data_connect::constants::DATA_CONNECT_COMPONENT_NAME;
24use crate::data_connect::error::{
25    internal_error, DataConnectError, DataConnectErrorCode, DataConnectResult,
26};
27use crate::data_connect::mutation::MutationManager;
28use crate::data_connect::query::{
29    cache_from_serialized, QueryManager, QuerySubscriptionHandle, QuerySubscriptionHandlers,
30};
31use crate::data_connect::reference::{
32    MutationRef, OperationRef, OperationType, QueryRef, QueryResult, SerializedQuerySnapshot,
33};
34use crate::data_connect::transport::{
35    AppCheckHeaders, CallerSdkType, DataConnectTransport, RequestTokenProvider, RestTransport,
36};
37
38const EMULATOR_ENV: &str = "FIREBASE_DATA_CONNECT_EMULATOR_HOST";
39
40static DATA_CONNECT_CACHE: LazyLock<Mutex<HashMap<(String, String), Arc<DataConnectService>>>> =
41    LazyLock::new(|| Mutex::new(HashMap::new()));
42
43#[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
44thread_local! {
45    static QUERY_MANAGER_CACHE: RefCell<HashMap<usize, QueryManager>> = RefCell::new(HashMap::new());
46}
47
48#[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
49static QUERY_MANAGER_CACHE: LazyLock<Mutex<HashMap<usize, QueryManager>>> =
50    LazyLock::new(|| Mutex::new(HashMap::new()));
51
52/// Primary interface for Data Connect operations.
53#[derive(Clone)]
54pub struct DataConnectService {
55    inner: Arc<DataConnectInner>,
56}
57
58/// Owns the cached `QueryManager` for a service, allowing callers to explicitly control when
59/// observer state is created or discarded.
60pub struct DataConnectQueryRuntime {
61    service: Arc<DataConnectService>,
62    manager: QueryManager,
63    released: bool,
64}
65
66impl fmt::Debug for DataConnectService {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        f.debug_struct("DataConnectService")
69            .field("app", &self.app().name())
70            .field("connector", &self.inner.options.connector.identifier())
71            .finish()
72    }
73}
74
75struct DataConnectInner {
76    app: FirebaseApp,
77    options: DataConnectOptions,
78    auth_provider: Provider,
79    app_check_provider: Provider,
80    transport: OnceCell<Arc<dyn DataConnectTransport>>,
81    mutation_manager: OnceCell<MutationManager>,
82    transport_override: Mutex<Option<TransportOptions>>,
83    generated_sdk: AtomicBool,
84    caller_sdk_type: Mutex<CallerSdkType>,
85}
86
87impl DataConnectService {
88    fn new(
89        app: FirebaseApp,
90        options: DataConnectOptions,
91        auth_provider: Provider,
92        app_check_provider: Provider,
93        env_override: Option<TransportOptions>,
94    ) -> Self {
95        Self {
96            inner: Arc::new(DataConnectInner {
97                app,
98                options,
99                auth_provider,
100                app_check_provider,
101                transport: OnceCell::new(),
102                mutation_manager: OnceCell::new(),
103                transport_override: Mutex::new(env_override),
104                generated_sdk: AtomicBool::new(false),
105                caller_sdk_type: Mutex::new(CallerSdkType::Base),
106            }),
107        }
108    }
109
110    /// Returns the Firebase app associated with this service.
111    pub fn app(&self) -> &FirebaseApp {
112        &self.inner.app
113    }
114
115    /// Returns the fully qualified connector options.
116    pub fn options(&self) -> DataConnectOptions {
117        self.inner.options.clone()
118    }
119
120    /// Marks this instance as being used by a generated SDK, updating telemetry headers.
121    pub fn set_generated_sdk_mode(&self, enabled: bool) {
122        self.inner.generated_sdk.store(enabled, Ordering::SeqCst);
123        if let Some(transport) = self.inner.transport.get() {
124            transport.set_generated_sdk(enabled);
125        }
126    }
127
128    /// Updates the caller SDK type for telemetry purposes.
129    pub fn set_caller_sdk_type(&self, caller: CallerSdkType) {
130        *self.inner.caller_sdk_type.lock().unwrap() = caller.clone();
131        if let Some(transport) = self.inner.transport.get() {
132            transport.set_caller_sdk_type(caller);
133        }
134    }
135
136    /// Builds a [`DataConnectQueryRuntime`] that keeps the query observer state alive for as long
137    /// as the handle is held. Dropping the runtime (or calling [`DataConnectQueryRuntime::close`])
138    /// releases the cached manager so a future runtime can start fresh.
139    pub async fn query_runtime(&self) -> DataConnectResult<DataConnectQueryRuntime> {
140        let service = Arc::new(self.clone());
141        DataConnectQueryRuntime::new(service).await
142    }
143
144    /// Routes subsequent requests to the specified emulator endpoint.
145    pub fn connect_emulator(
146        &self,
147        host: &str,
148        port: Option<u16>,
149        ssl_enabled: bool,
150    ) -> DataConnectResult<()> {
151        let override_options = TransportOptions::new(host, port, ssl_enabled);
152        {
153            let mut guard = self.inner.transport_override.lock().unwrap();
154            if let Some(existing) = guard.as_ref() {
155                if existing != &override_options {
156                    return Err(DataConnectError::new(
157                        DataConnectErrorCode::AlreadyInitialized,
158                        "Data Connect instance already initialized",
159                    ));
160                }
161                return Ok(());
162            }
163            *guard = Some(override_options.clone());
164        }
165
166        if let Some(transport) = self.inner.transport.get() {
167            transport.use_emulator(override_options);
168        }
169        Ok(())
170    }
171
172    async fn transport(&self) -> DataConnectResult<Arc<dyn DataConnectTransport>> {
173        self.inner
174            .transport
175            .get_or_try_init(|| async {
176                let token_provider = Arc::new(TokenBroker::new(
177                    self.inner.auth_provider.clone(),
178                    self.inner.app_check_provider.clone(),
179                ));
180                let firebase_options = self.inner.app.options();
181                let transport = RestTransport::new(
182                    self.inner.options.clone(),
183                    firebase_options.api_key,
184                    firebase_options.app_id,
185                    token_provider,
186                )?;
187                if let Some(override_options) =
188                    self.inner.transport_override.lock().unwrap().clone()
189                {
190                    transport.use_emulator(override_options);
191                }
192                transport.set_generated_sdk(self.inner.generated_sdk.load(Ordering::SeqCst));
193                transport.set_caller_sdk_type(self.inner.caller_sdk_type.lock().unwrap().clone());
194                Ok(Arc::new(transport) as Arc<dyn DataConnectTransport>)
195            })
196            .await
197            .cloned()
198    }
199
200    async fn mutation_manager(&self) -> DataConnectResult<MutationManager> {
201        let transport = self.transport().await?;
202        self.inner
203            .mutation_manager
204            .get_or_try_init(|| async { Ok(MutationManager::new(transport.clone())) })
205            .await
206            .cloned()
207    }
208}
209
210impl DataConnectQueryRuntime {
211    async fn new(service: Arc<DataConnectService>) -> DataConnectResult<Self> {
212        let manager = query_manager_for_service(&service).await?;
213        Ok(Self {
214            service,
215            manager,
216            released: false,
217        })
218    }
219
220    /// Returns the service associated with this runtime.
221    pub fn service(&self) -> &Arc<DataConnectService> {
222        &self.service
223    }
224
225    /// Executes the provided query reference using this runtime's cached manager.
226    pub async fn execute_query(&self, query_ref: &QueryRef) -> DataConnectResult<QueryResult> {
227        self.manager.execute_query(query_ref.clone()).await
228    }
229
230    /// Subscribes to a query using this runtime's cached manager.
231    pub async fn subscribe(
232        &self,
233        query_ref: QueryRef,
234        handlers: QuerySubscriptionHandlers,
235        initial_cache: Option<SerializedQuerySnapshot>,
236    ) -> DataConnectResult<QuerySubscriptionHandle> {
237        let cache = initial_cache
238            .as_ref()
239            .and_then(|snapshot| cache_from_serialized(snapshot));
240        self.manager.subscribe(query_ref, handlers, cache)
241    }
242
243    /// Releases the cached manager immediately. Dropping the runtime has the same effect.
244    pub fn close(mut self) {
245        self.release();
246    }
247
248    fn release(&mut self) {
249        if !self.released {
250            release_query_manager(&self.service);
251            self.released = true;
252        }
253    }
254}
255
256impl Drop for DataConnectQueryRuntime {
257    fn drop(&mut self) {
258        self.release();
259    }
260}
261
262fn service_cache_key(service: &Arc<DataConnectService>) -> usize {
263    Arc::as_ptr(&service.inner) as usize
264}
265
266async fn query_manager_for_service(
267    service: &Arc<DataConnectService>,
268) -> DataConnectResult<QueryManager> {
269    let key = service_cache_key(service);
270
271    #[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
272    {
273        if let Some(manager) = QUERY_MANAGER_CACHE.with(|cache| cache.borrow().get(&key).cloned()) {
274            return Ok(manager);
275        }
276
277        let transport = service.transport().await?;
278        let manager = QueryManager::new(transport);
279        QUERY_MANAGER_CACHE.with(|cache| {
280            cache.borrow_mut().insert(key, manager.clone());
281        });
282        Ok(manager)
283    }
284
285    #[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
286    {
287        if let Some(manager) = QUERY_MANAGER_CACHE.lock().unwrap().get(&key).cloned() {
288            return Ok(manager);
289        }
290
291        let transport = service.transport().await?;
292        let manager = QueryManager::new(transport);
293        QUERY_MANAGER_CACHE
294            .lock()
295            .unwrap()
296            .insert(key, manager.clone());
297        Ok(manager)
298    }
299}
300
301fn release_query_manager(service: &Arc<DataConnectService>) {
302    let key = service_cache_key(service);
303
304    #[cfg(all(target_arch = "wasm32", feature = "wasm-web"))]
305    QUERY_MANAGER_CACHE.with(|cache| {
306        cache.borrow_mut().remove(&key);
307    });
308
309    #[cfg(not(all(target_arch = "wasm32", feature = "wasm-web")))]
310    {
311        QUERY_MANAGER_CACHE.lock().unwrap().remove(&key);
312    }
313}
314
315/// Constructs a query reference for the specified operation name and variables.
316pub fn query_ref(
317    service: Arc<DataConnectService>,
318    operation_name: impl Into<String>,
319    variables: Value,
320) -> QueryRef {
321    QueryRef(OperationRef {
322        service,
323        name: Arc::from(operation_name.into()),
324        variables,
325        op_type: OperationType::Query,
326    })
327}
328
329/// Constructs a mutation reference for the specified operation name and variables.
330pub fn mutation_ref(
331    service: Arc<DataConnectService>,
332    operation_name: impl Into<String>,
333    variables: Value,
334) -> MutationRef {
335    MutationRef(OperationRef {
336        service,
337        name: Arc::from(operation_name.into()),
338        variables,
339        op_type: OperationType::Mutation,
340    })
341}
342
343/// Executes the provided query reference.
344pub async fn execute_query(query_ref: &QueryRef) -> DataConnectResult<QueryResult> {
345    query_manager_for_service(query_ref.service())
346        .await?
347        .execute_query(query_ref.clone())
348        .await
349}
350
351/// Executes the provided mutation reference.
352pub async fn execute_mutation(
353    mutation_ref: &MutationRef,
354) -> DataConnectResult<crate::data_connect::reference::MutationResult> {
355    mutation_ref
356        .service()
357        .mutation_manager()
358        .await?
359        .execute_mutation(mutation_ref.clone())
360        .await
361}
362
363/// Subscribes to a query reference, optionally hydrating from a serialized snapshot.
364pub async fn subscribe(
365    query_ref: QueryRef,
366    handlers: QuerySubscriptionHandlers,
367    initial_cache: Option<SerializedQuerySnapshot>,
368) -> DataConnectResult<QuerySubscriptionHandle> {
369    let cache = initial_cache
370        .as_ref()
371        .and_then(|snapshot| cache_from_serialized(snapshot));
372    query_manager_for_service(query_ref.service())
373        .await?
374        .subscribe(query_ref, handlers, cache)
375}
376
377/// Converts a serialized snapshot back into a live `QueryRef` using the default app.
378pub async fn to_query_ref(snapshot: SerializedQuerySnapshot) -> DataConnectResult<QueryRef> {
379    let service =
380        get_data_connect_service(None, snapshot.ref_info.connector_config.connector.clone())
381            .await?;
382    Ok(query_ref(
383        service,
384        snapshot.ref_info.name,
385        snapshot.ref_info.variables,
386    ))
387}
388
389/// Routes all future requests through the emulator configured by the caller.
390pub fn connect_data_connect_emulator(
391    service: &DataConnectService,
392    host: &str,
393    port: Option<u16>,
394    ssl_enabled: bool,
395) -> DataConnectResult<()> {
396    service.connect_emulator(host, port, ssl_enabled)
397}
398
399pub fn register_data_connect_component() {
400    ensure_registered();
401}
402
403fn ensure_registered() {
404    static REGISTERED: LazyLock<()> = LazyLock::new(|| {
405        let component = Component::new(
406            DATA_CONNECT_COMPONENT_NAME,
407            Arc::new(data_connect_factory),
408            ComponentType::Public,
409        )
410        .with_instantiation_mode(InstantiationMode::Lazy)
411        .with_multiple_instances(true);
412        let _ = app::register_component(component);
413    });
414    LazyLock::force(&REGISTERED);
415}
416
417fn data_connect_factory(
418    container: &crate::component::ComponentContainer,
419    options: InstanceFactoryOptions,
420) -> Result<DynService, ComponentError> {
421    let app = container.root_service::<FirebaseApp>().ok_or_else(|| {
422        ComponentError::InitializationFailed {
423            name: DATA_CONNECT_COMPONENT_NAME.to_string(),
424            reason: "Firebase app not attached to component container".to_string(),
425        }
426    })?;
427
428    let connector_config = if !options.options.is_null() {
429        serde_json::from_value::<ConnectorConfig>(options.options.clone()).map_err(|err| {
430            ComponentError::InitializationFailed {
431                name: DATA_CONNECT_COMPONENT_NAME.to_string(),
432                reason: format!("invalid connector config: {err}"),
433            }
434        })?
435    } else if let Some(identifier) = options.instance_identifier.as_deref() {
436        serde_json::from_str::<ConnectorConfig>(identifier).map_err(|err| {
437            ComponentError::InitializationFailed {
438                name: DATA_CONNECT_COMPONENT_NAME.to_string(),
439                reason: format!("invalid connector identifier: {err}"),
440            }
441        })?
442    } else {
443        return Err(ComponentError::InitializationFailed {
444            name: DATA_CONNECT_COMPONENT_NAME.to_string(),
445            reason: "connector config required".to_string(),
446        });
447    };
448
449    let project_id =
450        app.options()
451            .project_id
452            .clone()
453            .ok_or_else(|| ComponentError::InitializationFailed {
454                name: DATA_CONNECT_COMPONENT_NAME.to_string(),
455                reason: "project ID must be configured on Firebase options".to_string(),
456            })?;
457    let options = DataConnectOptions::new(connector_config.clone(), project_id).map_err(|err| {
458        ComponentError::InitializationFailed {
459            name: DATA_CONNECT_COMPONENT_NAME.to_string(),
460            reason: err.to_string(),
461        }
462    })?;
463
464    let env_override =
465        emulator_override_from_env().map_err(|err| ComponentError::InitializationFailed {
466            name: DATA_CONNECT_COMPONENT_NAME.to_string(),
467            reason: err.to_string(),
468        })?;
469
470    let auth_provider = container.get_provider("auth-internal");
471    let app_check_provider = container.get_provider("app-check-internal");
472    let service = Arc::new(DataConnectService::new(
473        (*app).clone(),
474        options,
475        auth_provider,
476        app_check_provider,
477        env_override,
478    ));
479    Ok(service as DynService)
480}
481
482fn emulator_override_from_env() -> DataConnectResult<Option<TransportOptions>> {
483    match env::var(EMULATOR_ENV) {
484        Ok(value) => parse_transport_options(&value).map(Some),
485        Err(_) => Ok(None),
486    }
487}
488
489/// Retrieves (or initializes) a Data Connect service instance for the supplied connector config.
490pub async fn get_data_connect_service(
491    app: Option<FirebaseApp>,
492    config: ConnectorConfig,
493) -> DataConnectResult<Arc<DataConnectService>> {
494    ensure_registered();
495    let app = match app {
496        Some(app) => app,
497        None => crate::app::get_app(None)
498            .await
499            .map_err(|err| internal_error(err.to_string()))?,
500    };
501
502    let cache_key = (app.name().to_string(), config.identifier());
503    if let Some(service) = DATA_CONNECT_CACHE.lock().unwrap().get(&cache_key).cloned() {
504        return Ok(service);
505    }
506
507    let provider = app::get_provider(&app, DATA_CONNECT_COMPONENT_NAME);
508    let identifier = config.identifier();
509    if let Some(service) = provider
510        .get_immediate_with_options::<DataConnectService>(Some(&identifier), true)
511        .unwrap_or(None)
512    {
513        DATA_CONNECT_CACHE
514            .lock()
515            .unwrap()
516            .insert(cache_key, service.clone());
517        return Ok(service);
518    }
519
520    let options_value =
521        serde_json::to_value(&config).map_err(|err| internal_error(err.to_string()))?;
522    match provider.initialize::<DataConnectService>(options_value, Some(&identifier)) {
523        Ok(service) => {
524            DATA_CONNECT_CACHE
525                .lock()
526                .unwrap()
527                .insert(cache_key, service.clone());
528            Ok(service)
529        }
530        Err(ComponentError::InstanceUnavailable { .. }) => provider
531            .get_immediate_with_options::<DataConnectService>(Some(&identifier), true)
532            .unwrap_or(None)
533            .ok_or_else(|| internal_error("Data Connect instance unavailable"))
534            .map(|service| {
535                DATA_CONNECT_CACHE
536                    .lock()
537                    .unwrap()
538                    .insert(cache_key, service.clone());
539                service
540            }),
541        Err(err) => Err(internal_error(err.to_string())),
542    }
543}
544
545struct TokenBroker {
546    auth_provider: Provider,
547    app_check_provider: Provider,
548}
549
550impl TokenBroker {
551    fn new(auth_provider: Provider, app_check_provider: Provider) -> Self {
552        Self {
553            auth_provider,
554            app_check_provider,
555        }
556    }
557}
558
559#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
560#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
561impl RequestTokenProvider for TokenBroker {
562    async fn auth_token(&self) -> DataConnectResult<Option<String>> {
563        let auth = match self
564            .auth_provider
565            .get_immediate_with_options::<Auth>(None, true)
566        {
567            Ok(Some(auth)) => auth,
568            Ok(None) => return Ok(None),
569            Err(err) => {
570                return Err(internal_error(format!(
571                    "failed to resolve auth provider: {err}"
572                )))
573            }
574        };
575
576        auth.get_token(false)
577            .await
578            .map_err(|err| internal_error(err.to_string()))
579    }
580
581    async fn app_check_headers(&self) -> DataConnectResult<Option<AppCheckHeaders>> {
582        let app_check = match self
583            .app_check_provider
584            .get_immediate_with_options::<FirebaseAppCheckInternal>(None, true)
585        {
586            Ok(Some(app_check)) => app_check,
587            Ok(None) => return Ok(None),
588            Err(err) => {
589                return Err(internal_error(format!(
590                    "failed to resolve app check provider: {err}"
591                )))
592            }
593        };
594
595        let token = match app_check.get_token(false).await {
596            Ok(result) => result.token,
597            Err(err) => {
598                if let Some(cached) = err.cached_token() {
599                    cached.token.clone()
600                } else {
601                    return Err(internal_error(format!(
602                        "failed to obtain App Check token: {err}"
603                    )));
604                }
605            }
606        };
607
608        if token.is_empty() {
609            return Ok(None);
610        }
611
612        let heartbeat = app_check
613            .heartbeat_header()
614            .await
615            .map_err(|err| internal_error(err.to_string()))?;
616        Ok(Some(AppCheckHeaders { token, heartbeat }))
617    }
618}
619
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::app::initialize_app;
    use crate::app::{FirebaseAppSettings, FirebaseOptions};
    use httpmock::prelude::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex as StdMutex};
    use tokio::sync::oneshot;

    /// REST path the transport posts queries to for the connector used in these tests.
    const EXECUTE_QUERY_PATH: &str =
        "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeQuery";
    /// REST path the transport posts mutations to for the connector used in these tests.
    const EXECUTE_MUTATION_PATH: &str =
        "/v1/projects/demo-project/locations/us-central1/services/catalog/connectors/books:executeMutation";

    /// Serializes the tests in this module.
    ///
    /// The caches are process-wide and the test harness runs tests on several threads, so
    /// without this lock one test's `clear_caches` could wipe an entry another test is about
    /// to assert on.
    static CACHE_LOCK: std::sync::LazyLock<tokio::sync::Mutex<()>> =
        std::sync::LazyLock::new(|| tokio::sync::Mutex::new(()));

    /// Produces app settings with a name that is unique within the test process.
    fn unique_settings(prefix: &str) -> FirebaseAppSettings {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        FirebaseAppSettings {
            name: Some(format!(
                "{prefix}-{}",
                COUNTER.fetch_add(1, Ordering::SeqCst)
            )),
            ..Default::default()
        }
    }

    /// Firebase options carrying the project ID and API key the mocked paths expect.
    fn base_options() -> FirebaseOptions {
        FirebaseOptions {
            project_id: Some("demo-project".into()),
            api_key: Some("demo-key".into()),
            ..Default::default()
        }
    }

    /// Empties both global caches.
    fn clear_caches() {
        DATA_CONNECT_CACHE.lock().unwrap().clear();
        QUERY_MANAGER_CACHE.lock().unwrap().clear();
    }

    /// Takes the module-wide test lock and starts the test from empty caches. The returned
    /// guard must be kept alive for the whole test.
    async fn exclusive_caches() -> tokio::sync::MutexGuard<'static, ()> {
        let guard = CACHE_LOCK.lock().await;
        clear_caches();
        guard
    }

    /// Creates a uniquely named app and returns its `books` service pointed at `server`.
    async fn emulated_service(prefix: &str, server: &MockServer) -> Arc<DataConnectService> {
        let app = initialize_app(base_options(), Some(unique_settings(prefix)))
            .await
            .unwrap();
        let config = ConnectorConfig::new("us-central1", "books", "catalog").unwrap();
        let service = get_data_connect_service(Some(app), config).await.unwrap();
        let host = server.host();
        service
            .connect_emulator(&host, Some(server.port()), false)
            .unwrap();
        service
    }

    #[tokio::test(flavor = "current_thread")]
    async fn execute_query_hits_emulator() {
        let _caches = exclusive_caches().await;
        let server = MockServer::start();
        let mock = server.mock(|when, then| {
            when.method(POST).path(EXECUTE_QUERY_PATH);
            then.status(200)
                .header("content-type", "application/json")
                .json_body(json!({
                    "data": {"items": [{"id": "123"}]}
                }));
        });
        let service = emulated_service("dc-query", &server).await;

        let query = query_ref(service, "ListItems", Value::Null);
        let result = execute_query(&query).await.unwrap();
        assert_eq!(result.data["items"][0]["id"], "123");
        mock.assert();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn execute_mutation_hits_emulator() {
        let _caches = exclusive_caches().await;
        let server = MockServer::start();
        let mock = server.mock(|when, then| {
            when.method(POST).path(EXECUTE_MUTATION_PATH);
            then.status(200)
                .header("content-type", "application/json")
                .json_body(json!({
                    "data": {"insertBook": {"id": "321"}}
                }));
        });
        let service = emulated_service("dc-mutation", &server).await;

        let mutation = mutation_ref(service, "InsertBook", json!({"id": "321"}));
        let result = execute_mutation(&mutation).await.unwrap();
        assert_eq!(result.data["insertBook"]["id"], "321");
        mock.assert();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn subscribe_with_initial_cache_invokes_handler() {
        let _caches = exclusive_caches().await;
        let server = MockServer::start();
        let mock = server.mock(|when, then| {
            when.method(POST).path(EXECUTE_QUERY_PATH);
            then.status(200)
                .header("content-type", "application/json")
                .json_body(json!({
                    "data": {"items": [{"id": "cached"}]}
                }));
        });
        let service = emulated_service("dc-subscribe", &server).await;

        let query = query_ref(service.clone(), "ListItems", Value::Null);
        let snapshot = execute_query(&query).await.unwrap().to_serialized();
        mock.assert();

        let query = query_ref(service, "ListItems", Value::Null);
        let (tx, rx) = oneshot::channel();
        // The handler may be invoked more than once; only the first result is forwarded.
        let sender = Arc::new(StdMutex::new(Some(tx)));
        let handlers = QuerySubscriptionHandlers::new(Arc::new(move |result: &QueryResult| {
            if let Some(tx) = sender.lock().unwrap().take() {
                let _ = tx.send(result.data.clone());
            }
        }));

        let handle = subscribe(query, handlers, Some(snapshot)).await.unwrap();
        let data = rx.await.unwrap();
        assert_eq!(data["items"][0]["id"], "cached");
        drop(handle);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn emulator_cannot_change_after_initialization() {
        let _caches = exclusive_caches().await;
        let server = MockServer::start();
        let mock = server.mock(|when, then| {
            when.method(POST).path(EXECUTE_QUERY_PATH);
            then.status(200)
                .header("content-type", "application/json")
                .json_body(json!({ "data": {"ok": true} }));
        });
        let service = emulated_service("dc-emulator", &server).await;

        let query = query_ref(service.clone(), "ListItems", Value::Null);
        let _ = execute_query(&query).await.unwrap();
        mock.assert();

        let err = service
            .connect_emulator("127.0.0.1", Some(9000), false)
            .unwrap_err();
        assert_eq!(err.code(), DataConnectErrorCode::AlreadyInitialized);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn query_runtime_executes_and_releases() {
        let _caches = exclusive_caches().await;
        let server = MockServer::start();
        let mock = server.mock(|when, then| {
            when.method(POST).path(EXECUTE_QUERY_PATH);
            then.status(200)
                .header("content-type", "application/json")
                .json_body(json!({
                    "data": {"items": [{"id": "from-runtime"}]}
                }));
        });
        let service = emulated_service("dc-runtime", &server).await;

        let runtime = service.query_runtime().await.unwrap();
        let query = query_ref(runtime.service().clone(), "ListItems", Value::Null);
        let result = runtime.execute_query(&query).await.unwrap();
        assert_eq!(result.data["items"][0]["id"], "from-runtime");
        mock.assert();

        let key = service_cache_key(runtime.service());
        assert!(QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key));
        runtime.close();
        assert!(!QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key));

        // A new runtime recreates the manager seamlessly.
        let runtime2 = service.query_runtime().await.unwrap();
        let key2 = service_cache_key(runtime2.service());
        assert!(QUERY_MANAGER_CACHE.lock().unwrap().contains_key(&key2));
        drop(runtime2);
    }
}