sablier_thread_program/instructions/
thread_kickoff.rs

1use std::{
2    collections::hash_map::DefaultHasher,
3    hash::{Hash, Hasher},
4    str::FromStr,
5};
6
7use anchor_lang::prelude::*;
8use chrono::DateTime;
9use sablier_cron::Schedule;
10use sablier_network_program::state::{Worker, WorkerAccount};
11use sablier_utils::{
12    pyth::{self, PriceUpdateV2},
13    thread::Trigger,
14};
15
16use crate::{constants::*, errors::*, state::*};
17
18/// Accounts required by the `thread_kickoff` instruction.
19#[derive(Accounts)]
20pub struct ThreadKickoff<'info> {
21    /// The signatory.
22    #[account(mut)]
23    pub signatory: Signer<'info>,
24
25    /// The thread to kickoff.
26    #[account(
27        mut,
28        seeds = [
29            SEED_THREAD,
30            thread.authority.as_ref(),
31            thread.id.as_slice(),
32            thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
33        ],
34        bump = thread.bump,
35        constraint = !thread.paused @ SablierError::ThreadPaused,
36        constraint = thread.next_instruction.is_none() @ SablierError::ThreadBusy,
37    )]
38    pub thread: Account<'info, Thread>,
39
40    /// The worker.
41    #[account(address = worker.pubkey())]
42    pub worker: Account<'info, Worker>,
43}
44
45pub fn handler(ctx: Context<ThreadKickoff>) -> Result<()> {
46    // Get accounts.
47    let signatory = &mut ctx.accounts.signatory;
48    let thread = &mut ctx.accounts.thread;
49    let clock = Clock::get()?;
50
51    match &thread.trigger {
52        Trigger::Account {
53            address,
54            offset,
55            size,
56        } => {
57            // Verify proof that account data has been updated.
58            match ctx.remaining_accounts.first() {
59                None => {
60                    return Err(SablierError::TriggerConditionFailed.into());
61                }
62                Some(account_info) => {
63                    // Verify the remaining account is the account this thread is listening for.
64                    require!(
65                        address.eq(account_info.key),
66                        SablierError::TriggerConditionFailed
67                    );
68
69                    // Begin computing the data hash of this account.
70                    let mut hasher = DefaultHasher::new();
71                    let data = &account_info.try_borrow_data()?;
72                    let offset = *offset as usize;
73                    let range_end = offset + *size as usize;
74                    if data.len().gt(&range_end) {
75                        data[offset..range_end].hash(&mut hasher);
76                    } else {
77                        data[offset..].hash(&mut hasher)
78                    }
79                    let data_hash = hasher.finish();
80
81                    // Verify the data hash is different than the prior data hash.
82                    if let Some(exec_context) = thread.exec_context {
83                        match exec_context.trigger_context {
84                            TriggerContext::Account {
85                                data_hash: prior_data_hash,
86                            } => {
87                                require!(
88                                    data_hash.ne(&prior_data_hash),
89                                    SablierError::TriggerConditionFailed
90                                )
91                            }
92                            _ => return Err(SablierError::InvalidThreadState.into()),
93                        }
94                    }
95
96                    // Set a new exec context with the new data hash and slot number.
97                    thread.exec_context = Some(ExecContext {
98                        exec_index: 0,
99                        execs_since_reimbursement: 0,
100                        execs_since_slot: 0,
101                        last_exec_at: clock.slot,
102                        trigger_context: TriggerContext::Account { data_hash },
103                    })
104                }
105            }
106        }
107        Trigger::Cron {
108            schedule,
109            skippable,
110        } => {
111            // Get the reference timestamp for calculating the thread's scheduled target timestamp.
112            let reference_timestamp = match thread.exec_context {
113                None => thread.created_at.unix_timestamp,
114                Some(exec_context) => match exec_context.trigger_context {
115                    TriggerContext::Cron { started_at } => started_at,
116                    _ => return Err(SablierError::InvalidThreadState.into()),
117                },
118            };
119
120            // Verify the current timestamp is greater than or equal to the threshold timestamp.
121            let threshold_timestamp = next_timestamp(reference_timestamp, schedule)
122                .ok_or(SablierError::TriggerConditionFailed)?;
123            msg!(
124                "Threshold timestamp: {}, clock timestamp: {}",
125                threshold_timestamp,
126                clock.unix_timestamp
127            );
128            require!(
129                clock.unix_timestamp.ge(&threshold_timestamp),
130                SablierError::TriggerConditionFailed
131            );
132
133            // If the schedule is marked as skippable, set the started_at of the exec context to be the current timestamp.
134            // Otherwise, the exec context must iterate through each scheduled kickoff moment.
135            let started_at = if *skippable {
136                clock.unix_timestamp
137            } else {
138                threshold_timestamp
139            };
140
141            // Set the exec context.
142            thread.exec_context = Some(ExecContext {
143                exec_index: 0,
144                execs_since_reimbursement: 0,
145                execs_since_slot: 0,
146                last_exec_at: clock.slot,
147                trigger_context: TriggerContext::Cron { started_at },
148            });
149        }
150        Trigger::Now => {
151            // Set the exec context.
152            require!(
153                thread.exec_context.is_none(),
154                SablierError::InvalidThreadState
155            );
156            thread.exec_context = Some(ExecContext {
157                exec_index: 0,
158                execs_since_reimbursement: 0,
159                execs_since_slot: 0,
160                last_exec_at: clock.slot,
161                trigger_context: TriggerContext::Now,
162            });
163        }
164        Trigger::Slot { slot } => {
165            require!(clock.slot.ge(slot), SablierError::TriggerConditionFailed);
166            thread.exec_context = Some(ExecContext {
167                exec_index: 0,
168                execs_since_reimbursement: 0,
169                execs_since_slot: 0,
170                last_exec_at: clock.slot,
171                trigger_context: TriggerContext::Slot { started_at: *slot },
172            });
173        }
174        Trigger::Epoch { epoch } => {
175            require!(clock.epoch.ge(epoch), SablierError::TriggerConditionFailed);
176            thread.exec_context = Some(ExecContext {
177                exec_index: 0,
178                execs_since_reimbursement: 0,
179                execs_since_slot: 0,
180                last_exec_at: clock.slot,
181                trigger_context: TriggerContext::Epoch { started_at: *epoch },
182            })
183        }
184        Trigger::Timestamp { unix_ts } => {
185            require!(
186                clock.unix_timestamp.ge(unix_ts),
187                SablierError::TriggerConditionFailed
188            );
189            thread.exec_context = Some(ExecContext {
190                exec_index: 0,
191                execs_since_reimbursement: 0,
192                execs_since_slot: 0,
193                last_exec_at: clock.slot,
194                trigger_context: TriggerContext::Timestamp {
195                    started_at: *unix_ts,
196                },
197            })
198        }
199        Trigger::Pyth {
200            feed_id,
201            equality,
202            limit,
203        } => {
204            // Verify price limit has been reached.
205            match ctx.remaining_accounts.first() {
206                None => {
207                    return Err(SablierError::TriggerConditionFailed.into());
208                }
209                Some(account_info) => {
210                    require_keys_eq!(
211                        *account_info.owner,
212                        pyth::ID,
213                        SablierError::TriggerConditionFailed
214                    );
215                    const STALENESS_THRESHOLD: u64 = 60; // staleness threshold in seconds
216                    let price_update =
217                        PriceUpdateV2::try_deserialize(&mut account_info.data.borrow().as_ref())?;
218
219                    let current_price = price_update.get_price_no_older_than(
220                        &Clock::get()?,
221                        STALENESS_THRESHOLD,
222                        feed_id,
223                    )?;
224
225                    match equality {
226                        Equality::GreaterThanOrEqual => {
227                            require!(
228                                current_price.price.ge(limit),
229                                SablierError::TriggerConditionFailed
230                            );
231                            thread.exec_context = Some(ExecContext {
232                                exec_index: 0,
233                                execs_since_reimbursement: 0,
234                                execs_since_slot: 0,
235                                last_exec_at: clock.slot,
236                                trigger_context: TriggerContext::Pyth {
237                                    price: current_price.price,
238                                },
239                            });
240                        }
241                        Equality::LessThanOrEqual => {
242                            require!(
243                                current_price.price.le(limit),
244                                SablierError::TriggerConditionFailed
245                            );
246                            thread.exec_context = Some(ExecContext {
247                                exec_index: 0,
248                                execs_since_reimbursement: 0,
249                                execs_since_slot: 0,
250                                last_exec_at: clock.slot,
251                                trigger_context: TriggerContext::Pyth {
252                                    price: current_price.price,
253                                },
254                            });
255                        }
256                    }
257                }
258            }
259        }
260        Trigger::Periodic { delay } => {
261            // Get the reference timestamp for calculating the thread's scheduled target timestamp.
262            let reference_timestamp = match thread.exec_context {
263                None => thread.created_at.unix_timestamp,
264                Some(exec_context) => match exec_context.trigger_context {
265                    TriggerContext::Periodic { started_at } => started_at,
266                    _ => return Err(SablierError::InvalidThreadState.into()),
267                },
268            };
269
270            // Verify the current timestamp is greater than or equal to the threshold timestamp.
271            let threshold_timestamp = reference_timestamp + *delay as i64;
272
273            msg!(
274                "Threshold timestamp: {}, clock timestamp: {}",
275                threshold_timestamp,
276                clock.unix_timestamp
277            );
278            require!(
279                clock.unix_timestamp.ge(&threshold_timestamp),
280                SablierError::TriggerConditionFailed
281            );
282
283            // Set the exec context.
284            thread.exec_context = Some(ExecContext {
285                exec_index: 0,
286                execs_since_reimbursement: 0,
287                execs_since_slot: 0,
288                last_exec_at: clock.slot,
289                trigger_context: TriggerContext::Periodic {
290                    started_at: threshold_timestamp,
291                },
292            });
293        }
294    }
295
296    // If we make it here, the trigger is active. Update the next instruction and be done.
297    if let Some(kickoff_instruction) = thread.instructions.first() {
298        thread.next_instruction = Some(kickoff_instruction.clone());
299    }
300
301    // Realloc the thread account
302    thread.realloc_account()?;
303
304    // Reimburse signatory for transaction fee.
305    thread.sub_lamports(TRANSACTION_BASE_FEE_REIMBURSEMENT)?;
306    signatory.add_lamports(TRANSACTION_BASE_FEE_REIMBURSEMENT)?;
307
308    Ok(())
309}
310
311fn next_timestamp(after: i64, schedule: &str) -> Option<i64> {
312    Schedule::from_str(schedule)
313        .unwrap()
314        .next_after(&DateTime::from_timestamp(after, 0).unwrap())
315        .take()
316        .map(|datetime| datetime.timestamp())
317}