// sablier_thread_program/instructions/thread_exec.rs
use anchor_lang::{
    prelude::*,
    solana_program::{
        instruction::Instruction,
        program::{get_return_data, invoke_signed},
    },
    AnchorDeserialize, InstructionData,
};
use sablier_network_program::state::{Fee, Pool, Worker, WorkerAccount};
use sablier_utils::thread::{SerializableInstruction, ThreadResponse, PAYER_PUBKEY};

use crate::{constants::*, errors::SablierError, state::*};
13
14#[derive(Accounts)]
16pub struct ThreadExec<'info> {
17 #[account(
19 mut,
20 seeds = [
21 sablier_network_program::constants::SEED_FEE,
22 worker.key().as_ref(),
23 ],
24 bump,
25 seeds::program = sablier_network_program::ID,
26 has_one = worker,
27 )]
28 pub fee: Account<'info, Fee>,
29
30 #[account(address = Pool::pubkey(POOL_ID))]
32 pub pool: Box<Account<'info, Pool>>,
33
34 #[account(mut)]
36 pub signatory: Signer<'info>,
37
38 #[account(
40 mut,
41 seeds = [
42 SEED_THREAD,
43 thread.authority.as_ref(),
44 thread.id.as_slice(),
45 thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
46 ],
47 bump = thread.bump,
48 constraint = !thread.paused @ SablierError::ThreadPaused,
49 constraint = thread.next_instruction.is_some(),
50 constraint = thread.exec_context.is_some()
51 )]
52 pub thread: Box<Account<'info, Thread>>,
53
54 #[account(address = worker.pubkey())]
56 pub worker: Account<'info, Worker>,
57}
58
59pub fn handler(ctx: Context<ThreadExec>) -> Result<()> {
60 let clock = Clock::get().unwrap();
62 let fee = &mut ctx.accounts.fee;
63 let pool = &ctx.accounts.pool;
64 let signatory = &mut ctx.accounts.signatory;
65 let thread = &mut ctx.accounts.thread;
66 let worker = &ctx.accounts.worker;
67
68 if thread.exec_context.unwrap().last_exec_at == clock.slot
70 && thread.exec_context.unwrap().execs_since_slot >= thread.rate_limit
71 {
72 return Err(SablierError::RateLimitExeceeded.into());
73 }
74
75 let signatory_lamports_pre = signatory.lamports();
77
78 let instruction: &mut SerializableInstruction = &mut thread.next_instruction.clone().unwrap();
81
82 for acc in instruction.accounts.iter_mut() {
84 if acc.pubkey.eq(&PAYER_PUBKEY) {
85 acc.pubkey = signatory.key();
86 }
87 }
88
89 invoke_signed(
91 &Instruction::from(&*instruction),
92 ctx.remaining_accounts,
93 &[&[
94 SEED_THREAD,
95 thread.authority.as_ref(),
96 thread.id.as_slice(),
97 thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice(),
98 &[thread.bump],
99 ]],
100 )?;
101
102 require!(signatory.data_is_empty(), SablierError::UnauthorizedWrite);
104
105 let thread_response: Option<ThreadResponse> = match get_return_data() {
107 None => None,
108 Some((program_id, return_data)) => {
109 require!(
110 program_id.eq(&instruction.program_id),
111 SablierError::InvalidThreadResponse
112 );
113 ThreadResponse::try_from_slice(return_data.as_slice()).ok()
114 }
115 };
116
117 let mut close_to = None;
119 let mut next_instruction = None;
120 if let Some(thread_response) = thread_response {
121 close_to = thread_response.close_to;
122 next_instruction = thread_response.dynamic_instruction;
123
124 if let Some(trigger) = thread_response.trigger {
126 require!(
127 std::mem::discriminant(&thread.trigger) == std::mem::discriminant(&trigger),
128 SablierError::InvalidTriggerVariant
129 );
130 thread.trigger = trigger.clone();
131
132 thread.exec_context = Some(ExecContext {
135 trigger_context: match trigger {
136 Trigger::Account {
137 address: _,
138 offset: _,
139 size: _,
140 } => TriggerContext::Account { data_hash: 0 },
141 _ => thread.exec_context.unwrap().trigger_context,
142 },
143 ..thread.exec_context.unwrap()
144 })
145 }
146 }
147
148 let mut exec_index = thread.exec_context.unwrap().exec_index;
150 if next_instruction.is_none() {
151 if let Some(ix) = thread.instructions.get((exec_index + 1) as usize) {
152 next_instruction = Some(ix.clone());
153 exec_index += 1;
154 }
155 }
156
157 if let Some(close_to) = close_to {
159 thread.next_instruction = Some(
160 Instruction {
161 program_id: crate::ID,
162 accounts: crate::accounts::ThreadDelete {
163 authority: thread.key(),
164 close_to,
165 thread: thread.key(),
166 }
167 .to_account_metas(Some(true)),
168 data: crate::instruction::ThreadDelete {}.data(),
169 }
170 .into(),
171 );
172 } else {
173 thread.next_instruction = next_instruction;
174 }
175
176 let should_reimburse_transaction = clock.slot > thread.exec_context.unwrap().last_exec_at;
178 thread.exec_context = Some(ExecContext {
179 exec_index,
180 execs_since_slot: if clock.slot == thread.exec_context.unwrap().last_exec_at {
181 thread.exec_context.unwrap().execs_since_slot + 1
182 } else {
183 1
184 },
185 last_exec_at: clock.slot,
186 ..thread.exec_context.unwrap()
187 });
188
189 let signatory_lamports_post = signatory.lamports();
191 let mut signatory_reimbursement =
192 signatory_lamports_pre.saturating_sub(signatory_lamports_post);
193 if should_reimburse_transaction {
194 signatory_reimbursement += TRANSACTION_BASE_FEE_REIMBURSEMENT;
195 }
196 if signatory_reimbursement > 0 {
197 thread.sub_lamports(signatory_reimbursement)?;
198 signatory.add_lamports(signatory_reimbursement)?;
199 }
200
201 if pool.workers.contains(&worker.key()) {
203 thread.sub_lamports(thread.fee)?;
204 fee.add_lamports(thread.fee)?;
205 }
206
207 let exec_ctx = &mut thread.exec_context.unwrap();
208
209 match thread.trigger {
213 Trigger::Cron { skippable, .. } => {
214 if skippable {
215 exec_ctx.last_exec_at = clock.slot;
216 exec_ctx.trigger_context = TriggerContext::Cron {
217 started_at: clock.unix_timestamp,
218 };
219 }
220 }
221 Trigger::Periodic { .. } => {
222 exec_ctx.last_exec_at = clock.slot;
223 exec_ctx.trigger_context = TriggerContext::Periodic {
224 started_at: clock.unix_timestamp,
225 };
226 }
227 _ => (),
228 }
229
230 Ok(())
231}