use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::{AddAssign, Deref, DerefMut};
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::LocalKey;
use anyhow::{anyhow, Error};
use command_executor::command::Command;
use command_executor::shutdown_mode::ShutdownMode;
use command_executor::thread_pool::ThreadPool;
use command_executor::thread_pool_builder::ThreadPoolBuilder;
use crate::osm::model::element::Element;
use crate::osm::pbf::compression_type::CompressionType;
use crate::osm::pbf::file_block::FileBlock;
use crate::osm::pbf::file_info::FileInfo;
use crate::osm::pbf::writer::Writer;
// Per-thread state for the three pipeline stages. Each stage runs in its own
// thread pool; values are seeded per-thread via ParallelWriter::set_thread_local
// or lazily from within the worker threads themselves.
thread_local! {
// Reorder buffer on the (single) element-ordering thread; elements accumulate
// here and are sorted before being split into file blocks.
static ELEMENT_ORDERING_BUFFER: RefCell<VecDeque<Element>> = RefCell::new(VecDeque::new());
// Configured capacity threshold that triggers a flush of the reorder buffer.
static ELEMENT_ORDERING_BUFFER_SIZE: RefCell<usize> = RefCell::new(0);
// Maximum number of elements placed into a single file block.
static FILE_BLOCK_SIZE: RefCell<usize> = RefCell::new(0);
// 1-based sequence number assigned to the next file block to be encoded.
static FILE_BLOCK_INDEX: RefCell<usize> = RefCell::new(1);
// Handle to the downstream stage's pool (ordering -> encoding -> writing).
static NEXT_THREAD_POOL: RefCell<Option<Arc<RwLock<ThreadPool>>>> = RefCell::new(None);
// Compression applied when serializing file blocks (set on encoding threads).
static COMPRESSION_TYPE: RefCell<Option<CompressionType>> = RefCell::new(None);
// Minimum element of the most recently flushed block, used to detect
// out-of-order input (see assert_order).
static CURRENT_MIN_ELEMENT: RefCell<Option<Element>> = RefCell::new(None);
// Writing thread: serialized blobs keyed by block index, buffered until all
// lower-indexed blobs have been written.
pub static BLOB_ORDERING_BUFFER: RefCell<HashMap<usize, (Vec<u8>, Vec<u8>)>> = RefCell::new(HashMap::new());
// Writing thread: index of the next blob that may be written to the file.
pub static NEXT_TO_WRITE: RefCell<usize> = RefCell::new(1);
// Writing thread: the underlying single-threaded PBF writer, created lazily
// by ParallelWriter::write_header.
pub static PBF_WRITER: RefCell<Option<Writer>> = RefCell::new(None);
}
/// Sorts the thread-local reorder buffer, carves one file block worth of
/// elements off its front and submits them to the encoding pool.
/// Runs on the element-ordering thread.
fn flush_sorted_top() {
    ELEMENT_ORDERING_BUFFER.with(|buffer| {
        buffer.borrow_mut().make_contiguous().sort();
        let block_elements = split_file_block(buffer);
        // Remember the smallest element flushed so far; later arrivals that
        // sort below it would break the required ordering (see assert_order).
        set_current_min_element(block_elements.get(0));
        NEXT_THREAD_POOL.with(|pool| {
            let pool_ref = pool.borrow();
            let pool_guard = pool_ref.as_ref().unwrap().read().unwrap();
            pool_guard.submit(Box::new(EncodeFileBlockCommand::new(
                file_block_index(),
                Mutex::new(block_elements),
            )));
            inc_file_block_index();
        })
    });
}
/// Drains the whole thread-local reorder buffer: sorts it once, then keeps
/// splitting file blocks off the front and submitting them to the encoding
/// pool until the buffer is empty. Invoked on the ordering thread at close.
fn flush_all_sorted() {
    ELEMENT_ORDERING_BUFFER.with(|buffer| {
        buffer.borrow_mut().make_contiguous().sort();
        loop {
            if buffer.borrow().is_empty() {
                break;
            }
            let block_elements = split_file_block(buffer);
            set_current_min_element(block_elements.get(0));
            NEXT_THREAD_POOL.with(|pool| {
                let pool_ref = pool.borrow();
                let pool_guard = pool_ref.as_ref().unwrap().read().unwrap();
                pool_guard.submit(Box::new(EncodeFileBlockCommand::new(
                    file_block_index(),
                    Mutex::new(block_elements),
                )));
                inc_file_block_index();
            })
        }
    });
}
/// Pops at most one file block worth of elements off the front of
/// `element_ordering_buffer`. Stops early when the buffer runs out or when the
/// next element has a different type than the first one taken, because an OSM
/// PBF primitive block may only contain elements of a single type; the
/// mismatching element is pushed back for the next block.
///
/// Improvements over the previous version: `file_block_size()` (a thread-local
/// read) is fetched once instead of once per iteration, and a single
/// `borrow_mut` is held for the whole loop instead of re-borrowing on every
/// pop/push (safe here: nothing inside the loop touches the thread-local).
fn split_file_block(element_ordering_buffer: &RefCell<VecDeque<Element>>) -> Vec<Element> {
    let block_size = file_block_size();
    let mut elements = Vec::with_capacity(block_size);
    let mut buffer = element_ordering_buffer.borrow_mut();
    for _ in 0..block_size {
        match buffer.pop_front() {
            None => break,
            Some(e) => {
                if elements.is_empty() || Element::same_type(&e, &elements[0]) {
                    elements.push(e);
                } else {
                    // Type boundary reached: return the element and stop.
                    buffer.push_front(e);
                    break;
                }
            }
        }
    }
    elements
}
/// Returns this thread's configured reorder-buffer flush threshold.
fn element_ordering_buffer_size() -> usize {
    ELEMENT_ORDERING_BUFFER_SIZE.with(|size| *size.borrow())
}
/// Returns this thread's configured maximum number of elements per file block.
fn file_block_size() -> usize {
    FILE_BLOCK_SIZE.with(|size| *size.borrow())
}
/// Returns the 1-based sequence number for the next file block.
fn file_block_index() -> usize {
    FILE_BLOCK_INDEX.with(|index| *index.borrow())
}
/// Advances the file block sequence number by one.
fn inc_file_block_index() {
    FILE_BLOCK_INDEX.with(|index| *index.borrow_mut() += 1)
}
/// Returns this thread's compression setting.
/// Panics if called on a thread where it was never initialized.
fn compression_type() -> CompressionType {
    COMPRESSION_TYPE.with(|ct| ct.borrow().clone().unwrap())
}
/// Panics if `element` sorts below the minimum element of the most recently
/// flushed block — at that point the required ordering can no longer be
/// restored by the reorder buffer.
///
/// Fixes: the panic message read "too short to for compensate for"; corrected
/// to "too short to compensate for". Also drops the needless re-borrow of the
/// already-borrowed `element`.
fn assert_order(element: &Element) {
    // Sentinel elements terminate the stream and are exempt from ordering.
    if !element.is_sentinel() {
        assert!(
            compare_to_current_min_element(element).is_ge(),
            "Element order, required by OSM PBF definition is lost. \
            Possible cause is that the length of the ordering buffer ({}) is too short \
            to compensate for the loss of order caused by concurrent processing. \
            Recommended: reader_tasks * 8000 * n",
            element_ordering_buffer_size()
        );
    }
}
/// Compares `element` against the minimum element of the last flushed block.
/// Returns `Ordering::Greater` when no block has been flushed yet, so any
/// element is accepted before the first flush.
fn compare_to_current_min_element(element: &Element) -> Ordering {
    CURRENT_MIN_ELEMENT.with(|current_min| {
        current_min
            .borrow()
            .as_ref()
            .map_or(Ordering::Greater, |min| element.cmp(min))
    })
}
/// Records the minimum element of the block that was just flushed.
/// A `None` argument (empty block) leaves the previous minimum in place.
fn set_current_min_element(element: Option<&Element>) {
    if let Some(e) = element {
        CURRENT_MIN_ELEMENT.with(|current_min| {
            current_min.borrow_mut().replace(e.clone());
        });
    }
}
/// Carries a single element from the caller's thread to the element-ordering
/// pool. The payload is wrapped in `Mutex<Option<...>>` so the command is
/// `Sync` and the element can be moved out on execution.
struct AddElementCommand {
    // Taken (replaced with None) when the command executes.
    element: Mutex<Option<Element>>,
}

