hakuban 0.8.5

Data-object sharing library
Documentation
use std::{
	collections::HashMap,
	error::Error,
	sync::{Arc, Mutex},
	time::Duration,
};

use futures::{SinkExt, StreamExt};
use hakuban::{tokio_runtime::WebsocketConnector, Exchange, HakubanStreamExt, JsonDeserializeState, JsonSerializeState, ObjectState};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sysinfo::CpuRefreshKind;
use tokio::time::sleep;

#[derive(Serialize, Deserialize)]
struct CpuUtilization {
	percentage: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
	env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).init();

	let unique_id = json!({ "host": hostname::get().unwrap().into_string().unwrap(), "process": std::process::id()});

	let exchange = Exchange::new();
	let _upstream = WebsocketConnector::new(exchange.clone(), std::env::args().nth(1).unwrap_or_else(|| "ws://127.0.0.1:3001".to_string()).as_str())?;

	let stats = Arc::new(Mutex::new(HashMap::new()));

	let mut expose_my_utilization_contract = exchange.object_expose_contract((["utilizations"], unique_id)).build();
	let observe_all_utilizations_contract = exchange.tag_observe_contract("utilizations").build();

	//-------------------------------  expose our cpu stats
	tokio::spawn(async move {
		while let Some(sink_and_stream) = expose_my_utilization_contract.next().await {
			let system = sysinfo::System::default();
			let (state_sink, meta_stream) = sink_and_stream.split();
			#[allow(unused_must_use)]
			meta_stream
				.for_each_till_next((state_sink, system), |mut context, _params| async move {
					loop {
						context.1.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
						let my_current_utilization =
							CpuUtilization { percentage: context.1.cpus().iter().map(|processor| processor.cpu_usage() as u8).collect() };
						context.0.send(ObjectState::new(my_current_utilization).json_serialize()).await.unwrap();
						sleep(Duration::from_millis(1000)).await;
					}
				})
				.await;
		}
	});

	//-------------------------------  collect all cpu stats
	tokio::spawn({
		let stats = stats.clone();
		observe_all_utilizations_contract.for_each_concurrent(0, move |mut stream| {
			let stats = stats.clone();
			async move {
				let key = format!(
					"{}:{}",
					stream.descriptor().json.get("host").and_then(|value| value.as_str()).unwrap_or("unknown"),
					stream.descriptor().json.get("process").and_then(|value| value.as_i64()).unwrap_or(-1)
				);
				while let Some(state) = stream.next().await {
					stats.lock().unwrap().insert(key.clone(), state.json_deserialize::<CpuUtilization>());
				}
				stats.lock().unwrap().remove(&key);
			}
		})
	});

	//-------------------------------  periodically print out the stats
	loop {
		println!();
		for (key, stats) in stats.lock().unwrap().iter() {
			println!("{:20} -> {:?}", key, stats.data.percentage);
		}
		sleep(Duration::from_millis(1000)).await;
	}
}