// osm_io/osm/apidb_dump/write/writer.rs

1use std::fs;
2use std::io::Write;
3use std::path::PathBuf;
4
5use anyhow::{Context, Error};
6use escape_string::escape;
7
8use crate::osm::apidb_dump::sql::{calculate_tile, to_sql_bool, to_sql_time_millis, to_sql_time_micros};
9use crate::osm::apidb_dump::write::current_object::{CurrentObjectLine, CurrentObjectLines};
10use crate::osm::apidb_dump::write::table_data_writers::TableDataWriters;
11use crate::osm::apidb_dump::write::toc::{load_template_mapping, write_toc};
12use crate::osm::model::element::Element;
13use crate::osm::model::node::Node;
14use crate::osm::model::relation::{Member, Relation};
15use crate::osm::model::way::Way;
16
17/// Writer of apidb schema dump
18///
19/// Writer of apidb schema dump that can be loaded using pg_restore into a Postgresql database
20pub struct Writer {
21    #[allow(dead_code)]
22    output_path: PathBuf,
23    #[allow(dead_code)]
24    compression_level: i8,
25    writers: TableDataWriters,
26    current_node_line: CurrentObjectLine,
27    current_node_tag_lines: CurrentObjectLines,
28    current_way_line: CurrentObjectLine,
29    current_way_node_lines: CurrentObjectLines,
30    current_way_tag_lines: CurrentObjectLines,
31    current_relation_line: CurrentObjectLine,
32    current_relation_member_lines: CurrentObjectLines,
33    current_relation_tag_lines: CurrentObjectLines,
34}
35
36impl Writer {
37    /// Create a new [Writer]
38    ///
39    /// * output_path - directory to write the output to. Must contain enough space which is very
40    /// difficult to calculate because the *.osm.pbf input is so condensed that 1GB of input can
41    /// easily transform into 100GB of output.
42    pub fn new(output_path: PathBuf, compression_level: i8) -> Result<Writer, Error> {
43        Self::create_result_dir(&output_path)?;
44        let writers = TableDataWriters::new(load_template_mapping()?, &output_path)?;
45        Ok(
46            Writer {
47                output_path,
48                compression_level,
49                writers,
50                current_node_line: CurrentObjectLine::new(),
51                current_node_tag_lines: CurrentObjectLines::new(),
52                current_way_line: CurrentObjectLine::new(),
53                current_way_node_lines: CurrentObjectLines::new(),
54                current_way_tag_lines: CurrentObjectLines::new(),
55                current_relation_line: CurrentObjectLine::new(),
56                current_relation_member_lines: CurrentObjectLines::new(),
57                current_relation_tag_lines: CurrentObjectLines::new(),
58            }
59        )
60    }
61
62    /// Write an element
63    pub fn write_element(&mut self, element: Element) -> Result<(), Error> {
64        match element {
65            Element::Node { node } => {
66                self.write_node(node)?;
67            }
68            Element::Way { way } => {
69                self.write_way(way)?;
70            }
71            Element::Relation { relation } => {
72                self.write_relation(relation)?;
73            }
74            Element::Sentinel => {}
75        }
76        Ok(())
77    }
78
79    fn write_node(&mut self, mut node: Node) -> Result<(), Error> {
80        self.writers.user_index_buffer.insert(node.uid() as i64, node.take_user());
81        self.writers.changeset_user_index_buffer.insert(node.changeset(), node.uid() as i64);
82
83        // public.current_nodes (id, latitude, longitude, changeset_id, visible, "timestamp", tile, version)
84        // template context: 4228.dat
85        let current_node_line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
86                                        node.id(),
87                                        node.coordinate().lat7(),
88                                        node.coordinate().lon7(),
89                                        node.changeset(),
90                                        to_sql_bool(node.visible()),
91                                        to_sql_time_millis(node.timestamp()),
92                                        calculate_tile(node.coordinate().lat(), node.coordinate().lon()),
93                                        node.version()
94        );
95
96        match self.current_node_line.set_last_line(current_node_line, node.id(), node.visible()) {
97            None => {}
98            Some(current_node_line) => {
99                self.writers.current_nodes.writer().write_all(current_node_line.as_bytes())?;
100            }
101        }
102        self.current_node_line.set_last_id(node.id());
103
104        // public.nodes (node_id, latitude, longitude, changeset_id, visible, "timestamp", tile, version, redaction_id)
105        // template context: 4260.dat
106        let node_line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t\\N\n",
107                                node.id(),
108                                node.coordinate().lat7(),
109                                node.coordinate().lon7(),
110                                node.changeset(),
111                                to_sql_bool(node.visible()),
112                                to_sql_time_millis(node.timestamp()),
113                                calculate_tile(node.coordinate().lat(), node.coordinate().lon()),
114                                node.version()
115        );
116
117        self.writers.nodes.writer().write_all(node_line.as_bytes())?;
118
119        let mut current_node_tag_lines = Vec::new();
120        let tags = node.take_tags();
121        for tag in tags {
122            // public.node_tags (node_id, version, k, v)
123            // template context: 4259.dat
124            let escaped_key = escape(tag.k());
125            let escaped_tag = escape(tag.v());
126            let node_tag_line = format!("{}\t{}\t{}\t{}\n",
127                                        node.id(),
128                                        node.version(),
129                                        escaped_key,
130                                        escaped_tag,
131            );
132            self.writers.node_tags.writer().write_all(node_tag_line.as_bytes())?;
133
134            // public.current_node_tags (node_id, k, v)
135            // template context: 4227.dat
136            let current_node_tag_line = format!("{}\t{}\t{}\n",
137                                                node.id(),
138                                                escaped_key,
139                                                escaped_tag,
140            );
141            current_node_tag_lines.push(current_node_tag_line);
142        }
143
144        match self.current_node_tag_lines.set_last_lines(current_node_tag_lines, node.id(), node.visible()) {
145            None => {}
146            Some(current_node_tag_lines) => {
147                for current_node_tag_line in current_node_tag_lines {
148                    self.writers.current_node_tags.writer().write_all(current_node_tag_line.as_bytes())?;
149                }
150            }
151        }
152        self.current_node_tag_lines.set_last_id(node.id());
153
154        Ok(())
155    }
156
157    fn write_way(&mut self, mut way: Way) -> Result<(), Error> {
158        self.writers.user_index_buffer.insert(way.uid() as i64, way.take_user());
159        self.writers.changeset_user_index_buffer.insert(way.changeset(), way.uid() as i64);
160
161
162        let mut current_way_node_lines = Vec::new();
163        for (sequence_id, node_id) in way.refs().iter().enumerate() {
164            // public.current_way_nodes (way_id, node_id, sequence_id)
165            // template context: 4234.dat
166            let current_way_node_line = format!("{}\t{}\t{}\n",
167                                                way.id(),
168                                                node_id,
169                                                sequence_id + 1
170            );
171            current_way_node_lines.push(current_way_node_line);
172
173            // public.way_nodes (way_id, node_id, version, sequence_id)
174            // template context: 4292.dat
175            let way_node_line = format!("{}\t{}\t{}\t{}\n",
176                                        way.id(),
177                                        node_id,
178                                        way.version(),
179                                        sequence_id + 1
180            );
181            self.writers.way_nodes.writer().write_all(way_node_line.as_bytes())?;
182        }
183
184        match self.current_way_node_lines.set_last_lines(current_way_node_lines, way.id(), way.visible()) {
185            None => {}
186            Some(current_way_node_lines) => {
187                for current_way_node_line in current_way_node_lines {
188                    self.writers.current_way_nodes.writer().write_all(current_way_node_line.as_bytes())?;
189                }
190            }
191        }
192        self.current_way_node_lines.set_last_id(way.id());
193
194
195        let mut current_way_tag_lines = Vec::new();
196        for tag in way.take_tags() {
197            // public.current_way_tags (way_id, k, v)
198            // template context: 4235.dat
199            let escaped_key = escape(tag.k());
200            let escaped_tag = escape(tag.v());
201            let current_way_tag_line = format!("{}\t{}\t{}\n",
202                                               way.id(),
203                                               escaped_key,
204                                               escaped_tag,
205            );
206            current_way_tag_lines.push(current_way_tag_line);
207
208            // public.way_tags (way_id, k, v, version)
209            // template context: 4293.dat
210            let way_tag_line_line = format!("{}\t{}\t{}\t{}\n",
211                                            way.id(),
212                                            escaped_key,
213                                            escaped_tag,
214                                            way.version()
215            );
216            self.writers.way_tags.writer().write_all(way_tag_line_line.as_bytes())?;
217        }
218
219        match self.current_way_tag_lines.set_last_lines(current_way_tag_lines, way.id(), way.visible()) {
220            None => {}
221            Some(current_way_tag_lines) => {
222                for current_way_tag_line in current_way_tag_lines {
223                    self.writers.current_way_tags.writer().write_all(current_way_tag_line.as_bytes())?;
224                }
225            }
226        }
227        self.current_way_tag_lines.set_last_id(way.id());
228
229        // public.current_ways (id, changeset_id, "timestamp", visible, version)
230        // template context: 4236.dat
231        let current_way_line = format!("{}\t{}\t{}\t{}\t{}\n",
232                                       way.id(),
233                                       way.changeset(),
234                                       to_sql_time_millis(way.timestamp()),
235                                       to_sql_bool(way.visible()),
236                                       way.version(),
237        );
238
239        match self.current_way_line.set_last_line(current_way_line, way.id(), way.visible()) {
240            None => {}
241            Some(current_way_line) => {
242                self.writers.current_ways.writer().write_all(current_way_line.as_bytes())?;
243            }
244        }
245        self.current_way_line.set_last_id(way.id());
246
247        // public.ways (way_id, changeset_id, "timestamp", version, visible, redaction_id)
248        // template context: 4294.dat"
249        let way_line = format!("{}\t{}\t{}\t{}\t{}\t\\N\n",
250                               way.id(),
251                               way.changeset(),
252                               to_sql_time_millis(way.timestamp()),
253                               way.version(),
254                               to_sql_bool(way.visible()),
255        );
256        self.writers.ways.writer().write_all(way_line.as_bytes())?;
257
258        Ok(())
259    }
260
261    fn write_relation(&mut self, mut relation: Relation) -> Result<(), Error> {
262        self.writers.user_index_buffer.insert(relation.uid() as i64, relation.take_user());
263        self.writers.changeset_user_index_buffer.insert(relation.changeset(), relation.uid() as i64);
264        let mut current_relation_member_lines = Vec::new();
265        for (sequence_id, member) in relation.members().iter().enumerate() {
266            let (member_id, member_role, member_type) = match member {
267                Member::Node { member } => {
268                    (member.id(), member.role(), "Node")
269                }
270                Member::Way { member } => {
271                    (member.id(), member.role(), "Way")
272                }
273                Member::Relation { member } => {
274                    (member.id(), member.role(), "Relation")
275                }
276            };
277
278            // public.current_relation_members (relation_id, member_type, member_id, member_role, sequence_id)
279            // template context: 4230.dat
280            let escaped_role = escape(member_role);
281            let current_relation_member_line = format!("{}\t{}\t{}\t{}\t{}\n",
282                                                       relation.id(),
283                                                       member_type,
284                                                       member_id,
285                                                       escaped_role,
286                                                       sequence_id + 1,
287            );
288            current_relation_member_lines.push(current_relation_member_line);
289
290            // public.relation_members (relation_id, member_type, member_id, member_role, version, sequence_id)
291            // template context: 4277.dat
292            let relation_member_line = format!("{}\t{}\t{}\t{}\t{}\t{}\n",
293                                               relation.id(),
294                                               member_type,
295                                               member_id,
296                                               escaped_role,
297                                               relation.version(),
298                                               sequence_id + 1,
299            );
300            self.writers.relation_members.writer().write_all(relation_member_line.as_bytes())?;
301        }
302
303        match self.current_relation_member_lines.set_last_lines(current_relation_member_lines, relation.id(), relation.visible()) {
304            None => {}
305            Some(current_relation_member_lines) => {
306                for current_relation_member_line in current_relation_member_lines {
307                    self.writers.current_relation_members.writer().write_all(current_relation_member_line.as_bytes())?;
308                }
309            }
310        }
311        self.current_relation_member_lines.set_last_id(relation.id());
312
313        let mut current_relation_tag_lines = Vec::new();
314        for tag in relation.take_tags() {
315            // public.current_relation_tags (relation_id, k, v)
316            // template context: 4231.dat
317            let escaped_key = escape(tag.k());
318            let escaped_tag = escape(tag.v());
319            let current_relation_tag_line = format!("{}\t{}\t{}\n",
320                                                    relation.id(),
321                                                    escaped_key,
322                                                    escaped_tag,
323            );
324            current_relation_tag_lines.push(current_relation_tag_line);
325
326            // public.relation_tags (relation_id, k, v, version)
327            // template context: 4278.dat
328            let relation_tag_line = format!("{}\t{}\t{}\t{}\n",
329                                            relation.id(),
330                                            escaped_key,
331                                            escaped_tag,
332                                            relation.version(),
333            );
334            self.writers.relation_tags.writer().write_all(relation_tag_line.as_bytes())?;
335        }
336
337        match self.current_relation_tag_lines.set_last_lines(current_relation_tag_lines, relation.id(), relation.visible()) {
338            None => {}
339            Some(current_relation_tag_lines) => {
340                for current_relation_tag_line in current_relation_tag_lines {
341                    self.writers.current_relation_tags.writer().write_all(current_relation_tag_line.as_bytes())?;
342                }
343            }
344        }
345        self.current_relation_tag_lines.set_last_id(relation.id());
346
347
348        // public.current_relations (id, changeset_id, "timestamp", visible, version)
349        // template context: 4232.dat
350        let current_relation_line = format!("{}\t{}\t{}\t{}\t{}\n",
351                                            relation.id(),
352                                            relation.changeset(),
353                                            to_sql_time_millis(relation.timestamp()),
354                                            to_sql_bool(relation.visible()),
355                                            relation.version(),
356        );
357
358        match self.current_relation_line.set_last_line(current_relation_line, relation.id(), relation.visible()) {
359            None => {}
360            Some(current_relation_line) => {
361                self.writers.current_relations.writer().write_all(current_relation_line.as_bytes())?;
362            }
363        }
364        self.current_relation_line.set_last_id(relation.id());
365
366        // public.relations (relation_id, changeset_id, "timestamp", version, visible, redaction_id)
367        // template context: 4279.dat
368        let relation_line = format!("{}\t{}\t{}\t{}\t{}\t\\N\n",
369                                    relation.id(),
370                                    relation.changeset(),
371                                    to_sql_time_millis(relation.timestamp()),
372                                    relation.version(),
373                                    to_sql_bool(relation.visible()),
374        );
375        self.writers.relations.writer().write_all(relation_line.as_bytes())?;
376
377        Ok(())
378    }
379
380    fn write_changesets(&mut self) -> Result<(), Error> {
381        for element in self.writers.changeset_user_index.range(..)? {
382            let (changeset_id, user_id) = element?;
383            // public.changeset_tags (changeset_id, k, v)
384            // template context: 4221.dat
385            let lib_name = format!("osm-io {}", env!("CARGO_PKG_VERSION"));
386            let line = format!("{}\t{}\t{}\n",
387                               changeset_id,
388                               "created_by",
389                               lib_name,
390            );
391            self.writers.changeset_tags.writer().write_all(line.as_bytes())?;
392
393            let line = format!("{}\t{}\t{}\n",
394                               changeset_id,
395                               "replication",
396                               "true"
397            );
398            self.writers.changeset_tags.writer().write_all(line.as_bytes())?;
399
400            // public.changesets (id, user_id, created_at, min_lat, max_lat, min_lon, max_lon, closed_at, num_changes)
401            // template context: 4222.dat
402            let t = chrono::offset::Utc::now();
403            let line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
404                               changeset_id,
405                               user_id,
406                               to_sql_time_micros(t.timestamp_micros()),
407                               -900000000,
408                               900000000,
409                               -1800000000,
410                               1800000000,
411                               to_sql_time_micros(t.timestamp_micros()),
412                               0
413            );
414            self.writers.changesets.writer().write_all(line.as_bytes())?;
415        }
416
417        Ok(())
418    }
419
420    fn write_users(&mut self) -> Result<(), Error> {
421        // public.users (email, id, pass_crypt, creation_time, display_name, data_public, description, home_lat, home_lon, home_zoom, pass_salt, email_valid, new_email, creation_ip, languages, status, terms_agreed, consider_pd, auth_uid, preferred_editor, terms_seen, description_format, changesets_count, traces_count, diary_entries_count, image_use_gravatar, auth_provider, home_tile, tou_agreed)
422        // template context: 4290.dat
423        for element in self.writers.user_index.range(..)? {
424            let (user_id, user_name) = element?;
425
426            let t = chrono::offset::Utc::now();
427            let osm_admin_user = format!("osm-admin-user-{}@example.com", user_id);
428            let line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
429                               osm_admin_user,
430                               user_id,
431                               "00000000000000000000000000000000",
432                               to_sql_time_micros(t.timestamp_micros()),
433                               user_name,
434                               to_sql_bool(true),
435                               user_name,
436                               0,
437                               0,
438                               3,
439                               "00000000",
440                               to_sql_bool(false),
441                               "\\N",
442                               "\\N",
443                               "\\N",
444                               "pending",
445                               "\\N",
446                               to_sql_bool(false),
447                               "\\N",
448                               "\\N",
449                               to_sql_bool(false),
450                               "markdown",
451                               0,
452                               0,
453                               0,
454                               to_sql_bool(false),
455                               "\\N",
456                               "\\N",
457                               "\\N",
458            );
459            self.writers.users.writer().write_all(line.as_bytes())?;
460        }
461
462        Ok(())
463    }
464
465    fn flush_current_object_lines(&mut self) -> Result<(), Error> {
466        match self.current_node_line.take() {
467            None => {}
468            Some(current_node_line) => {
469                self.writers.current_nodes.writer().write_all(current_node_line.as_bytes())?;
470            }
471        }
472
473        match self.current_node_tag_lines.take() {
474            None => {}
475            Some(current_node_tag_lines) => {
476                for current_node_tag_line in current_node_tag_lines {
477                    self.writers.current_node_tags.writer().write_all(current_node_tag_line.as_bytes())?;
478                }
479            }
480        }
481
482        match self.current_way_line.take() {
483            None => {}
484            Some(current_way_line) => {
485                self.writers.current_ways.writer().write_all(current_way_line.as_bytes())?;
486            }
487        }
488
489        match self.current_way_tag_lines.take() {
490            None => {}
491            Some(current_way_tag_lines) => {
492                for current_way_tag_line in current_way_tag_lines {
493                    self.writers.current_way_tags.writer().write_all(current_way_tag_line.as_bytes())?;
494                }
495            }
496        }
497
498        match self.current_way_node_lines.take() {
499            None => {}
500            Some(current_way_node_lines) => {
501                for current_way_node_line in current_way_node_lines {
502                    self.writers.current_way_nodes.writer().write_all(current_way_node_line.as_bytes())?;
503                }
504            }
505        }
506
507        match self.current_relation_line.take() {
508            None => {}
509            Some(current_relation_line) => {
510                self.writers.current_relations.writer().write_all(current_relation_line.as_bytes())?;
511            }
512        }
513
514        match self.current_relation_tag_lines.take() {
515            None => {}
516            Some(current_relation_tag_lines) => {
517                for current_relation_tag_line in current_relation_tag_lines {
518                    self.writers.current_relation_tags.writer().write_all(current_relation_tag_line.as_bytes())?;
519                }
520            }
521        }
522
523        match self.current_relation_member_lines.take() {
524            None => {}
525            Some(current_relation_member_lines) => {
526                for current_relation_member_line in current_relation_member_lines {
527                    self.writers.current_relation_members.writer().write_all(current_relation_member_line.as_bytes())?;
528                }
529            }
530        }
531
532        Ok(())
533    }
534
535    /// Flush internal buffers and add file terminators
536    pub fn close(&mut self) -> Result<(), Error> {
537        self.flush_current_object_lines()?;
538        self.writers.flush_buffers()?;
539        self.write_users()?;
540        self.write_changesets()?;
541        self.writers.close()?;
542        Ok(())
543    }
544
545    /// Return table to file mapping for diagnostics
546    pub fn table_mapping(&self) -> Vec<String> {
547        Vec::new()
548    }
549
550    fn create_result_dir(output_path: &PathBuf) -> Result<(), Error> {
551        fs::create_dir_all(output_path).with_context(|| format!("Failed to create dir: {:?}", output_path))?;
552        write_toc(output_path)?;
553
554        Ok(())
555    }
556}