impl AddElementCommand {
    fn new(element: Element) -> AddElementCommand {
        let element = Mutex::new(Some(element));
        AddElementCommand { element }
    }
}
impl Command for AddElementCommand {
    /// Runs on the ordering thread: checks ordering, appends the element to
    /// the reorder buffer and flushes one block once the buffer overflows its
    /// configured size.
    fn execute(&self) -> Result<(), Error> {
        ELEMENT_ORDERING_BUFFER.with(|buffer| {
            let element = self.element.lock().unwrap().take().unwrap();
            assert_order(&element);
            buffer.borrow_mut().push_back(element);
            let over_capacity = buffer.borrow().len() > element_ordering_buffer_size();
            if over_capacity {
                flush_sorted_top();
            }
        });
        Ok(())
    }
}
/// Carries a batch of elements to the element-ordering pool; same transport
/// pattern as [AddElementCommand], amortizing submission overhead.
struct AddElementsCommand {
    // Taken (replaced with None) when the command executes.
    elements: Mutex<Option<Vec<Element>>>,
}

impl AddElementsCommand {
    fn new(elements: Vec<Element>) -> AddElementsCommand {
        let elements = Mutex::new(Some(elements));
        AddElementsCommand { elements }
    }
}
impl Command for AddElementsCommand {
    /// Runs on the ordering thread: order-checks and enqueues every element of
    /// the batch, then flushes one block if the buffer exceeded its threshold.
    fn execute(&self) -> Result<(), Error> {
        ELEMENT_ORDERING_BUFFER.with(|buffer| {
            let batch = self.elements.lock().unwrap().take().unwrap();
            for element in batch {
                assert_order(&element);
                buffer.borrow_mut().push_back(element);
            }
            if buffer.borrow().len() > element_ordering_buffer_size() {
                flush_sorted_top();
            }
        });
        Ok(())
    }
}
/// Encodes one batch of same-typed elements into a serialized PBF file block
/// on the encoding pool.
struct EncodeFileBlockCommand {
    // 1-based sequence number that fixes the blob's final position in the file.
    index: usize,
    // Taken (left empty) when the command executes.
    elements: Mutex<Vec<Element>>,
}

