Struct munyo::Concurrent

pub struct Concurrent { /* private fields */ }

Reads files in a dedicated thread and parses them concurrently in a thread pool. The IO thread and the pool threads are destroyed automatically when no tasks are assigned to them. When a task is assigned after that, a new thread is spawned. Spawning has some cost, so you may want to assign as many tasks as possible at once to avoid respawning.

Example

#[derive(serde::Deserialize, Debug, PartialEq)]
enum E1{
    Foo(usize)
}
#[derive(serde::Deserialize, Debug, PartialEq)]
enum E2{
    Bar(usize)
}
fn main() -> munyo::Result<()>{
    let con = munyo::Concurrent::new();
    let f1 = "Foo 1";
    let f2 = "Foo 2";
    let b1 = "Bar 1";
    let b2 = "Bar 2";
    // Write these into files and get the paths (f1_path, f2_path, b1_path, b2_path).
    // That step is omitted here.
    // Deserialize files in the background.
    let f_receiver = con.deserialize_files([f1_path, f2_path]);
    let b_receiver = con.deserialize_files([b1_path, b2_path]);
    // Prepare a Future (an async block creates a Future).
    let fs = async{
        let mut fs : Vec<munyo::file_io::Data<E1>> = vec![];
        while let Some(data) = f_receiver.recv_async().await{
            let data = data.unwrap();
            fs.push(data);
        }
        fs
    };
    // Prepare another Future (Futures do nothing until they are awaited or passed to block_on/executor::execute/etc.).
    let bs = async{
        use futures::{Stream, StreamExt};
        // receiver.receiver is an async_channel and implements futures::Stream trait,
        // so you can use StreamExt utility methods.
        let bs : Vec<munyo::file_io::Data<E2>> = b_receiver.receiver
            .map(|r| r.unwrap()).collect().await;
        bs
    };
    // Some async executor is needed to .await/block_on/etc...
    // futures::executor is used here.
    // I believe you can use any async executor for this library.
    let fs = futures::executor::block_on(fs);

    // Tasks are executed in the order they were given to Concurrent, so await/block_on them in the same order,
    let bs = futures::executor::block_on(bs);

    // or simply futures::join!() all the futures instead:
    // let (fs, bs) = futures::executor::block_on(async{ futures::join!(fs, bs) });

    // This library shouldn't lose values. This check should be unnecessary.
    assert_eq!(fs.len(), 2);
    assert_eq!(bs.len(), 2);

    // Which one comes first is not known. You must check the path.
    for data in &fs{
        if &data.path == f1_path{
            assert_eq!(&data.items[0], &E1::Foo(1));
        } else if &data.path == f2_path{
            assert_eq!(&data.items[0], &E1::Foo(2));
        } else{
            unreachable!()
        }
    }

    for data in &bs{
        if &data.path == b1_path{
            assert_eq!(&data.items[0], &E2::Bar(1));
        } else if &data.path == b2_path{
            assert_eq!(&data.items[0], &E2::Bar(2));
        } else{
            unreachable!()
        }
    }
    Ok(())
}
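
The example leaves out the step that writes the four strings to disk and produces f1_path, f2_path, b1_path and b2_path. One possible way to fill it in, using only the standard library, is sketched below. The helper name write_sample_file, the choice of directory, and the .munyo file extension are assumptions made for illustration; they are not part of munyo.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical helper (not part of munyo): writes `contents` to
/// `file_name` inside `dir` and returns the full path of the new file.
fn write_sample_file(dir: &Path, file_name: &str, contents: &str) -> io::Result<PathBuf> {
    // Create the directory if it does not exist yet.
    fs::create_dir_all(dir)?;
    let path = dir.join(file_name);
    // Create or overwrite the file with the given contents.
    fs::write(&path, contents)?;
    Ok(path)
}
```

Called once per string, for instance with a directory under std::env::temp_dir(), this would yield the four paths that the example passes to deserialize_files.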

Implementations

impl Concurrent

pub fn new() -> Self

Creates a Concurrent whose thread pool size is num_cpus::get().

Each call creates a new thread pool. If several Concurrents are busy at the same time, the total number of threads can exceed num_cpus.

If you want to share the pool, use clone().

The IO thread is always shared.

pub fn with_pool_size(pool_size: usize) -> Self

Creates a Concurrent with the given thread pool size.

If you want to share the pool, use clone().

pub fn read_files_and_create_munyo_items<I, P>(
    &self,
    paths: I
) -> Receiver<Result<Data<MunyoItem>, Error>>
where
    I: IntoIterator<Item = P>,
    P: AsRef<Path>,

Reads files and creates MunyoItems from them. See MunyoItem.

pub fn read_files_with_builder<I, P, T, B, MB>(
    &self,
    paths: I,
    meta_builder: MB
) -> Receiver<Result<Data<T>, Error>>
where
    I: IntoIterator<Item = P>,
    P: AsRef<Path>,
    MB: MetaBuilder<Item = B> + Send + Sync + 'static,
    B: Builder<Item = T>,
    T: Send + 'static,

Read files and build items with the meta_builder. This is not meant for general usage.

pub fn deserialize_files<I, P, T>(
    &self,
    paths: I
) -> Receiver<Result<Data<T>, Error>>
where
    I: IntoIterator<Item = P>,
    P: AsRef<Path>,
    T: Send + 'static + DeserializeOwned,

Read files and deserialize items from them.

Reading starts in the order given, followed by parsing and deserializing. Because the process is concurrent, each file's items are sent as soon as they are ready, so the order in which finished results arrive is not defined.

See Concurrent for a usage example.
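
Since results can arrive in any order, one way to handle them is to index each result by its path instead of relying on position. The sketch below illustrates the idea with std::sync::mpsc and plain threads as stand-ins; Parsed and collect_by_path are hypothetical names, and nothing here reflects how munyo is implemented internally.

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a finished file: its path plus parsed items.
struct Parsed {
    path: PathBuf,
    items: Vec<usize>,
}

/// Spawns one worker per input, lets the workers finish in any order,
/// and indexes the results by path so arrival order does not matter.
fn collect_by_path(inputs: Vec<(PathBuf, Vec<usize>)>) -> HashMap<PathBuf, Vec<usize>> {
    let (tx, rx) = mpsc::channel();
    for (path, items) in inputs {
        let tx = tx.clone();
        thread::spawn(move || {
            // A send error only happens if the receiver is gone; ignore it.
            let _ = tx.send(Parsed { path, items });
        });
    }
    // Drop the original sender so the loop below ends once every worker is done.
    drop(tx);

    let mut by_path = HashMap::new();
    for parsed in rx {
        by_path.insert(parsed.path, parsed.items);
    }
    by_path
}
```

With the real Receiver, the same lookup-by-path approach would presumably use the path field of each Data value, as the example on Concurrent does.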

pub fn do_something_with_pool<F>(&self, f: F)
where F: FnOnce() + Send + 'static,

Runs an arbitrary closure in this thread pool. Maybe useful, maybe not.
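
Because the closure has the bound FnOnce() + Send + 'static and the method returns nothing, a result has to be handed back through something the closure owns, such as a channel sender. The sketch below shows that pattern with std::thread::spawn standing in for the pool; run_in_background and sum_in_background are hypothetical names, not munyo functions.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in with the same closure bound as
/// `do_something_with_pool`; it simply runs `f` on a new thread.
fn run_in_background<F>(f: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(f)
}

/// Computes a sum in the background and returns it through a channel.
fn sum_in_background(values: Vec<u64>) -> u64 {
    let (tx, rx) = mpsc::channel();
    // `move` transfers ownership of `values` and `tx` into the closure,
    // which is what the 'static bound requires.
    let handle = run_in_background(move || {
        let total: u64 = values.iter().sum();
        let _ = tx.send(total);
    });
    let total = rx.recv().expect("the closure sends exactly one value");
    handle.join().expect("the background closure should not panic");
    total
}
```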

pub fn do_something_with_io_thread<F>(&self, f: F)
where F: FnOnce() + Send + 'static,

Runs an arbitrary closure in the IO thread. Maybe useful, maybe not.

Trait Implementations

impl Clone for Concurrent

fn clone(&self) -> Self

Clones the Concurrent. The clone shares the thread pool with the original.
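
The difference between cloning and calling new() again can be pictured with a reference-counted handle. The sketch below is only an analogy built on std::sync::Arc; Handle and PoolState are hypothetical types, and whether munyo uses Arc internally is an assumption, not something this page states.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shared state standing in for a thread pool.
struct PoolState {
    submitted: AtomicUsize,
}

/// Hypothetical handle: cloning copies the Arc, not the pool behind it.
#[derive(Clone)]
struct Handle {
    pool: Arc<PoolState>,
}

impl Handle {
    /// Each call to `new` creates a separate pool.
    fn new() -> Self {
        Handle {
            pool: Arc::new(PoolState {
                submitted: AtomicUsize::new(0),
            }),
        }
    }

    /// Records one submitted task on the underlying pool.
    fn submit(&self) {
        self.pool.submitted.fetch_add(1, Ordering::SeqCst);
    }

    /// Number of tasks submitted to the underlying pool so far.
    fn submitted(&self) -> usize {
        self.pool.submitted.load(Ordering::SeqCst)
    }

    /// True if both handles point at the same pool.
    fn shares_pool_with(&self, other: &Handle) -> bool {
        Arc::ptr_eq(&self.pool, &other.pool)
    }
}
```

In this model a clone and its original see the same task count, while two handles made with new() stay independent, which mirrors the sharing behavior described for Concurrent.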

Since 1.0.0

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.