//! Process management for async simulation processes
//!
//! This module provides support for async processes in the simulation.
//! Processes are coroutine-like entities that can yield control and wait
//! for events or time delays.
use crate::event::EventPayload;
use crate::time::SimTime;
use crate::types::{EventId, ProcessId};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
/// A process in the simulation
///
/// Processes are async tasks that can yield control and wait for events
/// or time delays. They are the primary way to model concurrent behavior
/// in the simulation.
///
/// The supertrait bounds require every process to be a `Future` resolving
/// to `()` and to be `Send`; the trait itself only adds a name on top.
pub trait Process: Future<Output = ()> + Send {
/// Get the name of this process
///
/// The returned string slice is borrowed from the process itself.
fn name(&self) -> &str;
}
/// Manages active processes and their wakers
///
/// The ProcessManager tracks all active processes in the simulation and
/// manages their execution state. It handles waking processes when events
/// occur or time advances.
///
/// All bookkeeping is keyed by `ProcessId`; the manager owns the processes
/// themselves as pinned, boxed trait objects.
pub struct ProcessManager {
/// Active processes indexed by their ID
processes: HashMap<ProcessId, Pin<Box<dyn Process>>>,
/// Wakers for processes waiting on events or time (one slot per process ID)
wakers: HashMap<ProcessId, Waker>,
/// Next process ID to assign (the raw value wrapped in `ProcessId`)
next_id: u64,
/// Processes waiting for specific events, grouped by the event awaited
event_waiters: HashMap<EventId, Vec<ProcessId>>,
/// Processes waiting for time to advance, each paired with its wake-up time
time_waiters: Vec<(ProcessId, SimTime)>,
}
impl ProcessManager {
    /// Create an empty process manager.
    ///
    /// Nothing is registered yet, and the first registered process will be
    /// assigned `ProcessId(0)`.
    pub fn new() -> Self {
        Self {
            processes: HashMap::new(),
            wakers: HashMap::new(),
            next_id: 0,
            event_waiters: HashMap::new(),
            time_waiters: Vec::new(),
        }
    }

    /// Register a new process.
    ///
    /// Returns the unique ID assigned to the process. IDs are handed out
    /// sequentially from an internal counter that only ever increases, so
    /// removing a process does not free its ID for reuse.
    pub fn register_process(&mut self, process: Pin<Box<dyn Process>>) -> ProcessId {
        let id = ProcessId(self.next_id);
        self.next_id += 1;
        self.processes.insert(id, process);
        id
    }

    /// Store a waker for a process.
    ///
    /// Only one waker is kept per process; a later call replaces the waker
    /// stored by an earlier one.
    pub fn register_waker(&mut self, process_id: ProcessId, waker: Waker) {
        self.wakers.insert(process_id, waker);
    }

    /// Register a process as waiting for a specific event.
    ///
    /// Registrations are not de-duplicated: registering the same process
    /// twice for the same event records it twice.
    pub fn wait_for_event(&mut self, process_id: ProcessId, event_id: EventId) {
        self.event_waiters
            .entry(event_id)
            .or_default()
            .push(process_id);
    }

    /// Register a process as waiting for a specific time.
    pub fn wait_for_time(&mut self, process_id: ProcessId, time: SimTime) {
        self.time_waiters.push((process_id, time));
    }

    /// Wake processes waiting for a specific event.
    ///
    /// The waiter list for `event_id` is removed as a whole. Each listed
    /// process is woken through its stored waker, which is consumed; waiters
    /// without a stored waker are skipped.
    pub fn wake_event_waiters(&mut self, event_id: EventId) {
        if let Some(waiters) = self.event_waiters.remove(&event_id) {
            for process_id in waiters {
                self.wake_process(process_id);
            }
        }
    }

    /// Wake processes waiting for time to advance to or past the given time.
    ///
    /// A waiter is due when its wait time is less than or equal to
    /// `current_time`. Due waiters are removed from the list and woken in
    /// the order they were registered; the rest keep waiting.
    pub fn wake_time_waiters(&mut self, current_time: SimTime) {
        // Collect first: `retain` holds a mutable borrow of `time_waiters`,
        // so the wakers cannot be touched from inside the closure.
        let mut due = Vec::new();
        self.time_waiters.retain(|(process_id, wait_time)| {
            let is_due = *wait_time <= current_time;
            if is_due {
                due.push(*process_id);
            }
            !is_due
        });

        for process_id in due {
            self.wake_process(process_id);
        }
    }

