use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Urgency level attached to proposals and scheduling hints.
///
/// The explicit discriminants are the numeric rank that
/// `SchedulingHint::sort_key` exposes via `priority as u8`. The derived
/// `Ord` compares variants by discriminant, so
/// `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Priority {
/// Highest rank (discriminant 3).
Critical = 3,
/// Discriminant 2.
High = 2,
/// Discriminant 1.
Normal = 1,
/// Lowest rank (discriminant 0).
Low = 0,
}
/// Latency class carried by a `SchedulingHint`.
///
/// No ordering is derived for this enum; `SchedulingHint::sort_key` assigns
/// the ranks `Hot` = 2, `Warm` = 1, `Cold` = 0.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LatencyTier {
/// Ranked 2 by `sort_key`.
Hot,
/// Ranked 1 by `sort_key`.
Warm,
/// Ranked 0 by `sort_key`.
Cold,
}
/// Resource-cost class carried by a `SchedulingHint`.
///
/// No ordering is derived for this enum; `SchedulingHint::sort_key` assigns
/// the ranks `Minimal` = 0, `Light` = 1, `Moderate` = 2, `Heavy` = 3.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceCost {
/// Ranked 0 by `sort_key`.
Minimal,
/// Ranked 1 by `sort_key`.
Light,
/// Ranked 2 by `sort_key`.
Moderate,
/// Ranked 3 by `sort_key`.
Heavy,
}
/// Small `Copy` bundle of the three attributes a task is scheduled by.
///
/// `PriorityScheduler::enqueue` reads only `priority`; all three fields feed
/// `SchedulingHint::sort_key`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SchedulingHint {
/// Urgency of the task; selects the scheduler queue.
pub priority: Priority,
/// Latency class of the task.
pub latency_tier: LatencyTier,
/// Resource-cost class of the task.
pub resource_cost: ResourceCost,
}
impl SchedulingHint {
    /// Bundles the three scheduling attributes into a hint.
    pub fn new(
        priority: Priority,
        latency_tier: LatencyTier,
        resource_cost: ResourceCost,
    ) -> Self {
        Self {
            priority,
            latency_tier,
            resource_cost,
        }
    }

    /// Returns `(priority, latency, cost)` ranks as a tuple that compares
    /// lexicographically, priority first.
    ///
    /// * priority: the enum discriminant (`Low` = 0 … `Critical` = 3)
    /// * latency: `Cold` = 0, `Warm` = 1, `Hot` = 2
    /// * cost: `Minimal` = 0, `Light` = 1, `Moderate` = 2, `Heavy` = 3
    pub fn sort_key(&self) -> (u8, u8, u8) {
        // The discriminants are 0..=3, so the cast is lossless.
        let priority_rank = self.priority as u8;

        let latency_rank: u8 = match self.latency_tier {
            LatencyTier::Cold => 0,
            LatencyTier::Warm => 1,
            LatencyTier::Hot => 2,
        };

        let cost_rank: u8 = match self.resource_cost {
            ResourceCost::Minimal => 0,
            ResourceCost::Light => 1,
            ResourceCost::Moderate => 2,
            ResourceCost::Heavy => 3,
        };

        (priority_rank, latency_rank, cost_rank)
    }
}
/// Identity of one snapshot: a version counter plus a caller-supplied id.
///
/// `SnapshotCell` stores these behind `Arc` and hands out shared handles.
#[derive(Debug, Clone)]
pub struct SnapshotDescriptor {
/// Starts at 0 in `SnapshotCell::new`; `stage_next` assigns current + 1.
pub version: u64,
/// Caller-supplied identifier of the snapshot.
pub snapshot_id: String,
}
/// Holds the current snapshot descriptor together with an optional staged
/// successor that has not been committed yet.
///
/// `Debug` is derived so the cell can be logged and used inside other
/// `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct SnapshotCell {
    /// Descriptor returned by `current()` and `version()`.
    descriptor: Arc<SnapshotDescriptor>,
    /// Descriptor prepared by `stage_next`, promoted by `commit_staged`.
    staging: Option<Arc<SnapshotDescriptor>>,
}
impl SnapshotCell {
    /// Creates a cell whose current descriptor has version 0 and the given
    /// id, with nothing staged.
    pub fn new(snapshot_id: impl Into<String>) -> Self {
        let initial = SnapshotDescriptor {
            version: 0,
            snapshot_id: snapshot_id.into(),
        };
        Self {
            descriptor: Arc::new(initial),
            staging: None,
        }
    }

