Skip to main content

couchbase_core/
analyticscomponent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::analyticsx::analytics::Query;
20use crate::authenticator::Authenticator;
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error;
23use crate::error::ErrorKind;
24use crate::httpcomponent::{HttpComponent, HttpComponentState};
25use crate::httpx::client::Client;
26use crate::httpx::request::Auth;
27use crate::options::analytics::{AnalyticsOptions, GetPendingMutationsOptions};
28use crate::results::analytics::AnalyticsResultStream;
29use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
30use crate::service_type::ServiceType;
31use std::collections::HashMap;
32use std::sync::Arc;
33use tracing::debug;
34
35pub(crate) struct AnalyticsComponent<C: Client> {
36    id: String,
37    http_component: HttpComponent<C>,
38
39    retry_manager: Arc<RetryManager>,
40}
41
42pub(crate) struct AnalyticsComponentConfig {
43    pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
44    pub authenticator: Authenticator,
45}
46
/// Construction-only settings for an `AnalyticsComponent`; unlike the config,
/// these are not accepted by `reconfigure`.
pub(crate) struct AnalyticsComponentOptions {
    /// Identifier stored on the component and used in its log output.
    pub id: String,
    /// User agent string passed to the underlying `HttpComponent`.
    pub user_agent: String,
}
51
52impl<C: Client + 'static> AnalyticsComponent<C> {
53    pub fn new(
54        retry_manager: Arc<RetryManager>,
55        http_client: Arc<C>,
56        config: AnalyticsComponentConfig,
57        opts: AnalyticsComponentOptions,
58    ) -> Self {
59        Self {
60            id: opts.id,
61            http_component: HttpComponent::new(
62                ServiceType::ANALYTICS,
63                opts.user_agent,
64                http_client,
65                HttpComponentState::new(config.endpoints, config.authenticator),
66            ),
67            retry_manager,
68        }
69    }
70
71    pub fn reconfigure(&self, config: AnalyticsComponentConfig) {
72        debug!(
73            "Analytics component {} updating endpoints to {:?}",
74            self.id,
75            &config.endpoints.keys().collect::<Vec<_>>()
76        );
77
78        self.http_component.reconfigure(HttpComponentState::new(
79            config.endpoints,
80            config.authenticator,
81        ))
82    }
83
84    pub async fn query(&self, opts: AnalyticsOptions) -> error::Result<AnalyticsResultStream> {
85        let retry_info = RetryRequest::new("analytics", opts.read_only.unwrap_or_default());
86
87        let retry = opts.retry_strategy.clone();
88        let endpoint = opts.endpoint.clone();
89        let copts = opts.into();
90
91        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
92            self.http_component
93                .orchestrate_endpoint(
94                    endpoint.clone(),
95                    async |client: Arc<C>,
96                           endpoint_id: String,
97                           endpoint: String,
98                           canonical_endpoint: String,
99                           auth: Auth| {
100                        let res = match (Query::<C> {
101                            http_client: client,
102                            user_agent: self.http_component.user_agent().to_string(),
103                            endpoint: endpoint.clone(),
104                            canonical_endpoint,
105                            auth,
106                        }
107                        .query(&copts)
108                        .await)
109                        {
110                            Ok(r) => r,
111                            Err(e) => return Err(ErrorKind::Analytics(e).into()),
112                        };
113
114                        Ok(AnalyticsResultStream {
115                            inner: res,
116                            endpoint,
117                        })
118                    },
119                )
120                .await
121        })
122        .await
123    }
124
125    pub async fn get_pending_mutations(
126        &self,
127        opts: &GetPendingMutationsOptions<'_>,
128    ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
129        let retry_info = RetryRequest::new("get_pending_mutations", true);
130
131        let retry = opts.retry_strategy.clone();
132        let endpoint = opts.endpoint.clone();
133        let copts = opts.into();
134
135        orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
136            self.http_component
137                .orchestrate_endpoint(
138                    endpoint.clone(),
139                    async |client: Arc<C>,
140                           endpoint_id: String,
141                           endpoint: String,
142                           canonical_endpoint: String,
143                           auth: Auth| {
144                        let res = match (Query::<C> {
145                            http_client: client,
146                            user_agent: self.http_component.user_agent().to_string(),
147                            endpoint,
148                            canonical_endpoint,
149                            auth,
150                        }
151                        .get_pending_mutations(&copts)
152                        .await)
153                        {
154                            Ok(r) => r,
155                            Err(e) => return Err(ErrorKind::Analytics(e).into()),
156                        };
157
158                        Ok(res)
159                    },
160                )
161                .await
162        })
163        .await
164    }
165}