impl EncodeFileBlockCommand {
    fn new(index: usize, elements: Mutex<Vec<Element>>) -> EncodeFileBlockCommand {
        EncodeFileBlockCommand { index, elements }
    }
}
impl Command for EncodeFileBlockCommand {
    /// Runs on an encoding thread: builds a file block from the elements,
    /// serializes it with this thread's compression setting and forwards the
    /// resulting blob to the writing pool.
    fn execute(&self) -> Result<(), Error> {
        let elements = std::mem::take(&mut *self.elements.lock().unwrap());
        let file_block = FileBlock::from_elements(self.index, elements);
        let (blob_header, blob_body) = FileBlock::serialize(&file_block, compression_type())?;
        NEXT_THREAD_POOL.with(|pool| {
            let pool_ref = pool.borrow();
            let pool_guard = pool_ref.as_ref().unwrap().read().unwrap();
            let write_command = WriteBlobCommand::new(
                self.index,
                Mutex::new(blob_header),
                Mutex::new(blob_body),
            );
            pool_guard.submit(Box::new(write_command));
        });
        Ok(())
    }
}
/// Delivers one serialized blob (header + body bytes) to the writing pool.
struct WriteBlobCommand {
    // 1-based sequence number used to restore file order on the writing thread.
    index: usize,
    // Both byte vectors are taken (left empty) when the command executes.
    blob_header: Mutex<Vec<u8>>,
    blob_body: Mutex<Vec<u8>>,
}

