sablier_thread_program/instructions/
thread_exec.rs

1use anchor_lang::{
2    prelude::*,
3    solana_program::{
4        instruction::Instruction,
5        program::{get_return_data, invoke_signed},
6    },
7    AnchorDeserialize, InstructionData,
8};
9use sablier_network_program::state::{Fee, Pool, Worker, WorkerAccount};
10use sablier_utils::thread::{SerializableInstruction, ThreadResponse, PAYER_PUBKEY};
11
12use crate::{constants::*, errors::SablierError, state::*};
13
14/// Accounts required by the `thread_exec` instruction.
15#[derive(Accounts)]
16pub struct ThreadExec<'info> {
17    /// The worker's fee account.
18    #[account(
19        mut,
20        seeds = [
21            sablier_network_program::constants::SEED_FEE,
22            worker.key().as_ref(),
23        ],
24        bump,
25        seeds::program = sablier_network_program::ID,
26        has_one = worker,
27    )]
28    pub fee: Account<'info, Fee>,
29
30    /// The active worker pool.
31    #[account(address = Pool::pubkey(POOL_ID))]
32    pub pool: Box<Account<'info, Pool>>,
33
34    /// The signatory.
35    #[account(mut)]
36    pub signatory: Signer<'info>,
37
38    /// The thread to execute.
39    #[account(
40        mut,
41        seeds = [
42            SEED_THREAD,
43            thread.authority.as_ref(),
44            thread.id.as_slice(),
45            thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice()
46        ],
47        bump = thread.bump,
48        constraint = !thread.paused @ SablierError::ThreadPaused,
49        constraint = thread.next_instruction.is_some(),
50        constraint = thread.exec_context.is_some()
51    )]
52    pub thread: Box<Account<'info, Thread>>,
53
54    /// The worker.
55    #[account(address = worker.pubkey())]
56    pub worker: Account<'info, Worker>,
57}
58
59pub fn handler(ctx: Context<ThreadExec>) -> Result<()> {
60    // Get accounts
61    let clock = Clock::get().unwrap();
62    let fee = &mut ctx.accounts.fee;
63    let pool = &ctx.accounts.pool;
64    let signatory = &mut ctx.accounts.signatory;
65    let thread = &mut ctx.accounts.thread;
66    let worker = &ctx.accounts.worker;
67
68    // If the rate limit has been met, exit early.
69    if thread.exec_context.unwrap().last_exec_at == clock.slot
70        && thread.exec_context.unwrap().execs_since_slot >= thread.rate_limit
71    {
72        return Err(SablierError::RateLimitExeceeded.into());
73    }
74
75    // Record the worker's lamports before invoking inner ixs.
76    let signatory_lamports_pre = signatory.lamports();
77
78    // Get the instruction to execute.
79    // We have already verified that it is not null during account validation.
80    let instruction: &mut SerializableInstruction = &mut thread.next_instruction.clone().unwrap();
81
82    // Inject the signatory's pubkey for the Sablier payer ID.
83    for acc in instruction.accounts.iter_mut() {
84        if acc.pubkey.eq(&PAYER_PUBKEY) {
85            acc.pubkey = signatory.key();
86        }
87    }
88
89    // Invoke the provided instruction.
90    invoke_signed(
91        &Instruction::from(&*instruction),
92        ctx.remaining_accounts,
93        &[&[
94            SEED_THREAD,
95            thread.authority.as_ref(),
96            thread.id.as_slice(),
97            thread.domain.as_ref().unwrap_or(&Vec::new()).as_slice(),
98            &[thread.bump],
99        ]],
100    )?;
101
102    // Verify the inner instruction did not write data to the signatory address.
103    require!(signatory.data_is_empty(), SablierError::UnauthorizedWrite);
104
105    // Parse the thread response
106    let thread_response: Option<ThreadResponse> = match get_return_data() {
107        None => None,
108        Some((program_id, return_data)) => {
109            require!(
110                program_id.eq(&instruction.program_id),
111                SablierError::InvalidThreadResponse
112            );
113            ThreadResponse::try_from_slice(return_data.as_slice()).ok()
114        }
115    };
116
117    // Grab the next instruction from the thread response.
118    let mut close_to = None;
119    let mut next_instruction = None;
120    if let Some(thread_response) = thread_response {
121        close_to = thread_response.close_to;
122        next_instruction = thread_response.dynamic_instruction;
123
124        // Update the trigger.
125        if let Some(trigger) = thread_response.trigger {
126            require!(
127                std::mem::discriminant(&thread.trigger) == std::mem::discriminant(&trigger),
128                SablierError::InvalidTriggerVariant
129            );
130            thread.trigger = trigger.clone();
131
132            // If the user updates an account trigger, the trigger context is no longer valid.
133            // Here we reset the trigger context to zero to re-prime the trigger.
134            thread.exec_context = Some(ExecContext {
135                trigger_context: match trigger {
136                    Trigger::Account {
137                        address: _,
138                        offset: _,
139                        size: _,
140                    } => TriggerContext::Account { data_hash: 0 },
141                    _ => thread.exec_context.unwrap().trigger_context,
142                },
143                ..thread.exec_context.unwrap()
144            })
145        }
146    }
147
148    // If there is no dynamic next instruction, get the next instruction from the instruction set.
149    let mut exec_index = thread.exec_context.unwrap().exec_index;
150    if next_instruction.is_none() {
151        if let Some(ix) = thread.instructions.get((exec_index + 1) as usize) {
152            next_instruction = Some(ix.clone());
153            exec_index += 1;
154        }
155    }
156
157    // Update the next instruction.
158    if let Some(close_to) = close_to {
159        thread.next_instruction = Some(
160            Instruction {
161                program_id: crate::ID,
162                accounts: crate::accounts::ThreadDelete {
163                    authority: thread.key(),
164                    close_to,
165                    thread: thread.key(),
166                }
167                .to_account_metas(Some(true)),
168                data: crate::instruction::ThreadDelete {}.data(),
169            }
170            .into(),
171        );
172    } else {
173        thread.next_instruction = next_instruction;
174    }
175
176    // Update the exec context.
177    let should_reimburse_transaction = clock.slot > thread.exec_context.unwrap().last_exec_at;
178    thread.exec_context = Some(ExecContext {
179        exec_index,
180        execs_since_slot: if clock.slot == thread.exec_context.unwrap().last_exec_at {
181            thread.exec_context.unwrap().execs_since_slot + 1
182        } else {
183            1
184        },
185        last_exec_at: clock.slot,
186        ..thread.exec_context.unwrap()
187    });
188
189    // Reimbursement signatory for lamports paid during inner ix.
190    let signatory_lamports_post = signatory.lamports();
191    let mut signatory_reimbursement =
192        signatory_lamports_pre.saturating_sub(signatory_lamports_post);
193    if should_reimburse_transaction {
194        signatory_reimbursement += TRANSACTION_BASE_FEE_REIMBURSEMENT;
195    }
196    if signatory_reimbursement > 0 {
197        thread.sub_lamports(signatory_reimbursement)?;
198        signatory.add_lamports(signatory_reimbursement)?;
199    }
200
201    // If the worker is in the pool, debit from the thread account and payout to the worker's fee account.
202    if pool.workers.contains(&worker.key()) {
203        thread.sub_lamports(thread.fee)?;
204        fee.add_lamports(thread.fee)?;
205    }
206
207    let exec_ctx = &mut thread.exec_context.unwrap();
208
209    // Update execution context for Cron and Periodic triggers
210    // This ensures the next execution is scheduled based on the actual execution time,
211    // rather than the theoretical scheduled time, preventing drift in long-running threads
212    match thread.trigger {
213        Trigger::Cron { skippable, .. } => {
214            if skippable {
215                exec_ctx.last_exec_at = clock.slot;
216                exec_ctx.trigger_context = TriggerContext::Cron {
217                    started_at: clock.unix_timestamp,
218                };
219            }
220        }
221        Trigger::Periodic { .. } => {
222            exec_ctx.last_exec_at = clock.slot;
223            exec_ctx.trigger_context = TriggerContext::Periodic {
224                started_at: clock.unix_timestamp,
225            };
226        }
227        _ => (),
228    }
229
230    Ok(())
231}