pgdump_toc_rewrite/
lib.rs

1/*
2 * Copyright 2023, WiltonDB Software
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17mod keywords;
18mod rewrite_catalog;
19mod rewrite_sql;
20mod toc_datetime;
21mod toc_entry;
22mod toc_error;
23mod toc_header;
24mod toc_string;
25mod toc_reader;
26mod toc_writer;
27mod utils;
28
29use std::collections::HashMap;
30use std::fs;
31use std::fs::File;
32use std::io::BufReader;
33use std::io::BufWriter;
34use std::io::Write;
35use std::path::Path;
36
37use serde::Deserialize;
38use serde::Serialize;
39use serde_json;
40
41use keywords::KEYWORDS;
42use rewrite_catalog::rewrite_catalog;
43use rewrite_catalog::rewrite_catalog_all_at_once;
44use toc_entry::TocEntry;
45use toc_entry::TocEntryJson;
46use toc_error::TocError;
47use toc_header::TocHeader;
48use toc_header::TocHeaderJson;
49use toc_reader::TocReader;
50use toc_string::TocString;
51use toc_writer::TocWriter;
52
53pub use rewrite_sql::rewrite_schema_in_sql;
54pub use rewrite_sql::rewrite_schema_in_sql_single_quoted;
55pub use rewrite_sql::rewrite_schema_in_sql_unqualified;
56pub use rewrite_sql::rewrite_schema_in_sql_qualified_single_quoted;
57
58
/// State shared while rewriting one dump: the TOC header plus the renames that
/// are collected from TOC entries and then applied to entries and catalog files.
#[derive(Default, Debug, Clone)]
struct TocCtx {
    /// Header read from the source TOC; its `compression` value is passed to the catalog rewriters.
    header: TocHeader,
    /// Logical database name derived from the dump's schema names.
    orig_dbname: String,
    /// `orig_dbname` followed by `_`, the prefix expected on the dump's schema names.
    orig_dbname_with_underscore: String,
    /// New logical database name requested by the caller.
    dest_dbname: String,
    /// Original schema name -> renamed schema name.
    schemas: HashMap<String, String>,
    /// Original role name -> renamed role name.
    owners: HashMap<String, String>,
    /// Babelfish catalog table name -> data file name taken from its `TABLE DATA` entry.
    catalog_files: HashMap<String, String>
}
69
70impl TocCtx {
71    fn new(header: TocHeader, orig_dbname: &str, dest_dbname: &str) -> Self {
72        Self {
73            header,
74            orig_dbname: orig_dbname.to_string(),
75            orig_dbname_with_underscore: format!("{}_", orig_dbname),
76            dest_dbname: dest_dbname.to_string(),
77            ..Default::default()
78        }
79    }
80
81    fn catalog_filename(&self, bbf_catalog: &str) -> Result<String, TocError> {
82        match self.catalog_files.get(bbf_catalog) {
83            Some(fname) => Ok(fname.clone()),
84            None => return Err(TocError::new(&format!(
85                "Catalog table not found: {}", bbf_catalog)))
86        }
87    }
88}
89
/// JSON form of a whole TOC file: produced by `read_toc_to_json` and consumed
/// by `write_toc_from_json`.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct TocJson {
    /// TOC header in JSON form.
    pub(crate) header: TocHeaderJson,
    /// TOC entries in JSON form, in the order they appear in the TOC file.
    pub(crate) entries: Vec<TocEntryJson>
}
95
96fn replace_record_rolname(ctx: &TocCtx, rec: &mut Vec<String>, idx: usize) -> Result<(), TocError> {
97    let rolname = &rec[idx];
98    if let Some(replaced) = ctx.owners.get(rolname) {
99        rec[idx] = replaced.clone();
100    };
101    Ok(())
102}
103
104fn replace_record_schema(ctx: &TocCtx, rec: &mut Vec<String>, idx: usize) -> Result<(), TocError> {
105    let schema = &rec[idx];
106    if let Some(replaced) = ctx.schemas.get(schema) {
107        rec[idx] = replaced.clone();
108    };
109    Ok(())
110}
111
112fn replace_record_schema_in_signature(ctx: &TocCtx, rec: &mut Vec<String>, idx: usize) -> Result<(), TocError> {
113    let sig = &rec[idx];
114    let replaced = rewrite_schema_in_sql(&ctx.schemas, sig)?;
115    rec[idx] = replaced;
116    Ok(())
117}
118
119fn replace_record_dbname(ctx: &TocCtx, rec: &mut Vec<String>, idx: usize) -> Result<(), TocError> {
120    let dbname = &rec[idx];
121    if ctx.orig_dbname == *dbname {
122        rec[idx] = ctx.dest_dbname.clone()
123    }
124    Ok(())
125}
126
127fn rewrite_bbf_authid_user_ext(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
128    let filename = ctx.catalog_filename("babelfish_authid_user_ext")?;
129    rewrite_catalog(dir_path, &filename, ctx.header.compression, |mut rec| {
130        replace_record_rolname(ctx, &mut rec, 0)?;
131        replace_record_dbname(ctx, &mut rec, 11)?;
132        Ok(rec)
133    })?;
134    Ok(())
135}
136
137fn rewrite_bbf_extended_properties(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
138    let filename = ctx.catalog_filename("babelfish_extended_properties")?;
139    rewrite_catalog_all_at_once(dir_path, &filename, ctx.header.compression, |sql| {
140        let replaced = rewrite_schema_in_sql_single_quoted(&ctx.schemas, &sql)?;
141        Ok(replaced)
142    })?;
143    Ok(())
144}
145
146fn rewrite_bbf_function_ext(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
147    let filename = ctx.catalog_filename("babelfish_function_ext")?;
148    rewrite_catalog(dir_path, &filename, ctx.header.compression, |mut rec| {
149        replace_record_schema(ctx, &mut rec, 0)?;
150        replace_record_schema_in_signature(ctx, &mut rec, 3)?;
151        Ok(rec)
152    })?;
153    Ok(())
154}
155
156fn rewrite_bbf_namespace_ext(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
157    let filename = ctx.catalog_filename("babelfish_namespace_ext")?;
158    rewrite_catalog(dir_path, &filename, ctx.header.compression, |mut rec| {
159        replace_record_schema(ctx, &mut rec, 0)?;
160        Ok(rec)
161    })?;
162    Ok(())
163}
164
165fn rewrite_bbf_sysdatabases(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
166    let filename = ctx.catalog_filename("babelfish_sysdatabases")?;
167    rewrite_catalog(dir_path, &filename, ctx.header.compression, |mut rec| {
168        replace_record_dbname(ctx, &mut rec, 4)?;
169        Ok(rec)
170    })?;
171    Ok(())
172}
173
174fn rewrite_babelfish_catalogs(ctx: &TocCtx, dir_path: &Path) -> Result<(), TocError> {
175    rewrite_bbf_authid_user_ext(ctx, dir_path)?;
176    rewrite_bbf_extended_properties(ctx, dir_path)?;
177    rewrite_bbf_function_ext(ctx, dir_path)?;
178    rewrite_bbf_namespace_ext(ctx, dir_path)?;
179    rewrite_bbf_sysdatabases(ctx, dir_path)?;
180    Ok(())
181}
182
183fn replace_schema_tstr(schemas: &HashMap<String, String>, sql: &TocString) -> Result<TocString, TocError> {
184    if sql.opt.is_none() {
185        return Ok(TocString::none())
186    };
187    let sql_st = sql.to_string()?;
188    let sql_rewritten = rewrite_schema_in_sql(schemas, &sql_st)?;
189    Ok(TocString::from_string(sql_rewritten))
190}
191
192fn replace_schema_tstr_unqualified(schemas: &HashMap<String, String>, sql: &TocString) -> Result<TocString, TocError> {
193    if sql.opt.is_none() {
194        return Ok(TocString::none())
195    };
196    let sql_st = sql.to_string()?;
197    let sql_rewritten = rewrite_schema_in_sql_unqualified(schemas, &sql_st)?;
198    Ok(TocString::from_string(sql_rewritten))
199}
200
201fn replace_schema_tstr_qualified_single_quoted(schemas: &HashMap<String, String>, sql: &TocString) -> Result<TocString, TocError> {
202    if sql.opt.is_none() {
203        return Ok(TocString::none())
204    };
205    let sql_st = sql.to_string()?;
206    let sql_rewritten = rewrite_schema_in_sql_qualified_single_quoted(schemas, &sql_st)?;
207    Ok(TocString::from_string(sql_rewritten))
208}
209
210fn replace_create_stmt(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
211    te.create_stmt = replace_schema_tstr(&ctx.schemas, &te.create_stmt)?;
212    Ok(())
213}
214
215fn replace_create_stmt_unqualified(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
216    te.create_stmt = replace_schema_tstr_unqualified(&ctx.schemas, &te.create_stmt)?;
217    Ok(())
218}
219
220fn replace_create_stmt_qualified_single_quoted(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
221    te.create_stmt = replace_schema_tstr_qualified_single_quoted(&ctx.schemas, &te.create_stmt)?;
222    Ok(())
223}
224
225fn replace_drop_stmt(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
226    te.drop_stmt = replace_schema_tstr(&ctx.schemas, &te.drop_stmt)?;
227    Ok(())
228}
229
230fn replace_drop_stmt_unqualified(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
231    te.drop_stmt = replace_schema_tstr_unqualified(&ctx.schemas, &te.drop_stmt)?;
232    Ok(())
233}
234
235fn replace_copy_stmt(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
236    te.copy_stmt = replace_schema_tstr(&ctx.schemas, &te.copy_stmt)?;
237    Ok(())
238}
239
240fn replace_tag(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
241    te.tag = replace_schema_tstr(&ctx.schemas, &te.tag)?;
242    Ok(())
243}
244
245fn replace_tag_unqualified(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
246    te.tag = replace_schema_tstr_unqualified(&ctx.schemas, &te.tag)?;
247    Ok(())
248}
249
250fn replace_owner(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
251    if let Some(replaced) = ctx.owners.get(&te.owner.to_string()?) {
252        te.owner = TocString::from_str(replaced);
253    };
254    Ok(())
255}
256
257fn replace_namespace(ctx: &TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
258    if let Some(replaced) = ctx.schemas.get(&te.namespace.to_string()?) {
259        te.namespace = TocString::from_str(replaced);
260    };
261    Ok(())
262}
263
264fn collect_schema_and_owner(ctx: &mut TocCtx, te: &TocEntry) -> Result<(), TocError> {
265    let schema_orig = te.tag.to_string()?;
266    if !schema_orig.starts_with(&ctx.orig_dbname_with_underscore) {
267        return Err(TocError::new(&format!("Unexpected schema name: {}", schema_orig)));
268    }
269    let schema_suffix = schema_orig.chars().skip(ctx.orig_dbname_with_underscore.len()).collect::<String>();
270    let schema_dest = format!("{}_{}", ctx.dest_dbname, schema_suffix);
271    ctx.schemas.insert(schema_orig.clone(), schema_dest.clone());
272
273    let owner_orig = te.owner.to_string()?;
274    if owner_orig.starts_with(&ctx.orig_dbname_with_underscore) {
275        let owner_suffix = owner_orig.chars().skip(ctx.orig_dbname_with_underscore.len()).collect::<String>();
276        let owner_dest = format!("{}_{}", ctx.dest_dbname, owner_suffix);
277        ctx.owners.insert(owner_orig.clone(), owner_dest.clone());
278    }
279    Ok(())
280}
281
282fn collect_babelfish_catalog_filename(ctx: &mut TocCtx, te: &TocEntry) -> Result<(), TocError> {
283    let catalogs = vec!(
284        "babelfish_authid_user_ext",
285        "babelfish_extended_properties",
286        "babelfish_function_ext",
287        "babelfish_namespace_ext",
288        "babelfish_sysdatabases",
289    );
290    let tag = te.tag.to_string()?;
291    if catalogs.contains(&tag.as_str()) {
292        ctx.catalog_files.insert(tag, te.filename.to_string()?);
293    }
294    Ok(())
295}
296
297fn modify_toc_entry(ctx: &mut TocCtx, te: &mut TocEntry) -> Result<(), TocError> {
298    let tag = te.tag.to_string()?;
299    let description = te.description.to_string()?;
300    if "SCHEMA" == description {
301        collect_schema_and_owner(ctx, te)?;
302        replace_tag_unqualified(ctx, te)?;
303        replace_create_stmt_unqualified(ctx, te)?;
304        replace_drop_stmt_unqualified(ctx, te)?;
305        replace_owner(ctx, te)?;
306    } else if "ACL" == description && tag.starts_with("SCHEMA ") {
307        replace_tag_unqualified(ctx, te)?;
308        replace_create_stmt_unqualified(ctx, te)?;
309        replace_owner(ctx, te)?;
310    } else if "SEQUENCE SET" == description {
311        replace_create_stmt_qualified_single_quoted(ctx, te)?;
312        replace_namespace(ctx, te)?;
313        replace_owner(ctx, te)?;
314    } else {
315        if "TABLE DATA" == description {
316            collect_babelfish_catalog_filename(ctx, te)?;
317        }
318        replace_tag(ctx, te)?;
319        replace_create_stmt(ctx, te)?;
320        replace_drop_stmt(ctx, te)?;
321        replace_copy_stmt(ctx, te)?;
322        replace_namespace(ctx, te)?;
323        replace_owner(ctx, te)?;
324    }
325
326    Ok(())
327}
328
329fn check_dbname(dbname: &str) -> Result<(), TocError> {
330    let error = Err(TocError::new(&format!("Invalid db name specified: [{}]", dbname)));
331    if dbname.is_empty() {
332        return error;
333    }
334    if dbname.trim() != dbname {
335        return error;
336    }
337    let first_char = dbname.chars().nth(0).ok_or(TocError::from_str("First char read error"))?;
338    if !((first_char >= 'a' && first_char <= 'z') || first_char == '_') {
339        return error;
340    }
341    for ch in dbname.chars() {
342        if !((ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || (ch == '_')) {
343            return error;
344        }
345    }
346    if KEYWORDS.contains(&dbname) {
347        return error;
348    }
349    Ok(())
350}
351
352fn reorder_babelfish_catalogs(entries: &mut Vec<TocEntry>) -> Result<(), TocError> {
353    let mut sysdatabases_idx = 0usize;
354    let mut extended_properties_idx = 0usize;
355    let mut function_ext_idx = 0usize;
356    let mut namespace_ext_idx = 0usize;
357    let mut view_def_idx = 0usize;
358    for idx in 0..entries.len() {
359        let te = &entries[idx];
360        if te.description.to_string()? == "TABLE DATA" {
361            let tag = te.tag.to_string()?;
362            if tag == "babelfish_sysdatabases" {
363                sysdatabases_idx = idx;
364            } else if tag == "babelfish_extended_properties" {
365                extended_properties_idx = idx;
366            } else if tag == "babelfish_function_ext" {
367                function_ext_idx = idx;
368            } else if tag == "babelfish_namespace_ext" {
369                namespace_ext_idx = idx;
370            } else if tag == "babelfish_view_def" {
371                view_def_idx = idx;
372            }
373        }
374    }
375
376    if 0 == sysdatabases_idx {
377        return Err(TocError::from_str("Invalid TOC, 'babelfish_sysdatabases' table data must be present"));
378    }
379
380    let mut indices = vec!(
381        &mut extended_properties_idx,
382        &mut function_ext_idx,
383        &mut namespace_ext_idx,
384        &mut view_def_idx
385    );
386
387    // bubble sort variation
388    loop {
389        let mut swapped = false;
390        for i in 0..indices.len()  {
391            let idx = &mut indices[i];
392            if **idx > 0 && **idx < sysdatabases_idx {
393                entries.swap(**idx, sysdatabases_idx);
394                let tmp = **idx;
395                **idx = sysdatabases_idx;
396                sysdatabases_idx = tmp;
397                swapped = true;
398            }
399        }
400        if !swapped {
401            break;
402        }
403    }
404
405    Ok(())
406}
407
/// Returns the longest prefix (compared char by char) shared by every string
/// in `strs`; an empty input yields an empty string.
fn longest_common_prefix(strs: &Vec<String>) -> String {
    let (first, rest) = match strs.split_first() {
        Some(split) => split,
        None => return String::new(),
    };
    // Byte length of the prefix shared so far; always on a char boundary of `first`.
    let mut common_len = first.len();
    for other in rest {
        let shared = &first[..common_len];
        common_len = shared
            .char_indices()
            .zip(other.chars())
            .find(|((_, a), b)| a != b)
            .map(|((pos, _), _)| pos)
            // No mismatch: the shorter of the two is the common prefix.
            .unwrap_or_else(|| shared.len().min(other.len()));
    }
    first[..common_len].to_string()
}
429
430fn find_out_orig_dbname(entries: &Vec<TocEntry>) -> Result<String, TocError> {
431    let mut schemas = Vec::new();
432    for te in entries {
433        let description = te.description.to_string()?;
434        if "SCHEMA" == description {
435            let tag = te.tag.to_string()?;
436            schemas.push(tag);
437        }
438    }
439
440    let dbname_with_underscore = longest_common_prefix(&schemas);
441    if dbname_with_underscore.len() < 2 || !dbname_with_underscore.ends_with("_") {
442        return Err(TocError::from_str(&format!("Cannot determine original DB name, TOC schemas: {}", schemas.join(", "))));
443    }
444
445    let dbname = dbname_with_underscore.chars().take(dbname_with_underscore.len() - 1).collect();
446    Ok(dbname)
447}
448
449/// Reads `pg_dump` TOC as a JSON string.
450///
451/// TOC file `toc.dat` is created by `pg_dump` when it is run with directory format (`-Z d` flag).
452///
453/// # Arguments
454///
455/// * `toc_path` - Path to `pg_dump` TOC file
456pub fn read_toc_to_json<P: AsRef<Path>>(toc_path: P) -> Result<String, TocError> {
457    let toc_file = File::open(toc_path)?;
458    let mut reader = TocReader::new(BufReader::new(toc_file));
459    let header = reader.read_header()?;
460    let mut entries = Vec::with_capacity(header.toc_count as usize);
461    for _ in 0..header.toc_count {
462        let te = reader.read_entry()?;
463        entries.push(te.to_json()?);
464    }
465    let tj = TocJson { header: header.to_json()?, entries };
466    let res = serde_json::to_string_pretty(&tj)?;
467    Ok(res)
468}
469
470/// Writes `pg_dump` TOC from a JSON string.
471///
472/// JSON string can be generated with `read_toc_json`.
473///
474/// # Arguments
475///
476/// * `toc_path` - Path to destination TOC file
477/// * `toc_json` - JSON string
478pub fn write_toc_from_json<P: AsRef<Path>>(toc_path: P, toc_json: &str) -> Result<(), TocError> {
479    if toc_path.as_ref().exists() {
480        return Err(TocError::new(&format!("TOC file already exists on path: {}", toc_path.as_ref().to_string_lossy())));
481    }
482    let tj: TocJson = serde_json::from_str(toc_json)?;
483    let toc_file = File::create(toc_path)?;
484    let mut writer = TocWriter::new(BufWriter::new(toc_file));
485    let header = TocHeader::from_json(&tj.header)?;
486    writer.write_header(&header)?;
487    for ej in tj.entries {
488        let te = TocEntry::from_json(&ej)?;
489        writer.write_toc_entry(&te)?;
490    }
491    Ok(())
492}
493
494/// Prints `pg_dump` TOC contents to the specified writer.
495///
496/// TOC file `toc.dat` is created by `pg_dump` when it is run with directory format (`-Z d` flag).
497///
498/// # Arguments
499///
500/// * `toc_path` - Path to `pg_dump` TOC file
501/// * `writer` - Destination writer.
502pub fn print_toc<P: AsRef<Path>, W: Write>(toc_path: P, writer: &mut W) -> Result<(), TocError> {
503    let toc_file = File::open(toc_path)?;
504    let mut reader = TocReader::new(BufReader::new(toc_file));
505    let header = reader.read_header()?;
506    write!(writer, "{}", header)?;
507    for i in 0..header.toc_count {
508        let te = reader.read_entry()?;
509        writeln!(writer, "Entry: {}", i + 1)?;
510        writeln!(writer, "{}", te)?;
511    }
512    Ok(())
513}
514
515/// Rewrites `pg_dump` TOC and catalogs contents with the specified DB name.
516///
517/// TOC file `toc.dat` is created by `pg_dump` when it is run with directory format (`-Z d` flag).
518///
519/// # Arguments
520///
521/// * `toc_path` - Path to `pg_dump` TOC file
522/// * `dbname` - New name for logical database.
523pub fn rewrite_toc<P: AsRef<Path>>(toc_path: P, dbname: &str) -> Result<(), TocError> {
524    check_dbname(dbname)?;
525    let toc_src_path = toc_path.as_ref();
526    let dir_path = match toc_src_path.canonicalize()?.parent() {
527        Some(parent) => parent.to_path_buf(),
528        None => return Err(TocError::from_str("Error accessing dump directory"))
529    };
530    let toc_dest_path = dir_path.join("toc_rewritten.dat");
531    let toc_src = File::open(&toc_src_path)?;
532    let mut reader = TocReader::new(BufReader::new(toc_src));
533    let dest_file = File::create(&toc_dest_path)?;
534    let mut writer = TocWriter::new(BufWriter::new(dest_file));
535
536    let header = reader.read_header()?;
537    let mut entries = Vec::with_capacity(header.toc_count as usize);
538    for _ in 0..header.toc_count {
539        let te  = reader.read_entry()?;
540        entries.push(te);
541    }
542
543    reorder_babelfish_catalogs(&mut entries)?;
544
545    writer.write_header(&header)?;
546    let orig_dbname = find_out_orig_dbname(&entries)?;
547    let mut ctx = TocCtx::new(header, &orig_dbname, &dbname);
548    // _dbo owner may not be present if custom schemas are not used
549    ctx.owners.insert(format!("{}_dbo", &orig_dbname), format!("{}_dbo", &dbname));
550    for mut te in entries {
551        modify_toc_entry(&mut ctx, &mut te)?;
552        writer.write_toc_entry(&te)?;
553    }
554
555    rewrite_babelfish_catalogs(&ctx, dir_path.as_path())?;
556
557    let toc_orig_path = dir_path.join("toc.dat.orig");
558    fs::rename(&toc_src_path, &toc_orig_path)?;
559    fs::rename(&toc_dest_path, &toc_src_path)?;
560
561    Ok(())
562}