use std::marker::PhantomData;
use std::thread;
use crate::entities::{CtxWrapper, MapReduceError, ResWrapper, ThreadError, Worker};
use crate::traits::{SCFunMF, SCFunRF, Sss};
/// Unit value borrowed as the context of the manager returned by [`manager`].
const DEFAULT_CONTEXT: () = ();
/// First stage of the builder chain.
///
/// It stores only a borrowed context; later stages hand that reference to the
/// map and reduce callbacks as their first argument.
pub struct Manager<'a, Ctx> {
/// Borrowed context shared by every callback invocation.
context: &'a Ctx,
}
/// Builder stage produced by [`Manager::map`]: the context-carrying manager
/// paired with the map callback.
///
/// `Req` occurs only inside the callback signature, so a `PhantomData` marker
/// is stored to keep the type parameter in use.
pub struct MapManager<'a, Ctx, Req, Resp, MF>
where
    Resp: Sss,
    MF: Fn(&Ctx, usize, &Req) -> Resp,
{
    /// Zero-sized marker for the `Req` type parameter.
    phantom_req: PhantomData<Req>,
    /// Previous builder stage, which owns the context reference.
    manager: Manager<'a, Ctx>,
    /// Map callback, invoked as `fun(context, chunk_index, chunk)`.
    fun: MF,
}
/// Builder stage produced by [`MapManager::reduce`]: the map stage plus an
/// infallible reduce callback.
pub struct ReduceManager<'a, Ctx, Req, Resp, MF, RF>
where
    Resp: Sss,
    MF: Fn(&Ctx, usize, &Req) -> Resp,
    RF: Fn(&Ctx, Resp, Resp) -> Resp,
{
    /// Preceding stage holding the context and the map callback.
    map_manager: MapManager<'a, Ctx, Req, Resp, MF>,
    /// Reduce callback, invoked as `fun(context, accumulator, mapped_value)`.
    fun: RF,
}
/// Builder stage produced by [`MapManager::reduce_result`]: the map stage plus
/// a reduce callback, where both callbacks return a `Result`.
pub struct ReduceRManager<'a, Ctx, Req, ResOk, ResErr, MF, RF>
where
    ResOk: Sss,
    ResErr: Sss,
    MF: Fn(&Ctx, usize, &Req) -> Result<ResOk, ResErr>,
    RF: Fn(&Ctx, ResOk, ResOk) -> Result<ResOk, ResErr>,
{
    /// Preceding stage holding the context and the fallible map callback.
    map_manager: MapManager<'a, Ctx, Req, Result<ResOk, ResErr>, MF>,
    /// Fallible reduce callback, invoked as `fun(context, accumulator, mapped_value)`.
    fun: RF,
}
/// Final builder stage for the infallible pipeline: a [`ReduceManager`] together
/// with the value the reduction starts from.
pub struct FullManager<'a, Ctx, Req, Resp, MF, RF>
where
    Resp: Sss,
    MF: Fn(&Ctx, usize, &Req) -> Resp,
    RF: Fn(&Ctx, Resp, Resp) -> Resp,
{
    /// Preceding stage holding the context and both callbacks.
    reduce_manager: ReduceManager<'a, Ctx, Req, Resp, MF, RF>,
    /// Initial accumulator passed to the first reduce call.
    default_value: Resp,
}
/// Final builder stage for the fallible pipeline: a [`ReduceRManager`] together
/// with the value the reduction starts from.
pub struct FullRManager<'a, Ctx, Req, ROk, RErr, MF, RF>
where
    ROk: Sss,
    RErr: Sss,
    MF: Fn(&Ctx, usize, &Req) -> Result<ROk, RErr>,
    RF: Fn(&Ctx, ROk, ROk) -> Result<ROk, RErr>,
{
    /// Preceding stage holding the context and both fallible callbacks.
    reduce_manager: ReduceRManager<'a, Ctx, Req, ROk, RErr, MF, RF>,
    /// Initial accumulator passed to the first reduce call.
    default_value: ROk,
}
/// Starts a builder chain whose context is the unit value.
///
/// Call [`Manager::context`] afterwards to supply a real context, or go
/// straight to [`Manager::map`] when the callbacks do not need one.
pub fn manager() -> Manager<'static, ()> {
    let context: &'static () = &DEFAULT_CONTEXT;
    Manager { context }
}
impl<'a, Ctx> Manager<'a, Ctx> {
    /// Discards the current context and returns a manager borrowing `context`
    /// instead.
    ///
    /// The returned manager lives no longer than the `context` borrow.
    pub fn context<NewCtx>(self, context: &NewCtx) -> Manager<'_, NewCtx> {
        Manager { context }
    }

    /// Registers the map callback and moves on to the next builder stage.
    ///
    /// `fun` receives the context, the zero-based index of the chunk and a
    /// reference to the chunk itself.
    pub fn map<Req, Resp, MF>(self, fun: MF) -> MapManager<'a, Ctx, Req, Resp, MF>
    where
        Resp: Sss,
        MF: Fn(&Ctx, usize, &Req) -> Resp,
    {
        MapManager {
            phantom_req: PhantomData,
            manager: self,
            fun,
        }
    }
}
impl<'a, Ctx, Req, Resp, MF> MapManager<'a, Ctx, Req, Resp, MF>
where
    Resp: Sss,
    MF: SCFunMF<Ctx, Req, Resp>,
{
    /// Registers an infallible reduce callback.
    ///
    /// `fun` receives the context, the accumulated value and the next mapped
    /// value, and returns the new accumulated value.
    pub fn reduce<RF>(self, fun: RF) -> ReduceManager<'a, Ctx, Req, Resp, MF, RF>
    where
        RF: SCFunRF<Ctx, Resp>,
    {
        let map_manager = self;
        ReduceManager { map_manager, fun }
    }
}
impl<'a, Ctx, Req, ROk, RErr, MF> MapManager<'a, Ctx, Req, Result<ROk, RErr>, MF>
where
    ROk: Sss,
    RErr: Sss,
    MF: SCFunMF<Ctx, Req, Result<ROk, RErr>>,
{
    /// Registers a fallible reduce callback for a map stage that yields `Result`s.
    ///
    /// `fun` only ever sees the `Ok` payloads; it receives the context, the
    /// accumulated value and the next mapped value.
    pub fn reduce_result<RF>(self, fun: RF) -> ReduceRManager<'a, Ctx, Req, ROk, RErr, MF, RF>
    where
        RF: 'static + Fn(&Ctx, ROk, ROk) -> Result<ROk, RErr>,
    {
        let map_manager = self;
        ReduceRManager { map_manager, fun }
    }
}
impl<'a, Ctx, Req, Resp, MF, RF> ReduceManager<'a, Ctx, Req, Resp, MF, RF>
where
    Resp: Sss,
    MF: SCFunMF<Ctx, Req, Resp>,
    RF: 'static + Fn(&Ctx, Resp, Resp) -> Resp,
{
    /// Sets the value the reduction starts from and returns the runnable stage.
    ///
    /// `default_value` is also the final result when there are no chunks.
    pub fn default(self, default_value: Resp) -> FullManager<'a, Ctx, Req, Resp, MF, RF> {
        let reduce_manager = self;
        FullManager {
            reduce_manager,
            default_value,
        }
    }
}
impl<
'a,
Ctx,
Req,
Resp: Sss + Default,
MF: SCFunMF<Ctx, Req, Resp>,
RF: 'static + Fn(&Ctx, Resp, Resp) -> Resp,
> ReduceManager<'a, Ctx, Req, Resp, MF, RF>
{
pub fn run(self, chunks: &[Req]) -> Result<Resp, ThreadError> {
self.default(Resp::default()).run(chunks)
}
}
impl<'a, Ctx, Req, ROk, RErr, MF, RF> ReduceRManager<'a, Ctx, Req, ROk, RErr, MF, RF>
where
    ROk: Sss,
    RErr: Sss,
    MF: SCFunMF<Ctx, Req, Result<ROk, RErr>>,
    RF: 'static + Fn(&Ctx, ROk, ROk) -> Result<ROk, RErr>,
{
    /// Sets the value the reduction starts from and returns the runnable stage.
    ///
    /// This is public, mirroring [`ReduceManager::default`], so that callers can
    /// supply a custom seed and can run pipelines whose `ROk` type does not
    /// implement `Default`.
    ///
    /// `default_value` is also the final `Ok` result when there are no chunks.
    pub fn default(self, default_value: ROk) -> FullRManager<'a, Ctx, Req, ROk, RErr, MF, RF> {
        FullRManager {
            reduce_manager: self,
            default_value,
        }
    }
}
impl<'a, Ctx, Req, ROk, RErr, MF, RF> ReduceRManager<'a, Ctx, Req, ROk, RErr, MF, RF>
where
    ROk: Sss + Default,
    RErr: Sss,
    MF: SCFunMF<Ctx, Req, Result<ROk, RErr>>,
    RF: 'static + Fn(&Ctx, ROk, ROk) -> Result<ROk, RErr>,
{
    /// Runs the fallible pipeline over `chunks`, starting the reduction from
    /// `ROk::default()`.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by [`FullRManager::run`].
    pub fn run(self, chunks: &[Req]) -> Result<ROk, MapReduceError<RErr>> {
        let seed = ROk::default();
        let seeded = self.default(seed);
        seeded.run(chunks)
    }
}
/// Spawns one thread per element of `chunks` and returns the workers in chunk
/// order.
///
/// Each thread calls the map callback with the manager's context, the
/// zero-based chunk index and the chunk, and wraps the result in a
/// [`ResWrapper`].
///
/// NOTE(review): the callback, context and chunk are handed to the threads
/// through `CtxWrapper`, which presumably erases their lifetimes. If so, every
/// returned worker must be joined before those borrows end; confirm against
/// the `CtxWrapper` definition.
fn make_workers<Ctx, Req, Resp: Sss, MF: Fn(&Ctx, usize, &Req) -> Resp>(
    map_manager: &MapManager<Ctx, Req, Resp, MF>,
    chunks: &[Req],
) -> Vec<Worker<ResWrapper<Resp>>> {
    let map_fn = CtxWrapper::new(&map_manager.fun);
    let shared_ctx = CtxWrapper::new(map_manager.manager.context);
    chunks
        .iter()
        .enumerate()
        .map(|(index, chunk)| {
            let request = CtxWrapper::new(chunk);
            let handle: thread::JoinHandle<ResWrapper<Resp>> = thread::spawn(move || {
                let mapped = map_fn.get::<MF>()(shared_ctx.get::<Ctx>(), index, request.get::<Req>());
                ResWrapper::new(mapped)
            });
            Worker {
                thread: Box::new(handle),
            }
        })
        .collect()
}
impl<'a, Ctx, Req, Resp, MF, RF> FullManager<'a, Ctx, Req, Resp, MF, RF>
where
    Resp: Sss,
    MF: SCFunMF<Ctx, Req, Resp>,
    RF: 'static + Fn(&Ctx, Resp, Resp) -> Resp,
{
    /// Maps every chunk on its own thread and folds the results, in chunk
    /// order, into the default value using the reduce callback.
    ///
    /// # Errors
    ///
    /// Returns the panic payload of the first worker (in chunk order) whose
    /// thread could not be joined. The remaining workers are joined, and
    /// their outcomes discarded, before the error is returned.
    ///
    /// NOTE(review): if the reduce callback itself panics, the handles still
    /// in the vector are dropped without being joined; check whether that is
    /// acceptable given how `make_workers` shares borrowed data.
    pub fn run(self, chunks: &[Req]) -> Result<Resp, ThreadError> {
        let Self {
            reduce_manager,
            default_value,
        } = self;
        let mut pending = make_workers(&reduce_manager.map_manager, chunks);
        // Reversed so that `pop` yields workers in chunk order.
        pending.reverse();
        let ctx = reduce_manager.map_manager.manager.context;
        let reduce = &reduce_manager.fun;
        let mut acc = default_value;
        while let Some(worker) = pending.pop() {
            match worker.thread.join() {
                Ok(wrapper) => acc = reduce(ctx, acc, wrapper.get()),
                Err(payload) => {
                    // Wait for the threads that are still outstanding.
                    for leftover in pending {
                        let _ = leftover.thread.join();
                    }
                    return Err(payload);
                }
            }
        }
        Ok(acc)
    }
}
impl<'a, Ctx, Req, ROk, RErr, MF, RF> FullRManager<'a, Ctx, Req, ROk, RErr, MF, RF>
where
    ROk: Sss,
    RErr: Sss,
    MF: 'static + Fn(&Ctx, usize, &Req) -> Result<ROk, RErr>,
    RF: 'static + Fn(&Ctx, ROk, ROk) -> Result<ROk, RErr>,
{
    /// Pops workers off the back of `workers`, joins each one and folds its
    /// `Ok` value into the accumulator.
    ///
    /// Stops at the first failure and leaves the not-yet-joined workers in
    /// `workers` so the caller can join them.
    ///
    /// # Errors
    ///
    /// * `MapReduceError::ThreadFailed` when joining a worker thread fails.
    /// * `MapReduceError::Custom` when the map or the reduce callback
    ///   returns `Err`.
    fn do_map_reduce(
        self,
        workers: &mut Vec<Worker<ResWrapper<Result<ROk, RErr>>>>,
    ) -> Result<ROk, MapReduceError<RErr>> {
        let Self {
            reduce_manager,
            default_value,
        } = self;
        let ctx = reduce_manager.map_manager.manager.context;
        let reduce = &reduce_manager.fun;
        let mut acc = default_value;
        while let Some(worker) = workers.pop() {
            let mapped = match worker.thread.join() {
                Ok(wrapper) => wrapper.get(),
                Err(payload) => return Err(MapReduceError::ThreadFailed(payload)),
            };
            let reduced = match mapped {
                Ok(value) => reduce(ctx, acc, value),
                Err(err) => return Err(MapReduceError::Custom(err)),
            };
            acc = match reduced {
                Ok(next) => next,
                Err(err) => return Err(MapReduceError::Custom(err)),
            };
        }
        Ok(acc)
    }

    /// Maps every chunk on its own thread and folds the `Ok` results, in chunk
    /// order, into the default value using the fallible reduce callback.
    ///
    /// Whatever the outcome, every spawned worker has been joined by the time
    /// this returns normally.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::do_map_reduce`].
    pub fn run(self, chunks: &[Req]) -> Result<ROk, MapReduceError<RErr>> {
        let mut pending = make_workers(&self.reduce_manager.map_manager, chunks);
        // Reversed so that `pop` yields workers in chunk order.
        pending.reverse();
        let outcome = self.do_map_reduce(&mut pending);
        // Join the workers that were skipped because of an early error.
        pending.into_iter().for_each(|leftover| {
            let _ = leftover.thread.join();
        });
        outcome
    }
}
// Unit tests exercising the builder chain end to end with real threads.
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use std::time::Duration;
// Default (unit) context: maps numbers to strings and concatenates them in
// chunk order, starting from `String::default()`.
#[test]
fn test_manager() {
let res = manager()
.map(|_, _, n: &usize| n.to_string())
.reduce(|_, a: String, b: String| format!("{}{}", a, b))
.run(&[1, 2, 3])
.unwrap();
assert_eq!(res, "123");
}
// A borrowed `usize` context is visible to the map callback.
#[test]
fn test_with_context() {
let var: usize = 5;
let res = manager()
.context(&var)
.map(|ctx: &usize, _, n: &usize| (n + ctx).to_string())
.reduce(|_, a: String, b: String| format!("{}{}", a, b))
.run(&[1, 2, 3])
.unwrap();
assert_eq!(res, "678");
}
// Chunks may be boxed closures and the context may be a `&str`; the expected
// nesting shows the reduction is a left fold seeded with the empty string.
#[test]
fn test_complex_types() {
let arg = Box::new("abc");
let args: Vec<Box<dyn Fn(&str) -> String>> = vec![
Box::new(|s: &str| format!("{}a", s)),
Box::new(|s: &str| format!("{}b", s)),
Box::new(|s: &str| format!("{}c", s)),
];
let res = manager()
.context(&*arg)
.map(|ctx, _, f: &Box<dyn Fn(&str) -> String>| f(ctx))
.reduce(|_, a: String, b: String| format!("({}{})", a, b))
.run(&args)
.unwrap();
assert_eq!(res, "(((abca)abcb)abcc)")
}
// Chunks may themselves be slices; sums their lengths.
#[test]
fn test_slices() {
let params: &[&[usize]] = &[&[1, 2, 3], &[4, 5]];
let res = manager()
.map(|_, _, s: &&[usize]| s.len())
.reduce(|_, a, b| a + b)
.run(params)
.unwrap();
assert_eq!(res, 5);
}
// Fallible pipeline where every map and reduce call succeeds.
#[test]
fn test_result_ok() {
let res = manager()
.map(|_, _, s: &&str| u8::from_str(s))
.reduce_result(|_, a: u8, b: u8| Ok(a + b))
.run(&["1", "2", "3"]);
match res {
Ok(6) => (),
_ => panic!(),
}
}
// An `Err` from the map callback surfaces as `MapReduceError::Custom`.
#[test]
fn test_result_err_map() {
let res = manager()
.map(|_, _, s: &&str| u8::from_str(s))
.reduce_result(|_, a: u8, b: u8| Ok(a + b))
.run(&["1", "arr", "3"]);
let err = match res {
Err(MapReduceError::Custom(err)) => err.to_string(),
_ => panic!(),
};
assert_eq!(err, "invalid digit found in string");
}
// An `Err` from the reduce callback surfaces as `MapReduceError::Custom`.
#[test]
fn test_result_err_reduce() {
let res = manager()
.map(|_, _, s: &u8| Ok(*s))
.reduce_result(|_, _: u8, _: u8| Err(()))
.run(&[1, 2, 3]);
match res {
Err(MapReduceError::Custom(())) => (),
_ => panic!(),
}
}
// Workers sleep for `s` seconds and fail for `s > 1`; the run must still
// report the error. The sleeps are real, so this test takes a few seconds.
#[test]
fn test_sleeps_and_errors() {
let res = manager()
.map(|_, _, s: &u8| {
thread::sleep(Duration::from_secs(*s as u64));
if *s <= 1 {
Ok(*s)
} else {
Err(())
}
})
.reduce_result(|_, a, b| Ok(a + b))
.run(&[1, 1, 2, 3, 3]);
match res {
Err(MapReduceError::Custom(())) => (),
_ => panic!(),
}
}
// The second map argument is the zero-based index of the chunk.
#[test]
fn test_thread_id() {
let res: String = manager()
.map(|_, thread, val: &char| format!("{}:{}", thread, val))
.reduce(|_, a, b| format!("[{}{}]", a, b))
.run(&['a', 'b', 'c'])
.unwrap();
assert_eq!(res, "[[[0:a]1:b]2:c]")
}
// Non-`Copy` payload type used to check that chunks are only borrowed.
struct StructWrapper {
data: usize,
}
// Helper: sums `data` over two sub-slices of the input (elements 0..2 and
// 2..4) using the infallible pipeline; panics if the run fails.
fn func(context: &str, data: &[StructWrapper]) -> usize {
let res = manager()
.context(&context)
.map(|_, _, val: &&[StructWrapper]| {
let mut sum = 0;
for s in *val {
sum += s.data
}
sum
})
.reduce(|_, a, b| a + b)
.run(&[&data[0..2], &data[2..4]]);
match res {
Ok(val) => val,
_ => panic!(),
}
}
// Helper: same split as `func`, but through the fallible pipeline; each chunk
// yields one formatted string and any pipeline error is mapped to "err".
fn func_res(context: &str, data: &[StructWrapper]) -> Result<Vec<String>, String> {
manager()
.context(&context)
.map(|ctx, _, val: &&[StructWrapper]| {
let mut sum = 0;
for s in *val {
sum += s.data;
}
Ok(vec![format!("{}: {}", ctx, sum)])
})
.reduce_result(|_, mut a, mut b| {
a.append(&mut b);
let res: Result<Vec<String>, String> = Ok(a);
res
})
.run(&[&data[0..2], &data[2..4]])
.map_err(|_| "err".to_string())
}
// The input vector is still usable after the run.
#[test]
fn test_map_per_chunks() {
let wrapped = vec![
StructWrapper { data: 1 },
StructWrapper { data: 2 },
StructWrapper { data: 3 },
StructWrapper { data: 4 },
];
assert_eq!(func("hello", &wrapped), 10);
assert_eq!(wrapped[0].data, 1);
}
// Fallible variant: results arrive in chunk order, and both the input and the
// context are still usable afterwards.
#[test]
fn test_map_result_per_chunks() {
let ctx = "hi";
let wrapped = vec![
StructWrapper { data: 1 },
StructWrapper { data: 2 },
StructWrapper { data: 3 },
StructWrapper { data: 4 },
];
assert_eq!(
func_res(ctx, &wrapped),
Ok(vec!["hi: 3".to_string(), "hi: 7".to_string()])
);
assert_eq!(wrapped[0].data, 1);
assert_eq!(ctx, "hi");
}
}