amadeus 0.3.7

Harmonious distributed data processing & analysis in Rust. parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common crawl
Documentation
#![allow(clippy::suspicious_map)]

use futures::{future, future::BoxFuture, FutureExt};
use std::{error::Error, future::Future, time::SystemTime};

use amadeus::prelude::*;
use amadeus_core::pool::ThreadPool;

#[test]
fn single_threaded() {
	let start = SystemTime::now();

	let pool = &LocalPool;

	let _ = (0..100)
		.map(|i| format!("string {}", i))
		.par()
		.fork(
			pool,
			Identity.sample_unstable(10),
			(
				Identity
					.map(|row: &String| (row[..8].to_owned(), ()))
					.group_by(Identity.count()),
				Identity.count(),
				Identity.for_each(|_: &_| ()),
				Identity.map(|_: &_| ()).count(),
			),
		)
		.now_or_never()
		.unwrap();

	println!("in {:?}", start.elapsed().unwrap());
}

#[derive(Clone)]
struct LocalPool;

impl ThreadPool for LocalPool {
	fn threads(&self) -> usize {
		1
	}
	fn spawn<F, Fut, T>(&self, work: F) -> BoxFuture<'static, Result<T, Box<dyn Error + Send>>>
	where
		F: FnOnce() -> Fut + Send + 'static,
		Fut: Future<Output = T> + 'static,
		T: Send + 'static,
	{
		Box::pin(future::lazy(|_| work().now_or_never().unwrap()).map(Ok))
	}
	unsafe fn spawn_unchecked<'a, F, Fut, T>(
		&self, work: F,
	) -> BoxFuture<'a, Result<T, Box<dyn Error + Send>>>
	where
		F: FnOnce() -> Fut + Send + 'a,
		Fut: Future<Output = T> + 'a,
		T: Send + 'a,
	{
		Box::pin(future::lazy(|_| work().now_or_never().unwrap()).map(Ok))
	}
}