croncat_agents/
distributor.rs

1use cosmwasm_std::{Addr, Deps, Env, Storage, Uint64};
2use croncat_sdk_agents::msg::{AgentTaskResponse, TaskStats};
3use croncat_sdk_tasks::types::SlotType;
4
5use crate::{
6    error::ContractError,
7    state::{AGENTS_ACTIVE, AGENT_STATS},
8};
9
10pub trait RoundRobinAgentTaskDistributor<'a> {
11    #[doc = r".Gets agent tasks count for block/cron slots
12    # Errors
13    This function will return an error if agent does not exists"]
14    fn get_agent_tasks(
15        &self,
16        deps: &Deps,
17        env: &Env,
18        agent_id: Addr,
19        slot_items: (Option<u64>, Option<u64>),
20    ) -> Result<AgentTaskResponse, ContractError>;
21
22    #[doc = r"Updates agent stats when agent completed task on specified slot"]
23    fn on_task_completed(
24        &self,
25        storage: &'a mut dyn Storage,
26        _env: &Env,
27        agent_id: &Addr,
28        slot_type: SlotType,
29    ) -> Result<(), ContractError>;
30}
31
32pub struct AgentTaskDistributor {}
33
34impl AgentTaskDistributor {
35    pub const fn new() -> AgentTaskDistributor {
36        AgentTaskDistributor {}
37    }
38}
39
40/// Note that we ran into problems with clippy here
41/// See <https://github.com/CronCats/cw-croncat/pull/415>
42#[allow(clippy::op_ref)]
43impl<'a> RoundRobinAgentTaskDistributor<'a> for AgentTaskDistributor {
44    fn get_agent_tasks(
45        &self,
46        deps: &Deps,
47        _env: &Env,
48        agent_id: Addr,
49        slot_items: (Option<u64>, Option<u64>),
50    ) -> Result<AgentTaskResponse, ContractError> {
51        let mut active = AGENTS_ACTIVE.load(deps.storage)?;
52        if !active.contains(&agent_id) {
53            return Err(ContractError::AgentNotRegistered {});
54        }
55        if slot_items == (None, None) {
56            return Ok(AgentTaskResponse {
57                stats: TaskStats {
58                    num_block_tasks: Uint64::zero(),
59                    num_cron_tasks: Uint64::zero(),
60                },
61            });
62        }
63        let agent_count = active.len() as u64;
64        let (block_slots, cron_slots) = slot_items;
65
66        let mut equalizer = |slot_type: SlotType,
67                             total_tasks: u64|
68         -> Result<Uint64, ContractError> {
69            if total_tasks < 1 {
70                return Ok(Uint64::zero());
71            }
72            //This sort is unstable (i.e., may reorder equal elements), in-place (i.e., does not allocate),
73            //and O(n log n) worst-case.
74            //It is typically faster than stable sorting, except in a few special cases,
75            //e.g., when the slice consists of several concatenated sorted sequences.
76            active.sort_unstable_by(|left, right| {
77                let stats1 = AGENT_STATS.load(deps.storage, left).unwrap_or_default();
78                let stats2 = AGENT_STATS.load(deps.storage, right).unwrap_or_default();
79                match slot_type {
80                    SlotType::Block => stats1
81                        .completed_block_tasks
82                        .partial_cmp(&stats2.completed_block_tasks)
83                        .unwrap(),
84                    SlotType::Cron => stats1
85                        .completed_cron_tasks
86                        .partial_cmp(&stats2.completed_cron_tasks)
87                        .unwrap(),
88                }
89            });
90            let agent_diff_index = active
91                .iter()
92                .position(|x| x == &agent_id)
93                .ok_or(ContractError::AgentNotRegistered {})?
94                as u64;
95
96            if total_tasks <= active.len() as u64 {
97                let agent_tasks_total = 1u64
98                    .saturating_sub(agent_diff_index.saturating_sub(total_tasks.saturating_sub(1)));
99                Ok(agent_tasks_total.into())
100            } else {
101                let leftover = total_tasks % agent_count;
102                let mut extra = 0u64;
103                if leftover > 0 {
104                    extra = 1u64.saturating_sub(
105                        agent_diff_index.saturating_sub(leftover.saturating_sub(1)),
106                    );
107                }
108                let agent_tasks_total = total_tasks.saturating_div(agent_count) + extra;
109
110                Ok(agent_tasks_total.into())
111            }
112        };
113
114        let n = equalizer(SlotType::Block, block_slots.unwrap_or_default())?;
115        let num_block_tasks = n;
116
117        let n = equalizer(SlotType::Cron, cron_slots.unwrap_or_default())?;
118        let num_cron_tasks = n;
119
120        Ok(AgentTaskResponse {
121            stats: TaskStats {
122                num_block_tasks,
123                num_cron_tasks,
124            },
125        })
126    }
127
128    fn on_task_completed(
129        &self,
130        storage: &'a mut dyn Storage,
131        _env: &Env,
132        agent_id: &Addr,
133        slot_type: SlotType,
134    ) -> Result<(), ContractError> {
135        let mut stats = AGENT_STATS.may_load(storage, agent_id)?.unwrap_or_default();
136        match slot_type {
137            SlotType::Block => stats.completed_block_tasks += 1,
138            SlotType::Cron => stats.completed_cron_tasks += 1,
139        }
140        AGENT_STATS.save(storage, agent_id, &stats)?;
141        Ok(())
142    }
143}
144
145impl Default for AgentTaskDistributor {
146    fn default() -> AgentTaskDistributor {
147        AgentTaskDistributor::new()
148    }
149}