use std::sync::Arc;
use crossbeam_utils::atomic::AtomicCell;
use futures_util::future::{select, select_all, Either, FutureExt};
use log::debug;
use rand::Rng;
use serde::Deserialize;
use crate::{metrics, payload};
use crate::metrics::{Metric, MetricType, MetricUnit};
use crate::comms::{
Gate, GateMetrics, Link, Terminated, UnitHealth, UnitUpdate
};
use crate::manager::Component;
/// A unit that passes on the data of one of several source units.
///
/// The unit follows a single source at a time and moves on to another one
/// when the current source stops delivering data.
#[derive(Debug, Deserialize)]
pub struct Any {
/// The links to the source units to choose from.
sources: Vec<Link>,
/// Whether the search for the next source starts at a random index
/// rather than at the index following the current one.
random: bool,
}
impl Any {
/// Runs the unit.
///
/// Selects a source via `pick`, announces that source's current payload
/// (or a stall if there is none) through `gate`, and then forwards payload
/// updates from the selected source until it stalls or goes away, at which
/// point a new source is picked.
///
/// The method never returns `Ok(())`: it only ends by returning
/// `Err(Terminated)`, either right away when there are no sources at all or
/// once `pick` reports an error. In both cases `UnitUpdate::Gone` is sent
/// through the gate first.
pub async fn run(
mut self, mut component: Component, mut gate: Gate
) -> Result<(), Terminated> {
// Without any sources there is nothing to follow.
if self.sources.is_empty() {
gate.update(UnitUpdate::Gone).await;
return Err(Terminated)
}
// Register the unit's metrics; the local `Arc` is kept so the current
// index can be updated below.
let metrics = Arc::new(AnyMetrics::new(&gate));
component.register_metrics(metrics.clone());
// No source is selected before the first call to `pick`.
let mut curr_idx: Option<usize> = None;
loop {
// Select the source to follow. An error from `pick` ends the unit.
curr_idx = match self.pick(curr_idx) {
Ok(curr_idx) => curr_idx,
Err(_) => {
gate.update(UnitUpdate::Gone).await;
return Err(Terminated)
}
};
debug!(
"Unit {}: current index is now {:?}",
component.name(), curr_idx
);
metrics.current_index.store(curr_idx);
// Announce the new state: the selected source's current payload if it
// has one, a stall otherwise (including when no source was selected).
match curr_idx {
Some(idx) => {
if let Some(update) = self.sources[idx].payload() {
gate.update(
UnitUpdate::Payload(update.clone())
).await;
}
else {
gate.update(UnitUpdate::Stalled).await;
}
}
None => {
gate.update(UnitUpdate::Stalled).await;
}
}
// Process updates until a new source needs to be picked.
loop {
// Wait for whichever finishes first: a query on any of the sources or
// the gate's own processing. The futures created here are confined to
// this block so that `self.sources` and `gate` can be used again below.
let (res, idx, _) = {
let res = select(
select_all(
self.sources.iter_mut().map(|link|
link.query().boxed()
)
),
gate.process().boxed()
).await;
match res {
// A source query finished: `res` is the `select_all` output, i.e., the
// update, the index of the source, and the remaining futures.
Either::Left((res, _)) => { res }
// The gate finished processing: just wait again.
Either::Right(_) => continue,
}
};
match res {
UnitUpdate::Payload(payload) => {
// New data from the selected source is passed on as is.
if Some(idx) == curr_idx {
gate.update(UnitUpdate::Payload(payload)).await;
}
// Data from some source while none is selected: pick again.
else if curr_idx.is_none() {
break
}
// Data from any other source is ignored.
}
UnitUpdate::Stalled | UnitUpdate::Gone => {
// Only losing the selected source triggers a new pick.
if Some(idx) == curr_idx {
break
}
}
}
}
}
}
/// Picks the source to follow next.
///
/// The search starts at a random index if `self.random` is set, at the
/// index following `curr` if there is a current source, and at index 0
/// otherwise. From there, every source is probed once in circular order.
///
/// Returns `Ok(Some(index))` for the first healthy source that has a
/// payload, `Ok(None)` if there is no such source but at least one source
/// is healthy or stalled, and `Err(Terminated)` if every source is gone.
fn pick(&self, curr: Option<usize>) -> Result<Option<usize>, Terminated> {
    let count = self.sources.len();

    // Where to start looking.
    let start = match (self.random, curr) {
        (true, _) => rand::rng().random_range(0..count),
        (false, Some(prev)) => (prev + 1) % count,
        (false, None) => 0,
    };

    // Set as soon as we encounter a source that has not gone away.
    let mut any_alive = false;

    for offset in 0..count {
        let candidate = (start + offset) % count;
        let source = &self.sources[candidate];
        match source.health() {
            UnitHealth::Healthy if source.payload().is_some() => {
                return Ok(Some(candidate))
            }
            UnitHealth::Healthy | UnitHealth::Stalled => {
                any_alive = true;
            }
            UnitHealth::Gone => { }
        }
    }

    if any_alive {
        Ok(None)
    }
    else {
        Err(Terminated)
    }
}
}
/// The metrics of an `Any` unit.
#[derive(Debug, Default)]
struct AnyMetrics {
/// The index of the currently selected source, `None` if there is none.
current_index: AtomicCell<Option<usize>>,
/// The metrics of the unit's gate.
gate: Arc<GateMetrics>,
}
impl AnyMetrics {
    /// The gauge metric reporting the index of the selected source.
    const CURRENT_INDEX_METRIC: Metric = Metric::new(
        "current_index", "the index of the currently selected source",
        MetricType::Gauge, MetricUnit::Info
    );
}
impl AnyMetrics {
fn new(gate: &Gate) -> Self {
AnyMetrics {
current_index: Default::default(),
gate: gate.metrics(),
}
}
}
impl metrics::Source for AnyMetrics {
    /// Appends the current index metric followed by the gate's metrics.
    fn append(&self, unit_name: &str, target: &mut metrics::Target) {
        // The value -1 is reported while no source is selected.
        let index = match self.current_index.load() {
            Some(idx) => idx as isize,
            None => -1,
        };
        target.append_simple(
            &Self::CURRENT_INDEX_METRIC, Some(unit_name), index
        );
        self.gate.append(unit_name, target);
    }
}
/// A unit that merges the data of all its healthy source units into one set.
#[derive(Debug, Deserialize)]
pub struct Merge {
/// The links to the source units whose data is merged.
sources: Vec<Link>,
}
impl Merge {
/// Runs the unit.
///
/// Every time a query on any of the sources completes, the current
/// payloads of all healthy sources are merged into a fresh set which is
/// then sent through `gate` as a payload update.
///
/// The method never returns `Ok(())`. It returns `Err(Terminated)` right
/// away, after sending `UnitUpdate::Gone`, if there are no sources;
/// otherwise it loops indefinitely.
pub async fn run(
    mut self, mut component: Component, mut gate: Gate
) -> Result<(), Terminated> {
    // Nothing to merge without sources.
    if self.sources.is_empty() {
        gate.update(UnitUpdate::Gone).await;
        return Err(Terminated)
    }

    let metrics = gate.metrics();
    component.register_metrics(metrics.clone());

    loop {
        // Wait for either a source query or the gate's processing to
        // finish. The futures live only inside this block so the sources
        // and the gate are free to be used again afterwards.
        let gate_finished = {
            let queries = select_all(
                self.sources.iter_mut().map(|link| link.query().boxed())
            );
            let outcome = select(queries, gate.process().boxed()).await;
            matches!(outcome, Either::Right(_))
        };
        if gate_finished {
            continue
        }

        // A source reported something: rebuild the merged set from the
        // payloads of all sources that are currently healthy.
        let mut merged = payload::Set::default();
        for source in &self.sources {
            if !matches!(source.health(), UnitHealth::Healthy) {
                continue
            }
            if let Some(update) = source.payload() {
                merged = merged.merge(update.set())
            }
        }

        let update = payload::Update::new(merged);
        gate.update(UnitUpdate::Payload(update)).await;
    }
}
}
#[cfg(test)]
mod test {
use super::*;
use tokio::runtime;
use crate::{test, units};
use crate::manager::Manager;
use crate::payload::testrig;
/// Checks that a non-random `Any` unit with three sources switches
/// between sources as they stall and picks up data again after all of
/// them have stalled.
#[tokio::test]
async fn wake_up_again() {
test::init_log();
let mut manager = Manager::default();
// Set up three test units `u1`, `u2`, `u3` feeding an `Any` unit named
// "any", and a test target `t` that is created for "any".
let (u1, u2, u3, mut t) = manager.add_components(
&runtime::Handle::current(),
|units, targets| {
let (u, u1c) = test::Unit::new();
units.insert("u1", u);
let (u, u2c) = test::Unit::new();
units.insert("u2", u);
let (u, u3c) = test::Unit::new();
units.insert("u3", u);
units.insert("any", units::Unit::Any(Any {
sources: vec!["u1".into(), "u2".into(), "u3".into()],
random: false
}));
let (t, tc) = test::Target::new("any");
targets.insert("t", t);
(u1c, u2c, u3c, tc)
}
).unwrap();
// All three sources stall one after another. Only the first stall is
// expected to arrive at the target.
u1.send_stalled().await;
t.recv_stalled().await.unwrap();
u2.send_stalled().await;
t.recv_nothing().unwrap();
u3.send_stalled().await;
t.recv_nothing().unwrap();
// `u1` delivers data, which is expected at the target.
u1.send_payload(testrig::update([1])).await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update([1]));
// Data from `u2` is expected not to be passed on at this point.
u2.send_payload(testrig::update([2])).await;
t.recv_nothing().unwrap();
// Once `u1` stalls, the data of `u2` is expected at the target.
u1.send_stalled().await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update([2]));
// With `u2` stalling as well, the target is expected to see a stall.
u2.send_stalled().await;
t.recv_stalled().await.unwrap();
// Data from `u3` is expected to get things going again.
u3.send_payload(testrig::update([3])).await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update([3]));
}
}