cronos_scheduler/state/
queue.rs1use {
2 super::Manager,
3 crate::{errors::CronosError, pda::PDA},
4 anchor_lang::{prelude::*, AnchorDeserialize},
5 chrono::{DateTime, NaiveDateTime, Utc},
6 cronos_cron::Schedule,
7 std::{convert::TryFrom, str::FromStr},
8};
9
10pub const SEED_QUEUE: &[u8] = b"queue";
11
12#[account]
17#[derive(Debug)]
18pub struct Queue {
19 pub exec_at: Option<i64>,
20 pub id: u128,
21 pub manager: Pubkey,
22 pub schedule: String,
23 pub status: QueueStatus,
24 pub task_count: u128,
25}
26
27impl Queue {
28 pub fn pda(manager: Pubkey, id: u128) -> PDA {
29 Pubkey::find_program_address(
30 &[SEED_QUEUE, manager.as_ref(), id.to_be_bytes().as_ref()],
31 &crate::ID,
32 )
33 }
34}
35
36impl TryFrom<Vec<u8>> for Queue {
37 type Error = Error;
38 fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
39 Queue::try_deserialize(&mut data.as_slice())
40 }
41}
42
43pub trait QueueAccount {
48 fn start(&mut self) -> Result<()>;
49
50 fn new(
51 &mut self,
52 clock: &Sysvar<Clock>,
53 manager: &mut Account<Manager>,
54 schedule: String,
55 ) -> Result<()>;
56
57 fn next_exec_at(&self, ts: i64) -> Option<i64>;
58
59 fn roll_forward(&mut self) -> Result<()>;
60}
61
62impl QueueAccount for Account<'_, Queue> {
63 fn start(&mut self) -> Result<()> {
64 require!(
66 self.status == QueueStatus::Pending,
67 CronosError::InvalidQueueStatus,
68 );
69
70 if self.task_count > 0 {
71 self.status = QueueStatus::Processing { task_id: 0 };
73 } else {
74 self.roll_forward()?;
76 }
77
78 Ok(())
79 }
80
81 fn new(
82 &mut self,
83 clock: &Sysvar<Clock>,
84 manager: &mut Account<Manager>,
85 schedule: String,
86 ) -> Result<()> {
87 self.id = manager.queue_count;
89 self.manager = manager.key();
90 self.schedule = schedule;
91 self.status = QueueStatus::Pending;
92 self.task_count = 0;
93
94 self.exec_at = self.next_exec_at(clock.unix_timestamp);
96
97 manager.queue_count = manager.queue_count.checked_add(1).unwrap();
99
100 Ok(())
101 }
102
103 fn next_exec_at(&self, ts: i64) -> Option<i64> {
104 match Schedule::from_str(&self.schedule)
105 .unwrap()
106 .after(&DateTime::<Utc>::from_utc(
107 NaiveDateTime::from_timestamp(ts, 0),
108 Utc,
109 ))
110 .take(1)
111 .next()
112 {
113 Some(datetime) => Some(datetime.timestamp()),
114 None => None,
115 }
116 }
117
118 fn roll_forward(&mut self) -> Result<()> {
119 self.status = QueueStatus::Pending;
120 match self.exec_at {
121 Some(exec_at) => self.exec_at = self.next_exec_at(exec_at),
122 None => (),
123 };
124 Ok(())
125 }
126}
127
128#[derive(AnchorDeserialize, AnchorSerialize, Clone, Copy, Debug, PartialEq, Eq)]
133pub enum QueueStatus {
134 Paused,
135 Pending,
136 Processing { task_id: u128 },
137}