1use std::fs;
2use std::io::Write;
3use std::path::PathBuf;
4
5use anyhow::{Context, Error};
6use escape_string::escape;
7
8use crate::osm::apidb_dump::sql::{calculate_tile, to_sql_bool, to_sql_time_millis, to_sql_time_micros};
9use crate::osm::apidb_dump::write::current_object::{CurrentObjectLine, CurrentObjectLines};
10use crate::osm::apidb_dump::write::table_data_writers::TableDataWriters;
11use crate::osm::apidb_dump::write::toc::{load_template_mapping, write_toc};
12use crate::osm::model::element::Element;
13use crate::osm::model::node::Node;
14use crate::osm::model::relation::{Member, Relation};
15use crate::osm::model::way::Way;
16
17pub struct Writer {
21 #[allow(dead_code)]
22 output_path: PathBuf,
23 #[allow(dead_code)]
24 compression_level: i8,
25 writers: TableDataWriters,
26 current_node_line: CurrentObjectLine,
27 current_node_tag_lines: CurrentObjectLines,
28 current_way_line: CurrentObjectLine,
29 current_way_node_lines: CurrentObjectLines,
30 current_way_tag_lines: CurrentObjectLines,
31 current_relation_line: CurrentObjectLine,
32 current_relation_member_lines: CurrentObjectLines,
33 current_relation_tag_lines: CurrentObjectLines,
34}
35
36impl Writer {
37 pub fn new(output_path: PathBuf, compression_level: i8) -> Result<Writer, Error> {
43 Self::create_result_dir(&output_path)?;
44 let writers = TableDataWriters::new(load_template_mapping()?, &output_path)?;
45 Ok(
46 Writer {
47 output_path,
48 compression_level,
49 writers,
50 current_node_line: CurrentObjectLine::new(),
51 current_node_tag_lines: CurrentObjectLines::new(),
52 current_way_line: CurrentObjectLine::new(),
53 current_way_node_lines: CurrentObjectLines::new(),
54 current_way_tag_lines: CurrentObjectLines::new(),
55 current_relation_line: CurrentObjectLine::new(),
56 current_relation_member_lines: CurrentObjectLines::new(),
57 current_relation_tag_lines: CurrentObjectLines::new(),
58 }
59 )
60 }
61
62 pub fn write_element(&mut self, element: Element) -> Result<(), Error> {
64 match element {
65 Element::Node { node } => {
66 self.write_node(node)?;
67 }
68 Element::Way { way } => {
69 self.write_way(way)?;
70 }
71 Element::Relation { relation } => {
72 self.write_relation(relation)?;
73 }
74 Element::Sentinel => {}
75 }
76 Ok(())
77 }
78
79 fn write_node(&mut self, mut node: Node) -> Result<(), Error> {
80 self.writers.user_index_buffer.insert(node.uid() as i64, node.take_user());
81 self.writers.changeset_user_index_buffer.insert(node.changeset(), node.uid() as i64);
82
83 let current_node_line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
86 node.id(),
87 node.coordinate().lat7(),
88 node.coordinate().lon7(),
89 node.changeset(),
90 to_sql_bool(node.visible()),
91 to_sql_time_millis(node.timestamp()),
92 calculate_tile(node.coordinate().lat(), node.coordinate().lon()),
93 node.version()
94 );
95
96 match self.current_node_line.set_last_line(current_node_line, node.id(), node.visible()) {
97 None => {}
98 Some(current_node_line) => {
99 self.writers.current_nodes.writer().write_all(current_node_line.as_bytes())?;
100 }
101 }
102 self.current_node_line.set_last_id(node.id());
103
104 let node_line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t\\N\n",
107 node.id(),
108 node.coordinate().lat7(),
109 node.coordinate().lon7(),
110 node.changeset(),
111 to_sql_bool(node.visible()),
112 to_sql_time_millis(node.timestamp()),
113 calculate_tile(node.coordinate().lat(), node.coordinate().lon()),
114 node.version()
115 );
116
117 self.writers.nodes.writer().write_all(node_line.as_bytes())?;
118
119 let mut current_node_tag_lines = Vec::new();
120 let tags = node.take_tags();
121 for tag in tags {
122 let escaped_key = escape(tag.k());
125 let escaped_tag = escape(tag.v());
126 let node_tag_line = format!("{}\t{}\t{}\t{}\n",
127 node.id(),
128 node.version(),
129 escaped_key,
130 escaped_tag,
131 );
132 self.writers.node_tags.writer().write_all(node_tag_line.as_bytes())?;
133
134 let current_node_tag_line = format!("{}\t{}\t{}\n",
137 node.id(),
138 escaped_key,
139 escaped_tag,
140 );
141 current_node_tag_lines.push(current_node_tag_line);
142 }
143
144 match self.current_node_tag_lines.set_last_lines(current_node_tag_lines, node.id(), node.visible()) {
145 None => {}
146 Some(current_node_tag_lines) => {
147 for current_node_tag_line in current_node_tag_lines {
148 self.writers.current_node_tags.writer().write_all(current_node_tag_line.as_bytes())?;
149 }
150 }
151 }
152 self.current_node_tag_lines.set_last_id(node.id());
153
154 Ok(())
155 }
156
157 fn write_way(&mut self, mut way: Way) -> Result<(), Error> {
158 self.writers.user_index_buffer.insert(way.uid() as i64, way.take_user());
159 self.writers.changeset_user_index_buffer.insert(way.changeset(), way.uid() as i64);
160
161
162 let mut current_way_node_lines = Vec::new();
163 for (sequence_id, node_id) in way.refs().iter().enumerate() {
164 let current_way_node_line = format!("{}\t{}\t{}\n",
167 way.id(),
168 node_id,
169 sequence_id + 1
170 );
171 current_way_node_lines.push(current_way_node_line);
172
173 let way_node_line = format!("{}\t{}\t{}\t{}\n",
176 way.id(),
177 node_id,
178 way.version(),
179 sequence_id + 1
180 );
181 self.writers.way_nodes.writer().write_all(way_node_line.as_bytes())?;
182 }
183
184 match self.current_way_node_lines.set_last_lines(current_way_node_lines, way.id(), way.visible()) {
185 None => {}
186 Some(current_way_node_lines) => {
187 for current_way_node_line in current_way_node_lines {
188 self.writers.current_way_nodes.writer().write_all(current_way_node_line.as_bytes())?;
189 }
190 }
191 }
192 self.current_way_node_lines.set_last_id(way.id());
193
194
195 let mut current_way_tag_lines = Vec::new();
196 for tag in way.take_tags() {
197 let escaped_key = escape(tag.k());
200 let escaped_tag = escape(tag.v());
201 let current_way_tag_line = format!("{}\t{}\t{}\n",
202 way.id(),
203 escaped_key,
204 escaped_tag,
205 );
206 current_way_tag_lines.push(current_way_tag_line);
207
208 let way_tag_line_line = format!("{}\t{}\t{}\t{}\n",
211 way.id(),
212 escaped_key,
213 escaped_tag,
214 way.version()
215 );
216 self.writers.way_tags.writer().write_all(way_tag_line_line.as_bytes())?;
217 }
218
219 match self.current_way_tag_lines.set_last_lines(current_way_tag_lines, way.id(), way.visible()) {
220 None => {}
221 Some(current_way_tag_lines) => {
222 for current_way_tag_line in current_way_tag_lines {
223 self.writers.current_way_tags.writer().write_all(current_way_tag_line.as_bytes())?;
224 }
225 }
226 }
227 self.current_way_tag_lines.set_last_id(way.id());
228
229 let current_way_line = format!("{}\t{}\t{}\t{}\t{}\n",
232 way.id(),
233 way.changeset(),
234 to_sql_time_millis(way.timestamp()),
235 to_sql_bool(way.visible()),
236 way.version(),
237 );
238
239 match self.current_way_line.set_last_line(current_way_line, way.id(), way.visible()) {
240 None => {}
241 Some(current_way_line) => {
242 self.writers.current_ways.writer().write_all(current_way_line.as_bytes())?;
243 }
244 }
245 self.current_way_line.set_last_id(way.id());
246
247 let way_line = format!("{}\t{}\t{}\t{}\t{}\t\\N\n",
250 way.id(),
251 way.changeset(),
252 to_sql_time_millis(way.timestamp()),
253 way.version(),
254 to_sql_bool(way.visible()),
255 );
256 self.writers.ways.writer().write_all(way_line.as_bytes())?;
257
258 Ok(())
259 }
260
261 fn write_relation(&mut self, mut relation: Relation) -> Result<(), Error> {
262 self.writers.user_index_buffer.insert(relation.uid() as i64, relation.take_user());
263 self.writers.changeset_user_index_buffer.insert(relation.changeset(), relation.uid() as i64);
264 let mut current_relation_member_lines = Vec::new();
265 for (sequence_id, member) in relation.members().iter().enumerate() {
266 let (member_id, member_role, member_type) = match member {
267 Member::Node { member } => {
268 (member.id(), member.role(), "Node")
269 }
270 Member::Way { member } => {
271 (member.id(), member.role(), "Way")
272 }
273 Member::Relation { member } => {
274 (member.id(), member.role(), "Relation")
275 }
276 };
277
278 let escaped_role = escape(member_role);
281 let current_relation_member_line = format!("{}\t{}\t{}\t{}\t{}\n",
282 relation.id(),
283 member_type,
284 member_id,
285 escaped_role,
286 sequence_id + 1,
287 );
288 current_relation_member_lines.push(current_relation_member_line);
289
290 let relation_member_line = format!("{}\t{}\t{}\t{}\t{}\t{}\n",
293 relation.id(),
294 member_type,
295 member_id,
296 escaped_role,
297 relation.version(),
298 sequence_id + 1,
299 );
300 self.writers.relation_members.writer().write_all(relation_member_line.as_bytes())?;
301 }
302
303 match self.current_relation_member_lines.set_last_lines(current_relation_member_lines, relation.id(), relation.visible()) {
304 None => {}
305 Some(current_relation_member_lines) => {
306 for current_relation_member_line in current_relation_member_lines {
307 self.writers.current_relation_members.writer().write_all(current_relation_member_line.as_bytes())?;
308 }
309 }
310 }
311 self.current_relation_member_lines.set_last_id(relation.id());
312
313 let mut current_relation_tag_lines = Vec::new();
314 for tag in relation.take_tags() {
315 let escaped_key = escape(tag.k());
318 let escaped_tag = escape(tag.v());
319 let current_relation_tag_line = format!("{}\t{}\t{}\n",
320 relation.id(),
321 escaped_key,
322 escaped_tag,
323 );
324 current_relation_tag_lines.push(current_relation_tag_line);
325
326 let relation_tag_line = format!("{}\t{}\t{}\t{}\n",
329 relation.id(),
330 escaped_key,
331 escaped_tag,
332 relation.version(),
333 );
334 self.writers.relation_tags.writer().write_all(relation_tag_line.as_bytes())?;
335 }
336
337 match self.current_relation_tag_lines.set_last_lines(current_relation_tag_lines, relation.id(), relation.visible()) {
338 None => {}
339 Some(current_relation_tag_lines) => {
340 for current_relation_tag_line in current_relation_tag_lines {
341 self.writers.current_relation_tags.writer().write_all(current_relation_tag_line.as_bytes())?;
342 }
343 }
344 }
345 self.current_relation_tag_lines.set_last_id(relation.id());
346
347
348 let current_relation_line = format!("{}\t{}\t{}\t{}\t{}\n",
351 relation.id(),
352 relation.changeset(),
353 to_sql_time_millis(relation.timestamp()),
354 to_sql_bool(relation.visible()),
355 relation.version(),
356 );
357
358 match self.current_relation_line.set_last_line(current_relation_line, relation.id(), relation.visible()) {
359 None => {}
360 Some(current_relation_line) => {
361 self.writers.current_relations.writer().write_all(current_relation_line.as_bytes())?;
362 }
363 }
364 self.current_relation_line.set_last_id(relation.id());
365
366 let relation_line = format!("{}\t{}\t{}\t{}\t{}\t\\N\n",
369 relation.id(),
370 relation.changeset(),
371 to_sql_time_millis(relation.timestamp()),
372 relation.version(),
373 to_sql_bool(relation.visible()),
374 );
375 self.writers.relations.writer().write_all(relation_line.as_bytes())?;
376
377 Ok(())
378 }
379
380 fn write_changesets(&mut self) -> Result<(), Error> {
381 for element in self.writers.changeset_user_index.range(..)? {
382 let (changeset_id, user_id) = element?;
383 let lib_name = format!("osm-io {}", env!("CARGO_PKG_VERSION"));
386 let line = format!("{}\t{}\t{}\n",
387 changeset_id,
388 "created_by",
389 lib_name,
390 );
391 self.writers.changeset_tags.writer().write_all(line.as_bytes())?;
392
393 let line = format!("{}\t{}\t{}\n",
394 changeset_id,
395 "replication",
396 "true"
397 );
398 self.writers.changeset_tags.writer().write_all(line.as_bytes())?;
399
400 let t = chrono::offset::Utc::now();
403 let line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
404 changeset_id,
405 user_id,
406 to_sql_time_micros(t.timestamp_micros()),
407 -900000000,
408 900000000,
409 -1800000000,
410 1800000000,
411 to_sql_time_micros(t.timestamp_micros()),
412 0
413 );
414 self.writers.changesets.writer().write_all(line.as_bytes())?;
415 }
416
417 Ok(())
418 }
419
420 fn write_users(&mut self) -> Result<(), Error> {
421 for element in self.writers.user_index.range(..)? {
424 let (user_id, user_name) = element?;
425
426 let t = chrono::offset::Utc::now();
427 let osm_admin_user = format!("osm-admin-user-{}@example.com", user_id);
428 let line = format!("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\n",
429 osm_admin_user,
430 user_id,
431 "00000000000000000000000000000000",
432 to_sql_time_micros(t.timestamp_micros()),
433 user_name,
434 to_sql_bool(true),
435 user_name,
436 0,
437 0,
438 3,
439 "00000000",
440 to_sql_bool(false),
441 "\\N",
442 "\\N",
443 "\\N",
444 "pending",
445 "\\N",
446 to_sql_bool(false),
447 "\\N",
448 "\\N",
449 to_sql_bool(false),
450 "markdown",
451 0,
452 0,
453 0,
454 to_sql_bool(false),
455 "\\N",
456 "\\N",
457 "\\N",
458 );
459 self.writers.users.writer().write_all(line.as_bytes())?;
460 }
461
462 Ok(())
463 }
464
465 fn flush_current_object_lines(&mut self) -> Result<(), Error> {
466 match self.current_node_line.take() {
467 None => {}
468 Some(current_node_line) => {
469 self.writers.current_nodes.writer().write_all(current_node_line.as_bytes())?;
470 }
471 }
472
473 match self.current_node_tag_lines.take() {
474 None => {}
475 Some(current_node_tag_lines) => {
476 for current_node_tag_line in current_node_tag_lines {
477 self.writers.current_node_tags.writer().write_all(current_node_tag_line.as_bytes())?;
478 }
479 }
480 }
481
482 match self.current_way_line.take() {
483 None => {}
484 Some(current_way_line) => {
485 self.writers.current_ways.writer().write_all(current_way_line.as_bytes())?;
486 }
487 }
488
489 match self.current_way_tag_lines.take() {
490 None => {}
491 Some(current_way_tag_lines) => {
492 for current_way_tag_line in current_way_tag_lines {
493 self.writers.current_way_tags.writer().write_all(current_way_tag_line.as_bytes())?;
494 }
495 }
496 }
497
498 match self.current_way_node_lines.take() {
499 None => {}
500 Some(current_way_node_lines) => {
501 for current_way_node_line in current_way_node_lines {
502 self.writers.current_way_nodes.writer().write_all(current_way_node_line.as_bytes())?;
503 }
504 }
505 }
506
507 match self.current_relation_line.take() {
508 None => {}
509 Some(current_relation_line) => {
510 self.writers.current_relations.writer().write_all(current_relation_line.as_bytes())?;
511 }
512 }
513
514 match self.current_relation_tag_lines.take() {
515 None => {}
516 Some(current_relation_tag_lines) => {
517 for current_relation_tag_line in current_relation_tag_lines {
518 self.writers.current_relation_tags.writer().write_all(current_relation_tag_line.as_bytes())?;
519 }
520 }
521 }
522
523 match self.current_relation_member_lines.take() {
524 None => {}
525 Some(current_relation_member_lines) => {
526 for current_relation_member_line in current_relation_member_lines {
527 self.writers.current_relation_members.writer().write_all(current_relation_member_line.as_bytes())?;
528 }
529 }
530 }
531
532 Ok(())
533 }
534
535 pub fn close(&mut self) -> Result<(), Error> {
537 self.flush_current_object_lines()?;
538 self.writers.flush_buffers()?;
539 self.write_users()?;
540 self.write_changesets()?;
541 self.writers.close()?;
542 Ok(())
543 }
544
545 pub fn table_mapping(&self) -> Vec<String> {
547 Vec::new()
548 }
549
550 fn create_result_dir(output_path: &PathBuf) -> Result<(), Error> {
551 fs::create_dir_all(output_path).with_context(|| format!("Failed to create dir: {:?}", output_path))?;
552 write_toc(output_path)?;
553
554 Ok(())
555 }
556}