Skip to main content

couchbase_core/queryx/
query.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, BasicAuth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::queryx::error;
23use crate::queryx::error::{Error, ErrorKind, ServerError, ServerErrorKind};
24use crate::queryx::index::Index;
25use crate::queryx::query_options::{
26    BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27    DropPrimaryIndexOptions, GetAllIndexesOptions, PingOptions, QueryOptions, WatchIndexesOptions,
28};
29use crate::queryx::query_respreader::QueryRespReader;
30use crate::retry::RetryStrategy;
31use crate::tracingcomponent::{
32    BeginDispatchFields, EndDispatchFields, OperationId, TracingComponent,
33};
34use crate::tracingcomponent::{
35    SERVICE_VALUE_QUERY, SPAN_ATTRIB_DB_SYSTEM_VALUE, SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
36};
37use crate::util::get_host_port_tuple_from_uri;
38use bytes::Bytes;
39use futures::StreamExt;
40use http::{Method, StatusCode};
41use serde::Deserialize;
42use serde_json::Value;
43use std::collections::HashMap;
44use std::fmt::format;
45use std::sync::Arc;
46use std::time::{Duration, Instant};
47use tokio::time::sleep;
48use tracing::debug;
49use tracing::{instrument, Instrument, Level, Span};
50use uuid::Uuid;
51
/// Client-side handle for issuing query service requests over HTTP.
///
/// Bundles the HTTP client, the endpoint that request paths are appended to,
/// the default credentials and the tracing component used by the operations
/// implemented on this type.
#[derive(Debug)]
pub struct Query<C: Client> {
    /// HTTP client that executes every request built by this type.
    pub http_client: Arc<C>,
    /// Value handed to the request's `user_agent` setter.
    pub user_agent: String,
    /// Base URI; request URIs are formed as `{endpoint}/{path}`.
    pub endpoint: String,
    /// Second form of the endpoint; in this file it is only parsed into the
    /// address pair passed to the dispatch span fields.
    pub canonical_endpoint: String,
    /// Credentials used when a request carries no on-behalf-of information.
    pub auth: Auth,
    /// Tracing component used to wrap dispatches in spans and to record cluster labels.
    pub(crate) tracing: Arc<TracingComponent>,
}
61
62impl<C: Client> Query<C> {
63    pub fn new_request(
64        &self,
65        method: Method,
66        path: impl Into<String>,
67        content_type: impl Into<String>,
68        on_behalf_of: Option<OnBehalfOfInfo>,
69        body: Option<Bytes>,
70    ) -> Request {
71        let auth = if let Some(obo) = on_behalf_of {
72            Auth::OnBehalfOf(OnBehalfOfInfo {
73                username: obo.username,
74                password_or_domain: obo.password_or_domain,
75            })
76        } else {
77            self.auth.clone()
78        };
79
80        Request::new(method, format!("{}/{}", self.endpoint, path.into()))
81            .auth(auth)
82            .user_agent(self.user_agent.clone())
83            .content_type(content_type.into())
84            .body(body)
85    }
86
87    pub async fn execute(
88        &self,
89        method: Method,
90        path: impl Into<String>,
91        content_type: impl Into<String>,
92        on_behalf_of: Option<OnBehalfOfInfo>,
93        body: Option<Bytes>,
94    ) -> crate::httpx::error::Result<Response> {
95        let req = self.new_request(method, path, content_type, on_behalf_of, body);
96
97        self.http_client.execute(req).await
98    }
99
100    pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
101        let statement = if let Some(statement) = &opts.statement {
102            statement.clone()
103        } else {
104            String::new()
105        };
106
107        //TODO; this needs re-embedding into options
108        let client_context_id = if let Some(id) = &opts.client_context_id {
109            id.clone()
110        } else {
111            Uuid::new_v4().to_string()
112        };
113
114        let on_behalf_of = opts.on_behalf_of.clone();
115
116        let mut serialized = serde_json::to_value(opts)
117            .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
118
119        let mut obj = serialized.as_object_mut().unwrap();
120        let mut client_context_id_entry = obj.get("client_context_id");
121        if client_context_id_entry.is_none() {
122            obj.insert(
123                "client_context_id".to_string(),
124                Value::String(client_context_id.clone()),
125            );
126        }
127
128        if let Some(named_args) = &opts.named_args {
129            for (k, v) in named_args.iter() {
130                let key = if k.starts_with("$") {
131                    k.clone()
132                } else {
133                    format!("${k}")
134                };
135                obj.insert(key, v.clone());
136            }
137        }
138
139        if let Some(raw) = &opts.raw {
140            for (k, v) in raw.iter() {
141                obj.insert(k.to_string(), v.clone());
142            }
143        }
144
145        let body =
146            Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
147                Error::new_encoding_error(format!("failed to encode options: {e}"))
148            })?);
149
150        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
151        let canonical_addr =
152            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
153        let res = self
154            .tracing
155            .orchestrate_dispatch_span(
156                BeginDispatchFields::new(
157                    (&peer_addr.0, &peer_addr.1),
158                    (&canonical_addr.0, &canonical_addr.1),
159                    None,
160                ),
161                self.execute(
162                    Method::POST,
163                    "query/service",
164                    "application/json",
165                    on_behalf_of,
166                    Some(body),
167                ),
168                |_| {
169                    EndDispatchFields::new(
170                        None,
171                        Some(OperationId::String(client_context_id.clone())),
172                    )
173                },
174            )
175            .await;
176
177        let res = match res {
178            Ok(r) => r,
179            Err(e) => {
180                return Err(Error::new_http_error(
181                    e,
182                    self.endpoint.to_string(),
183                    statement,
184                    client_context_id,
185                ));
186            }
187        };
188
189        QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
190    }
191
    /// Runs [`Self::query`] inside a TRACE-level span named `"query"`.
    ///
    /// The span is created with the OTel kind, database system and service
    /// attributes plus the statement text (empty string when no statement is set).
    /// `couchbase.cluster.name` and `couchbase.cluster.uuid` are declared without
    /// values; presumably `record_cluster_labels` fills them in on the current
    /// span (confirm against `TracingComponent`).
    #[instrument(
    skip_all,
    level = Level::TRACE,
    name = "query",
    fields(
    otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
    db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
    couchbase.service = SERVICE_VALUE_QUERY,
    db.query.text = opts.statement.as_deref().unwrap_or(""),
    couchbase.cluster.name,
    couchbase.cluster.uuid,
    ))]
    async fn query_with_span(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
        // Must run inside the instrumented body so that `Span::current()` is the span declared above.
        self.tracing.record_cluster_labels(&Span::current());
        self.query(opts).await
    }
