use std::collections::{HashMap, HashSet};
use ff_core::backend::ResumeSignal;
use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
use ff_core::partition::{PartitionConfig, PartitionKey};
use ff_core::types::ExecutionId;
/// Borrowed lookup table used by the evaluator: maps a waitpoint key (as a
/// string slice) to the slice of signals recorded for that waitpoint.
///
/// Both the keys and the signal slices are borrowed for the same lifetime
/// `'a`; the map itself owns no signal data.
pub type SignalsByWaitpoint<'a> = HashMap<&'a str, &'a [ResumeSignal]>;
/// Returns the slot for an execution, taken from the execution id's own
/// partition value and narrowed to `i16`.
///
/// `_cfg` is accepted but not consulted by this function.
///
/// NOTE(review): the `as` cast does not check range; presumably the partition
/// value always fits in `i16` — confirm against `ExecutionId::partition`.
pub fn slot_of(exec_id: &ExecutionId, _cfg: &PartitionConfig) -> i16 {
    let partition = exec_id.partition();
    partition as i16
}
/// Resolves a partition key to its slot.
///
/// Returns `Some(index)` (narrowed to `i16`) when `pk.as_partition()`
/// succeeds, and `None` when it fails; the error value is discarded.
///
/// NOTE(review): the `as` cast does not check range; presumably `index`
/// always fits in `i16` — confirm against the partition type.
pub fn slot_of_partition_key(pk: &PartitionKey) -> Option<i16> {
    match pk.as_partition() {
        Ok(partition) => Some(partition.index as i16),
        Err(_) => None,
    }
}
/// Decides whether `condition` is satisfied by the signals in `by_wp`.
///
/// * `Single` — true when at least one signal recorded under the condition's
///   waitpoint key is accepted by its matcher; false when the waitpoint has
///   no entry in `by_wp`.
/// * `Composite` — delegated to `evaluate_composite`.
/// * `OperatorOnly` / `TimeoutOnly` — always false here.
/// * Any other variant — false.
pub fn evaluate(condition: &ResumeCondition, by_wp: &SignalsByWaitpoint<'_>) -> bool {
    match condition {
        ResumeCondition::Composite(body) => evaluate_composite(body, by_wp),
        ResumeCondition::Single {
            waitpoint_key,
            matcher,
        } => match by_wp.get(waitpoint_key.as_str()) {
            Some(signals) => signals
                .iter()
                .any(|signal| matcher_matches(matcher, signal)),
            // No signals were recorded for this waitpoint at all.
            None => false,
        },
        // These variants are never satisfied by signal evaluation.
        ResumeCondition::OperatorOnly | ResumeCondition::TimeoutOnly => false,
        // Variants this evaluator does not recognise count as unsatisfied.
        _ => false,
    }
}
/// Evaluates a composite resume condition against the signals in `by_wp`.
///
/// * `AllOf` — true only when `members` is non-empty and every member
///   evaluates to true via `evaluate`.
/// * `Count` — gathers every signal under the listed `waitpoints` that the
///   optional `matcher` accepts (no matcher accepts everything), counts the
///   distinct values selected by `count_kind`, and compares that to `n`:
///   - `DistinctWaitpoints`: distinct waitpoint keys with at least one
///     accepted signal;
///   - `DistinctSignals`: distinct signal ids (compared by string form);
///   - `DistinctSources`: distinct `(source_type, source_identity)` pairs;
///   - any other kind: counted as zero.
///   Because the test is `count >= n`, a condition with `n == 0` is always
///   satisfied.
/// * Any other body — false.
fn evaluate_composite(body: &CompositeBody, by_wp: &SignalsByWaitpoint<'_>) -> bool {
    match body {
        CompositeBody::AllOf { members } => {
            // An empty member list is never satisfied.
            if members.is_empty() {
                return false;
            }
            members.iter().all(|member| evaluate(member, by_wp))
        }
        CompositeBody::Count {
            n,
            count_kind,
            matcher,
            waitpoints,
        } => {
            // With no matcher configured, every signal is accepted.
            let accepts = |signal: &ResumeSignal| match matcher.as_ref() {
                Some(m) => matcher_matches(m, signal),
                None => true,
            };

            // Accepted signals, each paired with the waitpoint key it was
            // found under.
            let mut matched: Vec<(&str, &ResumeSignal)> = Vec::new();
            for key in waitpoints {
                let key_str = key.as_str();
                if let Some(signals) = by_wp.get(key_str) {
                    matched.extend(
                        signals
                            .iter()
                            .filter(|signal| accepts(*signal))
                            .map(move |signal| (key_str, signal)),
                    );
                }
            }

            let tally = match count_kind {
                CountKind::DistinctWaitpoints => matched
                    .iter()
                    .map(|&(key, _)| key)
                    .collect::<HashSet<&str>>()
                    .len() as u32,
                CountKind::DistinctSignals => matched
                    .iter()
                    .map(|&(_, signal)| signal.signal_id.0.to_string())
                    .collect::<HashSet<String>>()
                    .len() as u32,
                CountKind::DistinctSources => matched
                    .iter()
                    .map(|&(_, signal)| {
                        (
                            signal.source_type.as_str(),
                            signal.source_identity.as_str(),
                        )
                    })
                    .collect::<HashSet<(&str, &str)>>()
                    .len() as u32,
                // Count kinds this evaluator does not recognise contribute nothing.
                _ => 0,
            };

            tally >= *n
        }
        // Composite bodies this evaluator does not recognise are unsatisfied.
        _ => false,
    }
}
/// Reports whether `signal` is accepted by `matcher`.
///
/// `Wildcard` accepts every signal; `ByName` accepts a signal whose
/// `signal_name` equals the given name exactly; any other matcher variant
/// accepts nothing.
fn matcher_matches(matcher: &SignalMatcher, signal: &ResumeSignal) -> bool {
    match matcher {
        SignalMatcher::Wildcard => true,
        SignalMatcher::ByName(expected) => expected.as_str() == signal.signal_name.as_str(),
        _ => false,
    }
}
#[cfg(test)]
mod evaluator_tests {
    use super::*;
    use ff_core::contracts::{CompositeBody, CountKind, ResumeCondition, SignalMatcher};
    use ff_core::types::{SignalId, TimestampMs};

