Struct munyo::Concurrent
pub struct Concurrent { /* private fields */ }
Read files in a thread and parse them in a thread pool concurrently. The I/O thread and the pool's worker threads are automatically destroyed when no tasks are assigned to them. When a task is assigned after that, a new thread is spawned. Respawning has some cost, so you may want to assign as many tasks as possible at once to avoid it.
Example
#[derive(serde::Deserialize, Debug, PartialEq)]
enum E1{
Foo(usize)
}
#[derive(serde::Deserialize, Debug, PartialEq)]
enum E2{
Bar(usize)
}
fn main() -> munyo::Result<()>{
let con = munyo::Concurrent::new();
let f1 = "Foo 1";
let f2 = "Foo 2";
let b1 = "Bar 1";
let b2 = "Bar 2";
// Write these into files and get the paths
// Deserialize files in the background.
let f_receiver = con.deserialize_files([f1_path, f2_path]);
let b_receiver = con.deserialize_files([b1_path, b2_path]);
// Prepare a Future (an async block creates a Future).
let fs = async{
let mut fs : Vec<munyo::file_io::Data<E1>> = vec![];
while let Some(data) = f_receiver.recv_async().await{
let data = data.unwrap();
fs.push(data);
}
fs
};
// Prepare another Future (Futures do nothing until .await, block_on, an executor, etc.).
let bs = async{
use futures::{Stream, StreamExt};
// receiver.receiver is an async_channel and implements futures::Stream trait,
// so you can use StreamExt utility methods.
let bs : Vec<munyo::file_io::Data<E2>> = b_receiver.receiver
.map(|r| r.unwrap()).collect().await;
bs
};
// Some async executor is needed to .await/block_on/etc...
// futures::executor is used here.
// I believe you can use any async executor for this library.
let fs = futures::executor::block_on(fs);
// Tasks are executed in the order given to Concurrent, so you should await/block_on them in the same order,
let bs = futures::executor::block_on(bs);
// or maybe you should just futures::join!() all the futures.
// let (fs, bs) = futures::executor::block_on(async{ futures::join!(fs, bs) });
// This library shouldn't lose values, so this check should be unnecessary.
assert_eq!(fs.len(), 2);
assert_eq!(bs.len(), 2);
// Which file finishes first is unknown, so you must check the path.
for data in &fs{
if &data.path == f1_path{
assert_eq!(&data.items[0], &E1::Foo(1));
} else if &data.path == f2_path{
assert_eq!(&data.items[0], &E1::Foo(2));
} else{
unreachable!()
}
}
for data in &bs{
if &data.path == b1_path{
assert_eq!(&data.items[0], &E2::Bar(1));
} else if &data.path == b2_path{
assert_eq!(&data.items[0], &E2::Bar(2));
} else{
unreachable!()
}
}
Ok(())
}
Implementations
impl Concurrent
pub fn new() -> Self
Create a Concurrent with the thread pool's size set to num_cpus::get().
This creates a new thread pool. If multiple Concurrents have sufficient tasks, the total number of threads will exceed num_cpus.
If you want to share the pool, use clone().
The I/O thread is always shared.
pub fn with_pool_size(pool_size: usize) -> Self
Create a Concurrent with the given thread-pool size.
If you want to share the pool, use clone().
pub fn read_files_and_create_munyo_items<I, P>(
    &self,
    paths: I
) -> Receiver<Result<Data<MunyoItem>, Error>>
Read files and create MunyoItems from them. See MunyoItem
pub fn read_files_with_builder<I, P, T, B, MB>(
    &self,
    paths: I,
    meta_builder: MB
) -> Receiver<Result<Data<T>, Error>>
where
    I: IntoIterator<Item = P>,
    P: AsRef<Path>,
    MB: MetaBuilder<Item = B> + Send + Sync + 'static,
    B: Builder<Item = T>,
    T: Send + 'static,
Read files and build items with the meta_builder. This is not meant for general usage.
pub fn deserialize_files<I, P, T>(
    &self,
    paths: I
) -> Receiver<Result<Data<T>, Error>>
Read files and deserialize items from them.
Reading starts in the order given, and parsing and deserializing follow. Since the process is concurrent and each item is sent as soon as it is ready, the order in which finished items arrive is unknown.
See Concurrent for how to use this.
pub fn do_something_with_pool<F>(&self, f: F)
Do something in this thread pool. Maybe useful, maybe not.
pub fn do_something_with_io_thread<F>(&self, f: F)
Do something in the io thread. Maybe useful, maybe not.