use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Schema identifier that [`SchedulerEvidenceArtifact::validate`] requires in
/// `schema_version`; [`SchedulerEvidenceArtifact::tune_report`] also stamps it
/// onto every [`SchedulerTuneReport`] it produces.
pub const SCHEDULER_EVIDENCE_SCHEMA_VERSION: &str = "asupersync.scheduler-evidence.v1";
/// One captured scheduler run: the workload class, the topology it ran on,
/// the knobs in effect, and the metrics observed.
///
/// Call [`SchedulerEvidenceArtifact::validate`] before trusting the contents;
/// [`SchedulerEvidenceArtifact::tune_report`] validates on its own.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerEvidenceArtifact {
/// Must equal [`SCHEDULER_EVIDENCE_SCHEMA_VERSION`].
pub schema_version: String,
/// Run identifier; must not be blank. Copied into the tune report.
pub run_label: String,
/// Workload class of the run; copied into the tune report.
pub workload_class: SchedulerWorkloadClass,
/// Topology the run executed on.
pub topology: SchedulerTopologyDescriptor,
/// Knobs in effect during the run; the tune report starts from (and falls
/// back to) this profile.
pub current_knobs: SchedulerKnobProfile,
/// Latency, backlog, and cancellation percentiles observed during the run.
pub metrics: SchedulerEvidenceMetrics,
/// Free-form annotations; not read by `validate` or `tune_report`.
pub notes: Vec<String>,
}
impl SchedulerEvidenceArtifact {
    /// Checks the artifact's structural invariants.
    ///
    /// # Errors
    ///
    /// Returns the first violation found. Checks run in this order:
    /// unsupported schema version, blank run label, zero topology worker
    /// threads, zero cohort count, zero memory budget, zero current workers,
    /// zero steal batch size, zero cancel streak limit. Finally, any error
    /// reported by the metrics' own validation is returned.
    pub fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        if self.schema_version != SCHEDULER_EVIDENCE_SCHEMA_VERSION {
            return Err(SchedulerEvidenceError::UnsupportedSchemaVersion {
                expected: SCHEDULER_EVIDENCE_SCHEMA_VERSION.to_string(),
                found: self.schema_version.clone(),
            });
        }
        if self.run_label.trim().is_empty() {
            return Err(SchedulerEvidenceError::EmptyRunLabel);
        }
        if self.topology.worker_threads == 0 {
            return Err(SchedulerEvidenceError::ZeroWorkerThreads);
        }
        if self.topology.cohort_count == 0 {
            return Err(SchedulerEvidenceError::ZeroCohortCount);
        }
        if self.topology.memory_budget_gib == 0 {
            return Err(SchedulerEvidenceError::ZeroMemoryBudget);
        }
        if self.current_knobs.worker_threads == 0 {
            return Err(SchedulerEvidenceError::ZeroCurrentWorkers);
        }
        if self.current_knobs.steal_batch_size == 0 {
            return Err(SchedulerEvidenceError::ZeroStealBatchSize);
        }
        if self.current_knobs.cancel_streak_limit == 0 {
            return Err(SchedulerEvidenceError::ZeroCancelStreakLimit);
        }
        self.metrics.validate()?;
        Ok(())
    }

    /// Derives a knob recommendation from the captured metrics.
    ///
    /// Each heuristic that fires appends one [`SchedulerRecommendationReason`]
    /// and one human-readable explanation line:
    ///
    /// * `WorkersSaturated`: wake-to-run p99 >= 150µs and ready-backlog p99
    ///   >= 4 × topology workers. Adds one worker per cohort.
    /// * `QueueResidencyDominant`: non-zero queue-residency p99 that is at
    ///   least 2 × wake-to-run p99. Sets the steal batch to
    ///   `min(2 × current, 64)` and emits a global queue limit hint.
    /// * `CancelDebtDominant`: cancel-debt p99 >= max(p95, current streak
    ///   limit). Sets the streak limit to `min(2 × current, 128)`.
    /// * `RemoteStealPressure`: more than one cohort and a remote steal ratio
    ///   >= 35%. This is informational only; no knob changes.
    ///
    /// When nothing fires, the report carries `BalancedBaseline`. Confidence is
    /// 55% plus 10% per reason code, capped at 90%.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::validate`].
    pub fn tune_report(&self) -> Result<SchedulerTuneReport, SchedulerEvidenceError> {
        self.validate()?;
        let metrics = &self.metrics;
        let mut recommended_knobs = self.current_knobs.clone();
        let mut reason_codes = Vec::new();
        let mut explanation = Vec::new();
        let mut global_queue_limit_hint = None;
        // Ready-backlog depth treated as more work than the workers can absorb.
        let backlog_scale_threshold = self.topology.worker_threads.saturating_mul(4);

        if metrics.wake_to_run_p99_ns >= 150_000
            && metrics.ready_backlog_p99 >= backlog_scale_threshold
        {
            reason_codes.push(SchedulerRecommendationReason::WorkersSaturated);
            recommended_knobs.worker_threads = recommended_knobs
                .worker_threads
                .saturating_add(self.topology.cohort_count.max(1));
            explanation.push(format!(
                "wake_to_run p99={}ns with ready_backlog_p99={} exceeded the worker saturation envelope",
                metrics.wake_to_run_p99_ns, metrics.ready_backlog_p99
            ));
        }

        // Require some residency before calling it dominant. With an all-zero
        // latency profile, `0 >= 2 * 0` holds, which would otherwise classify an
        // idle run as needing deeper burst draining.
        if metrics.queue_residency_p99_ns > 0
            && metrics.queue_residency_p99_ns >= metrics.wake_to_run_p99_ns.saturating_mul(2)
        {
            reason_codes.push(SchedulerRecommendationReason::QueueResidencyDominant);
            // NOTE(review): a current batch above 64 is lowered to 64 here;
            // confirm the cap is intended as a hard ceiling.
            recommended_knobs.steal_batch_size =
                recommended_knobs.steal_batch_size.saturating_mul(2).min(64);
            global_queue_limit_hint = Some(
                metrics
                    .ready_backlog_p99
                    .saturating_mul(2)
                    .max(backlog_scale_threshold),
            );
            explanation.push(format!(
                "queue_residency p99={}ns dominated wake_to_run p99={}ns, suggesting deeper burst draining",
                metrics.queue_residency_p99_ns, metrics.wake_to_run_p99_ns
            ));
        }

        if metrics.cancel_debt_p99
            >= metrics
                .cancel_debt_p95
                .max(self.current_knobs.cancel_streak_limit)
        {
            reason_codes.push(SchedulerRecommendationReason::CancelDebtDominant);
            // NOTE(review): a current limit above 128 is lowered to 128 here;
            // confirm the cap is intended as a hard ceiling.
            recommended_knobs.cancel_streak_limit = recommended_knobs
                .cancel_streak_limit
                .saturating_mul(2)
                .min(128);
            explanation.push(format!(
                "cancel_debt p99={} remained above the current drain envelope",
                metrics.cancel_debt_p99
            ));
        }

        // Remote steal pressure only matters across cohorts. It is reported,
        // but it does not change any knob.
        let remote_steal_pressure = metrics
            .remote_steal_ratio_pct
            .filter(|&ratio| self.topology.cohort_count > 1 && ratio >= 35);
        if let Some(remote_steal_ratio_pct) = remote_steal_pressure {
            reason_codes.push(SchedulerRecommendationReason::RemoteStealPressure);
            explanation.push(format!(
                "remote steal ratio {}% indicates locality-aware follow-up work should stay enabled",
                remote_steal_ratio_pct
            ));
        }

        if reason_codes.is_empty() {
            reason_codes.push(SchedulerRecommendationReason::BalancedBaseline);
            explanation.push(
                "tail and backlog metrics stayed inside the conservative baseline envelope"
                    .to_string(),
            );
        }

        // The profile name follows the highest-priority knob-changing reason.
        let profile_name =
            if reason_codes.contains(&SchedulerRecommendationReason::WorkersSaturated) {
                "scale_workers"
            } else if reason_codes.contains(&SchedulerRecommendationReason::QueueResidencyDominant)
            {
                "drain_ready_bursts"
            } else if reason_codes.contains(&SchedulerRecommendationReason::CancelDebtDominant) {
                "drain_cancel_pressure"
            } else {
                "conservative_baseline"
            };
        let confidence_percent = 55u8
            .saturating_add(
                (u8::try_from(reason_codes.len()).unwrap_or(u8::MAX)).saturating_mul(10),
            )
            .min(90);
        Ok(SchedulerTuneReport {
            schema_version: SCHEDULER_EVIDENCE_SCHEMA_VERSION.to_string(),
            source_run_label: self.run_label.clone(),
            workload_class: self.workload_class,
            profile_name: profile_name.to_string(),
            recommended_knobs,
            global_queue_limit_hint,
            fallback_profile: self.current_knobs.clone(),
            confidence_percent,
            reason_codes,
            explanation,
        })
    }
}
/// Workload family of a scheduler evidence run or a coordination evidence
/// input. Serialized in snake_case (e.g. `interactive_swarm`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerWorkloadClass {
InteractiveSwarm,
MixedBurst,
CancellationStorm,
ThroughputDrain,
}
/// Topology a scheduler evidence run executed on.
///
/// `SchedulerEvidenceArtifact::validate` rejects zero in every field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerTopologyDescriptor {
/// Worker threads available; `tune_report` uses 4x this as its backlog threshold.
pub worker_threads: usize,
/// Number of worker cohorts; `tune_report` adds this many workers on saturation
/// and only reports remote steal pressure when it exceeds 1.
pub cohort_count: usize,
/// Memory budget, in GiB per the field name.
pub memory_budget_gib: usize,
}
/// A set of scheduler tuning knobs.
///
/// Used for the knobs in effect during a run and for the recommended and
/// fallback profiles of a [`SchedulerTuneReport`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerKnobProfile {
/// Worker thread count; must be non-zero in an artifact's current knobs.
pub worker_threads: usize,
/// Steal batch size; must be non-zero. `tune_report` may set it to
/// `min(2 * current, 64)`.
pub steal_batch_size: usize,
/// Cancel streak limit; must be non-zero. `tune_report` may set it to
/// `min(2 * current, 128)`.
pub cancel_streak_limit: usize,
/// Not modified by `tune_report`; a suggested value is reported separately
/// as `global_queue_limit_hint`.
pub global_queue_limit: usize,
/// Not read by `validate` or `tune_report`.
pub parking_enabled: bool,
}
/// Percentile metrics captured for one scheduler run.
///
/// `_ns` fields are latencies in nanoseconds (per the suffix). Each percentile
/// family is checked through the module's `validate_percentiles` helper, which
/// presumably requires the values to be non-decreasing (helper not shown here).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerEvidenceMetrics {
pub wake_to_run_p50_ns: u64,
pub wake_to_run_p95_ns: u64,
/// `tune_report` treats >= 150_000 (with a deep ready backlog) as saturation.
pub wake_to_run_p99_ns: u64,
pub queue_residency_p50_ns: u64,
pub queue_residency_p95_ns: u64,
/// `tune_report` compares this against twice the wake-to-run p99.
pub queue_residency_p99_ns: u64,
pub ready_backlog_p95: usize,
/// `tune_report` compares this against 4x the topology's worker threads.
pub ready_backlog_p99: usize,
pub cancel_debt_p95: usize,
/// `tune_report` compares this against max(p95, current cancel streak limit).
pub cancel_debt_p99: usize,
/// Optional remote steal ratio in percent; validation rejects values above 100.
pub remote_steal_ratio_pct: Option<u8>,
/// Optional; not checked by validation nor read by `tune_report`.
pub cross_cohort_wake_p99_ns: Option<u64>,
}
impl SchedulerEvidenceMetrics {
    /// Validates every percentile family, then the optional remote-steal ratio.
    ///
    /// `validate_percentiles` (defined elsewhere in this module) receives each
    /// family as three ascending points. Backlog and cancel debt record no p50,
    /// so their p95 fills the lowest slot and their p99 is repeated.
    ///
    /// # Errors
    ///
    /// The first percentile error encountered, or
    /// `RemoteStealRatioOutOfRange` when the ratio exceeds 100.
    fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        let wake = (
            self.wake_to_run_p50_ns,
            self.wake_to_run_p95_ns,
            self.wake_to_run_p99_ns,
        );
        validate_percentiles(wake.0, wake.1, wake.2, "wake_to_run")?;