    /// Poll a specific process.
    ///
    /// Returns `None` when no process with `process_id` is registered,
    /// otherwise `Some` with the result of polling it once. A process that
    /// reports `Poll::Ready` is *not* removed automatically; call
    /// `remove_process` for that.
    pub fn poll_process(&mut self, process_id: ProcessId, cx: &mut Context<'_>) -> Option<Poll<()>> {
        let process = self.processes.get_mut(&process_id)?;
        Some(process.as_mut().poll(cx))
    }

    /// Remove a process and every trace of it.
    ///
    /// Besides dropping the process and its stored waker, this also purges
    /// the process from the time-waiter list and from every event-waiter
    /// list, so a removed process never lingers as a stale waiter. Event
    /// entries whose waiter list becomes empty are dropped as well.
    ///
    /// Removing an unknown ID is a no-op.
    pub fn remove_process(&mut self, process_id: ProcessId) {
        self.processes.remove(&process_id);
        self.wakers.remove(&process_id);
        self.time_waiters.retain(|(waiter, _)| *waiter != process_id);
        self.event_waiters.retain(|_, waiters| {
            waiters.retain(|waiter| *waiter != process_id);
            !waiters.is_empty()
        });
    }

    /// Get the number of active processes.
    pub fn active_count(&self) -> usize {
        self.processes.len()
    }

    /// Check if there are any active processes.
    pub fn has_active_processes(&self) -> bool {
        !self.processes.is_empty()
    }

    /// Get the number of processes waiting for time (for testing).
    #[cfg(test)]
    pub(crate) fn time_waiters_count(&self) -> usize {
        self.time_waiters.len()
    }

    /// Check if any process is waiting for an event (for testing).
    #[cfg(test)]
    pub(crate) fn has_event_waiter(&self, event_id: EventId) -> bool {
        self.event_waiters.contains_key(&event_id)
    }

    /// Wake `process_id` through its stored waker, if it has one.
    ///
    /// The waker is taken out of the map, so each stored waker fires at most
    /// once until the process registers a new one.
    fn wake_process(&mut self, process_id: ProcessId) {
        if let Some(waker) = self.wakers.remove(&process_id) {
            waker.wake();
        }
    }
}
impl Default for ProcessManager {
fn default() -> Self {
Self::new()
}
}
/// Future that completes after a delay
///
/// This is used internally by the `delay()` helper function.
pub struct Delay {
/// Simulation time the delay is waiting for; `None` until `set_target_time` is called.
target_time: Option<SimTime>,
/// Whether the delay counts as registered: set by `mark_registered` or by the first `poll`.
registered: bool,
}
impl Delay {
/// Create a new delay future
pub fn new() -> Self {
Self {
target_time: None,
registered: false,
}
}
/// Set the target time for this delay
pub fn set_target_time(&mut self, time: SimTime) {
self.target_time = Some(time);
}
/// Check if the delay has been registered
pub fn is_registered(&self) -> bool {
self.registered
}
/// Mark the delay as registered
pub fn mark_registered(&mut self) {
self.registered = true;
}
}
impl Future for Delay {
    type Output = ();

    /// Pending on the first poll of an unregistered delay, ready afterwards.
    ///
    /// This is a simplified implementation: `target_time` is not consulted.
    /// In a full integration, the Environment would check whether the target
    /// time has been reached.
    ///
    /// NOTE(review): the waker in `_cx` is neither stored nor woken, so this
    /// future only makes progress if whoever drives it polls again on its
    /// own. Confirm the Environment does that.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let already_registered = self.registered;
        // After this call the delay always counts as registered.
        self.registered = true;

        if already_registered {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// Future that completes when a specific event occurs
///
/// This is used internally by the `wait_for_event()` helper function.
pub struct EventWaiter {
/// Event being waited for; `None` until `set_event_id` is called.
event_id: Option<EventId>,
/// Whether the waiter counts as registered: set by `mark_registered` or by a `poll` that found no result.
registered: bool,
/// Payload stored by `set_result`; `poll` takes it out when it completes.
result: Option<EventPayload>,
}
impl EventWaiter {
/// Create a new event waiter
pub fn new() -> Self {
Self {
event_id: None,
registered: false,
result: None,
}
}
/// Set the event ID to wait for
pub fn set_event_id(&mut self, event_id: EventId) {
self.event_id = Some(event_id);
}
/// Set the result when the event occurs
pub fn set_result(&mut self, payload: EventPayload) {
self.result = Some(payload);
}
/// Check if the waiter has been registered
pub fn is_registered(&self) -> bool {
self.registered
}
/// Mark the waiter as registered
pub fn mark_registered(&mut self) {
self.registered = true;
}
}
impl Future for EventWaiter {
    type Output = EventPayload;

    /// Completes with the stored payload if one is present, otherwise stays pending.
    ///
    /// The payload is moved out on completion, so a later poll is pending
    /// again unless a new result has been stored. A pending poll also marks
    /// the waiter as registered.
    ///
    /// NOTE(review): the waker in `_cx` is neither stored nor woken; in
    /// practice it would be registered with the ProcessManager. Confirm that
    /// whoever sets the result also re-polls this future.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let delivered = self.result.take();
        match delivered {
            Some(payload) => Poll::Ready(payload),
            None => {
                // Setting the flag unconditionally matches "set it if unset".
                self.registered = true;
                Poll::Pending
            }
        }
    }
}
impl Default for EventWaiter {
fn default() -> Self {
Self::new()
}
}
impl Default for Delay {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::task::Wake;

