couchbase_core/queryx/
query.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, BasicAuth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::queryx::error;
23use crate::queryx::error::{Error, ErrorKind, ServerError, ServerErrorKind};
24use crate::queryx::index::Index;
25use crate::queryx::query_options::{
26    BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27    DropPrimaryIndexOptions, GetAllIndexesOptions, PingOptions, QueryOptions, WatchIndexesOptions,
28};
29use crate::queryx::query_respreader::QueryRespReader;
30use crate::retry::RetryStrategy;
31use bytes::Bytes;
32use futures::StreamExt;
33use http::{Method, StatusCode};
34use log::debug;
35use serde::Deserialize;
36use serde_json::Value;
37use std::collections::HashMap;
38use std::fmt::format;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::time::sleep;
42use uuid::Uuid;
43
44#[derive(Debug)]
45pub struct Query<C: Client> {
46    pub http_client: Arc<C>,
47    pub user_agent: String,
48    pub endpoint: String,
49    pub username: String,
50    pub password: String,
51}
52
53impl<C: Client> Query<C> {
54    pub fn new_request(
55        &self,
56        method: Method,
57        path: impl Into<String>,
58        content_type: impl Into<String>,
59        on_behalf_of: Option<OnBehalfOfInfo>,
60        body: Option<Bytes>,
61    ) -> Request {
62        let auth = if let Some(obo) = on_behalf_of {
63            Auth::OnBehalfOf(OnBehalfOfInfo {
64                username: obo.username,
65                password_or_domain: obo.password_or_domain,
66            })
67        } else {
68            Auth::BasicAuth(BasicAuth {
69                username: self.username.clone(),
70                password: self.password.clone(),
71            })
72        };
73
74        Request::new(method, format!("{}/{}", self.endpoint, path.into()))
75            .auth(auth)
76            .user_agent(self.user_agent.clone())
77            .content_type(content_type.into())
78            .body(body)
79    }
80
81    pub async fn execute(
82        &self,
83        method: Method,
84        path: impl Into<String>,
85        content_type: impl Into<String>,
86        on_behalf_of: Option<OnBehalfOfInfo>,
87        body: Option<Bytes>,
88    ) -> crate::httpx::error::Result<Response> {
89        let req = self.new_request(method, path, content_type, on_behalf_of, body);
90
91        self.http_client.execute(req).await
92    }
93
94    pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
95        let statement = if let Some(statement) = &opts.statement {
96            statement.clone()
97        } else {
98            String::new()
99        };
100
101        //TODO; this needs re-embedding into options
102        let client_context_id = if let Some(id) = &opts.client_context_id {
103            id.clone()
104        } else {
105            Uuid::new_v4().to_string()
106        };
107
108        let on_behalf_of = opts.on_behalf_of.clone();
109
110        let mut serialized = serde_json::to_value(opts)
111            .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
112
113        let mut obj = serialized.as_object_mut().unwrap();
114        let mut client_context_id_entry = obj.get("client_context_id");
115        if client_context_id_entry.is_none() {
116            obj.insert(
117                "client_context_id".to_string(),
118                Value::String(client_context_id.clone()),
119            );
120        }
121
122        if let Some(named_args) = &opts.named_args {
123            for (k, v) in named_args.iter() {
124                let key = if k.starts_with("$") {
125                    k.clone()
126                } else {
127                    format!("${k}")
128                };
129                obj.insert(key, v.clone());
130            }
131        }
132
133        if let Some(raw) = &opts.raw {
134            for (k, v) in raw.iter() {
135                obj.insert(k.to_string(), v.clone());
136            }
137        }
138
139        let body =
140            Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
141                Error::new_encoding_error(format!("failed to encode options: {e}"))
142            })?);
143
144        let res = match self
145            .execute(
146                Method::POST,
147                "query/service",
148                "application/json",
149                on_behalf_of,
150                Some(body),
151            )
152            .await
153        {
154            Ok(r) => r,
155            Err(e) => {
156                return Err(Error::new_http_error(
157                    format!("{}: {}", &self.endpoint, e),
158                    statement,
159                    client_context_id,
160                ));
161            }
162        };
163
164        QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
165    }
166
167    pub async fn get_all_indexes(
168        &self,
169        opts: &GetAllIndexesOptions<'_>,
170    ) -> error::Result<Vec<Index>> {
171        let mut where_clause = match (&opts.collection_name, &opts.scope_name) {
172            (None, None) => {
173                if !opts.bucket_name.is_empty() {
174                    let encoded_bucket = encode_value(&opts.bucket_name)?;
175                    format!(
176                        "(keyspace_id={encoded_bucket} AND bucket_id IS MISSING) OR bucket_id={encoded_bucket}"
177                    )
178                } else {
179                    "1=1".to_string()
180                }
181            }
182            (Some(collection_name), Some(scope_name)) => {
183                let scope_name = normalise_default_name(scope_name);
184                let collection_name = normalise_default_name(collection_name);
185
186                let encoded_bucket = encode_value(&opts.bucket_name)?;
187                let encoded_scope = encode_value(&scope_name)?;
188                let encoded_collection = encode_value(&collection_name)?;
189
190                let temp_where = format!(
191                    "bucket_id={encoded_bucket} AND scope_id={encoded_scope} AND keyspace_id={encoded_collection}"
192                );
193
194                if scope_name == "_default" && collection_name == "_default" {
195                    format!(
196                        "({temp_where}) OR (keyspace_id={encoded_bucket} AND bucket_id IS MISSING)"
197                    )
198                } else {
199                    temp_where
200                }
201            }
202            (None, Some(scope_name)) => {
203                let scope_name = normalise_default_name(scope_name);
204
205                let encoded_bucket = encode_value(&opts.bucket_name)?;
206                let encoded_scope = encode_value(&scope_name)?;
207
208                format!("bucket_id={encoded_bucket} AND scope_id={encoded_scope}")
209            }
210            _ => {
211                return Err(Error::new_invalid_argument_error(
212                    "invalid combination of bucket, scope and collection".to_string(),
213                    None,
214                ));
215            }
216        };
217
218        where_clause = format!("({where_clause}) AND `using`=\"gsi\"");
219        let qs = format!(
220            "SELECT `idx`.* FROM system:indexes AS idx WHERE {where_clause} ORDER BY is_primary DESC, name ASC"
221        );
222
223        let opts = QueryOptions::new()
224            .statement(qs)
225            .on_behalf_of(opts.on_behalf_of.cloned());
226        let mut res = self.query(&opts).await?;
227
228        let mut indexes = vec![];
229
230        while let Some(row) = res.next().await {
231            let bytes = row?;
232            let index: Index = serde_json::from_slice(&bytes).map_err(|e| {
233                Error::new_message_error(
234                    format!("failed to parse index from response: {e}"),
235                    None,
236                    None,
237                    None,
238                )
239            })?;
240
241            indexes.push(index);
242        }
243
244        Ok(indexes)
245    }
246
247    pub async fn create_primary_index(
248        &self,
249        opts: &CreatePrimaryIndexOptions<'_>,
250    ) -> error::Result<()> {
251        // TODO (MW) - Maybe add IF NOT EXISTS & amend error handling if we don't need backwards compat with <=7.0
252        let mut qs = String::from("CREATE PRIMARY INDEX");
253        if let Some(index_name) = &opts.index_name {
254            qs.push_str(&format!(" {}", encode_identifier(index_name)));
255        }
256
257        qs.push_str(&format!(
258            " ON {}",
259            build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
260        ));
261
262        let mut with: HashMap<&str, Value> = HashMap::new();
263
264        if let Some(deferred) = opts.deferred {
265            with.insert("defer_build", Value::Bool(deferred));
266        }
267
268        if let Some(num_replicas) = opts.num_replicas {
269            with.insert("num_replica", Value::Number(num_replicas.into()));
270        }
271
272        if !with.is_empty() {
273            let with = encode_value(&with)?;
274            qs.push_str(&format!(" WITH {with}"));
275        }
276
277        let query_opts = QueryOptions::new()
278            .statement(qs)
279            .on_behalf_of(opts.on_behalf_of.cloned());
280
281        let mut res = self.query(&query_opts).await;
282
283        match res {
284            Err(e) => {
285                if e.is_index_exists() {
286                    if opts.ignore_if_exists.unwrap_or(false) {
287                        Ok(())
288                    } else {
289                        Err(e)
290                    }
291                } else if e.is_build_already_in_progress() {
292                    Ok(())
293                } else {
294                    Err(e)
295                }
296            }
297            Ok(_) => Ok(()),
298        }
299    }
300
301    pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
302        let mut qs = String::from("CREATE INDEX");
303        qs.push_str(&format!(" {}", encode_identifier(opts.index_name)));
304        qs.push_str(&format!(
305            " ON {}",
306            build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
307        ));
308
309        let mut encoded_fields: Vec<String> = Vec::with_capacity(opts.fields.len());
310        for field in opts.fields {
311            encoded_fields.push(encode_identifier(field));
312        }
313        qs.push_str(&format!(" ( {})", encoded_fields.join(",")));
314
315        let mut with: HashMap<&str, Value> = HashMap::new();
316
317        if let Some(deferred) = opts.deferred {
318            with.insert("defer_build", Value::Bool(deferred));
319        }
320
321        if let Some(num_replicas) = opts.num_replicas {
322            with.insert("num_replica", Value::Number(num_replicas.into()));
323        }
324
325        if !with.is_empty() {
326            let with = encode_value(&with)?;
327            qs.push_str(&format!(" WITH {with}"));
328        }
329
330        let query_opts = QueryOptions::new()
331            .statement(qs)
332            .on_behalf_of(opts.on_behalf_of.cloned());
333
334        let mut res = self.query(&query_opts).await;
335
336        match res {
337            Err(e) => {
338                if e.is_index_exists() {
339                    if opts.ignore_if_exists.unwrap_or(false) {
340                        Ok(())
341                    } else {
342                        Err(e)
343                    }
344                } else if e.is_build_already_in_progress() {
345                    Ok(())
346                } else {
347                    Err(e)
348                }
349            }
350            Ok(_) => Ok(()),
351        }
352    }
353
354    pub async fn drop_primary_index(
355        &self,
356        opts: &DropPrimaryIndexOptions<'_>,
357    ) -> error::Result<()> {
358        // TODO (MW) - Maybe add IF EXISTS & amend error handling if we don't need backwards compat with <=7.0
359        let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
360
361        let mut qs = String::new();
362        if let Some(index_name) = &opts.index_name {
363            let encoded_name = encode_identifier(index_name);
364
365            if opts.scope_name.is_some() || opts.collection_name.is_some() {
366                qs.push_str(&format!("DROP INDEX {encoded_name}"));
367                qs.push_str(&format!(" ON {keyspace}"));
368            } else {
369                qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
370            }
371        } else {
372            qs.push_str(&format!("DROP PRIMARY INDEX ON {keyspace}"));
373        }
374
375        let query_opts = QueryOptions::new()
376            .statement(qs)
377            .on_behalf_of(opts.on_behalf_of.cloned());
378
379        let mut res = self.query(&query_opts).await;
380
381        match res {
382            Err(e) => {
383                if e.is_index_not_found() {
384                    if opts.ignore_if_not_exists.unwrap_or(false) {
385                        Ok(())
386                    } else {
387                        Err(e)
388                    }
389                } else {
390                    Err(e)
391                }
392            }
393            Ok(_) => Ok(()),
394        }
395    }
396
397    pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
398        let encoded_name = encode_identifier(opts.index_name);
399        let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
400
401        let mut qs = String::new();
402        if opts.scope_name.is_some() || opts.collection_name.is_some() {
403            qs.push_str(&format!("DROP INDEX {encoded_name}"));
404            qs.push_str(&format!(" ON {keyspace}"));
405        } else {
406            qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
407        }
408
409        let query_opts = QueryOptions::new()
410            .statement(qs)
411            .on_behalf_of(opts.on_behalf_of.cloned());
412
413        let res = self.query(&query_opts).await;
414
415        match res {
416            Err(e) => {
417                if e.is_index_not_found() {
418                    if opts.ignore_if_not_exists.unwrap_or(false) {
419                        Ok(())
420                    } else {
421                        Err(e)
422                    }
423                } else {
424                    Err(e)
425                }
426            }
427            Ok(_) => Ok(()),
428        }
429    }
430
431    pub async fn build_deferred_indexes(
432        &self,
433        opts: &BuildDeferredIndexesOptions<'_>,
434    ) -> error::Result<()> {
435        let opts = GetAllIndexesOptions {
436            bucket_name: opts.bucket_name,
437            scope_name: opts.scope_name,
438            collection_name: opts.collection_name,
439            on_behalf_of: opts.on_behalf_of,
440        };
441
442        let indexes = self.get_all_indexes(&opts).await?;
443
444        let deferred_items: Vec<_> = indexes
445            .iter()
446            .filter(|index| index.state == "deferred")
447            .map(|index| {
448                let (bucket, scope, collection) = index_to_namespace_parts(index);
449                let deferred_index = DeferredIndexName {
450                    bucket_name: bucket,
451                    scope_name: scope,
452                    collection_name: collection,
453                    index_name: &index.name,
454                };
455                let keyspace = build_keyspace(bucket, &Some(scope), &Some(collection));
456                (keyspace, deferred_index)
457            })
458            .collect();
459
460        let mut deferred_indexes: HashMap<String, Vec<DeferredIndexName>> =
461            HashMap::with_capacity(deferred_items.len());
462
463        for (keyspace, deferred_index) in deferred_items {
464            deferred_indexes
465                .entry(keyspace)
466                .or_default()
467                .push(deferred_index);
468        }
469
470        if deferred_indexes.is_empty() {
471            return Ok(());
472        }
473
474        for (keyspace, indexes) in deferred_indexes {
475            let mut escaped_index_names: Vec<String> = Vec::with_capacity(indexes.len());
476            for index in indexes {
477                escaped_index_names.push(encode_identifier(index.index_name));
478            }
479
480            let qs = format!(
481                "BUILD INDEX ON {}({})",
482                keyspace,
483                escaped_index_names.join(",")
484            );
485            let query_opts = QueryOptions::new()
486                .statement(qs)
487                .on_behalf_of(opts.on_behalf_of.cloned());
488
489            let res = self.query(&query_opts).await;
490
491            match res {
492                Err(e) => {
493                    if e.is_build_already_in_progress() {
494                        continue;
495                    }
496
497                    return Err(e);
498                }
499                Ok(_) => continue,
500            }
501        }
502
503        Ok(())
504    }
505
506    pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
507        let mut cur_interval = Duration::from_millis(50);
508        let mut watch_list = opts.indexes.to_vec();
509
510        if opts.watch_primary.unwrap_or(false) {
511            watch_list.push("#primary");
512        }
513
514        loop {
515            let indexes = self
516                .get_all_indexes(&GetAllIndexesOptions {
517                    bucket_name: opts.bucket_name,
518                    scope_name: opts.scope_name,
519                    collection_name: opts.collection_name,
520                    on_behalf_of: opts.on_behalf_of,
521                })
522                .await?;
523
524            let all_online = check_indexes_active(&indexes, &watch_list)?;
525
526            if all_online {
527                debug!("All watched indexes are ready");
528                return Ok(());
529            }
530
531            cur_interval = std::cmp::min(
532                cur_interval + Duration::from_millis(500),
533                Duration::from_secs(1),
534            );
535
536            sleep(cur_interval).await;
537        }
538    }
539
540    pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
541        let res = match self
542            .execute(
543                Method::GET,
544                "admin/ping",
545                "",
546                opts.on_behalf_of.cloned(),
547                None,
548            )
549            .await
550        {
551            Ok(r) => r,
552            Err(e) => {
553                return Err(Error::new_http_error(
554                    format!("{}: {}", &self.endpoint, e),
555                    None,
556                    None,
557                ));
558            }
559        };
560
561        if res.status().is_success() {
562            return Ok(());
563        }
564
565        Err(Error::new_message_error(
566            format!("ping failed with status code: {}", res.status()),
567            Some(self.endpoint.clone()),
568            None,
569            None,
570        ))
571    }
572}
573
574struct DeferredIndexName<'a> {
575    bucket_name: &'a str,
576    scope_name: &'a str,
577    collection_name: &'a str,
578    index_name: &'a str,
579}
580
581pub fn normalise_default_name(name: &str) -> String {
582    if name.is_empty() {
583        "_default".to_string()
584    } else {
585        name.to_string()
586    }
587}
588
589pub fn build_keyspace(
590    bucket_name: &str,
591    scope_name: &Option<&str>,
592    collection_name: &Option<&str>,
593) -> String {
594    match (scope_name, collection_name) {
595        (Some(scope), Some(collection)) => format!(
596            "{}.{}.{}",
597            encode_identifier(bucket_name),
598            encode_identifier(scope),
599            encode_identifier(collection)
600        ),
601        (Some(scope), None) => format!(
602            "{}.{}._default",
603            encode_identifier(bucket_name),
604            encode_identifier(scope)
605        ),
606        (None, Some(collection)) => format!(
607            "{}._default.{}",
608            encode_identifier(bucket_name),
609            encode_identifier(collection)
610        ),
611        (None, None) => encode_identifier(bucket_name),
612    }
613}
614
615fn index_to_namespace_parts(index: &Index) -> (&str, &str, &str) {
616    if index.bucket_id.is_none() {
617        let default_scope_coll = "_default";
618        (
619            index.keyspace_id.as_deref().unwrap_or(""),
620            default_scope_coll,
621            default_scope_coll,
622        )
623    } else {
624        (
625            index.bucket_id.as_deref().unwrap_or(""),
626            index.scope_id.as_deref().unwrap_or(""),
627            index.keyspace_id.as_deref().unwrap_or(""),
628        )
629    }
630}
631
632fn check_indexes_active(indexes: &[Index], check_list: &Vec<&str>) -> error::Result<bool> {
633    let mut check_indexes = Vec::new();
634
635    for index_name in check_list {
636        if let Some(index) = indexes.iter().find(|idx| idx.name == *index_name) {
637            check_indexes.push(index);
638        } else {
639            return Ok(false);
640        }
641    }
642
643    for index in check_indexes {
644        if index.state != "online" {
645            debug!(
646                "Index {} is not ready yet, current state is {}",
647                index.name, index.state
648            );
649            return Ok(false);
650        }
651    }
652
653    Ok(true)
654}
655
656fn encode_identifier(identifier: &str) -> String {
657    let mut out = identifier.replace("\\", "\\\\");
658    out = out.replace("`", "\\`");
659    format!("`{out}`")
660}
661
662fn encode_value<T: serde::Serialize>(value: &T) -> error::Result<String> {
663    let bytes = serde_json::to_string(value).map_err(|e| {
664        Error::new_message_error(format!("failed to encode value: {e}"), None, None, None)
665    })?;
666    Ok(bytes)
667}