        let residency = (
            self.queue_residency_p50_ns,
            self.queue_residency_p95_ns,
            self.queue_residency_p99_ns,
        );
        validate_percentiles(residency.0, residency.1, residency.2, "queue_residency")?;

        let (backlog_p95, backlog_p99) = (self.ready_backlog_p95, self.ready_backlog_p99);
        validate_percentiles(backlog_p95, backlog_p99, backlog_p99, "ready_backlog")?;

        let (debt_p95, debt_p99) = (self.cancel_debt_p95, self.cancel_debt_p99);
        validate_percentiles(debt_p95, debt_p99, debt_p99, "cancel_debt")?;

        match self.remote_steal_ratio_pct {
            Some(ratio) if ratio > 100 => {
                Err(SchedulerEvidenceError::RemoteStealRatioOutOfRange(ratio))
            }
            _ => Ok(()),
        }
    }
}
/// Knob recommendation produced by `SchedulerEvidenceArtifact::tune_report`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerTuneReport {
/// Set to `SCHEDULER_EVIDENCE_SCHEMA_VERSION` by `tune_report`.
pub schema_version: String,
/// `run_label` of the source artifact.
pub source_run_label: String,
/// Workload class copied from the source artifact.
pub workload_class: SchedulerWorkloadClass,
/// One of `scale_workers`, `drain_ready_bursts`, `drain_cancel_pressure`,
/// or `conservative_baseline`.
pub profile_name: String,
/// The run's knobs with the heuristic adjustments applied.
pub recommended_knobs: SchedulerKnobProfile,
/// Suggested global queue limit; set only for queue-residency dominance.
pub global_queue_limit_hint: Option<usize>,
/// The run's original knobs, unmodified.
pub fallback_profile: SchedulerKnobProfile,
/// 55 plus 10 per reason code, capped at 90.
pub confidence_percent: u8,
/// Heuristics that fired, in evaluation order; `balanced_baseline` if none did.
pub reason_codes: Vec<SchedulerRecommendationReason>,
/// One human-readable line per reason code.
pub explanation: Vec<String>,
}
/// Heuristic outcome recorded in a [`SchedulerTuneReport`]; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerRecommendationReason {
/// Wake-to-run p99 and ready backlog both exceeded their thresholds; workers added.
WorkersSaturated,
/// Queue residency p99 was at least twice the wake-to-run p99; steal batch raised.
QueueResidencyDominant,
/// Cancel debt p99 reached the drain envelope; cancel streak limit raised.
CancelDebtDominant,
/// High remote steal ratio across cohorts; informational, no knob change.
RemoteStealPressure,
/// No other heuristic fired.
BalancedBaseline,
}
/// Schema identifier required by [`SchedulerCoordinationEvidenceInputs::validate`].
pub const SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION: &str =
"asupersync.scheduler-coordination-evidence-inputs.v1";
/// A set of coordination evidence inputs, tagged with the source pack, bundle
/// hash, and run they came from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerCoordinationEvidenceInputs {
/// Must equal [`SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION`].
pub schema_version: String,
/// Must not be blank.
pub source_pack_id: String,
/// Checked by the module's `validate_hash` helper.
pub source_bundle_hash: String,
/// Must not be blank.
pub source_run_id: String,
/// Must be non-empty; each entry is validated individually.
pub evidence_inputs: Vec<SchedulerCoordinationEvidenceInput>,
}
impl SchedulerCoordinationEvidenceInputs {
pub fn validate(&self) -> Result<(), SchedulerEvidenceError> {
if self.schema_version != SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION {
return Err(SchedulerEvidenceError::UnsupportedSchemaVersion {
expected: SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION.to_string(),
found: self.schema_version.clone(),
});
}
validate_hash(&self.source_bundle_hash)?;
if self.source_pack_id.trim().is_empty() || self.source_run_id.trim().is_empty() {
return Err(SchedulerEvidenceError::EmptyEvidenceInputId);
}
if self.evidence_inputs.is_empty() {
return Err(SchedulerEvidenceError::EmptyEvidenceInputSet);
}
for input in &self.evidence_inputs {
input.validate()?;
}
Ok(())
}
}
/// A single coordination evidence input describing one workload's pressure.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchedulerCoordinationEvidenceInput {
/// Must not be blank.
pub evidence_input_id: String,
/// Must not be blank.
pub workload_id: String,
pub workload_class: SchedulerWorkloadClass,
pub scenario_family: CoordinationPressureFamily,
/// Must be non-empty with no blank entries.
pub semantic_pressure: Vec<String>,
/// Must be non-empty with no blank entries.
pub provenance_only_context: Vec<String>,
/// Must be non-zero.
pub source_event_count: usize,
/// Must be non-empty; each entry is checked by the module's `validate_hash` helper.
pub source_hashes: Vec<String>,
/// Checked by the module's `validate_hash` helper.
pub source_bundle_hash: String,
}
impl SchedulerCoordinationEvidenceInput {
    /// Validates identifiers, pressure/provenance text, the event count, and
    /// all hashes.
    ///
    /// # Errors
    ///
    /// Returns the first problem found. Checks run in this order: blank input
    /// id, blank workload id, missing or blank semantic pressure, missing or
    /// blank provenance context, zero event count, invalid bundle hash, no
    /// source hashes, then the first invalid source hash.
    pub fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        // An item list is unusable if it is empty or has any whitespace-only entry.
        fn missing_or_blank(items: &[String]) -> bool {
            items.is_empty() || items.iter().any(|item| item.trim().is_empty())
        }

        if self.evidence_input_id.trim().is_empty() {
            return Err(SchedulerEvidenceError::EmptyEvidenceInputId);
        }
        if self.workload_id.trim().is_empty() {
            return Err(SchedulerEvidenceError::EmptyCoordinationWorkloadId);
        }
        if missing_or_blank(&self.semantic_pressure) {
            return Err(SchedulerEvidenceError::EmptySemanticPressure);
        }
        if missing_or_blank(&self.provenance_only_context) {
            return Err(SchedulerEvidenceError::EmptyProvenanceContext);
        }
        if self.source_event_count == 0 {
            return Err(SchedulerEvidenceError::ZeroSourceEventCount);
        }

