use std::{
collections::HashMap,
error::Error,
sync::{Arc, Mutex},
time::Duration,
};
use futures::{SinkExt, StreamExt};
use hakuban::{tokio_runtime::WebsocketConnector, Exchange, HakubanStreamExt, JsonDeserializeState, JsonSerializeState, ObjectState};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sysinfo::CpuRefreshKind;
use tokio::time::sleep;
#[derive(Serialize, Deserialize)]
struct CpuUtilization {
percentage: Vec<u8>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("hakuban=info")).init();
let unique_id = json!({ "host": hostname::get().unwrap().into_string().unwrap(), "process": std::process::id()});
let exchange = Exchange::new();
let _upstream = WebsocketConnector::new(exchange.clone(), std::env::args().nth(1).unwrap_or_else(|| "ws://127.0.0.1:3001".to_string()).as_str())?;
let stats = Arc::new(Mutex::new(HashMap::new()));
let mut expose_my_utilization_contract = exchange.object_expose_contract((["utilizations"], unique_id)).build();
let observe_all_utilizations_contract = exchange.tag_observe_contract("utilizations").build();
tokio::spawn(async move {
while let Some(sink_and_stream) = expose_my_utilization_contract.next().await {
let system = sysinfo::System::default();
let (state_sink, meta_stream) = sink_and_stream.split();
#[allow(unused_must_use)]
meta_stream
.for_each_till_next((state_sink, system), |mut context, _params| async move {
loop {
context.1.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
let my_current_utilization =
CpuUtilization { percentage: context.1.cpus().iter().map(|processor| processor.cpu_usage() as u8).collect() };
context.0.send(ObjectState::new(my_current_utilization).json_serialize()).await.unwrap();
sleep(Duration::from_millis(1000)).await;
}
})
.await;
}
});
tokio::spawn({
let stats = stats.clone();
observe_all_utilizations_contract.for_each_concurrent(0, move |mut stream| {
let stats = stats.clone();
async move {
let key = format!(
"{}:{}",
stream.descriptor().json.get("host").and_then(|value| value.as_str()).unwrap_or("unknown"),
stream.descriptor().json.get("process").and_then(|value| value.as_i64()).unwrap_or(-1)
);
while let Some(state) = stream.next().await {
stats.lock().unwrap().insert(key.clone(), state.json_deserialize::<CpuUtilization>());
}
stats.lock().unwrap().remove(&key);
}
})
});
loop {
println!();
for (key, stats) in stats.lock().unwrap().iter() {
println!("{:20} -> {:?}", key, stats.data.percentage);
}
sleep(Duration::from_millis(1000)).await;
}
}