Skip to main content

surrealdb_core/kvs/
export.rs

1use std::fmt;
2
3use anyhow::Result;
4use async_channel::Sender;
5use surrealdb_types::{SurrealValue, ToSql};
6
7use super::Transaction;
8use crate::catalog::providers::{
9	AuthorisationProvider, DatabaseProvider, TableProvider, UserProvider,
10};
11use crate::catalog::{DatabaseId, NamespaceId, Record, TableDefinition};
12use crate::cnf::EXPORT_BATCH_SIZE;
13use crate::err::Error;
14use crate::expr::paths::{IN, OUT};
15use crate::expr::statements::define::{DefineAccessStatement, DefineUserStatement};
16use crate::expr::{Base, DefineAnalyzerStatement};
17use crate::key::record;
18use crate::kvs::KVValue;
19use crate::sql::statements::OptionStatement;
20
21#[derive(Clone, Debug, SurrealValue)]
22#[surreal(crate = "surrealdb_types")]
23#[surreal(default)]
24pub struct Config {
25	pub users: bool,
26	pub accesses: bool,
27	pub params: bool,
28	pub functions: bool,
29	pub analyzers: bool,
30	pub tables: TableConfig,
31	pub versions: bool,
32	pub records: bool,
33	pub sequences: bool,
34}
35
36impl Default for Config {
37	fn default() -> Config {
38		Config {
39			users: true,
40			accesses: true,
41			params: true,
42			functions: true,
43			analyzers: true,
44			tables: TableConfig::default(),
45			versions: false,
46			records: true,
47			sequences: true,
48		}
49	}
50}
51
52/// Named-field wrapper so that the untagged `SurrealValue` serialization
53/// can differentiate `Exclude` from `Some` (include).
54#[derive(Clone, Debug, SurrealValue)]
55#[surreal(crate = "surrealdb_types")]
56pub struct ExcludedTables {
57	pub exclude: Vec<String>,
58}
59
60#[derive(Clone, Debug, Default, SurrealValue)]
61#[surreal(crate = "surrealdb_types")]
62#[surreal(untagged)]
63pub enum TableConfig {
64	#[default]
65	#[surreal(value = true)]
66	All,
67	#[surreal(value = false)]
68	None,
69	Some(Vec<String>),
70	Exclude(ExcludedTables),
71}
72
73// TODO: This should probably be removed
74// This is not a good from implementation,
75// It is not direct: What true and false mean when converted to a table config?
76impl From<bool> for TableConfig {
77	fn from(value: bool) -> Self {
78		match value {
79			true => TableConfig::All,
80			false => TableConfig::None,
81		}
82	}
83}
84
85impl From<Vec<String>> for TableConfig {
86	fn from(value: Vec<String>) -> Self {
87		TableConfig::Some(value)
88	}
89}
90
91impl From<Vec<&str>> for TableConfig {
92	fn from(value: Vec<&str>) -> Self {
93		TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
94	}
95}
96
97impl TableConfig {
98	/// Check if we should export tables
99	pub(crate) fn is_any(&self) -> bool {
100		matches!(self, Self::All | Self::Some(_) | Self::Exclude(_))
101	}
102	// Check if we should export a specific table
103	pub(crate) fn includes(&self, table: &str) -> bool {
104		match self {
105			Self::All => true,
106			Self::None => false,
107			Self::Some(v) => v.iter().any(|v| v.eq(table)),
108			Self::Exclude(v) => !v.exclude.iter().any(|v| v.eq(table)),
109		}
110	}
111	/// Returns the explicitly listed table names, if any.
112	pub(crate) fn names(&self) -> Option<&[String]> {
113		match self {
114			Self::Some(v) => Some(v.as_slice()),
115			Self::Exclude(v) => Some(v.exclude.as_slice()),
116			_ => None,
117		}
118	}
119}
120
121struct InlineCommentWriter<'a, F>(&'a mut F);
122impl<F: fmt::Write> fmt::Write for InlineCommentWriter<'_, F> {
123	fn write_str(&mut self, s: &str) -> fmt::Result {
124		for c in s.chars() {
125			self.write_char(c)?
126		}
127		Ok(())
128	}
129
130	fn write_char(&mut self, c: char) -> fmt::Result {
131		match c {
132			'\n' => self.0.write_str("\\n"),
133			'\r' => self.0.write_str("\\r"),
134			// NEL/Next Line
135			'\u{0085}' => self.0.write_str("\\u{0085}"),
136			// line seperator
137			'\u{2028}' => self.0.write_str("\\u{2028}"),
138			// Paragraph seperator
139			'\u{2029}' => self.0.write_str("\\u{2029}"),
140			_ => self.0.write_char(c),
141		}
142	}
143}
144
145struct InlineCommentDisplay<F>(F);
146impl<F: fmt::Display> fmt::Display for InlineCommentDisplay<F> {
147	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148		fmt::Write::write_fmt(&mut InlineCommentWriter(f), format_args!("{}", self.0))
149	}
150}
151
152impl Transaction {
153	/// Writes the full database contents as binary SQL.
154	pub async fn export(
155		&self,
156		ns: &str,
157		db: &str,
158		cfg: Config,
159		chn: Sender<Vec<u8>>,
160	) -> Result<()> {
161		let db = self.get_db_by_name(ns, db).await?.ok_or_else(|| {
162			anyhow::Error::new(Error::DbNotFound {
163				name: db.to_owned(),
164			})
165		})?;
166
167		// Output USERS, ACCESSES, PARAMS, FUNCTIONS, ANALYZERS
168		self.export_metadata(&cfg, &chn, db.namespace_id, db.database_id).await?;
169		// Output TABLES
170		self.export_tables(&cfg, &chn, db.namespace_id, db.database_id).await?;
171		Ok(())
172	}
173
174	async fn export_metadata(
175		&self,
176		cfg: &Config,
177		chn: &Sender<Vec<u8>>,
178		ns: NamespaceId,
179		db: DatabaseId,
180	) -> Result<()> {
181		// Output OPTIONS
182		self.export_section("OPTION", [OptionStatement::import()].into_iter(), chn).await?;
183
184		// Output USERS
185		if cfg.users {
186			let users = self.all_db_users(ns, db).await?;
187			self.export_section(
188				"USERS",
189				users.iter().map(|x| DefineUserStatement::from_definition(Base::Db, x)),
190				chn,
191			)
192			.await?;
193		}
194
195		// Output ACCESSES
196		if cfg.accesses {
197			let accesses = self.all_db_accesses(ns, db).await?;
198			self.export_section(
199				"ACCESSES",
200				accesses
201					.iter()
202					.map(|x| DefineAccessStatement::from_definition(Base::Db, x).redact()),
203				chn,
204			)
205			.await?;
206		}
207
208		// Output PARAMS
209		if cfg.params {
210			let params = self.all_db_params(ns, db).await?;
211			self.export_section("PARAMS", params.iter(), chn).await?;
212		}
213
214		// Output FUNCTIONS
215		if cfg.functions {
216			let functions = self.all_db_functions(ns, db).await?;
217			self.export_section("FUNCTIONS", functions.iter(), chn).await?;
218		}
219
220		// Output ANALYZERS
221		if cfg.analyzers {
222			let analyzers = self.all_db_analyzers(ns, db).await?;
223			self.export_section(
224				"ANALYZERS",
225				analyzers.iter().map(DefineAnalyzerStatement::from_definition),
226				chn,
227			)
228			.await?;
229		}
230
231		// Output SEQUENCES
232		if cfg.sequences {
233			let sequences = self.all_db_sequences(ns, db).await?;
234			self.export_section("SEQUENCES", sequences.iter(), chn).await?;
235		}
236
237		Ok(())
238	}
239
240	async fn export_section<T>(
241		&self,
242		title: &str,
243		items: impl ExactSizeIterator<Item = T>,
244		chn: &Sender<Vec<u8>>,
245	) -> Result<()>
246	where
247		T: ToSql,
248	{
249		if items.len() == 0 {
250			return Ok(());
251		}
252
253		chn.send(bytes!("-- ------------------------------")).await?;
254		chn.send(bytes!(format!("-- {}", InlineCommentDisplay(title)))).await?;
255		chn.send(bytes!("-- ------------------------------")).await?;
256		chn.send(bytes!("")).await?;
257
258		for item in items {
259			chn.send(bytes!(format!("{};", item.to_sql()))).await?;
260		}
261
262		chn.send(bytes!("")).await?;
263		Ok(())
264	}
265
266	async fn export_tables(
267		&self,
268		cfg: &Config,
269		chn: &Sender<Vec<u8>>,
270		ns: NamespaceId,
271		db: DatabaseId,
272	) -> Result<()> {
273		// Check if tables are included in the export config
274		if !cfg.tables.is_any() {
275			return Ok(());
276		}
277		// Fetch all of the tables for this NS / DB
278		let tables = self.all_tb(ns, db, None).await?;
279		// Warn if any specified table names don't match existing tables
280		if let Some(names) = cfg.tables.names() {
281			let existing: Vec<&str> = tables.iter().map(|t| t.name.as_str()).collect();
282			for name in names {
283				if !existing.contains(&name.as_str()) {
284					warn!("Table '{name}' does not exist in the database");
285				}
286			}
287		}
288		// Loop over all of the tables in order
289		for table in tables.iter() {
290			// Check if this table is included in the export config
291			if !cfg.tables.includes(&table.name) {
292				continue;
293			}
294			// Export the table definition structure first
295			self.export_table_structure(ns, db, table, chn).await?;
296			// Then export the table data if its desired
297			if cfg.records {
298				self.export_table_data(ns, db, table, chn).await?;
299			}
300		}
301
302		Ok(())
303	}
304
305	async fn export_table_structure(
306		&self,
307		ns: NamespaceId,
308		db: DatabaseId,
309		table: &TableDefinition,
310		chn: &Sender<Vec<u8>>,
311	) -> Result<()> {
312		chn.send(bytes!("-- ------------------------------")).await?;
313		chn.send(bytes!(format!("-- TABLE: {}", InlineCommentDisplay(&table.name)))).await?;
314		chn.send(bytes!("-- ------------------------------")).await?;
315		chn.send(bytes!("")).await?;
316		chn.send(bytes!(format!("{};", table.to_sql()))).await?;
317		chn.send(bytes!("")).await?;
318		// Export all table field definitions for this table
319		let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
320		for field in fields.iter() {
321			chn.send(bytes!(format!("{};", field.to_sql()))).await?;
322		}
323		chn.send(bytes!("")).await?;
324		// Export all table index definitions for this table
325		let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
326		for index in indexes.iter() {
327			chn.send(bytes!(format!("{};", index.to_sql()))).await?;
328		}
329		chn.send(bytes!("")).await?;
330		// Export all table event definitions for this table
331		let events = self.all_tb_events(ns, db, &table.name).await?;
332		for event in events.iter() {
333			chn.send(bytes!(format!("{};", event.to_sql()))).await?;
334		}
335		chn.send(bytes!("")).await?;
336		// Everything ok
337		Ok(())
338	}
339
340	async fn export_table_data(
341		&self,
342		ns: NamespaceId,
343		db: DatabaseId,
344		table: &TableDefinition,
345		chn: &Sender<Vec<u8>>,
346	) -> Result<()> {
347		chn.send(bytes!("-- ------------------------------")).await?;
348		chn.send(bytes!(format!("-- TABLE DATA: {}", InlineCommentDisplay(&table.name)))).await?;
349		chn.send(bytes!("-- ------------------------------")).await?;
350		chn.send(bytes!("")).await?;
351
352		let beg = crate::key::record::prefix(ns, db, &table.name)?;
353		let end = crate::key::record::suffix(ns, db, &table.name)?;
354		let mut next = Some(beg..end);
355
356		while let Some(rng) = next {
357			let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
358			next = batch.next;
359			// If there are no values, return early.
360			if batch.result.is_empty() {
361				break;
362			}
363			self.export_regular_data(batch.result, chn).await?;
364		}
365
366		chn.send(bytes!("")).await?;
367		Ok(())
368	}
369
370	/// Processes a record and categorizes it for SQL export.
371	///
372	/// This function processes a record, categorizing it into either normal
373	/// records or graph edge records, and writes it to the appropriate string
374	/// buffer for later SQL generation.
375	///
376	/// Note: Only the latest version of each record is exported. Historical
377	/// versions must be exported at the KV level.
378	///
379	/// # Arguments
380	///
381	/// * `k` - The record key.
382	/// * `record` - The record to be processed.
383	/// * `records_relate` - A mutable reference to a string buffer for graph edge records.
384	/// * `records_normal` - A mutable reference to a string buffer for normal records.
385	fn process_record(
386		k: record::RecordKey,
387		mut record: Record,
388		records_relate: &mut String,
389		records_normal: &mut String,
390	) {
391		// Inject the id field into the document before processing.
392		let rid = crate::val::RecordId {
393			table: k.tb.into_owned(),
394			key: k.id,
395		};
396		record.data.def(rid);
397		// Match on the value to determine if it is a graph edge record or a normal record.
398		if record.is_edge()
399			&& let crate::val::Value::RecordId(_) = record.data.pick(&*IN)
400			&& let crate::val::Value::RecordId(_) = record.data.pick(&*OUT)
401		{
402			// If the value is a graph edge record (indicated by EDGE, IN, and OUT fields):
403			// Write the value to the records_relate string.
404			if !records_relate.is_empty() {
405				records_relate.push_str(", ");
406			}
407			records_relate.push_str(&record.data.to_sql());
408		} else {
409			// If the value is a normal record, write it to the records_normal string.
410			if !records_normal.is_empty() {
411				records_normal.push_str(", ");
412			}
413			records_normal.push_str(&record.data.to_sql());
414		}
415	}
416
417	/// Exports regular data to the provided channel.
418	///
419	/// This function processes a list of regular values, converting them into
420	/// SQL commands and sending them to the provided channel. It handles both
421	/// normal records and graph edge records, and ensures that the appropriate
422	/// SQL commands are generated for each type of record.
423	///
424	/// # Arguments
425	///
426	/// * `regular_values` - A vector of tuples containing the regular values to be exported. Each
427	///   tuple consists of a key and a value.
428	/// * `chn` - A reference to the channel to which the SQL commands will be sent.
429	///
430	/// # Returns
431	///
432	/// * `Result<()>` - Returns `Ok(())` if the operation is successful, or an `Error` if an error
433	///   occurs.
434	async fn export_regular_data(
435		&self,
436		regular_values: Vec<(Vec<u8>, Vec<u8>)>,
437		chn: &Sender<Vec<u8>>,
438	) -> Result<()> {
439		// Initialize strings to hold normal records and graph edge records.
440		// Write directly to strings to avoid unnecessary allocations.
441		let mut records_normal = String::new();
442		let mut records_relate = String::new();
443
444		// Process each regular value.
445		for (k, v) in regular_values {
446			let k = record::RecordKey::decode_key(&k)?;
447			let v = Record::kv_decode_value(v)?;
448			// Process the value and categorize it into records_relate or records_normal.
449			Self::process_record(k, v, &mut records_relate, &mut records_normal);
450		}
451
452		// If there are normal records, generate and send the INSERT SQL command.
453		if !records_normal.is_empty() {
454			let sql = format!("INSERT [ {} ];", records_normal);
455			chn.send(bytes!(sql)).await?;
456		}
457
458		// If there are graph edge records, generate and send the INSERT RELATION SQL
459		// command.
460		if !records_relate.is_empty() {
461			let sql = format!("INSERT RELATION [ {} ];", records_relate);
462			chn.send(bytes!(sql)).await?;
463		}
464
465		Ok(())
466	}
467}