croncat_agents/
distributor.rs1use cosmwasm_std::{Addr, Deps, Env, Storage, Uint64};
2use croncat_sdk_agents::msg::{AgentTaskResponse, TaskStats};
3use croncat_sdk_tasks::types::SlotType;
4
5use crate::{
6 error::ContractError,
7 state::{AGENTS_ACTIVE, AGENT_STATS},
8};
9
10pub trait RoundRobinAgentTaskDistributor<'a> {
11 #[doc = r".Gets agent tasks count for block/cron slots
12 # Errors
13 This function will return an error if agent does not exists"]
14 fn get_agent_tasks(
15 &self,
16 deps: &Deps,
17 env: &Env,
18 agent_id: Addr,
19 slot_items: (Option<u64>, Option<u64>),
20 ) -> Result<AgentTaskResponse, ContractError>;
21
22 #[doc = r"Updates agent stats when agent completed task on specified slot"]
23 fn on_task_completed(
24 &self,
25 storage: &'a mut dyn Storage,
26 _env: &Env,
27 agent_id: &Addr,
28 slot_type: SlotType,
29 ) -> Result<(), ContractError>;
30}
31
/// Stateless task distributor; it carries no fields of its own.
pub struct AgentTaskDistributor {}

impl AgentTaskDistributor {
    /// Creates a distributor. Usable in `const` contexts.
    pub const fn new() -> Self {
        Self {}
    }
}
39
40#[allow(clippy::op_ref)]
43impl<'a> RoundRobinAgentTaskDistributor<'a> for AgentTaskDistributor {
44 fn get_agent_tasks(
45 &self,
46 deps: &Deps,
47 _env: &Env,
48 agent_id: Addr,
49 slot_items: (Option<u64>, Option<u64>),
50 ) -> Result<AgentTaskResponse, ContractError> {
51 let mut active = AGENTS_ACTIVE.load(deps.storage)?;
52 if !active.contains(&agent_id) {
53 return Err(ContractError::AgentNotRegistered {});
54 }
55 if slot_items == (None, None) {
56 return Ok(AgentTaskResponse {
57 stats: TaskStats {
58 num_block_tasks: Uint64::zero(),
59 num_cron_tasks: Uint64::zero(),
60 },
61 });
62 }
63 let agent_count = active.len() as u64;
64 let (block_slots, cron_slots) = slot_items;
65
66 let mut equalizer = |slot_type: SlotType,
67 total_tasks: u64|
68 -> Result<Uint64, ContractError> {
69 if total_tasks < 1 {
70 return Ok(Uint64::zero());
71 }
72 active.sort_unstable_by(|left, right| {
77 let stats1 = AGENT_STATS.load(deps.storage, left).unwrap_or_default();
78 let stats2 = AGENT_STATS.load(deps.storage, right).unwrap_or_default();
79 match slot_type {
80 SlotType::Block => stats1
81 .completed_block_tasks
82 .partial_cmp(&stats2.completed_block_tasks)
83 .unwrap(),
84 SlotType::Cron => stats1
85 .completed_cron_tasks
86 .partial_cmp(&stats2.completed_cron_tasks)
87 .unwrap(),
88 }
89 });
90 let agent_diff_index = active
91 .iter()
92 .position(|x| x == &agent_id)
93 .ok_or(ContractError::AgentNotRegistered {})?
94 as u64;
95
96 if total_tasks <= active.len() as u64 {
97 let agent_tasks_total = 1u64
98 .saturating_sub(agent_diff_index.saturating_sub(total_tasks.saturating_sub(1)));
99 Ok(agent_tasks_total.into())
100 } else {
101 let leftover = total_tasks % agent_count;
102 let mut extra = 0u64;
103 if leftover > 0 {
104 extra = 1u64.saturating_sub(
105 agent_diff_index.saturating_sub(leftover.saturating_sub(1)),
106 );
107 }
108 let agent_tasks_total = total_tasks.saturating_div(agent_count) + extra;
109
110 Ok(agent_tasks_total.into())
111 }
112 };
113
114 let n = equalizer(SlotType::Block, block_slots.unwrap_or_default())?;
115 let num_block_tasks = n;
116
117 let n = equalizer(SlotType::Cron, cron_slots.unwrap_or_default())?;
118 let num_cron_tasks = n;
119
120 Ok(AgentTaskResponse {
121 stats: TaskStats {
122 num_block_tasks,
123 num_cron_tasks,
124 },
125 })
126 }
127
128 fn on_task_completed(
129 &self,
130 storage: &'a mut dyn Storage,
131 _env: &Env,
132 agent_id: &Addr,
133 slot_type: SlotType,
134 ) -> Result<(), ContractError> {
135 let mut stats = AGENT_STATS.may_load(storage, agent_id)?.unwrap_or_default();
136 match slot_type {
137 SlotType::Block => stats.completed_block_tasks += 1,
138 SlotType::Cron => stats.completed_cron_tasks += 1,
139 }
140 AGENT_STATS.save(storage, agent_id, &stats)?;
141 Ok(())
142 }
143}
144
145impl Default for AgentTaskDistributor {
146 fn default() -> AgentTaskDistributor {
147 AgentTaskDistributor::new()
148 }
149}