asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
//! Work-stealing scheduler with 3-lane priority support.
//!
//! The scheduler uses three priority lanes:
//! 1. Cancel lane (highest) - tasks with pending cancellation
//! 2. Timed lane (EDF) - tasks with deadlines
//! 3. Ready lane (lowest) - all other ready tasks
//!
//! Multi-worker scheduling preserves this strict lane ordering while
//! supporting work-stealing for load balancing across workers.

pub mod autotuner;
pub mod content;
#[cfg(test)]
pub mod content_tests;
pub mod decision_contract;
#[cfg(test)]
pub mod edf_priority_metamorphic;
pub mod global_injector;
pub mod global_queue;
pub mod intrusive;
pub mod intrusive_heap;
pub mod invariant_monitor;
#[cfg(test)]
pub mod lane_pressure_scaling_metamorphic;
pub mod local_queue;
#[cfg(test)]
pub mod metamorphic_tests;
pub mod priority;
#[cfg(test)]
pub mod priority_inversion_metamorphic;
pub mod priority_inversion_oracle;
#[cfg(test)]
pub mod ready_dispatch_invariance_metamorphic;
#[cfg(test)]
pub mod shutdown_behavior_audit_test;
/// Trait abstraction over runtime backing-state shapes.
///
/// Tracks br-asupersync-30atgp.1.
pub mod state_backing;
pub mod stealing;
pub mod stream_priority;
/// Versioned swarm-evidence artifacts and offline tuning contracts.
pub mod swarm_evidence;
pub mod three_lane;
pub mod work_stealing_checker;
#[cfg(test)]
pub mod work_stealing_fairness_metamorphic;
pub mod worker;

pub use crate::runtime::config::SchedulerPlacementMode;
pub use autotuner::{
    AutotunerConfig, AutotunerRecommendation, HotPathObservation,
    SchedulerAdmissionControlThresholds, SchedulerAutotuner, SchedulerFeedbackClamp,
    SchedulerFeedbackClampReason, SchedulerFeedbackCurrentKnobs, SchedulerFeedbackEvidence,
    SchedulerFeedbackKnob, SchedulerFeedbackKnobSet, SchedulerFeedbackMetrics,
    SchedulerFeedbackPolicy, SchedulerFeedbackProtectedInvariants, SchedulerFeedbackReason,
    SchedulerFeedbackRecommendation, SchedulerFeedbackSignal, SchedulerFeedbackWorkloadClass,
    extract_observation, recommend_scheduler_feedback,
};
pub use content::{
    ContentId, ContentItem, ContentScheduler, PressureSnapshot, PriorityClass, ScheduleEvidence,
    ScheduleReason,
};
pub use global_injector::GlobalInjector;
pub use global_queue::GlobalQueue;
pub use intrusive::{IntrusiveRing, IntrusiveStack, QUEUE_TAG_CANCEL, QUEUE_TAG_READY};
pub use intrusive_heap::IntrusivePriorityHeap;
pub use invariant_monitor::{
    InvariantCategory, InvariantConfig, InvariantStats, InvariantViolation, QueueSnapshot,
    SchedulerInvariant, SchedulerInvariantMonitor, WorkerLoadSnapshot,
};
pub use local_queue::LocalQueue;
pub use priority::{
    DispatchLane, ScheduleCertificate, Scheduler as PriorityScheduler, SchedulerMode,
};
pub use priority_inversion_oracle::{
    InversionId, InversionImpact, InversionOracleConfig, InversionSeverity, InversionStats,
    InversionType, Priority, PriorityInversion, PriorityInversionOracle, ResourceId,
};
pub use stream_priority::{
    SchedulerIntegration, SchedulerStats, StreamAssignment, StreamPriority, StreamPriorityScheduler,
};
pub use swarm_evidence::{
    CoordinationPressureFamily, SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION,
    SCHEDULER_EVIDENCE_SCHEMA_VERSION, SWARM_ADMISSION_POLICY_REPORT_SCHEMA_VERSION,
    SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION, SWARM_MEMORY_BUDGET_PLAN_SCHEMA_VERSION,
    SWARM_MEMORY_RESIDENCY_POLICY_SCHEMA_VERSION, SchedulerCoordinationEvidenceInput,
    SchedulerCoordinationEvidenceInputs, SchedulerEvidenceArtifact, SchedulerEvidenceError,
    SchedulerEvidenceMetrics, SchedulerKnobProfile, SchedulerRecommendationReason,
    SchedulerTopologyDescriptor, SchedulerTuneReport, SchedulerWorkloadClass,
    SwarmAdmissionDecision, SwarmAdmissionLane, SwarmAdmissionReasonCode, SwarmAdmissionReport,
    SwarmCapacitySnapshot, SwarmCoordinationBacklogSignals, SwarmCpuTopologyHints,
    SwarmDiskCapacity, SwarmDiskPressureLevel, SwarmLaneAdmission, SwarmMemoryBrownoutClass,
    SwarmMemoryBudgetPlan, SwarmMemoryCapacity, SwarmMemoryHostTier, SwarmMemoryPressureTier,
    SwarmMemoryProtectedInvariant, SwarmMemoryResidencyDecision, SwarmMemoryResidencyEnvelope,
    SwarmMemoryResidencyFallbackReason, SwarmMemoryResidencyPlan, SwarmMemoryResidencyRequest,
    SwarmMemoryResidencyTier, SwarmMemoryResidencyWorkloadClass, SwarmRchAdmissibility,
    SwarmRchCapacity, SwarmValidationClass,
};
pub use three_lane::{ThreeLaneScheduler, ThreeLaneWorker};
pub use work_stealing_checker::{
    OwnershipState, StealingStats, ViolationType, WorkStealingChecker,
};
pub use worker::{Parker, Worker};

