Skip to main content

couchbase_core/analyticsx/
analytics.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18use crate::analyticsx::error;
19use crate::analyticsx::error::Error;
20use crate::analyticsx::query_options::{GetPendingMutationsOptions, PingOptions, QueryOptions};
21use crate::analyticsx::query_respreader::QueryRespReader;
22use crate::httpx::client::Client;
23use crate::httpx::request::{Auth, OnBehalfOfInfo, Request};
24use crate::httpx::response::Response;
25use bytes::Bytes;
26use http::Method;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use uuid::Uuid;
31
32#[derive(Debug)]
33pub struct Query<C: Client> {
34    pub http_client: Arc<C>,
35    pub user_agent: String,
36    pub endpoint: String,
37    pub canonical_endpoint: String,
38    pub auth: Auth,
39}
40
41impl<C: Client> Query<C> {
42    pub fn new_request(
43        &self,
44        method: Method,
45        path: impl Into<String>,
46        content_type: impl Into<String>,
47        on_behalf_of: Option<OnBehalfOfInfo>,
48        body: Option<Bytes>,
49    ) -> Request {
50        let auth = if let Some(obo) = on_behalf_of {
51            Auth::OnBehalfOf(OnBehalfOfInfo {
52                username: obo.username,
53                password_or_domain: obo.password_or_domain,
54            })
55        } else {
56            self.auth.clone()
57        };
58
59        Request::new(method, format!("{}/{}", self.endpoint, path.into()))
60            .auth(auth)
61            .user_agent(self.user_agent.clone())
62            .content_type(content_type.into())
63            .body(body)
64    }
65
66    pub async fn execute(
67        &self,
68        method: Method,
69        path: impl Into<String>,
70        content_type: impl Into<String>,
71        on_behalf_of: Option<OnBehalfOfInfo>,
72        body: Option<Bytes>,
73    ) -> crate::httpx::error::Result<Response> {
74        let req = self.new_request(method, path, content_type, on_behalf_of, body);
75
76        self.http_client.execute(req).await
77    }
78
79    pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
80        let statement = if let Some(statement) = &opts.statement {
81            statement.clone()
82        } else {
83            String::new()
84        };
85
86        //TODO: this needs re-embedding into options
87        let client_context_id = if let Some(id) = &opts.client_context_id {
88            id.clone()
89        } else {
90            Uuid::new_v4().to_string()
91        };
92
93        let on_behalf_of = opts.on_behalf_of.clone();
94
95        let mut serialized = serde_json::to_value(opts)
96            .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
97
98        let mut obj = serialized.as_object_mut().unwrap();
99        let mut client_context_id_entry = obj.get("client_context_id");
100        if client_context_id_entry.is_none() {
101            obj.insert(
102                "client_context_id".to_string(),
103                Value::String(client_context_id.clone()),
104            );
105        }
106
107        if let Some(named_args) = &opts.named_args {
108            for (k, v) in named_args.iter() {
109                let key = if k.starts_with('$') {
110                    k.clone()
111                } else {
112                    format!("${k}")
113                };
114                obj.insert(key, v.clone());
115            }
116        }
117
118        if let Some(raw) = &opts.raw {
119            for (k, v) in raw.iter() {
120                obj.insert(k.to_string(), v.clone());
121            }
122        }
123
124        let body =
125            Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
126                Error::new_encoding_error(format!("failed to encode options: {e}"))
127            })?);
128
129        let res = match self
130            .execute(
131                Method::POST,
132                "analytics/service",
133                "application/json",
134                on_behalf_of,
135                Some(body),
136            )
137            .await
138        {
139            Ok(r) => r,
140            Err(e) => {
141                return Err(Error::new_http_error(
142                    e,
143                    self.endpoint.to_string(),
144                    statement,
145                    client_context_id,
146                ));
147            }
148        };
149
150        QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
151    }
152
153    pub async fn get_pending_mutations(
154        &self,
155        opts: &GetPendingMutationsOptions<'_>,
156    ) -> error::Result<HashMap<String, HashMap<String, i64>>> {
157        let res = match self
158            .execute(
159                Method::GET,
160                "analytics/node/agg/stats/remaining",
161                "application/json",
162                opts.on_behalf_of.cloned(),
163                None,
164            )
165            .await
166        {
167            Ok(r) => r,
168            Err(e) => {
169                return Err(Error::new_http_error(
170                    e,
171                    self.endpoint.to_string(),
172                    None,
173                    None,
174                ));
175            }
176        };
177
178        if !res.status().is_success() {
179            return Err(Error::new_message_error(
180                format!(
181                    "get_pending_mutations failed with status code: {}",
182                    res.status()
183                ),
184                Some(self.endpoint.clone()),
185                None,
186                None,
187            ));
188        }
189
190        let pending = serde_json::from_slice(
191            &res.bytes()
192                .await
193                .map_err(|e| Error::new_http_error(e, self.endpoint.clone(), None, None))?,
194        )
195        .map_err(|e| {
196            Error::new_message_error(
197                format!("failed to decode get_pending_mutations response: {}", e),
198                self.endpoint.clone(),
199                None,
200                None,
201            )
202        })?;
203
204        Ok(pending)
205    }
206
207    pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
208        let res = match self
209            .execute(
210                Method::GET,
211                "admin/ping",
212                "",
213                opts.on_behalf_of.cloned(),
214                None,
215            )
216            .await
217        {
218            Ok(r) => r,
219            Err(e) => {
220                return Err(Error::new_http_error(
221                    e,
222                    self.endpoint.to_string(),
223                    None,
224                    None,
225                ));
226            }
227        };
228
229        if res.status().is_success() {
230            return Ok(());
231        }
232
233        Err(Error::new_message_error(
234            format!("ping failed with status code: {}", res.status()),
235            Some(self.endpoint.clone()),
236            None,
237            None,
238        ))
239    }
240}