miclockwork_thread_program/instructions/
thread_kickoff.rs

1use std::{
2    collections::hash_map::DefaultHasher,
3    hash::{Hash, Hasher},
4    str::FromStr,
5};
6
7use anchor_lang::prelude::*;
8use chrono::{DateTime, NaiveDateTime, Utc};
9use miclockwork_cron::Schedule;
10use miclockwork_network_program::state::{Worker, WorkerAccount};
11use miclockwork_utils::thread::Trigger;
12use pyth_sdk_solana::load_price_feed_from_account_info;
13
14use crate::{errors::*, state::*};
15
16use super::TRANSACTION_BASE_FEE_REIMBURSEMENT;
17
18/// Accounts required by the `thread_kickoff` instruction.
19#[derive(Accounts)]
20pub struct ThreadKickoff<'info> {
21    /// The signatory.
22    #[account(mut)]
23    pub signatory: Signer<'info>,
24
25    /// The thread to kickoff.
26    #[account(
27        mut,
28        seeds = [
29            SEED_THREAD,
30            thread.authority.as_ref(),
31            thread.id.as_slice(),
32        ],
33        bump = thread.bump,
34        constraint = !thread.paused @ ClockworkError::ThreadPaused,
35        constraint = thread.next_instruction.is_none() @ ClockworkError::ThreadBusy,
36    )]
37    pub thread: Box<Account<'info, Thread>>,
38
39    /// The worker.
40    #[account(address = worker.pubkey())]
41    pub worker: Account<'info, Worker>,
42}
43
44pub fn handler(ctx: Context<ThreadKickoff>) -> Result<()> {
45    // Get accounts.
46    let signatory = &mut ctx.accounts.signatory;
47    let thread = &mut ctx.accounts.thread;
48    let clock = Clock::get().unwrap();
49
50    match thread.trigger.clone() {
51        Trigger::Account {
52            address,
53            offset,
54            size,
55        } => {
56            // Verify proof that account data has been updated.
57            match ctx.remaining_accounts.first() {
58                None => {
59                    return Err(ClockworkError::TriggerConditionFailed.into());
60                }
61                Some(account_info) => {
62                    // Verify the remaining account is the account this thread is listening for.
63                    require!(
64                        address.eq(account_info.key),
65                        ClockworkError::TriggerConditionFailed
66                    );
67
68                    // Begin computing the data hash of this account.
69                    let mut hasher = DefaultHasher::new();
70                    let data = &account_info.try_borrow_data().unwrap();
71                    let offset = offset as usize;
72                    let range_end = offset.checked_add(size as usize).unwrap() as usize;
73                    if data.len().gt(&range_end) {
74                        data[offset..range_end].hash(&mut hasher);
75                    } else {
76                        data[offset..].hash(&mut hasher)
77                    }
78                    let data_hash = hasher.finish();
79
80                    // Verify the data hash is different than the prior data hash.
81                    if let Some(exec_context) = thread.exec_context {
82                        match exec_context.trigger_context {
83                            TriggerContext::Account {
84                                data_hash: prior_data_hash,
85                            } => {
86                                require!(
87                                    data_hash.ne(&prior_data_hash),
88                                    ClockworkError::TriggerConditionFailed
89                                )
90                            }
91                            _ => return Err(ClockworkError::InvalidThreadState.into()),
92                        }
93                    }
94
95                    // Set a new exec context with the new data hash and slot number.
96                    thread.exec_context = Some(ExecContext {
97                        exec_index: 0,
98                        execs_since_reimbursement: 0,
99                        execs_since_slot: 0,
100                        last_exec_at: clock.slot,
101                        trigger_context: TriggerContext::Account { data_hash },
102                    })
103                }
104            }
105        }
106        Trigger::Cron {
107            schedule,
108            skippable,
109        } => {
110            // Get the reference timestamp for calculating the thread's scheduled target timestamp.
111            let reference_timestamp = match thread.exec_context.clone() {
112                None => thread.created_at.unix_timestamp,
113                Some(exec_context) => match exec_context.trigger_context {
114                    TriggerContext::Cron { started_at } => started_at,
115                    _ => return Err(ClockworkError::InvalidThreadState.into()),
116                },
117            };
118
119            // Verify the current timestamp is greater than or equal to the threshold timestamp.
120            let threshold_timestamp = next_timestamp(reference_timestamp, schedule.clone())
121                .ok_or(ClockworkError::TriggerConditionFailed)?;
122            require!(
123                clock.unix_timestamp.ge(&threshold_timestamp),
124                ClockworkError::TriggerConditionFailed
125            );
126
127            // If the schedule is marked as skippable, set the started_at of the exec context to be the current timestamp.
128            // Otherwise, the exec context must iterate through each scheduled kickoff moment.
129            let started_at = if skippable {
130                clock.unix_timestamp
131            } else {
132                threshold_timestamp
133            };
134
135            // Set the exec context.
136            thread.exec_context = Some(ExecContext {
137                exec_index: 0,
138                execs_since_reimbursement: 0,
139                execs_since_slot: 0,
140                last_exec_at: clock.slot,
141                trigger_context: TriggerContext::Cron { started_at },
142            });
143        }
144        Trigger::Now => {
145            // Set the exec context.
146            require!(
147                thread.exec_context.is_none(),
148                ClockworkError::InvalidThreadState
149            );
150            thread.exec_context = Some(ExecContext {
151                exec_index: 0,
152                execs_since_reimbursement: 0,
153                execs_since_slot: 0,
154                last_exec_at: clock.slot,
155                trigger_context: TriggerContext::Now,
156            });
157        }
158        Trigger::Slot { slot } => {
159            require!(clock.slot.ge(&slot), ClockworkError::TriggerConditionFailed);
160            thread.exec_context = Some(ExecContext {
161                exec_index: 0,
162                execs_since_reimbursement: 0,
163                execs_since_slot: 0,
164                last_exec_at: clock.slot,
165                trigger_context: TriggerContext::Slot { started_at: slot },
166            });
167        }
168        Trigger::Epoch { epoch } => {
169            require!(
170                clock.epoch.ge(&epoch),
171                ClockworkError::TriggerConditionFailed
172            );
173            thread.exec_context = Some(ExecContext {
174                exec_index: 0,
175                execs_since_reimbursement: 0,
176                execs_since_slot: 0,
177                last_exec_at: clock.slot,
178                trigger_context: TriggerContext::Epoch { started_at: epoch },
179            })
180        }
181        Trigger::Timestamp { unix_ts } => {
182            require!(
183                clock.unix_timestamp.ge(&unix_ts),
184                ClockworkError::TriggerConditionFailed
185            );
186            thread.exec_context = Some(ExecContext {
187                exec_index: 0,
188                execs_since_reimbursement: 0,
189                execs_since_slot: 0,
190                last_exec_at: clock.slot,
191                trigger_context: TriggerContext::Timestamp {
192                    started_at: unix_ts,
193                },
194            })
195        }
196        Trigger::Pyth {
197            price_feed: price_feed_pubkey,
198            equality,
199            limit,
200        } => {
201            // Verify price limit has been reached.
202            match ctx.remaining_accounts.first() {
203                None => {
204                    return Err(ClockworkError::TriggerConditionFailed.into());
205                }
206                Some(account_info) => {
207                    require!(
208                        price_feed_pubkey.eq(account_info.key),
209                        ClockworkError::TriggerConditionFailed
210                    );
211                    const STALENESS_THRESHOLD: u64 = 60; // staleness threshold in seconds
212                    let price_feed = load_price_feed_from_account_info(account_info).unwrap();
213                    let current_timestamp = Clock::get()?.unix_timestamp;
214                    let current_price = price_feed
215                        .get_price_no_older_than(current_timestamp, STALENESS_THRESHOLD)
216                        .unwrap();
217                    match equality {
218                        Equality::GreaterThanOrEqual => {
219                            require!(
220                                current_price.price.ge(&limit),
221                                ClockworkError::TriggerConditionFailed
222                            );
223                            thread.exec_context = Some(ExecContext {
224                                exec_index: 0,
225                                execs_since_reimbursement: 0,
226                                execs_since_slot: 0,
227                                last_exec_at: clock.slot,
228                                trigger_context: TriggerContext::Pyth {
229                                    price: current_price.price,
230                                },
231                            });
232                        }
233                        Equality::LessThanOrEqual => {
234                            require!(
235                                current_price.price.le(&limit),
236                                ClockworkError::TriggerConditionFailed
237                            );
238                            thread.exec_context = Some(ExecContext {
239                                exec_index: 0,
240                                execs_since_reimbursement: 0,
241                                execs_since_slot: 0,
242                                last_exec_at: clock.slot,
243                                trigger_context: TriggerContext::Pyth {
244                                    price: current_price.price,
245                                },
246                            });
247                        }
248                    }
249                }
250            }
251        }
252    }
253
254    // If we make it here, the trigger is active. Update the next instruction and be done.
255    if let Some(kickoff_instruction) = thread.instructions.first() {
256        thread.next_instruction = Some(kickoff_instruction.clone());
257    }
258
259    // Realloc the thread account
260    thread.realloc()?;
261
262    // Reimburse signatory for transaction fee.
263    **thread.to_account_info().try_borrow_mut_lamports()? = thread
264        .to_account_info()
265        .lamports()
266        .checked_sub(TRANSACTION_BASE_FEE_REIMBURSEMENT)
267        .unwrap();
268    **signatory.to_account_info().try_borrow_mut_lamports()? = signatory
269        .to_account_info()
270        .lamports()
271        .checked_add(TRANSACTION_BASE_FEE_REIMBURSEMENT)
272        .unwrap();
273
274    Ok(())
275}
276
277fn next_timestamp(after: i64, schedule: String) -> Option<i64> {
278    Schedule::from_str(&schedule)
279        .unwrap()
280        .next_after(&DateTime::<Utc>::from_utc(
281            NaiveDateTime::from_timestamp_opt(after, 0).unwrap(),
282            Utc,
283        ))
284        .take()
285        .map(|datetime| datetime.timestamp())
286}