impl WriteBlobCommand {
    fn new(index: usize, blob_header: Mutex<Vec<u8>>, blob_body: Mutex<Vec<u8>>) -> WriteBlobCommand {
        WriteBlobCommand {
            index,
            blob_header,
            blob_body,
        }
    }
}
impl Command for WriteBlobCommand {
// Runs on the (single) writing thread. Blobs arrive out of order from the
// parallel encoding pool; this stashes the blob in a thread-local reordering
// buffer, then writes out every consecutively-indexed blob starting at
// NEXT_TO_WRITE, so blobs reach the file in their original block order.
fn execute(&self) -> Result<(), Error> {
// Phase 1: move this blob's bytes into the reordering buffer.
BLOB_ORDERING_BUFFER.with(
|buffer| {
// mem::take moves the bytes out without cloning, leaving empty Vecs
// behind in the command.
let mut blob_header_guard = self.blob_header.lock().unwrap();
let blob_header = std::mem::take(blob_header_guard.deref_mut());
let mut blob_body_guard = self.blob_body.lock().unwrap();
let blob_body = std::mem::take(blob_body_guard.deref_mut());
buffer
.borrow_mut()
.insert(self.index, (blob_header, blob_body));
}
);
// Phase 2: drain the run of consecutive indices that is now available.
BLOB_ORDERING_BUFFER.with(
|buffer| {
NEXT_TO_WRITE.with(|next| {
let next_to_write = *next.borrow();
// Scan upward from the next expected index; the loop always
// terminates at the first gap in the buffered indices.
for i in next_to_write..usize::MAX {
match buffer.borrow_mut().remove(&i) {
None => {
// Gap found: remember where to resume on the next command.
*next.borrow_mut() = i;
break;
}
Some((header, body)) => {
// PBF_WRITER was initialized in this thread by
// ParallelWriter::write_header before any blobs arrive.
PBF_WRITER.with(
|writer| {
writer.borrow_mut().as_mut().unwrap().write_blob(header, body).expect("Failed to write a blob");
}
);
}
}
}
});
}
);
Ok(())
}
}
/// Multi-threaded OSM PBF writer. Elements are submitted to a single-threaded
/// ordering pool, encoded into blobs by a parallel encoding pool, and written
/// to the file in original order by a single-threaded writing pool.
pub struct ParallelWriter {
// Destination file path for the PBF output.
path: PathBuf,
// Header metadata written ahead of the data blocks.
file_info: FileInfo,
// Compression applied to serialized file blocks.
compression_type: CompressionType,
// Stage 1: sorts/buffers elements and splits them into file blocks (1 task).
element_ordering_pool: Arc<RwLock<ThreadPool>>,
// Stage 2: serializes and compresses file blocks (4 tasks).
encoding_pool: Arc<RwLock<ThreadPool>>,
// Stage 3: reorders blobs and writes them to disk (1 task).
writing_pool: Arc<RwLock<ThreadPool>>,
}
impl ParallelWriter {
    /// Creates the three-stage writing pipeline and wires each stage's
    /// thread-local configuration.
    ///
    /// * `element_ordering_buffer_size` - reorder-buffer length on the
    ///   ordering thread; must be large enough to compensate for the
    ///   order scrambling introduced by concurrent upstream readers.
    /// * `file_block_size` - maximum number of elements per PBF file block.
    /// * `path` / `file_info` / `compression_type` - output file parameters.
    pub fn from_file_info(
        element_ordering_buffer_size: usize,
        file_block_size: usize,
        path: PathBuf,
        file_info: FileInfo,
        compression_type: CompressionType,
    ) -> Result<ParallelWriter, anyhow::Error> {
        // Ordering and writing must be single-threaded to keep element and
        // blob order deterministic; encoding is CPU-bound and runs parallel.
        let element_ordering_pool = Self::create_thread_pool("element-ordering", 1, 256)?;
        let encoding_pool = Self::create_thread_pool("encoding", 4, 256)?;
        let writing_pool = Self::create_thread_pool("writing", 1, 256)?;
        // Seed per-thread configuration and link each stage to its downstream
        // pool (ordering -> encoding -> writing) via NEXT_THREAD_POOL.
        Self::set_thread_local(element_ordering_pool.clone(), &ELEMENT_ORDERING_BUFFER_SIZE, element_ordering_buffer_size);
        Self::set_thread_local(element_ordering_pool.clone(), &FILE_BLOCK_SIZE, file_block_size);
        Self::set_thread_local(encoding_pool.clone(), &COMPRESSION_TYPE, Some(compression_type.clone()));
        Self::set_thread_local(element_ordering_pool.clone(), &NEXT_THREAD_POOL, Some(encoding_pool.clone()));
        Self::set_thread_local(encoding_pool.clone(), &NEXT_THREAD_POOL, Some(writing_pool.clone()));
        Ok(
            ParallelWriter {
                path,
                file_info,
                compression_type,
                element_ordering_pool,
                encoding_pool,
                writing_pool,
            }
        )
    }

