croncat_tasks/
contract.rs

use crate::ContractError::InvalidZeroValue;
#[cfg(not(feature = "library"))]
use cosmwasm_std::entry_point;
use cosmwasm_std::{
    to_binary, Attribute, Binary, Deps, DepsMut, Env, MessageInfo, Order, Response, StdResult,
    Uint64,
};
use croncat_sdk_core::internal_messages::agents::AgentOnTaskCreated;
use croncat_sdk_core::internal_messages::manager::{ManagerCreateTaskBalance, ManagerRemoveTask};
use croncat_sdk_core::internal_messages::tasks::{TasksRemoveTaskByManager, TasksRescheduleTask};
use croncat_sdk_core::types::{DEFAULT_PAGINATION_FROM_INDEX, DEFAULT_PAGINATION_LIMIT};
use croncat_sdk_tasks::msg::UpdateConfigMsg;
use croncat_sdk_tasks::types::{
    Config, CurrentTaskInfoResponse, Interval, SlotHashesResponse, SlotIdsResponse,
    SlotTasksTotalResponse, SlotType, Task, TaskExecutionInfo, TaskInfo, TaskRequest, TaskResponse,
};
use cw2::set_contract_version;
use cw20::Cw20CoinVerified;
use cw_storage_plus::Bound;

use crate::error::ContractError;
use crate::helpers::{
    check_if_sender_is_manager, get_agents_addr, get_manager_addr, remove_task, validate_boundary,
    validate_msg_calculate_usage, validate_queries, validate_transforms,
};
use crate::msg::{ExecuteMsg, InstantiateMsg, QueryMsg};
use crate::state::{
    tasks_map, BLOCK_SLOTS, CONFIG, EVENTED_TASKS_LOOKUP, LAST_TASK_CREATION, PAUSED, TASKS_TOTAL,
    TIME_SLOTS,
};

const CONTRACT_NAME: &str = "crate:croncat-tasks";
const CONTRACT_VERSION: &str = env!("CARGO_PKG_VERSION");

// Default values based on non-wasm operations; wasm ops seem impossible to predict
// TODO: these values are based on pre-split measurements; GAS_BASE_FEE needs to be recalculated
pub(crate) const GAS_BASE_FEE: u64 = 300_000;
pub(crate) const GAS_ACTION_FEE: u64 = 130_000;
pub(crate) const GAS_QUERY_FEE: u64 = 130_000; // Load query module (~61_000) and query after that (~65_000+)
pub(crate) const GAS_LIMIT: u64 = 3_000_000; // 10M is the default for juno, but let's make sure we have space for block inclusivity guarantees
pub(crate) const SLOT_GRANULARITY_TIME: u64 = 10_000_000_000; // 10 seconds
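
// A rough illustration (assumed composition; `validate_msg_calculate_usage` in
// helpers does the actual accounting): a task with 2 actions and 1 query would
// be estimated at roughly
//   GAS_BASE_FEE + 2 * GAS_ACTION_FEE + GAS_QUERY_FEE
//     = 300_000 + 260_000 + 130_000 = 690_000 gas,
// comfortably under GAS_LIMIT (3_000_000).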

#[cfg_attr(not(feature = "library"), entry_point)]
pub fn instantiate(
    deps: DepsMut,
    _env: Env,
    info: MessageInfo,
    msg: InstantiateMsg,
) -> Result<Response, ContractError> {
    let InstantiateMsg {
        chain_name,
        version,
        pause_admin,
        croncat_manager_key,
        croncat_agents_key,
        slot_granularity_time,
        gas_base_fee,
        gas_action_fee,
        gas_limit,
        gas_query_fee,
    } = msg;

    validate_non_zero_value(slot_granularity_time, "slot_granularity_time")?;

    let contract_version = version.unwrap_or_else(|| CONTRACT_VERSION.to_string());
    set_contract_version(deps.storage, CONTRACT_NAME, &contract_version)?;

    let owner_addr = info.sender.clone();

    // Validate pause_admin
    // MUST: only be a contract address
    // MUST: not be the same address as the factory owner (DAO)
    // Any factory action should be done by the owner_addr
    // NOTE: different networks have different bech32 prefix lengths; capturing min/max here
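    // (Illustrative: a 32-byte contract address encodes as `prefix + "1"` plus
    // 58 bech32 data/checksum chars, so prefixes of 2 to 15 chars give 61-74
    // total; a 20-byte account address is ~20 chars shorter and fails.)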
    if !(61usize..=74usize).contains(&pause_admin.to_string().len()) {
        return Err(ContractError::InvalidPauseAdmin {});
    }

    let config = Config {
        chain_name,
        version: contract_version,
        owner_addr,
        pause_admin,
        croncat_factory_addr: info.sender,
        croncat_manager_key,
        croncat_agents_key,
        slot_granularity_time: slot_granularity_time.unwrap_or(SLOT_GRANULARITY_TIME),
        gas_base_fee: gas_base_fee.unwrap_or(GAS_BASE_FEE),
        gas_action_fee: gas_action_fee.unwrap_or(GAS_ACTION_FEE),
        gas_query_fee: gas_query_fee.unwrap_or(GAS_QUERY_FEE),
        gas_limit: gas_limit.unwrap_or(GAS_LIMIT),
    };

    // Ensure the new gas limit will work
    validate_gas_limit(&config)?;

    // Save initializing states
    CONFIG.save(deps.storage, &config)?;
    PAUSED.save(deps.storage, &false)?;
    TASKS_TOTAL.save(deps.storage, &0)?;
    Ok(Response::new().add_attribute("action", "instantiate"))
}

#[cfg_attr(not(feature = "library"), entry_point)]
pub fn execute(
    deps: DepsMut,
    env: Env,
    info: MessageInfo,
    msg: ExecuteMsg,
) -> Result<Response, ContractError> {
    match msg {
        ExecuteMsg::UpdateConfig(msg) => execute_update_config(deps, info, msg),
        ExecuteMsg::CreateTask { task } => execute_create_task(deps, env, info, *task),
        ExecuteMsg::RemoveTask { task_hash } => execute_remove_task(deps, info, task_hash),
        // Methods for other contracts
        ExecuteMsg::RemoveTaskByManager(remove_task_msg) => {
            execute_remove_task_by_manager(deps, info, remove_task_msg)
        }
        ExecuteMsg::RescheduleTask(reschedule_msg) => {
            execute_reschedule_task(deps, env, info, reschedule_msg)
        }
        ExecuteMsg::PauseContract {} => execute_pause(deps, info),
        ExecuteMsg::UnpauseContract {} => execute_unpause(deps, info),
    }
}

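/// Example (illustrative): bump only `gas_limit`; every other field is passed
/// as `None`, so the existing config values are kept:
/// ```ignore
/// ExecuteMsg::UpdateConfig(UpdateConfigMsg {
///     croncat_factory_addr: None,
///     croncat_manager_key: None,
///     croncat_agents_key: None,
///     slot_granularity_time: None,
///     gas_base_fee: None,
///     gas_action_fee: None,
///     gas_query_fee: None,
///     gas_limit: Some(4_000_000),
/// })
/// ```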
fn execute_update_config(
    deps: DepsMut,
    info: MessageInfo,
    msg: UpdateConfigMsg,
) -> Result<Response, ContractError> {
    let config = CONFIG.load(deps.storage)?;

    if info.sender != config.owner_addr {
        return Err(ContractError::Unauthorized {});
    }

    // Destructure so we won't forget to update this if new fields are added
    let UpdateConfigMsg {
        croncat_factory_addr,
        croncat_manager_key,
        croncat_agents_key,
        slot_granularity_time,
        gas_base_fee,
        gas_action_fee,
        gas_query_fee,
        gas_limit,
    } = msg;

    let new_config = Config {
        owner_addr: config.owner_addr,
        pause_admin: config.pause_admin,
        croncat_factory_addr: croncat_factory_addr
            .map(|human| deps.api.addr_validate(&human))
            .transpose()?
            .unwrap_or(config.croncat_factory_addr),
        chain_name: config.chain_name,
        version: config.version,
        croncat_manager_key: croncat_manager_key.unwrap_or(config.croncat_manager_key),
        croncat_agents_key: croncat_agents_key.unwrap_or(config.croncat_agents_key),
        slot_granularity_time: slot_granularity_time.unwrap_or(config.slot_granularity_time),
        gas_base_fee: gas_base_fee.unwrap_or(config.gas_base_fee),
        gas_action_fee: gas_action_fee.unwrap_or(config.gas_action_fee),
        gas_query_fee: gas_query_fee.unwrap_or(config.gas_query_fee),
        gas_limit: gas_limit.unwrap_or(config.gas_limit),
    };

    // Ensure the new gas limit will work
    validate_gas_limit(&new_config)?;
    validate_non_zero_value(slot_granularity_time, "slot_granularity_time")?;

    CONFIG.save(deps.storage, &new_config)?;
    Ok(Response::new().add_attribute("action", "update_config"))
}

fn execute_reschedule_task(
    deps: DepsMut,
    env: Env,
    info: MessageInfo,
    reschedule_msg: TasksRescheduleTask,
) -> Result<Response, ContractError> {
    let task_hash = reschedule_msg.task_hash;
    let config = CONFIG.load(deps.storage)?;
    check_if_sender_is_manager(&deps.querier, &config, &info.sender)?;

    let mut task_to_remove = None;
    let mut res_attributes: Vec<Attribute> = vec![];

    // Check the default tasks map
    let (next_id, slot_kind) = if let Some(task) = tasks_map().may_load(deps.storage, &task_hash)? {
        let (next_id, slot_kind) =
            task.interval
                .next(&env, &task.boundary, config.slot_granularity_time);

        res_attributes.push(Attribute::new(
            "task_hash",
            task.to_hash(&config.chain_name),
        ));
        res_attributes.push(Attribute::new("task_version", task.version.to_owned()));

        // NOTE: If the task is evented, we don't want to "schedule" it inside slots,
        // but we also don't want to remove it unless it was Interval::Once
        if next_id != 0 && !task.is_evented() && task.interval != Interval::Once {
            res_attributes.push(Attribute::new("action", "reschedule_task"));
            // Get previous task hashes in slot, add as needed
            let update_vec_data = |d: Option<Vec<Vec<u8>>>| -> StdResult<Vec<Vec<u8>>> {
                match d {
                    // Has some data, simply push the new hash
                    Some(data) => {
                        let mut s = data;
                        s.push(task_hash);
                        Ok(s)
                    }
                    // No data, push a new vec & hash
                    None => Ok(vec![task_hash]),
                }
            };
            // Based on slot kind, put into block or cron slots
            match slot_kind {
                SlotType::Block => {
                    BLOCK_SLOTS.update(deps.storage, next_id, update_vec_data)?;
                    // Don't forget to pop the finished task
                    let mut block_slot: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
                        .range(
                            deps.storage,
                            None,
                            Some(Bound::inclusive(env.block.height)),
                            Order::Ascending,
                        )
                        .take(1)
                        .collect::<StdResult<_>>()?;
                    let mut slot = block_slot.pop().unwrap();
                    slot.1.pop();
                    if slot.1.is_empty() {
                        BLOCK_SLOTS.remove(deps.storage, slot.0)
                    } else {
                        BLOCK_SLOTS.save(deps.storage, slot.0, &slot.1)?;
                    }
                }
                SlotType::Cron => {
                    TIME_SLOTS.update(deps.storage, next_id, update_vec_data)?;
                    // Don't forget to pop the finished task
                    let mut time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
                        .range(
                            deps.storage,
                            None,
                            Some(Bound::inclusive(env.block.time.nanos())),
                            Order::Ascending,
                        )
                        .take(1)
                        .collect::<StdResult<_>>()?;
                    let mut slot = time_slot.pop().unwrap();
                    slot.1.pop();
                    if slot.1.is_empty() {
                        TIME_SLOTS.remove(deps.storage, slot.0)
                    } else {
                        TIME_SLOTS.save(deps.storage, slot.0, &slot.1)?;
                    }
                }
            }
        } else if !task.is_evented() {
            remove_task(
                deps.storage,
                &task_hash,
                task.boundary.is_block(),
                task.is_evented(),
            )?;
            task_to_remove = Some(ManagerRemoveTask {
                sender: task.owner_addr,
                task_hash,
            });
            res_attributes.push(Attribute::new("action", "remove_task"));
        } else if task.is_evented() {
            // Seems like overkill, but super nice to know we didn't remove an evented task
            res_attributes.push(Attribute::new("action", "continue_task"));
        }
        (next_id, slot_kind)
    } else {
        return Err(ContractError::NoTaskFound {});
    };

    Ok(Response::new()
        .add_attributes(res_attributes)
        .add_attribute("slot_id", next_id.to_string())
        .add_attribute("slot_kind", slot_kind.to_string())
        .set_data(to_binary(&task_to_remove)?))
}

fn execute_remove_task_by_manager(
    deps: DepsMut,
    info: MessageInfo,
    remove_task_msg: TasksRemoveTaskByManager,
) -> Result<Response, ContractError> {
    let task_hash = remove_task_msg.task_hash;
    let config = CONFIG.load(deps.storage)?;
    check_if_sender_is_manager(&deps.querier, &config, &info.sender)?;

    if let Some(task) = tasks_map().may_load(deps.storage, &task_hash)? {
        remove_task(
            deps.storage,
            &task_hash,
            task.boundary.is_block(),
            task.is_evented(),
        )?;
    } else {
        return Err(ContractError::NoTaskFound {});
    }

    Ok(Response::new().add_attribute("action", "remove_task_by_manager"))
}

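/// Example (illustrative): a task owner removing their own task by hash; the
/// hash string is the one returned in create_task's response data/attributes:
/// ```ignore
/// ExecuteMsg::RemoveTask { task_hash }
/// ```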
fn execute_remove_task(
    deps: DepsMut,
    info: MessageInfo,
    task_hash: String,
) -> Result<Response, ContractError> {
    if PAUSED.load(deps.storage)? {
        return Err(ContractError::ContractPaused);
    }
    let config = CONFIG.load(deps.storage)?;
    let hash = task_hash.as_bytes();
    if let Some(task) = tasks_map().may_load(deps.storage, hash)? {
        if task.owner_addr != info.sender {
            return Err(ContractError::Unauthorized {});
        }
        remove_task(
            deps.storage,
            hash,
            task.boundary.is_block(),
            task.is_evented(),
        )?;
    } else {
        return Err(ContractError::NoTaskFound {});
    }
    let manager_addr = get_manager_addr(&deps.querier, &config)?;
    let remove_task_msg = ManagerRemoveTask {
        sender: info.sender,
        task_hash: task_hash.into_bytes(),
    }
    .into_cosmos_msg(manager_addr)?;
    Ok(Response::new()
        .add_attribute("action", "remove_task")
        .add_message(remove_task_msg))
}

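/// Example (illustrative sketch; `actions` elided): a one-shot task with a
/// single action, no queries, no transforms, and no cw20 attached:
/// ```ignore
/// ExecuteMsg::CreateTask {
///     task: Box::new(TaskRequest {
///         interval: Interval::Once,
///         boundary: None,
///         stop_on_fail: false,
///         actions: vec![/* e.g. a bank send or wasm execute */],
///         queries: None,
///         transforms: None,
///         cw20: None,
///     }),
/// }
/// ```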
fn execute_create_task(
    deps: DepsMut,
    env: Env,
    info: MessageInfo,
    task: TaskRequest,
) -> Result<Response, ContractError> {
    if PAUSED.load(deps.storage)? {
        return Err(ContractError::ContractPaused);
    }
    let config = CONFIG.load(deps.storage)?;
    let owner_addr = info.sender;

    // Validate boundary and interval
    let boundary = validate_boundary(&env.block, task.boundary.clone(), &task.interval)?;

    let amount_for_one_task = validate_msg_calculate_usage(
        deps.as_ref(),
        &task,
        &env.contract.address,
        &owner_addr,
        &config,
    )?;
    if amount_for_one_task.gas > config.gas_limit {
        return Err(ContractError::InvalidGas {});
    }
    let cw20 = task
        .cw20
        .map(|human| {
            StdResult::Ok(Cw20CoinVerified {
                address: deps.api.addr_validate(&human.address)?,
                amount: human.amount,
            })
        })
        .transpose()?;

    let item = Task {
        owner_addr: owner_addr.clone(),
        interval: task.interval,
        boundary,
        stop_on_fail: task.stop_on_fail,
        amount_for_one_task: amount_for_one_task.clone(),
        actions: task.actions,
        // NOTE: See process_queries in the manager contract for details on limitations of malformed queries
        queries: task.queries.unwrap_or_default(),
        transforms: task.transforms.unwrap_or_default(),
        version: config.version.clone(),
    };
    if !item.interval.is_valid() {
        return Err(ContractError::InvalidInterval {});
    }
    if !validate_queries(&deps.as_ref(), &item.queries) {
        return Err(ContractError::InvalidQueries {});
    }
    if !validate_transforms(&item) {
        return Err(ContractError::InvalidTransform {});
    }

    let hash_prefix = &config.chain_name;
    let hash = item.to_hash(hash_prefix);

    let (next_id, slot_kind) =
        item.interval
            .next(&env, &item.boundary, config.slot_granularity_time);
    if next_id == 0 {
        return Err(ContractError::TaskEnded {});
    }

    let recurring = item.recurring();
    let hash_vec = hash.clone().into_bytes();
    let mut attributes: Vec<Attribute> = vec![];

    // Update the task total and the tasks map
    TASKS_TOTAL.update(deps.storage, |amt| -> StdResult<_> { Ok(amt + 1) })?;
    tasks_map().update(deps.storage, &hash_vec, |old| match old {
        Some(_) => Err(ContractError::TaskExists {}),
        None => Ok(item.clone()),
    })?;

    // Get previous task hashes in slot, add as needed
    let update_vec_data = |d: Option<Vec<Vec<u8>>>| -> StdResult<Vec<Vec<u8>>> {
        match d {
            // Has some data, simply push the new hash
            Some(data) => {
                let mut s = data;
                s.push(hash_vec.clone());
                Ok(s)
            }
            // No data, push a new vec & hash
            None => Ok(vec![hash_vec.clone()]),
        }
    };

    if item.is_evented() {
        EVENTED_TASKS_LOOKUP.update(deps.storage, next_id, update_vec_data)?;
        attributes.push(Attribute::new("evented_id", next_id.to_string()));
    } else {
        // Only scheduled tasks get put into slots
        match slot_kind {
            SlotType::Block => {
                BLOCK_SLOTS.update(deps.storage, next_id, update_vec_data)?;
            }
            SlotType::Cron => {
                TIME_SLOTS.update(deps.storage, next_id, update_vec_data)?;
            }
        }
        attributes.push(Attribute::new("slot_id", next_id.to_string()));
        attributes.push(Attribute::new("slot_kind", slot_kind.to_string()));
    }

    // Save the current timestamp as the last time a task was created
    LAST_TASK_CREATION.save(deps.storage, &env.block.time)?;

    let manager_addr = get_manager_addr(&deps.querier, &config)?;
    let manager_create_task_balance_msg = ManagerCreateTaskBalance {
        sender: owner_addr,
        task_hash: hash_vec,
        recurring,
        cw20,
        amount_for_one_task,
    }
    .into_cosmos_msg(manager_addr, info.funds)?;

    let agent_addr = get_agents_addr(&deps.querier, &config)?;
    let agent_new_task_msg = AgentOnTaskCreated {}.into_cosmos_msg(agent_addr)?;
    let response_data = TaskExecutionInfo {
        block_height: env.block.height,
        tx_info: env.transaction,
        task_hash: hash.clone(),
        owner_addr: item.owner_addr,
        amount_for_one_task: item.amount_for_one_task,
        version: item.version.clone(),
    };
    Ok(Response::new()
        .set_data(to_binary(&response_data)?)
        .add_attribute("action", "create_task")
        .add_attributes(attributes)
        .add_attribute("task_hash", hash)
        .add_attribute("task_version", item.version)
        .add_message(manager_create_task_balance_msg)
        .add_message(agent_new_task_msg))
}

pub fn execute_pause(deps: DepsMut, info: MessageInfo) -> Result<Response, ContractError> {
    if PAUSED.load(deps.storage)? {
        return Err(ContractError::ContractPaused);
    }
    let config = CONFIG.load(deps.storage)?;
    if info.sender != config.pause_admin {
        return Err(ContractError::Unauthorized {});
    }
    PAUSED.save(deps.storage, &true)?;
    Ok(Response::new().add_attribute("action", "pause_contract"))
}

pub fn execute_unpause(deps: DepsMut, info: MessageInfo) -> Result<Response, ContractError> {
    if !PAUSED.load(deps.storage)? {
        return Err(ContractError::ContractUnpaused);
    }
    let config = CONFIG.load(deps.storage)?;
    if info.sender != config.owner_addr {
        return Err(ContractError::Unauthorized {});
    }
    PAUSED.save(deps.storage, &false)?;
    Ok(Response::new().add_attribute("action", "unpause_contract"))
}

#[cfg_attr(not(feature = "library"), entry_point)]
pub fn query(deps: Deps, env: Env, msg: QueryMsg) -> StdResult<Binary> {
    match msg {
        QueryMsg::Config {} => to_binary(&CONFIG.load(deps.storage)?),
        QueryMsg::Paused {} => to_binary(&PAUSED.load(deps.storage)?),
        QueryMsg::TasksTotal {} => to_binary(&Uint64::from(query_tasks_total(deps)?)),
        QueryMsg::CurrentTaskInfo {} => to_binary(&query_current_task_info(deps, env)?),
        QueryMsg::CurrentTask {} => to_binary(&query_current_task(deps, env)?),
        QueryMsg::Tasks { from_index, limit } => to_binary(&query_tasks(deps, from_index, limit)?),
        QueryMsg::EventedIds { from_index, limit } => {
            to_binary(&query_evented_ids(deps, from_index, limit)?)
        }
        QueryMsg::EventedHashes {
            id,
            from_index,
            limit,
        } => to_binary(&query_evented_hashes(deps, id, from_index, limit)?),
        QueryMsg::EventedTasks {
            start,
            from_index,
            limit,
        } => to_binary(&query_evented_tasks(deps, env, start, from_index, limit)?),
        QueryMsg::TasksByOwner {
            owner_addr,
            from_index,
            limit,
        } => to_binary(&query_tasks_by_owner(deps, owner_addr, from_index, limit)?),
        QueryMsg::Task { task_hash } => to_binary(&query_task(deps, task_hash)?),
        QueryMsg::TaskHash { task } => to_binary(&query_task_hash(deps, *task)?),
        QueryMsg::SlotHashes { slot } => to_binary(&query_slot_hashes(deps, slot)?),
        QueryMsg::SlotIds { from_index, limit } => {
            to_binary(&query_slot_ids(deps, from_index, limit)?)
        }
        QueryMsg::SlotTasksTotal { offset } => {
            to_binary(&query_slot_tasks_total(deps, env, offset)?)
        }
    }
}

fn query_tasks_total(deps: Deps) -> StdResult<u64> {
    TASKS_TOTAL.load(deps.storage)
}

// Returns the total task count & last task creation timestamp for agent nomination checks
fn query_current_task_info(deps: Deps, _env: Env) -> StdResult<CurrentTaskInfoResponse> {
    Ok(CurrentTaskInfoResponse {
        total: Uint64::from(query_tasks_total(deps)?),
        last_created_task: LAST_TASK_CREATION.load(deps.storage)?,
    })
}

// An offset can be supplied for forward-looking task totals; by default the range
// extends 1 block past the current height.
// NOTE: The extra block means that during the current finalized block we can see the upcoming work
// and react accordingly - the current task will always be 1 block ahead so signing occurs accurately.
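// Worked example (illustrative, default 10s granularity): with block time
// 1_700_000_123_456_789_012 ns and offset = 2, the current slot id is
//   ts - ts % 10_000_000_000 = 1_700_000_120_000_000_000,
// so the cron slot consulted is that plus 2 * 10_000_000_000,
// i.e. 1_700_000_140_000_000_000.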
fn query_slot_tasks_total(
    deps: Deps,
    env: Env,
    offset: Option<u64>,
) -> StdResult<SlotTasksTotalResponse> {
    if let Some(off) = offset {
        let config = CONFIG.load(deps.storage)?;
        let block_tasks = BLOCK_SLOTS
            .may_load(deps.storage, env.block.height + off)?
            .unwrap_or_default()
            .len() as u64;
        let evented_tasks = EVENTED_TASKS_LOOKUP
            .may_load(deps.storage, env.block.height + off)?
            .unwrap_or_default()
            .len() as u64;

        let current_block_ts = env.block.time.nanos();
        let current_block_slot =
            current_block_ts.saturating_sub(current_block_ts % config.slot_granularity_time);
        let cron_tasks = TIME_SLOTS
            .may_load(
                deps.storage,
                current_block_slot + config.slot_granularity_time * off,
            )?
            .unwrap_or_default()
            .len() as u64;
        Ok(SlotTasksTotalResponse {
            block_tasks,
            cron_tasks,
            evented_tasks,
        })
    } else {
        // NOTE: Adding 1 here since the value is used as the range's inclusive upper
        // bound, which still covers everything up to and including the current block
        let block_height = env.block.height.saturating_add(1);
        // Follows the same logic as blocks: the bound must reach far enough ahead to cover ready tasks
        let block_time = env.block.time.plus_seconds(6).nanos();
        let block_slots: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
            .range(
                deps.storage,
                None,
                Some(Bound::inclusive(block_height)),
                Order::Ascending,
            )
            .collect::<StdResult<_>>()?;

        let block_tasks = block_slots
            .iter()
            .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;

        let evented_task_list: Vec<(u64, Vec<Vec<u8>>)> = EVENTED_TASKS_LOOKUP
            .range(
                deps.storage,
                None,
                Some(Bound::inclusive(block_height)),
                Order::Ascending,
            )
            .collect::<StdResult<_>>()?;

        let evented_tasks = evented_task_list
            .iter()
            .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;

        let time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
            .range(
                deps.storage,
                None,
                Some(Bound::inclusive(block_time)),
                Order::Ascending,
            )
            .collect::<StdResult<_>>()?;

        let cron_tasks = time_slot
            .iter()
            .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;
        Ok(SlotTasksTotalResponse {
            block_tasks,
            cron_tasks,
            evented_tasks,
        })
    }
}

/// Get the slot with the lowest height/timestamp
/// NOTE: This prioritizes blocks over timestamps.
/// Why blocks over timestamps? Time-based tasks have a wider granularity than blocks.
/// For example, the default configuration for time granularity is 10 seconds. This means
/// on average a time-based task could occur on 1 of 2 blocks. This configuration is aimed
/// at a larger window of time because timestamp guarantees are virtually impossible without
/// protocol-level mechanisms. Since this is the case, timestamps should be considered more
/// flexible in execution windows. Future versions of this contract can ensure better
/// execution guarantees, based on whether the timestamp is nearing the end of its block-span
/// window (timestamp end is closer to the 2nd block than the 1st).
fn query_current_task(deps: Deps, env: Env) -> StdResult<TaskResponse> {
    let config = CONFIG.load(deps.storage)?;
    let mut block_slot: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
        .range(
            deps.storage,
            None,
            // NOTE: Allow 1 extra block so that during the current finalized block we can see the upcoming work
            // and react accordingly - the current task will always be 1 block ahead for signing to occur accurately.
            Some(Bound::inclusive(env.block.height.saturating_add(1))),
            Order::Ascending,
        )
        .take(1)
        .collect::<StdResult<_>>()?;
    if !block_slot.is_empty() {
        let task_hash = block_slot.pop().unwrap().1.pop().unwrap();
        let task = tasks_map().load(deps.storage, &task_hash)?;
        Ok(task.into_response(&config.chain_name))
    } else {
        let mut time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
            .range(
                deps.storage,
                None,
                Some(Bound::inclusive(env.block.time.plus_nanos(6).nanos())),
                Order::Ascending,
            )
            .take(1)
            .collect::<StdResult<_>>()?;
        if !time_slot.is_empty() {
            let task_hash = time_slot.pop().unwrap().1.pop().unwrap();
            let task = tasks_map().load(deps.storage, &task_hash)?;
            Ok(task.into_response(&config.chain_name))
        } else {
            Ok(TaskResponse { task: None })
        }
    }
}

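/// Example (illustrative): page through all tasks, 50 at a time:
/// ```ignore
/// QueryMsg::Tasks { from_index: Some(0), limit: Some(50) }   // first page
/// QueryMsg::Tasks { from_index: Some(50), limit: Some(50) }  // second page
/// ```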
fn query_tasks(
    deps: Deps,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<Vec<TaskInfo>> {
    let config = CONFIG.load(deps.storage)?;

    let from_index = from_index.unwrap_or_default();
    let limit = limit.unwrap_or(100);

    tasks_map()
        .range(deps.storage, None, None, Order::Ascending)
        .skip(from_index as usize)
        .take(limit as usize)
        .map(|task_res| {
            task_res.map(|(_, task)| task.into_response(&config.chain_name).task.unwrap())
        })
        .collect()
}

fn query_evented_tasks(
    deps: Deps,
    env: Env,
    start: Option<u64>,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<Vec<TaskInfo>> {
    let config = CONFIG.load(deps.storage)?;
    let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
    let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);
    let tm = tasks_map();

    let mut evented_hashes: Vec<Vec<u8>> = Vec::new();
    let mut all_tasks: Vec<TaskInfo> = Vec::new();

    // Check if start was supplied, otherwise get the next ids for block and time
    if let Some(i) = start {
        evented_hashes = EVENTED_TASKS_LOOKUP
            .may_load(deps.storage, i)?
            .unwrap_or_default();
    } else {
        let block_evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
            .range(
                deps.storage,
                Some(Bound::inclusive(env.block.height)),
                None,
                Order::Ascending,
            )
            .skip(from_index as usize)
            .take(limit as usize)
            .collect::<StdResult<Vec<(u64, _)>>>()?;

        if !block_evented.is_empty() {
            for item in block_evented {
                evented_hashes = [evented_hashes, item.1].concat();
            }
        }

        let time_evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
            .range(
                deps.storage,
                Some(Bound::inclusive(env.block.time.nanos())),
                None,
                Order::Ascending,
            )
            .take(1)
            .collect::<StdResult<Vec<(u64, _)>>>()?;

        if !time_evented.is_empty() {
            for item in time_evented {
                evented_hashes = [evented_hashes, item.1].concat();
            }
        }
    }

    // Loop and get all associated tasks by hash
    for t in evented_hashes {
        if let Some(task) = tm.may_load(deps.storage, &t)? {
            all_tasks.push(task.into_response(&config.chain_name).task.unwrap())
        }
    }

    Ok(all_tasks)
}

fn query_evented_ids(
    deps: Deps,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<Vec<u64>> {
    let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
    let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);

    let evented_ids = EVENTED_TASKS_LOOKUP
        .keys(deps.storage, None, None, Order::Ascending)
        .skip(from_index as usize)
        .take(limit as usize)
        .collect::<StdResult<_>>()?;

    Ok(evented_ids)
}

fn query_evented_hashes(
    deps: Deps,
    id: Option<u64>,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<Vec<String>> {
    let mut evented_hashes: Vec<Vec<u8>> = Vec::new();
    let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
    let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);

    // Check if an id was supplied, otherwise page through all evented lookups
    if let Some(i) = id {
        evented_hashes = EVENTED_TASKS_LOOKUP
            .may_load(deps.storage, i)?
            .unwrap_or_default();
    } else {
        let evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
            .range(deps.storage, None, None, Order::Ascending)
            .skip(from_index as usize)
            .take(limit as usize)
            .collect::<StdResult<Vec<(u64, _)>>>()?;

        if !evented.is_empty() {
            for item in evented {
                evented_hashes = [evented_hashes, item.1].concat();
            }
        }
    }

    // Generate strings for all hashes
    let evented_task_hashes: Vec<_> = evented_hashes
        .iter()
        .map(|t| String::from_utf8(t.to_vec()).unwrap_or_else(|_| "".to_string()))
        .collect();

    Ok(evented_task_hashes)
}

fn query_tasks_by_owner(
    deps: Deps,
    owner_addr: String,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<Vec<TaskInfo>> {
    let owner_addr = deps.api.addr_validate(&owner_addr)?;
    let config = CONFIG.load(deps.storage)?;

    let from_index = from_index.unwrap_or_default();
    let limit = limit.unwrap_or(100);

    tasks_map()
        .idx
        .owner
        .prefix(owner_addr)
        .range(deps.storage, None, None, Order::Ascending)
        .skip(from_index as usize)
        .take(limit as usize)
        .map(|task_res| {
            task_res.map(|(_, task)| task.into_response(&config.chain_name).task.unwrap())
        })
        .collect()
}

fn query_task(deps: Deps, task_hash: String) -> StdResult<TaskResponse> {
    let config = CONFIG.load(deps.storage)?;

    if let Some(task) = tasks_map().may_load(deps.storage, task_hash.as_bytes())? {
        Ok(task.into_response(&config.chain_name))
    } else {
        Ok(TaskResponse { task: None })
    }
}

fn query_task_hash(deps: Deps, task: Task) -> StdResult<String> {
    let config = CONFIG.load(deps.storage)?;
    Ok(task.to_hash(&config.chain_name))
}

fn query_slot_hashes(deps: Deps, slot: Option<u64>) -> StdResult<SlotHashesResponse> {
    let mut block_id: u64 = 0;
    let mut block_hashes: Vec<Vec<u8>> = Vec::new();
    let mut time_id: u64 = 0;
    let mut time_hashes: Vec<Vec<u8>> = Vec::new();

    // Check if slot was supplied, otherwise get the next slots for block and time
    if let Some(id) = slot {
        block_hashes = BLOCK_SLOTS.may_load(deps.storage, id)?.unwrap_or_default();
        if !block_hashes.is_empty() {
            block_id = id;
        }
        time_hashes = TIME_SLOTS.may_load(deps.storage, id)?.unwrap_or_default();
        if !time_hashes.is_empty() {
            time_id = id;
        }
    } else {
        let time: Vec<(u64, _)> = TIME_SLOTS
            .range(deps.storage, None, None, Order::Ascending)
            .take(1)
            .collect::<StdResult<Vec<(u64, _)>>>()?;

        if !time.is_empty() {
            let slot = time[0].clone();
            time_id = slot.0;
            time_hashes = slot.1;
        }

        let block: Vec<(u64, _)> = BLOCK_SLOTS
            .range(deps.storage, None, None, Order::Ascending)
            .take(1)
            .collect::<StdResult<Vec<(u64, _)>>>()?;

        if !block.is_empty() {
            let slot = block[0].clone();
            block_id = slot.0;
            block_hashes = slot.1;
        }
    }

    // Generate strings for all hashes
    let block_task_hash: Vec<_> = block_hashes
        .iter()
        .map(|b| String::from_utf8(b.to_vec()).unwrap_or_else(|_| "".to_string()))
        .collect();
    let time_task_hash: Vec<_> = time_hashes
        .iter()
        .map(|t| String::from_utf8(t.to_vec()).unwrap_or_else(|_| "".to_string()))
        .collect();

    Ok(SlotHashesResponse {
        block_id,
        block_task_hash,
        time_id,
        time_task_hash,
    })
}

fn query_slot_ids(
    deps: Deps,
    from_index: Option<u64>,
    limit: Option<u64>,
) -> StdResult<SlotIdsResponse> {
    let from_index = from_index.unwrap_or_default();
    let limit = limit.unwrap_or(100);

    let time_ids = TIME_SLOTS
        .keys(deps.storage, None, None, Order::Ascending)
        .skip(from_index as usize)
        .take(limit as usize)
        .collect::<StdResult<_>>()?;
    let block_ids = BLOCK_SLOTS
        .keys(deps.storage, None, None, Order::Ascending)
        .skip(from_index as usize)
        .take(limit as usize)
        .collect::<StdResult<_>>()?;

    Ok(SlotIdsResponse {
        time_ids,
        block_ids,
    })
}

fn validate_non_zero_value(opt_num: Option<u64>, field_name: &str) -> Result<(), ContractError> {
    if let Some(num) = opt_num {
        if num == 0u64 {
            Err(InvalidZeroValue {
                field: field_name.to_string(),
            })
        } else {
            Ok(())
        }
    } else {
        Ok(())
    }
}

/// Before changing the configuration's gas_limit, ensure it's a reasonable
/// value, meaning higher than the sum of its needed parts.
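/// (e.g. with the defaults above: 3_000_000 > 300_000 + 130_000 + 130_000 = 560_000)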
fn validate_gas_limit(config: &Config) -> Result<(), ContractError> {
    if config.gas_limit > config.gas_base_fee + config.gas_action_fee + config.gas_query_fee {
        Ok(())
    } else {
        // Must be greater than those params
        Err(ContractError::InvalidGas {})
    }
}
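
#[cfg(test)]
mod tests {
    // A minimal sketch (illustrative, not the project's full test-suite)
    // exercising the zero-value guard shared by instantiate and update_config.
    use super::*;

    #[test]
    fn non_zero_guard() {
        // `None` means "fall back to the default", so it is accepted
        assert!(validate_non_zero_value(None, "slot_granularity_time").is_ok());
        // Any positive value passes
        assert!(
            validate_non_zero_value(Some(SLOT_GRANULARITY_TIME), "slot_granularity_time").is_ok()
        );
        // Zero is rejected, with the offending field name carried in the error
        assert!(validate_non_zero_value(Some(0), "slot_granularity_time").is_err());
    }
}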