clockwork_scheduler/state/
queue.rs1use {
2 super::InstructionData,
3 crate::{errors::ClockworkError, response::TaskResponse},
4 anchor_lang::{
5 prelude::*,
6 solana_program::{
7 instruction::Instruction,
8 program::{get_return_data, invoke_signed},
9 },
10 AnchorDeserialize,
11 },
12 chrono::{DateTime, NaiveDateTime, Utc},
13 clockwork_cron::Schedule,
14 std::{convert::TryFrom, str::FromStr},
15};
16
17pub const SEED_QUEUE: &[u8] = b"queue";
18
19#[account]
24#[derive(Debug)]
25pub struct Queue {
26 pub authority: Pubkey,
27 pub name: String,
28 pub process_at: Option<i64>,
29 pub schedule: String,
30 pub status: QueueStatus,
31 pub task_count: u64,
32}
33
34impl Queue {
35 pub fn pubkey(authority: Pubkey, name: String) -> Pubkey {
36 Pubkey::find_program_address(
37 &[SEED_QUEUE, authority.as_ref(), name.as_bytes()],
38 &crate::ID,
39 )
40 .0
41 }
42}
43
44impl TryFrom<Vec<u8>> for Queue {
45 type Error = Error;
46 fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
47 Queue::try_deserialize(&mut data.as_slice())
48 }
49}
50
51pub trait QueueAccount {
56 fn process(&mut self) -> Result<()>;
57
58 fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()>;
59
60 fn next_process_at(&self, ts: i64) -> Option<i64>;
61
62 fn roll_forward(&mut self) -> Result<()>;
63
64 fn sign(
65 &self,
66 account_infos: &[AccountInfo],
67 bump: u8,
68 ix: &InstructionData,
69 ) -> Result<Option<TaskResponse>>;
70}
71
72impl QueueAccount for Account<'_, Queue> {
73 fn process(&mut self) -> Result<()> {
74 require!(
76 self.status == QueueStatus::Pending,
77 ClockworkError::InvalidQueueStatus,
78 );
79
80 if self.task_count > 0 {
81 self.status = QueueStatus::Processing { task_id: 0 };
83 } else {
84 self.roll_forward()?;
86 }
87
88 Ok(())
89 }
90
91 fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()> {
92 self.authority = authority.key();
94 self.name = name;
95 self.schedule = schedule;
96 self.status = QueueStatus::Pending;
97 self.task_count = 0;
98
99 let ts = Clock::get().unwrap().unix_timestamp;
101 self.process_at = self.next_process_at(ts);
102
103 Ok(())
104 }
105
106 fn next_process_at(&self, ts: i64) -> Option<i64> {
107 match Schedule::from_str(&self.schedule)
108 .unwrap()
109 .after(&DateTime::<Utc>::from_utc(
110 NaiveDateTime::from_timestamp(ts, 0),
111 Utc,
112 ))
113 .take(1)
114 .next()
115 {
116 Some(datetime) => Some(datetime.timestamp()),
117 None => None,
118 }
119 }
120
121 fn roll_forward(&mut self) -> Result<()> {
122 self.status = QueueStatus::Pending;
123 match self.process_at {
124 Some(process_at) => self.process_at = self.next_process_at(process_at),
125 None => (),
126 };
127 Ok(())
128 }
129
130 fn sign(
131 &self,
132 account_infos: &[AccountInfo],
133 bump: u8,
134 ix: &InstructionData,
135 ) -> Result<Option<TaskResponse>> {
136 invoke_signed(
137 &Instruction::from(ix),
138 account_infos,
139 &[&[
140 SEED_QUEUE,
141 self.authority.as_ref(),
142 self.name.as_bytes(),
143 &[bump],
144 ]],
145 )
146 .map_err(|_err| ClockworkError::InnerIxFailed)?;
147
148 match get_return_data() {
149 None => Ok(None),
150 Some((program_id, return_data)) => {
151 if program_id != ix.program_id {
152 Err(ClockworkError::InvalidReturnData.into())
153 } else {
154 Ok(Some(
155 TaskResponse::try_from_slice(return_data.as_slice())
156 .map_err(|_err| ClockworkError::InvalidTaskResponse)?,
157 ))
158 }
159 }
160 }
161 }
162}
163
164#[derive(AnchorDeserialize, AnchorSerialize, Clone, Copy, Debug, PartialEq, Eq)]
169pub enum QueueStatus {
170 Paused,
171 Pending,
172 Processing { task_id: u64 },
173}