1use std::cell::RefCell;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::ops::{AddAssign, Deref, DerefMut};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex, RwLock};
8use std::thread::LocalKey;
9
10use anyhow::{anyhow, Error};
11use command_executor::command::Command;
12use command_executor::shutdown_mode::ShutdownMode;
13use command_executor::thread_pool::ThreadPool;
14use command_executor::thread_pool_builder::ThreadPoolBuilder;
15
16use crate::osm::model::element::Element;
17use crate::osm::pbf::compression_type::CompressionType;
18use crate::osm::pbf::file_block::FileBlock;
19use crate::osm::pbf::file_info::FileInfo;
20use crate::osm::pbf::writer::Writer;
21
thread_local! {
    // State for the element-ordering pool (created with a single thread in
    // `ParallelWriter::from_file_info`): incoming elements are buffered and
    // sorted here before being cut into file blocks.
    static ELEMENT_ORDERING_BUFFER: RefCell<VecDeque<Element>> = const { RefCell::new(VecDeque::new()) };
    // Capacity threshold of ELEMENT_ORDERING_BUFFER; exceeding it triggers a flush.
    static ELEMENT_ORDERING_BUFFER_SIZE: RefCell<usize> = const { RefCell::new(0) };
    // Maximum number of elements placed into a single file block.
    static FILE_BLOCK_SIZE: RefCell<usize> = const { RefCell::new(0) };
    // Monotonically increasing index assigned to each flushed file block.
    // Starts at 1 to match NEXT_TO_WRITE below.
    static FILE_BLOCK_INDEX: RefCell<usize> = const { RefCell::new(1) };
    // The downstream pool the current pool submits its results to
    // (ordering -> encoding -> writing).
    static NEXT_THREAD_POOL: RefCell<Option<Arc<RwLock<ThreadPool>>>> = const { RefCell::new(None) };
    // Compression used when serializing file blocks; set on the encoding pool.
    static COMPRESSION_TYPE: RefCell<Option<CompressionType>> = const { RefCell::new(None) };
    // Smallest element of the most recently flushed file block; used by
    // assert_order to detect ordering violations.
    static CURRENT_MIN_ELEMENT: RefCell<Option<Element>> = const { RefCell::new(None) };

    // State for the writing pool (single thread): serialized blobs that
    // arrived ahead of their predecessors, keyed by file block index.
    #[allow(clippy::type_complexity)]
    pub static BLOB_ORDERING_BUFFER: RefCell<HashMap<usize, (Vec<u8>, Vec<u8>)>> = RefCell::new(HashMap::new());
    // Index of the next blob that must be written to keep the output ordered.
    pub static NEXT_TO_WRITE: RefCell<usize> = const { RefCell::new(1) };
    // The writer owning the output file; created lazily in write_header.
    pub static PBF_WRITER: RefCell<Option<Writer>> = const { RefCell::new(None) };
}
37
38fn flush_sorted_top() {
39 ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
40 element_ordering_buffer.borrow_mut().make_contiguous().sort();
41 let elements = split_file_block(element_ordering_buffer);
42 set_current_min_element(elements.first());
43 NEXT_THREAD_POOL.with(|thread_pool| {
44 let thread_pool = thread_pool.borrow();
45 let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
46 thread_pool_guard.submit(Box::new(EncodeFileBlockCommand::new(file_block_index(), Mutex::new(elements))));
47 inc_file_block_index();
48 })
49 });
50}
51
52fn flush_all_sorted() {
53 ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
54 element_ordering_buffer.borrow_mut().make_contiguous().sort();
55 while !element_ordering_buffer.borrow().is_empty() {
56 let elements = split_file_block(element_ordering_buffer);
57 set_current_min_element(elements.first());
58 NEXT_THREAD_POOL.with(|thread_pool| {
59 let thread_pool = thread_pool.borrow();
60 let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
61 thread_pool_guard.submit(Box::new(EncodeFileBlockCommand::new(file_block_index(), Mutex::new(elements))));
62 inc_file_block_index();
63 })
64 }
65 });
66}
67
68fn split_file_block(element_ordering_buffer: &RefCell<VecDeque<Element>>) -> Vec<Element> {
69 let mut elements = Vec::with_capacity(file_block_size());
70 for _i in 0..file_block_size() {
71 let element = element_ordering_buffer.borrow_mut().pop_front();
72 match element {
73 None => {
74 break;
75 }
76 Some(e) => {
77 if elements.is_empty() || Element::same_type(&e, &elements[0]) {
78 elements.push(e);
79 } else {
80 element_ordering_buffer.borrow_mut().push_front(e);
81 break;
82 }
83 }
84 }
85 }
86 elements
87}
88
89fn element_ordering_buffer_size() -> usize {
90 ELEMENT_ORDERING_BUFFER_SIZE.with(|s| *s.borrow().deref())
91}
92
93fn file_block_size() -> usize {
94 FILE_BLOCK_SIZE.with(|s| *s.borrow().deref())
95}
96
97fn file_block_index() -> usize {
98 FILE_BLOCK_INDEX.with(|i| *i.borrow().deref())
99}
100
101fn inc_file_block_index() {
102 FILE_BLOCK_INDEX.with(|i| i.borrow_mut().deref_mut().add_assign(1))
103}
104
105fn compression_type() -> CompressionType {
106 COMPRESSION_TYPE.with(|compression_type| compression_type.borrow().as_ref().unwrap().clone())
107}
108
109fn assert_order(element: &Element) {
110 if !element.is_sentinel() {
111 assert!(
112 compare_to_current_min_element(element).is_ge(),
113 "Element order, required by OSM PBF definition is lost. \
114 Possible cause is that the length of the ordering buffer ({}) is too short \
115 to for compensate for the loss of order caused by concurrent processing. \
116 Recommended: reader_tasks * 8000 * n",
117 element_ordering_buffer_size()
118 );
119 }
120}
121
122fn compare_to_current_min_element(element: &Element) -> Ordering {
123 CURRENT_MIN_ELEMENT.with(|current_min_element|
124 match current_min_element.borrow().deref() {
125 None => {
126 Ordering::Greater
127 }
128 Some(e) => {
129 element.cmp(e)
130 }
131 }
132 )
133}
134
135fn set_current_min_element(element: Option<&Element>) {
136 CURRENT_MIN_ELEMENT.with(|current_min_element| {
137 match element {
138 None => {}
139 Some(e) => {
140 current_min_element.borrow_mut().replace(e.clone());
141 }
142 }
143 });
144}
145
/// Command that appends a single element to the thread-local ordering buffer.
///
/// The element is wrapped in `Mutex<Option<...>>` because `Command::execute`
/// takes `&self` while the element must be moved out when executed.
struct AddElementCommand {
    element: Mutex<Option<Element>>,
}
149
150impl AddElementCommand {
151 fn new(element: Element) -> AddElementCommand {
152 AddElementCommand {
153 element: Mutex::new(Some(element)),
154 }
155 }
156}
157
158impl Command for AddElementCommand {
159 fn execute(&self) -> Result<(), Error> {
160 ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
161 let mut element_guard = self.element.lock().unwrap();
162 assert_order(element_guard.as_ref().unwrap());
163 element_ordering_buffer.borrow_mut().push_back(element_guard.take().unwrap());
164 if element_ordering_buffer.borrow().len() > element_ordering_buffer_size() {
165 flush_sorted_top()
166 }
167 });
168 Ok(())
169 }
170}
171
/// Command that appends a batch of elements to the thread-local ordering
/// buffer.
///
/// The batch is wrapped in `Mutex<Option<...>>` because `Command::execute`
/// takes `&self` while the elements must be moved out when executed.
struct AddElementsCommand {
    elements: Mutex<Option<Vec<Element>>>,
}
175
176impl AddElementsCommand {
177 fn new(elements: Vec<Element>) -> AddElementsCommand {
178 AddElementsCommand {
179 elements: Mutex::new(Some(elements)),
180 }
181 }
182}
183
184impl Command for AddElementsCommand {
185 fn execute(&self) -> Result<(), Error> {
186 ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
187 let mut elements_guard = self.elements.lock().unwrap();
188 for element in elements_guard.take().unwrap() {
189 assert_order(&element);
190 element_ordering_buffer.borrow_mut().push_back(element);
191 }
192 if element_ordering_buffer.borrow().len() > element_ordering_buffer_size() {
193 flush_sorted_top();
194 }
195 });
196 Ok(())
197 }
198}
199
/// Command that encodes a batch of elements into the serialized file block
/// with the given index.
///
/// `elements` is wrapped in a `Mutex` because `Command::execute` takes `&self`
/// while the elements must be moved out when executed.
struct EncodeFileBlockCommand {
    // Index of the file block; also keys the blob ordering on the write side.
    index: usize,
    elements: Mutex<Vec<Element>>,
}
204
205impl EncodeFileBlockCommand {
206 fn new(index: usize, elements: Mutex<Vec<Element>>) -> EncodeFileBlockCommand {
207 EncodeFileBlockCommand {
208 index,
209 elements,
210 }
211 }
212}
213
214impl Command for EncodeFileBlockCommand {
215 fn execute(&self) -> Result<(), Error> {
216 let mut elements_guard = self.elements.lock().unwrap();
217 let file_block = FileBlock::from_elements(self.index, std::mem::take(&mut elements_guard));
218 let (blob_header, blob_body) = FileBlock::serialize(&file_block, compression_type())?;
219 NEXT_THREAD_POOL.with(|thread_pool| {
220 let thread_pool = thread_pool.borrow();
221 let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
222 thread_pool_guard.submit(
223 Box::new(
224 WriteBlobCommand::new(
225 self.index, Mutex::new(blob_header), Mutex::new(blob_body),
226 )
227 )
228 );
229 });
230 Ok(())
231 }
232}
233
/// Command that hands a serialized blob (header + body) to the writing thread.
///
/// The byte vectors are wrapped in `Mutex` because `Command::execute` takes
/// `&self` while the buffers must be moved out when executed.
struct WriteBlobCommand {
    // Index of the file block this blob was serialized from; used to restore
    // the original order on the writing side.
    index: usize,
    blob_header: Mutex<Vec<u8>>,
    blob_body: Mutex<Vec<u8>>,
}
239
240impl WriteBlobCommand {
241 fn new(index: usize, blob_header: Mutex<Vec<u8>>, blob_body: Mutex<Vec<u8>>) -> WriteBlobCommand {
242 WriteBlobCommand {
243 index,
244 blob_header,
245 blob_body,
246 }
247 }
248}
249
impl Command for WriteBlobCommand {
    /// Runs on the writing thread: stashes this blob in the thread-local
    /// reorder buffer, then drains every blob whose index is consecutive from
    /// NEXT_TO_WRITE to the output file, restoring the original block order.
    fn execute(&self) -> Result<(), Error> {
        // Step 1: move the blob bytes out of the command and park them in the
        // reorder buffer keyed by file block index. Blobs may arrive here in
        // any order because the encoding pool is multi-threaded.
        BLOB_ORDERING_BUFFER.with(
            |buffer| {
                let mut blob_header_guard = self.blob_header.lock().unwrap();
                let blob_header = std::mem::take(blob_header_guard.deref_mut());
                let mut blob_body_guard = self.blob_body.lock().unwrap();
                let blob_body = std::mem::take(blob_body_guard.deref_mut());
                buffer
                    .borrow_mut()
                    .insert(self.index, (blob_header, blob_body));
            }
        );

        // Step 2: write out the longest run of consecutive indices starting
        // at NEXT_TO_WRITE. The first missing index marks the new
        // NEXT_TO_WRITE; blobs after the gap stay buffered until the gap is
        // filled by a later command.
        BLOB_ORDERING_BUFFER.with(
            |buffer| {
                NEXT_TO_WRITE.with(|next| {
                    let next_to_write = *next.borrow();
                    // usize::MAX is only a loop bound; the gap (None) branch
                    // breaks long before it is reached.
                    for i in next_to_write..usize::MAX {
                        match buffer.borrow_mut().remove(&i) {
                            None => {
                                *next.borrow_mut() = i;
                                break;
                            }
                            Some((header, body)) => {
                                PBF_WRITER.with(
                                    |writer| {
                                        writer.borrow_mut().as_mut().unwrap().write_blob(header, body).expect("Failed to write a blob");
                                    }
                                );
                            }
                        }
                    }
                });
            }
        );

        Ok(())
    }
}
290
/// Writes OSM PBF data concurrently through a three-stage pipeline of thread
/// pools: element ordering -> file block encoding -> blob writing.
pub struct ParallelWriter {
    /// Destination path of the output PBF file.
    path: PathBuf,
    /// Metadata written into the file header.
    file_info: FileInfo,
    /// Compression applied to each serialized file block.
    compression_type: CompressionType,
    /// Single-threaded pool that buffers and sorts incoming elements.
    element_ordering_pool: Arc<RwLock<ThreadPool>>,
    /// Multi-threaded pool that serializes file blocks.
    encoding_pool: Arc<RwLock<ThreadPool>>,
    /// Single-threaded pool that writes blobs to the file in order.
    writing_pool: Arc<RwLock<ThreadPool>>,
}
311
312impl ParallelWriter {
313 pub fn from_file_info(
315 element_ordering_buffer_size: usize,
316 file_block_size: usize,
317 path: PathBuf,
318 file_info: FileInfo,
319 compression_type: CompressionType,
320 ) -> Result<ParallelWriter, Error> {
321 let element_ordering_pool = Self::create_thread_pool("element-ordering", 1, 256)?;
322 let encoding_pool = Self::create_thread_pool("encoding", 4, 256)?;
323 let writing_pool = Self::create_thread_pool("writing", 1, 256)?;
324
325 Self::set_thread_local(element_ordering_pool.clone(), &ELEMENT_ORDERING_BUFFER_SIZE, element_ordering_buffer_size);
326 Self::set_thread_local(element_ordering_pool.clone(), &FILE_BLOCK_SIZE, file_block_size);
327 Self::set_thread_local(encoding_pool.clone(), &COMPRESSION_TYPE, Some(compression_type.clone()));
328 Self::set_thread_local(element_ordering_pool.clone(), &NEXT_THREAD_POOL, Some(encoding_pool.clone()));
329 Self::set_thread_local(encoding_pool.clone(), &NEXT_THREAD_POOL, Some(writing_pool.clone()));
330
331 Ok(
332 ParallelWriter {
333 path,
334 file_info,
335 compression_type,
336 element_ordering_pool,
337 encoding_pool,
338 writing_pool,
339 }
340 )
341 }
342
343 pub fn write_header(&mut self) -> Result<(), Error> {
347 let writing_pool_guard = self.writing_pool.read()
348 .map_err(|e| anyhow!("{}", e))?;
349 let path = self.path.clone();
350 let file_info = self.file_info.clone();
351 let compression_type = self.compression_type.clone();
352 writing_pool_guard.in_all_threads(
353 Arc::new(move || {
354 PBF_WRITER.with(|writer| {
355 if writer.borrow().is_none() {
356 let mut w = Writer::from_file_info(
357 path.clone(),
358 file_info.clone(),
359 compression_type.clone(),
360 ).unwrap();
361 w.write_header().unwrap();
362 writer.replace(Some(w));
363 }
364 })
365 })
366 );
367 Ok(())
368 }
369
370 pub fn write_element(&mut self, element: Element) -> Result<(), Error> {
372 self.element_ordering_pool
373 .read()
374 .unwrap()
375 .submit(Box::new(AddElementCommand::new(element)));
376 Ok(())
377 }
378
379 pub fn write_elements(&mut self, elements: Vec<Element>) -> Result<(), Error> {
381 self.element_ordering_pool
382 .read()
383 .unwrap()
384 .submit(Box::new(AddElementsCommand::new(elements)));
385 Ok(())
386 }
387
388 pub fn close(&mut self) -> Result<(), Error> {
390 self.flush_element_ordering();
391 Self::shutdown(self.element_ordering_pool.clone())?;
392 Self::shutdown(self.encoding_pool.clone())?;
393 self.flush_writing();
394 Self::shutdown(self.writing_pool.clone())?;
395 Ok(())
396 }
397
398 fn flush_element_ordering(&self) {
399 let element_ordering_pool_guard = self.element_ordering_pool.read().unwrap();
400 element_ordering_pool_guard.in_all_threads(Arc::new(flush_all_sorted))
401 }
402
403 fn flush_writing(&self) {}
404
405 fn create_thread_pool(name: &str, tasks: usize, queue_size: usize) -> Result<Arc<RwLock<ThreadPool>>, Error> {
406 Ok(
407 Arc::new(
408 RwLock::new(
409 ThreadPoolBuilder::new()
410 .with_name_str(name)
411 .with_tasks(tasks)
412 .with_queue_size(queue_size)
413 .with_shutdown_mode(ShutdownMode::CompletePending)
414 .build()?
415 )
416 )
417 )
418 }
419
420 fn set_thread_local<T>(thread_pool: Arc<RwLock<ThreadPool>>, local_key: &'static LocalKey<RefCell<T>>, val: T)
421 where T: Sync + Send + Clone {
422 thread_pool
423 .read()
424 .unwrap()
425 .set_thread_local(local_key, val);
426 }
427
428 fn shutdown(thread_pool: Arc<RwLock<ThreadPool>>) -> Result<(), Error> {
429 let mut thread_pool = thread_pool
430 .write()
431 .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?;
432 thread_pool.shutdown();
433 thread_pool.join()
434 }
435}