pub mod policy;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use anyhow::{Context, Result};
use async_nats::jetstream::kv::Operation;
use chrono::{DateTime, Duration as ChronoDuration, Local, Utc};
use futures::{StreamExt, TryStreamExt};
use kanade_shared::kv::{
BUCKET_AGENT_GROUPS, BUCKET_FLEET_CONFIG, BUCKET_SCHEDULER_DISPATCH, BUCKET_SCHEDULES,
KEY_FREEZE, dispatch_mark_pc_key, dispatch_mark_target_key,
};
use kanade_shared::manifest::{
ExecMode, FanoutPlan, Freeze, Manifest, RunsOn, Schedule, ScheduleTz, Target, When,
};
use sqlx::Row;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::exec::exec_manifest;
use policy::{Completion, FireAction, decide_fire, suppress_dispatched};
/// An agent counts as alive for target resolution when its `last_heartbeat`
/// is no older than this (see `resolve_expected_pcs`).
const ALIVE_THRESHOLD: ChronoDuration = ChronoDuration::minutes(2);
/// Extra slack added on top of jitter + timeout when sizing the dispatch
/// suppress window (see `suppress_window`).
const DISPATCH_DRAIN_MARGIN: ChronoDuration = ChronoDuration::seconds(90);
/// Lower clamp for the suppress window before any retry budget is added.
const DISPATCH_WINDOW_MIN: ChronoDuration = ChronoDuration::seconds(90);
/// Upper clamp for the suppress window before any retry budget is added;
/// also the fallback when the window arithmetic overflows.
const DISPATCH_WINDOW_MAX: ChronoDuration = ChronoDuration::minutes(30);
/// `max_age` configured on the scheduler dispatch-mark KV bucket.
const DISPATCH_MARK_TTL: StdDuration = StdDuration::from_secs(60 * 60);
/// Maximum number of per-pc dispatch-mark KV reads/writes kept in flight at once.
const DISPATCH_KV_CONCURRENCY: usize = 16;
/// Schedule id -> id of the cron job currently registered for it.
type Registered = Arc<Mutex<HashMap<String, Uuid>>>;
/// Shared in-memory copy of the fleet change-freeze record; `None` when no
/// freeze key is present.
type FreezeMirror = Arc<tokio::sync::RwLock<Option<Freeze>>>;
pub async fn run(state: AppState) -> Result<()> {
let kv = state
.jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.context("ensure schedules KV")?;
if let Err(e) = state
.jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_SCHEDULER_DISPATCH.into(),
history: 1,
max_age: DISPATCH_MARK_TTL,
..Default::default()
})
.await
{
warn!(error = %e, "ensure scheduler_dispatch KV failed (benign if the bucket already exists with a prior config; a genuine failure falls back to completion-only dedup)");
}
let sched = JobScheduler::new().await.context("init JobScheduler")?;
sched.start().await.context("start JobScheduler")?;
let registered: Registered = Arc::new(Mutex::new(HashMap::new()));
let initial_freeze = match load_freeze(&state.jetstream).await {
Ok(v) => v,
Err(e) => {
warn!(error = %e, "freeze boot-seed read failed; mirror starts empty, watch will seed on connect");
None
}
};
let freeze: FreezeMirror = Arc::new(tokio::sync::RwLock::new(initial_freeze));
spawn_freeze_watcher(state.jetstream.clone(), freeze.clone());
let keys: Vec<String> = match kv.keys().await {
Ok(stream) => stream.try_collect().await.unwrap_or_else(|e| {
warn!(error = %e, "collect schedules KV keys (initial load best-effort)");
Vec::new()
}),
Err(e) => {
warn!(error = %e, "list schedules KV keys (likely empty bucket; watch loop still arms)");
Vec::new()
}
};
for k in keys {
let entry = match kv.get(&k).await {
Ok(Some(b)) => b,
Ok(None) => continue,
Err(e) => {
warn!(error = %e, key = %k, "kv get");
continue;
}
};
match serde_json::from_slice::<Schedule>(&entry) {
Ok(s) if s.enabled => {
if let Err(e) = register(
&sched,
state.clone(),
®istered,
s.clone(),
freeze.clone(),
)
.await
{
warn!(error = %e, schedule_id = %s.id, "initial register failed");
}
}
Ok(s) => info!(schedule_id = %s.id, "skipped (disabled)"),
Err(e) => warn!(error = %e, key = %k, "deserialize Schedule"),
}
}
let initial_count = registered.lock().await.len();
info!(
count = initial_count,
"scheduler registered initial schedules"
);
let mut watcher = kv.watch_all().await.context("kv watch_all")?;
while let Some(entry) = watcher.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "watch entry error");
continue;
}
};
match entry.operation {
Operation::Put => {
let sched_data: Schedule = match serde_json::from_slice(&entry.value) {
Ok(s) => s,
Err(e) => {
warn!(error = %e, key = %entry.key, "deserialize Schedule on watch");
continue;
}
};
unregister(&sched, ®istered, &sched_data.id).await;
if sched_data.enabled
&& let Err(e) = register(
&sched,
state.clone(),
®istered,
sched_data.clone(),
freeze.clone(),
)
.await
{
warn!(error = %e, schedule_id = %sched_data.id, "watch register failed");
}
}
Operation::Delete | Operation::Purge => {
unregister(&sched, ®istered, &entry.key).await;
}
}
}
std::future::pending::<Result<()>>().await
}
async fn register(
sched: &JobScheduler,
state: AppState,
registered: &Registered,
schedule: Schedule,
freeze: FreezeMirror,
) -> Result<()> {
if matches!(schedule.runs_on, RunsOn::Agent) {
info!(
schedule_id = %schedule.id,
"skipped (runs_on: agent — agents tick this schedule themselves)",
);
return Ok(());
}
let lowered = schedule.lowered();
let cron = lowered.cron;
let schedule_snapshot = schedule.clone();
let cb = move |_uuid, _l| {
let state = state.clone();
let schedule = schedule_snapshot.clone();
let freeze = freeze.clone();
Box::pin(async move {
tick(&state, schedule, &freeze).await;
}) as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
};
let job = match lowered.tz {
ScheduleTz::Utc => Job::new_async_tz(cron.as_str(), Utc, cb),
ScheduleTz::Local => Job::new_async_tz(cron.as_str(), Local, cb),
}
.with_context(|| format!("Job::new_async_tz (cron={cron}, tz={:?})", lowered.tz))?;
let uuid = sched.add(job).await.context("scheduler.add")?;
registered.lock().await.insert(schedule.id.clone(), uuid);
info!(
schedule_id = %schedule.id,
when = %schedule.when,
poll_cron = %cron,
tz = ?lowered.tz,
"scheduled",
);
if let When::Calendar(c) = &schedule.when {
if let Some(fires_at) = c.oneshot_instant(schedule.tz) {
if fires_at < Utc::now() {
warn!(
schedule_id = %schedule.id,
%fires_at,
"calendar one-shot date is in the past — it will never fire",
);
}
}
}
if let Some(err) = schedule.bad_window() {
warn!(
schedule_id = %schedule.id,
%err,
"constraints.window is unparseable — schedule blocked (fail-closed) until fixed",
);
}
if schedule.calendar_outside_window() {
warn!(
schedule_id = %schedule.id,
when = %schedule.when,
"calendar fire time is outside constraints.window — it will never fire",
);
}
Ok(())
}
/// Handles one cron fire of `schedule`.
///
/// Gates, in order: fleet change-freeze, the schedule's `active` window, and
/// its `constraints` window. If all pass, the job manifest is fetched and the
/// optional `starting_deadline` is resolved. What happens next depends on the
/// lowered exec mode:
/// - `EveryTick` dispatches unconditionally.
/// - Otherwise `decide_fire` picks an action from the expected pcs, recent
///   successful completions and the cooldown; that action is then filtered
///   through `suppress_dispatched` using the stored dispatch marks.
///
/// Every failure is logged and ends the tick; nothing is returned.
async fn tick(state: &AppState, schedule: Schedule, freeze: &FreezeMirror) {
let schedule_id = schedule.id.clone();
let job_id = schedule.job_id.clone();
let lowered = schedule.lowered();
// Scoped block: the read guard is dropped before any further await.
let frozen_reason = {
let guard = freeze.read().await;
guard
.as_ref()
.filter(|f| f.is_active(Utc::now()))
.map(|f| f.reason.clone())
};
if let Some(reason) = frozen_reason {
tracing::info!(
%schedule_id,
reason = reason.as_deref().unwrap_or(""),
"scheduler tick: fleet change-freeze active — skip",
);
return;
}
if !schedule.active.contains(Utc::now(), schedule.tz) {
tracing::debug!(%schedule_id, "scheduler tick: outside active window (dormant)");
return;
}
if !schedule.constraints.allows(Utc::now(), schedule.tz) {
tracing::debug!(%schedule_id, "scheduler tick: outside maintenance window — skip");
return;
}
// The manifest is looked up on every tick rather than cached at register time.
let manifest = match crate::api::jobs::fetch(&state.jetstream, &job_id).await {
Ok(Some(m)) => m,
Ok(None) => {
warn!(
%schedule_id, %job_id,
"scheduler fire skipped: job not registered in catalog",
);
return;
}
Err(e) => {
warn!(%schedule_id, %job_id, error = %e, "scheduler fire failed: catalog lookup error");
return;
}
};
// `now` is the reference instant for the deadline, the fire decision and the
// dispatch marks written below.
let now = Utc::now();
let deadline_at = match parse_starting_deadline(schedule.starting_deadline.as_deref(), now) {
Ok(v) => v,
Err(e) => {
warn!(
%schedule_id, error = %e,
"scheduler fire failed: invalid starting_deadline",
);
return;
}
};
// Builds a fresh copy of the schedule's plan with the resolved deadline applied.
let plan_for_dispatch = || {
let mut p = schedule.plan.clone();
p.deadline_at = deadline_at;
p
};
let retry = schedule.on_failure.lowered_retry();
// EveryTick: no target resolution, completion lookup or dispatch marks.
if matches!(lowered.mode, ExecMode::EveryTick) {
dispatch(
state,
&schedule_id,
manifest,
plan_for_dispatch(),
retry,
"EveryTick",
)
.await;
return;
}
let expected = match resolve_expected_pcs(state, &schedule.plan.target).await {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = ?e, "scheduler fire failed: target resolve");
return;
}
};
let completions = match recent_completions(state, &job_id).await {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = ?e, "scheduler fire failed: completion lookup");
return;
}
};
let cooldown = match parse_cooldown(lowered.cooldown.as_deref()) {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = %e, "scheduler fire failed: invalid when.every");
return;
}
};
let action = decide_fire(lowered.mode, cooldown, &expected, &completions, now);
let window = suppress_window(&schedule, &manifest);
// Second pass over the decision using the dispatch marks and `window`.
// NOTE(review): presumably this drops targets whose mark is still younger
// than `window` — confirm in `policy::suppress_dispatched`.
let action = match action {
FireAction::Skip => FireAction::Skip,
FireAction::FireWholeTarget => {
let target_mark = read_target_dispatch_mark(state, &schedule_id).await;
suppress_dispatched(
FireAction::FireWholeTarget,
&HashMap::new(),
target_mark,
window,
now,
)
}
FireAction::FirePcs(pcs) => {
let marks = read_pc_dispatch_marks(state, &schedule_id, &pcs).await;
suppress_dispatched(FireAction::FirePcs(pcs), &marks, None, window, now)
}
};
match action {
FireAction::Skip => {
tracing::debug!(
%schedule_id, when = %schedule.when,
expected = expected.len(),
completions = completions.len(),
"scheduler tick: dedup/in-flight says skip",
);
}
FireAction::FireWholeTarget => {
// The mark is written only when dispatch reported success.
if dispatch(
state,
&schedule_id,
manifest,
plan_for_dispatch(),
retry,
"OncePerTarget armed",
)
.await
{
record_target_dispatch_mark(state, &schedule_id, now).await;
}
}
FireAction::FirePcs(pc_ids) => {
// With `max_concurrent` set, keep only as many pcs as there are free slots;
// if none are free the tick ends without dispatching.
let pc_ids = if let Some(max) = schedule.constraints.max_concurrent {
let in_flight = count_in_flight(state, &job_id).await;
let capped = cap_pcs_to_slots(pc_ids, in_flight, max);
if capped.is_empty() {
tracing::debug!(
%schedule_id, %max, in_flight,
"max_concurrent: all slots busy — deferring this tick",
);
return;
}
capped
} else {
pc_ids
};
// Narrow the plan to exactly the selected pcs and clear any rollout.
let mut plan = plan_for_dispatch();
plan.target = Target {
pcs: pc_ids.clone(),
..Target::default()
};
plan.rollout = None;
info!(
%schedule_id, pcs = pc_ids.len(),
"OncePerPc: firing at remaining pcs",
);
// Per-pc marks are written only when dispatch reported success.
if dispatch(
state,
&schedule_id,
manifest,
plan,
retry,
"OncePerPc subset",
)
.await
{
record_pc_dispatch_marks(state, &schedule_id, &pc_ids, now).await;
}
}
}
}
/// Submits `manifest` with `plan` through `exec_manifest` on behalf of the
/// scheduler and logs the outcome.
///
/// `why` is a short label included in the log line. Returns `true` when the
/// exec call succeeded and `false` otherwise; the error itself is only logged.
async fn dispatch(
    state: &AppState,
    schedule_id: &str,
    manifest: Manifest,
    plan: FanoutPlan,
    retry: Option<kanade_shared::wire::RetrySpec>,
    why: &str,
) -> bool {
    let outcome = exec_manifest(state, manifest, plan, "scheduler", None, retry).await;
    let (status, msg) = match outcome {
        Ok(resp) => {
            info!(
                %schedule_id, exec_id = %resp.exec_id, why,
                "scheduler exec ok",
            );
            return true;
        }
        Err(failure) => failure,
    };
    warn!(
        %schedule_id, status = %status, error = %msg, why,
        "scheduler exec failed",
    );
    false
}
/// Decodes a JSON freeze record, falling back to `Freeze::default()` (with a
/// warning) when the bytes do not parse.
///
/// NOTE(review): the log message says the fallback treats the fleet as
/// frozen, which relies on `Freeze::default()` being considered active —
/// confirm against `Freeze::is_active`.
fn parse_freeze_or_safe(bytes: &[u8]) -> Freeze {
    match serde_json::from_slice::<Freeze>(bytes) {
        Ok(freeze) => freeze,
        Err(e) => {
            warn!(error = %e, "fleet freeze blob is corrupt — failing safe (treating fleet as frozen)");
            Freeze::default()
        }
    }
}
/// Reads the current freeze record from the fleet-config KV bucket.
///
/// Returns `Ok(None)` when the freeze key is absent. A present but corrupt
/// value is mapped through [`parse_freeze_or_safe`] rather than reported as
/// an error.
///
/// # Errors
/// Fails when the bucket cannot be opened or the key cannot be read.
async fn load_freeze(js: &async_nats::jetstream::Context) -> Result<Option<Freeze>> {
    let store = js
        .get_key_value(BUCKET_FLEET_CONFIG)
        .await
        .context("open fleet_config KV")?;
    let raw = store.get(KEY_FREEZE).await.context("get freeze key")?;
    Ok(raw.map(|bytes| parse_freeze_or_safe(&bytes)))
}
fn spawn_freeze_watcher(js: async_nats::jetstream::Context, freeze: FreezeMirror) {
tokio::spawn(async move {
loop {
let kv = match js.get_key_value(BUCKET_FLEET_CONFIG).await {
Ok(kv) => kv,
Err(e) => {
warn!(error = %e, "freeze watcher: fleet_config KV unavailable; retrying");
tokio::time::sleep(StdDuration::from_secs(5)).await;
continue;
}
};
let mut watch = match kv.watch_all().await {
Ok(w) => w,
Err(e) => {
warn!(error = %e, "freeze watcher: watch_all failed; retrying");
tokio::time::sleep(StdDuration::from_secs(5)).await;
continue;
}
};
while let Some(entry) = watch.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "freeze watcher: watch entry error; reopening");
break;
}
};
if entry.key != KEY_FREEZE {
continue;
}
let next = match entry.operation {
Operation::Put => Some(parse_freeze_or_safe(&entry.value)),
Operation::Delete | Operation::Purge => None,
};
let frozen = next.is_some();
*freeze.write().await = next;
info!(frozen, "fleet change-freeze mirror updated");
}
}
});
}
/// Counts `execution_results` rows for `job_id` that have no `finished_at`.
///
/// A query failure is logged and reported as `0`. A count that does not fit
/// in `u32` (including a negative one) is reported as `u32::MAX`.
async fn count_in_flight(state: &AppState, job_id: &str) -> u32 {
    let query = sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM execution_results WHERE job_id = ? AND finished_at IS NULL",
    )
    .bind(job_id);
    let open_rows = match query.fetch_one(&state.pool).await {
        Ok(n) => n,
        Err(e) => {
            warn!(error = %e, %job_id, "max_concurrent: in-flight count query failed; treating as 0");
            return 0;
        }
    };
    u32::try_from(open_rows).unwrap_or(u32::MAX)
}
/// Keeps at most `max_concurrent - in_flight` leading entries of `pcs`.
///
/// The subtraction saturates, so when `in_flight` meets or exceeds
/// `max_concurrent` the result is empty. The order of the kept entries is
/// unchanged.
fn cap_pcs_to_slots(pcs: Vec<String>, in_flight: u32, max_concurrent: u32) -> Vec<String> {
    // u32 -> usize is a widening conversion on the platforms this runs on.
    let free_slots = max_concurrent.saturating_sub(in_flight) as usize;
    pcs.into_iter().take(free_slots).collect()
}
/// Computes how long a dispatch mark suppresses a repeat dispatch for this
/// schedule/manifest pair.
///
/// The base window is `jitter + timeout + DISPATCH_DRAIN_MARGIN`, clamped to
/// `[DISPATCH_WINDOW_MIN, DISPATCH_WINDOW_MAX]` (overflow falls back to the
/// maximum). When the schedule has a retry policy, a retry budget of
/// `(timeout + backoff) * max` is added on top of the clamped value, so the
/// result can exceed `DISPATCH_WINDOW_MAX`.
///
/// Unparseable inputs degrade rather than fail: a missing or bad jitter
/// counts as zero, a bad timeout counts as zero (with a warning), and a retry
/// budget that cannot be represented counts as zero.
fn suppress_window(schedule: &Schedule, manifest: &Manifest) -> ChronoDuration {
    let parse = |s: &str| {
        humantime::parse_duration(s)
            .ok()
            .and_then(|d| ChronoDuration::from_std(d).ok())
    };
    let jitter = schedule
        .plan
        .jitter
        .as_deref()
        .and_then(parse)
        .unwrap_or_else(ChronoDuration::zero);
    let timeout = parse(&manifest.execute.timeout).unwrap_or_else(|| {
        warn!(
            job_id = %manifest.id,
            raw = %manifest.execute.timeout,
            "suppress_window: unparseable timeout; treating as zero",
        );
        ChronoDuration::zero()
    });
    let retry_budget = schedule
        .on_failure
        .lowered_retry()
        .and_then(|r| {
            // Checked conversions: an `as` cast could wrap a large retry count
            // to a negative multiplier (shrinking the window), and
            // `ChronoDuration::seconds` panics on out-of-range input.
            let backoff_secs = i64::try_from(r.backoff_secs).ok()?;
            let backoff = ChronoDuration::try_seconds(backoff_secs)?;
            let attempts = i32::try_from(r.max).ok()?;
            timeout.checked_add(&backoff)?.checked_mul(attempts)
        })
        .unwrap_or_else(ChronoDuration::zero);
    let plain = jitter
        .checked_add(&timeout)
        .and_then(|d| d.checked_add(&DISPATCH_DRAIN_MARGIN))
        .map(|d| d.clamp(DISPATCH_WINDOW_MIN, DISPATCH_WINDOW_MAX))
        .unwrap_or(DISPATCH_WINDOW_MAX);
    // The retry budget is added after clamping, so it may lift the window
    // above the default ceiling.
    plain.checked_add(&retry_budget).unwrap_or(plain)
}
/// Parses a stored dispatch mark: UTF-8 text holding an RFC 3339 timestamp
/// (surrounding whitespace is ignored), converted to UTC.
///
/// Returns `None` when the bytes are not valid UTF-8 or not a valid timestamp.
fn parse_dispatch_mark(bytes: &[u8]) -> Option<DateTime<Utc>> {
    let text = std::str::from_utf8(bytes).ok()?;
    let stamp = DateTime::parse_from_rfc3339(text.trim()).ok()?;
    Some(stamp.with_timezone(&Utc))
}
/// Loads the per-pc dispatch marks of `schedule_id` for the given `pcs`.
///
/// Lookups run concurrently, at most `DISPATCH_KV_CONCURRENCY` at a time.
/// The result only contains pcs whose mark exists and parses; read errors,
/// missing keys and malformed values are all treated as "no mark". If the
/// dispatch bucket cannot be opened, the map is empty.
async fn read_pc_dispatch_marks(
    state: &AppState,
    schedule_id: &str,
    pcs: &[String],
) -> HashMap<String, DateTime<Utc>> {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => return HashMap::new(),
    };
    futures::stream::iter(pcs.iter().cloned())
        .map(|pc| {
            let store = kv.clone();
            let key = dispatch_mark_pc_key(schedule_id, &pc);
            async move {
                // Any failure along the way collapses to `None`.
                let stamp = store
                    .get(&key)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|raw| parse_dispatch_mark(&raw));
                stamp.map(|at| (pc, at))
            }
        })
        .buffer_unordered(DISPATCH_KV_CONCURRENCY)
        .filter_map(futures::future::ready)
        .collect()
        .await
}
/// Loads the whole-target dispatch mark of `schedule_id`.
///
/// Returns `None` when the bucket cannot be opened, the key is missing or
/// unreadable, or the stored value does not parse.
async fn read_target_dispatch_mark(state: &AppState, schedule_id: &str) -> Option<DateTime<Utc>> {
    let Ok(kv) = state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    else {
        return None;
    };
    match kv.get(&dispatch_mark_target_key(schedule_id)).await {
        Ok(Some(raw)) => parse_dispatch_mark(&raw),
        Ok(None) | Err(_) => None,
    }
}
/// Writes a dispatch mark with timestamp `at` (RFC 3339) for every pc in
/// `pcs` under `schedule_id`.
///
/// Writes run concurrently, at most `DISPATCH_KV_CONCURRENCY` at a time.
/// Failures are logged per pc and never propagated. If the dispatch bucket
/// cannot be opened, nothing is written.
async fn record_pc_dispatch_marks(
    state: &AppState,
    schedule_id: &str,
    pcs: &[String],
    at: DateTime<Utc>,
) {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => {
            warn!(%schedule_id, "record dispatch marks: scheduler_dispatch KV unavailable");
            return;
        }
    };
    let stamp = at.to_rfc3339();
    futures::stream::iter(pcs.iter().cloned())
        .for_each_concurrent(DISPATCH_KV_CONCURRENCY, |pc| {
            let store = kv.clone();
            let payload = stamp.clone().into_bytes();
            let key = dispatch_mark_pc_key(schedule_id, &pc);
            async move {
                if let Err(e) = store.put(&key, payload.into()).await {
                    warn!(%schedule_id, pc, error = %e, "record dispatch mark failed");
                }
            }
        })
        .await;
}
/// Writes the whole-target dispatch mark for `schedule_id` with timestamp
/// `at` (RFC 3339). Failures are logged and never propagated.
async fn record_target_dispatch_mark(state: &AppState, schedule_id: &str, at: DateTime<Utc>) {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => {
            warn!(%schedule_id, "record target dispatch mark: scheduler_dispatch KV unavailable");
            return;
        }
    };
    let key = dispatch_mark_target_key(schedule_id);
    let payload = at.to_rfc3339().into_bytes();
    if let Err(e) = kv.put(&key, payload.into()).await {
        warn!(%schedule_id, error = %e, "record target dispatch mark failed");
    }
}
/// Parses an optional humantime duration string into a cooldown.
///
/// `None` passes through as `Ok(None)`.
///
/// # Errors
/// Fails when the string is not a valid humantime duration or the duration is
/// too large for `ChronoDuration`.
fn parse_cooldown(s: Option<&str>) -> Result<Option<ChronoDuration>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let parsed: StdDuration =
        humantime::parse_duration(raw).with_context(|| format!("parse cooldown '{raw}'"))?;
    let cooldown = ChronoDuration::from_std(parsed).context("cooldown overflow")?;
    Ok(Some(cooldown))
}
/// Resolves an optional humantime `starting_deadline` into an absolute
/// instant relative to `now`.
///
/// `None` passes through as `Ok(None)`.
///
/// # Errors
/// Fails when the string is not a valid humantime duration, when the duration
/// is too large for `ChronoDuration`, or when `now + duration` falls outside
/// the range `DateTime` can represent.
fn parse_starting_deadline(
    s: Option<&str>,
    now: chrono::DateTime<Utc>,
) -> Result<Option<chrono::DateTime<Utc>>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let std: StdDuration = humantime::parse_duration(raw)
        .with_context(|| format!("parse starting_deadline '{raw}'"))?;
    let d = ChronoDuration::from_std(std).context("starting_deadline overflow")?;
    // `now + d` panics when the sum is out of range; a large but parseable
    // value must surface as an error instead.
    let deadline = now
        .checked_add_signed(d)
        .context("starting_deadline overflow")?;
    Ok(Some(deadline))
}
/// Fetches, per pc, the latest `finished_at` among successful
/// (`exit_code = 0`) results of `job_id`.
///
/// Rows whose `finished_at` cannot be decoded, or whose `pc_id` is empty or
/// undecodable, are silently dropped.
///
/// # Errors
/// Fails when the query itself fails.
async fn recent_completions(state: &AppState, job_id: &str) -> Result<Vec<Completion>> {
    let rows = sqlx::query(
        "SELECT pc_id, MAX(finished_at) AS finished_at\n\
         FROM execution_results\n\
         WHERE job_id = ? AND exit_code = 0\n\
         GROUP BY pc_id",
    )
    .bind(job_id)
    .fetch_all(&state.pool)
    .await
    .context("execution_results dedup query")?;

    let completions: Vec<Completion> = rows
        .into_iter()
        .filter_map(|row| {
            // An undecodable pc_id becomes "" and is rejected below.
            let pc_id: String = row.try_get("pc_id").unwrap_or_default();
            let finished_at: chrono::DateTime<Utc> = row.try_get("finished_at").ok()?;
            (!pc_id.is_empty()).then_some(Completion { pc_id, finished_at })
        })
        .collect();
    Ok(completions)
}
async fn resolve_expected_pcs(state: &AppState, target: &Target) -> Result<Vec<String>> {
let mut out: HashSet<String> = HashSet::new();
if target.all {
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let rows = sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ? ORDER BY pc_id")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("agents alive query")?;
for r in rows {
if let Ok(pc) = r.try_get::<String, _>("pc_id") {
out.insert(pc);
}
}
}
if !target.groups.is_empty() {
let want: HashSet<&str> = target.groups.iter().map(String::as_str).collect();
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let alive: HashSet<String> =
sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ?")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("alive list for group resolve")?
.into_iter()
.filter_map(|r| r.try_get::<String, _>("pc_id").ok())
.collect();
if let Ok(kv) = state.jetstream.get_key_value(BUCKET_AGENT_GROUPS).await {
if let Ok(keys) = kv.keys().await {
let keys: Vec<String> = keys.try_collect().await.unwrap_or_default();
for k in keys {
if !alive.contains(&k) {
continue;
}
let Ok(Some(bytes)) = kv.get(&k).await else {
continue;
};
let Ok(groups) = serde_json::from_slice::<Vec<String>>(&bytes) else {
continue;
};
if groups.iter().any(|g| want.contains(g.as_str())) {
out.insert(k);
}
}
}
}
}
for pc in &target.pcs {
out.insert(pc.clone());
}
let mut v: Vec<String> = out.into_iter().collect();
v.sort();
Ok(v)
}
async fn unregister(sched: &JobScheduler, registered: &Registered, schedule_id: &str) {
let removed = registered.lock().await.remove(schedule_id);
if let Some(uuid) = removed {
if let Err(e) = sched.remove(&uuid).await {
warn!(error = %e, schedule_id, "scheduler.remove failed");
} else {
info!(schedule_id, "scheduler unregistered");
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal manifest whose only varying field is `execute.timeout`.
    fn manifest_with_timeout(timeout: &str) -> Manifest {
        let yaml = format!(
            "id: j\nversion: 1.0.0\nexecute:\n  shell: powershell\n  script: \"echo hi\"\n  timeout: {timeout}\n"
        );
        serde_yaml::from_str(&yaml).expect("manifest parse")
    }

    /// Minimal per-pc schedule; `extra` is appended verbatim as further YAML.
    fn schedule_with(extra: &str) -> Schedule {
        let yaml = format!(
            "id: s\nwhen:\n  per_pc: {{ every: 6h }}\njob_id: j\ntarget: {{ all: true }}\n{extra}"
        );
        serde_yaml::from_str(&yaml).expect("schedule parse")
    }

    /// `n` pc ids named `pc-0` .. `pc-{n-1}`.
    fn pcs(n: usize) -> Vec<String> {
        (0..n).map(|i| format!("pc-{i}")).collect()
    }

    // --- suppress_window ---------------------------------------------------

    #[test]
    fn suppress_window_includes_retry_budget() {
        let manifest = manifest_with_timeout("30s");
        let retrying = schedule_with("on_failure:\n  retry: { max: 3, backoff: 1m }\n");
        let plain = suppress_window(&schedule_with(""), &manifest);
        let with_retry = suppress_window(&retrying, &manifest);
        // Budget: (30s timeout + 60s backoff) * 3 attempts = 270s.
        let floor = plain + ChronoDuration::seconds(270);
        assert!(
            with_retry >= floor,
            "retry window {with_retry:?} must exceed plain {plain:?} by the budget",
        );
    }

    #[test]
    fn suppress_window_lifts_ceiling_for_long_retry() {
        let manifest = manifest_with_timeout("5m");
        let retrying = schedule_with("on_failure:\n  retry: { max: 10, backoff: 10m }\n");
        let w = suppress_window(&retrying, &manifest);
        assert!(
            w > DISPATCH_WINDOW_MAX,
            "window {w:?} must exceed the default cap"
        );
    }

    #[test]
    fn suppress_window_without_retry_keeps_default_clamp() {
        let manifest = manifest_with_timeout("1h");
        let no_retry = schedule_with("");
        assert_eq!(suppress_window(&no_retry, &manifest), DISPATCH_WINDOW_MAX);
    }

    // --- cap_pcs_to_slots --------------------------------------------------

    #[test]
    fn cap_truncates_to_free_slots() {
        let kept = cap_pcs_to_slots(pcs(10), 2, 5);
        assert_eq!(kept.len(), 3);
        assert_eq!(kept, vec!["pc-0", "pc-1", "pc-2"]);
    }

    #[test]
    fn cap_at_capacity_defers_all() {
        for in_flight in [5, 9] {
            assert!(cap_pcs_to_slots(pcs(4), in_flight, 5).is_empty());
        }
    }

    #[test]
    fn cap_under_slots_keeps_all() {
        let kept = cap_pcs_to_slots(pcs(2), 0, 5);
        assert_eq!(kept.len(), 2);
    }
}