miclockwork_thread_program/instructions/
thread_kickoff.rs1use std::{
2 collections::hash_map::DefaultHasher,
3 hash::{Hash, Hasher},
4 str::FromStr,
5};
6
7use anchor_lang::prelude::*;
8use chrono::{DateTime, NaiveDateTime, Utc};
9use miclockwork_cron::Schedule;
10use miclockwork_network_program::state::{Worker, WorkerAccount};
11use miclockwork_utils::thread::Trigger;
12use pyth_sdk_solana::load_price_feed_from_account_info;
13
14use crate::{errors::*, state::*};
15
16use super::TRANSACTION_BASE_FEE_REIMBURSEMENT;
17
18#[derive(Accounts)]
20pub struct ThreadKickoff<'info> {
21 #[account(mut)]
23 pub signatory: Signer<'info>,
24
25 #[account(
27 mut,
28 seeds = [
29 SEED_THREAD,
30 thread.authority.as_ref(),
31 thread.id.as_slice(),
32 ],
33 bump = thread.bump,
34 constraint = !thread.paused @ ClockworkError::ThreadPaused,
35 constraint = thread.next_instruction.is_none() @ ClockworkError::ThreadBusy,
36 )]
37 pub thread: Box<Account<'info, Thread>>,
38
39 #[account(address = worker.pubkey())]
41 pub worker: Account<'info, Worker>,
42}
43
44pub fn handler(ctx: Context<ThreadKickoff>) -> Result<()> {
45 let signatory = &mut ctx.accounts.signatory;
47 let thread = &mut ctx.accounts.thread;
48 let clock = Clock::get().unwrap();
49
50 match thread.trigger.clone() {
51 Trigger::Account {
52 address,
53 offset,
54 size,
55 } => {
56 match ctx.remaining_accounts.first() {
58 None => {
59 return Err(ClockworkError::TriggerConditionFailed.into());
60 }
61 Some(account_info) => {
62 require!(
64 address.eq(account_info.key),
65 ClockworkError::TriggerConditionFailed
66 );
67
68 let mut hasher = DefaultHasher::new();
70 let data = &account_info.try_borrow_data().unwrap();
71 let offset = offset as usize;
72 let range_end = offset.checked_add(size as usize).unwrap() as usize;
73 if data.len().gt(&range_end) {
74 data[offset..range_end].hash(&mut hasher);
75 } else {
76 data[offset..].hash(&mut hasher)
77 }
78 let data_hash = hasher.finish();
79
80 if let Some(exec_context) = thread.exec_context {
82 match exec_context.trigger_context {
83 TriggerContext::Account {
84 data_hash: prior_data_hash,
85 } => {
86 require!(
87 data_hash.ne(&prior_data_hash),
88 ClockworkError::TriggerConditionFailed
89 )
90 }
91 _ => return Err(ClockworkError::InvalidThreadState.into()),
92 }
93 }
94
95 thread.exec_context = Some(ExecContext {
97 exec_index: 0,
98 execs_since_reimbursement: 0,
99 execs_since_slot: 0,
100 last_exec_at: clock.slot,
101 trigger_context: TriggerContext::Account { data_hash },
102 })
103 }
104 }
105 }
106 Trigger::Cron {
107 schedule,
108 skippable,
109 } => {
110 let reference_timestamp = match thread.exec_context.clone() {
112 None => thread.created_at.unix_timestamp,
113 Some(exec_context) => match exec_context.trigger_context {
114 TriggerContext::Cron { started_at } => started_at,
115 _ => return Err(ClockworkError::InvalidThreadState.into()),
116 },
117 };
118
119 let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone())
121 .ok_or(ClockworkError::TriggerConditionFailed)?;
122 require!(
123 clock.unix_timestamp.ge(&threshold_timestamp),
124 ClockworkError::TriggerConditionFailed
125 );
126
127 let started_at = if skippable {
130 clock.unix_timestamp
131 } else {
132 threshold_timestamp
133 };
134
135 thread.exec_context = Some(ExecContext {
137 exec_index: 0,
138 execs_since_reimbursement: 0,
139 execs_since_slot: 0,
140 last_exec_at: clock.slot,
141 trigger_context: TriggerContext::Cron { started_at },
142 });
143 }
144 Trigger::Now => {
145 require!(
147 thread.exec_context.is_none(),
148 ClockworkError::InvalidThreadState
149 );
150 thread.exec_context = Some(ExecContext {
151 exec_index: 0,
152 execs_since_reimbursement: 0,
153 execs_since_slot: 0,
154 last_exec_at: clock.slot,
155 trigger_context: TriggerContext::Now,
156 });
157 }
158 Trigger::Slot { slot } => {
159 require!(clock.slot.ge(&slot), ClockworkError::TriggerConditionFailed);
160 thread.exec_context = Some(ExecContext {
161 exec_index: 0,
162 execs_since_reimbursement: 0,
163 execs_since_slot: 0,
164 last_exec_at: clock.slot,
165 trigger_context: TriggerContext::Slot { started_at: slot },
166 });
167 }
168 Trigger::Epoch { epoch } => {
169 require!(
170 clock.epoch.ge(&epoch),
171 ClockworkError::TriggerConditionFailed
172 );
173 thread.exec_context = Some(ExecContext {
174 exec_index: 0,
175 execs_since_reimbursement: 0,
176 execs_since_slot: 0,
177 last_exec_at: clock.slot,
178 trigger_context: TriggerContext::Epoch { started_at: epoch },
179 })
180 }
181 Trigger::Timestamp { unix_ts } => {
182 require!(
183 clock.unix_timestamp.ge(&unix_ts),
184 ClockworkError::TriggerConditionFailed
185 );
186 thread.exec_context = Some(ExecContext {
187 exec_index: 0,
188 execs_since_reimbursement: 0,
189 execs_since_slot: 0,
190 last_exec_at: clock.slot,
191 trigger_context: TriggerContext::Timestamp {
192 started_at: unix_ts,
193 },
194 })
195 }
196 Trigger::Pyth {
197 price_feed: price_feed_pubkey,
198 equality,
199 limit,
200 } => {
201 match ctx.remaining_accounts.first() {
203 None => {
204 return Err(ClockworkError::TriggerConditionFailed.into());
205 }
206 Some(account_info) => {
207 require!(
208 price_feed_pubkey.eq(account_info.key),
209 ClockworkError::TriggerConditionFailed
210 );
211 const STALENESS_THRESHOLD: u64 = 60; let price_feed = load_price_feed_from_account_info(account_info).unwrap();
213 let current_timestamp = Clock::get()?.unix_timestamp;
214 let current_price = price_feed
215 .get_price_no_older_than(current_timestamp, STALENESS_THRESHOLD)
216 .unwrap();
217 match equality {
218 Equality::GreaterThanOrEqual => {
219 require!(
220 current_price.price.ge(&limit),
221 ClockworkError::TriggerConditionFailed
222 );
223 thread.exec_context = Some(ExecContext {
224 exec_index: 0,
225 execs_since_reimbursement: 0,
226 execs_since_slot: 0,
227 last_exec_at: clock.slot,
228 trigger_context: TriggerContext::Pyth {
229 price: current_price.price,
230 },
231 });
232 }
233 Equality::LessThanOrEqual => {
234 require!(
235 current_price.price.le(&limit),
236 ClockworkError::TriggerConditionFailed
237 );
238 thread.exec_context = Some(ExecContext {
239 exec_index: 0,
240 execs_since_reimbursement: 0,
241 execs_since_slot: 0,
242 last_exec_at: clock.slot,
243 trigger_context: TriggerContext::Pyth {
244 price: current_price.price,
245 },
246 });
247 }
248 }
249 }
250 }
251 }
252 }
253
254 if let Some(kickoff_instruction) = thread.instructions.first() {
256 thread.next_instruction = Some(kickoff_instruction.clone());
257 }
258
259 thread.realloc()?;
261
262 **thread.to_account_info().try_borrow_mut_lamports()? = thread
264 .to_account_info()
265 .lamports()
266 .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT)
267 .unwrap();
268 **signatory.to_account_info().try_borrow_mut_lamports()? = signatory
269 .to_account_info()
270 .lamports()
271 .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT)
272 .unwrap();
273
274 Ok(())
275}
276
277fn next_timestamp(after: i64, schedule: String) -> Option<i64> {
278 Schedule::from_str(&schedule)
279 .unwrap()
280 .next_after(&DateTime::<Utc>::from_utc(
281 NaiveDateTime::from_timestamp_opt(after, 0).unwrap(),
282 Utc,
283 ))
284 .take()
285 .map(|datetime| datetime.timestamp())
286}