1use crate::ContractError::InvalidZeroValue;
2#[cfg(not(feature = "library"))]
3use cosmwasm_std::entry_point;
4use cosmwasm_std::{
5 to_binary, Attribute, Binary, Deps, DepsMut, Env, MessageInfo, Order, Response, StdResult,
6 Uint64,
7};
8use croncat_sdk_core::internal_messages::agents::AgentOnTaskCreated;
9use croncat_sdk_core::internal_messages::manager::{ManagerCreateTaskBalance, ManagerRemoveTask};
10use croncat_sdk_core::internal_messages::tasks::{TasksRemoveTaskByManager, TasksRescheduleTask};
11use croncat_sdk_core::types::{DEFAULT_PAGINATION_FROM_INDEX, DEFAULT_PAGINATION_LIMIT};
12use croncat_sdk_tasks::msg::UpdateConfigMsg;
13use croncat_sdk_tasks::types::{
14 Config, CurrentTaskInfoResponse, Interval, SlotHashesResponse, SlotIdsResponse,
15 SlotTasksTotalResponse, SlotType, Task, TaskExecutionInfo, TaskInfo, TaskRequest, TaskResponse,
16};
17use cw2::set_contract_version;
18use cw20::Cw20CoinVerified;
19use cw_storage_plus::Bound;
20
21use crate::error::ContractError;
22use crate::helpers::{
23 check_if_sender_is_manager, get_agents_addr, get_manager_addr, remove_task, validate_boundary,
24 validate_msg_calculate_usage, validate_queries, validate_transforms,
25};
26use crate::msg::{ExecuteMsg, InstantiateMsg, QueryMsg};
27use crate::state::{
28 tasks_map, BLOCK_SLOTS, CONFIG, EVENTED_TASKS_LOOKUP, LAST_TASK_CREATION, PAUSED, TASKS_TOTAL,
29 TIME_SLOTS,
30};
31
32const CONTRACT_NAME: &str = "crate:croncat-tasks";
33const CONTRACT_VERSION: &str = env!("CARGO_PKG_VERSION");
34
35pub(crate) const GAS_BASE_FEE: u64 = 300_000;
38pub(crate) const GAS_ACTION_FEE: u64 = 130_000;
39pub(crate) const GAS_QUERY_FEE: u64 = 130_000; pub(crate) const GAS_LIMIT: u64 = 3_000_000; pub(crate) const SLOT_GRANULARITY_TIME: u64 = 10_000_000_000; #[cfg_attr(not(feature = "library"), entry_point)]
44pub fn instantiate(
45 deps: DepsMut,
46 _env: Env,
47 info: MessageInfo,
48 msg: InstantiateMsg,
49) -> Result<Response, ContractError> {
50 let InstantiateMsg {
51 chain_name,
52 version,
53 pause_admin,
54 croncat_manager_key,
55 croncat_agents_key,
56 slot_granularity_time,
57 gas_base_fee,
58 gas_action_fee,
59 gas_limit,
60 gas_query_fee,
61 } = msg;
62
63 validate_non_zero_value(slot_granularity_time, "slot_granularity_time")?;
64
65 let contract_version = version.unwrap_or_else(|| CONTRACT_VERSION.to_string());
66 set_contract_version(deps.storage, CONTRACT_NAME, &contract_version)?;
67
68 let owner_addr = info.sender.clone();
69
70 if !(61usize..=74usize).contains(&pause_admin.to_string().len()) {
76 return Err(ContractError::InvalidPauseAdmin {});
77 }
78
79 let config = Config {
80 chain_name,
81 version: contract_version,
82 owner_addr,
83 pause_admin,
84 croncat_factory_addr: info.sender,
85 croncat_manager_key,
86 croncat_agents_key,
87 slot_granularity_time: slot_granularity_time.unwrap_or(SLOT_GRANULARITY_TIME),
88 gas_base_fee: gas_base_fee.unwrap_or(GAS_BASE_FEE),
89 gas_action_fee: gas_action_fee.unwrap_or(GAS_ACTION_FEE),
90 gas_query_fee: gas_query_fee.unwrap_or(GAS_QUERY_FEE),
91 gas_limit: gas_limit.unwrap_or(GAS_LIMIT),
92 };
93
94 validate_gas_limit(&config)?;
96
97 CONFIG.save(deps.storage, &config)?;
99 PAUSED.save(deps.storage, &false)?;
100 TASKS_TOTAL.save(deps.storage, &0)?;
101 Ok(Response::new().add_attribute("action", "instantiate"))
102}
103
104#[cfg_attr(not(feature = "library"), entry_point)]
105pub fn execute(
106 deps: DepsMut,
107 env: Env,
108 info: MessageInfo,
109 msg: ExecuteMsg,
110) -> Result<Response, ContractError> {
111 match msg {
112 ExecuteMsg::UpdateConfig(msg) => execute_update_config(deps, info, msg),
113 ExecuteMsg::CreateTask { task } => execute_create_task(deps, env, info, *task),
114 ExecuteMsg::RemoveTask { task_hash } => execute_remove_task(deps, info, task_hash),
115 ExecuteMsg::RemoveTaskByManager(remove_task_msg) => {
117 execute_remove_task_by_manager(deps, info, remove_task_msg)
118 }
119 ExecuteMsg::RescheduleTask(reschedule_msg) => {
120 execute_reschedule_task(deps, env, info, reschedule_msg)
121 }
122 ExecuteMsg::PauseContract {} => execute_pause(deps, info),
123 ExecuteMsg::UnpauseContract {} => execute_unpause(deps, info),
124 }
125}
126
127fn execute_update_config(
128 deps: DepsMut,
129 info: MessageInfo,
130 msg: UpdateConfigMsg,
131) -> Result<Response, ContractError> {
132 let config = CONFIG.load(deps.storage)?;
133
134 if info.sender != config.owner_addr {
135 return Err(ContractError::Unauthorized {});
136 }
137
138 let UpdateConfigMsg {
140 croncat_factory_addr,
141 croncat_manager_key,
142 croncat_agents_key,
143 slot_granularity_time,
144 gas_base_fee,
145 gas_action_fee,
146 gas_query_fee,
147 gas_limit,
148 } = msg;
149
150 let new_config = Config {
151 owner_addr: config.owner_addr,
152 pause_admin: config.pause_admin,
153 croncat_factory_addr: croncat_factory_addr
154 .map(|human| deps.api.addr_validate(&human))
155 .transpose()?
156 .unwrap_or(config.croncat_factory_addr),
157 chain_name: config.chain_name,
158 version: config.version,
159 croncat_manager_key: croncat_manager_key.unwrap_or(config.croncat_manager_key),
160 croncat_agents_key: croncat_agents_key.unwrap_or(config.croncat_agents_key),
161 slot_granularity_time: slot_granularity_time.unwrap_or(config.slot_granularity_time),
162 gas_base_fee: gas_base_fee.unwrap_or(config.gas_base_fee),
163 gas_action_fee: gas_action_fee.unwrap_or(config.gas_action_fee),
164 gas_query_fee: gas_query_fee.unwrap_or(config.gas_query_fee),
165 gas_limit: gas_limit.unwrap_or(config.gas_limit),
166 };
167
168 validate_gas_limit(&new_config)?;
170 validate_non_zero_value(slot_granularity_time, "slot_granularity_time")?;
171
172 CONFIG.save(deps.storage, &new_config)?;
173 Ok(Response::new().add_attribute("action", "update_config"))
174}
175
176fn execute_reschedule_task(
177 deps: DepsMut,
178 env: Env,
179 info: MessageInfo,
180 reschedule_msg: TasksRescheduleTask,
181) -> Result<Response, ContractError> {
182 let task_hash = reschedule_msg.task_hash;
183 let config = CONFIG.load(deps.storage)?;
184 check_if_sender_is_manager(&deps.querier, &config, &info.sender)?;
185
186 let mut task_to_remove = None;
187 let mut res_attributes: Vec<Attribute> = vec![];
188
189 let (next_id, slot_kind) = if let Some(task) = tasks_map().may_load(deps.storage, &task_hash)? {
191 let (next_id, slot_kind) =
192 task.interval
193 .next(&env, &task.boundary, config.slot_granularity_time);
194
195 res_attributes.push(Attribute::new(
196 "task_hash",
197 task.to_hash(&config.chain_name),
198 ));
199 res_attributes.push(Attribute::new("task_version", task.version.to_owned()));
200
201 if next_id != 0 && !task.is_evented() && task.interval != Interval::Once {
204 res_attributes.push(Attribute::new("action", "reschedule_task"));
205 let update_vec_data = |d: Option<Vec<Vec<u8>>>| -> StdResult<Vec<Vec<u8>>> {
207 match d {
208 Some(data) => {
210 let mut s = data;
211 s.push(task_hash);
212 Ok(s)
213 }
214 None => Ok(vec![task_hash]),
216 }
217 };
218 match slot_kind {
220 SlotType::Block => {
221 BLOCK_SLOTS.update(deps.storage, next_id, update_vec_data)?;
222 let mut block_slot: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
224 .range(
225 deps.storage,
226 None,
227 Some(Bound::inclusive(env.block.height)),
228 Order::Ascending,
229 )
230 .take(1)
231 .collect::<StdResult<_>>()?;
232 let mut slot = block_slot.pop().unwrap();
233 slot.1.pop();
234 if slot.1.is_empty() {
235 BLOCK_SLOTS.remove(deps.storage, slot.0)
236 } else {
237 BLOCK_SLOTS.save(deps.storage, slot.0, &slot.1)?;
238 }
239 }
240 SlotType::Cron => {
241 TIME_SLOTS.update(deps.storage, next_id, update_vec_data)?;
242 let mut time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
244 .range(
245 deps.storage,
246 None,
247 Some(Bound::inclusive(env.block.time.nanos())),
248 Order::Ascending,
249 )
250 .take(1)
251 .collect::<StdResult<_>>()?;
252 let mut slot = time_slot.pop().unwrap();
253 slot.1.pop();
254 if slot.1.is_empty() {
255 TIME_SLOTS.remove(deps.storage, slot.0)
256 } else {
257 TIME_SLOTS.save(deps.storage, slot.0, &slot.1)?;
258 }
259 }
260 }
261 } else if !task.is_evented() {
262 remove_task(
263 deps.storage,
264 &task_hash,
265 task.boundary.is_block(),
266 task.is_evented(),
267 )?;
268 task_to_remove = Some(ManagerRemoveTask {
269 sender: task.owner_addr,
270 task_hash,
271 });
272 res_attributes.push(Attribute::new("action", "remove_task"));
273 } else if task.is_evented() {
274 res_attributes.push(Attribute::new("action", "continue_task"));
276 }
277 (next_id, slot_kind)
278 } else {
279 return Err(ContractError::NoTaskFound {});
280 };
281
282 Ok(Response::new()
283 .add_attributes(res_attributes)
284 .add_attribute("slot_id", next_id.to_string())
285 .add_attribute("slot_kind", slot_kind.to_string())
286 .set_data(to_binary(&task_to_remove)?))
287}
288
289fn execute_remove_task_by_manager(
290 deps: DepsMut,
291 info: MessageInfo,
292 remove_task_msg: TasksRemoveTaskByManager,
293) -> Result<Response, ContractError> {
294 let task_hash = remove_task_msg.task_hash;
295 let config = CONFIG.load(deps.storage)?;
296 check_if_sender_is_manager(&deps.querier, &config, &info.sender)?;
297
298 if let Some(task) = tasks_map().may_load(deps.storage, &task_hash)? {
299 remove_task(
300 deps.storage,
301 &task_hash,
302 task.boundary.is_block(),
303 task.is_evented(),
304 )?;
305 } else {
306 return Err(ContractError::NoTaskFound {});
307 }
308
309 Ok(Response::new().add_attribute("action", "remove_task_by_manager"))
310}
311
312fn execute_remove_task(
313 deps: DepsMut,
314 info: MessageInfo,
315 task_hash: String,
316) -> Result<Response, ContractError> {
317 if PAUSED.load(deps.storage)? {
318 return Err(ContractError::ContractPaused);
319 }
320 let config = CONFIG.load(deps.storage)?;
321 let hash = task_hash.as_bytes();
322 if let Some(task) = tasks_map().may_load(deps.storage, hash)? {
323 if task.owner_addr != info.sender {
324 return Err(ContractError::Unauthorized {});
325 }
326 remove_task(
327 deps.storage,
328 hash,
329 task.boundary.is_block(),
330 task.is_evented(),
331 )?;
332 } else {
333 return Err(ContractError::NoTaskFound {});
334 }
335 let manager_addr = get_manager_addr(&deps.querier, &config)?;
336 let remove_task_msg = ManagerRemoveTask {
337 sender: info.sender,
338 task_hash: task_hash.into_bytes(),
339 }
340 .into_cosmos_msg(manager_addr)?;
341 Ok(Response::new()
342 .add_attribute("action", "remove_task")
343 .add_message(remove_task_msg))
344}
345
346fn execute_create_task(
347 deps: DepsMut,
348 env: Env,
349 info: MessageInfo,
350 task: TaskRequest,
351) -> Result<Response, ContractError> {
352 if PAUSED.load(deps.storage)? {
353 return Err(ContractError::ContractPaused);
354 }
355 let config = CONFIG.load(deps.storage)?;
356 let owner_addr = info.sender;
357
358 let boundary = validate_boundary(&env.block, task.boundary.clone(), &task.interval)?;
360
361 let amount_for_one_task = validate_msg_calculate_usage(
362 deps.as_ref(),
363 &task,
364 &env.contract.address,
365 &owner_addr,
366 &config,
367 )?;
368 if amount_for_one_task.gas > config.gas_limit {
369 return Err(ContractError::InvalidGas {});
370 }
371 let cw20 = task
372 .cw20
373 .map(|human| {
374 StdResult::Ok(Cw20CoinVerified {
375 address: deps.api.addr_validate(&human.address)?,
376 amount: human.amount,
377 })
378 })
379 .transpose()?;
380
381 let item = Task {
382 owner_addr: owner_addr.clone(),
383 interval: task.interval,
384 boundary,
385 stop_on_fail: task.stop_on_fail,
386 amount_for_one_task: amount_for_one_task.clone(),
387 actions: task.actions,
388 queries: task.queries.unwrap_or_default(),
390 transforms: task.transforms.unwrap_or_default(),
391 version: config.version.clone(),
392 };
393 if !item.interval.is_valid() {
394 return Err(ContractError::InvalidInterval {});
395 }
396 if !validate_queries(&deps.as_ref(), &item.queries) {
397 return Err(ContractError::InvalidQueries {});
398 }
399 if !validate_transforms(&item) {
400 return Err(ContractError::InvalidTransform {});
401 }
402
403 let hash_prefix = &config.chain_name;
404 let hash = item.to_hash(hash_prefix);
405
406 let (next_id, slot_kind) =
407 item.interval
408 .next(&env, &item.boundary, config.slot_granularity_time);
409 if next_id == 0 {
410 return Err(ContractError::TaskEnded {});
411 }
412
413 let recurring = item.recurring();
414 let hash_vec = hash.clone().into_bytes();
415 let mut attributes: Vec<Attribute> = vec![];
416
417 TASKS_TOTAL.update(deps.storage, |amt| -> StdResult<_> { Ok(amt + 1) })?;
419 tasks_map().update(deps.storage, &hash_vec, |old| match old {
420 Some(_) => Err(ContractError::TaskExists {}),
421 None => Ok(item.clone()),
422 })?;
423
424 let update_vec_data = |d: Option<Vec<Vec<u8>>>| -> StdResult<Vec<Vec<u8>>> {
426 match d {
427 Some(data) => {
429 let mut s = data;
430 s.push(hash_vec.clone());
431 Ok(s)
432 }
433 None => Ok(vec![hash_vec.clone()]),
435 }
436 };
437
438 if item.is_evented() {
439 EVENTED_TASKS_LOOKUP.update(deps.storage, next_id, update_vec_data)?;
440 attributes.push(Attribute::new("evented_id", next_id.to_string()));
441 } else {
442 match slot_kind {
444 SlotType::Block => {
445 BLOCK_SLOTS.update(deps.storage, next_id, update_vec_data)?;
446 }
447 SlotType::Cron => {
448 TIME_SLOTS.update(deps.storage, next_id, update_vec_data)?;
449 }
450 }
451 attributes.push(Attribute::new("slot_id", next_id.to_string()));
452 attributes.push(Attribute::new("slot_kind", slot_kind.to_string()));
453 }
454
455 LAST_TASK_CREATION.save(deps.storage, &env.block.time)?;
457
458 let manager_addr = get_manager_addr(&deps.querier, &config)?;
459 let manager_create_task_balance_msg = ManagerCreateTaskBalance {
460 sender: owner_addr,
461 task_hash: hash_vec,
462 recurring,
463 cw20,
464 amount_for_one_task,
465 }
466 .into_cosmos_msg(manager_addr, info.funds)?;
467
468 let agent_addr = get_agents_addr(&deps.querier, &config)?;
469 let agent_new_task_msg = AgentOnTaskCreated {}.into_cosmos_msg(agent_addr)?;
470 let response_data = TaskExecutionInfo {
471 block_height: env.block.height,
472 tx_info: env.transaction,
473 task_hash: hash.clone(),
474 owner_addr: item.owner_addr,
475 amount_for_one_task: item.amount_for_one_task,
476 version: item.version.clone(),
477 };
478 Ok(Response::new()
479 .set_data(to_binary(&response_data)?)
480 .add_attribute("action", "create_task")
481 .add_attributes(attributes)
482 .add_attribute("task_hash", hash)
483 .add_attribute("task_version", item.version)
484 .add_message(manager_create_task_balance_msg)
485 .add_message(agent_new_task_msg))
486}
487
488pub fn execute_pause(deps: DepsMut, info: MessageInfo) -> Result<Response, ContractError> {
489 if PAUSED.load(deps.storage)? {
490 return Err(ContractError::ContractPaused);
491 }
492 let config = CONFIG.load(deps.storage)?;
493 if info.sender != config.pause_admin {
494 return Err(ContractError::Unauthorized {});
495 }
496 PAUSED.save(deps.storage, &true)?;
497 Ok(Response::new().add_attribute("action", "pause_contract"))
498}
499
500pub fn execute_unpause(deps: DepsMut, info: MessageInfo) -> Result<Response, ContractError> {
501 if !PAUSED.load(deps.storage)? {
502 return Err(ContractError::ContractUnpaused);
503 }
504 let config = CONFIG.load(deps.storage)?;
505 if info.sender != config.owner_addr {
506 return Err(ContractError::Unauthorized {});
507 }
508 PAUSED.save(deps.storage, &false)?;
509 Ok(Response::new().add_attribute("action", "unpause_contract"))
510}
511
512#[cfg_attr(not(feature = "library"), entry_point)]
513pub fn query(deps: Deps, env: Env, msg: QueryMsg) -> StdResult<Binary> {
514 match msg {
515 QueryMsg::Config {} => to_binary(&CONFIG.load(deps.storage)?),
516 QueryMsg::Paused {} => to_binary(&PAUSED.load(deps.storage)?),
517 QueryMsg::TasksTotal {} => to_binary(&cosmwasm_std::Uint64::from(query_tasks_total(deps)?)),
518 QueryMsg::CurrentTaskInfo {} => to_binary(&query_current_task_info(deps, env)?),
519 QueryMsg::CurrentTask {} => to_binary(&query_current_task(deps, env)?),
520 QueryMsg::Tasks { from_index, limit } => to_binary(&query_tasks(deps, from_index, limit)?),
521 QueryMsg::EventedIds { from_index, limit } => {
522 to_binary(&query_evented_ids(deps, from_index, limit)?)
523 }
524 QueryMsg::EventedHashes {
525 id,
526 from_index,
527 limit,
528 } => to_binary(&query_evented_hashes(deps, id, from_index, limit)?),
529 QueryMsg::EventedTasks {
530 start,
531 from_index,
532 limit,
533 } => to_binary(&query_evented_tasks(deps, env, start, from_index, limit)?),
534 QueryMsg::TasksByOwner {
535 owner_addr,
536 from_index,
537 limit,
538 } => to_binary(&query_tasks_by_owner(deps, owner_addr, from_index, limit)?),
539 QueryMsg::Task { task_hash } => to_binary(&query_task(deps, task_hash)?),
540 QueryMsg::TaskHash { task } => to_binary(&query_task_hash(deps, *task)?),
541 QueryMsg::SlotHashes { slot } => to_binary(&query_slot_hashes(deps, slot)?),
542 QueryMsg::SlotIds { from_index, limit } => {
543 to_binary(&query_slot_ids(deps, from_index, limit)?)
544 }
545 QueryMsg::SlotTasksTotal { offset } => {
546 to_binary(&query_slot_tasks_total(deps, env, offset)?)
547 }
548 }
549}
550
551fn query_tasks_total(deps: Deps) -> StdResult<u64> {
552 TASKS_TOTAL.load(deps.storage)
553}
554
555fn query_current_task_info(deps: Deps, _env: Env) -> StdResult<CurrentTaskInfoResponse> {
557 Ok(CurrentTaskInfoResponse {
558 total: Uint64::from(query_tasks_total(deps).unwrap()),
559 last_created_task: LAST_TASK_CREATION.load(deps.storage)?,
560 })
561}
562
563fn query_slot_tasks_total(
567 deps: Deps,
568 env: Env,
569 offset: Option<u64>,
570) -> StdResult<SlotTasksTotalResponse> {
571 if let Some(off) = offset {
572 let config = CONFIG.load(deps.storage)?;
573 let block_tasks = BLOCK_SLOTS
574 .may_load(deps.storage, env.block.height + off)?
575 .unwrap_or_default()
576 .len() as u64;
577 let evented_tasks = EVENTED_TASKS_LOOKUP
578 .may_load(deps.storage, env.block.height + off)?
579 .unwrap_or_default()
580 .len() as u64;
581
582 let current_block_ts = env.block.time.nanos();
583 let current_block_slot =
584 current_block_ts.saturating_sub(current_block_ts % config.slot_granularity_time);
585 let cron_tasks = TIME_SLOTS
586 .may_load(
587 deps.storage,
588 current_block_slot + config.slot_granularity_time * off,
589 )?
590 .unwrap_or_default()
591 .len() as u64;
592 Ok(SlotTasksTotalResponse {
593 block_tasks,
594 cron_tasks,
595 evented_tasks,
596 })
597 } else {
598 let block_height = env.block.height.saturating_add(1);
600 let block_time = env.block.time.plus_seconds(6).nanos();
602 let block_slots: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
603 .range(
604 deps.storage,
605 None,
606 Some(Bound::inclusive(block_height)),
607 Order::Ascending,
608 )
609 .collect::<StdResult<_>>()?;
610
611 let block_tasks = block_slots
612 .iter()
613 .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;
614
615 let evented_task_list: Vec<(u64, Vec<Vec<u8>>)> = EVENTED_TASKS_LOOKUP
616 .range(
617 deps.storage,
618 None,
619 Some(Bound::inclusive(block_height)),
620 Order::Ascending,
621 )
622 .collect::<StdResult<_>>()?;
623
624 let evented_tasks = evented_task_list
625 .iter()
626 .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;
627
628 let time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
629 .range(
630 deps.storage,
631 None,
632 Some(Bound::inclusive(block_time)),
633 Order::Ascending,
634 )
635 .collect::<StdResult<_>>()?;
636
637 let cron_tasks = time_slot
638 .iter()
639 .fold(0, |acc, (_, hashes)| acc + hashes.len()) as u64;
640 Ok(SlotTasksTotalResponse {
641 block_tasks,
642 cron_tasks,
643 evented_tasks,
644 })
645 }
646}
647
648fn query_current_task(deps: Deps, env: Env) -> StdResult<TaskResponse> {
659 let config = CONFIG.load(deps.storage)?;
660 let mut block_slot: Vec<(u64, Vec<Vec<u8>>)> = BLOCK_SLOTS
661 .range(
662 deps.storage,
663 None,
664 Some(Bound::inclusive(env.block.height.saturating_add(1))),
667 Order::Ascending,
668 )
669 .take(1)
670 .collect::<StdResult<_>>()?;
671 if !block_slot.is_empty() {
672 let task_hash = block_slot.pop().unwrap().1.pop().unwrap();
673 let task = tasks_map().load(deps.storage, &task_hash)?;
674 Ok(task.into_response(&config.chain_name))
675 } else {
676 let mut time_slot: Vec<(u64, Vec<Vec<u8>>)> = TIME_SLOTS
677 .range(
678 deps.storage,
679 None,
680 Some(Bound::inclusive(env.block.time.plus_nanos(6).nanos())),
681 Order::Ascending,
682 )
683 .take(1)
684 .collect::<StdResult<_>>()?;
685 if !time_slot.is_empty() {
686 let task_hash = time_slot.pop().unwrap().1.pop().unwrap();
687 let task = tasks_map().load(deps.storage, &task_hash)?;
688 Ok(task.into_response(&config.chain_name))
689 } else {
690 Ok(TaskResponse { task: None })
691 }
692 }
693}
694
695fn query_tasks(
696 deps: Deps,
697 from_index: Option<u64>,
698 limit: Option<u64>,
699) -> StdResult<Vec<TaskInfo>> {
700 let config = CONFIG.load(deps.storage)?;
701
702 let from_index = from_index.unwrap_or_default();
703 let limit = limit.unwrap_or(100);
704
705 tasks_map()
706 .range(deps.storage, None, None, Order::Ascending)
707 .skip(from_index as usize)
708 .take(limit as usize)
709 .map(|task_res| {
710 task_res.map(|(_, task)| task.into_response(&config.chain_name).task.unwrap())
711 })
712 .collect()
713}
714
715fn query_evented_tasks(
716 deps: Deps,
717 env: Env,
718 start: Option<u64>,
719 from_index: Option<u64>,
720 limit: Option<u64>,
721) -> StdResult<Vec<TaskInfo>> {
722 let config = CONFIG.load(deps.storage)?;
723 let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
724 let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);
725 let tm = tasks_map();
726
727 let mut evented_hashes: Vec<Vec<u8>> = Vec::new();
728 let mut all_tasks: Vec<TaskInfo> = Vec::new();
729
730 if let Some(i) = start {
732 evented_hashes = EVENTED_TASKS_LOOKUP
733 .may_load(deps.storage, i)?
734 .unwrap_or_default();
735 } else {
736 let block_evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
737 .range(
738 deps.storage,
739 Some(Bound::inclusive(env.block.height)),
740 None,
741 Order::Ascending,
742 )
743 .skip(from_index as usize)
744 .take(limit as usize)
745 .collect::<StdResult<Vec<(u64, _)>>>()?;
746
747 if !block_evented.is_empty() {
748 for item in block_evented {
749 evented_hashes = [evented_hashes, item.1].concat();
750 }
751 }
752
753 let time_evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
754 .range(
755 deps.storage,
756 Some(Bound::inclusive(env.block.time.nanos())),
757 None,
758 Order::Ascending,
759 )
760 .take(1)
761 .collect::<StdResult<Vec<(u64, _)>>>()?;
762
763 if !time_evented.is_empty() {
764 for item in time_evented {
765 evented_hashes = [evented_hashes, item.1].concat();
766 }
767 }
768 }
769
770 for t in evented_hashes {
772 if let Some(task) = tm.may_load(deps.storage, &t)? {
773 all_tasks.push(task.into_response(&config.chain_name).task.unwrap())
774 }
775 }
776
777 Ok(all_tasks)
778}
779
780fn query_evented_ids(
781 deps: Deps,
782 from_index: Option<u64>,
783 limit: Option<u64>,
784) -> StdResult<Vec<u64>> {
785 let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
786 let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);
787
788 let evented_ids = EVENTED_TASKS_LOOKUP
789 .keys(deps.storage, None, None, Order::Ascending)
790 .skip(from_index as usize)
791 .take(limit as usize)
792 .collect::<StdResult<_>>()?;
793
794 Ok(evented_ids)
795}
796
797fn query_evented_hashes(
798 deps: Deps,
799 id: Option<u64>,
800 from_index: Option<u64>,
801 limit: Option<u64>,
802) -> StdResult<Vec<String>> {
803 let mut evented_hashes: Vec<Vec<u8>> = Vec::new();
804 let from_index = from_index.unwrap_or(DEFAULT_PAGINATION_FROM_INDEX);
805 let limit = limit.unwrap_or(DEFAULT_PAGINATION_LIMIT);
806
807 if let Some(i) = id {
809 evented_hashes = EVENTED_TASKS_LOOKUP
810 .may_load(deps.storage, i)?
811 .unwrap_or_default();
812 } else {
813 let evented: Vec<(u64, _)> = EVENTED_TASKS_LOOKUP
814 .range(deps.storage, None, None, Order::Ascending)
815 .skip(from_index as usize)
816 .take(limit as usize)
817 .collect::<StdResult<Vec<(u64, _)>>>()?;
818
819 if !evented.is_empty() {
820 for item in evented {
821 evented_hashes = [evented_hashes, item.1].concat();
822 }
823 }
824 }
825
826 let evented_task_hashes: Vec<_> = evented_hashes
828 .iter()
829 .map(|t| String::from_utf8(t.to_vec()).unwrap_or_else(|_| "".to_string()))
830 .collect();
831
832 Ok(evented_task_hashes)
833}
834
835fn query_tasks_by_owner(
836 deps: Deps,
837 owner_addr: String,
838 from_index: Option<u64>,
839 limit: Option<u64>,
840) -> StdResult<Vec<TaskInfo>> {
841 let owner_addr = deps.api.addr_validate(&owner_addr)?;
842 let config = CONFIG.load(deps.storage)?;
843
844 let from_index = from_index.unwrap_or_default();
845 let limit = limit.unwrap_or(100);
846
847 tasks_map()
848 .idx
849 .owner
850 .prefix(owner_addr)
851 .range(deps.storage, None, None, Order::Ascending)
852 .skip(from_index as usize)
853 .take(limit as usize)
854 .map(|task_res| {
855 task_res.map(|(_, task)| task.into_response(&config.chain_name).task.unwrap())
856 })
857 .collect()
858}
859
860fn query_task(deps: Deps, task_hash: String) -> StdResult<TaskResponse> {
861 let config = CONFIG.load(deps.storage)?;
862
863 if let Some(task) = tasks_map().may_load(deps.storage, task_hash.as_bytes())? {
864 Ok(task.into_response(&config.chain_name))
865 } else {
866 Ok(TaskResponse { task: None })
867 }
868}
869
870fn query_task_hash(deps: Deps, task: Task) -> StdResult<String> {
871 let config = CONFIG.load(deps.storage)?;
872 Ok(task.to_hash(&config.chain_name))
873}
874
875fn query_slot_hashes(deps: Deps, slot: Option<u64>) -> StdResult<SlotHashesResponse> {
876 let mut block_id: u64 = 0;
877 let mut block_hashes: Vec<Vec<u8>> = Vec::new();
878 let mut time_id: u64 = 0;
879 let mut time_hashes: Vec<Vec<u8>> = Vec::new();
880
881 if let Some(id) = slot {
883 block_hashes = BLOCK_SLOTS.may_load(deps.storage, id)?.unwrap_or_default();
884 if !block_hashes.is_empty() {
885 block_id = id;
886 }
887 time_hashes = TIME_SLOTS.may_load(deps.storage, id)?.unwrap_or_default();
888 if !time_hashes.is_empty() {
889 time_id = id;
890 }
891 } else {
892 let time: Vec<(u64, _)> = TIME_SLOTS
893 .range(deps.storage, None, None, Order::Ascending)
894 .take(1)
895 .collect::<StdResult<Vec<(u64, _)>>>()?;
896
897 if !time.is_empty() {
898 let slot = time[0].clone();
899 time_id = slot.0;
900 time_hashes = slot.1;
901 }
902
903 let block: Vec<(u64, _)> = BLOCK_SLOTS
904 .range(deps.storage, None, None, Order::Ascending)
905 .take(1)
906 .collect::<StdResult<Vec<(u64, _)>>>()?;
907
908 if !block.is_empty() {
909 let slot = block[0].clone();
910 block_id = slot.0;
911 block_hashes = slot.1;
912 }
913 }
914
915 let block_task_hash: Vec<_> = block_hashes
917 .iter()
918 .map(|b| String::from_utf8(b.to_vec()).unwrap_or_else(|_| "".to_string()))
919 .collect();
920 let time_task_hash: Vec<_> = time_hashes
921 .iter()
922 .map(|t| String::from_utf8(t.to_vec()).unwrap_or_else(|_| "".to_string()))
923 .collect();
924
925 Ok(SlotHashesResponse {
926 block_id,
927 block_task_hash,
928 time_id,
929 time_task_hash,
930 })
931}
932
933fn query_slot_ids(
934 deps: Deps,
935 from_index: Option<u64>,
936 limit: Option<u64>,
937) -> StdResult<SlotIdsResponse> {
938 let from_index = from_index.unwrap_or_default();
939 let limit = limit.unwrap_or(100);
940
941 let time_ids = TIME_SLOTS
942 .keys(deps.storage, None, None, Order::Ascending)
943 .skip(from_index as usize)
944 .take(limit as usize)
945 .collect::<StdResult<_>>()?;
946 let block_ids = BLOCK_SLOTS
947 .keys(deps.storage, None, None, Order::Ascending)
948 .skip(from_index as usize)
949 .take(limit as usize)
950 .collect::<StdResult<_>>()?;
951
952 Ok(SlotIdsResponse {
953 time_ids,
954 block_ids,
955 })
956}
957
958fn validate_non_zero_value(opt_num: Option<u64>, field_name: &str) -> Result<(), ContractError> {
959 if let Some(num) = opt_num {
960 if num == 0u64 {
961 Err(InvalidZeroValue {
962 field: field_name.to_string(),
963 })
964 } else {
965 Ok(())
966 }
967 } else {
968 Ok(())
969 }
970}
971
972fn validate_gas_limit(config: &Config) -> Result<(), ContractError> {
975 if config.gas_limit > config.gas_base_fee + config.gas_action_fee + config.gas_query_fee {
976 Ok(())
977 } else {
978 Err(ContractError::InvalidGas {})
980 }
981}