        validate_hash(&self.source_bundle_hash)?;
        if self.source_hashes.is_empty() {
            return Err(SchedulerEvidenceError::EmptySourceHash);
        }
        for source_hash in &self.source_hashes {
            validate_hash(source_hash)?;
        }
        Ok(())
    }
}
/// Scenario family a coordination evidence input belongs to; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CoordinationPressureFamily {
TrackerLockContention,
ConcurrentRchProofs,
FailClosedDirtyFrontier,
ArtifactRetrievalTail,
ProofRunnerFanout,
StaleInProgressReclaim,
CoordinationLatencyBurst,
}
/// Schema identifier required by [`SwarmCapacitySnapshot::validate`].
pub const SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION: &str = "asupersync.swarm-capacity-snapshot.v1";
/// Schema identifier stamped on [`SwarmAdmissionReport`]s.
pub const SWARM_ADMISSION_POLICY_REPORT_SCHEMA_VERSION: &str =
"asupersync.swarm-admission-policy-report.v1";
/// Schema identifier stamped on [`SwarmMemoryBudgetPlan`]s.
pub const SWARM_MEMORY_BUDGET_PLAN_SCHEMA_VERSION: &str = "asupersync.swarm-memory-budget-plan.v1";
/// Schema identifier stamped on [`SwarmMemoryResidencyPlan`]s.
pub const SWARM_MEMORY_RESIDENCY_POLICY_SCHEMA_VERSION: &str =
"asupersync.swarm-memory-residency-policy.v1";
/// Snapshot of host capacity signals (CPU, memory, disk, RCH, coordination
/// backlog) from which admission, memory-budget, and residency decisions are
/// derived.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmCapacitySnapshot {
/// Must equal [`SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION`].
pub schema_version: String,
/// Must not be blank; copied into every derived report.
pub snapshot_id: String,
/// Validated: counts must be non-zero where present.
pub cpu: SwarmCpuTopologyHints,
/// Validated; drives the memory budget plan.
pub memory: SwarmMemoryCapacity,
/// Validated; its pressure level gates local artifact admission.
pub disk: SwarmDiskCapacity,
/// Not validated; its admissibility decides remote-proof admission.
pub rch: SwarmRchCapacity,
/// Not validated; feeds lane reason codes and the recommended lane.
pub coordination: SwarmCoordinationBacklogSignals,
}
impl SwarmCapacitySnapshot {
    /// Checks the snapshot's schema, identity, and capacity dimensions.
    ///
    /// RCH and coordination signals are not validated.
    ///
    /// # Errors
    ///
    /// Returns `UnsupportedSchemaVersion` on a schema mismatch,
    /// `EmptyCapacitySnapshotId` for a blank id, or the first error reported by
    /// the CPU, memory, or disk validators (checked in that order).
    pub fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        if self.schema_version != SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION {
            return Err(SchedulerEvidenceError::UnsupportedSchemaVersion {
                expected: SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION.to_string(),
                found: self.schema_version.clone(),
            });
        }
        if self.snapshot_id.trim().is_empty() {
            return Err(SchedulerEvidenceError::EmptyCapacitySnapshotId);
        }
        self.cpu.validate()?;
        self.memory.validate()?;
        self.disk.validate()?;
        Ok(())
    }

    /// Classifies every admission lane against this snapshot and picks the
    /// lane to recommend.
    ///
    /// Lanes are reported in a fixed order: source-only, tracker planning,
    /// remote proof, local artifact retrieval, cleanup authorization.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::validate`].
    pub fn admission_report(&self) -> Result<SwarmAdmissionReport, SchedulerEvidenceError> {
        self.validate()?;
        let lanes = vec![
            self.classify_source_only_lane(),
            self.classify_tracker_planning_lane(),
            self.classify_remote_proof_lane(),
            self.classify_local_artifact_lane(),
            self.classify_cleanup_authorization_lane(),
        ];
        let recommended_lane = self.recommend_admission_lane(&lanes);
        Ok(SwarmAdmissionReport {
            schema_version: SWARM_ADMISSION_POLICY_REPORT_SCHEMA_VERSION.to_string(),
            source_snapshot_id: self.snapshot_id.clone(),
            recommended_lane,
            lanes,
        })
    }

    /// Splits available memory into per-lane byte budgets.
    ///
    /// A missing `available_bytes` is treated as 0, and a missing `total_bytes`
    /// falls back to the available figure. First, an emergency reserve (sized
    /// by host tier and pressure tier) is held back. The remainder is then
    /// divided between the interactive, trace-replay, proof-staging, and
    /// compiler-cache lanes in proportion to the tier's weights. Each lane is
    /// rounded down, so `total_planned_bytes` never exceeds the allocatable
    /// remainder.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::validate`].
    pub fn memory_budget_plan(&self) -> Result<SwarmMemoryBudgetPlan, SchedulerEvidenceError> {
        self.validate()?;
        let host_tier = SwarmMemoryHostTier::classify(self.memory.total_bytes);
        let available_bytes = self.memory.available_bytes.unwrap_or(0);
        let total_bytes = self.memory.total_bytes.unwrap_or(available_bytes);
        let emergency_reserve_bytes = host_tier.emergency_reserve_bytes(
            total_bytes,
            available_bytes,
            self.memory.pressure_tier,
        );
        let allocatable_bytes = available_bytes.saturating_sub(emergency_reserve_bytes);
        let (interactive_weight, trace_weight, proof_weight, cache_weight) =
            host_tier.budget_weights(self.memory.pressure_tier);
        let total_weight = interactive_weight + trace_weight + proof_weight + cache_weight;
        let lane_budget = |weight: u64| -> u64 {
            if total_weight == 0 {
                return 0;
            }
            // Widen to u128: `allocatable_bytes * weight` can exceed u64, and a
            // saturating u64 multiply would silently skew the proportional split.
            // The quotient never exceeds `allocatable_bytes` because every weight
            // is at most `total_weight`, so the narrowing cannot fail.
            let share = u128::from(allocatable_bytes) * u128::from(weight)
                / u128::from(total_weight);
            u64::try_from(share).unwrap_or(allocatable_bytes)
        };
        let interactive_runtime_bytes = lane_budget(interactive_weight);
        let trace_replay_bytes = lane_budget(trace_weight);
        let proof_artifact_staging_bytes = lane_budget(proof_weight);
        let compiler_cache_bytes = lane_budget(cache_weight);
        let total_planned_bytes = interactive_runtime_bytes
            .saturating_add(trace_replay_bytes)
            .saturating_add(proof_artifact_staging_bytes)
            .saturating_add(compiler_cache_bytes);
        Ok(SwarmMemoryBudgetPlan {
            schema_version: SWARM_MEMORY_BUDGET_PLAN_SCHEMA_VERSION.to_string(),
            source_snapshot_id: self.snapshot_id.clone(),
            host_tier,
            pressure_tier: self.memory.pressure_tier,
            available_bytes,
            total_bytes,
            emergency_reserve_bytes,
            interactive_runtime_bytes,
            trace_replay_bytes,
            proof_artifact_staging_bytes,
            compiler_cache_bytes,
            total_planned_bytes,
        })
    }

    /// Decides where a memory residency request should live, given this
    /// snapshot's budget plan.
    ///
    /// The first matching rule decides:
    /// 1. `minimum_hot_bytes > requested_bytes`: refuse (`ContradictoryPolicy`).
    /// 2. `metrics_age_secs > max_metrics_age_secs`: refuse (`StaleMetrics`).
    /// 3. Zero lane budget with a non-zero request: refuse (`EmptyBudget`).
    /// 4. The request fits the lane budget: admit hot.
    /// 5. Spill is allowed and the hot minimum fits: spill cold.
    /// 6. Brownout is allowed and the workload class is eligible: brownout.
    /// 7. Otherwise: refuse (`NoSafeResidencyTier`).
    ///
    /// The byte breakdown comes from `memory_residency_counts` (defined
    /// elsewhere in this module). The "after" envelope subtracts hot and warm
    /// resident bytes from available memory.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::validate`] and from request validation
    /// (blank policy or workload id), in that order.
    pub fn memory_residency_policy(
        &self,
        request: &SwarmMemoryResidencyRequest,
    ) -> Result<SwarmMemoryResidencyPlan, SchedulerEvidenceError> {
        self.validate()?;
        request.validate()?;
        let budget = self.memory_budget_plan()?;
        let lane_budget_bytes = request.workload_class.lane_budget_bytes(&budget);
        let before = SwarmMemoryResidencyEnvelope {
            available_bytes: budget.available_bytes,
            emergency_reserve_bytes: budget.emergency_reserve_bytes,
            lane_budget_bytes,
            pressure_tier: budget.pressure_tier,
            host_tier: budget.host_tier,
        };
        let metrics_stale = request.metrics_age_secs > request.max_metrics_age_secs;
        let contradictory_policy = request.minimum_hot_bytes > request.requested_bytes;
        let (decision, fallback_reason) = if contradictory_policy {
            (
                SwarmMemoryResidencyDecision::RefuseNoWin,
                Some(SwarmMemoryResidencyFallbackReason::ContradictoryPolicy),
            )
        } else if metrics_stale {
            (
                SwarmMemoryResidencyDecision::RefuseNoWin,
                Some(SwarmMemoryResidencyFallbackReason::StaleMetrics),
            )
        } else if lane_budget_bytes == 0 && request.requested_bytes > 0 {
            (
                SwarmMemoryResidencyDecision::RefuseNoWin,
                Some(SwarmMemoryResidencyFallbackReason::EmptyBudget),
            )
        } else if request.requested_bytes <= lane_budget_bytes {
            (SwarmMemoryResidencyDecision::AdmitHot, None)
        } else if request.spill_allowed && request.minimum_hot_bytes <= lane_budget_bytes {
            (SwarmMemoryResidencyDecision::SpillCold, None)
        } else if request.brownout_allowed && request.workload_class.brownout_eligible() {
            (SwarmMemoryResidencyDecision::BrownoutOptional, None)
        } else {
            (
                SwarmMemoryResidencyDecision::RefuseNoWin,
                Some(SwarmMemoryResidencyFallbackReason::NoSafeResidencyTier),
            )
        };
        // Clamp so a contradictory request never reports more hot bytes than requested.
        let minimum_hot_bytes = request.minimum_hot_bytes.min(request.requested_bytes);
        let (
            hot_resident_bytes,
            warm_resident_bytes,
            spilled_bytes,
            browned_out_bytes,
            refused_bytes,
        ) = memory_residency_counts(
            request.requested_bytes,
            minimum_hot_bytes,
            lane_budget_bytes,
            decision,
        );
        let after = SwarmMemoryResidencyEnvelope {
            available_bytes: before
                .available_bytes
                .saturating_sub(hot_resident_bytes.saturating_add(warm_resident_bytes)),
            emergency_reserve_bytes: before.emergency_reserve_bytes,
            lane_budget_bytes: before.lane_budget_bytes,
            pressure_tier: before.pressure_tier,
            host_tier: before.host_tier,
        };
        let residency_tier = residency_tier_for_decision(decision);
        let brownout_class = brownout_class_for_decision(decision, budget.pressure_tier);
        let no_win_decision = decision == SwarmMemoryResidencyDecision::RefuseNoWin;
        let explanation = memory_residency_explanation(
            request,
            decision,
            fallback_reason,
            lane_budget_bytes,
            hot_resident_bytes,
            warm_resident_bytes,
            spilled_bytes,
            browned_out_bytes,
            refused_bytes,
        );
        Ok(SwarmMemoryResidencyPlan {
            schema_version: SWARM_MEMORY_RESIDENCY_POLICY_SCHEMA_VERSION.to_string(),
            source_snapshot_id: self.snapshot_id.clone(),
            policy_id: request.policy_id.clone(),
            workload_id: request.workload_id.clone(),
            workload_class: request.workload_class,
            affected_region_id: request.affected_region_id.clone(),
            affected_task_ids: sorted_unique_strings(&request.affected_task_ids),
            proof_lane_id: request.proof_lane_id.clone(),
            proof_command: request.proof_command.clone(),
            decision,
            residency_tier,
            brownout_class,
            fallback_reason,
            before,
            after,
            requested_bytes: request.requested_bytes,
            minimum_hot_bytes,
            hot_resident_bytes,
            warm_resident_bytes,
            spilled_bytes,
            browned_out_bytes,
            refused_bytes,
            metrics_age_secs: request.metrics_age_secs,
            max_metrics_age_secs: request.max_metrics_age_secs,
            metrics_stale,
            no_win_decision,
            preserved_invariants: SwarmMemoryProtectedInvariant::all(),
            explanation,
        })
    }

    /// True when memory pressure is `Saturated` or `Critical`.
    fn memory_pressure_is_severe(&self) -> bool {
        matches!(
            self.memory.pressure_tier,
            SwarmMemoryPressureTier::Saturated | SwarmMemoryPressureTier::Critical
        )
    }

    /// The source-only lane is always admitted. Its reason codes flag critical
    /// disk, peer dirty paths, and an empty ready queue.
    fn classify_source_only_lane(&self) -> SwarmLaneAdmission {
        let mut reason_codes = vec![SwarmAdmissionReasonCode::SourceOnlyAlwaysAvailable];
        if self.disk.pressure_level == SwarmDiskPressureLevel::Critical {
            reason_codes.push(SwarmAdmissionReasonCode::DiskCriticalPreferSourceOnly);
        }
        if self.coordination.active_dirty_paths > 0 {
            reason_codes.push(SwarmAdmissionReasonCode::PeerDirtyPathsRequireNarrowReservations);
        }
        if self.coordination.ready_beads == 0 {
            reason_codes.push(SwarmAdmissionReasonCode::SparseReadyQueueUseFallback);
        }
        SwarmLaneAdmission {
            lane: SwarmAdmissionLane::InteractiveSourceOnly,
            decision: SwarmAdmissionDecision::Admit,
            validation_class: SwarmValidationClass::SourceOnly,
            reason_codes,
        }
    }

    /// Picks the lane to recommend from the classified `lanes`.
    ///
    /// Under stress (no ready beads, critical disk, or saturated/critical
    /// memory), an admitted source-only lane wins. Otherwise the preference is
    /// remote proof, then source-only, with tracker-only planning as the final
    /// fallback. Local artifact retrieval and cleanup authorization are never
    /// recommended.
    fn recommend_admission_lane(&self, lanes: &[SwarmLaneAdmission]) -> SwarmAdmissionLane {
        let is_admitted = |candidate: SwarmAdmissionLane| {
            lanes.iter().any(|lane| {
                lane.lane == candidate && lane.decision == SwarmAdmissionDecision::Admit
            })
        };
        let under_stress = self.coordination.ready_beads == 0
            || self.disk.pressure_level == SwarmDiskPressureLevel::Critical
            || self.memory_pressure_is_severe();
        if under_stress && is_admitted(SwarmAdmissionLane::InteractiveSourceOnly) {
            return SwarmAdmissionLane::InteractiveSourceOnly;
        }
        if is_admitted(SwarmAdmissionLane::RemoteProof) {
            return SwarmAdmissionLane::RemoteProof;
        }
        if is_admitted(SwarmAdmissionLane::InteractiveSourceOnly) {
            return SwarmAdmissionLane::InteractiveSourceOnly;
        }
        // Tracker-only planning is the fallback whether or not it was admitted.
        SwarmAdmissionLane::TrackerOnlyPlanning
    }

    /// Tracker-only planning is always admitted. An empty ready queue is
    /// flagged in its reason codes.
    fn classify_tracker_planning_lane(&self) -> SwarmLaneAdmission {
        let mut reason_codes = vec![SwarmAdmissionReasonCode::TrackerPlanningAlwaysAvailable];
        if self.coordination.ready_beads == 0 {
            reason_codes.push(SwarmAdmissionReasonCode::SparseReadyQueueUseFallback);
        }
        SwarmLaneAdmission {
            lane: SwarmAdmissionLane::TrackerOnlyPlanning,
            decision: SwarmAdmissionDecision::Admit,
            validation_class: SwarmValidationClass::SourceOnly,
            reason_codes,
        }
    }

    /// The remote proof lane is admitted when RCH is available or degraded,
    /// and deferred otherwise. Critical disk adds a remote-only reason code.
    fn classify_remote_proof_lane(&self) -> SwarmLaneAdmission {
        let mut reason_codes = Vec::new();
        let decision = match self.rch.admissibility {
            SwarmRchAdmissibility::Available => {
                reason_codes.push(SwarmAdmissionReasonCode::RchAvailable);
                SwarmAdmissionDecision::Admit
            }
            SwarmRchAdmissibility::Degraded => {
                reason_codes.push(SwarmAdmissionReasonCode::RchDegraded);
                SwarmAdmissionDecision::Admit
            }
            SwarmRchAdmissibility::Unavailable => {
                reason_codes.push(SwarmAdmissionReasonCode::RchUnavailable);
                SwarmAdmissionDecision::Defer
            }
            SwarmRchAdmissibility::DeferredByPolicy => {
                reason_codes.push(SwarmAdmissionReasonCode::RchDeferredByPolicy);
                SwarmAdmissionDecision::Defer
            }
            SwarmRchAdmissibility::Unknown => {
                reason_codes.push(SwarmAdmissionReasonCode::RchUnknown);
                SwarmAdmissionDecision::Defer
            }
        };
        if self.disk.pressure_level == SwarmDiskPressureLevel::Critical {
            reason_codes.push(SwarmAdmissionReasonCode::DiskCriticalRemoteOnly);
        }
        SwarmLaneAdmission {
            lane: SwarmAdmissionLane::RemoteProof,
            decision,
            validation_class: SwarmValidationClass::RemoteRch,
            reason_codes,
        }
    }

    /// Local artifact retrieval is admitted only with healthy disk and
    /// non-severe memory pressure. Every other disk state, or severe memory
    /// pressure, defers it.
    fn classify_local_artifact_lane(&self) -> SwarmLaneAdmission {
        let mut reason_codes = Vec::new();
        let mut decision = SwarmAdmissionDecision::Admit;
        match self.disk.pressure_level {
            SwarmDiskPressureLevel::Healthy => {
                reason_codes.push(SwarmAdmissionReasonCode::DiskHealthy);
            }
            SwarmDiskPressureLevel::Low => {
                decision = SwarmAdmissionDecision::Defer;
                reason_codes.push(SwarmAdmissionReasonCode::DiskLowPreferRemoteOrSourceOnly);
            }
            SwarmDiskPressureLevel::Critical => {
                decision = SwarmAdmissionDecision::Defer;
                reason_codes.push(SwarmAdmissionReasonCode::DiskCriticalBlocksLocalArtifacts);
            }
            SwarmDiskPressureLevel::Unknown => {
                decision = SwarmAdmissionDecision::Defer;
                reason_codes.push(SwarmAdmissionReasonCode::DiskUnknownBlocksLocalArtifacts);
            }
        }
        if self.memory_pressure_is_severe() {
            decision = SwarmAdmissionDecision::Defer;
            reason_codes.push(SwarmAdmissionReasonCode::MemoryPressureBlocksArtifactGrowth);
        }
        SwarmLaneAdmission {
            lane: SwarmAdmissionLane::LocalArtifactRetrieval,
            decision,
            validation_class: SwarmValidationClass::LocalArtifact,
            reason_codes,
        }
    }

    /// Cleanup always requires authorization. Critical disk adds a
    /// review-needed reason code.
    fn classify_cleanup_authorization_lane(&self) -> SwarmLaneAdmission {
        let mut reason_codes = vec![SwarmAdmissionReasonCode::CleanupRequiresAuthorization];
        if self.disk.pressure_level == SwarmDiskPressureLevel::Critical {
            reason_codes.push(SwarmAdmissionReasonCode::DiskCriticalNeedsCleanupReview);
        }
        SwarmLaneAdmission {
            lane: SwarmAdmissionLane::CleanupAuthorization,
            decision: SwarmAdmissionDecision::RequireAuthorization,
            validation_class: SwarmValidationClass::HumanAuthorizedCleanup,
            reason_codes,
        }
    }
}
/// Host size class that sizes the emergency reserve and lane weights of a
/// [`SwarmMemoryBudgetPlan`].
///
/// Assigned from total memory: unknown total is `Unknown`; up to and including
/// 64 GiB is `Small`; above 64 GiB and below 256 GiB is `Standard`; 256 GiB
/// and above is `HighMemory`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryHostTier {
Unknown,
Small,
Standard,
HighMemory,
}
impl SwarmMemoryHostTier {
    /// Buckets a host by total memory: 64 GiB or less is `Small`, below
    /// 256 GiB is `Standard`, and 256 GiB or more is `HighMemory`. An unknown
    /// total maps to `Unknown`.
    fn classify(total_bytes: Option<u64>) -> Self {
        const GIB: u64 = 1_024 * 1_024 * 1_024;
        let Some(bytes) = total_bytes else {
            return Self::Unknown;
        };
        if bytes >= 256 * GIB {
            Self::HighMemory
        } else if bytes > 64 * GIB {
            Self::Standard
        } else {
            Self::Small
        }
    }