    /// Returns a shared handle to the current descriptor.
    ///
    /// The handle keeps pointing at the same descriptor after a later
    /// `commit_staged`, because committing replaces the cell's `Arc` rather
    /// than mutating the descriptor.
    pub fn current(&self) -> Arc<SnapshotDescriptor> {
        Arc::clone(&self.descriptor)
    }

    /// Prepares a successor with version `current + 1` and the given id.
    ///
    /// The current descriptor is untouched until `commit_staged`; staging
    /// again replaces the previously staged descriptor.
    pub fn stage_next(&mut self, new_snapshot_id: impl Into<String>) {
        let next_version = self.descriptor.version + 1;
        let candidate = SnapshotDescriptor {
            version: next_version,
            snapshot_id: new_snapshot_id.into(),
        };
        self.staging = Some(Arc::new(candidate));
    }

    /// Promotes the staged descriptor to current and clears the staging slot.
    ///
    /// # Errors
    ///
    /// Returns `Err("No staged snapshot to commit")` when nothing is staged.
    pub fn commit_staged(&mut self) -> Result<(), String> {
        let promoted = self
            .staging
            .take()
            .ok_or_else(|| "No staged snapshot to commit".to_string())?;
        self.descriptor = promoted;
        Ok(())
    }

    /// Version of the current (committed) descriptor.
    pub fn version(&self) -> u64 {
        self.descriptor.version
    }
}
/// Cloning shares the underlying descriptors: both `Arc`s are reference-count
/// bumps, no `SnapshotDescriptor` is copied.
impl Clone for SnapshotCell {
    fn clone(&self) -> Self {
        let descriptor = Arc::clone(&self.descriptor);
        // `Option<Arc<_>>::clone` clones the inner `Arc` when present.
        let staging = self.staging.clone();
        Self {
            descriptor,
            staging,
        }
    }
}
/// Values that can be merged pairwise, where a merge is allowed to fail.
pub trait ConflictFree: Clone {
    /// Combines `self` with `other` into a new value.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the two values cannot be merged.
    fn try_merge(&self, other: &Self) -> Result<Self, String>;

    /// Reports whether `self` and `other` commute.
    ///
    /// The default ignores `_other` and answers `true`; implementors override
    /// it when some pairs conflict.
    fn is_commutative_with(&self, _other: &Self) -> bool {
        true
    }
}
/// Proposal whose `ConflictFree::try_merge` implementation never fails.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommutativeProposal {
/// Identifier; merged proposals get `"<left>-<right>"`.
pub id: String,
/// Action text; merged proposals get `"<left>|<right>"`.
pub action: String,
/// Urgency; a merge keeps the higher of the two.
pub priority: Priority,
}
impl ConflictFree for CommutativeProposal {
    /// Merges two proposals; this implementation always succeeds.
    ///
    /// The result joins the ids with `-`, the actions with `|`, and keeps the
    /// higher of the two priorities.
    fn try_merge(&self, other: &Self) -> Result<Self, String> {
        let id = format!("{}-{}", self.id, other.id);
        let action = format!("{}|{}", self.action, other.action);
        let priority = self.priority.max(other.priority);
        Ok(Self {
            id,
            action,
            priority,
        })
    }

    /// Always `true`: any two commutative proposals may be merged.
    fn is_commutative_with(&self, _other: &Self) -> bool {
        true
    }
}
/// Proposal that targets a named resource; its `ConflictFree::try_merge`
/// implementation rejects a merge when both sides name the same resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConditionalProposal {
/// Identifier; merged proposals get `"<left>-<right>"`.
pub id: String,
/// Action text; merged proposals get `"<left>|<right>"`.
pub action: String,
// `resource`: name compared for equality to detect conflicts; a merged
// proposal carries the literal "mixed".
// `priority`: urgency; a merge keeps the higher of the two.
pub resource: String, pub priority: Priority,
}
impl ConflictFree for ConditionalProposal {
    /// Merges two proposals that target different resources.
    ///
    /// The result joins the ids with `-`, the actions with `|`, keeps the
    /// higher priority, and sets `resource` to the literal `"mixed"`.
    ///
    /// # Errors
    ///
    /// Returns `"Conflict: both proposals touch resource <name>"` when both
    /// proposals carry the same `resource` string.
    fn try_merge(&self, other: &Self) -> Result<Self, String> {
        // Guard clause: identical resource names cannot be combined.
        if self.resource == other.resource {
            return Err(format!(
                "Conflict: both proposals touch resource {}",
                self.resource
            ));
        }

        Ok(Self {
            id: [self.id.as_str(), other.id.as_str()].join("-"),
            action: [self.action.as_str(), other.action.as_str()].join("|"),
            resource: String::from("mixed"),
            priority: self.priority.max(other.priority),
        })
    }

