1use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, BasicAuth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::queryx::error;
23use crate::queryx::error::{Error, ErrorKind, ServerError, ServerErrorKind};
24use crate::queryx::index::Index;
25use crate::queryx::query_options::{
26 BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27 DropPrimaryIndexOptions, GetAllIndexesOptions, PingOptions, QueryOptions, WatchIndexesOptions,
28};
29use crate::queryx::query_respreader::QueryRespReader;
30use crate::retry::RetryStrategy;
31use crate::tracingcomponent::{
32 BeginDispatchFields, EndDispatchFields, OperationId, TracingComponent,
33};
34use crate::tracingcomponent::{
35 SERVICE_VALUE_QUERY, SPAN_ATTRIB_DB_SYSTEM_VALUE, SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
36};
37use crate::util::get_host_port_tuple_from_uri;
38use bytes::Bytes;
39use futures::StreamExt;
40use http::{Method, StatusCode};
41use serde::Deserialize;
42use serde_json::Value;
43use std::collections::HashMap;
44use std::fmt::format;
45use std::sync::Arc;
46use std::time::{Duration, Instant};
47use tokio::time::sleep;
48use tracing::debug;
49use tracing::{instrument, Instrument, Level, Span};
50use uuid::Uuid;
51
52#[derive(Debug)]
53pub struct Query<C: Client> {
54 pub http_client: Arc<C>,
55 pub user_agent: String,
56 pub endpoint: String,
57 pub canonical_endpoint: String,
58 pub auth: Auth,
59 pub(crate) tracing: Arc<TracingComponent>,
60}
61
62impl<C: Client> Query<C> {
63 pub fn new_request(
64 &self,
65 method: Method,
66 path: impl Into<String>,
67 content_type: impl Into<String>,
68 on_behalf_of: Option<OnBehalfOfInfo>,
69 body: Option<Bytes>,
70 ) -> Request {
71 let auth = if let Some(obo) = on_behalf_of {
72 Auth::OnBehalfOf(OnBehalfOfInfo {
73 username: obo.username,
74 password_or_domain: obo.password_or_domain,
75 })
76 } else {
77 self.auth.clone()
78 };
79
80 Request::new(method, format!("{}/{}", self.endpoint, path.into()))
81 .auth(auth)
82 .user_agent(self.user_agent.clone())
83 .content_type(content_type.into())
84 .body(body)
85 }
86
87 pub async fn execute(
88 &self,
89 method: Method,
90 path: impl Into<String>,
91 content_type: impl Into<String>,
92 on_behalf_of: Option<OnBehalfOfInfo>,
93 body: Option<Bytes>,
94 ) -> crate::httpx::error::Result<Response> {
95 let req = self.new_request(method, path, content_type, on_behalf_of, body);
96
97 self.http_client.execute(req).await
98 }
99
100 pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
101 let statement = if let Some(statement) = &opts.statement {
102 statement.clone()
103 } else {
104 String::new()
105 };
106
107 let client_context_id = if let Some(id) = &opts.client_context_id {
109 id.clone()
110 } else {
111 Uuid::new_v4().to_string()
112 };
113
114 let on_behalf_of = opts.on_behalf_of.clone();
115
116 let mut serialized = serde_json::to_value(opts)
117 .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
118
119 let mut obj = serialized.as_object_mut().unwrap();
120 let mut client_context_id_entry = obj.get("client_context_id");
121 if client_context_id_entry.is_none() {
122 obj.insert(
123 "client_context_id".to_string(),
124 Value::String(client_context_id.clone()),
125 );
126 }
127
128 if let Some(named_args) = &opts.named_args {
129 for (k, v) in named_args.iter() {
130 let key = if k.starts_with("$") {
131 k.clone()
132 } else {
133 format!("${k}")
134 };
135 obj.insert(key, v.clone());
136 }
137 }
138
139 if let Some(raw) = &opts.raw {
140 for (k, v) in raw.iter() {
141 obj.insert(k.to_string(), v.clone());
142 }
143 }
144
145 let body =
146 Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
147 Error::new_encoding_error(format!("failed to encode options: {e}"))
148 })?);
149
150 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
151 let canonical_addr =
152 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
153 let res = self
154 .tracing
155 .orchestrate_dispatch_span(
156 BeginDispatchFields::new(
157 (&peer_addr.0, &peer_addr.1),
158 (&canonical_addr.0, &canonical_addr.1),
159 None,
160 ),
161 self.execute(
162 Method::POST,
163 "query/service",
164 "application/json",
165 on_behalf_of,
166 Some(body),
167 ),
168 |_| {
169 EndDispatchFields::new(
170 None,
171 Some(OperationId::String(client_context_id.clone())),
172 )
173 },
174 )
175 .await;
176
177 let res = match res {
178 Ok(r) => r,
179 Err(e) => {
180 return Err(Error::new_http_error(
181 e,
182 self.endpoint.to_string(),
183 statement,
184 client_context_id,
185 ));
186 }
187 };
188
189 QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
190 }
191
192 #[instrument(
193 skip_all,
194 level = Level::TRACE,
195 name = "query",
196 fields(
197 otel.kind = SPAN_ATTRIB_OTEL_KIND_CLIENT_VALUE,
198 db.system.name = SPAN_ATTRIB_DB_SYSTEM_VALUE,
199 couchbase.service = SERVICE_VALUE_QUERY,
200 db.query.text = opts.statement.as_deref().unwrap_or(""),
201 couchbase.cluster.name,
202 couchbase.cluster.uuid,
203 ))]
204 async fn query_with_span(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
205 self.tracing.record_cluster_labels(&Span::current());
206 self.query(opts).await
207 }
208
209 pub async fn get_all_indexes(
210 &self,
211 opts: &GetAllIndexesOptions<'_>,
212 ) -> error::Result<Vec<Index>> {
213 let mut where_clause = match (&opts.collection_name, &opts.scope_name) {
214 (None, None) => {
215 if !opts.bucket_name.is_empty() {
216 let encoded_bucket = encode_value(&opts.bucket_name)?;
217 format!(
218 "(keyspace_id={encoded_bucket} AND bucket_id IS MISSING) OR bucket_id={encoded_bucket}"
219 )
220 } else {
221 "1=1".to_string()
222 }
223 }
224 (Some(collection_name), Some(scope_name)) => {
225 let scope_name = normalise_default_name(scope_name);
226 let collection_name = normalise_default_name(collection_name);
227
228 let encoded_bucket = encode_value(&opts.bucket_name)?;
229 let encoded_scope = encode_value(&scope_name)?;
230 let encoded_collection = encode_value(&collection_name)?;
231
232 let temp_where = format!(
233 "bucket_id={encoded_bucket} AND scope_id={encoded_scope} AND keyspace_id={encoded_collection}"
234 );
235
236 if scope_name == "_default" && collection_name == "_default" {
237 format!(
238 "({temp_where}) OR (keyspace_id={encoded_bucket} AND bucket_id IS MISSING)"
239 )
240 } else {
241 temp_where
242 }
243 }
244 (None, Some(scope_name)) => {
245 let scope_name = normalise_default_name(scope_name);
246
247 let encoded_bucket = encode_value(&opts.bucket_name)?;
248 let encoded_scope = encode_value(&scope_name)?;
249
250 format!("bucket_id={encoded_bucket} AND scope_id={encoded_scope}")
251 }
252 _ => {
253 return Err(Error::new_invalid_argument_error(
254 "invalid combination of bucket, scope and collection".to_string(),
255 None,
256 ));
257 }
258 };
259
260 where_clause = format!("({where_clause}) AND `using`=\"gsi\"");
261 let qs = format!(
262 "SELECT `idx`.* FROM system:indexes AS idx WHERE {where_clause} ORDER BY is_primary DESC, name ASC"
263 );
264
265 let opts = QueryOptions::new()
266 .statement(qs)
267 .on_behalf_of(opts.on_behalf_of.cloned());
268 let mut res = self.query_with_span(&opts).await?;
269
270 let mut indexes = vec![];
271
272 while let Some(row) = res.next().await {
273 let bytes = row?;
274 let index: Index = serde_json::from_slice(&bytes).map_err(|e| {
275 Error::new_message_error(
276 format!("failed to parse index from response: {e}"),
277 None,
278 None,
279 None,
280 )
281 })?;
282
283 indexes.push(index);
284 }
285
286 Ok(indexes)
287 }
288
289 pub async fn create_primary_index(
290 &self,
291 opts: &CreatePrimaryIndexOptions<'_>,
292 ) -> error::Result<()> {
293 let mut qs = String::from("CREATE PRIMARY INDEX");
295 if let Some(index_name) = &opts.index_name {
296 qs.push_str(&format!(" {}", encode_identifier(index_name)));
297 }
298
299 qs.push_str(&format!(
300 " ON {}",
301 build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
302 ));
303
304 let mut with: HashMap<&str, Value> = HashMap::new();
305
306 if let Some(deferred) = opts.deferred {
307 with.insert("defer_build", Value::Bool(deferred));
308 }
309
310 if let Some(num_replicas) = opts.num_replicas {
311 with.insert("num_replica", Value::Number(num_replicas.into()));
312 }
313
314 if !with.is_empty() {
315 let with = encode_value(&with)?;
316 qs.push_str(&format!(" WITH {with}"));
317 }
318
319 let query_opts = QueryOptions::new()
320 .statement(qs)
321 .on_behalf_of(opts.on_behalf_of.cloned());
322
323 let mut res = self.query_with_span(&query_opts).await;
324
325 match res {
326 Err(e) => {
327 if e.is_index_exists() {
328 if opts.ignore_if_exists.unwrap_or(false) {
329 Ok(())
330 } else {
331 Err(e)
332 }
333 } else if e.is_build_already_in_progress() {
334 Ok(())
335 } else {
336 Err(e)
337 }
338 }
339 Ok(_) => Ok(()),
340 }
341 }
342
343 pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
344 let mut qs = String::from("CREATE INDEX");
345 qs.push_str(&format!(" {}", encode_identifier(opts.index_name)));
346 qs.push_str(&format!(
347 " ON {}",
348 build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
349 ));
350
351 let mut encoded_fields: Vec<String> = Vec::with_capacity(opts.fields.len());
352 for field in opts.fields {
353 encoded_fields.push(encode_identifier(field));
354 }
355 qs.push_str(&format!(" ( {})", encoded_fields.join(",")));
356
357 let mut with: HashMap<&str, Value> = HashMap::new();
358
359 if let Some(deferred) = opts.deferred {
360 with.insert("defer_build", Value::Bool(deferred));
361 }
362
363 if let Some(num_replicas) = opts.num_replicas {
364 with.insert("num_replica", Value::Number(num_replicas.into()));
365 }
366
367 if !with.is_empty() {
368 let with = encode_value(&with)?;
369 qs.push_str(&format!(" WITH {with}"));
370 }
371
372 let query_opts = QueryOptions::new()
373 .statement(qs)
374 .on_behalf_of(opts.on_behalf_of.cloned());
375
376 let mut res = self.query_with_span(&query_opts).await;
377
378 match res {
379 Err(e) => {
380 if e.is_index_exists() {
381 if opts.ignore_if_exists.unwrap_or(false) {
382 Ok(())
383 } else {
384 Err(e)
385 }
386 } else if e.is_build_already_in_progress() {
387 Ok(())
388 } else {
389 Err(e)
390 }
391 }
392 Ok(_) => Ok(()),
393 }
394 }
395
396 pub async fn drop_primary_index(
397 &self,
398 opts: &DropPrimaryIndexOptions<'_>,
399 ) -> error::Result<()> {
400 let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
402
403 let mut qs = String::new();
404 if let Some(index_name) = &opts.index_name {
405 let encoded_name = encode_identifier(index_name);
406
407 if opts.scope_name.is_some() || opts.collection_name.is_some() {
408 qs.push_str(&format!("DROP INDEX {encoded_name}"));
409 qs.push_str(&format!(" ON {keyspace}"));
410 } else {
411 qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
412 }
413 } else {
414 qs.push_str(&format!("DROP PRIMARY INDEX ON {keyspace}"));
415 }
416
417 let query_opts = QueryOptions::new()
418 .statement(qs)
419 .on_behalf_of(opts.on_behalf_of.cloned());
420
421 let mut res = self.query_with_span(&query_opts).await;
422
423 match res {
424 Err(e) => {
425 if e.is_index_not_found() {
426 if opts.ignore_if_not_exists.unwrap_or(false) {
427 Ok(())
428 } else {
429 Err(e)
430 }
431 } else {
432 Err(e)
433 }
434 }
435 Ok(_) => Ok(()),
436 }
437 }
438
439 pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
440 let encoded_name = encode_identifier(opts.index_name);
441 let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
442
443 let mut qs = String::new();
444 if opts.scope_name.is_some() || opts.collection_name.is_some() {
445 qs.push_str(&format!("DROP INDEX {encoded_name}"));
446 qs.push_str(&format!(" ON {keyspace}"));
447 } else {
448 qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
449 }
450
451 let query_opts = QueryOptions::new()
452 .statement(qs)
453 .on_behalf_of(opts.on_behalf_of.cloned());
454
455 let res = self.query_with_span(&query_opts).await;
456
457 match res {
458 Err(e) => {
459 if e.is_index_not_found() {
460 if opts.ignore_if_not_exists.unwrap_or(false) {
461 Ok(())
462 } else {
463 Err(e)
464 }
465 } else {
466 Err(e)
467 }
468 }
469 Ok(_) => Ok(()),
470 }
471 }
472
473 pub async fn build_deferred_indexes(
474 &self,
475 opts: &BuildDeferredIndexesOptions<'_>,
476 ) -> error::Result<()> {
477 let opts = GetAllIndexesOptions {
478 bucket_name: opts.bucket_name,
479 scope_name: opts.scope_name,
480 collection_name: opts.collection_name,
481 on_behalf_of: opts.on_behalf_of,
482 };
483
484 let indexes = self.get_all_indexes(&opts).await?;
485
486 let deferred_items: Vec<_> = indexes
487 .iter()
488 .filter(|index| index.state == "deferred")
489 .map(|index| {
490 let (bucket, scope, collection) = index_to_namespace_parts(index);
491 let deferred_index = DeferredIndexName {
492 bucket_name: bucket,
493 scope_name: scope,
494 collection_name: collection,
495 index_name: &index.name,
496 };
497 let keyspace = build_keyspace(bucket, &Some(scope), &Some(collection));
498 (keyspace, deferred_index)
499 })
500 .collect();
501
502 let mut deferred_indexes: HashMap<String, Vec<DeferredIndexName>> =
503 HashMap::with_capacity(deferred_items.len());
504
505 for (keyspace, deferred_index) in deferred_items {
506 deferred_indexes
507 .entry(keyspace)
508 .or_default()
509 .push(deferred_index);
510 }
511
512 if deferred_indexes.is_empty() {
513 return Ok(());
514 }
515
516 for (keyspace, indexes) in deferred_indexes {
517 let mut escaped_index_names: Vec<String> = Vec::with_capacity(indexes.len());
518 for index in indexes {
519 escaped_index_names.push(encode_identifier(index.index_name));
520 }
521
522 let qs = format!(
523 "BUILD INDEX ON {}({})",
524 keyspace,
525 escaped_index_names.join(",")
526 );
527 let query_opts = QueryOptions::new()
528 .statement(qs)
529 .on_behalf_of(opts.on_behalf_of.cloned());
530
531 let res = self.query_with_span(&query_opts).await;
532
533 match res {
534 Err(e) => {
535 if e.is_build_already_in_progress() {
536 continue;
537 }
538
539 return Err(e);
540 }
541 Ok(_) => continue,
542 }
543 }
544
545 Ok(())
546 }
547
548 pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
549 let mut cur_interval = Duration::from_millis(50);
550 let mut watch_list = opts.indexes.to_vec();
551
552 if opts.watch_primary.unwrap_or(false) {
553 watch_list.push("#primary");
554 }
555
556 loop {
557 let indexes = self
558 .get_all_indexes(&GetAllIndexesOptions {
559 bucket_name: opts.bucket_name,
560 scope_name: opts.scope_name,
561 collection_name: opts.collection_name,
562 on_behalf_of: opts.on_behalf_of,
563 })
564 .await?;
565
566 let all_online = check_indexes_active(&indexes, &watch_list)?;
567
568 if all_online {
569 debug!("All watched indexes are ready");
570 return Ok(());
571 }
572
573 cur_interval = std::cmp::min(
574 cur_interval + Duration::from_millis(500),
575 Duration::from_secs(1),
576 );
577
578 sleep(cur_interval).await;
579 }
580 }
581
582 pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
583 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
584 let canonical_addr =
585 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
586 let res = match self
587 .tracing
588 .orchestrate_dispatch_span(
589 BeginDispatchFields::new(
590 (&peer_addr.0, &peer_addr.1),
591 (&canonical_addr.0, &canonical_addr.1),
592 None,
593 ),
594 self.execute(
595 Method::GET,
596 "admin/ping",
597 "",
598 opts.on_behalf_of.cloned(),
599 None,
600 ),
601 |_| EndDispatchFields::new(None, None),
602 )
603 .await
604 {
605 Ok(r) => r,
606 Err(e) => {
607 return Err(Error::new_http_error(
608 e,
609 self.endpoint.to_string(),
610 None,
611 None,
612 ));
613 }
614 };
615
616 if res.status().is_success() {
617 return Ok(());
618 }
619
620 Err(Error::new_message_error(
621 format!("ping failed with status code: {}", res.status()),
622 Some(self.endpoint.clone()),
623 None,
624 None,
625 ))
626 }
627}
628
/// Borrowed description of one deferred index and the keyspace it belongs to.
struct DeferredIndexName<'a> {
    /// Bucket part of the index's namespace.
    bucket_name: &'a str,
    /// Scope part of the index's namespace.
    scope_name: &'a str,
    /// Collection part of the index's namespace.
    collection_name: &'a str,
    /// Name of the index itself.
    index_name: &'a str,
}
635
/// Maps an empty scope or collection name to `_default`.
///
/// Any non-empty name is returned unchanged, as an owned `String`.
pub fn normalise_default_name(name: &str) -> String {
    match name {
        "" => "_default",
        given => given,
    }
    .to_string()
}
643
644pub fn build_keyspace(
645 bucket_name: &str,
646 scope_name: &Option<&str>,
647 collection_name: &Option<&str>,
648) -> String {
649 match (scope_name, collection_name) {
650 (Some(scope), Some(collection)) => format!(
651 "{}.{}.{}",
652 encode_identifier(bucket_name),
653 encode_identifier(scope),
654 encode_identifier(collection)
655 ),
656 (Some(scope), None) => format!(
657 "{}.{}._default",
658 encode_identifier(bucket_name),
659 encode_identifier(scope)
660 ),
661 (None, Some(collection)) => format!(
662 "{}._default.{}",
663 encode_identifier(bucket_name),
664 encode_identifier(collection)
665 ),
666 (None, None) => encode_identifier(bucket_name),
667 }
668}
669
670fn index_to_namespace_parts(index: &Index) -> (&str, &str, &str) {
671 if index.bucket_id.is_none() {
672 let default_scope_coll = "_default";
673 (
674 index.keyspace_id.as_deref().unwrap_or(""),
675 default_scope_coll,
676 default_scope_coll,
677 )
678 } else {
679 (
680 index.bucket_id.as_deref().unwrap_or(""),
681 index.scope_id.as_deref().unwrap_or(""),
682 index.keyspace_id.as_deref().unwrap_or(""),
683 )
684 }
685}
686
687fn check_indexes_active(indexes: &[Index], check_list: &Vec<&str>) -> error::Result<bool> {
688 let mut check_indexes = Vec::new();
689
690 for index_name in check_list {
691 if let Some(index) = indexes.iter().find(|idx| idx.name == *index_name) {
692 check_indexes.push(index);
693 } else {
694 return Ok(false);
695 }
696 }
697
698 for index in check_indexes {
699 if index.state != "online" {
700 debug!(
701 "Index {} is not ready yet, current state is {}",
702 index.name, index.state
703 );
704 return Ok(false);
705 }
706 }
707
708 Ok(true)
709}
710
/// Wraps `identifier` in backticks for use in a query statement.
///
/// Every backslash and every backtick inside the identifier is prefixed
/// with a backslash before the surrounding backticks are added.
fn encode_identifier(identifier: &str) -> String {
    let mut quoted = String::with_capacity(identifier.len() + 2);

    quoted.push('`');
    for ch in identifier.chars() {
        if matches!(ch, '\\' | '`') {
            quoted.push('\\');
        }
        quoted.push(ch);
    }
    quoted.push('`');

    quoted
}
716
717fn encode_value<T: serde::Serialize>(value: &T) -> error::Result<String> {
718 let bytes = serde_json::to_string(value).map_err(|e| {
719 Error::new_message_error(format!("failed to encode value: {e}"), None, None, None)
720 })?;
721 Ok(bytes)
722}