    /// Bytes to hold back from lane allocation.
    ///
    /// The baseline is the larger of a per-tier floor and 10% of total memory.
    /// Every pressure tier except `Healthy` raises it to at least a fraction of
    /// available memory (Unknown 1/4, Low 1/5, Saturated 1/3, Critical 1/2).
    /// The result is capped at `available_bytes - 1`, so at least one byte
    /// stays allocatable. It is 0 when nothing is available.
    fn emergency_reserve_bytes(
        self,
        total_bytes: u64,
        available_bytes: u64,
        pressure_tier: SwarmMemoryPressureTier,
    ) -> u64 {
        const GIB: u64 = 1_024 * 1_024 * 1_024;
        if available_bytes == 0 {
            return 0;
        }
        let floor_gib: u64 = match self {
            Self::Unknown => 0,
            Self::Small => 4,
            Self::Standard => 12,
            Self::HighMemory => 32,
        };
        let baseline = (floor_gib * GIB).max(total_bytes / 10);
        let available_divisor = match pressure_tier {
            SwarmMemoryPressureTier::Healthy => None,
            SwarmMemoryPressureTier::Unknown => Some(4),
            SwarmMemoryPressureTier::Low => Some(5),
            SwarmMemoryPressureTier::Saturated => Some(3),
            SwarmMemoryPressureTier::Critical => Some(2),
        };
        let reserve = available_divisor
            .map_or(baseline, |divisor| baseline.max(available_bytes / divisor));
        reserve.min(available_bytes - 1)
    }

