surrealdb_core/sql/statements/define/
table.rs

1use super::DefineFieldStatement;
2use crate::ctx::Context;
3use crate::dbs::{Force, Options};
4use crate::doc::CursorDoc;
5use crate::err::Error;
6use crate::iam::{Action, ResourceKind};
7use crate::kvs::Transaction;
8use crate::sql::fmt::{is_pretty, pretty_indent};
9use crate::sql::paths::{IN, OUT};
10use crate::sql::statements::info::InfoStructure;
11use crate::sql::{
12	changefeed::ChangeFeed, statements::UpdateStatement, Base, Ident, Output, Permissions, Strand,
13	Value, Values, View,
14};
15use crate::sql::{Idiom, Kind, TableType};
16
17use reblessive::tree::Stk;
18use revision::revisioned;
19use revision::Error as RevisionError;
20use serde::{Deserialize, Serialize};
21use std::fmt::{self, Display, Write};
22use std::sync::Arc;
23use uuid::Uuid;
24
/// The in-memory and stored representation of a `DEFINE TABLE` statement.
///
/// The layout is versioned through `revisioned`: a field marked with
/// `#[revision(start = N)]` was introduced in revision `N`, and a field
/// that also carries `end = M` is no longer present from revision `M` on.
#[revisioned(revision = 6)]
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[non_exhaustive]
pub struct DefineTableStatement {
	/// Numeric table id; `compute` requests a fresh one when this is `None`
	/// and both the namespace and the database definitions carry an id
	pub id: Option<u32>,
	/// The name of the table being defined
	pub name: Ident,
	/// Rendered as the `DROP` clause when set
	pub drop: bool,
	/// Rendered as `SCHEMAFULL` when set, `SCHEMALESS` otherwise
	pub full: bool,
	/// The view definition, present when this table is a view
	pub view: Option<View>,
	/// The table permissions, rendered as the last clause of the statement
	pub permissions: Permissions,
	/// The changefeed clause; when set, `compute` records the definition
	/// change on the transaction
	pub changefeed: Option<ChangeFeed>,
	/// The optional `COMMENT` clause
	pub comment: Option<Strand>,
	/// Set by `IF NOT EXISTS`: `compute` returns without changes when the
	/// table already exists. Reset to `false` before the definition is stored
	#[revision(start = 2)]
	pub if_not_exists: bool,
	/// The table type (`NORMAL`, `RELATION` or `ANY`)
	#[revision(start = 3)]
	pub kind: TableType,
	/// Should we overwrite the table definition if it already exists
	#[revision(start = 4)]
	pub overwrite: bool,
	/// The last time that a DEFINE FIELD was added to this table
	#[revision(start = 5)]
	pub cache_fields_ts: Uuid,
	/// The last time that a DEFINE EVENT was added to this table
	#[revision(start = 5)]
	pub cache_events_ts: Uuid,
	/// The last time that a DEFINE TABLE was added to this table
	#[revision(start = 5)]
	pub cache_tables_ts: Uuid,
	/// The last time that a DEFINE INDEX was added to this table
	#[revision(start = 5)]
	pub cache_indexes_ts: Uuid,
	/// The last time that a LIVE query was added to this table.
	/// Only present in revision 5; older values are handed to
	/// `convert_cache_ts`, which discards them
	#[revision(start = 5, end = 6, convert_fn = "convert_cache_ts")]
	pub cache_lives_ts: Uuid,
}
61
62impl DefineTableStatement {
63	pub(crate) async fn compute(
64		&self,
65		stk: &mut Stk,
66		ctx: &Context,
67		opt: &Options,
68		doc: Option<&CursorDoc>,
69	) -> Result<Value, Error> {
70		// Allowed to run?
71		opt.is_allowed(Action::Edit, ResourceKind::Table, &Base::Db)?;
72		// Get the NS and DB
73		let (ns, db) = opt.ns_db()?;
74		// Fetch the transaction
75		let txn = ctx.tx();
76		// Check if the definition exists
77		if txn.get_tb(ns, db, &self.name).await.is_ok() {
78			if self.if_not_exists {
79				return Ok(Value::None);
80			} else if !self.overwrite {
81				return Err(Error::TbAlreadyExists {
82					name: self.name.to_string(),
83				});
84			}
85		}
86		// Process the statement
87		let key = crate::key::database::tb::new(ns, db, &self.name);
88		let nsv = txn.get_or_add_ns(ns, opt.strict).await?;
89		let dbv = txn.get_or_add_db(ns, db, opt.strict).await?;
90		let mut dt = DefineTableStatement {
91			id: if self.id.is_none() && nsv.id.is_some() && dbv.id.is_some() {
92				Some(txn.lock().await.get_next_tb_id(nsv.id.unwrap(), dbv.id.unwrap()).await?)
93			} else {
94				None
95			},
96			// Don't persist the `IF NOT EXISTS` clause to the schema
97			if_not_exists: false,
98			overwrite: false,
99			..self.clone()
100		};
101		// Make sure we are refreshing the caches
102		dt.cache_fields_ts = Uuid::now_v7();
103		dt.cache_events_ts = Uuid::now_v7();
104		dt.cache_indexes_ts = Uuid::now_v7();
105		dt.cache_tables_ts = Uuid::now_v7();
106		// Add table relational fields
107		Self::add_in_out_fields(&txn, ns, db, &mut dt).await?;
108		// Set the table definition
109		txn.set(key, revision::to_vec(&dt)?, None).await?;
110		// Clear the cache
111		if let Some(cache) = ctx.get_cache() {
112			cache.clear_tb(ns, db, &self.name);
113		}
114		// Clear the cache
115		txn.clear();
116		// Record definition change
117		if dt.changefeed.is_some() {
118			txn.lock().await.record_table_change(ns, db, &self.name, &dt);
119		}
120		// Check if table is a view
121		if let Some(view) = &self.view {
122			// Force queries to run
123			let opt = &opt.new_with_force(Force::Table(Arc::new([dt])));
124			// Remove the table data
125			let key = crate::key::table::all::new(ns, db, &self.name);
126			txn.delp(key).await?;
127			// Process each foreign table
128			for ft in view.what.0.iter() {
129				// Save the view config
130				let key = crate::key::table::ft::new(ns, db, ft, &self.name);
131				txn.set(key, revision::to_vec(self)?, None).await?;
132				// Refresh the table cache
133				let key = crate::key::database::tb::new(ns, db, ft);
134				let tb = txn.get_tb(ns, db, ft).await?;
135				txn.set(
136					key,
137					revision::to_vec(&DefineTableStatement {
138						cache_tables_ts: Uuid::now_v7(),
139						..tb.as_ref().clone()
140					})?,
141					None,
142				)
143				.await?;
144				// Clear the cache
145				if let Some(cache) = ctx.get_cache() {
146					cache.clear_tb(ns, db, ft);
147				}
148				// Clear the cache
149				txn.clear();
150				// Process the view data
151				let stm = UpdateStatement {
152					what: Values(vec![Value::Table(ft.clone())]),
153					output: Some(Output::None),
154					..UpdateStatement::default()
155				};
156				stm.compute(stk, ctx, opt, doc).await?;
157			}
158		}
159		// Clear the cache
160		if let Some(cache) = ctx.get_cache() {
161			cache.clear_tb(ns, db, &self.name);
162		}
163		// Clear the cache
164		txn.clear();
165		// Ok all good
166		Ok(Value::None)
167	}
168
169	fn convert_cache_ts(&self, _revision: u16, _value: Uuid) -> Result<(), RevisionError> {
170		Ok(())
171	}
172}
173
174impl DefineTableStatement {
175	/// Checks if this is a TYPE RELATION table
176	pub fn is_relation(&self) -> bool {
177		matches!(self.kind, TableType::Relation(_))
178	}
179	/// Checks if this table allows graph edges / relations
180	pub fn allows_relation(&self) -> bool {
181		matches!(self.kind, TableType::Relation(_) | TableType::Any)
182	}
183	/// Checks if this table allows normal records / documents
184	pub fn allows_normal(&self) -> bool {
185		matches!(self.kind, TableType::Normal | TableType::Any)
186	}
187	/// Used to add relational fields to existing table records
188	pub async fn add_in_out_fields(
189		txn: &Transaction,
190		ns: &str,
191		db: &str,
192		tb: &mut DefineTableStatement,
193	) -> Result<(), Error> {
194		// Add table relational fields
195		if let TableType::Relation(rel) = &tb.kind {
196			// Set the `in` field as a DEFINE FIELD definition
197			{
198				let key = crate::key::table::fd::new(ns, db, &tb.name, "in");
199				let val = rel.from.clone().unwrap_or(Kind::Record(vec![]));
200				txn.set(
201					key,
202					revision::to_vec(&DefineFieldStatement {
203						name: Idiom::from(IN.to_vec()),
204						what: tb.name.to_owned(),
205						kind: Some(val),
206						..Default::default()
207					})?,
208					None,
209				)
210				.await?;
211			}
212			// Set the `out` field as a DEFINE FIELD definition
213			{
214				let key = crate::key::table::fd::new(ns, db, &tb.name, "out");
215				let val = rel.to.clone().unwrap_or(Kind::Record(vec![]));
216				txn.set(
217					key,
218					revision::to_vec(&DefineFieldStatement {
219						name: Idiom::from(OUT.to_vec()),
220						what: tb.name.to_owned(),
221						kind: Some(val),
222						..Default::default()
223					})?,
224					None,
225				)
226				.await?;
227			}
228			// Refresh the table cache for the fields
229			tb.cache_fields_ts = Uuid::now_v7();
230		}
231		Ok(())
232	}
233}
234
235impl Display for DefineTableStatement {
236	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
237		write!(f, "DEFINE TABLE")?;
238		if self.if_not_exists {
239			write!(f, " IF NOT EXISTS")?
240		}
241		if self.overwrite {
242			write!(f, " OVERWRITE")?
243		}
244		write!(f, " {}", self.name)?;
245		write!(f, " TYPE")?;
246		match &self.kind {
247			TableType::Normal => {
248				f.write_str(" NORMAL")?;
249			}
250			TableType::Relation(rel) => {
251				f.write_str(" RELATION")?;
252				if let Some(Kind::Record(kind)) = &rel.from {
253					write!(
254						f,
255						" IN {}",
256						kind.iter().map(|t| t.0.as_str()).collect::<Vec<_>>().join(" | ")
257					)?;
258				}
259				if let Some(Kind::Record(kind)) = &rel.to {
260					write!(
261						f,
262						" OUT {}",
263						kind.iter().map(|t| t.0.as_str()).collect::<Vec<_>>().join(" | ")
264					)?;
265				}
266				if rel.enforced {
267					write!(f, " ENFORCED")?;
268				}
269			}
270			TableType::Any => {
271				f.write_str(" ANY")?;
272			}
273		}
274		if self.drop {
275			f.write_str(" DROP")?;
276		}
277		f.write_str(if self.full {
278			" SCHEMAFULL"
279		} else {
280			" SCHEMALESS"
281		})?;
282		if let Some(ref v) = self.comment {
283			write!(f, " COMMENT {v}")?
284		}
285		if let Some(ref v) = self.view {
286			write!(f, " {v}")?
287		}
288		if let Some(ref v) = self.changefeed {
289			write!(f, " {v}")?;
290		}
291		let _indent = if is_pretty() {
292			Some(pretty_indent())
293		} else {
294			f.write_char(' ')?;
295			None
296		};
297		write!(f, "{}", self.permissions)?;
298		Ok(())
299	}
300}
301
impl InfoStructure for DefineTableStatement {
	/// Converts this definition into a `Value` built from a map of its properties.
	///
	/// `name`, `drop`, `full`, `kind` and `permissions` are always listed.
	/// `view`, `changefeed` and `comment` use the conditional entry form of
	/// the `map!` macro; presumably they are only inserted when the option is
	/// `Some` (the macro is defined elsewhere, so confirm there).
	fn structure(self) -> Value {
		Value::from(map! {
			"name".to_string() => self.name.structure(),
			"drop".to_string() => self.drop.into(),
			"full".to_string() => self.full.into(),
			"kind".to_string() => self.kind.structure(),
			"view".to_string(), if let Some(v) = self.view => v.structure(),
			"changefeed".to_string(), if let Some(v) = self.changefeed => v.structure(),
			"permissions".to_string() => self.permissions.structure(),
			"comment".to_string(), if let Some(v) = self.comment => v.into(),
		})
	}
}