    /// Minimal process that completes on its first poll.
    struct TestProcess {
        name: String,
    }

    impl Future for TestProcess {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(())
        }
    }

    impl Process for TestProcess {
        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Waker backend that counts how often it is woken, so tests can observe
    /// that the manager really invokes the wakers it stores.
    struct WakeCounter {
        wakes: AtomicUsize,
    }

    impl WakeCounter {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                wakes: AtomicUsize::new(0),
            })
        }

        fn count(&self) -> usize {
            self.wakes.load(Ordering::SeqCst)
        }
    }

    impl Wake for WakeCounter {
        fn wake(self: Arc<Self>) {
            self.wakes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Shorthand for boxing a named test process.
    fn boxed(name: &str) -> Pin<Box<dyn Process>> {
        Box::pin(TestProcess {
            name: name.to_string(),
        })
    }

    #[test]
    fn test_process_manager_creation() {
        let manager = ProcessManager::new();
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_active_processes());
    }

    #[test]
    fn test_register_process() {
        let mut manager = ProcessManager::new();
        let id = manager.register_process(boxed("test"));
        assert_eq!(id, ProcessId(0));
        assert_eq!(manager.active_count(), 1);
        assert!(manager.has_active_processes());
    }

    #[test]
    fn test_multiple_processes() {
        let mut manager = ProcessManager::new();
        let id1 = manager.register_process(boxed("process1"));
        let id2 = manager.register_process(boxed("process2"));
        assert_eq!(id1, ProcessId(0));
        assert_eq!(id2, ProcessId(1));
        assert_eq!(manager.active_count(), 2);
    }

    #[test]
    fn test_remove_process() {
        let mut manager = ProcessManager::new();
        let id = manager.register_process(boxed("test"));
        assert_eq!(manager.active_count(), 1);
        manager.remove_process(id);
        assert_eq!(manager.active_count(), 0);
        assert!(!manager.has_active_processes());
    }

    #[test]
    fn test_time_waiters() {
        let mut manager = ProcessManager::new();
        let id1 = ProcessId(0);
        let id2 = ProcessId(1);
        manager.wait_for_time(id1, SimTime::from_millis(100));
        manager.wait_for_time(id2, SimTime::from_millis(200));
        // Wake processes at time 150 - should wake id1 but not id2
        manager.wake_time_waiters(SimTime::from_millis(150));
        // id2 should still be waiting
        assert_eq!(manager.time_waiters_count(), 1);
        assert_eq!(manager.time_waiters[0].0, id2);
    }

    #[test]
    fn test_event_waiters() {
        let mut manager = ProcessManager::new();
        let process_id = ProcessId(0);
        let event_id = EventId(42);
        manager.wait_for_event(process_id, event_id);
        // Verify the waiter is registered
        assert!(manager.has_event_waiter(event_id));
        assert_eq!(manager.event_waiters[&event_id].len(), 1);
        // Wake the waiters
        manager.wake_event_waiters(event_id);
        // Verify the waiter was removed
        assert!(!manager.has_event_waiter(event_id));
    }

    #[test]
    fn test_multiple_event_waiters() {
        let mut manager = ProcessManager::new();
        let event_id = EventId(42);
        let id1 = ProcessId(0);
        let id2 = ProcessId(1);
        let id3 = ProcessId(2);
        manager.wait_for_event(id1, event_id);
        manager.wait_for_event(id2, event_id);
        manager.wait_for_event(id3, event_id);
        assert_eq!(manager.event_waiters[&event_id].len(), 3);
        manager.wake_event_waiters(event_id);
        assert!(!manager.has_event_waiter(event_id));
    }

    #[test]
    fn test_event_wakers_are_invoked_once() {
        let mut manager = ProcessManager::new();
        let counter = WakeCounter::new();
        let event_id = EventId(7);
        manager.register_waker(ProcessId(0), Waker::from(Arc::clone(&counter)));
        manager.register_waker(ProcessId(1), Waker::from(Arc::clone(&counter)));
        manager.wait_for_event(ProcessId(0), event_id);
        manager.wait_for_event(ProcessId(1), event_id);
        // A waiter without a stored waker must simply be skipped.
        manager.wait_for_event(ProcessId(2), event_id);

        manager.wake_event_waiters(event_id);
        assert_eq!(counter.count(), 2);

        // The waiters are gone, so waking again does nothing.
        manager.wake_event_waiters(event_id);
        assert_eq!(counter.count(), 2);
    }

    #[test]
    fn test_time_wakers_fire_at_or_after_deadline() {
        let mut manager = ProcessManager::new();
        let counter = WakeCounter::new();
        manager.register_waker(ProcessId(0), Waker::from(Arc::clone(&counter)));
        manager.register_waker(ProcessId(1), Waker::from(Arc::clone(&counter)));
        manager.wait_for_time(ProcessId(0), SimTime::from_millis(100));
        manager.wait_for_time(ProcessId(1), SimTime::from_millis(200));

        manager.wake_time_waiters(SimTime::from_millis(150));
        assert_eq!(counter.count(), 1);
        assert_eq!(manager.time_waiters_count(), 1);

        // A wait time equal to the current time counts as reached.
        manager.wake_time_waiters(SimTime::from_millis(200));
        assert_eq!(counter.count(), 2);
        assert_eq!(manager.time_waiters_count(), 0);
    }

    #[test]
    fn test_poll_process() {
        let mut manager = ProcessManager::new();
        let id = manager.register_process(boxed("test"));
        let waker = Waker::from(WakeCounter::new());
        let mut cx = Context::from_waker(&waker);

        assert_eq!(manager.poll_process(id, &mut cx), Some(Poll::Ready(())));
        assert_eq!(manager.poll_process(ProcessId(99), &mut cx), None);
    }

    #[test]
    fn test_delay_pending_then_ready() {
        let waker = Waker::from(WakeCounter::new());
        let mut cx = Context::from_waker(&waker);
        let mut delay = Delay::new();

        assert!(!delay.is_registered());
        assert!(Pin::new(&mut delay).poll(&mut cx).is_pending());
        assert!(delay.is_registered());
        assert!(Pin::new(&mut delay).poll(&mut cx).is_ready());
    }

    #[test]
    fn test_event_waiter_pending_without_result() {
        let waker = Waker::from(WakeCounter::new());
        let mut cx = Context::from_waker(&waker);
        let mut waiter = EventWaiter::new();
        waiter.set_event_id(EventId(1));

        assert!(!waiter.is_registered());
        assert!(Pin::new(&mut waiter).poll(&mut cx).is_pending());
        assert!(waiter.is_registered());
        assert!(Pin::new(&mut waiter).poll(&mut cx).is_pending());
    }
}
/// Suspend the current process for a span of simulation time.
///
/// The intent is for the process to resume once the simulation clock has
/// advanced by the given duration. In this simplified implementation the
/// duration is not used: the function just awaits a fresh [`Delay`].
///
/// # Arguments
///
/// * `_duration` - How long to delay (currently unused in this simplified implementation)
///
/// # Examples
///
/// ```ignore
/// use des_core::process::delay;
/// use std::time::Duration;
///
/// async fn my_process() {
///     // Do some work
///     delay(Duration::from_millis(100)).await;
///     // Continue after 100ms of simulation time
/// }
/// ```
pub async fn delay(_duration: Duration) {
    // In a full implementation, this would interact with the Environment
    // to register the delay and get woken up when time advances
    Delay::new().await
}
/// Suspend the current process until a given event has occurred.
///
/// Builds an [`EventWaiter`] for `event_id` and awaits it. The waiter
/// completes once a result payload has been stored in it.
///
/// # Arguments
///
/// * `event_id` - The ID of the event to wait for
///
/// # Returns
///
/// The payload of the event when it occurs
///
/// # Examples
///
/// ```ignore
/// use des_core::process::wait_for_event;
/// use des_core::types::EventId;
///
/// async fn my_process() {
///     let event_id = EventId(42);
///     let payload = wait_for_event(event_id).await;
///     // Process the event payload
/// }
/// ```
pub async fn wait_for_event(event_id: EventId) -> EventPayload {
    let pending_event = {
        let mut fresh = EventWaiter::new();
        fresh.set_event_id(event_id);
        fresh
    };
    // In a full implementation, this would interact with the Environment
    // to register the waiter and get woken up when the event occurs
    pending_event.await
}