1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use std::collections::HashMap;
use std::fs;
use std::ops::{AddAssign, SubAssign};
use std::path::PathBuf;
use anyhow::{anyhow, Context};
use regex::Regex;
use text_file_sort::sort::Sort;
use crate::osm::apidb_dump::read::element_iterator::ElementIterator;
use crate::osm::apidb_dump::read::table_def::TableDef;
use crate::osm::apidb_dump::read::table_fields::TableFields;
/// Reader of apidb schema dump produced by pg_dump
pub struct Reader {
tables: HashMap<String, TableDef>,
}
impl Reader {
/// Create a new [Reader]
///
/// * input_path - a path to directory that contains apidb schema dump produced by pg_dump with
/// directory format. For example:
/// ```bash
/// pg_dump --host localhost --port 5432 --username openstreetmap --no-password --file /result --format d -d openstreetmap --compress 0 --table public.nodes --table public.node_tags --table public.ways --table public.way_nodes --table public.way_tags --table public.relations --table public.relation_members --table public.relation_tags --table public.changesets --table public.users
/// ```
/// The input is sorted using primary keys for each table found in input_path/toc.dat which may
/// take significant time depending on the size of the input
/// * tmp_path - location used by the sorting algorithm for intermediate and final result. Should
/// have space for at least 2.2 * input size
pub fn new(input_path: PathBuf, tmp_path: PathBuf) -> Result<Reader, anyhow::Error> {
let mut tables: HashMap<String, TableDef> = HashMap::new();
let toc_path = input_path.join("toc.dat");
let toc = fs::read(&toc_path)
.with_context(|| anyhow!("path: {}", toc_path.display()))?;
let raw_table_defs = Self::get_table_def_strings(&toc);
// COPY public.node_tags (node_id, version, k, v) FROM stdin
let re = Regex::new("^([^ ]+) \\((.+)\\)$").unwrap();
for raw_table_def in raw_table_defs {
let table_data_path = input_path.join(&raw_table_def.1);
let captures = re.captures(&raw_table_def.0).unwrap();
let name = captures.get(1).unwrap().as_str();
if !TableFields::is_of_interest(name) {
continue;
}
let fields: Vec<&str> = captures.get(2).unwrap().as_str().split(", ").collect();
tables.insert(
name.to_string(),
TableDef::new(
name.to_string(),
table_data_path,
tmp_path.clone(),
fields.iter().map(|e| {
e.to_string()
}
).collect(),
)?,
);
}
Self::sort_tables(&tables)?;
Ok(
Reader {
tables,
}
)
}
fn sort_tables(tables: &HashMap<String, TableDef>) -> Result<(), anyhow::Error> {
for (table_name, table_def) in tables {
log::info!("Sort {} table data", table_name);
std::fs::create_dir_all(table_def.tmp_path())?;
let mut text_file = Sort::new(vec![table_def.path()], table_def.sorted_path());
text_file.with_tmp_dir(table_def.tmp_path());
text_file.with_intermediate_files(8192);
text_file.with_tasks(num_cpus::get());
text_file.with_fields(table_def.pkey().key());
text_file.with_ignore_empty();
text_file.with_ignore_lines(Regex::new("^\\\\\\.$")?);
text_file.sort()?;
}
Ok(())
}
fn get_table_def_strings(toc: &Vec<u8>) -> Vec<(String, String)> {
// COPY public.node_tags (node_id, version, k, v) FROM stdin;......3838.dat
let mut result: Vec<(String, String)> = Vec::new();
let copy = "COPY ".as_bytes();
let from_stdin = " FROM stdin".as_bytes();
let dotdat = ".dat".as_bytes();
let mut i: usize = 0;
let mut start_table_def;
let mut end_table_def;
let mut start_file_name;
let mut end_file_name;
while i < toc.len() {
if toc[i..].starts_with(copy) {
i.add_assign(copy.len());
start_table_def = i;
while i < toc.len() {
if toc[i..].starts_with(from_stdin) {
end_table_def = i;
i.add_assign(from_stdin.len());
while i < toc.len() {
if toc[i..].starts_with(dotdat) {
start_file_name = i - 1;
i.add_assign(dotdat.len());
end_file_name = i;
while start_file_name > 0 && toc[start_file_name].is_ascii_digit() {
start_file_name.sub_assign(1);
}
start_file_name.add_assign(1);
result.push(
(
String::from_utf8(toc[start_table_def..end_table_def].to_vec()).unwrap(),
String::from_utf8(toc[start_file_name..end_file_name].to_vec()).unwrap(),
)
);
break;
}
i.add_assign(1);
}
break;
}
i.add_assign(1);
}
}
i.add_assign(1);
}
result
}
/// Create iterator over the elements.
///
/// The behaviour is similar to that of [osm::pbf::element_iterator::ElementIterator] with the
/// distinction that [Element::Sentinel] is produced after completing each type, that is Node,
/// Way, Relation.
pub fn elements(&self) -> Result<ElementIterator, anyhow::Error> {
ElementIterator::new(self.tables.clone())
}
}