use {
super::InstructionData,
crate::{errors::ClockworkError, response::TaskResponse},
anchor_lang::{
prelude::*,
solana_program::{
instruction::Instruction,
program::{get_return_data, invoke_signed},
},
AnchorDeserialize,
},
chrono::{DateTime, NaiveDateTime, Utc},
clockwork_cron::Schedule,
std::{convert::TryFrom, str::FromStr},
};
/// Static seed prefix for queue program-derived addresses.
///
/// Used as the first seed both when deriving a queue's address
/// (`Queue::pubkey`) and when signing a CPI on the queue's behalf.
pub const SEED_QUEUE: &[u8] = b"queue";
/// Account state for a queue.
///
/// NOTE(review): field order presumably defines the serialized account layout
/// (via `#[account]`); do not reorder fields without a migration — confirm.
#[account]
#[derive(Debug)]
pub struct Queue {
/// Authority key of the queue; also one of the seeds of the queue's address.
pub authority: Pubkey,
/// Queue name; its bytes are one of the seeds of the queue's address.
pub name: String,
/// Unix timestamp at which the queue should next be processed, or `None`
/// when the schedule yields no further occurrence.
pub process_at: Option<i64>,
/// Schedule expression, parsed with `clockwork_cron::Schedule::from_str`.
pub schedule: String,
/// Current lifecycle state of the queue.
pub status: QueueStatus,
/// Presumably the number of tasks attached to this queue (verify against
/// the task-creation instruction); `process` only checks whether it is > 0.
pub task_count: u64,
}
impl Queue {
    /// Derives the program address of the queue identified by `authority` and `name`.
    ///
    /// The seeds are `SEED_QUEUE`, the authority's key bytes and the name's bytes,
    /// searched under this program's id. Only the address is returned; the bump
    /// found alongside it is discarded.
    pub fn pubkey(authority: Pubkey, name: String) -> Pubkey {
        let seeds: [&[u8]; 3] = [SEED_QUEUE, authority.as_ref(), name.as_bytes()];
        let (address, _bump) = Pubkey::find_program_address(&seeds, &crate::ID);
        address
    }
}
impl TryFrom<Vec<u8>> for Queue {
type Error = Error;
fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
Queue::try_deserialize(&mut data.as_slice())
}
}
/// Behavior attached to a loaded `Queue` account.
pub trait QueueAccount {
/// Starts a processing pass. The implementation in this file requires the
/// queue to be `Pending`, then either moves it to `Processing { task_id: 0 }`
/// (when `task_count > 0`) or rolls it straight forward to its next slot.
fn process(&mut self) -> Result<()>;
/// Initializes a fresh queue with the given authority, name and schedule.
/// The implementation in this file sets the status to `Pending`, the task
/// count to zero, and computes the first `process_at` from the current clock.
fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()>;
/// Returns the Unix timestamp of the first schedule occurrence after `ts`,
/// or `None` if the schedule has no further occurrence.
fn next_process_at(&self, ts: i64) -> Option<i64>;
/// Resets the queue to `Pending` and, if a `process_at` is set, advances it
/// to the next schedule occurrence after the current one.
fn roll_forward(&mut self) -> Result<()>;
/// Invokes `ix` via CPI signed with the queue's address seeds and `bump`,
/// and returns the decoded `TaskResponse` if the invoked program set
/// return data (`None` if it set none).
fn sign(
&self,
account_infos: &[AccountInfo],
bump: u8,
ix: &InstructionData,
) -> Result<Option<TaskResponse>>;
}
impl QueueAccount for Account<'_, Queue> {
    /// Starts a processing pass over the queue.
    ///
    /// If the queue has at least one task, the status becomes
    /// `Processing { task_id: 0 }`. Otherwise there is nothing to run and the
    /// queue is rolled forward to its next scheduled slot immediately.
    ///
    /// # Errors
    /// Returns `ClockworkError::InvalidQueueStatus` unless the queue is `Pending`.
    fn process(&mut self) -> Result<()> {
        require!(
            self.status == QueueStatus::Pending,
            ClockworkError::InvalidQueueStatus,
        );

        if self.task_count > 0 {
            self.status = QueueStatus::Processing { task_id: 0 };
        } else {
            self.roll_forward()?;
        }

        Ok(())
    }

    /// Initializes the queue account.
    ///
    /// Stores `authority`, `name` and `schedule`, marks the queue `Pending`
    /// with zero tasks, and computes the first `process_at` from the current
    /// clock timestamp.
    ///
    /// # Errors
    /// Propagates the error from `Clock::get()` if the clock sysvar cannot be read.
    ///
    /// # Panics
    /// Panics if `schedule` cannot be parsed (see `next_process_at`).
    fn new(&mut self, authority: Pubkey, name: String, schedule: String) -> Result<()> {
        self.authority = authority;
        self.name = name;
        self.schedule = schedule;
        self.status = QueueStatus::Pending;
        self.task_count = 0;

        // Propagate a sysvar failure as an error instead of panicking.
        let ts = Clock::get()?.unix_timestamp;
        self.process_at = self.next_process_at(ts);

        Ok(())
    }

    /// Returns the Unix timestamp of the first schedule occurrence strictly
    /// produced by `Schedule::after` for the instant `ts`, or `None` when the
    /// schedule yields no further occurrence.
    ///
    /// # Panics
    /// Panics if `self.schedule` is not a valid schedule expression, or if `ts`
    /// is outside the range `NaiveDateTime::from_timestamp` accepts. The parse
    /// failure is intentionally left as a panic: silently returning `None`
    /// would produce a queue that never fires.
    fn next_process_at(&self, ts: i64) -> Option<i64> {
        let schedule = Schedule::from_str(&self.schedule).unwrap();
        let start = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(ts, 0), Utc);
        let upcoming = schedule.after(&start).next();
        upcoming.map(|datetime| datetime.timestamp())
    }

    /// Returns the queue to `Pending` and advances `process_at` to the next
    /// schedule occurrence after the current one.
    ///
    /// When `process_at` is already `None` it is left untouched.
    fn roll_forward(&mut self) -> Result<()> {
        self.status = QueueStatus::Pending;

        if let Some(process_at) = self.process_at {
            // Advance from the scheduled slot rather than from "now".
            self.process_at = self.next_process_at(process_at);
        }

        Ok(())
    }

    /// Invokes `ix` through a CPI signed with this queue's address seeds.
    ///
    /// The signer seeds are `SEED_QUEUE`, the authority key bytes, the queue
    /// name bytes and `bump`. After the call, any return data is read back:
    /// none yields `Ok(None)`; data set by the invoked program is decoded as a
    /// `TaskResponse`.
    ///
    /// # Errors
    /// * `ClockworkError::InnerIxFailed` if the invoked instruction fails
    ///   (the underlying error is discarded).
    /// * `ClockworkError::InvalidReturnData` if return data is present but was
    ///   set by a program other than `ix.program_id`.
    /// * `ClockworkError::InvalidTaskResponse` if the return data does not
    ///   decode as a `TaskResponse`.
    fn sign(
        &self,
        account_infos: &[AccountInfo],
        bump: u8,
        ix: &InstructionData,
    ) -> Result<Option<TaskResponse>> {
        invoke_signed(
            &Instruction::from(ix),
            account_infos,
            &[&[
                SEED_QUEUE,
                self.authority.as_ref(),
                self.name.as_bytes(),
                &[bump],
            ]],
        )
        .map_err(|_err| ClockworkError::InnerIxFailed)?;

        match get_return_data() {
            None => Ok(None),
            // Only trust return data that the invoked program itself produced.
            Some((program_id, _)) if program_id != ix.program_id => {
                Err(ClockworkError::InvalidReturnData.into())
            }
            Some((_, return_data)) => {
                let response = TaskResponse::try_from_slice(return_data.as_slice())
                    .map_err(|_err| ClockworkError::InvalidTaskResponse)?;
                Ok(Some(response))
            }
        }
    }
}
/// Lifecycle state of a `Queue`.
///
/// NOTE(review): variant order presumably determines the serialized
/// discriminant; append new variants rather than reordering — confirm.
#[derive(AnchorDeserialize, AnchorSerialize, Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueStatus {
/// Not set anywhere in this file; presumably a queue that has been
/// suspended by its authority (verify against the pause instruction).
Paused,
/// Idle and waiting for its next `process_at`; the only state from which
/// `process` may be called.
Pending,
/// A processing pass is underway. `process` enters this state with
/// `task_id: 0`; presumably `task_id` tracks the task currently being
/// executed — confirm against the task-execution instruction.
Processing { task_id: u64 },
}