    /// Two proposals commute exactly when their resource names differ.
    fn is_commutative_with(&self, other: &Self) -> bool {
        self.resource != other.resource
    }
}
/// Stateless namespace for folding a batch of proposals into a single one.
pub struct ProposalAggregator;

impl ProposalAggregator {
    /// Folds `proposals` left to right with `ConflictFree::try_merge`.
    ///
    /// A single-element batch is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns `"No proposals to aggregate"` for an empty batch, or the first
    /// error reported by `try_merge`.
    pub fn aggregate_commutative(
        proposals: Vec<CommutativeProposal>,
    ) -> Result<CommutativeProposal, String> {
        let mut remaining = proposals.into_iter();
        // Taking the first element by value avoids both the index and a clone.
        let first = remaining
            .next()
            .ok_or_else(|| "No proposals to aggregate".to_string())?;
        remaining.try_fold(first, |merged, next| merged.try_merge(&next))
    }

    /// Merges as many proposals as possible, in input order, and returns the
    /// merged proposal (if any) together with the proposals that were
    /// rejected.
    ///
    /// A proposal is rejected when its `resource` has already been merged, or
    /// when `try_merge` refuses it. The first proposal to touch a resource
    /// wins; later ones land in the rejected list in input order. An empty
    /// batch yields `(None, vec![])`.
    pub fn aggregate_conditional(
        proposals: Vec<ConditionalProposal>,
    ) -> (Option<ConditionalProposal>, Vec<ConditionalProposal>) {
        let mut merged: Option<ConditionalProposal> = None;
        let mut failed = Vec::new();
        // `try_merge` rewrites the accumulator's resource to "mixed", so the
        // accumulator alone cannot tell which resources it already covers.
        // Track them here so a later proposal on the same resource is still
        // reported as a conflict.
        let mut claimed: std::collections::HashSet<String> = std::collections::HashSet::new();

        for proposal in proposals {
            if claimed.contains(&proposal.resource) {
                failed.push(proposal);
                continue;
            }
            match merged.take() {
                None => {
                    claimed.insert(proposal.resource.clone());
                    merged = Some(proposal);
                }
                Some(existing) => match existing.try_merge(&proposal) {
                    Ok(combined) => {
                        claimed.insert(proposal.resource.clone());
                        merged = Some(combined);
                    }
                    Err(_) => {
                        // Keep the accumulator untouched and report the loser.
                        merged = Some(existing);
                        failed.push(proposal);
                    }
                },
            }
        }

        (merged, failed)
    }
}
/// Unit of work handed to `PriorityScheduler`.
#[derive(Debug, Clone)]
pub struct ScheduledTask {
/// Caller-supplied identifier of the task.
pub task_id: String,
/// Scheduling attributes; `hint.priority` selects the scheduler queue.
pub hint: SchedulingHint,
/// Free-form description of the task.
pub description: String,
}
impl ScheduledTask {
    /// Builds a task from an id, its scheduling hint and a description.
    ///
    /// Both text arguments accept anything convertible into `String`.
    pub fn new(
        task_id: impl Into<String>,
        hint: SchedulingHint,
        description: impl Into<String>,
    ) -> Self {
        let task_id = task_id.into();
        let description = description.into();
        Self {
            task_id,
            hint,
            description,
        }
    }
}
/// Strict-priority scheduler with one FIFO queue per `Priority` level.
///
/// The queues are `VecDeque`s so that removing the oldest task is O(1);
/// popping the front of a `Vec` shifts every remaining element.
#[derive(Debug)]
pub struct PriorityScheduler {
    critical_queue: std::collections::VecDeque<ScheduledTask>,
    high_queue: std::collections::VecDeque<ScheduledTask>,
    normal_queue: std::collections::VecDeque<ScheduledTask>,
    low_queue: std::collections::VecDeque<ScheduledTask>,
}

