amadeus 0.4.3

Harmonious distributed data processing & analysis in Rust. parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common crawl
//! # Distributed parsing and analysis of 3.25 billion webpages
//!
//! This example finds the most prevalent 100 IP addresses in the 3.25 billion
//! page, 255 TiB Common Crawl dataset.
//!
//! The download, parsing and analysis is farmed out to a process pool
//! leveraging Amadeus, the distributed data processing library for Rust.
//!
//! ## Usage
//!
//! ```bash
//! cargo run --example common_crawl --release -- 16
//! ```
//!
//! where `16` is the number of processes with which to initialize the pool.
//! Defaults to the maximum available if omitted.
//!
//! It can also be run distributed on a [`constellation`](https://github.com/constellation-rs/constellation)
//! cluster like so:
//!
//! ```bash
//! cargo deploy 10.0.0.1 --example common_crawl --release -- 1000
//! ```
//!
//! where `10.0.0.1` is the address of the master. See [here](https://github.com/constellation-rs/constellation)
//! for instructions on setting up the cluster.

use amadeus::{data::Webpage, dist::prelude::*};
use constellation::{init, Resources};
use std::env;

#[allow(unreachable_code)]
fn main() {
	init(Resources::default());

	tokio::runtime::Builder::new()
		.threaded_scheduler()
		.enable_all()
		.build()
		.unwrap()
		.block_on(async {
			return; // TODO: runs for a long time

			// Accept the number of processes at the command line, defaulting to the maximum available
			let processes = env::args().nth(1).and_then(|arg| arg.parse::<usize>().ok());

			let pool = ProcessPool::new(processes, None, None, Resources::default()).unwrap();

			let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();

			let (count, (most_frequent_ips, most_diverse_ips)) = webpages
				.dist_stream()
				.map(FnMut!(|webpage: Result<_, _>| webpage.unwrap()))
				.fork(
					&pool,
					Identity
						.map(FnMut!(|webpage: Webpage<'static>| webpage))
						.count(),
					(
						Identity
							.map(FnMut!(|webpage: &Webpage<'static>| webpage.ip))
							.most_frequent(100, 0.99, 2.0 / 1000.0),
						Identity
							.map(FnMut!(|webpage: &Webpage<'static>| {
								(webpage.ip, webpage.url.host_str().unwrap().to_owned())
							}))
							.most_distinct(100, 0.99, 2.0 / 1000.0, 0.0808),
					),
				)
				.await;

			println!("Of the {} webpages processed, these are the most prevalent host IP addresses: {:?}", count, most_frequent_ips);
			println!("and these are the IP addresses hosting the most distinct domains: {:?}", most_diverse_ips);
		})
}