    /// Relative weights for the (interactive, trace replay, proof staging,
    /// compiler cache) lanes. Host tier only matters under `Healthy` pressure.
    fn budget_weights(self, pressure_tier: SwarmMemoryPressureTier) -> (u64, u64, u64, u64) {
        match (pressure_tier, self) {
            (SwarmMemoryPressureTier::Critical, _) => (90, 10, 0, 0),
            (SwarmMemoryPressureTier::Saturated, _) => (70, 20, 5, 5),
            (SwarmMemoryPressureTier::Low, _) => (45, 30, 15, 10),
            (SwarmMemoryPressureTier::Unknown, _)
            | (SwarmMemoryPressureTier::Healthy, Self::Unknown) => (80, 20, 0, 0),
            (SwarmMemoryPressureTier::Healthy, Self::Small) => (45, 25, 15, 15),
            (SwarmMemoryPressureTier::Healthy, Self::Standard) => (35, 30, 20, 15),
            (SwarmMemoryPressureTier::Healthy, Self::HighMemory) => (25, 35, 25, 15),
        }
    }
}
/// Per-lane memory budget produced by `SwarmCapacitySnapshot::memory_budget_plan`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmMemoryBudgetPlan {
/// Set to `SWARM_MEMORY_BUDGET_PLAN_SCHEMA_VERSION`.
pub schema_version: String,
/// `snapshot_id` of the source snapshot.
pub source_snapshot_id: String,
pub host_tier: SwarmMemoryHostTier,
pub pressure_tier: SwarmMemoryPressureTier,
/// Reported available memory, or 0 when the snapshot omitted it.
pub available_bytes: u64,
/// Reported total memory, falling back to `available_bytes` when omitted.
pub total_bytes: u64,
/// Bytes held back before the lanes are budgeted.
pub emergency_reserve_bytes: u64,
/// Weighted share of `available_bytes - emergency_reserve_bytes`.
pub interactive_runtime_bytes: u64,
/// Weighted share of `available_bytes - emergency_reserve_bytes`.
pub trace_replay_bytes: u64,
/// Weighted share of `available_bytes - emergency_reserve_bytes`.
pub proof_artifact_staging_bytes: u64,
/// Weighted share of `available_bytes - emergency_reserve_bytes`.
pub compiler_cache_bytes: u64,
/// Sum of the four lane budgets.
pub total_planned_bytes: u64,
}
/// Workload class of a memory residency request; selects the budget lane the
/// request draws from and whether it may be browned out.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryResidencyWorkloadClass {
/// Draws from the interactive runtime lane; not brownout-eligible.
InteractiveRuntime,
/// Draws from the trace replay lane; not brownout-eligible.
TraceReplay,
/// Draws from the proof artifact staging lane; brownout-eligible.
ProofArtifact,
/// Draws from the compiler cache lane; brownout-eligible.
CompilerCache,
/// Shares the compiler cache lane; brownout-eligible.
OptionalDiagnostics,
}
impl SwarmMemoryResidencyWorkloadClass {
    /// Returns the lane budget from `budget` that this workload class draws from.
    /// Optional diagnostics share the compiler-cache lane.
    const fn lane_budget_bytes(self, budget: &SwarmMemoryBudgetPlan) -> u64 {
        match self {
            Self::InteractiveRuntime => budget.interactive_runtime_bytes,
            Self::TraceReplay => budget.trace_replay_bytes,
            Self::ProofArtifact => budget.proof_artifact_staging_bytes,
            Self::CompilerCache => budget.compiler_cache_bytes,
            Self::OptionalDiagnostics => budget.compiler_cache_bytes,
        }
    }

    /// Whether a request of this class may be browned out instead of refused.
    /// Only non-interactive, non-replay classes qualify.
    const fn brownout_eligible(self) -> bool {
        matches!(
            self,
            Self::OptionalDiagnostics | Self::CompilerCache | Self::ProofArtifact
        )
    }
}
/// Residency tier reported in a [`SwarmMemoryResidencyPlan`]. It is derived
/// from the decision by `residency_tier_for_decision`, which is defined
/// elsewhere in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryResidencyTier {
Hot,
Warm,
SpillEligible,
BrownoutEligible,
Refused,
}
/// Brownout class reported in a [`SwarmMemoryResidencyPlan`]. It is derived
/// from the decision and the memory pressure tier by
/// `brownout_class_for_decision`, which is defined elsewhere in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryBrownoutClass {
None,
Observe,
DegradeOptional,
ShedOptional,
NoWin,
}
/// Outcome of `SwarmCapacitySnapshot::memory_residency_policy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryResidencyDecision {
/// The whole request fits within the lane budget.
AdmitHot,
/// Spilling is allowed and the hot minimum fits within the lane budget.
SpillCold,
/// Brownout is allowed and the workload class is brownout-eligible.
BrownoutOptional,
/// No safe option; see the plan's `fallback_reason`.
RefuseNoWin,
}
/// Why a residency request was refused (`RefuseNoWin`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryResidencyFallbackReason {
/// The lane budget is zero while a non-zero amount was requested.
EmptyBudget,
/// `metrics_age_secs` exceeded `max_metrics_age_secs`.
StaleMetrics,
/// `minimum_hot_bytes` exceeded `requested_bytes`.
ContradictoryPolicy,
/// None of the admit, spill, or brownout paths applied.
NoSafeResidencyTier,
}
/// Runtime invariants that every [`SwarmMemoryResidencyPlan`] lists as preserved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryProtectedInvariant {
CoreScheduling,
CancellationDrain,
LoserDrain,
RegionQuiescence,
ObligationCleanup,
}
impl SwarmMemoryProtectedInvariant {
    /// Every protected invariant, in declaration order.
    fn all() -> Vec<Self> {
        Vec::from([
            Self::CoreScheduling,
            Self::CancellationDrain,
            Self::LoserDrain,
            Self::RegionQuiescence,
            Self::ObligationCleanup,
        ])
    }
}
/// Memory accounting recorded before and after a residency decision.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmMemoryResidencyEnvelope {
/// Available memory; the "after" envelope subtracts hot and warm resident bytes.
pub available_bytes: u64,
pub emergency_reserve_bytes: u64,
/// Budget of the lane the request's workload class draws from.
pub lane_budget_bytes: u64,
pub pressure_tier: SwarmMemoryPressureTier,
pub host_tier: SwarmMemoryHostTier,
}
/// Input to `SwarmCapacitySnapshot::memory_residency_policy`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmMemoryResidencyRequest {
/// Must not be blank.
pub policy_id: String,
/// Must not be blank.
pub workload_id: String,
/// Selects the budget lane and brownout eligibility.
pub workload_class: SwarmMemoryResidencyWorkloadClass,
/// Copied verbatim into the plan.
pub affected_region_id: Option<String>,
/// Passed through `sorted_unique_strings` before being stored in the plan.
pub affected_task_ids: Vec<String>,
/// Total bytes the workload wants resident.
pub requested_bytes: u64,
/// Bytes that must stay hot; exceeding `requested_bytes` is a contradictory policy.
pub minimum_hot_bytes: u64,
/// Permits the spill-cold outcome when the full request does not fit.
pub spill_allowed: bool,
/// Permits the brownout outcome for eligible workload classes.
pub brownout_allowed: bool,
/// Age of the metrics behind this request; refused when above the maximum.
pub metrics_age_secs: u64,
pub max_metrics_age_secs: u64,
/// Copied verbatim into the plan.
pub proof_lane_id: Option<String>,
/// Copied verbatim into the plan.
pub proof_command: Option<String>,
}
impl SwarmMemoryResidencyRequest {
    /// Rejects a request whose policy id or workload id is blank. The policy
    /// id is checked first.
    fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        fn is_blank(value: &str) -> bool {
            value.trim().is_empty()
        }