impl PriorityScheduler {
    /// Creates a scheduler with all four queues empty.
    pub fn new() -> Self {
        Self {
            critical_queue: std::collections::VecDeque::new(),
            high_queue: std::collections::VecDeque::new(),
            normal_queue: std::collections::VecDeque::new(),
            low_queue: std::collections::VecDeque::new(),
        }
    }

    /// Appends `task` to the queue selected by `task.hint.priority`.
    pub fn enqueue(&mut self, task: ScheduledTask) {
        let queue = match task.hint.priority {
            Priority::Critical => &mut self.critical_queue,
            Priority::High => &mut self.high_queue,
            Priority::Normal => &mut self.normal_queue,
            Priority::Low => &mut self.low_queue,
        };
        queue.push_back(task);
    }

    /// Removes and returns the oldest task of the highest non-empty priority
    /// level, or `None` when every queue is empty.
    ///
    /// Levels are drained in the order critical, high, normal, low; within a
    /// level tasks leave in the order they were enqueued.
    ///
    /// NOTE(review): `allow_starvation` has never influenced the result. The
    /// previous guard `allow_starvation || high_queue.is_empty()` was only
    /// reached after the high queue was known to be empty, so it was always
    /// true. The parameter is kept for API compatibility; confirm whether an
    /// anti-starvation policy was intended.
    pub fn dequeue(&mut self, allow_starvation: bool) -> Option<ScheduledTask> {
        let _ = allow_starvation;
        self.critical_queue
            .pop_front()
            .or_else(|| self.high_queue.pop_front())
            .or_else(|| self.normal_queue.pop_front())
            .or_else(|| self.low_queue.pop_front())
    }

    /// Total number of tasks waiting across all four queues.
    pub fn queue_depth(&self) -> usize {
        self.critical_queue.len()
            + self.high_queue.len()
            + self.normal_queue.len()
            + self.low_queue.len()
    }
}
/// `Default` delegates to `PriorityScheduler::new`, i.e. all queues empty.
impl Default for PriorityScheduler {
fn default() -> Self {
Self::new()
}
}
/// Plain counters describing swarm activity.
///
/// The fields are public and nothing in this file updates them; callers are
/// expected to maintain them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmMetrics {
/// Number of tasks processed so far.
pub total_tasks_processed: u64,
/// Number of merges that succeeded; part of `conflict_ratio`'s denominator.
pub total_merges_successful: u64,
/// Number of merges that conflicted; `conflict_ratio`'s numerator.
pub total_conflicts: u64,
/// Average queue depth (how it is averaged is not defined in this file).
pub avg_queue_depth: f64,
}
impl SwarmMetrics {
    /// Creates a metrics record with every counter at zero.
    pub fn new() -> Self {
        Self {
            total_tasks_processed: 0,
            total_merges_successful: 0,
            total_conflicts: 0,
            avg_queue_depth: 0.0,
        }
    }