208
209    pub async fn get_all_indexes(
210        &self,
211        opts: &GetAllIndexesOptions<'_>,
212    ) -> error::Result<Vec<Index>> {
213        let mut where_clause = match (&opts.collection_name, &opts.scope_name) {
214            (None, None) => {
215                if !opts.bucket_name.is_empty() {
216                    let encoded_bucket = encode_value(&opts.bucket_name)?;
217                    format!(
218                        "(keyspace_id={encoded_bucket} AND bucket_id IS MISSING) OR bucket_id={encoded_bucket}"
219                    )
220                } else {
221                    "1=1".to_string()
222                }
223            }
224            (Some(collection_name), Some(scope_name)) => {
225                let scope_name = normalise_default_name(scope_name);
226                let collection_name = normalise_default_name(collection_name);
227
228                let encoded_bucket = encode_value(&opts.bucket_name)?;
229                let encoded_scope = encode_value(&scope_name)?;
230                let encoded_collection = encode_value(&collection_name)?;
231
232                let temp_where = format!(
233                    "bucket_id={encoded_bucket} AND scope_id={encoded_scope} AND keyspace_id={encoded_collection}"
234                );
235
236                if scope_name == "_default" && collection_name == "_default" {
237                    format!(
238                        "({temp_where}) OR (keyspace_id={encoded_bucket} AND bucket_id IS MISSING)"
239                    )
240                } else {
241                    temp_where
242                }
243            }
244            (None, Some(scope_name)) => {
245                let scope_name = normalise_default_name(scope_name);
246
247                let encoded_bucket = encode_value(&opts.bucket_name)?;
248                let encoded_scope = encode_value(&scope_name)?;
249
250                format!("bucket_id={encoded_bucket} AND scope_id={encoded_scope}")
251            }
252            _ => {
253                return Err(Error::new_invalid_argument_error(
254                    "invalid combination of bucket, scope and collection".to_string(),
255                    None,
256                ));
257            }
258        };
259
260        where_clause = format!("({where_clause}) AND `using`=\"gsi\"");
261        let qs = format!(
262            "SELECT `idx`.* FROM system:indexes AS idx WHERE {where_clause} ORDER BY is_primary DESC, name ASC"
263        );
264
265        let opts = QueryOptions::new()
266            .statement(qs)
267            .on_behalf_of(opts.on_behalf_of.cloned());
268        let mut res = self.query_with_span(&opts).await?;
269
270        let mut indexes = vec![];
271
272        while let Some(row) = res.next().await {
273            let bytes = row?;
274            let index: Index = serde_json::from_slice(&bytes).map_err(|e| {
275                Error::new_message_error(
276                    format!("failed to parse index from response: {e}"),
277                    None,
278                    None,
279                    None,
280                )
281            })?;
282
283            indexes.push(index);
284        }
285
286        Ok(indexes)
287    }
288
289    pub async fn create_primary_index(
290        &self,
291        opts: &CreatePrimaryIndexOptions<'_>,
292    ) -> error::Result<()> {
293        // TODO (MW) - Maybe add IF NOT EXISTS & amend error handling if we don't need backwards compat with <=7.0
294        let mut qs = String::from("CREATE PRIMARY INDEX");
295        if let Some(index_name) = &opts.index_name {
296            qs.push_str(&format!(" {}", encode_identifier(index_name)));
297        }
298
299        qs.push_str(&format!(
300            " ON {}",
301            build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
302        ));
303
304        let mut with: HashMap<&str, Value> = HashMap::new();
305
306        if let Some(deferred) = opts.deferred {
307            with.insert("defer_build", Value::Bool(deferred));
308        }
309
310        if let Some(num_replicas) = opts.num_replicas {
311            with.insert("num_replica", Value::Number(num_replicas.into()));
312        }
313
314        if !with.is_empty() {
315            let with = encode_value(&with)?;
316            qs.push_str(&format!(" WITH {with}"));
317        }
318
319        let query_opts = QueryOptions::new()
320            .statement(qs)
321            .on_behalf_of(opts.on_behalf_of.cloned());
322
323        let mut res = self.query_with_span(&query_opts).await;
324
325        match res {
326            Err(e) => {
327                if e.is_index_exists() {
328                    if opts.ignore_if_exists.unwrap_or(false) {
329                        Ok(())
330                    } else {
331                        Err(e)
332                    }
333                } else if e.is_build_already_in_progress() {
334                    Ok(())
335                } else {
336                    Err(e)
337                }
338            }
339            Ok(_) => Ok(()),
340        }
341    }
342
343    pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
344        let mut qs = String::from("CREATE INDEX");
345        qs.push_str(&format!(" {}", encode_identifier(opts.index_name)));
346        qs.push_str(&format!(
347            " ON {}",
348            build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
349        ));
350
351        let mut encoded_fields: Vec<String> = Vec::with_capacity(opts.fields.len());
352        for field in opts.fields {
353            encoded_fields.push(encode_identifier(field));
354        }
355        qs.push_str(&format!(" ( {})", encoded_fields.join(",")));
356
357        let mut with: HashMap<&str, Value> = HashMap::new();
358
359        if let Some(deferred) = opts.deferred {
360            with.insert("defer_build", Value::Bool(deferred));
361        }
362
363        if let Some(num_replicas) = opts.num_replicas {
364            with.insert("num_replica", Value::Number(num_replicas.into()));
365        }
366
367        if !with.is_empty() {
368            let with = encode_value(&with)?;
369            qs.push_str(&format!(" WITH {with}"));
370        }
371
372        let query_opts = QueryOptions::new()
373            .statement(qs)
374            .on_behalf_of(opts.on_behalf_of.cloned());
375
376        let mut res = self.query_with_span(&query_opts).await;
377
378        match res {
379            Err(e) => {
380                if e.is_index_exists() {
381                    if opts.ignore_if_exists.unwrap_or(false) {
382                        Ok(())
383                    } else {
384                        Err(e)
385                    }
386                } else if e.is_build_already_in_progress() {
387                    Ok(())
388                } else {
389                    Err(e)
390                }
391            }
392            Ok(_) => Ok(()),
393        }
394    }
395
396    pub async fn drop_primary_index(
397        &self,
398        opts: &DropPrimaryIndexOptions<'_>,
399    ) -> error::Result<()> {
400        // TODO (MW) - Maybe add IF EXISTS & amend error handling if we don't need backwards compat with <=7.0
401        let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
402
403        let mut qs = String::new();
404        if let Some(index_name) = &opts.index_name {
405            let encoded_name = encode_identifier(index_name);
406
407            if opts.scope_name.is_some() || opts.collection_name.is_some() {
408                qs.push_str(&format!("DROP INDEX {encoded_name}"));
409                qs.push_str(&format!(" ON {keyspace}"));
410            } else {
411                qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
412            }
413        } else {
414            qs.push_str(&format!("DROP PRIMARY INDEX ON {keyspace}"));
415        }
416
417        let query_opts = QueryOptions::new()
418            .statement(qs)
419            .on_behalf_of(opts.on_behalf_of.cloned());
420
421        let mut res = self.query_with_span(&query_opts).await;
422
423        match res {
424            Err(e) => {
425                if e.is_index_not_found() {
426                    if opts.ignore_if_not_exists.unwrap_or(false) {
427                        Ok(())
428                    } else {
429                        Err(e)
430                    }
431                } else {
432                    Err(e)
433                }
434            }
435            Ok(_) => Ok(()),
436        }
437    }
438
439    pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
440        let encoded_name = encode_identifier(opts.index_name);
441        let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
442
443        let mut qs = String::new();
444        if opts.scope_name.is_some() || opts.collection_name.is_some() {
445            qs.push_str(&format!("DROP INDEX {encoded_name}"));
446            qs.push_str(&format!(" ON {keyspace}"));
447        } else {
448            qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
449        }
450
451        let query_opts = QueryOptions::new()
452            .statement(qs)
453            .on_behalf_of(opts.on_behalf_of.cloned());
454
455        let res = self.query_with_span(&query_opts).await;
456
457        match res {
458            Err(e) => {
459                if e.is_index_not_found() {
460                    if opts.ignore_if_not_exists.unwrap_or(false) {
461                        Ok(())
462                    } else {
463                        Err(e)
464                    }
465                } else {
466                    Err(e)
467                }
468            }
469            Ok(_) => Ok(()),
470        }
471    }
472
473    pub async fn build_deferred_indexes(
474        &self,
475        opts: &BuildDeferredIndexesOptions<'_>,
476    ) -> error::Result<()> {
477        let opts = GetAllIndexesOptions {
478            bucket_name: opts.bucket_name,
479            scope_name: opts.scope_name,
480            collection_name: opts.collection_name,
481            on_behalf_of: opts.on_behalf_of,
482        };
483
484        let indexes = self.get_all_indexes(&opts).await?;
485
486        let deferred_items: Vec<_> = indexes
487            .iter()
488            .filter(|index| index.state == "deferred")
489            .map(|index| {
490                let (bucket, scope, collection) = index_to_namespace_parts(index);
491                let deferred_index = DeferredIndexName {
492                    bucket_name: bucket,
493                    scope_name: scope,
494                    collection_name: collection,
495                    index_name: &index.name,
496                };
497                let keyspace = build_keyspace(bucket, &Some(scope), &Some(collection));
498                (keyspace, deferred_index)
499            })
500            .collect();
501
502        let mut deferred_indexes: HashMap<String, Vec<DeferredIndexName>> =
503            HashMap::with_capacity(deferred_items.len());
504
505        for (keyspace, deferred_index) in deferred_items {
506            deferred_indexes
507                .entry(keyspace)
508                .or_default()
509                .push(deferred_index);
510        }
511
512        if deferred_indexes.is_empty() {
513            return Ok(());
514        }
515
516        for (keyspace, indexes) in deferred_indexes {
517            let mut escaped_index_names: Vec<String> = Vec::with_capacity(indexes.len());
518            for index in indexes {
519                escaped_index_names.push(encode_identifier(index.index_name));
520            }
521
522            let qs = format!(
523                "BUILD INDEX ON {}({})",
524                keyspace,
525                escaped_index_names.join(",")
526            );
527            let query_opts = QueryOptions::new()
528                .statement(qs)
529                .on_behalf_of(opts.on_behalf_of.cloned());
530
531            let res = self.query_with_span(&query_opts).await;
532
533            match res {
534                Err(e) => {
535                    if e.is_build_already_in_progress() {
536                        continue;
537                    }
538
539                    return Err(e);
540                }
541                Ok(_) => continue,
542            }
543        }
544
545        Ok(())
546    }
547
548    pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
549        let mut cur_interval = Duration::from_millis(50);
550        let mut watch_list = opts.indexes.to_vec();
551
552        if opts.watch_primary.unwrap_or(false) {
553            watch_list.push("#primary");
554        }
555
556        loop {
557            let indexes = self
558                .get_all_indexes(&GetAllIndexesOptions {
559                    bucket_name: opts.bucket_name,
560                    scope_name: opts.scope_name,
561                    collection_name: opts.collection_name,
562                    on_behalf_of: opts.on_behalf_of,
563                })
564                .await?;
565
566            let all_online = check_indexes_active(&indexes, &watch_list)?;
567
568            if all_online {
569                debug!("All watched indexes are ready");
570                return Ok(());
571            }
572
573            cur_interval = std::cmp::min(
574                cur_interval + Duration::from_millis(500),
575                Duration::from_secs(1),
576            );
577
578            sleep(cur_interval).await;
579        }
580    }
581
582    pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
583        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
584        let canonical_addr =
585            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
586        let res = match self
587            .tracing
588            .orchestrate_dispatch_span(
589                BeginDispatchFields::new(
590                    (&peer_addr.0, &peer_addr.1),
591                    (&canonical_addr.0, &canonical_addr.1),
592                    None,
593                ),
594                self.execute(
595                    Method::GET,
596                    "admin/ping",
597                    "",
598                    opts.on_behalf_of.cloned(),
599                    None,
600                ),
601                |_| EndDispatchFields::new(None, None),
602            )
603            .await
604        {
605            Ok(r) => r,
606            Err(e) => {
607                return Err(Error::new_http_error(
608                    e,
609                    self.endpoint.to_string(),
610                    None,
611                    None,
612                ));
613            }
614        };
615
616        if res.status().is_success() {
617            return Ok(());
618        }
619
620        Err(Error::new_message_error(
621            format!("ping failed with status code: {}", res.status()),
622            Some(self.endpoint.clone()),
623            None,
624            None,
625        ))
626    }
627}
628
/// Borrowed names locating a single deferred index.
///
/// Built in `build_deferred_indexes` from the parts returned by
/// `index_to_namespace_parts`; in this file only `index_name` is read back.
struct DeferredIndexName<'a> {
    /// Bucket part of the index's namespace.
    bucket_name: &'a str,
    /// Scope part of the index's namespace.
    scope_name: &'a str,
    /// Collection part of the index's namespace.
    collection_name: &'a str,
    /// Name of the index, borrowed from the listed `Index`.
    index_name: &'a str,
}
635
/// Maps an empty scope/collection name to `_default`; any other name is returned as-is.
pub fn normalise_default_name(name: &str) -> String {
    match name {
        "" => "_default".to_string(),
        given => given.to_string(),
    }
}
643
644pub fn build_keyspace(
645    bucket_name: &str,
646    scope_name: &Option<&str>,
647    collection_name: &Option<&str>,
648) -> String {
649    match (scope_name, collection_name) {
650        (Some(scope), Some(collection)) => format!(
651            "{}.{}.{}",
652            encode_identifier(bucket_name),
653            encode_identifier(scope),
654            encode_identifier(collection)
655        ),
656        (Some(scope), None) => format!(
657            "{}.{}._default",
658            encode_identifier(bucket_name),
659            encode_identifier(scope)
660        ),
661        (None, Some(collection)) => format!(
662            "{}._default.{}",
663            encode_identifier(bucket_name),
664            encode_identifier(collection)
665        ),
666        (None, None) => encode_identifier(bucket_name),
667    }
668}
669
670fn index_to_namespace_parts(index: &Index) -> (&str, &str, &str) {
671    if index.bucket_id.is_none() {
672        let default_scope_coll = "_default";
673        (
674            index.keyspace_id.as_deref().unwrap_or(""),
675            default_scope_coll,
676            default_scope_coll,
677        )
678    } else {
679        (
680            index.bucket_id.as_deref().unwrap_or(""),
681            index.scope_id.as_deref().unwrap_or(""),
682            index.keyspace_id.as_deref().unwrap_or(""),
683        )
684    }
685}
686
687fn check_indexes_active(indexes: &[Index], check_list: &Vec<&str>) -> error::Result<bool> {
688    let mut check_indexes = Vec::new();
689
690    for index_name in check_list {
691        if let Some(index) = indexes.iter().find(|idx| idx.name == *index_name) {
692            check_indexes.push(index);
693        } else {
694            return Ok(false);
695        }
696    }
697
698    for index in check_indexes {
699        if index.state != "online" {
700            debug!(
701                "Index {} is not ready yet, current state is {}",
702                index.name, index.state
703            );
704            return Ok(false);
705        }
706    }
707
708    Ok(true)
709}
710
/// Wraps `identifier` in backticks, prefixing every backslash and backtick
/// inside it with a backslash.
fn encode_identifier(identifier: &str) -> String {
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('`');
    for ch in identifier.chars() {
        if matches!(ch, '\\' | '`') {
            quoted.push('\\');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}
716
717fn encode_value<T: serde::Serialize>(value: &T) -> error::Result<String> {
718    let bytes = serde_json::to_string(value).map_err(|e| {
719        Error::new_message_error(format!("failed to encode value: {e}"), None, None, None)
720    })?;
721    Ok(bytes)
722}