        if is_blank(&self.policy_id) {
            Err(SchedulerEvidenceError::EmptyMemoryResidencyPolicyId)
        } else if is_blank(&self.workload_id) {
            Err(SchedulerEvidenceError::EmptyMemoryResidencyWorkloadId)
        } else {
            Ok(())
        }
    }
}
/// Result of `SwarmCapacitySnapshot::memory_residency_policy`: the decision,
/// its byte breakdown, and before/after memory envelopes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmMemoryResidencyPlan {
/// Set to `SWARM_MEMORY_RESIDENCY_POLICY_SCHEMA_VERSION`.
pub schema_version: String,
/// `snapshot_id` of the source snapshot.
pub source_snapshot_id: String,
pub policy_id: String,
pub workload_id: String,
pub workload_class: SwarmMemoryResidencyWorkloadClass,
pub affected_region_id: Option<String>,
/// The request's task ids after `sorted_unique_strings`.
pub affected_task_ids: Vec<String>,
pub proof_lane_id: Option<String>,
pub proof_command: Option<String>,
pub decision: SwarmMemoryResidencyDecision,
pub residency_tier: SwarmMemoryResidencyTier,
pub brownout_class: SwarmMemoryBrownoutClass,
/// Present only for refusals.
pub fallback_reason: Option<SwarmMemoryResidencyFallbackReason>,
pub before: SwarmMemoryResidencyEnvelope,
pub after: SwarmMemoryResidencyEnvelope,
pub requested_bytes: u64,
/// The request's hot minimum, clamped to `requested_bytes`.
pub minimum_hot_bytes: u64,
pub hot_resident_bytes: u64,
pub warm_resident_bytes: u64,
pub spilled_bytes: u64,
pub browned_out_bytes: u64,
pub refused_bytes: u64,
pub metrics_age_secs: u64,
pub max_metrics_age_secs: u64,
/// True when `metrics_age_secs > max_metrics_age_secs`.
pub metrics_stale: bool,
/// True when the decision is `RefuseNoWin`.
pub no_win_decision: bool,
/// Always every [`SwarmMemoryProtectedInvariant`].
pub preserved_invariants: Vec<SwarmMemoryProtectedInvariant>,
pub explanation: Vec<String>,
}
/// Work lanes classified by `SwarmCapacitySnapshot::admission_report`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmAdmissionLane {
/// Always admitted.
InteractiveSourceOnly,
/// Always admitted; the final fallback recommendation.
TrackerOnlyPlanning,
/// Admitted when RCH is available or degraded.
RemoteProof,
/// Admitted only with healthy disk and non-severe memory pressure.
LocalArtifactRetrieval,
/// Always requires authorization.
CleanupAuthorization,
}
/// Admission outcome for a single lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmAdmissionDecision {
Admit,
Defer,
RequireAuthorization,
}
/// Validation class attached to each lane admission; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmValidationClass {
SourceOnly,
RemoteRch,
LocalArtifact,
HumanAuthorizedCleanup,
}
/// Reason codes recorded on a [`SwarmLaneAdmission`] to explain its decision;
/// serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmAdmissionReasonCode {
SourceOnlyAlwaysAvailable,
TrackerPlanningAlwaysAvailable,
DiskCriticalPreferSourceOnly,
DiskCriticalRemoteOnly,
DiskHealthy,
DiskLowPreferRemoteOrSourceOnly,
DiskCriticalBlocksLocalArtifacts,
DiskUnknownBlocksLocalArtifacts,
DiskCriticalNeedsCleanupReview,
RchAvailable,
RchDegraded,
RchUnavailable,
RchDeferredByPolicy,
RchUnknown,
MemoryPressureBlocksArtifactGrowth,
PeerDirtyPathsRequireNarrowReservations,
SparseReadyQueueUseFallback,
CleanupRequiresAuthorization,
}
/// Admission verdict for one lane, with the reason codes that produced it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmLaneAdmission {
pub lane: SwarmAdmissionLane,
pub decision: SwarmAdmissionDecision,
pub validation_class: SwarmValidationClass,
/// Reasons in the order the classifier appended them.
pub reason_codes: Vec<SwarmAdmissionReasonCode>,
}
/// Result of `SwarmCapacitySnapshot::admission_report`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmAdmissionReport {
/// Set to `SWARM_ADMISSION_POLICY_REPORT_SCHEMA_VERSION`.
pub schema_version: String,
/// `snapshot_id` of the source snapshot.
pub source_snapshot_id: String,
pub recommended_lane: SwarmAdmissionLane,
/// One entry per lane, in a fixed order.
pub lanes: Vec<SwarmLaneAdmission>,
}
/// CPU topology hints in a capacity snapshot. Validation rejects a zero
/// `logical_cpus` and any optional count that is present but zero.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwarmCpuTopologyHints {
pub logical_cpus: usize,
pub physical_cores: Option<usize>,
pub numa_nodes: Option<usize>,
pub scheduler_worker_target: Option<usize>,
}
impl SwarmCpuTopologyHints {
    /// Rejects a zero logical CPU count and any optional count that is present
    /// but zero.
    ///
    /// # Errors
    ///
    /// `InvalidCapacityDimension` naming the first offending field. Fields are
    /// checked in this order: logical CPUs, physical cores, NUMA nodes,
    /// scheduler worker target.
    fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        let offending_field = if self.logical_cpus == 0 {
            Some("cpu.logical_cpus")
        } else {
            [
                (self.physical_cores, "cpu.physical_cores"),
                (self.numa_nodes, "cpu.numa_nodes"),
                (self.scheduler_worker_target, "cpu.scheduler_worker_target"),
            ]
            .iter()
            .find(|(count, _)| *count == Some(0))
            .map(|entry| entry.1)
        };
        match offending_field {
            Some(field) => Err(SchedulerEvidenceError::InvalidCapacityDimension { field }),
            None => Ok(()),
        }
    }
}
/// Memory pressure tier of a host; `Unknown` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmMemoryPressureTier {
#[default]
Unknown,
Healthy,
Low,
Saturated,
Critical,
}
/// Memory capacity signals in a capacity snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SwarmMemoryCapacity {
/// Treated as 0 by the memory budget plan when absent.
pub available_bytes: Option<u64>,
/// Drives host-tier classification; the budget plan falls back to
/// `available_bytes` when absent.
pub total_bytes: Option<u64>,
/// Defaults to `Unknown` when omitted from serialized input.
#[serde(default)]
pub pressure_tier: SwarmMemoryPressureTier,
}
impl SwarmMemoryCapacity {
    /// Checks the available/total byte pair with the module's
    /// `validate_optional_capacity_pair` helper, labelling errors with the
    /// `memory.*` field names.
    fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        let (available, total) = (self.available_bytes, self.total_bytes);
        validate_optional_capacity_pair(
            available,
            total,
            "memory.available_bytes",
            "memory.total_bytes",
        )
    }
}
/// Disk pressure level reported alongside swarm disk capacity.
///
/// Serialized in snake_case (`"unknown"`, `"healthy"`, `"low"`, `"critical"`).
/// Defaults to `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmDiskPressureLevel {
#[default]
Unknown,
Healthy,
Low,
Critical,
}
/// Disk capacity dimensions (in bytes) plus the reported pressure level.
///
/// Both byte counts are optional. When both are present, `SwarmDiskCapacity::validate`
/// requires `free_bytes <= total_bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SwarmDiskCapacity {
/// Free disk space in bytes, if reported.
pub free_bytes: Option<u64>,
/// Total disk space in bytes, if reported.
pub total_bytes: Option<u64>,
/// Pressure level; falls back to `SwarmDiskPressureLevel::Unknown` when absent from input.
#[serde(default)]
pub pressure_level: SwarmDiskPressureLevel,
}
impl SwarmDiskCapacity {
    /// Checks that reported free disk space does not exceed reported total disk space.
    ///
    /// The comparison only applies when both byte counts are present.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulerEvidenceError::CapacityAvailableExceedsTotal`] naming
    /// `disk.free_bytes` / `disk.total_bytes`.
    fn validate(&self) -> Result<(), SchedulerEvidenceError> {
        let Self {
            free_bytes,
            total_bytes,
            ..
        } = self;
        validate_optional_capacity_pair(
            *free_bytes,
            *total_bytes,
            "disk.free_bytes",
            "disk.total_bytes",
        )
    }
}
/// Admissibility state reported for RCH capacity (NOTE(review): the "RCH" acronym is not
/// expanded in this chunk — document it where the concept is introduced).
///
/// Serialized in snake_case (`"unknown"`, `"available"`, `"degraded"`, `"unavailable"`,
/// `"deferred_by_policy"`). Defaults to `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwarmRchAdmissibility {
#[default]
Unknown,
Available,
Degraded,
Unavailable,
DeferredByPolicy,
}
/// RCH capacity signals: admissibility plus optional worker and slot counts.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SwarmRchCapacity {
/// Admissibility state; falls back to `SwarmRchAdmissibility::Unknown` when absent from input.
#[serde(default)]
pub admissibility: SwarmRchAdmissibility,
/// Count of healthy workers, if reported.
pub healthy_worker_count: Option<usize>,
/// Count of available slots, if reported.
pub available_slots: Option<usize>,
/// Free-form reason codes explaining a blocked state; empty when absent from input.
#[serde(default)]
pub blocked_reason_codes: Vec<String>,
}
/// Coordination backlog counters observed for the swarm.
///
/// The container-level `#[serde(default)]` means any counter missing from the input
/// deserializes as `0`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct SwarmCoordinationBacklogSignals {
/// Count of beads in the ready state.
pub ready_beads: usize,
/// Count of open beads.
pub open_beads: usize,
/// Count of beads currently in progress.
pub in_progress_beads: usize,
/// Count of active reservations.
pub active_reservations: usize,
/// Count of active dirty paths.
pub active_dirty_paths: usize,
/// Count of active agents.
pub active_agents: usize,
/// Count of in-progress beads considered stale (staleness criteria are defined by the
/// producer, not here).
pub stale_in_progress_beads: usize,
}
/// Validation failures for scheduler evidence artifacts, coordination evidence inputs,
/// and swarm capacity / memory-residency data.
///
/// Each variant's `#[error]` message is its user-facing description.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum SchedulerEvidenceError {
// Structural checks raised by `SchedulerEvidenceArtifact::validate`.
#[error("unsupported schema version: expected {expected}, found {found}")]
UnsupportedSchemaVersion {
expected: String,
found: String,
},
#[error("run label must not be empty")]
EmptyRunLabel,
#[error("topology must declare at least one worker thread")]
ZeroWorkerThreads,
#[error("topology must declare at least one cohort")]
ZeroCohortCount,
#[error("topology must declare a non-zero memory budget")]
ZeroMemoryBudget,
#[error("current knob profile must declare at least one worker")]
ZeroCurrentWorkers,
#[error("current knob profile must declare a non-zero steal batch size")]
ZeroStealBatchSize,
#[error("current knob profile must declare a non-zero cancel streak limit")]
ZeroCancelStreakLimit,
// Metric checks. `NonMonotonicPercentiles` comes from `validate_percentiles`;
// `field` names the metric family (e.g. "wake_to_run").
#[error("{field} percentiles must be monotonic (p50 <= p95 <= p99)")]
NonMonotonicPercentiles {
field: &'static str,
},
#[error("remote steal ratio must be between 0 and 100 inclusive, found {0}")]
RemoteStealRatioOutOfRange(u8),
// Coordination evidence input checks. `EmptySourceHash` / `InvalidSourceHash` come
// from `validate_hash`.
#[error("coordination evidence input set must not be empty")]
EmptyEvidenceInputSet,
#[error("coordination evidence input id must not be empty")]
EmptyEvidenceInputId,
#[error("coordination workload id must not be empty")]
EmptyCoordinationWorkloadId,
#[error("coordination evidence must declare semantic pressure dimensions")]
EmptySemanticPressure,
#[error("coordination evidence must declare provenance-only context")]
EmptyProvenanceContext,
#[error("coordination evidence must include at least one source event")]
ZeroSourceEventCount,
#[error("coordination evidence source hashes must not be empty")]
EmptySourceHash,
#[error("coordination evidence source hash must start with sha256:, found {found}")]
InvalidSourceHash {
found: String,
},
// Swarm capacity and memory-residency checks. `InvalidCapacityDimension` carries a
// dotted field path such as "cpu.logical_cpus"; `CapacityAvailableExceedsTotal` comes
// from `validate_optional_capacity_pair`.
#[error("swarm capacity snapshot id must not be empty")]
EmptyCapacitySnapshotId,
#[error("swarm memory residency policy id must not be empty")]
EmptyMemoryResidencyPolicyId,
#[error("swarm memory residency workload id must not be empty")]
EmptyMemoryResidencyWorkloadId,
#[error("swarm capacity dimension is invalid: {field}")]
InvalidCapacityDimension {
field: &'static str,
},
#[error("swarm capacity available field {available_field} exceeds total field {total_field}")]
CapacityAvailableExceedsTotal {
available_field: &'static str,
total_field: &'static str,
},
}
/// Splits `requested_bytes` into the per-tier byte counts reported for `decision`.
///
/// Returns `(hot, warm, spilled, browned_out, refused)`:
/// - `AdmitHot`: every requested byte is hot.
/// - `SpillCold`: up to `minimum_hot_bytes` stay hot, the rest of the lane budget is
///   warm, and whatever still does not fit is reported as spilled.
/// - `BrownoutOptional`: same hot/warm split, with the overflow reported as browned out.
/// - `RefuseNoWin`: every requested byte is refused.
///
/// For every decision the five counts sum to exactly `requested_bytes`.
fn memory_residency_counts(
    requested_bytes: u64,
    minimum_hot_bytes: u64,
    lane_budget_bytes: u64,
    decision: SwarmMemoryResidencyDecision,
) -> (u64, u64, u64, u64, u64) {
    match decision {
        SwarmMemoryResidencyDecision::AdmitHot => (requested_bytes, 0, 0, 0, 0),
        SwarmMemoryResidencyDecision::SpillCold => {
            let (hot, warm, overflow) =
                split_hot_warm_overflow(requested_bytes, minimum_hot_bytes, lane_budget_bytes);
            (hot, warm, overflow, 0, 0)
        }
        SwarmMemoryResidencyDecision::BrownoutOptional => {
            let (hot, warm, overflow) =
                split_hot_warm_overflow(requested_bytes, minimum_hot_bytes, lane_budget_bytes);
            (hot, warm, 0, overflow, 0)
        }
        SwarmMemoryResidencyDecision::RefuseNoWin => (0, 0, 0, 0, requested_bytes),
    }
}

