//! Concurrent file reading and parsing utilities.
use std::{
path::{Path, PathBuf},
sync::{Arc, OnceLock},
};
use crate::{
builder::default_builder::DefaultMetaBuilder, error::munyo_error::PathItem,
from_str_with_metabuilder, Error, MunyoItem,
};
use serde::de::DeserializeOwned;
use shrink_pool::ShrinkPool;
use crate::builder::builder::{Builder, MetaBuilder};
use super::{Data, Receiver};
// Process-wide single I/O thread, lazily created on first use.
// ShrinkPool destroys its thread when idle and respawns it on demand,
// so keeping this global is cheap when no file work is pending.
static IO_THREAD: OnceLock<ShrinkPool> = OnceLock::new();

/// Returns the shared I/O thread (a pool of size 1), initializing it on first call.
fn io_thread() -> &'static ShrinkPool {
    IO_THREAD.get_or_init(|| ShrinkPool::new(1))
}
/// Read files in a thread and parse them in a thread pool concurrently.
/// The thread and the threads of the pool are automatically destroyed when no tasks are assigned for them.
/// When a task is assigned after that, a new thread is spawned. Spawning has some cost, so you may want to
/// assign as many tasks as possible at once to avoid the respawn cost.
///
/// # Example
/// ```
/// #[derive(serde::Deserialize, Debug, PartialEq)]
/// enum E1{
/// Foo(usize)
/// }
/// #[derive(serde::Deserialize, Debug, PartialEq)]
/// enum E2{
/// Bar(usize)
/// }
/// fn main() -> munyo::Result<()>{
/// let con = munyo::Concurrent::new();
/// let f1 = "Foo 1";
/// let f2 = "Foo 2";
/// let b1 = "Bar 1";
/// let b2 = "Bar 2";
/// // Write these into files and get the paths
/// # let f1f = munyo::temp(f1)?;
/// # let f2f = munyo::temp(f2)?;
/// # let b1f = munyo::temp(b1)?;
/// # let b2f = munyo::temp(b2)?;
/// # let f1_path = f1f.path();
/// # let f2_path = f2f.path();
/// # let b1_path = b1f.path();
/// # let b2_path = b2f.path();
/// // Deserialize files in the background.
/// let f_receiver = con.deserialize_files([f1_path, f2_path]);
/// let b_receiver = con.deserialize_files([b1_path, b2_path]);
/// // Prepare Future(an async block creates a Future)
/// let fs = async{
/// let mut fs : Vec<munyo::file_io::Data<E1>> = vec![];
/// while let Some(data) = f_receiver.recv_async().await{
/// let data = data.unwrap();
/// fs.push(data);
/// }
/// fs
/// };
/// // Prepare another Future(Futures do nothing until .await or block_on/executor::execute/etc...)
/// let bs = async{
/// use futures::{Stream, StreamExt};
/// // receiver.receiver is an async_channel and implements futures::Stream trait,
/// // so you can use StreamExt utility methods.
/// let bs : Vec<munyo::file_io::Data<E2>> = b_receiver.receiver
/// .map(|r| r.unwrap()).collect().await;
/// bs
/// };
/// // Some async executor is needed to .await/block_on/etc...
/// // futures::executor is used here.
/// // I believe you can use any async executor for this library.
/// let fs = futures::executor::block_on(fs);
///
/// // Tasks are executed in the order given in Concurrent, so you should await/block_on/etc.. in the same order,
/// let bs = futures::executor::block_on(bs);
///
/// // or maybe you should just futures::join!() all the futures.
/// // let (fs, bs) = futures::executor::block_on(async{ futures::join!(fs, bs) });
///
/// //This library shouldn't lose values. This check should be unnecessary.
/// assert_eq!(fs.len(), 2);
/// assert_eq!(bs.len(), 2);
///
/// // Which one comes first is not known. You must check the path
/// for data in &fs{
/// if &data.path == f1_path{
/// assert_eq!(&data.items[0], &E1::Foo(1));
/// } else if &data.path == f2_path{
/// assert_eq!(&data.items[0], &E1::Foo(2));
/// } else{
/// unreachable!()
/// }
/// }
///
/// for data in &bs{
/// if &data.path == b1_path{
/// assert_eq!(&data.items[0], &E2::Bar(1));
/// } else if &data.path == b2_path{
/// assert_eq!(&data.items[0], &E2::Bar(2));
/// } else{
/// unreachable!()
/// }
/// }
/// Ok(())
/// }
/// ```
pub struct Concurrent {
    // Worker pool used for parsing/deserializing file contents.
    // Arc so that clones of this Concurrent share the same pool.
    pool: Arc<ShrinkPool>,
}
impl Clone for Concurrent {
/// clone Concurrent. The cloned Concurrent shares the thread pool with the original.
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
}
}
}
impl Concurrent {
    /// Create Concurrent with the thread-pool's size = num_cpus::get().
    ///
    /// This creates a thread pool. If multiple Concurrents have sufficient tasks,
    /// the sum of threads will surpass num_cpus.
    ///
    /// If you want to share the pool, use clone().
    ///
    /// IO thread is always shared.
    pub fn new() -> Self {
        Self::with_pool_size(num_cpus::get())
    }

