clockwork_scheduler/state/
queue.rs

1use {
2    super::InstructionData,
3    crate::{errors::ClockworkError, response::TaskResponse},
4    anchor_lang::{
5        prelude::*,
6        solana_program::{
7            instruction::Instruction,
8            program::{get_return_data, invoke_signed},
9        },
10        AnchorDeserialize,
11    },
12    chrono::{DateTime, NaiveDateTime, Utc},
13    clockwork_cron::Schedule,
14    std::{convert::TryFrom, str::FromStr},
15};
16
/// Static seed prefix used when deriving and signing for queue program addresses.
pub const SEED_QUEUE: &[u8] = b"queue";
18
19/**
20 * Queue
21 */
22
23#[account]
24#[derive(Debug)]
25pub struct Queue {
26    pub authority: Pubkey,
27    pub name: String,
28    pub process_at: Option<i64>,
29    pub schedule: String,
30    pub status: QueueStatus,
31    pub task_count: u64,
32}
33
34impl Queue {
35    pub fn pubkey(authority: Pubkey, name: String) -> Pubkey {
36        Pubkey::find_program_address(
37            &[SEED_QUEUE, authority.as_ref(), name.as_bytes()],
38            &crate::ID,
39        )
40        .0
41    }
42}
43
44impl TryFrom<Vec<u8>> for Queue {
45    type Error = Error;
46    fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
47        Queue::try_deserialize(&mut data.as_slice())
48    }
49}
50
51/**
52 * QueueAccount
53 */
54
55pub trait QueueAccount {
56    fn process(&mut self) -> Result<()>;
57
58    fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()>;
59
60    fn next_process_at(&self, ts: i64) -> Option<i64>;
61
62    fn roll_forward(&mut self) -> Result<()>;
63
64    fn sign(
65        &self,
66        account_infos: &[AccountInfo],
67        bump: u8,
68        ix: &InstructionData,
69    ) -> Result<Option<TaskResponse>>;
70}
71
72impl QueueAccount for Account<'_, Queue> {
73    fn process(&mut self) -> Result<()> {
74        // Validate the queue is pending
75        require!(
76            self.status == QueueStatus::Pending,
77            ClockworkError::InvalidQueueStatus,
78        );
79
80        if self.task_count > 0 {
81            // If there are actions, change the queue status to 'executing'
82            self.status = QueueStatus::Processing { task_id: 0 };
83        } else {
84            // Otherwise, just roll forward the process_at timestamp
85            self.roll_forward()?;
86        }
87
88        Ok(())
89    }
90
91    fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()> {
92        // Initialize queue account
93        self.authority = authority.key();
94        self.name = name;
95        self.schedule = schedule;
96        self.status = QueueStatus::Pending;
97        self.task_count = 0;
98
99        // Set process_at (schedule must be set first)
100        let ts = Clock::get().unwrap().unix_timestamp;
101        self.process_at = self.next_process_at(ts);
102
103        Ok(())
104    }
105
106    fn next_process_at(&self, ts: i64) -> Option<i64> {
107        match Schedule::from_str(&self.schedule)
108            .unwrap()
109            .after(&DateTime::<Utc>::from_utc(
110                NaiveDateTime::from_timestamp(ts, 0),
111                Utc,
112            ))
113            .take(1)
114            .next()
115        {
116            Some(datetime) => Some(datetime.timestamp()),
117            None => None,
118        }
119    }
120
121    fn roll_forward(&mut self) -> Result<()> {
122        self.status = QueueStatus::Pending;
123        match self.process_at {
124            Some(process_at) => self.process_at = self.next_process_at(process_at),
125            None => (),
126        };
127        Ok(())
128    }
129
130    fn sign(
131        &self,
132        account_infos: &[AccountInfo],
133        bump: u8,
134        ix: &InstructionData,
135    ) -> Result<Option<TaskResponse>> {
136        invoke_signed(
137            &Instruction::from(ix),
138            account_infos,
139            &[&[
140                SEED_QUEUE,
141                self.authority.as_ref(),
142                self.name.as_bytes(),
143                &[bump],
144            ]],
145        )
146        .map_err(|_err| ClockworkError::InnerIxFailed)?;
147
148        match get_return_data() {
149            None => Ok(None),
150            Some((program_id, return_data)) => {
151                if program_id != ix.program_id {
152                    Err(ClockworkError::InvalidReturnData.into())
153                } else {
154                    Ok(Some(
155                        TaskResponse::try_from_slice(return_data.as_slice())
156                            .map_err(|_err| ClockworkError::InvalidTaskResponse)?,
157                    ))
158                }
159            }
160        }
161    }
162}
163
164/**
165 * QueueStatus
166 */
167
168#[derive(AnchorDeserialize, AnchorSerialize, Clone, Copy, Debug, PartialEq, Eq)]
169pub enum QueueStatus {
170    Paused,
171    Pending,
172    Processing { task_id: u64 },
173}