/// Splits a request into `(hot, warm, overflow)` bytes against a lane residency budget.
///
/// `hot + warm + overflow == requested_bytes`, and `hot + warm <= lane_budget_bytes`.
fn split_hot_warm_overflow(
    requested_bytes: u64,
    minimum_hot_bytes: u64,
    lane_budget_bytes: u64,
) -> (u64, u64, u64) {
    // Cap the hot share by the lane budget AND by the request itself; without the
    // second cap a small request with a large hot floor would report more hot bytes
    // than were ever requested.
    let hot = minimum_hot_bytes
        .min(lane_budget_bytes)
        .min(requested_bytes);
    let remaining = requested_bytes.saturating_sub(hot);
    let warm = remaining.min(lane_budget_bytes.saturating_sub(hot));
    let overflow = remaining.saturating_sub(warm);
    (hot, warm, overflow)
}
const fn residency_tier_for_decision(
decision: SwarmMemoryResidencyDecision,
) -> SwarmMemoryResidencyTier {
match decision {
SwarmMemoryResidencyDecision::AdmitHot => SwarmMemoryResidencyTier::Hot,
SwarmMemoryResidencyDecision::SpillCold => SwarmMemoryResidencyTier::SpillEligible,
SwarmMemoryResidencyDecision::BrownoutOptional => {
SwarmMemoryResidencyTier::BrownoutEligible
}
SwarmMemoryResidencyDecision::RefuseNoWin => SwarmMemoryResidencyTier::Refused,
}
}
const fn brownout_class_for_decision(
decision: SwarmMemoryResidencyDecision,
pressure_tier: SwarmMemoryPressureTier,
) -> SwarmMemoryBrownoutClass {
match decision {
SwarmMemoryResidencyDecision::BrownoutOptional => SwarmMemoryBrownoutClass::ShedOptional,
SwarmMemoryResidencyDecision::RefuseNoWin => SwarmMemoryBrownoutClass::NoWin,
SwarmMemoryResidencyDecision::AdmitHot | SwarmMemoryResidencyDecision::SpillCold => {
match pressure_tier {
SwarmMemoryPressureTier::Critical | SwarmMemoryPressureTier::Saturated => {
SwarmMemoryBrownoutClass::DegradeOptional
}
SwarmMemoryPressureTier::Low | SwarmMemoryPressureTier::Unknown => {
SwarmMemoryBrownoutClass::Observe
}
SwarmMemoryPressureTier::Healthy => SwarmMemoryBrownoutClass::None,
}
}
}
}
/// Builds the human-readable explanation lines for a memory-residency decision.
///
/// Lines, in order:
/// 1. a summary of the workload, its class, the requested bytes, the lane budget, and the
///    decision;
/// 2. `fallback_reason=...`, only when `fallback_reason` is `Some`;
/// 3. the per-tier byte accounting (hot, warm, spilled, browned out, refused);
/// 4. a fixed statement of the invariants the decision leaves untouched.
fn memory_residency_explanation(
    request: &SwarmMemoryResidencyRequest,
    decision: SwarmMemoryResidencyDecision,
    fallback_reason: Option<SwarmMemoryResidencyFallbackReason>,
    lane_budget_bytes: u64,
    hot_resident_bytes: u64,
    warm_resident_bytes: u64,
    spilled_bytes: u64,
    browned_out_bytes: u64,
    refused_bytes: u64,
) -> Vec<String> {
    let summary = format!(
        "workload={} class={:?} requested={}B lane_budget={}B decision={:?}",
        request.workload_id,
        request.workload_class,
        request.requested_bytes,
        lane_budget_bytes,
        decision
    );
    let fallback_line = fallback_reason.map(|reason| format!("fallback_reason={reason:?}"));
    let accounting = format!(
        "resident_hot={}B resident_warm={}B spilled={}B browned_out={}B refused={}B",
        hot_resident_bytes, warm_resident_bytes, spilled_bytes, browned_out_bytes, refused_bytes
    );
    let invariants = String::from(
        "core scheduling, cancellation drain, loser drain, region quiescence, and obligation cleanup remain preserved",
    );

    std::iter::once(summary)
        .chain(fallback_line)
        .chain([accounting, invariants])
        .collect()
}
/// Returns the trimmed, non-blank entries of `values`, sorted ascending with
/// duplicates removed.
///
/// Entries are compared after trimming, so `" a "` and `"a"` collapse to one `"a"`.
fn sorted_unique_strings(values: &[String]) -> Vec<String> {
    // An ordered set both sorts and deduplicates in one pass.
    let unique: std::collections::BTreeSet<&str> = values
        .iter()
        .map(|raw| raw.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .collect();
    unique.into_iter().map(str::to_owned).collect()
}
/// Checks that a p50/p95/p99 triple is non-decreasing.
///
/// # Errors
///
/// Returns [`SchedulerEvidenceError::NonMonotonicPercentiles`] tagged with `field` when
/// `p50 > p95` or `p95 > p99`.
fn validate_percentiles<T: Ord>(
    p50: T,
    p95: T,
    p99: T,
    field: &'static str,
) -> Result<(), SchedulerEvidenceError> {
    let is_monotonic = p50 <= p95 && p95 <= p99;
    if is_monotonic {
        Ok(())
    } else {
        Err(SchedulerEvidenceError::NonMonotonicPercentiles { field })
    }
}
/// Validates a single `sha256:`-prefixed source hash.
///
/// # Errors
///
/// - [`SchedulerEvidenceError::EmptySourceHash`] when `hash` is blank, or when it is only
///   the `sha256:` prefix with no digest after it.
/// - [`SchedulerEvidenceError::InvalidSourceHash`] when `hash` does not start with
///   `sha256:`; the offending value is echoed back in `found`.
fn validate_hash(hash: &str) -> Result<(), SchedulerEvidenceError> {
    if hash.trim().is_empty() {
        return Err(SchedulerEvidenceError::EmptySourceHash);
    }
    let Some(digest) = hash.strip_prefix("sha256:") else {
        return Err(SchedulerEvidenceError::InvalidSourceHash {
            found: hash.to_string(),
        });
    };
    // A bare prefix carries no digest and so cannot identify any source; treat it the
    // same as an empty hash instead of accepting it as valid.
    if digest.trim().is_empty() {
        return Err(SchedulerEvidenceError::EmptySourceHash);
    }
    Ok(())
}
/// Checks that an optional "available" byte count does not exceed its optional total.
///
/// The check only applies when both values are present; a missing side is accepted.
///
/// # Errors
///
/// Returns [`SchedulerEvidenceError::CapacityAvailableExceedsTotal`] carrying both field
/// names when `available > total`.
fn validate_optional_capacity_pair(
    available: Option<u64>,
    total: Option<u64>,
    available_field: &'static str,
    total_field: &'static str,
) -> Result<(), SchedulerEvidenceError> {
    match (available, total) {
        (Some(have), Some(limit)) if have > limit => {
            Err(SchedulerEvidenceError::CapacityAvailableExceedsTotal {
                available_field,
                total_field,
            })
        }
        _ => Ok(()),
    }
}
/// Unit tests for scheduler evidence validation, tuning, and coordination evidence inputs.
#[cfg(test)]
mod tests {
use super::*;
/// Builds a schema-valid artifact (64 workers, 2 cohorts, 256 GiB budget) whose metrics
/// `tune_report` classifies as a balanced baseline; tests mutate one field from here.
fn baseline_artifact() -> SchedulerEvidenceArtifact {
SchedulerEvidenceArtifact {
schema_version: SCHEDULER_EVIDENCE_SCHEMA_VERSION.to_string(),
run_label: "unit-baseline-64c".to_string(),
workload_class: SchedulerWorkloadClass::InteractiveSwarm,
topology: SchedulerTopologyDescriptor {
worker_threads: 64,
cohort_count: 2,
memory_budget_gib: 256,
},
current_knobs: SchedulerKnobProfile {
worker_threads: 64,
steal_batch_size: 8,
cancel_streak_limit: 16,
global_queue_limit: 0,
parking_enabled: true,
},
metrics: SchedulerEvidenceMetrics {
wake_to_run_p50_ns: 5_000,
wake_to_run_p95_ns: 20_000,
wake_to_run_p99_ns: 60_000,
queue_residency_p50_ns: 8_000,
queue_residency_p95_ns: 30_000,
queue_residency_p99_ns: 90_000,
ready_backlog_p95: 32,
ready_backlog_p99: 96,
cancel_debt_p95: 4,
cancel_debt_p99: 8,
remote_steal_ratio_pct: Some(12),
cross_cohort_wake_p99_ns: Some(70_000),
},
notes: vec!["unit".to_string()],
}
}
/// Builds a coordination evidence input for `family` / `workload_id` with non-empty
/// semantic pressure and provenance context, two source events, and `sha256:`-prefixed
/// hashes, so it passes validation as-is.
fn coordination_input(
family: CoordinationPressureFamily,
workload_id: &str,
) -> SchedulerCoordinationEvidenceInput {
SchedulerCoordinationEvidenceInput {
evidence_input_id: format!("coordination-evidence-{workload_id}"),
workload_id: workload_id.to_string(),
workload_class: SchedulerWorkloadClass::InteractiveSwarm,
scenario_family: family,
semantic_pressure: vec![
"ready-backlog".to_string(),
"queue-residency-tail".to_string(),
],
provenance_only_context: vec![
"pseudonymized-agent".to_string(),
"hashed-path".to_string(),
],
source_event_count: 2,
source_hashes: vec!["sha256:event-a".to_string(), "sha256:event-b".to_string()],
source_bundle_hash: "sha256:coordination-bundle".to_string(),
}
}
// Each case starts from a fresh baseline so exactly one field is invalid.
#[test]
fn validate_rejects_schema_and_required_zero_fields() {
let mut artifact = baseline_artifact();
artifact.schema_version = "asupersync.scheduler-evidence.v0".to_string();
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::UnsupportedSchemaVersion {
expected: SCHEDULER_EVIDENCE_SCHEMA_VERSION.to_string(),
found: "asupersync.scheduler-evidence.v0".to_string(),
})
);
let mut artifact = baseline_artifact();
// A whitespace-only label counts as empty because `validate` trims it.
artifact.run_label = " ".to_string();
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::EmptyRunLabel)
);
let mut artifact = baseline_artifact();
artifact.topology.worker_threads = 0;
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::ZeroWorkerThreads)
);
let mut artifact = baseline_artifact();
artifact.current_knobs.steal_batch_size = 0;
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::ZeroStealBatchSize)
);
}
// Metric-level checks surface through `SchedulerEvidenceArtifact::validate`.
#[test]
fn validate_rejects_metric_boundary_violations() {
let mut artifact = baseline_artifact();
// p95 one below p50 breaks p50 <= p95.
artifact.metrics.wake_to_run_p95_ns = artifact.metrics.wake_to_run_p50_ns - 1;
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::NonMonotonicPercentiles {
field: "wake_to_run",
})
);
let mut artifact = baseline_artifact();
artifact.metrics.remote_steal_ratio_pct = Some(101);
assert_eq!(
artifact.validate(),
Err(SchedulerEvidenceError::RemoteStealRatioOutOfRange(101))
);
}
// The baseline artifact must tune to the conservative profile with unchanged knobs.
#[test]
fn tune_report_keeps_conservative_fallback_for_balanced_baseline() {
let artifact = baseline_artifact();
let report = artifact
.tune_report()
.expect("balanced artifact should tune");
assert_eq!(report.profile_name, "conservative_baseline");
assert_eq!(report.recommended_knobs, artifact.current_knobs);
assert_eq!(report.fallback_profile, artifact.current_knobs);
assert_eq!(report.global_queue_limit_hint, None);
assert_eq!(
report.reason_codes,
vec![SchedulerRecommendationReason::BalancedBaseline]
);
assert_eq!(report.confidence_percent, 65);
assert!(
report
.explanation
.iter()
.any(|line| line.contains("conservative baseline envelope"))
);
}
// One well-formed input per pressure family; the whole set must validate.
#[test]
fn coordination_evidence_inputs_validate_all_pressure_families() {
let evidence = SchedulerCoordinationEvidenceInputs {
schema_version: SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION.to_string(),
source_pack_id: "agent-swarm-coordination-pressure".to_string(),
source_bundle_hash: "sha256:coordination-runtime-fixture".to_string(),
source_run_id: "coordination-runtime-fixture-accepted-all-families".to_string(),
evidence_inputs: vec![
coordination_input(
CoordinationPressureFamily::TrackerLockContention,
"ASWARM-WL-LOCK-001",
),
coordination_input(
CoordinationPressureFamily::ConcurrentRchProofs,
"ASWARM-WL-RCH-001",
),
coordination_input(
CoordinationPressureFamily::FailClosedDirtyFrontier,
"ASWARM-WL-DIRTY-001",
),
coordination_input(
CoordinationPressureFamily::ArtifactRetrievalTail,
"ASWARM-WL-ARTIFACT-001",
),
coordination_input(
CoordinationPressureFamily::ProofRunnerFanout,
"ASWARM-WL-FANOUT-001",
),
coordination_input(
CoordinationPressureFamily::StaleInProgressReclaim,
"ASWARM-WL-STALE-001",
),
coordination_input(
CoordinationPressureFamily::CoordinationLatencyBurst,
"ASWARM-WL-LATENCY-001",
),
],
};
evidence
.validate()
.expect("coordination evidence validates");
}
// Breaks one property at a time on a single-input set and checks the reported error.
#[test]
fn coordination_evidence_rejects_missing_semantics_and_unstable_hashes() {
let mut evidence = SchedulerCoordinationEvidenceInputs {
schema_version: SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION.to_string(),
source_pack_id: "agent-swarm-coordination-pressure".to_string(),
source_bundle_hash: "sha256:coordination-runtime-fixture".to_string(),
source_run_id: "coordination-runtime-fixture-accepted-all-families".to_string(),
evidence_inputs: vec![coordination_input(
CoordinationPressureFamily::TrackerLockContention,
"ASWARM-WL-LOCK-001",
)],
};
evidence.evidence_inputs[0].semantic_pressure.clear();
assert_eq!(
evidence.validate(),
Err(SchedulerEvidenceError::EmptySemanticPressure)
);
// Restore semantics, then break the hash prefix.
evidence.evidence_inputs[0].semantic_pressure = vec!["ready-backlog".to_string()];
evidence.evidence_inputs[0].source_hashes = vec!["not-a-sha".to_string()];
assert_eq!(
evidence.validate(),
Err(SchedulerEvidenceError::InvalidSourceHash {
found: "not-a-sha".to_string(),
})
);
// With no inputs at all, the empty-set check is reported.
evidence.evidence_inputs.clear();
assert_eq!(
evidence.validate(),
Err(SchedulerEvidenceError::EmptyEvidenceInputSet)
);
}
}