    /// Create Concurrent with the thread-pool's size.
    ///
    /// If you want to share the pool, use clone().
    pub fn with_pool_size(pool_size: usize) -> Self {
        Self {
            pool: Arc::new(ShrinkPool::new(pool_size)),
        }
    }

    /// Read files and create MunyoItems from them. See [MunyoItem]
    pub fn read_files_and_create_munyo_items<I, P>(
        &self,
        paths: I,
    ) -> Receiver<Result<Data<MunyoItem>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
    {
        self.read_files_with_builder(paths, DefaultMetaBuilder)
    }

    /// Read files and build items with the meta_builder. This is not meant for general usage.
    pub fn read_files_with_builder<I, P, T, B, MB>(
        &self,
        paths: I,
        meta_builder: MB,
    ) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        MB: MetaBuilder<Item = B> + Send + Sync + 'static,
        B: Builder<Item = T>,
        T: Send + 'static,
    {
        self.inner(paths, move |(path, s)| {
            match from_str_with_metabuilder(s.as_str(), &meta_builder) {
                Ok(v) => Ok(Data::new(path, v.result)),
                // Attach the offending path so the caller can tell which file failed.
                Err(e) => Err(Error::Parse(PathItem::new(Some(path)), e)),
            }
        })
    }

    /// Read files and deserialize items from them.
    ///
    /// Reading starts in the order given, and parsing and deserializing will follow.
    /// But it's a concurrent process and the items will be sent as soon as they are ready, so the order of finished items is unknown.
    ///
    /// See [Concurrent] to know how to use this.
    pub fn deserialize_files<I, P, T>(&self, paths: I) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        T: Send + 'static + DeserializeOwned,
    {
        self.inner(paths, move |(path, s)| match crate::from_str(s.as_str()) {
            Ok(v) => Ok(Data::new(path, v)),
            Err(e) => Err(e),
        })
    }

    /// Common driver: read each file sequentially on the shared I/O thread,
    /// then process its contents on the pool with `f`, sending each result
    /// through the returned channel. A failed read sends the error and stops
    /// processing the remaining paths.
    fn inner<I, P, F, T>(&self, paths: I, f: F) -> Receiver<Result<Data<T>, Error>>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
        F: Fn((PathBuf, String)) -> Result<Data<T>, Error> + Send + Sync + 'static,
        T: Send + 'static,
    {
        let paths: Vec<PathBuf> = paths
            .into_iter()
            .map(|p| p.as_ref().to_path_buf())
            .collect();
        // One slot per file so the workers below never block on send.
        // `async_channel::bounded` panics on a capacity of 0, so clamp to 1
        // when the path list is empty (the channel then simply closes with
        // no items once the sender is dropped).
        let (sender, receiver) = async_channel::bounded(paths.len().max(1));
        let f = Arc::new(f);
        let pool = self.pool.clone();
        io_thread().execute(move || {
            for path in paths.into_iter() {
                let sender = sender.clone();
                let f = f.clone();
                let pool = pool.clone();
                match crate::read_file(&path) {
                    Ok(s) => {
                        pool.execute(move || {
                            // The channel has sufficient capacity, so no blocking occurs.
                            // When the receiver is dropped, sending fails. It's OK.
                            sender.send_blocking(f((path, s))).ok();
                        });
                    }
                    Err(e) => {
                        // Report the read error and abandon the remaining paths.
                        sender.send_blocking(Err(e)).ok();
                        return;
                    }
                }
            }
        });
        Receiver::new(receiver)
    }

    /// Do something in this thread pool. Maybe useful, maybe not.
    pub fn do_something_with_pool<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.pool.execute(f)
    }

    /// Do something in the io thread. Maybe useful, maybe not.
    pub fn do_something_with_io_thread<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        io_thread().execute(f);
    }
}