// antegen_thread_program/instructions/thread_exec.rs
use crate::{
    errors::*,
    state::{decompile_instruction, CompiledInstructionV0, Signal},
    *,
};
use anchor_lang::{
    prelude::*,
    solana_program::program::{get_return_data, invoke_signed},
};

11#[derive(Accounts)]
13#[instruction(forgo_commission: bool, fiber_cursor: u8)]
14pub struct ThreadExec<'info> {
15 #[account(mut)]
17 pub executor: Signer<'info>,
18
19 #[account(
22 mut,
23 dup,
24 seeds = [
25 SEED_THREAD,
26 thread.authority.as_ref(),
27 thread.id.as_slice(),
28 ],
29 bump = thread.bump,
30 constraint = !thread.paused @ AntegenThreadError::ThreadPaused,
31 constraint = !thread.fiber_ids.is_empty() @ AntegenThreadError::InvalidThreadState,
32 )]
33 pub thread: Box<Account<'info, Thread>>,
34
35 pub fiber: Box<Account<'info, antegen_fiber_program::state::FiberState>>,
37
38 #[account(
40 seeds = [SEED_CONFIG],
41 bump = config.bump,
42 )]
43 pub config: Account<'info, ThreadConfig>,
44
45 #[account(
48 mut,
49 constraint = admin.key().eq(&config.admin) @ AntegenThreadError::InvalidConfigAdmin,
50 )]
51 pub admin: UncheckedAccount<'info>,
52
53 #[account(mut)]
56 pub nonce_account: Option<UncheckedAccount<'info>>,
57
58 pub recent_blockhashes: Option<UncheckedAccount<'info>>,
60
61 #[account(address = anchor_lang::system_program::ID)]
62 pub system_program: Program<'info, System>,
63}
64
65pub fn thread_exec<'info>(
66 ctx: Context<'info, ThreadExec<'info>>,
67 forgo_commission: bool,
68 fiber_cursor: u8,
69) -> Result<()> {
70 let mut all_account_infos = ctx.accounts.to_account_infos();
75 all_account_infos.extend_from_slice(ctx.remaining_accounts);
76
77 let clock: Clock = Clock::get()?;
78 let thread: &mut Box<Account<Thread>> = &mut ctx.accounts.thread;
79 let config: &Account<ThreadConfig> = &ctx.accounts.config;
80 let executor: &mut Signer = &mut ctx.accounts.executor;
81 let executor_lamports_start: u64 = executor.lamports();
82 let thread_pubkey = thread.key();
83
84 require!(
85 !ctx.accounts.config.paused,
86 AntegenThreadError::GlobalPauseActive
87 );
88
89 if thread.fiber_signal == Signal::Close {
91 let compiled = CompiledInstructionV0::try_from_slice(&thread.close_fiber)?;
92 let mut instruction = decompile_instruction(&compiled)?;
93
94 for &fiber_idx in &thread.fiber_ids {
96 instruction.accounts.push(AccountMeta {
97 pubkey: FiberState::pubkey(thread_pubkey, fiber_idx),
98 is_signer: false,
99 is_writable: true,
100 });
101 }
102
103 msg!("Executing close_fiber to delete thread ({} fibers)", thread.fiber_ids.len());
104
105 thread.sign(|seeds| invoke_signed(&instruction, &all_account_infos, &[seeds]))?;
106
107 return Ok(());
108 }
109
110 let is_chained = thread.fiber_signal.eq(&Signal::Chain);
112
113 if is_chained {
115 thread.fiber_cursor = fiber_cursor;
116 }
117
118 thread.validate_for_execution()?;
120
121 thread.advance_nonce_if_required(
122 &thread.to_account_info(),
123 &ctx.accounts.nonce_account,
124 &ctx.accounts.recent_blockhashes,
125 )?;
126
127 let time_since_ready = if is_chained {
128 msg!("Chained execution");
129 0
130 } else {
131 thread.validate_trigger(&clock, ctx.remaining_accounts, &thread_pubkey)?
132 };
133
134 let fiber = &ctx.accounts.fiber;
136
137 let expected_fiber = thread.fiber_at_index(&thread_pubkey, fiber_cursor);
138 require!(
139 fiber.key().eq(&expected_fiber),
140 AntegenThreadError::WrongFiberIndex
141 );
142
143 let instruction = fiber.get_instruction(&executor.key())?;
144
145 msg!(
146 "invoke_signed: program={}, ix_accounts={}, remaining_accounts={}",
147 instruction.program_id,
148 instruction.accounts.len(),
149 ctx.remaining_accounts.len()
150 );
151
152 for (i, acc_meta) in instruction.accounts.iter().enumerate() {
154 if !ctx.remaining_accounts.iter().any(|ai| ai.key.eq(&acc_meta.pubkey)) {
155 msg!("MISSING remaining_account[{}]: {}", i, acc_meta.pubkey);
156 }
157 }
158 if !ctx.remaining_accounts.iter().any(|ai| ai.key.eq(&instruction.program_id)) {
159 msg!("MISSING program_id in remaining_accounts: {}", instruction.program_id);
160 }
161
162 thread.sign(|seeds| invoke_signed(&instruction, &all_account_infos, &[seeds]))?;
163
164 require!(
166 executor.data_is_empty(),
167 AntegenThreadError::UnauthorizedWrite
168 );
169
170 let signal: Signal = match get_return_data() {
172 None => Signal::None,
173 Some((program_id, return_data)) => {
174 if program_id.eq(&instruction.program_id) {
175 Signal::try_from_slice(return_data.as_slice()).unwrap_or(Signal::None)
176 } else {
177 Signal::None
178 }
179 }
180 };
181
182 let last_fiber = thread.fiber_ids.last().copied().unwrap_or(fiber_cursor);
184 let signal = if signal.eq(&Signal::Chain) && last_fiber.eq(&fiber_cursor) {
185 Signal::None
186 } else {
187 signal
188 };
189
190 if signal.ne(&Signal::Chain) {
192 let balance_change = executor.lamports() as i64 - executor_lamports_start as i64;
193 let payments =
194 config.calculate_payments(time_since_ready, balance_change, forgo_commission);
195
196 if forgo_commission && payments.executor_commission.eq(&0) {
197 let effective_commission = config.calculate_effective_commission(time_since_ready);
198 let forgone = config.calculate_executor_fee(effective_commission);
199 msg!(
200 "Executed {}s after trigger, forgoing {} commission",
201 time_since_ready,
202 forgone
203 );
204 } else {
205 msg!("Executed {}s after trigger", time_since_ready);
206 }
207
208 thread.distribute_payments(
209 &thread.to_account_info(),
210 &executor.to_account_info(),
211 &ctx.accounts.admin.to_account_info(),
212 &payments,
213 )?;
214 }
215
216 let fired_trigger = thread.trigger.clone();
219
220 thread.fiber_signal = Signal::None;
223 match &signal {
224 Signal::Chain | Signal::Close => {
225 thread.fiber_signal = signal.clone();
226 }
227 Signal::Next { index } => {
228 thread.fiber_cursor = *index;
229 }
230 Signal::Update { paused, trigger, index } => {
231 if let Some(paused) = paused {
232 thread.paused = *paused;
233 }
234 if let Some(trigger) = trigger {
235 thread.trigger = trigger.clone();
236 }
237 if trigger.is_some() && paused.is_none() {
239 thread.paused = false;
240 }
241 if let Some(index) = index {
242 thread.fiber_cursor = *index;
243 } else {
244 thread.advance_to_next_fiber();
245 }
246 }
247 Signal::Repeat => {
248 }
250 Signal::None => {
251 thread.advance_to_next_fiber();
252 }
253 }
254
255 if matches!(fired_trigger, Trigger::Immediate { .. }) && signal != Signal::Chain {
257 thread.fiber_signal = Signal::Close;
258 }
259
260 if matches!(fired_trigger, Trigger::Timestamp { .. }) && signal != Signal::Chain {
262 thread.paused = true;
263 }
264
265 if signal != Signal::Chain {
267 thread.update_schedule(&clock, ctx.remaining_accounts, &thread_pubkey)?;
268 }
269
270 thread.exec_count += 1;
272 thread.last_executor = executor.key();
273
274 Ok(())
275}