osm_io/osm/pbf/parallel_writer.rs

1use std::cell::RefCell;
2use std::cmp::Ordering;
3use std::collections::HashMap;
4use std::collections::VecDeque;
5use std::ops::{AddAssign, Deref, DerefMut};
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex, RwLock};
8use std::thread::LocalKey;
9
10use anyhow::{anyhow, Error};
11use command_executor::command::Command;
12use command_executor::shutdown_mode::ShutdownMode;
13use command_executor::thread_pool::ThreadPool;
14use command_executor::thread_pool_builder::ThreadPoolBuilder;
15
16use crate::osm::model::element::Element;
17use crate::osm::pbf::compression_type::CompressionType;
18use crate::osm::pbf::file_block::FileBlock;
19use crate::osm::pbf::file_info::FileInfo;
20use crate::osm::pbf::writer::Writer;
21
// Per-thread pipeline state. Each pool stage (ordering, encoding, writing) runs on
// its own threads; values are seeded via ParallelWriter::set_thread_local.
thread_local! {
    // Ordering stage: buffer that re-sorts the incoming element stream.
    static ELEMENT_ORDERING_BUFFER: RefCell<VecDeque<Element>> = const { RefCell::new(VecDeque::new()) };
    // Ordering stage: configured capacity threshold of the ordering buffer.
    static ELEMENT_ORDERING_BUFFER_SIZE: RefCell<usize> = const { RefCell::new(0) };
    // Ordering stage: maximum number of elements per emitted file block.
    static FILE_BLOCK_SIZE: RefCell<usize> = const { RefCell::new(0) };
    // Ordering stage: index assigned to the next file block (starts at 1; 0 is the header).
    static FILE_BLOCK_INDEX: RefCell<usize> = const { RefCell::new(1) };
    // Handle to the next pipeline stage's pool (ordering -> encoding -> writing).
    static NEXT_THREAD_POOL: RefCell<Option<Arc<RwLock<ThreadPool>>>> = const { RefCell::new(None) };
    // Encoding stage: compression applied when serializing file blocks.
    static COMPRESSION_TYPE: RefCell<Option<CompressionType>> = const { RefCell::new(None) };
    // Ordering stage: smallest element of the most recently dispatched block,
    // used by assert_order to detect unrecoverable loss of element order.
    static CURRENT_MIN_ELEMENT: RefCell<Option<Element>> = const { RefCell::new(None) };

    // Writing stage: blobs that arrived out of order, keyed by block index.
    #[allow(clippy::type_complexity)]
    pub static BLOB_ORDERING_BUFFER: RefCell<HashMap<usize, (Vec<u8>, Vec<u8>)>> = RefCell::new(HashMap::new());
    // the first expected block is #1. #0 is the header
    pub static NEXT_TO_WRITE: RefCell<usize> = const { RefCell::new(1) };
    // Writing stage: the actual file writer, constructed lazily in write_header.
    pub static PBF_WRITER: RefCell<Option<Writer>> = const { RefCell::new(None) };
}
37
38fn flush_sorted_top() {
39    ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
40        element_ordering_buffer.borrow_mut().make_contiguous().sort();
41        let elements = split_file_block(element_ordering_buffer);
42        set_current_min_element(elements.first());
43        NEXT_THREAD_POOL.with(|thread_pool| {
44            let thread_pool = thread_pool.borrow();
45            let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
46            thread_pool_guard.submit(Box::new(EncodeFileBlockCommand::new(file_block_index(), Mutex::new(elements))));
47            inc_file_block_index();
48        })
49    });
50}
51
52fn flush_all_sorted() {
53    ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
54        element_ordering_buffer.borrow_mut().make_contiguous().sort();
55        while !element_ordering_buffer.borrow().is_empty() {
56            let elements = split_file_block(element_ordering_buffer);
57            set_current_min_element(elements.first());
58            NEXT_THREAD_POOL.with(|thread_pool| {
59                let thread_pool = thread_pool.borrow();
60                let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
61                thread_pool_guard.submit(Box::new(EncodeFileBlockCommand::new(file_block_index(), Mutex::new(elements))));
62                inc_file_block_index();
63            })
64        }
65    });
66}
67
68fn split_file_block(element_ordering_buffer: &RefCell<VecDeque<Element>>) -> Vec<Element> {
69    let mut elements = Vec::with_capacity(file_block_size());
70    for _i in 0..file_block_size() {
71        let element = element_ordering_buffer.borrow_mut().pop_front();
72        match element {
73            None => {
74                break;
75            }
76            Some(e) => {
77                if elements.is_empty() || Element::same_type(&e, &elements[0]) {
78                    elements.push(e);
79                } else {
80                    element_ordering_buffer.borrow_mut().push_front(e);
81                    break;
82                }
83            }
84        }
85    }
86    elements
87}
88
89fn element_ordering_buffer_size() -> usize {
90    ELEMENT_ORDERING_BUFFER_SIZE.with(|s| *s.borrow().deref())
91}
92
93fn file_block_size() -> usize {
94    FILE_BLOCK_SIZE.with(|s| *s.borrow().deref())
95}
96
97fn file_block_index() -> usize {
98    FILE_BLOCK_INDEX.with(|i| *i.borrow().deref())
99}
100
101fn inc_file_block_index() {
102    FILE_BLOCK_INDEX.with(|i| i.borrow_mut().deref_mut().add_assign(1))
103}
104
105fn compression_type() -> CompressionType {
106    COMPRESSION_TYPE.with(|compression_type| compression_type.borrow().as_ref().unwrap().clone())
107}
108
109fn assert_order(element: &Element) {
110    if !element.is_sentinel() {
111        assert!(
112            compare_to_current_min_element(element).is_ge(),
113            "Element order, required by OSM PBF definition is lost. \
114                    Possible cause is that the length of the ordering buffer ({}) is too short \
115                    to for compensate for the loss of order caused by concurrent processing. \
116                    Recommended: reader_tasks * 8000 * n",
117            element_ordering_buffer_size()
118        );
119    }
120}
121
122fn compare_to_current_min_element(element: &Element) -> Ordering {
123    CURRENT_MIN_ELEMENT.with(|current_min_element|
124        match current_min_element.borrow().deref() {
125            None => {
126                Ordering::Greater
127            }
128            Some(e) => {
129                element.cmp(e)
130            }
131        }
132    )
133}
134
135fn set_current_min_element(element: Option<&Element>) {
136    CURRENT_MIN_ELEMENT.with(|current_min_element| {
137        match element {
138            None => {}
139            Some(e) => {
140                current_min_element.borrow_mut().replace(e.clone());
141            }
142        }
143    });
144}
145
/// Command executed on the element-ordering thread: appends a single element to
/// the thread-local ordering buffer.
struct AddElementCommand {
    // Mutex<Option<..>> lets execute(&self) move the element out (`take`)
    // despite the Command trait only providing a shared reference.
    element: Mutex<Option<Element>>,
}
149
150impl AddElementCommand {
151    fn new(element: Element) -> AddElementCommand {
152        AddElementCommand {
153            element: Mutex::new(Some(element)),
154        }
155    }
156}
157
158impl Command for AddElementCommand {
159    fn execute(&self) -> Result<(), Error> {
160        ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
161            let mut element_guard = self.element.lock().unwrap();
162            assert_order(element_guard.as_ref().unwrap());
163            element_ordering_buffer.borrow_mut().push_back(element_guard.take().unwrap());
164            if element_ordering_buffer.borrow().len() > element_ordering_buffer_size() {
165                flush_sorted_top()
166            }
167        });
168        Ok(())
169    }
170}
171
/// Command executed on the element-ordering thread: appends a batch of elements
/// to the thread-local ordering buffer.
struct AddElementsCommand {
    // Mutex<Option<..>> lets execute(&self) move the batch out (`take`)
    // despite the Command trait only providing a shared reference.
    elements: Mutex<Option<Vec<Element>>>,
}
175
176impl AddElementsCommand {
177    fn new(elements: Vec<Element>) -> AddElementsCommand {
178        AddElementsCommand {
179            elements: Mutex::new(Some(elements)),
180        }
181    }
182}
183
184impl Command for AddElementsCommand {
185    fn execute(&self) -> Result<(), Error> {
186        ELEMENT_ORDERING_BUFFER.with(|element_ordering_buffer| {
187            let mut elements_guard = self.elements.lock().unwrap();
188            for element in elements_guard.take().unwrap() {
189                assert_order(&element);
190                element_ordering_buffer.borrow_mut().push_back(element);
191            }
192            if element_ordering_buffer.borrow().len() > element_ordering_buffer_size() {
193                flush_sorted_top();
194            }
195        });
196        Ok(())
197    }
198}
199
/// Command executed on an encoding thread: turns a batch of elements into a
/// serialized (and possibly compressed) file block, then forwards the blob to
/// the writing pool.
struct EncodeFileBlockCommand {
    // Position of this block in the output file (header is #0).
    index: usize,
    // Mutex lets execute(&self) take the elements out by value.
    elements: Mutex<Vec<Element>>,
}
204
205impl EncodeFileBlockCommand {
206    fn new(index: usize, elements: Mutex<Vec<Element>>) -> EncodeFileBlockCommand {
207        EncodeFileBlockCommand {
208            index,
209            elements,
210        }
211    }
212}
213
214impl Command for EncodeFileBlockCommand {
215    fn execute(&self) -> Result<(), Error> {
216        let mut elements_guard = self.elements.lock().unwrap();
217        let file_block = FileBlock::from_elements(self.index, std::mem::take(&mut elements_guard));
218        let (blob_header, blob_body) = FileBlock::serialize(&file_block, compression_type())?;
219        NEXT_THREAD_POOL.with(|thread_pool| {
220            let thread_pool = thread_pool.borrow();
221            let thread_pool_guard = thread_pool.as_ref().unwrap().read().unwrap();
222            thread_pool_guard.submit(
223                Box::new(
224                    WriteBlobCommand::new(
225                        self.index, Mutex::new(blob_header), Mutex::new(blob_body),
226                    )
227                )
228            );
229        });
230        Ok(())
231    }
232}
233
/// Command executed on the single writing thread: buffers a serialized blob and
/// writes out every blob that is next in sequence.
struct WriteBlobCommand {
    // Position of this blob in the output file; blobs are written strictly in
    // index order.
    index: usize,
    // Mutexes let execute(&self) take the byte buffers out by value.
    blob_header: Mutex<Vec<u8>>,
    blob_body: Mutex<Vec<u8>>,
}
239
240impl WriteBlobCommand {
241    fn new(index: usize, blob_header: Mutex<Vec<u8>>, blob_body: Mutex<Vec<u8>>) -> WriteBlobCommand {
242        WriteBlobCommand {
243            index,
244            blob_header,
245            blob_body,
246        }
247    }
248}
249
250impl Command for WriteBlobCommand {
251    fn execute(&self) -> Result<(), Error> {
252        BLOB_ORDERING_BUFFER.with(
253            |buffer| {
254                let mut blob_header_guard = self.blob_header.lock().unwrap();
255                let blob_header = std::mem::take(blob_header_guard.deref_mut());
256                let mut blob_body_guard = self.blob_body.lock().unwrap();
257                let blob_body = std::mem::take(blob_body_guard.deref_mut());
258                buffer
259                    .borrow_mut()
260                    .insert(self.index, (blob_header, blob_body));
261            }
262        );
263
264        BLOB_ORDERING_BUFFER.with(
265            |buffer| {
266                NEXT_TO_WRITE.with(|next| {
267                    let next_to_write = *next.borrow();
268                    for i in next_to_write..usize::MAX {
269                        match buffer.borrow_mut().remove(&i) {
270                            None => {
271                                *next.borrow_mut() = i;
272                                break;
273                            }
274                            Some((header, body)) => {
275                                PBF_WRITER.with(
276                                    |writer| {
277                                        writer.borrow_mut().as_mut().unwrap().write_blob(header, body).expect("Failed to write a blob");
278                                    }
279                                );
280                            }
281                        }
282                    }
283                });
284            }
285        );
286
287        Ok(())
288    }
289}
290
/// Write *.osm.pbf file while performing concurrently significant parts of work.
///
/// The parallel writer accepts somewhat unordered stream of elements, orders these elements,
/// splits them into FileBlocks and writes them to the target file. The writer is composed from an
/// ordering thread that maintains a large enough buffer to provide high probability of restoring
/// the order, multiple encoding threads that do the heavy lifting of encoding the PBF and
/// compressing, and finally the writing thread that writes encoded blobs to file.
/// The [ParallelWriter] uses more memory because of the internal ordering buffers controlled by the
/// `element_ordering_buffer_size` parameter to constructor. It is limited to use cases where the
/// processing of each element takes roughly the same time, as in simple filtering tasks or that
/// elements were ordered before calling the writer.
/// For example please see ./examples/parallel-bf-io.rs
pub struct ParallelWriter {
    // Target *.osm.pbf path; the Writer itself is created lazily on the writing thread.
    path: PathBuf,
    file_info: FileInfo,
    compression_type: CompressionType,
    // Pipeline stages, chained ordering -> encoding -> writing via NEXT_THREAD_POOL.
    element_ordering_pool: Arc<RwLock<ThreadPool>>,
    encoding_pool: Arc<RwLock<ThreadPool>>,
    writing_pool: Arc<RwLock<ThreadPool>>,
}
311
312impl ParallelWriter {
313    /// Create [ParallelWriter] from [FileInfo]
314    pub fn from_file_info(
315        element_ordering_buffer_size: usize,
316        file_block_size: usize,
317        path: PathBuf,
318        file_info: FileInfo,
319        compression_type: CompressionType,
320    ) -> Result<ParallelWriter, Error> {
321        let element_ordering_pool = Self::create_thread_pool("element-ordering", 1, 256)?;
322        let encoding_pool = Self::create_thread_pool("encoding", 4, 256)?;
323        let writing_pool = Self::create_thread_pool("writing", 1, 256)?;
324
325        Self::set_thread_local(element_ordering_pool.clone(), &ELEMENT_ORDERING_BUFFER_SIZE, element_ordering_buffer_size);
326        Self::set_thread_local(element_ordering_pool.clone(), &FILE_BLOCK_SIZE, file_block_size);
327        Self::set_thread_local(encoding_pool.clone(), &COMPRESSION_TYPE, Some(compression_type.clone()));
328        Self::set_thread_local(element_ordering_pool.clone(), &NEXT_THREAD_POOL, Some(encoding_pool.clone()));
329        Self::set_thread_local(encoding_pool.clone(), &NEXT_THREAD_POOL, Some(writing_pool.clone()));
330
331        Ok(
332            ParallelWriter {
333                path,
334                file_info,
335                compression_type,
336                element_ordering_pool,
337                encoding_pool,
338                writing_pool,
339            }
340        )
341    }
342
343    /// Write the *.osm.pbf header.
344    ///
345    /// Must be called before writing the first element.
346    pub fn write_header(&mut self) -> Result<(), Error> {
347        let writing_pool_guard = self.writing_pool.read()
348            .map_err(|e| anyhow!("{}", e))?;
349        let path = self.path.clone();
350        let file_info = self.file_info.clone();
351        let compression_type = self.compression_type.clone();
352        writing_pool_guard.in_all_threads(
353            Arc::new(move || {
354                PBF_WRITER.with(|writer| {
355                    if writer.borrow().is_none() {
356                        let mut w = Writer::from_file_info(
357                            path.clone(),
358                            file_info.clone(),
359                            compression_type.clone(),
360                        ).unwrap();
361                        w.write_header().unwrap();
362                        writer.replace(Some(w));
363                    }
364                })
365            })
366        );
367        Ok(())
368    }
369
370    /// Write an [Element]
371    pub fn write_element(&mut self, element: Element) -> Result<(), Error> {
372        self.element_ordering_pool
373            .read()
374            .unwrap()
375            .submit(Box::new(AddElementCommand::new(element)));
376        Ok(())
377    }
378
379    /// Write list of [Element]s
380    pub fn write_elements(&mut self, elements: Vec<Element>) -> Result<(), Error> {
381        self.element_ordering_pool
382            .read()
383            .unwrap()
384            .submit(Box::new(AddElementsCommand::new(elements)));
385        Ok(())
386    }
387
388    /// Flush internal buffers.
389    pub fn close(&mut self) -> Result<(), Error> {
390        self.flush_element_ordering();
391        Self::shutdown(self.element_ordering_pool.clone())?;
392        Self::shutdown(self.encoding_pool.clone())?;
393        self.flush_writing();
394        Self::shutdown(self.writing_pool.clone())?;
395        Ok(())
396    }
397
398    fn flush_element_ordering(&self) {
399        let element_ordering_pool_guard = self.element_ordering_pool.read().unwrap();
400        element_ordering_pool_guard.in_all_threads(Arc::new(flush_all_sorted))
401    }
402
403    fn flush_writing(&self) {}
404
405    fn create_thread_pool(name: &str, tasks: usize, queue_size: usize) -> Result<Arc<RwLock<ThreadPool>>, Error> {
406        Ok(
407            Arc::new(
408                RwLock::new(
409                    ThreadPoolBuilder::new()
410                        .with_name_str(name)
411                        .with_tasks(tasks)
412                        .with_queue_size(queue_size)
413                        .with_shutdown_mode(ShutdownMode::CompletePending)
414                        .build()?
415                )
416            )
417        )
418    }
419
420    fn set_thread_local<T>(thread_pool: Arc<RwLock<ThreadPool>>, local_key: &'static LocalKey<RefCell<T>>, val: T)
421        where T: Sync + Send + Clone {
422        thread_pool
423            .read()
424            .unwrap()
425            .set_thread_local(local_key, val);
426    }
427
428    fn shutdown(thread_pool: Arc<RwLock<ThreadPool>>) -> Result<(), Error> {
429        let mut thread_pool = thread_pool
430            .write()
431            .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?;
432        thread_pool.shutdown();
433        thread_pool.join()
434    }
435}