sablier_thread_program/instructions/
thread_kickoff.rs1use std::{
2 collections::hash_map::DefaultHasher,
3 hash::{Hash, Hasher},
4 str::FromStr,
5};
6
7use anchor_lang::prelude::*;
8use chrono::DateTime;
9use sablier_cron::Schedule;
10use sablier_network_program::state::{Worker, WorkerAccount};
11use sablier_utils::{
12 pyth::{self, PriceUpdateV2},
13 thread::Trigger,
14};
15
16use crate::{constants::*, errors::*, state::*};
17
18#[derive(Accounts)]
20pub struct ThreadKickoff<'info> {
21 #[account(mut)]
23 pub signatory: Signer<'info>,
24
25 #[account(
27 mut,
28 seeds = [
29 SEED_THREAD,
30 thread.authority.as_ref(),
31 thread.id.as_slice(),
32 thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
33 ],
34 bump = thread.bump,
35 constraint = !thread.paused @ SablierError::ThreadPaused,
36 constraint = thread.next_instruction.is_none() @ SablierError::ThreadBusy,
37 )]
38 pub thread: Account<'info, Thread>,
39
40 #[account(address = worker.pubkey())]
42 pub worker: Account<'info, Worker>,
43}
44
45pub fn handler(ctx: Context<ThreadKickoff>) -> Result<()> {
46 let signatory = &mut ctx.accounts.signatory;
48 let thread = &mut ctx.accounts.thread;
49 let clock = Clock::get()?;
50
51 match &thread.trigger {
52 Trigger::Account {
53 address,
54 offset,
55 size,
56 } => {
57 match ctx.remaining_accounts.first() {
59 None => {
60 return Err(SablierError::TriggerConditionFailed.into());
61 }
62 Some(account_info) => {
63 require!(
65 address.eq(account_info.key),
66 SablierError::TriggerConditionFailed
67 );
68
69 let mut hasher = DefaultHasher::new();
71 let data = &account_info.try_borrow_data()?;
72 let offset = *offset as usize;
73 let range_end = offset + *size as usize;
74 if data.len().gt(&range_end) {
75 data[offset..range_end].hash(&mut hasher);
76 } else {
77 data[offset..].hash(&mut hasher)
78 }
79 let data_hash = hasher.finish();
80
81 if let Some(exec_context) = thread.exec_context {
83 match exec_context.trigger_context {
84 TriggerContext::Account {
85 data_hash: prior_data_hash,
86 } => {
87 require!(
88 data_hash.ne(&prior_data_hash),
89 SablierError::TriggerConditionFailed
90 )
91 }
92 _ => return Err(SablierError::InvalidThreadState.into()),
93 }
94 }
95
96 thread.exec_context = Some(ExecContext {
98 exec_index: 0,
99 execs_since_reimbursement: 0,
100 execs_since_slot: 0,
101 last_exec_at: clock.slot,
102 trigger_context: TriggerContext::Account { data_hash },
103 })
104 }
105 }
106 }
107 Trigger::Cron {
108 schedule,
109 skippable,
110 } => {
111 let reference_timestamp = match thread.exec_context {
113 None => thread.created_at.unix_timestamp,
114 Some(exec_context) => match exec_context.trigger_context {
115 TriggerContext::Cron { started_at } => started_at,
116 _ => return Err(SablierError::InvalidThreadState.into()),
117 },
118 };
119
120 let threshold_timestamp = next_timestamp(reference_timestamp, schedule)
122 .ok_or(SablierError::TriggerConditionFailed)?;
123 msg!(
124 "Threshold timestamp: {}, clock timestamp: {}",
125 threshold_timestamp,
126 clock.unix_timestamp
127 );
128 require!(
129 clock.unix_timestamp.ge(&threshold_timestamp),
130 SablierError::TriggerConditionFailed
131 );
132
133 let started_at = if *skippable {
136 clock.unix_timestamp
137 } else {
138 threshold_timestamp
139 };
140
141 thread.exec_context = Some(ExecContext {
143 exec_index: 0,
144 execs_since_reimbursement: 0,
145 execs_since_slot: 0,
146 last_exec_at: clock.slot,
147 trigger_context: TriggerContext::Cron { started_at },
148 });
149 }
150 Trigger::Now => {
151 require!(
153 thread.exec_context.is_none(),
154 SablierError::InvalidThreadState
155 );
156 thread.exec_context = Some(ExecContext {
157 exec_index: 0,
158 execs_since_reimbursement: 0,
159 execs_since_slot: 0,
160 last_exec_at: clock.slot,
161 trigger_context: TriggerContext::Now,
162 });
163 }
164 Trigger::Slot { slot } => {
165 require!(clock.slot.ge(slot), SablierError::TriggerConditionFailed);
166 thread.exec_context = Some(ExecContext {
167 exec_index: 0,
168 execs_since_reimbursement: 0,
169 execs_since_slot: 0,
170 last_exec_at: clock.slot,
171 trigger_context: TriggerContext::Slot { started_at: *slot },
172 });
173 }
174 Trigger::Epoch { epoch } => {
175 require!(clock.epoch.ge(epoch), SablierError::TriggerConditionFailed);
176 thread.exec_context = Some(ExecContext {
177 exec_index: 0,
178 execs_since_reimbursement: 0,
179 execs_since_slot: 0,
180 last_exec_at: clock.slot,
181 trigger_context: TriggerContext::Epoch { started_at: *epoch },
182 })
183 }
184 Trigger::Timestamp { unix_ts } => {
185 require!(
186 clock.unix_timestamp.ge(unix_ts),
187 SablierError::TriggerConditionFailed
188 );
189 thread.exec_context = Some(ExecContext {
190 exec_index: 0,
191 execs_since_reimbursement: 0,
192 execs_since_slot: 0,
193 last_exec_at: clock.slot,
194 trigger_context: TriggerContext::Timestamp {
195 started_at: *unix_ts,
196 },
197 })
198 }
199 Trigger::Pyth {
200 feed_id,
201 equality,
202 limit,
203 } => {
204 match ctx.remaining_accounts.first() {
206 None => {
207 return Err(SablierError::TriggerConditionFailed.into());
208 }
209 Some(account_info) => {
210 require_keys_eq!(
211 *account_info.owner,
212 pyth::ID,
213 SablierError::TriggerConditionFailed
214 );
215 const STALENESS_THRESHOLD: u64 = 60; let price_update =
217 PriceUpdateV2::try_deserialize(&mut account_info.data.borrow().as_ref())?;
218
219 let current_price = price_update.get_price_no_older_than(
220 &Clock::get()?,
221 STALENESS_THRESHOLD,
222 feed_id,
223 )?;
224
225 match equality {
226 Equality::GreaterThanOrEqual => {
227 require!(
228 current_price.price.ge(limit),
229 SablierError::TriggerConditionFailed
230 );
231 thread.exec_context = Some(ExecContext {
232 exec_index: 0,
233 execs_since_reimbursement: 0,
234 execs_since_slot: 0,
235 last_exec_at: clock.slot,
236 trigger_context: TriggerContext::Pyth {
237 price: current_price.price,
238 },
239 });
240 }
241 Equality::LessThanOrEqual => {
242 require!(
243 current_price.price.le(limit),
244 SablierError::TriggerConditionFailed
245 );
246 thread.exec_context = Some(ExecContext {
247 exec_index: 0,
248 execs_since_reimbursement: 0,
249 execs_since_slot: 0,
250 last_exec_at: clock.slot,
251 trigger_context: TriggerContext::Pyth {
252 price: current_price.price,
253 },
254 });
255 }
256 }
257 }
258 }
259 }
260 Trigger::Periodic { delay } => {
261 let reference_timestamp = match thread.exec_context {
263 None => thread.created_at.unix_timestamp,
264 Some(exec_context) => match exec_context.trigger_context {
265 TriggerContext::Periodic { started_at } => started_at,
266 _ => return Err(SablierError::InvalidThreadState.into()),
267 },
268 };
269
270 let threshold_timestamp = reference_timestamp + *delay as i64;
272
273 msg!(
274 "Threshold timestamp: {}, clock timestamp: {}",
275 threshold_timestamp,
276 clock.unix_timestamp
277 );
278 require!(
279 clock.unix_timestamp.ge(&threshold_timestamp),
280 SablierError::TriggerConditionFailed
281 );
282
283 thread.exec_context = Some(ExecContext {
285 exec_index: 0,
286 execs_since_reimbursement: 0,
287 execs_since_slot: 0,
288 last_exec_at: clock.slot,
289 trigger_context: TriggerContext::Periodic {
290 started_at: threshold_timestamp,
291 },
292 });
293 }
294 }
295
296 if let Some(kickoff_instruction) = thread.instructions.first() {
298 thread.next_instruction = Some(kickoff_instruction.clone());
299 }
300
301 thread.realloc_account()?;
303
304 thread.sub_lamports(TRANSACTION_BASE_FEE_REIMBURSEMENT)?;
306 signatory.add_lamports(TRANSACTION_BASE_FEE_REIMBURSEMENT)?;
307
308 Ok(())
309}
310
311fn next_timestamp(after: i64, schedule: &str) -> Option<i64> {
312 Schedule::from_str(schedule)
313 .unwrap()
314 .next_after(&DateTime::from_timestamp(after, 0).unwrap())
315 .take()
316 .map(|datetime| datetime.timestamp())
317}