1use crate::httpx::client::Client;
20use crate::httpx::request::{Auth, BasicAuth, OnBehalfOfInfo, Request};
21use crate::httpx::response::Response;
22use crate::queryx::error;
23use crate::queryx::error::{Error, ErrorKind, ServerError, ServerErrorKind};
24use crate::queryx::index::Index;
25use crate::queryx::query_options::{
26 BuildDeferredIndexesOptions, CreateIndexOptions, CreatePrimaryIndexOptions, DropIndexOptions,
27 DropPrimaryIndexOptions, GetAllIndexesOptions, PingOptions, QueryOptions, WatchIndexesOptions,
28};
29use crate::queryx::query_respreader::QueryRespReader;
30use crate::retry::RetryStrategy;
31use bytes::Bytes;
32use futures::StreamExt;
33use http::{Method, StatusCode};
34use log::debug;
35use serde::Deserialize;
36use serde_json::Value;
37use std::collections::HashMap;
38use std::fmt::format;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::time::sleep;
42use uuid::Uuid;
43
44#[derive(Debug)]
45pub struct Query<C: Client> {
46 pub http_client: Arc<C>,
47 pub user_agent: String,
48 pub endpoint: String,
49 pub username: String,
50 pub password: String,
51}
52
53impl<C: Client> Query<C> {
54 pub fn new_request(
55 &self,
56 method: Method,
57 path: impl Into<String>,
58 content_type: impl Into<String>,
59 on_behalf_of: Option<OnBehalfOfInfo>,
60 body: Option<Bytes>,
61 ) -> Request {
62 let auth = if let Some(obo) = on_behalf_of {
63 Auth::OnBehalfOf(OnBehalfOfInfo {
64 username: obo.username,
65 password_or_domain: obo.password_or_domain,
66 })
67 } else {
68 Auth::BasicAuth(BasicAuth {
69 username: self.username.clone(),
70 password: self.password.clone(),
71 })
72 };
73
74 Request::new(method, format!("{}/{}", self.endpoint, path.into()))
75 .auth(auth)
76 .user_agent(self.user_agent.clone())
77 .content_type(content_type.into())
78 .body(body)
79 }
80
81 pub async fn execute(
82 &self,
83 method: Method,
84 path: impl Into<String>,
85 content_type: impl Into<String>,
86 on_behalf_of: Option<OnBehalfOfInfo>,
87 body: Option<Bytes>,
88 ) -> crate::httpx::error::Result<Response> {
89 let req = self.new_request(method, path, content_type, on_behalf_of, body);
90
91 self.http_client.execute(req).await
92 }
93
94 pub async fn query(&self, opts: &QueryOptions) -> error::Result<QueryRespReader> {
95 let statement = if let Some(statement) = &opts.statement {
96 statement.clone()
97 } else {
98 String::new()
99 };
100
101 let client_context_id = if let Some(id) = &opts.client_context_id {
103 id.clone()
104 } else {
105 Uuid::new_v4().to_string()
106 };
107
108 let on_behalf_of = opts.on_behalf_of.clone();
109
110 let mut serialized = serde_json::to_value(opts)
111 .map_err(|e| Error::new_encoding_error(format!("failed to encode options: {e}")))?;
112
113 let mut obj = serialized.as_object_mut().unwrap();
114 let mut client_context_id_entry = obj.get("client_context_id");
115 if client_context_id_entry.is_none() {
116 obj.insert(
117 "client_context_id".to_string(),
118 Value::String(client_context_id.clone()),
119 );
120 }
121
122 if let Some(named_args) = &opts.named_args {
123 for (k, v) in named_args.iter() {
124 let key = if k.starts_with("$") {
125 k.clone()
126 } else {
127 format!("${k}")
128 };
129 obj.insert(key, v.clone());
130 }
131 }
132
133 if let Some(raw) = &opts.raw {
134 for (k, v) in raw.iter() {
135 obj.insert(k.to_string(), v.clone());
136 }
137 }
138
139 let body =
140 Bytes::from(serde_json::to_vec(&serialized).map_err(|e| {
141 Error::new_encoding_error(format!("failed to encode options: {e}"))
142 })?);
143
144 let res = match self
145 .execute(
146 Method::POST,
147 "query/service",
148 "application/json",
149 on_behalf_of,
150 Some(body),
151 )
152 .await
153 {
154 Ok(r) => r,
155 Err(e) => {
156 return Err(Error::new_http_error(
157 format!("{}: {}", &self.endpoint, e),
158 statement,
159 client_context_id,
160 ));
161 }
162 };
163
164 QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
165 }
166
167 pub async fn get_all_indexes(
168 &self,
169 opts: &GetAllIndexesOptions<'_>,
170 ) -> error::Result<Vec<Index>> {
171 let mut where_clause = match (&opts.collection_name, &opts.scope_name) {
172 (None, None) => {
173 if !opts.bucket_name.is_empty() {
174 let encoded_bucket = encode_value(&opts.bucket_name)?;
175 format!(
176 "(keyspace_id={encoded_bucket} AND bucket_id IS MISSING) OR bucket_id={encoded_bucket}"
177 )
178 } else {
179 "1=1".to_string()
180 }
181 }
182 (Some(collection_name), Some(scope_name)) => {
183 let scope_name = normalise_default_name(scope_name);
184 let collection_name = normalise_default_name(collection_name);
185
186 let encoded_bucket = encode_value(&opts.bucket_name)?;
187 let encoded_scope = encode_value(&scope_name)?;
188 let encoded_collection = encode_value(&collection_name)?;
189
190 let temp_where = format!(
191 "bucket_id={encoded_bucket} AND scope_id={encoded_scope} AND keyspace_id={encoded_collection}"
192 );
193
194 if scope_name == "_default" && collection_name == "_default" {
195 format!(
196 "({temp_where}) OR (keyspace_id={encoded_bucket} AND bucket_id IS MISSING)"
197 )
198 } else {
199 temp_where
200 }
201 }
202 (None, Some(scope_name)) => {
203 let scope_name = normalise_default_name(scope_name);
204
205 let encoded_bucket = encode_value(&opts.bucket_name)?;
206 let encoded_scope = encode_value(&scope_name)?;
207
208 format!("bucket_id={encoded_bucket} AND scope_id={encoded_scope}")
209 }
210 _ => {
211 return Err(Error::new_invalid_argument_error(
212 "invalid combination of bucket, scope and collection".to_string(),
213 None,
214 ));
215 }
216 };
217
218 where_clause = format!("({where_clause}) AND `using`=\"gsi\"");
219 let qs = format!(
220 "SELECT `idx`.* FROM system:indexes AS idx WHERE {where_clause} ORDER BY is_primary DESC, name ASC"
221 );
222
223 let opts = QueryOptions::new()
224 .statement(qs)
225 .on_behalf_of(opts.on_behalf_of.cloned());
226 let mut res = self.query(&opts).await?;
227
228 let mut indexes = vec![];
229
230 while let Some(row) = res.next().await {
231 let bytes = row?;
232 let index: Index = serde_json::from_slice(&bytes).map_err(|e| {
233 Error::new_message_error(
234 format!("failed to parse index from response: {e}"),
235 None,
236 None,
237 None,
238 )
239 })?;
240
241 indexes.push(index);
242 }
243
244 Ok(indexes)
245 }
246
247 pub async fn create_primary_index(
248 &self,
249 opts: &CreatePrimaryIndexOptions<'_>,
250 ) -> error::Result<()> {
251 let mut qs = String::from("CREATE PRIMARY INDEX");
253 if let Some(index_name) = &opts.index_name {
254 qs.push_str(&format!(" {}", encode_identifier(index_name)));
255 }
256
257 qs.push_str(&format!(
258 " ON {}",
259 build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
260 ));
261
262 let mut with: HashMap<&str, Value> = HashMap::new();
263
264 if let Some(deferred) = opts.deferred {
265 with.insert("defer_build", Value::Bool(deferred));
266 }
267
268 if let Some(num_replicas) = opts.num_replicas {
269 with.insert("num_replica", Value::Number(num_replicas.into()));
270 }
271
272 if !with.is_empty() {
273 let with = encode_value(&with)?;
274 qs.push_str(&format!(" WITH {with}"));
275 }
276
277 let query_opts = QueryOptions::new()
278 .statement(qs)
279 .on_behalf_of(opts.on_behalf_of.cloned());
280
281 let mut res = self.query(&query_opts).await;
282
283 match res {
284 Err(e) => {
285 if e.is_index_exists() {
286 if opts.ignore_if_exists.unwrap_or(false) {
287 Ok(())
288 } else {
289 Err(e)
290 }
291 } else if e.is_build_already_in_progress() {
292 Ok(())
293 } else {
294 Err(e)
295 }
296 }
297 Ok(_) => Ok(()),
298 }
299 }
300
301 pub async fn create_index(&self, opts: &CreateIndexOptions<'_>) -> error::Result<()> {
302 let mut qs = String::from("CREATE INDEX");
303 qs.push_str(&format!(" {}", encode_identifier(opts.index_name)));
304 qs.push_str(&format!(
305 " ON {}",
306 build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name)
307 ));
308
309 let mut encoded_fields: Vec<String> = Vec::with_capacity(opts.fields.len());
310 for field in opts.fields {
311 encoded_fields.push(encode_identifier(field));
312 }
313 qs.push_str(&format!(" ( {})", encoded_fields.join(",")));
314
315 let mut with: HashMap<&str, Value> = HashMap::new();
316
317 if let Some(deferred) = opts.deferred {
318 with.insert("defer_build", Value::Bool(deferred));
319 }
320
321 if let Some(num_replicas) = opts.num_replicas {
322 with.insert("num_replica", Value::Number(num_replicas.into()));
323 }
324
325 if !with.is_empty() {
326 let with = encode_value(&with)?;
327 qs.push_str(&format!(" WITH {with}"));
328 }
329
330 let query_opts = QueryOptions::new()
331 .statement(qs)
332 .on_behalf_of(opts.on_behalf_of.cloned());
333
334 let mut res = self.query(&query_opts).await;
335
336 match res {
337 Err(e) => {
338 if e.is_index_exists() {
339 if opts.ignore_if_exists.unwrap_or(false) {
340 Ok(())
341 } else {
342 Err(e)
343 }
344 } else if e.is_build_already_in_progress() {
345 Ok(())
346 } else {
347 Err(e)
348 }
349 }
350 Ok(_) => Ok(()),
351 }
352 }
353
354 pub async fn drop_primary_index(
355 &self,
356 opts: &DropPrimaryIndexOptions<'_>,
357 ) -> error::Result<()> {
358 let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
360
361 let mut qs = String::new();
362 if let Some(index_name) = &opts.index_name {
363 let encoded_name = encode_identifier(index_name);
364
365 if opts.scope_name.is_some() || opts.collection_name.is_some() {
366 qs.push_str(&format!("DROP INDEX {encoded_name}"));
367 qs.push_str(&format!(" ON {keyspace}"));
368 } else {
369 qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
370 }
371 } else {
372 qs.push_str(&format!("DROP PRIMARY INDEX ON {keyspace}"));
373 }
374
375 let query_opts = QueryOptions::new()
376 .statement(qs)
377 .on_behalf_of(opts.on_behalf_of.cloned());
378
379 let mut res = self.query(&query_opts).await;
380
381 match res {
382 Err(e) => {
383 if e.is_index_not_found() {
384 if opts.ignore_if_not_exists.unwrap_or(false) {
385 Ok(())
386 } else {
387 Err(e)
388 }
389 } else {
390 Err(e)
391 }
392 }
393 Ok(_) => Ok(()),
394 }
395 }
396
397 pub async fn drop_index(&self, opts: &DropIndexOptions<'_>) -> error::Result<()> {
398 let encoded_name = encode_identifier(opts.index_name);
399 let keyspace = build_keyspace(opts.bucket_name, &opts.scope_name, &opts.collection_name);
400
401 let mut qs = String::new();
402 if opts.scope_name.is_some() || opts.collection_name.is_some() {
403 qs.push_str(&format!("DROP INDEX {encoded_name}"));
404 qs.push_str(&format!(" ON {keyspace}"));
405 } else {
406 qs.push_str(&format!("DROP INDEX {keyspace}.{encoded_name}"));
407 }
408
409 let query_opts = QueryOptions::new()
410 .statement(qs)
411 .on_behalf_of(opts.on_behalf_of.cloned());
412
413 let res = self.query(&query_opts).await;
414
415 match res {
416 Err(e) => {
417 if e.is_index_not_found() {
418 if opts.ignore_if_not_exists.unwrap_or(false) {
419 Ok(())
420 } else {
421 Err(e)
422 }
423 } else {
424 Err(e)
425 }
426 }
427 Ok(_) => Ok(()),
428 }
429 }
430
431 pub async fn build_deferred_indexes(
432 &self,
433 opts: &BuildDeferredIndexesOptions<'_>,
434 ) -> error::Result<()> {
435 let opts = GetAllIndexesOptions {
436 bucket_name: opts.bucket_name,
437 scope_name: opts.scope_name,
438 collection_name: opts.collection_name,
439 on_behalf_of: opts.on_behalf_of,
440 };
441
442 let indexes = self.get_all_indexes(&opts).await?;
443
444 let deferred_items: Vec<_> = indexes
445 .iter()
446 .filter(|index| index.state == "deferred")
447 .map(|index| {
448 let (bucket, scope, collection) = index_to_namespace_parts(index);
449 let deferred_index = DeferredIndexName {
450 bucket_name: bucket,
451 scope_name: scope,
452 collection_name: collection,
453 index_name: &index.name,
454 };
455 let keyspace = build_keyspace(bucket, &Some(scope), &Some(collection));
456 (keyspace, deferred_index)
457 })
458 .collect();
459
460 let mut deferred_indexes: HashMap<String, Vec<DeferredIndexName>> =
461 HashMap::with_capacity(deferred_items.len());
462
463 for (keyspace, deferred_index) in deferred_items {
464 deferred_indexes
465 .entry(keyspace)
466 .or_default()
467 .push(deferred_index);
468 }
469
470 if deferred_indexes.is_empty() {
471 return Ok(());
472 }
473
474 for (keyspace, indexes) in deferred_indexes {
475 let mut escaped_index_names: Vec<String> = Vec::with_capacity(indexes.len());
476 for index in indexes {
477 escaped_index_names.push(encode_identifier(index.index_name));
478 }
479
480 let qs = format!(
481 "BUILD INDEX ON {}({})",
482 keyspace,
483 escaped_index_names.join(",")
484 );
485 let query_opts = QueryOptions::new()
486 .statement(qs)
487 .on_behalf_of(opts.on_behalf_of.cloned());
488
489 let res = self.query(&query_opts).await;
490
491 match res {
492 Err(e) => {
493 if e.is_build_already_in_progress() {
494 continue;
495 }
496
497 return Err(e);
498 }
499 Ok(_) => continue,
500 }
501 }
502
503 Ok(())
504 }
505
506 pub async fn watch_indexes(&self, opts: &WatchIndexesOptions<'_>) -> error::Result<()> {
507 let mut cur_interval = Duration::from_millis(50);
508 let mut watch_list = opts.indexes.to_vec();
509
510 if opts.watch_primary.unwrap_or(false) {
511 watch_list.push("#primary");
512 }
513
514 loop {
515 let indexes = self
516 .get_all_indexes(&GetAllIndexesOptions {
517 bucket_name: opts.bucket_name,
518 scope_name: opts.scope_name,
519 collection_name: opts.collection_name,
520 on_behalf_of: opts.on_behalf_of,
521 })
522 .await?;
523
524 let all_online = check_indexes_active(&indexes, &watch_list)?;
525
526 if all_online {
527 debug!("All watched indexes are ready");
528 return Ok(());
529 }
530
531 cur_interval = std::cmp::min(
532 cur_interval + Duration::from_millis(500),
533 Duration::from_secs(1),
534 );
535
536 sleep(cur_interval).await;
537 }
538 }
539
540 pub async fn ping(&self, opts: &PingOptions<'_>) -> error::Result<()> {
541 let res = match self
542 .execute(
543 Method::GET,
544 "admin/ping",
545 "",
546 opts.on_behalf_of.cloned(),
547 None,
548 )
549 .await
550 {
551 Ok(r) => r,
552 Err(e) => {
553 return Err(Error::new_http_error(
554 format!("{}: {}", &self.endpoint, e),
555 None,
556 None,
557 ));
558 }
559 };
560
561 if res.status().is_success() {
562 return Ok(());
563 }
564
565 Err(Error::new_message_error(
566 format!("ping failed with status code: {}", res.status()),
567 Some(self.endpoint.clone()),
568 None,
569 None,
570 ))
571 }
572}
573
574struct DeferredIndexName<'a> {
575 bucket_name: &'a str,
576 scope_name: &'a str,
577 collection_name: &'a str,
578 index_name: &'a str,
579}
580
581pub fn normalise_default_name(name: &str) -> String {
582 if name.is_empty() {
583 "_default".to_string()
584 } else {
585 name.to_string()
586 }
587}
588
589pub fn build_keyspace(
590 bucket_name: &str,
591 scope_name: &Option<&str>,
592 collection_name: &Option<&str>,
593) -> String {
594 match (scope_name, collection_name) {
595 (Some(scope), Some(collection)) => format!(
596 "{}.{}.{}",
597 encode_identifier(bucket_name),
598 encode_identifier(scope),
599 encode_identifier(collection)
600 ),
601 (Some(scope), None) => format!(
602 "{}.{}._default",
603 encode_identifier(bucket_name),
604 encode_identifier(scope)
605 ),
606 (None, Some(collection)) => format!(
607 "{}._default.{}",
608 encode_identifier(bucket_name),
609 encode_identifier(collection)
610 ),
611 (None, None) => encode_identifier(bucket_name),
612 }
613}
614
615fn index_to_namespace_parts(index: &Index) -> (&str, &str, &str) {
616 if index.bucket_id.is_none() {
617 let default_scope_coll = "_default";
618 (
619 index.keyspace_id.as_deref().unwrap_or(""),
620 default_scope_coll,
621 default_scope_coll,
622 )
623 } else {
624 (
625 index.bucket_id.as_deref().unwrap_or(""),
626 index.scope_id.as_deref().unwrap_or(""),
627 index.keyspace_id.as_deref().unwrap_or(""),
628 )
629 }
630}
631
632fn check_indexes_active(indexes: &[Index], check_list: &Vec<&str>) -> error::Result<bool> {
633 let mut check_indexes = Vec::new();
634
635 for index_name in check_list {
636 if let Some(index) = indexes.iter().find(|idx| idx.name == *index_name) {
637 check_indexes.push(index);
638 } else {
639 return Ok(false);
640 }
641 }
642
643 for index in check_indexes {
644 if index.state != "online" {
645 debug!(
646 "Index {} is not ready yet, current state is {}",
647 index.name, index.state
648 );
649 return Ok(false);
650 }
651 }
652
653 Ok(true)
654}
655
656fn encode_identifier(identifier: &str) -> String {
657 let mut out = identifier.replace("\\", "\\\\");
658 out = out.replace("`", "\\`");
659 format!("`{out}`")
660}
661
662fn encode_value<T: serde::Serialize>(value: &T) -> error::Result<String> {
663 let bytes = serde_json::to_string(value).map_err(|e| {
664 Error::new_message_error(format!("failed to encode value: {e}"), None, None, None)
665 })?;
666 Ok(bytes)
667}