surrealdb_core/sql/statements/define/
index.rs

1use crate::ctx::Context;
2use crate::dbs::Options;
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::iam::{Action, ResourceKind};
6use crate::sql::statements::info::InfoStructure;
7use crate::sql::statements::DefineTableStatement;
8use crate::sql::{Base, Ident, Idioms, Index, Part, Strand, Value};
9
10use reblessive::tree::Stk;
11use revision::revisioned;
12use serde::{Deserialize, Serialize};
13use std::fmt::{self, Display};
14use uuid::Uuid;
15
16#[revisioned(revision = 4)]
17#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
18#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
19#[non_exhaustive]
20pub struct DefineIndexStatement {
21	pub name: Ident,
22	pub what: Ident,
23	pub cols: Idioms,
24	pub index: Index,
25	pub comment: Option<Strand>,
26	#[revision(start = 2)]
27	pub if_not_exists: bool,
28	#[revision(start = 3)]
29	pub overwrite: bool,
30	#[revision(start = 4)]
31	pub concurrently: bool,
32}
33
34impl DefineIndexStatement {
35	/// Process this type returning a computed simple Value
36	pub(crate) async fn compute(
37		&self,
38		_stk: &mut Stk,
39		ctx: &Context,
40		opt: &Options,
41		_doc: Option<&CursorDoc>,
42	) -> Result<Value, Error> {
43		// Allowed to run?
44		opt.is_allowed(Action::Edit, ResourceKind::Index, &Base::Db)?;
45		// Get the NS and DB
46		let (ns, db) = opt.ns_db()?;
47		// Fetch the transaction
48		let txn = ctx.tx();
49		// Check if the definition exists
50		if txn.get_tb_index(ns, db, &self.what, &self.name).await.is_ok() {
51			if self.if_not_exists {
52				return Ok(Value::None);
53			} else if !self.overwrite {
54				return Err(Error::IxAlreadyExists {
55					name: self.name.to_string(),
56				});
57			}
58			// Clear the index store cache
59			ctx.get_index_stores()
60				.index_removed(ctx.get_index_builder(), &txn, ns, db, &self.what, &self.name)
61				.await?;
62		}
63		// Does the table exists?
64		match txn.get_tb(ns, db, &self.what).await {
65			Ok(tb) => {
66				// Are we SchemaFull?
67				if tb.full {
68					// Check that the fields exists
69					for idiom in self.cols.iter() {
70						let Some(Part::Field(first)) = idiom.0.first() else {
71							continue;
72						};
73						txn.get_tb_field(ns, db, &self.what, &first.to_string()).await?;
74					}
75				}
76			}
77			// If the TB was not found, we're fine
78			Err(Error::TbNotFound {
79				..
80			}) => {}
81			// Any other error should be returned
82			Err(e) => return Err(e),
83		}
84		// Process the statement
85		let key = crate::key::table::ix::new(ns, db, &self.what, &self.name);
86		txn.get_or_add_ns(ns, opt.strict).await?;
87		txn.get_or_add_db(ns, db, opt.strict).await?;
88		txn.get_or_add_tb(ns, db, &self.what, opt.strict).await?;
89		txn.set(
90			key,
91			revision::to_vec(&DefineIndexStatement {
92				// Don't persist the `IF NOT EXISTS`, `OVERWRITE` and `CONCURRENTLY` clause to schema
93				if_not_exists: false,
94				overwrite: false,
95				concurrently: false,
96				..self.clone()
97			})?,
98			None,
99		)
100		.await?;
101		// Refresh the table cache
102		let key = crate::key::database::tb::new(ns, db, &self.what);
103		let tb = txn.get_tb(ns, db, &self.what).await?;
104		txn.set(
105			key,
106			revision::to_vec(&DefineTableStatement {
107				cache_indexes_ts: Uuid::now_v7(),
108				..tb.as_ref().clone()
109			})?,
110			None,
111		)
112		.await?;
113		// Clear the cache
114		if let Some(cache) = ctx.get_cache() {
115			cache.clear_tb(ns, db, &self.what);
116		}
117		// Clear the cache
118		txn.clear();
119		// Process the index
120		self.run_indexing(ctx, opt, !self.concurrently).await?;
121		// Ok all good
122		Ok(Value::None)
123	}
124
125	pub(crate) async fn run_indexing(
126		&self,
127		ctx: &Context,
128		opt: &Options,
129		blocking: bool,
130	) -> Result<(), Error> {
131		let rcv = ctx
132			.get_index_builder()
133			.ok_or_else(|| Error::unreachable("No Index Builder"))?
134			.build(ctx, opt.clone(), self.clone().into(), blocking)
135			.await?;
136		if let Some(rcv) = rcv {
137			rcv.await.map_err(|_| Error::IndexingBuildingCancelled)?
138		} else {
139			Ok(())
140		}
141	}
142}
143
144impl Display for DefineIndexStatement {
145	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
146		write!(f, "DEFINE INDEX")?;
147		if self.if_not_exists {
148			write!(f, " IF NOT EXISTS")?
149		}
150		if self.overwrite {
151			write!(f, " OVERWRITE")?
152		}
153		write!(f, " {} ON {}", self.name, self.what)?;
154		if !self.cols.is_empty() {
155			write!(f, " FIELDS {}", self.cols)?;
156		}
157		if Index::Idx != self.index {
158			write!(f, " {}", self.index)?;
159		}
160		if let Some(ref v) = self.comment {
161			write!(f, " COMMENT {v}")?
162		}
163		if self.concurrently {
164			write!(f, " CONCURRENTLY")?
165		}
166		Ok(())
167	}
168}
169
170impl InfoStructure for DefineIndexStatement {
171	fn structure(self) -> Value {
172		Value::from(map! {
173			"name".to_string() => self.name.structure(),
174			"what".to_string() => self.what.structure(),
175			"cols".to_string() => self.cols.structure(),
176			"index".to_string() => self.index.structure(),
177			"comment".to_string(), if let Some(v) = self.comment => v.into(),
178		})
179	}
180}