1use std::collections::HashMap;
19use std::time::{Duration, Instant};
20
21use serde::{Deserialize, Serialize};
22use tokio::sync::{RwLock, broadcast};
23
24pub struct ContractNetManager {
26 announcements: RwLock<HashMap<String, TaskAnnouncement>>,
28 bids: RwLock<HashMap<String, Vec<TaskBid>>>,
30 awarded: RwLock<HashMap<String, AwardedContract>>,
32 broadcast_tx: broadcast::Sender<ContractMessage>,
34 evaluation_strategy: BidEvaluationStrategy,
36 next_task_id: RwLock<u64>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct TaskAnnouncement {
43 pub task_id: String,
45 pub description: String,
47 pub requirements: TaskRequirements,
49 #[serde(skip, default)]
51 pub deadline: Option<Instant>,
52 #[serde(skip, default = "Instant::now")]
54 pub bid_deadline: Instant,
55 pub announcer: String,
57 #[serde(skip, default = "Instant::now")]
59 pub announced_at: Instant,
60}
61
62impl TaskAnnouncement {
63 pub fn new(
65 task_id: impl Into<String>,
66 description: impl Into<String>,
67 announcer: impl Into<String>,
68 bid_deadline: Instant,
69 ) -> Self {
70 Self {
71 task_id: task_id.into(),
72 description: description.into(),
73 requirements: TaskRequirements::default(),
74 deadline: None,
75 bid_deadline,
76 announcer: announcer.into(),
77 announced_at: Instant::now(),
78 }
79 }
80
81 pub fn with_requirements(mut self, requirements: TaskRequirements) -> Self {
83 self.requirements = requirements;
84 self
85 }
86
87 pub fn with_deadline(mut self, deadline: Instant) -> Self {
89 self.deadline = Some(deadline);
90 self
91 }
92
93 pub fn is_bidding_open(&self) -> bool {
95 Instant::now() < self.bid_deadline
96 }
97
98 pub fn time_remaining(&self) -> Duration {
100 self.bid_deadline.saturating_duration_since(Instant::now())
101 }
102}
103
104#[derive(Debug, Clone, Default, Serialize, Deserialize)]
106pub struct TaskRequirements {
107 pub capabilities: Vec<String>,
109 pub resources_needed: Vec<String>,
111 pub complexity: u8,
113 pub priority: u8,
115 pub min_capability_score: f32,
117}
118
119impl TaskRequirements {
120 pub fn new() -> Self {
122 Self::default()
123 }
124
125 pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
127 self.capabilities = capabilities;
128 self
129 }
130
131 pub fn with_complexity(mut self, complexity: u8) -> Self {
133 self.complexity = complexity.min(10);
134 self
135 }
136
137 pub fn with_priority(mut self, priority: u8) -> Self {
139 self.priority = priority;
140 self
141 }
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct TaskBid {
147 pub agent_id: String,
149 pub task_id: String,
151 pub capability_score: f32,
153 pub current_load: f32,
155 #[serde(skip, default = "default_duration")]
157 pub estimated_duration: Duration,
158 pub conditions: Vec<String>,
160 #[serde(skip, default = "Instant::now")]
162 pub submitted_at: Instant,
163}
164
165fn default_duration() -> Duration {
167 Duration::from_secs(60)
168}
169
170impl TaskBid {
171 pub fn new(agent_id: impl Into<String>, task_id: impl Into<String>) -> Self {
173 Self {
174 agent_id: agent_id.into(),
175 task_id: task_id.into(),
176 capability_score: 0.0,
177 current_load: 0.0,
178 estimated_duration: Duration::from_secs(60),
179 conditions: Vec::new(),
180 submitted_at: Instant::now(),
181 }
182 }
183
184 pub fn with_capability_score(mut self, score: f32) -> Self {
186 self.capability_score = score.clamp(0.0, 1.0);
187 self
188 }
189
190 pub fn with_load(mut self, load: f32) -> Self {
192 self.current_load = load.clamp(0.0, 1.0);
193 self
194 }
195
196 pub fn with_duration(mut self, duration: Duration) -> Self {
198 self.estimated_duration = duration;
199 self
200 }
201
202 pub fn with_condition(mut self, condition: impl Into<String>) -> Self {
204 self.conditions.push(condition.into());
205 self
206 }
207
208 pub fn score(&self) -> f32 {
210 let availability = 1.0 - self.current_load;
215 let speed = 1.0 / (1.0 + self.estimated_duration.as_secs_f32() / 60.0);
216
217 0.4 * self.capability_score + 0.3 * availability + 0.3 * speed
218 }
219
220 pub fn score_weighted(
222 &self,
223 capability_weight: f32,
224 availability_weight: f32,
225 speed_weight: f32,
226 ) -> f32 {
227 let total_weight = capability_weight + availability_weight + speed_weight;
228 if total_weight == 0.0 {
229 return 0.0;
230 }
231
232 let availability = 1.0 - self.current_load;
233 let speed = 1.0 / (1.0 + self.estimated_duration.as_secs_f32() / 60.0);
234
235 (capability_weight * self.capability_score
236 + availability_weight * availability
237 + speed_weight * speed)
238 / total_weight
239 }
240}
241
242#[derive(Debug, Clone, Default)]
244pub enum BidEvaluationStrategy {
245 #[default]
247 HighestScore,
248 FastestCompletion,
250 LoadBalancing,
252 BestCapability,
254 CustomWeights {
256 capability: f32,
258 availability: f32,
260 speed: f32,
262 },
263}
264
265#[derive(Debug, Clone)]
267pub enum ContractMessage {
268 Announce(TaskAnnouncement),
270 Bid(TaskBid),
272 Award {
274 task_id: String,
276 winner: String,
278 score: f32,
280 },
281 NoAward {
283 task_id: String,
285 reason: String,
287 },
288 Accept {
290 task_id: String,
292 agent_id: String,
294 },
295 Decline {
297 task_id: String,
299 agent_id: String,
301 reason: String,
303 },
304 Complete {
306 task_id: String,
308 agent_id: String,
310 success: bool,
312 result: Option<String>,
314 },
315 Cancel {
317 task_id: String,
319 reason: String,
321 },
322}
323
324#[derive(Debug, Clone)]
326pub struct AwardedContract {
327 pub task_id: String,
329 pub winner: String,
331 pub winning_bid: TaskBid,
333 pub awarded_at: Instant,
335 pub accepted: bool,
337 pub completed: Option<bool>,
339}
340
341impl ContractNetManager {
342 pub fn new() -> Self {
344 let (broadcast_tx, _) = broadcast::channel(1024);
345 Self {
346 announcements: RwLock::new(HashMap::new()),
347 bids: RwLock::new(HashMap::new()),
348 awarded: RwLock::new(HashMap::new()),
349 broadcast_tx,
350 evaluation_strategy: BidEvaluationStrategy::HighestScore,
351 next_task_id: RwLock::new(1),
352 }
353 }
354
355 pub fn with_strategy(strategy: BidEvaluationStrategy) -> Self {
357 let mut manager = Self::new();
358 manager.evaluation_strategy = strategy;
359 manager
360 }
361
362 pub fn subscribe(&self) -> broadcast::Receiver<ContractMessage> {
364 self.broadcast_tx.subscribe()
365 }
366
367 pub async fn generate_task_id(&self) -> String {
369 let mut id = self.next_task_id.write().await;
370 let task_id = format!("task-{}", *id);
371 *id += 1;
372 task_id
373 }
374
375 pub async fn announce_task(&self, mut announcement: TaskAnnouncement) -> String {
377 let task_id = if announcement.task_id.is_empty() {
378 self.generate_task_id().await
379 } else {
380 announcement.task_id.clone()
381 };
382 announcement.task_id = task_id.clone();
383
384 self.announcements
386 .write()
387 .await
388 .insert(task_id.clone(), announcement.clone());
389
390 self.bids.write().await.insert(task_id.clone(), Vec::new());
392
393 let _ = self
395 .broadcast_tx
396 .send(ContractMessage::Announce(announcement));
397
398 task_id
399 }
400
401 pub async fn receive_bid(&self, bid: TaskBid) -> Result<(), String> {
403 let announcements = self.announcements.read().await;
404
405 let announcement = announcements
407 .get(&bid.task_id)
408 .ok_or_else(|| format!("Unknown task: {}", bid.task_id))?;
409
410 if !announcement.is_bidding_open() {
411 return Err("Bid deadline passed".to_string());
412 }
413
414 if bid.capability_score < announcement.requirements.min_capability_score {
416 return Err(format!(
417 "Capability score {} below minimum {}",
418 bid.capability_score, announcement.requirements.min_capability_score
419 ));
420 }
421
422 let mut bids = self.bids.write().await;
424 if let Some(task_bids) = bids.get_mut(&bid.task_id) {
425 task_bids.retain(|b| b.agent_id != bid.agent_id);
427 task_bids.push(bid.clone());
428 }
429
430 let bid_task_id = bid.task_id.clone();
432 if let Err(e) = self.broadcast_tx.send(ContractMessage::Bid(bid)) {
433 tracing::warn!("Failed to broadcast bid for task {}: {}", bid_task_id, e);
434 }
435
436 Ok(())
437 }
438
439 pub async fn award_task(&self, task_id: &str) -> Option<String> {
441 let bids = self.bids.read().await;
442 let task_bids = bids.get(task_id)?;
443
444 if task_bids.is_empty() {
445 if let Err(e) = self.broadcast_tx.send(ContractMessage::NoAward {
446 task_id: task_id.to_string(),
447 reason: "No bids received".to_string(),
448 }) {
449 tracing::warn!("Failed to broadcast no-award for task {}: {}", task_id, e);
450 }
451 return None;
452 }
453
454 let (winner, winning_bid) = self.evaluate_bids(task_bids)?;
456 let score = winning_bid.score();
457
458 self.awarded.write().await.insert(
460 task_id.to_string(),
461 AwardedContract {
462 task_id: task_id.to_string(),
463 winner: winner.clone(),
464 winning_bid: winning_bid.clone(),
465 awarded_at: Instant::now(),
466 accepted: false,
467 completed: None,
468 },
469 );
470
471 let _ = self.broadcast_tx.send(ContractMessage::Award {
473 task_id: task_id.to_string(),
474 winner: winner.clone(),
475 score,
476 });
477
478 Some(winner)
479 }
480
481 fn evaluate_bids(&self, bids: &[TaskBid]) -> Option<(String, TaskBid)> {
483 if bids.is_empty() {
484 return None;
485 }
486
487 fn safe_f32_cmp(a: f32, b: f32) -> std::cmp::Ordering {
489 a.partial_cmp(&b).unwrap_or(std::cmp::Ordering::Less)
490 }
491
492 let winning_bid = match &self.evaluation_strategy {
493 BidEvaluationStrategy::HighestScore => bids
494 .iter()
495 .max_by(|a, b| safe_f32_cmp(a.score(), b.score())),
496 BidEvaluationStrategy::FastestCompletion => {
497 bids.iter().min_by_key(|b| b.estimated_duration)
498 }
499 BidEvaluationStrategy::LoadBalancing => bids
500 .iter()
501 .min_by(|a, b| safe_f32_cmp(a.current_load, b.current_load)),
502 BidEvaluationStrategy::BestCapability => bids
503 .iter()
504 .max_by(|a, b| safe_f32_cmp(a.capability_score, b.capability_score)),
505 BidEvaluationStrategy::CustomWeights {
506 capability,
507 availability,
508 speed,
509 } => bids.iter().max_by(|a, b| {
510 safe_f32_cmp(
511 a.score_weighted(*capability, *availability, *speed),
512 b.score_weighted(*capability, *availability, *speed),
513 )
514 }),
515 }?;
516
517 Some((winning_bid.agent_id.clone(), winning_bid.clone()))
518 }
519
520 pub async fn accept_award(&self, task_id: &str, agent_id: &str) -> Result<(), String> {
522 let mut awarded = self.awarded.write().await;
523 let contract = awarded
524 .get_mut(task_id)
525 .ok_or_else(|| format!("No award found for task: {}", task_id))?;
526
527 if contract.winner != agent_id {
528 return Err(format!(
529 "Agent {} is not the winner of task {}",
530 agent_id, task_id
531 ));
532 }
533
534 contract.accepted = true;
535
536 let _ = self.broadcast_tx.send(ContractMessage::Accept {
537 task_id: task_id.to_string(),
538 agent_id: agent_id.to_string(),
539 });
540
541 Ok(())
542 }
543
544 pub async fn decline_award(
546 &self,
547 task_id: &str,
548 agent_id: &str,
549 reason: &str,
550 ) -> Result<(), String> {
551 let mut awarded = self.awarded.write().await;
552 awarded.remove(task_id);
553
554 let _ = self.broadcast_tx.send(ContractMessage::Decline {
555 task_id: task_id.to_string(),
556 agent_id: agent_id.to_string(),
557 reason: reason.to_string(),
558 });
559
560 Ok(())
561 }
562
563 pub async fn complete_task(
565 &self,
566 task_id: &str,
567 agent_id: &str,
568 success: bool,
569 result: Option<String>,
570 ) -> Result<(), String> {
571 let mut awarded = self.awarded.write().await;
572 let contract = awarded
573 .get_mut(task_id)
574 .ok_or_else(|| format!("No contract found for task: {}", task_id))?;
575
576 if contract.winner != agent_id {
577 return Err(format!(
578 "Agent {} is not the contractor for task {}",
579 agent_id, task_id
580 ));
581 }
582
583 contract.completed = Some(success);
584
585 let _ = self.broadcast_tx.send(ContractMessage::Complete {
586 task_id: task_id.to_string(),
587 agent_id: agent_id.to_string(),
588 success,
589 result,
590 });
591
592 self.announcements.write().await.remove(task_id);
594 self.bids.write().await.remove(task_id);
595
596 Ok(())
597 }
598
599 pub async fn get_task_status(&self, task_id: &str) -> Option<TaskStatus> {
601 if let Some(contract) = self.awarded.read().await.get(task_id) {
602 return Some(if contract.completed.is_some() {
603 TaskStatus::Completed
604 } else if contract.accepted {
605 TaskStatus::InProgress
606 } else {
607 TaskStatus::Awarded
608 });
609 }
610
611 if self.announcements.read().await.contains_key(task_id) {
612 return Some(TaskStatus::OpenForBids);
613 }
614
615 None
616 }
617
618 pub async fn get_pending_tasks(&self) -> Vec<TaskAnnouncement> {
620 self.announcements.read().await.values().cloned().collect()
621 }
622
623 pub async fn get_bids(&self, task_id: &str) -> Vec<TaskBid> {
625 self.bids
626 .read()
627 .await
628 .get(task_id)
629 .cloned()
630 .unwrap_or_default()
631 }
632}
633
634impl Default for ContractNetManager {
635 fn default() -> Self {
636 Self::new()
637 }
638}
639
640#[derive(Debug, Clone, Copy, PartialEq, Eq)]
642pub enum TaskStatus {
643 OpenForBids,
645 Awarded,
647 InProgress,
649 Completed,
651}
652
653pub struct ContractParticipant {
655 agent_id: String,
656 capabilities: Vec<String>,
657 current_tasks: RwLock<Vec<String>>,
658 max_concurrent: usize,
659 message_rx: Option<broadcast::Receiver<ContractMessage>>,
661}
662
663impl ContractParticipant {
664 pub fn new(agent_id: impl Into<String>, capabilities: Vec<String>) -> Self {
666 Self {
667 agent_id: agent_id.into(),
668 capabilities,
669 current_tasks: RwLock::new(Vec::new()),
670 max_concurrent: 3,
671 message_rx: None,
672 }
673 }
674
675 pub fn with_max_concurrent(mut self, max: usize) -> Self {
677 self.max_concurrent = max;
678 self
679 }
680
681 pub fn connect(&mut self, manager: &ContractNetManager) {
683 self.message_rx = Some(manager.subscribe());
684 }
685
686 pub async fn should_bid(&self, announcement: &TaskAnnouncement) -> bool {
688 let has_capabilities = announcement
690 .requirements
691 .capabilities
692 .iter()
693 .all(|req| self.capabilities.contains(req));
694
695 if !has_capabilities {
696 return false;
697 }
698
699 let current = self.current_tasks.read().await.len();
701 if current >= self.max_concurrent {
702 return false;
703 }
704
705 announcement.is_bidding_open()
707 }
708
709 pub async fn generate_bid(&self, announcement: &TaskAnnouncement) -> TaskBid {
711 let current_tasks = self.current_tasks.read().await.len();
712
713 TaskBid::new(&self.agent_id, &announcement.task_id)
714 .with_capability_score(self.calculate_capability_score(&announcement.requirements))
715 .with_load(current_tasks as f32 / self.max_concurrent as f32)
716 .with_duration(self.estimate_duration(&announcement.requirements))
717 }
718
719 fn calculate_capability_score(&self, requirements: &TaskRequirements) -> f32 {
721 if requirements.capabilities.is_empty() {
722 return 1.0; }
724
725 let matched = requirements
726 .capabilities
727 .iter()
728 .filter(|c| self.capabilities.contains(c))
729 .count();
730
731 matched as f32 / requirements.capabilities.len() as f32
732 }
733
734 fn estimate_duration(&self, requirements: &TaskRequirements) -> Duration {
736 let base_seconds = (requirements.complexity as u64 + 1) * 60;
738 Duration::from_secs(base_seconds)
739 }
740
741 pub async fn accept_task(&self, task_id: &str) {
743 self.current_tasks.write().await.push(task_id.to_string());
744 }
745
746 pub async fn complete_task(&self, task_id: &str) {
748 self.current_tasks.write().await.retain(|t| t != task_id);
749 }
750
751 pub async fn current_task_count(&self) -> usize {
753 self.current_tasks.read().await.len()
754 }
755
756 pub async fn available_capacity(&self) -> usize {
758 self.max_concurrent - self.current_tasks.read().await.len()
759 }
760}
761
762#[cfg(test)]
763mod tests {
764 use super::*;
765
766 #[tokio::test]
767 async fn test_task_announcement() {
768 let announcement = TaskAnnouncement::new(
769 "task-1",
770 "Test task",
771 "manager",
772 Instant::now() + Duration::from_secs(60),
773 )
774 .with_requirements(TaskRequirements::new().with_complexity(5));
775
776 assert!(announcement.is_bidding_open());
777 assert!(announcement.time_remaining() <= Duration::from_secs(60));
778 }
779
780 #[tokio::test]
781 async fn test_task_bid_scoring() {
782 let bid = TaskBid::new("agent-1", "task-1")
783 .with_capability_score(0.8)
784 .with_load(0.2)
785 .with_duration(Duration::from_secs(120));
786
787 let score = bid.score();
788 assert!(score > 0.0 && score <= 1.0);
789
790 let high_cap_bid = TaskBid::new("agent-2", "task-1")
792 .with_capability_score(1.0)
793 .with_load(0.2)
794 .with_duration(Duration::from_secs(120));
795
796 assert!(high_cap_bid.score() > bid.score());
797 }
798
799 #[tokio::test]
800 async fn test_announce_and_bid() {
801 let manager = ContractNetManager::new();
802
803 let _rx = manager.subscribe();
805
806 let announcement = TaskAnnouncement::new(
808 "",
809 "Test task",
810 "manager",
811 Instant::now() + Duration::from_secs(60),
812 );
813 let task_id = manager.announce_task(announcement).await;
814 assert!(!task_id.is_empty());
815
816 let bid = TaskBid::new("agent-1", &task_id)
818 .with_capability_score(0.9)
819 .with_load(0.1);
820
821 let result = manager.receive_bid(bid).await;
822 assert!(result.is_ok());
823
824 let bids = manager.get_bids(&task_id).await;
826 assert_eq!(bids.len(), 1);
827 assert_eq!(bids[0].agent_id, "agent-1");
828 }
829
830 #[tokio::test]
831 async fn test_award_task() {
832 let manager = ContractNetManager::new();
833
834 let announcement = TaskAnnouncement::new(
836 "task-1",
837 "Test task",
838 "manager",
839 Instant::now() + Duration::from_secs(60),
840 );
841 manager.announce_task(announcement).await;
842
843 manager
845 .receive_bid(TaskBid::new("agent-1", "task-1").with_capability_score(0.7))
846 .await
847 .unwrap();
848
849 manager
850 .receive_bid(TaskBid::new("agent-2", "task-1").with_capability_score(0.9))
851 .await
852 .unwrap();
853
854 let winner = manager.award_task("task-1").await;
856 assert_eq!(winner, Some("agent-2".to_string())); }
858
859 #[tokio::test]
860 async fn test_evaluation_strategies() {
861 let manager = ContractNetManager::with_strategy(BidEvaluationStrategy::LoadBalancing);
863
864 let announcement = TaskAnnouncement::new(
865 "task-1",
866 "Test task",
867 "manager",
868 Instant::now() + Duration::from_secs(60),
869 );
870 manager.announce_task(announcement).await;
871
872 manager
873 .receive_bid(TaskBid::new("agent-1", "task-1").with_load(0.8))
874 .await
875 .unwrap();
876
877 manager
878 .receive_bid(TaskBid::new("agent-2", "task-1").with_load(0.2))
879 .await
880 .unwrap();
881
882 let winner = manager.award_task("task-1").await;
883 assert_eq!(winner, Some("agent-2".to_string())); }
885
886 #[tokio::test]
887 async fn test_bid_rejection_after_deadline() {
888 let manager = ContractNetManager::new();
889
890 let announcement = TaskAnnouncement::new(
892 "task-1",
893 "Test task",
894 "manager",
895 Instant::now() - Duration::from_secs(1), );
897 manager.announce_task(announcement).await;
898
899 let result = manager.receive_bid(TaskBid::new("agent-1", "task-1")).await;
901
902 assert!(result.is_err());
903 assert!(result.unwrap_err().contains("deadline"));
904 }
905
906 #[tokio::test]
907 async fn test_task_lifecycle() {
908 let manager = ContractNetManager::new();
909
910 let announcement = TaskAnnouncement::new(
912 "task-1",
913 "Test task",
914 "manager",
915 Instant::now() + Duration::from_secs(60),
916 );
917 manager.announce_task(announcement).await;
918 assert_eq!(
919 manager.get_task_status("task-1").await,
920 Some(TaskStatus::OpenForBids)
921 );
922
923 manager
925 .receive_bid(TaskBid::new("agent-1", "task-1").with_capability_score(0.9))
926 .await
927 .unwrap();
928
929 manager.award_task("task-1").await;
931 assert_eq!(
932 manager.get_task_status("task-1").await,
933 Some(TaskStatus::Awarded)
934 );
935
936 manager.accept_award("task-1", "agent-1").await.unwrap();
938 assert_eq!(
939 manager.get_task_status("task-1").await,
940 Some(TaskStatus::InProgress)
941 );
942
943 manager
945 .complete_task("task-1", "agent-1", true, Some("Done".to_string()))
946 .await
947 .unwrap();
948 assert_eq!(
949 manager.get_task_status("task-1").await,
950 Some(TaskStatus::Completed)
951 );
952 }
953
954 #[tokio::test]
955 async fn test_contract_participant() {
956 let participant =
957 ContractParticipant::new("agent-1", vec!["rust".to_string(), "git".to_string()])
958 .with_max_concurrent(2);
959
960 let announcement = TaskAnnouncement::new(
961 "task-1",
962 "Test task",
963 "manager",
964 Instant::now() + Duration::from_secs(60),
965 )
966 .with_requirements(
967 TaskRequirements::new()
968 .with_capabilities(vec!["rust".to_string()])
969 .with_complexity(5),
970 );
971
972 assert!(participant.should_bid(&announcement).await);
974
975 let bid = participant.generate_bid(&announcement).await;
977 assert_eq!(bid.agent_id, "agent-1");
978 assert_eq!(bid.capability_score, 1.0); participant.accept_task("task-1").await;
982 assert_eq!(participant.current_task_count().await, 1);
983
984 participant.complete_task("task-1").await;
986 assert_eq!(participant.current_task_count().await, 0);
987 }
988
989 #[tokio::test]
990 async fn test_capacity_limit() {
991 let participant =
992 ContractParticipant::new("agent-1", vec!["rust".to_string()]).with_max_concurrent(1);
993
994 participant.accept_task("task-1").await;
996
997 let announcement = TaskAnnouncement::new(
998 "task-2",
999 "Another task",
1000 "manager",
1001 Instant::now() + Duration::from_secs(60),
1002 );
1003
1004 assert!(!participant.should_bid(&announcement).await);
1006 }
1007}