couchbase_core/
analyticscomponent.rs1use crate::analyticsx::analytics::Query;
20use crate::authenticator::Authenticator;
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error;
23use crate::error::ErrorKind;
24use crate::httpcomponent::{HttpComponent, HttpComponentState};
25use crate::httpx::client::Client;
26use crate::httpx::request::Auth;
27use crate::options::analytics::{AnalyticsOptions, GetPendingMutationsOptions};
28use crate::results::analytics::AnalyticsResultStream;
29use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
30use crate::service_type::ServiceType;
31use std::collections::HashMap;
32use std::sync::Arc;
33use tracing::debug;
34
35pub(crate) struct AnalyticsComponent<C: Client> {
36 id: String,
37 http_component: HttpComponent<C>,
38
39 retry_manager: Arc<RetryManager>,
40}
41
42pub(crate) struct AnalyticsComponentConfig {
43 pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
44 pub authenticator: Authenticator,
45}
46
47pub(crate) struct AnalyticsComponentOptions {
48 pub id: String,
49 pub user_agent: String,
50}
51
52impl<C: Client + 'static> AnalyticsComponent<C> {
53 pub fn new(
54 retry_manager: Arc<RetryManager>,
55 http_client: Arc<C>,
56 config: AnalyticsComponentConfig,
57 opts: AnalyticsComponentOptions,
58 ) -> Self {
59 Self {
60 id: opts.id,
61 http_component: HttpComponent::new(
62 ServiceType::ANALYTICS,
63 opts.user_agent,
64 http_client,
65 HttpComponentState::new(config.endpoints, config.authenticator),
66 ),
67 retry_manager,
68 }
69 }
70
71 pub fn reconfigure(&self, config: AnalyticsComponentConfig) {
72 debug!(
73 "Analytics component {} updating endpoints to {:?}",
74 self.id,
75 &config.endpoints.keys().collect::<Vec<_>>()
76 );
77
78 self.http_component.reconfigure(HttpComponentState::new(
79 config.endpoints,
80 config.authenticator,
81 ))
82 }
83
84 pub async fn query(&self, opts: AnalyticsOptions) -> error::Result<AnalyticsResultStream> {
85 let retry_info = RetryRequest::new("analytics", opts.read_only.unwrap_or_default());
86
87 let retry = opts.retry_strategy.clone();
88 let endpoint = opts.endpoint.clone();
89 let copts = opts.into();
90
91 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
92 self.http_component
93 .orchestrate_endpoint(
94 endpoint.clone(),
95 async |client: Arc<C>,
96 endpoint_id: String,
97 endpoint: String,
98 canonical_endpoint: String,
99 auth: Auth| {
100 let res = match (Query::<C> {
101 http_client: client,
102 user_agent: self.http_component.user_agent().to_string(),
103 endpoint: endpoint.clone(),
104 canonical_endpoint,
105 auth,
106 }
107 .query(&copts)
108 .await)
109 {
110 Ok(r) => r,
111 Err(e) => return Err(ErrorKind::Analytics(e).into()),
112 };
113
114 Ok(AnalyticsResultStream {
115 inner: res,
116 endpoint,
117 })
118 },
119 )
120 .await
121 })
122 .await
123 }
124
125 pub async fn get_pending_mutations(
126 &self,
127 opts: &GetPendingMutationsOptions<'_>,
128 ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
129 let retry_info = RetryRequest::new("get_pending_mutations", true);
130
131 let retry = opts.retry_strategy.clone();
132 let endpoint = opts.endpoint.clone();
133 let copts = opts.into();
134
135 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
136 self.http_component
137 .orchestrate_endpoint(
138 endpoint.clone(),
139 async |client: Arc<C>,
140 endpoint_id: String,
141 endpoint: String,
142 canonical_endpoint: String,
143 auth: Auth| {
144 let res = match (Query::<C> {
145 http_client: client,
146 user_agent: self.http_component.user_agent().to_string(),
147 endpoint,
148 canonical_endpoint,
149 auth,
150 }
151 .get_pending_mutations(&copts)
152 .await)
153 {
154 Ok(r) => r,
155 Err(e) => return Err(ErrorKind::Analytics(e).into()),
156 };
157
158 Ok(res)
159 },
160 )
161 .await
162 })
163 .await
164 }
165}