    /// Builds a signal fixture with a fresh id and the given name and source.
    ///
    /// The first argument only labels, at the call site, which waitpoint the
    /// signal is meant for; it is not stored on the signal.
    fn sig(_wp_key: &str, name: &str, source_type: &str, source_identity: &str) -> ResumeSignal {
        ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: String::from(name),
            signal_category: String::from("external"),
            source_type: String::from(source_type),
            source_identity: String::from(source_identity),
            correlation_id: String::new(),
            accepted_at: TimestampMs(0),
            payload: None,
        }
    }

    /// Single-waitpoint condition: a signal named "ready" on "wpk:a".
    fn ready_on_wpk_a() -> ResumeCondition {
        ResumeCondition::Single {
            waitpoint_key: "wpk:a".into(),
            matcher: SignalMatcher::ByName("ready".into()),
        }
    }

    #[test]
    fn single_byname_matches() {
        let signals = [sig("wpk:a", "ready", "worker", "w1")];
        let by_wp: SignalsByWaitpoint<'_> = HashMap::from([("wpk:a", &signals[..])]);

        assert!(evaluate(&ready_on_wpk_a(), &by_wp));
    }

    #[test]
    fn single_byname_rejects_wrong_name() {
        let signals = [sig("wpk:a", "other", "worker", "w1")];
        let by_wp: SignalsByWaitpoint<'_> = HashMap::from([("wpk:a", &signals[..])]);

        assert!(!evaluate(&ready_on_wpk_a(), &by_wp));
    }

    #[test]
    fn count_distinct_sources_requires_two_distinct() {
        let needs_two_sources = ResumeCondition::Composite(CompositeBody::Count {
            n: 2,
            count_kind: CountKind::DistinctSources,
            matcher: None,
            waitpoints: vec!["wpk:a".into()],
        });

        // Two signals from the same source count as one distinct source.
        let same_source = [
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w1"),
        ];
        let by_wp_dup: SignalsByWaitpoint<'_> = HashMap::from([("wpk:a", &same_source[..])]);
        assert!(!evaluate(&needs_two_sources, &by_wp_dup));

        // Two different source identities reach the threshold.
        let two_sources = [
            sig("wpk:a", "x", "worker", "w1"),
            sig("wpk:a", "x", "worker", "w2"),
        ];
        let by_wp_ok: SignalsByWaitpoint<'_> = HashMap::from([("wpk:a", &two_sources[..])]);
        assert!(evaluate(&needs_two_sources, &by_wp_ok));
    }

    #[test]
    fn allof_two_waitpoints() {
        let on_a = [sig("wpk:a", "x", "src", "id")];
        let on_b = [sig("wpk:b", "x", "src", "id")];
        let both_required = ResumeCondition::all_of_waitpoints(["wpk:a", "wpk:b"]);

        // Both waitpoints have a signal, so the condition is satisfied.
        let both: SignalsByWaitpoint<'_> =
            HashMap::from([("wpk:a", &on_a[..]), ("wpk:b", &on_b[..])]);
        assert!(evaluate(&both_required, &both));

        // Only one waitpoint has a signal, so the condition is not satisfied.
        let only_a: SignalsByWaitpoint<'_> = HashMap::from([("wpk:a", &on_a[..])]);
        assert!(!evaluate(&both_required, &only_a));
    }
}