couchbase_core/
analyticscomponent.rs1use crate::analyticsx::analytics::Query;
20use crate::authenticator::Authenticator;
21use crate::componentconfigs::NetworkAndCanonicalEndpoint;
22use crate::error;
23use crate::error::ErrorKind;
24use crate::httpcomponent::{HttpComponent, HttpComponentState};
25use crate::httpx::client::Client;
26use crate::httpx::request::Auth;
27use crate::options::analytics::{AnalyticsOptions, GetPendingMutationsOptions};
28use crate::results::analytics::AnalyticsResultStream;
29use crate::retry::{orchestrate_retries, RetryManager, RetryRequest};
30use crate::service_type::ServiceType;
31use std::collections::HashMap;
32use std::sync::Arc;
33
34pub(crate) struct AnalyticsComponent<C: Client> {
35 http_component: HttpComponent<C>,
36
37 retry_manager: Arc<RetryManager>,
38}
39
40pub(crate) struct AnalyticsComponentConfig {
41 pub endpoints: HashMap<String, NetworkAndCanonicalEndpoint>,
42 pub authenticator: Authenticator,
43}
44
45pub(crate) struct AnalyticsComponentOptions {
46 pub user_agent: String,
47}
48
49impl<C: Client + 'static> AnalyticsComponent<C> {
50 pub fn new(
51 retry_manager: Arc<RetryManager>,
52 http_client: Arc<C>,
53 config: AnalyticsComponentConfig,
54 opts: AnalyticsComponentOptions,
55 ) -> Self {
56 Self {
57 http_component: HttpComponent::new(
58 ServiceType::ANALYTICS,
59 opts.user_agent,
60 http_client,
61 HttpComponentState::new(config.endpoints, config.authenticator),
62 ),
63 retry_manager,
64 }
65 }
66
67 pub fn reconfigure(&self, config: AnalyticsComponentConfig) {
68 self.http_component.reconfigure(HttpComponentState::new(
69 config.endpoints,
70 config.authenticator,
71 ))
72 }
73
74 pub async fn query(&self, opts: AnalyticsOptions) -> error::Result<AnalyticsResultStream> {
75 let retry_info = RetryRequest::new("analytics", opts.read_only.unwrap_or_default());
76
77 let retry = opts.retry_strategy.clone();
78 let endpoint = opts.endpoint.clone();
79 let copts = opts.into();
80
81 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
82 self.http_component
83 .orchestrate_endpoint(
84 endpoint.clone(),
85 async |client: Arc<C>,
86 endpoint_id: String,
87 endpoint: String,
88 canonical_endpoint: String,
89 auth: Auth| {
90 let res = match (Query::<C> {
91 http_client: client,
92 user_agent: self.http_component.user_agent().to_string(),
93 endpoint: endpoint.clone(),
94 canonical_endpoint,
95 auth,
96 }
97 .query(&copts)
98 .await)
99 {
100 Ok(r) => r,
101 Err(e) => return Err(ErrorKind::Analytics(e).into()),
102 };
103
104 Ok(AnalyticsResultStream {
105 inner: res,
106 endpoint,
107 })
108 },
109 )
110 .await
111 })
112 .await
113 }
114
115 pub async fn get_pending_mutations(
116 &self,
117 opts: &GetPendingMutationsOptions<'_>,
118 ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
119 let retry_info = RetryRequest::new("get_pending_mutations", true);
120
121 let retry = opts.retry_strategy.clone();
122 let endpoint = opts.endpoint.clone();
123 let copts = opts.into();
124
125 orchestrate_retries(self.retry_manager.clone(), retry, retry_info, async || {
126 self.http_component
127 .orchestrate_endpoint(
128 endpoint.clone(),
129 async |client: Arc<C>,
130 endpoint_id: String,
131 endpoint: String,
132 canonical_endpoint: String,
133 auth: Auth| {
134 let res = match (Query::<C> {
135 http_client: client,
136 user_agent: self.http_component.user_agent().to_string(),
137 endpoint,
138 canonical_endpoint,
139 auth,
140 }
141 .get_pending_mutations(&copts)
142 .await)
143 {
144 Ok(r) => r,
145 Err(e) => return Err(ErrorKind::Analytics(e).into()),
146 };
147
148 Ok(res)
149 },
150 )
151 .await
152 })
153 .await
154 }
155}