use std::{
path::{Path, PathBuf},
sync::{Arc, OnceLock},
};
use crate::{
builder::default_builder::DefaultMetaBuilder, error::munyo_error::PathItem,
from_str_with_metabuilder, Error, MunyoItem,
};
use serde::de::DeserializeOwned;
use shrink_pool::ShrinkPool;
use crate::builder::builder::{Builder, MetaBuilder};
use super::{Data, Receiver};
/// Process-wide pool used for file reads; created lazily by `io_thread` on first use.
static IO_THREAD: OnceLock<ShrinkPool> = OnceLock::new();
/// Returns the shared IO pool, building it with `ShrinkPool::new(1)` the first time it is
/// requested and handing back the same instance on every later call.
fn io_thread() -> &'static ShrinkPool {
    let make_pool = || ShrinkPool::new(1);
    IO_THREAD.get_or_init(make_pool)
}
/// Handle to a shared `ShrinkPool` on which parsing and other tasks are executed.
///
/// The pool is held in an `Arc`, so cloning a `Concurrent` only bumps a reference count and
/// every clone submits work to the same pool.
#[derive(Clone)]
pub struct Concurrent {
    /// Worker pool shared by all clones of this handle.
    pool: Arc<ShrinkPool>,
}
impl Concurrent {
    /// Creates a `Concurrent` whose pool size is `num_cpus::get()`.
    pub fn new() -> Self {
        Self::with_pool_size(num_cpus::get())
    }

    /// Creates a `Concurrent` backed by a new `ShrinkPool` built with `pool_size`.
    pub fn with_pool_size(pool_size: usize) -> Self {
        Self {
            pool: Arc::new(ShrinkPool::new(pool_size)),
        }
    }

    /// Reads the files at `paths` and parses each one with `DefaultMetaBuilder`.
    ///
    /// This is a convenience wrapper around [`Concurrent::read_files_with_builder`]; see that
    /// method for how results and errors are delivered.
    pub fn read_files_and_create_munyo_items<I, P>(
        &self,
        paths: I,
    ) -> Receiver<Result<Data<MunyoItem>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
    {
        self.read_files_with_builder(paths, DefaultMetaBuilder)
    }

    /// Reads the files at `paths` and parses each one with `meta_builder`.
    ///
    /// Files are read on the shared IO pool and each file's text is parsed on this instance's
    /// pool. One `Result` per processed file is sent to the returned receiver; because parsing
    /// runs on the pool, results may arrive in a different order than `paths`.
    ///
    /// # Errors
    ///
    /// Errors are delivered through the receiver, not returned directly:
    /// * a parse failure is sent as `Error::Parse` carrying the path of the offending file;
    /// * a read failure is sent as the error returned by `crate::read_file`, after which no
    ///   further paths are read (files already handed to the pool still deliver their results).
    pub fn read_files_with_builder<I, P, T, B, MB>(
        &self,
        paths: I,
        meta_builder: MB,
    ) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        MB: MetaBuilder<Item = B> + Send + Sync + 'static,
        B: Builder<Item = T>,
        T: Send + 'static,
    {
        // The one `meta_builder` is owned by the closure and lent to every parse call.
        self.inner(paths, move |(path, s)| {
            match from_str_with_metabuilder(s.as_str(), &meta_builder) {
                Ok(v) => Ok(Data::new(path, v.result)),
                Err(e) => Err(Error::Parse(PathItem::new(Some(path)), e)),
            }
        })
    }

    /// Reads the files at `paths` and deserializes each one with `crate::from_str`.
    ///
    /// Files are read on the shared IO pool and each file's text is deserialized on this
    /// instance's pool. One `Result` per processed file is sent to the returned receiver;
    /// results may arrive in a different order than `paths`.
    ///
    /// # Errors
    ///
    /// Errors are delivered through the receiver, not returned directly:
    /// * a deserialization failure is sent exactly as returned by `crate::from_str`;
    /// * a read failure is sent as the error returned by `crate::read_file`, after which no
    ///   further paths are read.
    pub fn deserialize_files<I, P, T>(&self, paths: I) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        T: Send + 'static + DeserializeOwned,
    {
        self.inner(paths, move |(path, s)| {
            crate::from_str(s.as_str()).map(|v| Data::new(path, v))
        })
    }

    /// Shared driver: reads each path on the IO pool and runs `f` on the file's path and
    /// contents on this instance's pool, sending every outcome to the returned receiver.
    ///
    /// The channel is sized to hold one message per path, so sending a result never has to
    /// wait for the consumer. An empty `paths` yields a receiver that produces no items.
    fn inner<I, P, F, T>(&self, paths: I, f: F) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        F: Fn((PathBuf, String)) -> Result<Data<T>, Error> + Send + Sync + 'static,
        T: Send + 'static,
    {
        // Own the paths up front so they can be moved onto the IO thread.
        let paths: Vec<PathBuf> = paths
            .into_iter()
            .map(|p| p.as_ref().to_path_buf())
            .collect();
        // `async_channel::bounded` panics when asked for a capacity of zero, so an empty
        // input must still request at least one slot.
        let (sender, receiver) = async_channel::bounded(paths.len().max(1));
        let f = Arc::new(f);
        let pool = Arc::clone(&self.pool);
        io_thread().execute(move || {
            for path in paths {
                match crate::read_file(&path) {
                    Ok(s) => {
                        // Each parse task needs its own handles to the channel and to `f`.
                        let sender = sender.clone();
                        let f = Arc::clone(&f);
                        pool.execute(move || {
                            // A failed send is ignored on purpose (best effort).
                            sender.send_blocking(f((path, s))).ok();
                        });
                    }
                    Err(e) => {
                        // Report the read failure and stop reading the remaining paths.
                        sender.send_blocking(Err(e)).ok();
                        return;
                    }
                }
            }
        });
        Receiver::new(receiver)
    }

    /// Submits `f` to this instance's pool.
    pub fn do_something_with_pool<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.pool.execute(f)
    }

    /// Submits `f` to the process-wide IO pool, the same one used for reading files.
    pub fn do_something_with_io_thread<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        io_thread().execute(f);
    }
}