Skip to main content

couchbase_core/
analyticscomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::analyticsx::analytics::Query;
20use crate::authenticator::Authenticator;
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error;
23use crate::error::ErrorKind;
24use crate::httpcomponent::{HttpComponent, HttpComponentState};
25use crate::httpx::client::Client;
26use crate::httpx::request::Auth;
27use crate::options::analytics::{AnalyticsOptions, GetPendingMutationsOptions};
28use crate::results::analytics::AnalyticsResultStream;
29use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
30use crate::service_type::ServiceType;
31use std::collections::HashMap;
32use std::sync::Arc;
33
34pub(crate) struct AnalyticsComponent<C: Client> {
35    http_component: HttpComponent<C>,
36
37    retry_manager: Arc<RetryManager>,
38}
39
40pub(crate) struct AnalyticsComponentConfig {
41    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
42    pub authenticator: Authenticator,
43}
44
45pub(crate) struct AnalyticsComponentOptions {
46    pub user_agent: String,
47}
48
49impl<C: Client + 'static> AnalyticsComponent<C> {
50    pub fn new(
51        retry_manager: Arc<RetryManager>,
52        http_client: Arc<C>,
53        config: AnalyticsComponentConfig,
54        opts: AnalyticsComponentOptions,
55    ) -> Self {
56        Self {
57            http_component: HttpComponent::new(
58                ServiceType::ANALYTICS,
59                opts.user_agent,
60                http_client,
61                HttpComponentState::new(config.endpoints, config.authenticator),
62            ),
63            retry_manager,
64        }
65    }
66
67    pub fn reconfigure(&self, config: AnalyticsComponentConfig) {
68        self.http_component.reconfigure(HttpComponentState::new(
69            config.endpoints,
70            config.authenticator,
71        ))
72    }
73
74    pub async fn query(&self, opts: AnalyticsOptions) -> error::Result<AnalyticsResultStream> {
75        let retry_info = RetryRequest::new("analytics", opts.read_only.unwrap_or_default());
76
77        let retry = opts.retry_strategy.clone();
78        let endpoint = opts.endpoint.clone();
79        let copts = opts.into();
80
81        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
82            self.http_component
83                .orchestrate_endpoint(
84                    endpoint.clone(),
85                    async |client: Arc<C>,
86                           endpoint_id: String,
87                           endpoint: String,
88                           canonical_endpoint: String,
89                           auth: Auth| {
90                        let res = match (Query::<C> {
91                            http_client: client,
92                            user_agent: self.http_component.user_agent().to_string(),
93                            endpoint: endpoint.clone(),
94                            canonical_endpoint,
95                            auth,
96                        }
97                        .query(&copts)
98                        .await)
99                        {
100                            Ok(r) => r,
101                            Err(e) => return Err(ErrorKind::Analytics(e).into()),
102                        };
103
104                        Ok(AnalyticsResultStream {
105                            inner: res,
106                            endpoint,
107                        })
108                    },
109                )
110                .await
111        })
112        .await
113    }
114
115    pub async fn get_pending_mutations(
116        &self,
117        opts: &GetPendingMutationsOptions<'_>,
118    ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
119        let retry_info = RetryRequest::new("get_pending_mutations", true);
120
121        let retry = opts.retry_strategy.clone();
122        let endpoint = opts.endpoint.clone();
123        let copts = opts.into();
124
125        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
126            self.http_component
127                .orchestrate_endpoint(
128                    endpoint.clone(),
129                    async |client: Arc<C>,
130                           endpoint_id: String,
131                           endpoint: String,
132                           canonical_endpoint: String,
133                           auth: Auth| {
134                        let res = match (Query::<C> {
135                            http_client: client,
136                            user_agent: self.http_component.user_agent().to_string(),
137                            endpoint,
138                            canonical_endpoint,
139                            auth,
140                        }
141                        .get_pending_mutations(&copts)
142                        .await)
143                        {
144                            Ok(r) => r,
145                            Err(e) => return Err(ErrorKind::Analytics(e).into()),
146                        };
147
148                        Ok(res)
149                    },
150                )
151                .await
152        })
153        .await
154    }
155}