use std::sync::{Arc, RwLock, Mutex, Condvar};
use std::thread::ThreadId;
use std::collections::{HashMap, VecDeque, BTreeMap};
use std::time::{SystemTime, Duration};
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam::channel::{Sender, Receiver, unbounded};
/// Coordinates I/O operations submitted by multiple threads.
///
/// Tracks in-flight operations per thread, per-resource locks, a shared
/// ordering manager, per-thread message channels and an optional event
/// history. Every field is held behind an `Arc`.
#[derive(Debug)]
pub struct IOCoordinator {
/// Operations currently tracked for each registered thread.
active_operations: Arc<RwLock<HashMap<ThreadId, Vec<IOOperation>>>>,
/// Locks currently held, keyed by resource name.
resource_locks: Arc<RwLock<HashMap<String, IOResourceLock>>>,
/// Hands out sequence numbers and tracks operations that are still pending.
ordering_manager: Arc<IOOrderingManager>,
/// One message channel per registered thread.
coordination_channels: Arc<RwLock<HashMap<ThreadId, IOChannel>>>,
/// Recorded operation events, oldest at the front.
operation_history: Arc<Mutex<VecDeque<IOOperationEvent>>>,
/// Limits and feature switches applied by this coordinator.
policies: Arc<IOCoordinationPolicies>,
}
/// Assigns sequence numbers to operations and tracks the ones still pending.
#[derive(Debug)]
pub struct IOOrderingManager {
/// Last sequence number handed out; starts at 0, so the first issued value is 1.
sequence_counter: AtomicU64,
/// Operations submitted and not yet completed or cancelled, keyed by sequence.
pending_operations: Arc<RwLock<BTreeMap<u64, PendingIOOperation>>>,
/// Per-resource queues of sequence numbers. Only initialised (empty) in the
/// code visible here; nothing reads or fills it.
#[allow(dead_code)] resource_queues: Arc<RwLock<HashMap<String, VecDeque<u64>>>>,
/// Map of sequence numbers flagged as completed, paired with the condition
/// variable that is signalled each time an entry is added.
completion_notifier: Arc<(Mutex<HashMap<u64, bool>>, Condvar)>,
}
/// An operation that has been accepted by the coordinator and not yet completed.
#[derive(Debug, Clone)]
pub struct PendingIOOperation {
/// Sequence number assigned at submission; doubles as the operation id.
pub sequence: u64,
/// Thread that submitted the operation.
pub thread_id: ThreadId,
/// Kind of I/O being performed.
pub operation_type: IOOperationType,
/// Name of the resource the operation targets.
pub resource: String,
/// Caller-supplied parameters, stored as given.
pub parameters: IOParameters,
/// Sequence numbers of earlier pending operations this one was found to
/// conflict with at submission time. Stored but not consulted in the
/// code visible here.
#[allow(dead_code)] pub dependencies: Vec<u64>,
/// Wall-clock time at which the operation was queued as pending.
pub submitted_at: SystemTime,
}
/// Both halves of a crossbeam channel carrying [`IOCoordinationMessage`]s.
///
/// One is created (unbounded) for each thread at registration. Neither half
/// is used to send or receive in the code visible here.
#[derive(Debug)]
pub struct IOChannel {
/// Sending half of the channel.
#[allow(dead_code)] pub sender: Sender<IOCoordinationMessage>,
/// Receiving half of the channel.
#[allow(dead_code)] pub receiver: Receiver<IOCoordinationMessage>,
}
/// Bookkeeping record for a lock held on a single resource.
#[derive(Debug)]
pub struct IOResourceLock {
/// Name of the locked resource (same value as its key in the lock table).
#[allow(dead_code)] pub resource_id: String,
/// Thread recorded as the holder. Only one holder is tracked, even when a
/// read lock is shared with other readers.
pub holder: ThreadId,
/// Mode in which the lock is currently held.
pub lock_type: LockType,
/// When the current holder obtained (or was handed) the lock.
pub acquired_at: SystemTime,
/// Copied from the `default_lock_timeout` policy when the lock is created.
/// Not enforced anywhere in the code visible here.
#[allow(dead_code)] pub timeout: Duration,
/// Requests that could not be granted, in arrival order:
/// (requesting thread, requested mode, time of the request).
pub wait_queue: VecDeque<(ThreadId, LockType, SystemTime)>,
}
/// Mode in which a resource lock is requested or held.
#[derive(Debug, Clone, PartialEq)]
pub enum LockType {
/// Read access; may be shared with other read requests when the
/// `allow_concurrent_reads` policy is enabled.
Read,
/// Write access; never shared.
Write,
}
/// Description of a single I/O operation as tracked by the coordinator.
#[derive(Debug, Clone)]
pub struct IOOperation {
/// Identifier of the operation; equal to its sequence number.
pub id: u64,
/// Kind of I/O being performed.
pub operation_type: IOOperationType,
/// Name of the resource the operation targets.
pub resource: String,
/// Caller-supplied parameters, stored as given.
pub parameters: IOParameters,
/// Wall-clock time recorded when this record was created.
pub started_at: SystemTime,
/// Lifecycle state of the operation.
pub status: IOOperationStatus,
}
/// Kinds of I/O operation the coordinator distinguishes.
///
/// The coordinator requests a write lock for `FileWrite`, `FileOpen`,
/// `FileClose` and `ConsoleOutput`, and a read lock for every other variant.
/// `FileWrite`, `FileOpen` and `FileClose` are also the variants that create
/// ordering dependencies with other pending operations on the same resource.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IOOperationType {
FileRead,
FileWrite,
FileOpen,
FileClose,
Directory,
ConsoleOutput,
ConsoleInput,
Network,
}
/// Optional, caller-supplied details of an I/O operation.
///
/// The coordinator only stores and clones these values; none of the fields
/// is interpreted by the code visible here, so the per-field notes below are
/// inferred from the names and should be confirmed against callers.
#[derive(Debug, Clone, Default)]
pub struct IOParameters {
/// Presumably the path of the file involved, if any.
pub file_path: Option<String>,
/// Presumably the payload to write, if any.
pub data: Option<Vec<u8>>,
/// Presumably the byte offset at which to start, if any.
pub offset: Option<u64>,
/// Presumably the number of bytes to transfer, if any.
pub length: Option<usize>,
/// Free-form key/value annotations.
pub metadata: HashMap<String, String>,
}
/// Lifecycle state of an [`IOOperation`].
///
/// In the code visible here only `Pending` (at submission) and
/// `Completed` / `Failed` (at completion) are ever assigned.
#[derive(Debug, Clone, PartialEq)]
pub enum IOOperationStatus {
/// Accepted by the coordinator, not yet finished.
Pending,
InProgress,
/// Finished with an `Ok` result.
Completed,
/// Finished with an `Err` result; carries a failure message.
Failed(String),
Cancelled,
}
/// One entry of the coordinator's operation history.
///
/// An event is recorded when an operation is submitted and again when it is
/// completed (only while history tracking is enabled).
#[derive(Debug, Clone)]
pub struct IOOperationEvent {
/// Thread the operation belongs to.
pub thread_id: ThreadId,
/// Wall-clock time at which the event was recorded.
pub timestamp: SystemTime,
/// Snapshot of the operation at the time of the event.
pub operation: IOOperation,
/// Outcome attached to the event. Submission events carry an empty `Ok`
/// placeholder; completion events carry the caller-provided result.
pub result: Result<IOResult, String>,
/// Sequence number of the operation.
pub sequence: u64,
}
/// Successful outcome of an I/O operation, as reported by the caller.
///
/// The coordinator stores these values in the history without inspecting them.
#[derive(Debug, Clone)]
pub struct IOResult {
/// Bytes produced by the operation, if any.
pub data: Option<Vec<u8>>,
/// Number of bytes the operation handled.
pub bytes_processed: usize,
/// Free-form key/value annotations.
pub metadata: HashMap<String, String>,
}
/// Messages that can travel over an [`IOChannel`].
///
/// No code visible here constructs or consumes any of these variants (each is
/// marked `dead_code`), so the descriptions below are inferred from the
/// variant and field names — confirm against the actual producer/consumer.
#[derive(Debug, Clone)]
pub enum IOCoordinationMessage {
/// Presumably a request to coordinate `operation` after `dependencies`.
#[allow(dead_code)] CoordinateIO {
operation: IOOperation,
dependencies: Vec<u64>,
},
/// Presumably the reply to a `CoordinateIO` request.
#[allow(dead_code)] CoordinationResponse {
operation_id: u64,
success: bool,
error: Option<String>,
},
/// Presumably an announcement that an operation finished with `result`.
#[allow(dead_code)] IOCompleted {
operation_id: u64,
result: Result<IOResult, String>,
},
/// Presumably a request for a lock on `resource`.
#[allow(dead_code)] RequestLock {
resource: String,
lock_type: LockType,
timeout: Duration,
},
/// Presumably the reply to a `RequestLock` request.
#[allow(dead_code)] LockResponse {
resource: String,
granted: bool,
error: Option<String>,
},
/// Presumably a request to give up the lock on `resource`.
#[allow(dead_code)] ReleaseLock {
resource: String,
},
}
/// Tunable limits and switches for an [`IOCoordinator`].
#[derive(Debug)]
pub struct IOCoordinationPolicies {
/// Whether submission and completion events are appended to the history.
track_history: bool,
/// Maximum number of events kept; the oldest is dropped when exceeded.
max_history_size: usize,
/// Not consulted anywhere in the code visible here.
#[allow(dead_code)] default_operation_timeout: Duration,
/// Timeout value copied into every newly created resource lock.
default_lock_timeout: Duration,
/// Not consulted anywhere in the code visible here.
#[allow(dead_code)] enforce_strict_ordering: bool,
/// Whether a read request is granted while another read lock is held.
allow_concurrent_reads: bool,
/// Number of tracked operations at which a thread's new submissions are rejected.
max_concurrent_operations_per_thread: usize,
}
impl IOCoordinator {
pub fn new() -> Self {
Self {
active_operations: Arc::new(RwLock::new(HashMap::new())),
resource_locks: Arc::new(RwLock::new(HashMap::new())),
ordering_manager: Arc::new(IOOrderingManager::new()),
coordination_channels: Arc::new(RwLock::new(HashMap::new())),
operation_history: Arc::new(Mutex::new(VecDeque::new())),
policies: Arc::new(IOCoordinationPolicies::default()),
}
}
pub fn with_policies(policies: IOCoordinationPolicies) -> Self {
Self {
active_operations: Arc::new(RwLock::new(HashMap::new())),
resource_locks: Arc::new(RwLock::new(HashMap::new())),
ordering_manager: Arc::new(IOOrderingManager::new()),
coordination_channels: Arc::new(RwLock::new(HashMap::new())),
operation_history: Arc::new(Mutex::new(VecDeque::new())),
policies: Arc::new(policies),
}
}
/// Starts tracking `thread_id`: gives it an empty operation list and a
/// fresh unbounded message channel.
///
/// Registering a thread that is already registered replaces both its
/// operation list and its channel.
pub fn register_thread(&self, thread_id: ThreadId) {
    let (sender, receiver) = unbounded();

    // Each table is locked only for the duration of its own insert.
    self.active_operations
        .write()
        .unwrap()
        .insert(thread_id, Vec::new());

    self.coordination_channels
        .write()
        .unwrap()
        .insert(thread_id, IOChannel { sender, receiver });
}
/// Stops tracking `thread_id`.
///
/// Removes its operation list and channel, releases every resource lock
/// it holds, and drops all of its pending operations from the ordering
/// manager.
pub fn unregister_thread(&self, thread_id: ThreadId) {
    // Each guard is a temporary, so every table is unlocked again before
    // the next step runs.
    self.active_operations.write().unwrap().remove(&thread_id);
    self.coordination_channels.write().unwrap().remove(&thread_id);

    self.release_all_locks_for_thread(thread_id);
    self.ordering_manager.cancel_operations_for_thread(thread_id);
}
/// Submits an I/O operation for coordination and returns its id.
///
/// The operation receives the next sequence number, its dependencies on
/// earlier pending operations are computed, a lock on `resource` is
/// requested, and the operation is then recorded as pending, added to
/// the thread's active list (if the thread is registered) and, when
/// history tracking is on, logged with an empty placeholder result.
///
/// # Errors
///
/// * the thread is registered and already tracks
///   `max_concurrent_operations_per_thread` operations;
/// * dependency computation or the pending-queue insert fails;
/// * the resource lock cannot be granted right now. In that case the
///   request has been appended to the lock's wait queue and the sequence
///   number already drawn is not reused.
pub fn coordinate_io_operation(
    &self,
    thread_id: ThreadId,
    operation_type: IOOperationType,
    resource: String,
    parameters: IOParameters,
) -> Result<u64, String> {
    // Guard: a registered thread may not exceed its concurrency budget.
    // Unregistered threads have no list and therefore no limit.
    let limit = self.policies.max_concurrent_operations_per_thread;
    let at_capacity = self
        .active_operations
        .read()
        .unwrap()
        .get(&thread_id)
        .map_or(false, |thread_ops| thread_ops.len() >= limit);
    if at_capacity {
        return Err("Thread has reached maximum concurrent IO operations limit".to_string());
    }

    let operation_id = self.ordering_manager.next_sequence();
    let operation = IOOperation {
        id: operation_id,
        operation_type: operation_type.clone(),
        resource: resource.clone(),
        parameters,
        started_at: SystemTime::now(),
        status: IOOperationStatus::Pending,
    };
    let dependencies = self.ordering_manager.compute_dependencies(&operation)?;

    // Mutating operations need exclusive access; everything else reads.
    let lock_type = match operation_type {
        IOOperationType::FileWrite
        | IOOperationType::FileOpen
        | IOOperationType::FileClose
        | IOOperationType::ConsoleOutput => LockType::Write,
        IOOperationType::FileRead
        | IOOperationType::ConsoleInput
        | IOOperationType::Directory
        | IOOperationType::Network => LockType::Read,
    };
    self.request_resource_lock(&resource, lock_type, thread_id)?;

    self.ordering_manager.add_pending_operation(PendingIOOperation {
        sequence: operation_id,
        thread_id,
        operation_type,
        resource,
        parameters: operation.parameters.clone(),
        dependencies,
        submitted_at: SystemTime::now(),
    })?;

    {
        // Scoped so the write guard is gone before the history is touched.
        let mut active = self.active_operations.write().unwrap();
        if let Some(thread_ops) = active.get_mut(&thread_id) {
            thread_ops.push(operation.clone());
        }
    }

    if self.policies.track_history {
        // Nothing has been transferred yet, so the submission event
        // carries an empty result.
        let placeholder = IOResult {
            data: None,
            bytes_processed: 0,
            metadata: HashMap::new(),
        };
        self.record_operation_event(thread_id, operation, Ok(placeholder), operation_id);
    }

    Ok(operation_id)
}
/// Marks a previously coordinated operation as finished.
///
/// The operation is removed from the pending queue, its resource lock is
/// released (or handed to the next waiter), it is dropped from the owning
/// thread's active list, completion is signalled through the ordering
/// manager, and — when history tracking is on — a completion event
/// carrying `result` is recorded.
///
/// # Errors
///
/// Returns the ordering manager's error when `operation_id` is not
/// pending (for example it was never submitted, was already completed,
/// or was cancelled when its thread was unregistered).
pub fn complete_io_operation(
    &self,
    operation_id: u64,
    result: Result<IOResult, String>,
) -> Result<(), String> {
    let pending_op = self.ordering_manager.complete_operation(operation_id)?;
    self.release_resource_lock(&pending_op.resource, pending_op.thread_id);
    {
        let mut operations = self.active_operations.write().unwrap();
        if let Some(thread_ops) = operations.get_mut(&pending_op.thread_id) {
            thread_ops.retain(|op| op.id != operation_id);
        }
    }
    self.ordering_manager.notify_operation_completion(operation_id);
    if self.policies.track_history {
        // Keep the caller's failure reason in the recorded status instead
        // of overwriting it with a fixed placeholder text.
        let status = match &result {
            Ok(_) => IOOperationStatus::Completed,
            Err(reason) => IOOperationStatus::Failed(reason.clone()),
        };
        let operation = IOOperation {
            id: operation_id,
            operation_type: pending_op.operation_type,
            resource: pending_op.resource,
            parameters: pending_op.parameters,
            started_at: pending_op.submitted_at,
            status,
        };
        self.record_operation_event(pending_op.thread_id, operation, result, operation_id);
    }
    Ok(())
}
/// Tries to obtain a lock of `lock_type` on `resource` for `thread_id`.
///
/// * No lock exists: a new one is created with `thread_id` as holder.
/// * A read lock exists, a read lock is requested and concurrent reads
///   are allowed: the request is granted without recording the extra
///   reader.
/// * Otherwise: the request is appended to the lock's wait queue and an
///   error is returned.
///
/// NOTE(review): shared readers are not tracked, so the lock disappears
/// as soon as the recorded holder releases it. Also, a queued requester
/// only sees the error returned here; nothing in this method tells it
/// when it is later made the holder — confirm how callers are expected
/// to learn about that.
fn request_resource_lock(
    &self,
    resource: &str,
    lock_type: LockType,
    thread_id: ThreadId,
) -> Result<(), String> {
    let mut locks = self.resource_locks.write().unwrap();

    if let Some(held) = locks.get_mut(resource) {
        let shareable = held.lock_type == LockType::Read
            && lock_type == LockType::Read
            && self.policies.allow_concurrent_reads;
        if shareable {
            return Ok(());
        }

        held.wait_queue
            .push_back((thread_id, lock_type, SystemTime::now()));
        return Err(format!("Resource {resource} is locked, added to wait queue"));
    }

    // Resource is free: the requester becomes the holder.
    locks.insert(
        resource.to_string(),
        IOResourceLock {
            resource_id: resource.to_string(),
            holder: thread_id,
            lock_type,
            acquired_at: SystemTime::now(),
            timeout: self.policies.default_lock_timeout,
            wait_queue: VecDeque::new(),
        },
    );
    Ok(())
}
/// Releases the lock on `resource` if `thread_id` is its recorded holder.
///
/// When requests are waiting, the oldest one becomes the new holder (with
/// its requested lock type and a fresh acquisition time); otherwise the
/// lock entry is removed. Does nothing when the resource is not locked or
/// is held by a different thread.
fn release_resource_lock(&self, resource: &str, thread_id: ThreadId) {
    let mut locks = self.resource_locks.write().unwrap();

    let handed_over = match locks.get_mut(resource) {
        Some(lock) if lock.holder == thread_id => match lock.wait_queue.pop_front() {
            Some((next_thread, next_lock_type, _)) => {
                lock.holder = next_thread;
                lock.lock_type = next_lock_type;
                lock.acquired_at = SystemTime::now();
                true
            }
            None => false,
        },
        // Not locked, or locked by someone else: nothing to release.
        _ => return,
    };

    if !handed_over {
        // Nobody was waiting, so the entry itself goes away.
        locks.remove(resource);
    }
}
/// Removes every trace of `thread_id` from the lock table.
///
/// For each resource lock:
/// * queued requests made by `thread_id` are dropped, so the thread can
///   never be promoted to holder after it is gone;
/// * if `thread_id` is the holder, the lock passes to the oldest remaining
///   waiter (with its requested lock type and a fresh acquisition time),
///   or the entry is removed when nobody is waiting.
fn release_all_locks_for_thread(&self, thread_id: ThreadId) {
    let mut locks = self.resource_locks.write().unwrap();

    locks.retain(|_, lock| {
        // Purge stale wait-queue entries first; otherwise a later release
        // could hand the lock to a thread that will never release it.
        lock.wait_queue
            .retain(|(waiter, _, _)| *waiter != thread_id);

        if lock.holder != thread_id {
            return true;
        }

        match lock.wait_queue.pop_front() {
            Some((next_thread, next_lock_type, _)) => {
                lock.holder = next_thread;
                lock.lock_type = next_lock_type;
                lock.acquired_at = SystemTime::now();
                true
            }
            // No one left to take over: drop the entry.
            None => false,
        }
    });
}
/// Appends an event to the history, timestamped with the current time.
///
/// If the history then holds more than `max_history_size` entries, the
/// oldest entry is discarded.
fn record_operation_event(
    &self,
    thread_id: ThreadId,
    operation: IOOperation,
    result: Result<IOResult, String>,
    sequence: u64,
) {
    // The timestamp is taken before the history lock is acquired.
    let timestamp = SystemTime::now();
    let entry = IOOperationEvent {
        thread_id,
        timestamp,
        operation,
        result,
        sequence,
    };
    let capacity = self.policies.max_history_size;

    let mut log = self.operation_history.lock().unwrap();
    log.push_back(entry);
    if log.len() > capacity {
        log.pop_front();
    }
}
pub fn get_io_statistics(&self) -> IOStatistics {
let operations = self.active_operations.read().unwrap();
let locks = self.resource_locks.read().unwrap();
let history = self.operation_history.lock().unwrap();
let mut stats = IOStatistics {
active_threads: operations.len(),
total_active_operations: 0,
operations_by_type: HashMap::new(),
active_locks: locks.len(),
total_historical_operations: history.len(),
recent_operations: 0,
};
for thread_ops in operations.values() {
stats.total_active_operations += thread_ops.len();
for op in thread_ops {
*stats.operations_by_type.entry(op.operation_type.clone()).or_insert(0) += 1;
}
}
let one_minute_ago = SystemTime::now() - Duration::from_secs(60);
stats.recent_operations = history
.iter()
.filter(|event| event.timestamp > one_minute_ago)
.count();
stats
}
/// Returns a copy of every recorded event, oldest first.
pub fn get_operation_history(&self) -> Vec<IOOperationEvent> {
    let log = self.operation_history.lock().unwrap();
    let mut snapshot = Vec::with_capacity(log.len());
    snapshot.extend(log.iter().cloned());
    snapshot
}
/// Discards every recorded event.
pub fn clear_operation_history(&self) {
    self.operation_history.lock().unwrap().clear();
}
}
/// Point-in-time counters reported by `IOCoordinator::get_io_statistics`.
#[derive(Debug, Clone)]
pub struct IOStatistics {
/// Number of registered threads.
pub active_threads: usize,
/// Number of tracked operations summed over all registered threads.
pub total_active_operations: usize,
/// Tracked operations broken down by operation type.
pub operations_by_type: HashMap<IOOperationType, usize>,
/// Number of resources that currently have a lock entry.
pub active_locks: usize,
/// Number of events currently kept in the history.
pub total_historical_operations: usize,
/// Number of history events recorded within the last 60 seconds.
pub recent_operations: usize,
}
impl IOOrderingManager {
pub fn new() -> Self {
Self {
sequence_counter: AtomicU64::new(0),
pending_operations: Arc::new(RwLock::new(BTreeMap::new())),
resource_queues: Arc::new(RwLock::new(HashMap::new())),
completion_notifier: Arc::new((Mutex::new(HashMap::new()), Condvar::new())),
}
}
pub fn next_sequence(&self) -> u64 {
self.sequence_counter.fetch_add(1, Ordering::SeqCst) + 1
}
pub fn compute_dependencies(&self, operation: &IOOperation) -> Result<Vec<u64>, String> {
let pending = self.pending_operations.read().unwrap();
let mut dependencies = Vec::new();
for (seq, pending_op) in pending.iter() {
if pending_op.resource == operation.resource && *seq < operation.id {
match (&pending_op.operation_type, &operation.operation_type) {
(_, IOOperationType::FileWrite) |
(_, IOOperationType::FileOpen) |
(_, IOOperationType::FileClose) |
(IOOperationType::FileWrite, _) |
(IOOperationType::FileOpen, _) |
(IOOperationType::FileClose, _) => {
dependencies.push(*seq);
}
_ => {
}
}
}
}
Ok(dependencies)
}
pub fn add_pending_operation(&self, operation: PendingIOOperation) -> Result<(), String> {
let mut pending = self.pending_operations.write().unwrap();
pending.insert(operation.sequence, operation);
Ok(())
}
pub fn complete_operation(&self, sequence: u64) -> Result<PendingIOOperation, String> {
let mut pending = self.pending_operations.write().unwrap();
pending.remove(&sequence).ok_or_else(|| {
format!("Operation {sequence} not found in pending operations")
})
}
pub fn notify_operation_completion(&self, sequence: u64) {
let (lock, cvar) = &*self.completion_notifier;
let mut completed = lock.lock().unwrap();
completed.insert(sequence, true);
cvar.notify_all();
}
pub fn cancel_operations_for_thread(&self, thread_id: ThreadId) {
let mut pending = self.pending_operations.write().unwrap();
pending.retain(|_, op| op.thread_id != thread_id);
}
}
impl Default for IOCoordinationPolicies {
    /// Balanced preset: history on (up to 1000 events), strict ordering and
    /// concurrent reads enabled, 30 s operation timeout, 10 s lock timeout,
    /// at most 10 tracked operations per thread.
    fn default() -> Self {
        let default_operation_timeout = Duration::from_secs(30);
        let default_lock_timeout = Duration::from_secs(10);
        Self {
            max_concurrent_operations_per_thread: 10,
            max_history_size: 1000,
            default_operation_timeout,
            default_lock_timeout,
            track_history: true,
            enforce_strict_ordering: true,
            allow_concurrent_reads: true,
        }
    }
}
impl IOCoordinationPolicies {
    /// Small-footprint preset: no history tracking, no strict ordering,
    /// concurrent reads allowed, 5 s operation timeout, 1 s lock timeout,
    /// at most 5 tracked operations per thread.
    pub fn minimal() -> Self {
        Self {
            max_concurrent_operations_per_thread: 5,
            max_history_size: 100,
            default_operation_timeout: Duration::from_secs(5),
            default_lock_timeout: Duration::from_secs(1),
            track_history: false,
            enforce_strict_ordering: false,
            allow_concurrent_reads: true,
        }
    }

    /// Throughput-oriented preset: same switches as [`Self::minimal`]
    /// (history and strict ordering off, concurrent reads on) with larger
    /// limits — 60 s operation timeout, 5 s lock timeout, at most 50
    /// tracked operations per thread.
    pub fn high_throughput() -> Self {
        Self {
            max_concurrent_operations_per_thread: 50,
            max_history_size: 500,
            default_operation_timeout: Duration::from_secs(60),
            default_lock_timeout: Duration::from_secs(5),
            // The three boolean switches are identical to the minimal preset.
            ..Self::minimal()
        }
    }
}
impl Clone for IOChannel {
fn clone(&self) -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver }
}
}
impl Default for IOCoordinator {
/// Equivalent to [`IOCoordinator::new`]: empty state, default policies.
fn default() -> Self {
Self::new()
}
}
impl Clone for IOCoordinator {
fn clone(&self) -> Self {
Self {
active_operations: self.active_operations.clone(),
resource_locks: self.resource_locks.clone(),
ordering_manager: self.ordering_manager.clone(),
coordination_channels: self.coordination_channels.clone(),
operation_history: self.operation_history.clone(),
policies: self.policies.clone(),
}
}
}