oxygengine-core 0.28.0

Core module for Oxygengine
Documentation
use std::iter::FromIterator;
#[cfg(feature = "parallel")]
use std::sync::mpsc::{channel, Receiver, TryRecvError};

pub enum JobResult<T> {
    Running(Job<T>),
    Complete(T),
    Dead,
}

impl<T> JobResult<T> {
    pub fn unwrap(self) -> T {
        match self {
            Self::Complete(data) => data,
            _ => panic!("Trying to unwrap incomplete job!"),
        }
    }

    pub fn expect(self, message: &str) -> T {
        match self {
            Self::Complete(data) => data,
            _ => panic!("{}", message),
        }
    }

    pub fn is_running(&self) -> bool {
        matches!(self, Self::Running(_))
    }

    pub fn is_complete(&self) -> bool {
        matches!(self, Self::Complete(_))
    }

    pub fn is_dead(&self) -> bool {
        matches!(self, Self::Dead)
    }
}

#[cfg(not(feature = "parallel"))]
pub struct Job<T>(T);

#[cfg(feature = "parallel")]
pub struct Job<T>(Receiver<T>);

impl<T> Job<T> {
    pub fn new<F>(f: F) -> Job<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        #[cfg(not(feature = "parallel"))]
        {
            Job(f())
        }
        #[cfg(feature = "parallel")]
        {
            let (sender, receiver) = channel();
            rayon::spawn_fifo(move || {
                let _ = sender.send(f());
            });
            Job(receiver)
        }
    }
}

impl<T> Job<T> {
    #[cfg(not(feature = "parallel"))]
    pub fn try_consume(self) -> JobResult<T> {
        JobResult::Complete(self.0)
    }

    #[cfg(feature = "parallel")]
    pub fn try_consume(self) -> JobResult<T> {
        let receiver = self.0;
        match receiver.try_recv() {
            Ok(data) => JobResult::Complete(data),
            Err(error) => match error {
                TryRecvError::Empty => JobResult::Running(Self(receiver)),
                TryRecvError::Disconnected => JobResult::Dead,
            },
        }
    }

    #[cfg(not(feature = "parallel"))]
    pub fn consume(self) -> JobResult<T> {
        self.try_consume()
    }

    #[cfg(feature = "parallel")]
    pub fn consume(self) -> JobResult<T> {
        match self.0.recv() {
            Ok(data) => JobResult::Complete(data),
            Err(_) => JobResult::Dead,
        }
    }
}

pub struct JobsGroup<T>(Vec<JobResult<T>>);

impl<T> JobsGroup<T> {
    pub fn new<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Job<T>>,
    {
        Self(iter.into_iter().map(JobResult::Running).collect())
    }

    pub fn try_consume(self) -> Result<Vec<T>, Self> {
        let result = self
            .0
            .into_iter()
            .map(|result| match result {
                JobResult::Running(job) => job.try_consume(),
                _ => result,
            })
            .collect::<Vec<_>>();
        if result.iter().all(|result| result.is_complete()) {
            Ok(result.into_iter().map(|result| result.unwrap()).collect())
        } else {
            Err(Self(result))
        }
    }

    pub fn consume(self) -> Vec<T> {
        self.0
            .into_iter()
            .filter_map(|result| match result {
                JobResult::Running(job) => match job.consume() {
                    JobResult::Complete(data) => Some(data),
                    _ => None,
                },
                JobResult::Complete(data) => Some(data),
                JobResult::Dead => None,
            })
            .collect()
    }

    pub fn is_all_complete(&self) -> bool {
        self.0.iter().all(|result| result.is_complete())
    }

    pub fn is_any_dead(&self) -> bool {
        self.0.iter().any(|result| result.is_dead())
    }

    pub fn is_any_running(&self) -> bool {
        self.0.iter().any(|result| result.is_running())
    }
}

impl<T> FromIterator<Job<T>> for JobsGroup<T> {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Job<T>>,
    {
        Self::new(iter)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    #[test]
    fn test_jobs() {
        fn fib(n: usize) -> (usize, Duration) {
            let timer = Instant::now();
            let mut x = (1, 1);
            for _ in 0..n {
                x = (x.1, x.0 + x.1)
            }
            (x.0, timer.elapsed())
        }

        let single = Job::new(|| fib(50));
        let group = (0..20)
            .into_iter()
            .map(|n| Job::new(move || fib(n)))
            .collect::<JobsGroup<_>>();
        println!("* Single result: {:?}", single.consume().unwrap());
        println!("* Group results: {:?}", group.consume());
    }
}