osm_io/osm/apidb_dump/read/
reader.rs

1use std::collections::HashMap;
2use std::fs;
3use std::ops::{AddAssign, SubAssign};
4use std::path::PathBuf;
5use anyhow::{anyhow, Context};
6
7use regex::Regex;
8use text_file_sort::sort::Sort;
9
10use crate::osm::apidb_dump::read::element_iterator::ElementIterator;
11use crate::osm::apidb_dump::read::table_def::TableDef;
12use crate::osm::apidb_dump::read::table_fields::TableFields;
13
/// Reader of apidb schema dump produced by pg_dump
///
/// Holds the definitions of the dump tables that are of interest. A [Reader] is created with
/// [Reader::new], which also sorts the table data, and is consumed through [Reader::elements].
pub struct Reader {
    /// Table definitions keyed by the table name taken from the `COPY` statement in `toc.dat`,
    /// for example `public.node_tags`.
    tables: HashMap<String, TableDef>,
}
18
19impl Reader {
20    /// Create a new [Reader]
21    ///
22    /// * input_path - a path to directory that contains apidb schema dump produced by pg_dump with
23    ///   directory format. For example:
24    /// ```bash
25    ///  pg_dump --host localhost --port 5432 --username openstreetmap --no-password --file /result --format d -d openstreetmap --compress 0 --table public.nodes --table public.node_tags --table public.ways --table public.way_nodes --table public.way_tags --table public.relations --table public.relation_members --table public.relation_tags --table public.changesets --table public.users
26    /// ```
27    ///   The input is sorted using primary keys for each table found in input_path/toc.dat which may
28    ///   take significant time depending on the size of the input
29    /// * tmp_path - location used by the sorting algorithm for intermediate and final result. Should
30    ///   have space for at least 2.2 * input size
31    pub fn new(input_path: PathBuf, tmp_path: PathBuf) -> Result<Reader, anyhow::Error> {
32        let mut tables: HashMap<String, TableDef> = HashMap::new();
33
34        let toc_path = input_path.join("toc.dat");
35        let toc = fs::read(&toc_path)
36            .with_context(|| anyhow!("path: {}", toc_path.display()))?;
37        let raw_table_defs = Self::get_table_def_strings(&toc);
38        // COPY public.node_tags (node_id, version, k, v) FROM stdin
39        let re = Regex::new("^([^ ]+) \\((.+)\\)$").unwrap();
40        for raw_table_def in raw_table_defs {
41            let table_data_path = input_path.join(&raw_table_def.1);
42            let captures = re.captures(&raw_table_def.0).unwrap();
43            let name = captures.get(1).unwrap().as_str();
44            if !TableFields::is_of_interest(name) {
45                continue;
46            }
47            let fields: Vec<&str> = captures.get(2).unwrap().as_str().split(", ").collect();
48            tables.insert(
49                name.to_string(),
50                TableDef::new(
51                    name.to_string(),
52                    table_data_path,
53                    tmp_path.clone(),
54                    fields.iter().map(|e| {
55                        e.to_string()
56                    }
57                    ).collect(),
58                )?,
59            );
60        }
61
62        Self::sort_tables(&tables)?;
63
64        Ok(
65            Reader {
66                tables,
67            }
68        )
69    }
70
71    fn sort_tables(tables: &HashMap<String, TableDef>) -> Result<(), anyhow::Error> {
72        let ignore_regex = Regex::new("^\\\\\\.$")?;
73        for (table_name, table_def) in tables {
74            log::info!("Sort {} table data", table_name);
75            fs::create_dir_all(table_def.tmp_path())?;
76            let mut text_file = Sort::new(vec![table_def.path()], table_def.sorted_path());
77            text_file.with_tmp_dir(table_def.tmp_path());
78            text_file.with_intermediate_files(8192);
79            text_file.with_tasks(num_cpus::get());
80            text_file.with_fields(table_def.pkey().key());
81            text_file.with_ignore_empty();
82            text_file.with_ignore_lines(ignore_regex.clone());
83            text_file.sort()?;
84        }
85        Ok(())
86    }
87
88    fn get_table_def_strings(toc: &[u8]) -> Vec<(String, String)> {
89        // COPY public.node_tags (node_id, version, k, v) FROM stdin;......3838.dat
90        let mut result: Vec<(String, String)> = Vec::new();
91        let copy = "COPY ".as_bytes();
92        let from_stdin = " FROM stdin".as_bytes();
93        let dotdat = ".dat".as_bytes();
94        let mut i: usize = 0;
95        let mut start_table_def;
96        let mut end_table_def;
97        let mut start_file_name;
98        let mut end_file_name;
99        while i < toc.len() {
100            if toc[i..].starts_with(copy) {
101                i.add_assign(copy.len());
102                start_table_def = i;
103                while i < toc.len() {
104                    if toc[i..].starts_with(from_stdin) {
105                        end_table_def = i;
106                        i.add_assign(from_stdin.len());
107                        while i < toc.len() {
108                            if toc[i..].starts_with(dotdat) {
109                                start_file_name = i - 1;
110                                i.add_assign(dotdat.len());
111                                end_file_name = i;
112                                while start_file_name > 0 && toc[start_file_name].is_ascii_digit() {
113                                    start_file_name.sub_assign(1);
114                                }
115                                start_file_name.add_assign(1);
116                                result.push(
117                                    (
118                                        String::from_utf8(toc[start_table_def..end_table_def].to_vec()).unwrap(),
119                                        String::from_utf8(toc[start_file_name..end_file_name].to_vec()).unwrap(),
120                                    )
121                                );
122                                break;
123                            }
124                            i.add_assign(1);
125                        }
126                        break;
127                    }
128                    i.add_assign(1);
129                }
130            }
131            i.add_assign(1);
132        }
133        result
134    }
135
136    /// Create iterator over the elements.
137    ///
138    /// The behaviour is similar to that of [osm::pbf::element_iterator::ElementIterator] with the
139    /// distinction that [Element::Sentinel] is produced after completing each type, that is Node,
140    /// Way, Relation.
141    pub fn elements(&self) -> Result<ElementIterator, anyhow::Error> {
142        ElementIterator::new(self.tables.clone())
143    }
144}
145
146
147