cronos_scheduler/state/
queue.rs

1use {
2    super::Manager,
3    crate::{errors::CronosError, pda::PDA},
4    anchor_lang::{prelude::*, AnchorDeserialize},
5    chrono::{DateTime, NaiveDateTime, Utc},
6    cronos_cron::Schedule,
7    std::{convert::TryFrom, str::FromStr},
8};
9
10pub const SEED_QUEUE: &[u8] = b"queue";
11
12/**
13 * Queue
14 */
15
16#[account]
17#[derive(Debug)]
18pub struct Queue {
19    pub exec_at: Option<i64>,
20    pub id: u128,
21    pub manager: Pubkey,
22    pub schedule: String,
23    pub status: QueueStatus,
24    pub task_count: u128,
25}
26
27impl Queue {
28    pub fn pda(manager: Pubkey, id: u128) -> PDA {
29        Pubkey::find_program_address(
30            &[SEED_QUEUE, manager.as_ref(), id.to_be_bytes().as_ref()],
31            &crate::ID,
32        )
33    }
34}
35
36impl TryFrom<Vec<u8>> for Queue {
37    type Error = Error;
38    fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
39        Queue::try_deserialize(&mut data.as_slice())
40    }
41}
42
43/**
44 * QueueAccount
45 */
46
47pub trait QueueAccount {
48    fn start(&mut self) -> Result<()>;
49
50    fn new(
51        &mut self,
52        clock: &Sysvar<Clock>,
53        manager: &mut Account<Manager>,
54        schedule: String,
55    ) -> Result<()>;
56
57    fn next_exec_at(&self, ts: i64) -> Option<i64>;
58
59    fn roll_forward(&mut self) -> Result<()>;
60}
61
62impl QueueAccount for Account<'_, Queue> {
63    fn start(&mut self) -> Result<()> {
64        // Validate the queue is pending
65        require!(
66            self.status == QueueStatus::Pending,
67            CronosError::InvalidQueueStatus,
68        );
69
70        if self.task_count > 0 {
71            // If there are actions, change the queue status to 'executing'
72            self.status = QueueStatus::Processing { task_id: 0 };
73        } else {
74            // Otherwise, just roll forward the exec_at timestamp
75            self.roll_forward()?;
76        }
77
78        Ok(())
79    }
80
81    fn new(
82        &mut self,
83        clock: &Sysvar<Clock>,
84        manager: &mut Account<Manager>,
85        schedule: String,
86    ) -> Result<()> {
87        // Initialize queue account
88        self.id = manager.queue_count;
89        self.manager = manager.key();
90        self.schedule = schedule;
91        self.status = QueueStatus::Pending;
92        self.task_count = 0;
93
94        // Set exec_at (schedule must be set first)
95        self.exec_at = self.next_exec_at(clock.unix_timestamp);
96
97        // Increment manager queue counter
98        manager.queue_count = manager.queue_count.checked_add(1).unwrap();
99
100        Ok(())
101    }
102
103    fn next_exec_at(&self, ts: i64) -> Option<i64> {
104        match Schedule::from_str(&self.schedule)
105            .unwrap()
106            .after(&DateTime::<Utc>::from_utc(
107                NaiveDateTime::from_timestamp(ts, 0),
108                Utc,
109            ))
110            .take(1)
111            .next()
112        {
113            Some(datetime) => Some(datetime.timestamp()),
114            None => None,
115        }
116    }
117
118    fn roll_forward(&mut self) -> Result<()> {
119        self.status = QueueStatus::Pending;
120        match self.exec_at {
121            Some(exec_at) => self.exec_at = self.next_exec_at(exec_at),
122            None => (),
123        };
124        Ok(())
125    }
126}
127
128/**
129 * QueueStatus
130 */
131
132#[derive(AnchorDeserialize, AnchorSerialize, Clone, Copy, Debug, PartialEq, Eq)]
133pub enum QueueStatus {
134    Paused,
135    Pending,
136    Processing { task_id: u128 },
137}