    /// Writes the OSM PBF header. Lazily constructs a [Writer] inside every
    /// thread of the (single-threaded) writing pool and stores it in the
    /// PBF_WRITER thread-local for subsequent blob writes.
    pub fn write_header(&mut self) -> Result<(), anyhow::Error> {
        let writing_pool_guard = self.writing_pool.read()
            .map_err(|e| anyhow!("{}", e))?;
        let path = self.path.clone();
        let file_info = self.file_info.clone();
        let compression_type = self.compression_type.clone();
        writing_pool_guard.in_all_threads(
            Arc::new(move || {
                PBF_WRITER.with(|writer| {
                    if writer.borrow().is_none() {
                        let mut w = Writer::from_file_info(
                            path.clone(),
                            file_info.clone(),
                            compression_type.clone(),
                        ).unwrap();
                        w.write_header().unwrap();
                        writer.replace(Some(w));
                    }
                })
            })
        );
        Ok(())
    }

    /// Submits a single element to the pipeline. Elements may arrive somewhat
    /// out of order; ordering is restored by the reorder buffer.
    ///
    /// Previously panicked on a poisoned pool lock despite returning Result;
    /// now reports the failure as an error, consistent with write_header.
    pub fn write_element(&mut self, element: Element) -> Result<(), anyhow::Error> {
        self.element_ordering_pool
            .read()
            .map_err(|e| anyhow!("{}", e))?
            .submit(Box::new(AddElementCommand::new(element)));
        Ok(())
    }

    /// Submits a batch of elements to the pipeline; preferred over repeated
    /// [Self::write_element] calls for throughput.
    pub fn write_elements(&mut self, elements: Vec<Element>) -> Result<(), anyhow::Error> {
        self.element_ordering_pool
            .read()
            .map_err(|e| anyhow!("{}", e))?
            .submit(Box::new(AddElementsCommand::new(elements)));
        Ok(())
    }

    /// Flushes all buffered elements and shuts the stages down in
    /// upstream-to-downstream order so no in-flight work is dropped.
    pub fn close(&mut self) -> Result<(), anyhow::Error> {
        self.flush_element_ordering();
        Self::shutdown(self.element_ordering_pool.clone())?;
        Self::shutdown(self.encoding_pool.clone())?;
        self.flush_writing();
        Self::shutdown(self.writing_pool.clone())?;
        Ok(())
    }

    /// Asks every ordering thread to drain its reorder buffer into the
    /// encoding pool.
    fn flush_element_ordering(&self) {
        let element_ordering_pool_guard = self.element_ordering_pool.read().unwrap();
        element_ordering_pool_guard.in_all_threads(Arc::new(|| flush_all_sorted()))
    }

    // Intentional no-op: blobs are written eagerly as soon as their index
    // becomes consecutive, so there is nothing left to flush here.
    fn flush_writing(&self) {}

    /// Builds a named thread pool that completes its pending queue on shutdown.
    fn create_thread_pool(name: &str, tasks: usize, queue_size: usize) -> Result<Arc<RwLock<ThreadPool>>, anyhow::Error> {
        Ok(
            Arc::new(
                RwLock::new(
                    ThreadPoolBuilder::new()
                        .with_name_str(name)
                        .with_tasks(tasks)
                        .with_queue_size(queue_size)
                        .with_shutdown_mode(ShutdownMode::CompletePending)
                        .build()?
                )
            )
        )
    }

    /// Sets `val` into `local_key` in every thread of `thread_pool`.
    fn set_thread_local<T>(thread_pool: Arc<RwLock<ThreadPool>>, local_key: &'static LocalKey<RefCell<T>>, val: T)
        where T: Sync + Send + Clone {
        thread_pool
            .read()
            .unwrap()
            .set_thread_local(local_key, val);
    }

    /// Initiates shutdown (pending commands complete first, per
    /// ShutdownMode::CompletePending) and joins the pool's threads.
    ///
    /// Fixed typo in the error message: "tread pool" -> "thread pool".
    fn shutdown(thread_pool: Arc<RwLock<ThreadPool>>) -> Result<(), anyhow::Error> {
        let mut thread_pool = thread_pool
            .write()
            .map_err(|e| anyhow!("failed to lock thread pool: {e}"))?;
        thread_pool.shutdown();
        thread_pool.join()
    }
}