1use std::fmt;
2
3use anyhow::Result;
4use async_channel::Sender;
5use surrealdb_types::{SurrealValue, ToSql};
6
7use super::Transaction;
8use crate::catalog::providers::{
9 AuthorisationProvider, DatabaseProvider, TableProvider, UserProvider,
10};
11use crate::catalog::{DatabaseId, NamespaceId, Record, TableDefinition};
12use crate::cnf::EXPORT_BATCH_SIZE;
13use crate::err::Error;
14use crate::expr::paths::{IN, OUT};
15use crate::expr::statements::define::{DefineAccessStatement, DefineUserStatement};
16use crate::expr::{Base, DefineAnalyzerStatement};
17use crate::key::record;
18use crate::kvs::KVValue;
19use crate::sql::statements::OptionStatement;
20
21#[derive(Clone, Debug, SurrealValue)]
22#[surreal(crate = "surrealdb_types")]
23#[surreal(default)]
24pub struct Config {
25 pub users: bool,
26 pub accesses: bool,
27 pub params: bool,
28 pub functions: bool,
29 pub analyzers: bool,
30 pub tables: TableConfig,
31 pub versions: bool,
32 pub records: bool,
33 pub sequences: bool,
34}
35
36impl Default for Config {
37 fn default() -> Config {
38 Config {
39 users: true,
40 accesses: true,
41 params: true,
42 functions: true,
43 analyzers: true,
44 tables: TableConfig::default(),
45 versions: false,
46 records: true,
47 sequences: true,
48 }
49 }
50}
51
52#[derive(Clone, Debug, SurrealValue)]
55#[surreal(crate = "surrealdb_types")]
56pub struct ExcludedTables {
57 pub exclude: Vec<String>,
58}
59
60#[derive(Clone, Debug, Default, SurrealValue)]
61#[surreal(crate = "surrealdb_types")]
62#[surreal(untagged)]
63pub enum TableConfig {
64 #[default]
65 #[surreal(value = true)]
66 All,
67 #[surreal(value = false)]
68 None,
69 Some(Vec<String>),
70 Exclude(ExcludedTables),
71}
72
73impl From<bool> for TableConfig {
77 fn from(value: bool) -> Self {
78 match value {
79 true => TableConfig::All,
80 false => TableConfig::None,
81 }
82 }
83}
84
85impl From<Vec<String>> for TableConfig {
86 fn from(value: Vec<String>) -> Self {
87 TableConfig::Some(value)
88 }
89}
90
91impl From<Vec<&str>> for TableConfig {
92 fn from(value: Vec<&str>) -> Self {
93 TableConfig::Some(value.into_iter().map(ToOwned::to_owned).collect())
94 }
95}
96
97impl TableConfig {
98 pub(crate) fn is_any(&self) -> bool {
100 matches!(self, Self::All | Self::Some(_) | Self::Exclude(_))
101 }
102 pub(crate) fn includes(&self, table: &str) -> bool {
104 match self {
105 Self::All => true,
106 Self::None => false,
107 Self::Some(v) => v.iter().any(|v| v.eq(table)),
108 Self::Exclude(v) => !v.exclude.iter().any(|v| v.eq(table)),
109 }
110 }
111 pub(crate) fn names(&self) -> Option<&[String]> {
113 match self {
114 Self::Some(v) => Some(v.as_slice()),
115 Self::Exclude(v) => Some(v.exclude.as_slice()),
116 _ => None,
117 }
118 }
119}
120
121struct InlineCommentWriter<'a, F>(&'a mut F);
122impl<F: fmt::Write> fmt::Write for InlineCommentWriter<'_, F> {
123 fn write_str(&mut self, s: &str) -> fmt::Result {
124 for c in s.chars() {
125 self.write_char(c)?
126 }
127 Ok(())
128 }
129
130 fn write_char(&mut self, c: char) -> fmt::Result {
131 match c {
132 '\n' => self.0.write_str("\\n"),
133 '\r' => self.0.write_str("\\r"),
134 '\u{0085}' => self.0.write_str("\\u{0085}"),
136 '\u{2028}' => self.0.write_str("\\u{2028}"),
138 '\u{2029}' => self.0.write_str("\\u{2029}"),
140 _ => self.0.write_char(c),
141 }
142 }
143}
144
145struct InlineCommentDisplay<F>(F);
146impl<F: fmt::Display> fmt::Display for InlineCommentDisplay<F> {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 fmt::Write::write_fmt(&mut InlineCommentWriter(f), format_args!("{}", self.0))
149 }
150}
151
152impl Transaction {
153 pub async fn export(
155 &self,
156 ns: &str,
157 db: &str,
158 cfg: Config,
159 chn: Sender<Vec<u8>>,
160 ) -> Result<()> {
161 let db = self.get_db_by_name(ns, db).await?.ok_or_else(|| {
162 anyhow::Error::new(Error::DbNotFound {
163 name: db.to_owned(),
164 })
165 })?;
166
167 self.export_metadata(&cfg, &chn, db.namespace_id, db.database_id).await?;
169 self.export_tables(&cfg, &chn, db.namespace_id, db.database_id).await?;
171 Ok(())
172 }
173
174 async fn export_metadata(
175 &self,
176 cfg: &Config,
177 chn: &Sender<Vec<u8>>,
178 ns: NamespaceId,
179 db: DatabaseId,
180 ) -> Result<()> {
181 self.export_section("OPTION", [OptionStatement::import()].into_iter(), chn).await?;
183
184 if cfg.users {
186 let users = self.all_db_users(ns, db).await?;
187 self.export_section(
188 "USERS",
189 users.iter().map(|x| DefineUserStatement::from_definition(Base::Db, x)),
190 chn,
191 )
192 .await?;
193 }
194
195 if cfg.accesses {
197 let accesses = self.all_db_accesses(ns, db).await?;
198 self.export_section(
199 "ACCESSES",
200 accesses
201 .iter()
202 .map(|x| DefineAccessStatement::from_definition(Base::Db, x).redact()),
203 chn,
204 )
205 .await?;
206 }
207
208 if cfg.params {
210 let params = self.all_db_params(ns, db).await?;
211 self.export_section("PARAMS", params.iter(), chn).await?;
212 }
213
214 if cfg.functions {
216 let functions = self.all_db_functions(ns, db).await?;
217 self.export_section("FUNCTIONS", functions.iter(), chn).await?;
218 }
219
220 if cfg.analyzers {
222 let analyzers = self.all_db_analyzers(ns, db).await?;
223 self.export_section(
224 "ANALYZERS",
225 analyzers.iter().map(DefineAnalyzerStatement::from_definition),
226 chn,
227 )
228 .await?;
229 }
230
231 if cfg.sequences {
233 let sequences = self.all_db_sequences(ns, db).await?;
234 self.export_section("SEQUENCES", sequences.iter(), chn).await?;
235 }
236
237 Ok(())
238 }
239
240 async fn export_section<T>(
241 &self,
242 title: &str,
243 items: impl ExactSizeIterator<Item = T>,
244 chn: &Sender<Vec<u8>>,
245 ) -> Result<()>
246 where
247 T: ToSql,
248 {
249 if items.len() == 0 {
250 return Ok(());
251 }
252
253 chn.send(bytes!("-- ------------------------------")).await?;
254 chn.send(bytes!(format!("-- {}", InlineCommentDisplay(title)))).await?;
255 chn.send(bytes!("-- ------------------------------")).await?;
256 chn.send(bytes!("")).await?;
257
258 for item in items {
259 chn.send(bytes!(format!("{};", item.to_sql()))).await?;
260 }
261
262 chn.send(bytes!("")).await?;
263 Ok(())
264 }
265
266 async fn export_tables(
267 &self,
268 cfg: &Config,
269 chn: &Sender<Vec<u8>>,
270 ns: NamespaceId,
271 db: DatabaseId,
272 ) -> Result<()> {
273 if !cfg.tables.is_any() {
275 return Ok(());
276 }
277 let tables = self.all_tb(ns, db, None).await?;
279 if let Some(names) = cfg.tables.names() {
281 let existing: Vec<&str> = tables.iter().map(|t| t.name.as_str()).collect();
282 for name in names {
283 if !existing.contains(&name.as_str()) {
284 warn!("Table '{name}' does not exist in the database");
285 }
286 }
287 }
288 for table in tables.iter() {
290 if !cfg.tables.includes(&table.name) {
292 continue;
293 }
294 self.export_table_structure(ns, db, table, chn).await?;
296 if cfg.records {
298 self.export_table_data(ns, db, table, chn).await?;
299 }
300 }
301
302 Ok(())
303 }
304
305 async fn export_table_structure(
306 &self,
307 ns: NamespaceId,
308 db: DatabaseId,
309 table: &TableDefinition,
310 chn: &Sender<Vec<u8>>,
311 ) -> Result<()> {
312 chn.send(bytes!("-- ------------------------------")).await?;
313 chn.send(bytes!(format!("-- TABLE: {}", InlineCommentDisplay(&table.name)))).await?;
314 chn.send(bytes!("-- ------------------------------")).await?;
315 chn.send(bytes!("")).await?;
316 chn.send(bytes!(format!("{};", table.to_sql()))).await?;
317 chn.send(bytes!("")).await?;
318 let fields = self.all_tb_fields(ns, db, &table.name, None).await?;
320 for field in fields.iter() {
321 chn.send(bytes!(format!("{};", field.to_sql()))).await?;
322 }
323 chn.send(bytes!("")).await?;
324 let indexes = self.all_tb_indexes(ns, db, &table.name).await?;
326 for index in indexes.iter() {
327 chn.send(bytes!(format!("{};", index.to_sql()))).await?;
328 }
329 chn.send(bytes!("")).await?;
330 let events = self.all_tb_events(ns, db, &table.name).await?;
332 for event in events.iter() {
333 chn.send(bytes!(format!("{};", event.to_sql()))).await?;
334 }
335 chn.send(bytes!("")).await?;
336 Ok(())
338 }
339
340 async fn export_table_data(
341 &self,
342 ns: NamespaceId,
343 db: DatabaseId,
344 table: &TableDefinition,
345 chn: &Sender<Vec<u8>>,
346 ) -> Result<()> {
347 chn.send(bytes!("-- ------------------------------")).await?;
348 chn.send(bytes!(format!("-- TABLE DATA: {}", InlineCommentDisplay(&table.name)))).await?;
349 chn.send(bytes!("-- ------------------------------")).await?;
350 chn.send(bytes!("")).await?;
351
352 let beg = crate::key::record::prefix(ns, db, &table.name)?;
353 let end = crate::key::record::suffix(ns, db, &table.name)?;
354 let mut next = Some(beg..end);
355
356 while let Some(rng) = next {
357 let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
358 next = batch.next;
359 if batch.result.is_empty() {
361 break;
362 }
363 self.export_regular_data(batch.result, chn).await?;
364 }
365
366 chn.send(bytes!("")).await?;
367 Ok(())
368 }
369
370 fn process_record(
386 k: record::RecordKey,
387 mut record: Record,
388 records_relate: &mut String,
389 records_normal: &mut String,
390 ) {
391 let rid = crate::val::RecordId {
393 table: k.tb.into_owned(),
394 key: k.id,
395 };
396 record.data.def(rid);
397 if record.is_edge()
399 && let crate::val::Value::RecordId(_) = record.data.pick(&*IN)
400 && let crate::val::Value::RecordId(_) = record.data.pick(&*OUT)
401 {
402 if !records_relate.is_empty() {
405 records_relate.push_str(", ");
406 }
407 records_relate.push_str(&record.data.to_sql());
408 } else {
409 if !records_normal.is_empty() {
411 records_normal.push_str(", ");
412 }
413 records_normal.push_str(&record.data.to_sql());
414 }
415 }
416
417 async fn export_regular_data(
435 &self,
436 regular_values: Vec<(Vec<u8>, Vec<u8>)>,
437 chn: &Sender<Vec<u8>>,
438 ) -> Result<()> {
439 let mut records_normal = String::new();
442 let mut records_relate = String::new();
443
444 for (k, v) in regular_values {
446 let k = record::RecordKey::decode_key(&k)?;
447 let v = Record::kv_decode_value(v)?;
448 Self::process_record(k, v, &mut records_relate, &mut records_normal);
450 }
451
452 if !records_normal.is_empty() {
454 let sql = format!("INSERT [ {} ];", records_normal);
455 chn.send(bytes!(sql)).await?;
456 }
457
458 if !records_relate.is_empty() {
461 let sql = format!("INSERT RELATION [ {} ];", records_relate);
462 chn.send(bytes!(sql)).await?;
463 }
464
465 Ok(())
466 }
467}