use crate::types::TaskId;

/// Work-stealing scheduler coordinator.
#[derive(Debug)]
pub struct WorkStealingScheduler {
    inner: ThreeLaneScheduler,
}

impl WorkStealingScheduler {
    /// Creates a new scheduler with the given number of workers.
    ///
    /// This also creates the workers and their local queues.
    pub fn new(
        worker_count: usize,
        state: &std::sync::Arc<crate::sync::ContendedMutex<crate::runtime::RuntimeState>>,
    ) -> Self {
        // A zero-worker scheduler cannot make progress; clamp to one worker
        // to preserve forward progress for direct users of this API.
        let worker_count = worker_count.max(1);
        Self {
            inner: ThreeLaneScheduler::new(worker_count, state),
        }
    }

    /// Spawns a task.
    ///
    /// If called from a worker thread, it should push to the local queue.
    /// Otherwise, it pushes to the global queue.
    ///
    /// For Phase 1 initial implementation, we always push to global queue
    /// to avoid TLS complexity for now.
    pub fn spawn(&self, task: TaskId) {
        self.inner.spawn(task, 0);
    }

    /// Wakes a task.
    pub fn wake(&self, task: TaskId) {
        self.inner.wake(task, 0);
    }

    /// Extract workers to run them in threads.
    pub fn take_workers(&mut self) -> Vec<ThreeLaneWorker> {
        self.inner.take_workers()
    }

    /// Signals all workers to shutdown.
    pub fn shutdown(&self) {
        self.inner.shutdown();
    }
}

// Preserve backward compatibility for Phase 0
pub use priority::Scheduler;

#[cfg(test)]
mod tests {
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;
    use crate::runtime::RuntimeState;
    use crate::sync::ContendedMutex;
    use std::sync::Arc;
    use std::time::Duration;

    #[test]
    fn test_worker_shutdown() {
        // Create state
        let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));

        // Create scheduler with 2 workers
        let mut scheduler = WorkStealingScheduler::new(2, &state);

        // Take workers
        let workers = scheduler.take_workers();
        assert_eq!(workers.len(), 2);

        // Spawn threads for workers
        let handles: Vec<_> = workers
            .into_iter()
            .map(|mut worker| {
                std::thread::spawn(move || {
                    worker.run_loop();
                })
            })
            .collect();

        // Let them run briefly (they will park immediately as there is no work)
        std::thread::sleep(Duration::from_millis(10));

        // Signal shutdown
        scheduler.shutdown();

        // Join threads (this will hang if shutdown logic is broken)
        for handle in handles {
            handle.join().unwrap();
        }

        // If we reach here, shutdown worked!
    }

    #[test]
    fn test_zero_worker_count_is_clamped_to_one() {
        let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
        let mut scheduler = WorkStealingScheduler::new(0, &state);
        let workers = scheduler.take_workers();
        assert_eq!(
            workers.len(),
            1,
            "scheduler must clamp zero workers to one for forward progress"
        );
    }
}