    /// Fraction of merge attempts that ended in a conflict:
    /// `conflicts / (successes + conflicts)`, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no merge has been attempted at all. When every
    /// attempt conflicted (zero successes) the ratio is `1.0`.
    pub fn conflict_ratio(&self) -> f64 {
        // Guard on the denominator, not on the success count alone: zero
        // successes with non-zero conflicts is a ratio of 1.0, not 0.0.
        if self.total_merges_successful == 0 && self.total_conflicts == 0 {
            return 0.0;
        }
        // Convert before adding so the sum cannot overflow `u64`.
        let conflicts = self.total_conflicts as f64;
        let successes = self.total_merges_successful as f64;
        conflicts / (successes + conflicts)
    }
}
/// `Default` delegates to `SwarmMetrics::new`, i.e. all counters zero.
impl Default for SwarmMetrics {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the types defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// A Critical/Hot hint must yield a larger sort key than a Low/Cold one;
// tuples compare lexicographically, so the priority component decides.
#[test]
fn test_scheduling_hint_sort_key() {
let hint1 =
SchedulingHint::new(Priority::Critical, LatencyTier::Hot, ResourceCost::Minimal);
let hint2 = SchedulingHint::new(Priority::Low, LatencyTier::Cold, ResourceCost::Heavy);
let key1 = hint1.sort_key();
let key2 = hint2.sort_key();
assert!(key1 > key2); }
// Two consecutive `current()` reads observe the same version-0 descriptor.
// NOTE(review): despite the name, this only exercises single-threaded reads.
#[test]
fn test_snapshot_cell_lock_free() {
let cell = SnapshotCell::new("snap-1");
let snap1 = cell.current();
assert_eq!(snap1.version, 0);
assert_eq!(snap1.snapshot_id, "snap-1");
let snap1_copy = cell.current();
assert_eq!(snap1_copy.version, snap1.version);
}
// Staging leaves the current descriptor untouched; committing promotes the
// staged descriptor and bumps the version to 1.
#[test]
fn test_snapshot_staging_and_commit() {
let mut cell = SnapshotCell::new("snap-1");
cell.stage_next("snap-2");
assert_eq!(cell.current().version, 0);
assert!(cell.commit_staged().is_ok());
assert_eq!(cell.current().version, 1);
assert_eq!(cell.current().snapshot_id, "snap-2");
}
// Merging commutative proposals succeeds and keeps the higher priority.
#[test]
fn test_commutative_proposal_merge() {
let p1 = CommutativeProposal {
id: "p1".to_string(),
action: "action1".to_string(),
priority: Priority::High,
};
let p2 = CommutativeProposal {
id: "p2".to_string(),
action: "action2".to_string(),
priority: Priority::Normal,
};
let merged = p1.try_merge(&p2);
assert!(merged.is_ok());
let merged = merged.unwrap();
assert_eq!(merged.priority, Priority::High); }
// Conditional proposals merge when their resources differ and conflict when
// both name the same resource.
#[test]
fn test_conditional_proposal_merge() {
let p1 = ConditionalProposal {
id: "p1".to_string(),
action: "read".to_string(),
resource: "ontology_v1".to_string(),
priority: Priority::Normal,
};
let p2 = ConditionalProposal {
id: "p2".to_string(),
action: "read".to_string(),
resource: "marketplace_v1".to_string(),
priority: Priority::Normal,
};
let merged = p1.try_merge(&p2);
assert!(merged.is_ok());
let p3 = ConditionalProposal {
id: "p3".to_string(),
action: "write".to_string(),
resource: "ontology_v1".to_string(),
priority: Priority::High,
};
let conflict = p1.try_merge(&p3);
assert!(conflict.is_err());
}
// Tasks enqueued as low, critical, normal must be dequeued as critical,
// normal, low.
#[test]
fn test_priority_scheduler() {
let mut scheduler = PriorityScheduler::new();
scheduler.enqueue(ScheduledTask::new(
"task-low",
SchedulingHint::new(Priority::Low, LatencyTier::Cold, ResourceCost::Light),
"Low priority",
));
scheduler.enqueue(ScheduledTask::new(
"task-critical",
SchedulingHint::new(Priority::Critical, LatencyTier::Hot, ResourceCost::Minimal),
"Critical",
));
scheduler.enqueue(ScheduledTask::new(
"task-normal",
SchedulingHint::new(Priority::Normal, LatencyTier::Warm, ResourceCost::Moderate),
"Normal",
));
let first = scheduler.dequeue(false).unwrap();
assert_eq!(first.task_id, "task-critical");
let second = scheduler.dequeue(false).unwrap();
assert_eq!(second.task_id, "task-normal");
let third = scheduler.dequeue(false).unwrap();
assert_eq!(third.task_id, "task-low");
}
// Aggregating a non-empty batch of commutative proposals succeeds.
#[test]
fn test_aggregator_commutative() {
let proposals = vec![
CommutativeProposal {
id: "p1".to_string(),
action: "action1".to_string(),
priority: Priority::High,
},
CommutativeProposal {
id: "p2".to_string(),
action: "action2".to_string(),
priority: Priority::Normal,
},
];
let merged = ProposalAggregator::aggregate_commutative(proposals);
assert!(merged.is_ok());
}
// With 100 successes and 10 conflicts the ratio lies strictly between 0 and 1.
#[test]
fn test_swarm_metrics() {
let mut metrics = SwarmMetrics::new();
metrics.total_merges_successful = 100;
metrics.total_conflicts = 10;
let ratio = metrics.conflict_ratio();
assert!(ratio > 0.0 && ratio < 1.0);
}
}