couchbase_core/analyticsx/
analytics.rs1use crate::analyticsx::error;
19use crate::analyticsx::error::Error;
20use crate::analyticsx::query_options::{GetPendingMutationsOptions, PingOptions, QueryOptions};
21use crate::analyticsx::query_respreader::QueryRespReader;
22use crate::httpx::client::Client;
23use crate::httpx::request::{Auth, OnBehalfOfInfo, Request};
24use crate::httpx::response::Response;
25use bytes::Bytes;
26use http::Method;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use uuid::Uuid;
31
32#[derive(Debug)]
33pub struct Query<C: Client> {
34 pub http_client: Arc<C>,
35 pub user_agent: String,
36 pub endpoint: String,
37 pub canonical_endpoint: String,
38 pub auth: Auth,
39}
40
41impl<C: Client> Query<C> {
42 pub fn new_request(
43 &self,
44 method: Method,
45 path: impl Into<String>,
46 content_type: impl Into<String>,
47 on_behalf_of: Option<OnBehalfOfInfo>,
48 body: Option<Bytes>,
49 ) -> Request {
50 let auth = if let Some(obo) = on_behalf_of {
51 Auth::OnBehalfOf(OnBehalfOfInfo {
52 username: obo.username,
53 password_or_domain: obo.password_or_domain,
54 })
55 } else {
56 self.auth.clone()
57 };
58
59 Request::new(method, format!("{}/{}", self.endpoint, path.into()))
60 .auth(auth)
61 .user_agent(self.user_agent.clone())
62 .content_type(content_type.into())
63 .body(body)
64 }
65
66 pub async fn execute(
67 &self,
68 method: Method,
69 path: impl Into<String>,
70 content_type: impl Into<String>,
71 on_behalf_of: Option<OnBehalfOfInfo>,
72 body: Option<Bytes>,
73 ) -> crate::httpx::error::Result<Response> {
74 let req = self.new_request(method, path, content_type, on_behalf_of, body);
75
76 self.http_client.execute(req).await
77 }
78
79 pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
80 let statement = if let Some(statement) = &opts.statement {
81 statement.clone()
82 } else {
83 String::new()
84 };
85
86 let client_context_id = if let Some(id) = &opts.client_context_id {
88 id.clone()
89 } else {
90 Uuid::new_v4().to_string()
91 };
92
93 let on_behalf_of = opts.on_behalf_of.clone();
94
95 let mut serialized = serde_json::to_value(opts)
96 .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
97
98 let mut obj = serialized.as_object_mut().unwrap();
99 let mut client_context_id_entry = obj.get("client_context_id");
100 if client_context_id_entry.is_none() {
101 obj.insert(
102 "client_context_id".to_string(),
103 Value::String(client_context_id.clone()),
104 );
105 }
106
107 if let Some(named_args) = &opts.named_args {
108 for (k, v) in named_args.iter() {
109 let key = if k.starts_with('$') {
110 k.clone()
111 } else {
112 format!("${k}")
113 };
114 obj.insert(key, v.clone());
115 }
116 }
117
118 if let Some(raw) = &opts.raw {
119 for (k, v) in raw.iter() {
120 obj.insert(k.to_string(), v.clone());
121 }
122 }
123
124 let body =
125 Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
126 Error::new_encoding_error(format!("failed to encode options: {e}"))
127 })?);
128
129 let res = match self
130 .execute(
131 Method::POST,
132 "analytics/service",
133 "application/json",
134 on_behalf_of,
135 Some(body),
136 )
137 .await
138 {
139 Ok(r) => r,
140 Err(e) => {
141 return Err(Error::new_http_error(
142 e,
143 self.endpoint.to_string(),
144 statement,
145 client_context_id,
146 ));
147 }
148 };
149
150 QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
151 }
152
153 pub async fn get_pending_mutations(
154 &self,
155 opts: &GetPendingMutationsOptions<'_>,
156 ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
157 let res = match self
158 .execute(
159 Method::GET,
160 "analytics/node/agg/stats/remaining",
161 "application/json",
162 opts.on_behalf_of.cloned(),
163 None,
164 )
165 .await
166 {
167 Ok(r) => r,
168 Err(e) => {
169 return Err(Error::new_http_error(
170 e,
171 self.endpoint.to_string(),
172 None,
173 None,
174 ));
175 }
176 };
177
178 if !res.status().is_success() {
179 return Err(Error::new_message_error(
180 format!(
181 "get_pending_mutations failed with status code: {}",
182 res.status()
183 ),
184 Some(self.endpoint.clone()),
185 None,
186 None,
187 ));
188 }
189
190 let pending = serde_json::from_slice(
191 &res.bytes()
192 .await
193 .map_err(|e| Error::new_http_error(e, self.endpoint.clone(), None, None))?,
194 )
195 .map_err(|e| {
196 Error::new_message_error(
197 format!("failed to decode get_pending_mutations response: {}", e),
198 self.endpoint.clone(),
199 None,
200 None,
201 )
202 })?;
203
204 Ok(pending)
205 }
206
207 pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
208 let res = match self
209 .execute(
210 Method::GET,
211 "admin/ping",
212 "",
213 opts.on_behalf_of.cloned(),
214 None,
215 )
216 .await
217 {
218 Ok(r) => r,
219 Err(e) => {
220 return Err(Error::new_http_error(
221 e,
222 self.endpoint.to_string(),
223 None,
224 None,
225 ));
226 }
227 };
228
229 if res.status().is_success() {
230 return Ok(());
231 }
232
233 Err(Error::new_message_error(
234 format!("ping failed with status code: {}", res.status()),
235 Some(self.endpoint.clone()),
236 None,
237 None,
238 ))
239 }
240}