antegen_thread_program/instructions/
thread_exec.rs1use crate::{
2 errors::*,
3 state::{decompile_instruction, CompiledInstructionV0, Signal},
4 *,
5};
6use anchor_lang::{
7 prelude::*,
8 solana_program::program::{get_return_data, invoke_signed},
9};
10use antegen_fiber_program::state::{Fiber, FiberInstructionProcessor};
11
/// Accounts for the `thread_exec` instruction.
///
/// The field order below is the account order expected by the instruction.
/// Any accounts needed by the fiber's inner instruction (including the target
/// program itself) are not listed here; the handler takes them from
/// `ctx.remaining_accounts`.
#[derive(Accounts)]
// Makes the instruction arguments available to constraint expressions.
// None of the constraints visible in this struct reference them.
#[instruction(forgo_commission: bool, fiber_cursor: u8)]
pub struct ThreadExec<'info> {
    /// The signer submitting the execution. Mutable: the handler snapshots its
    /// lamport balance before the inner instruction, compares it afterwards,
    /// and passes it to `Thread::distribute_payments`.
    #[account(mut)]
    pub executor: Signer<'info>,

    /// The thread being executed. Boxed account of type `Thread`.
    #[account(
        mut,
        // NOTE(review): `dup` presumably opts this account out of Anchor's
        // duplicate-mutable-account check (the thread can also appear in the
        // remaining accounts) — confirm against the Anchor version in use.
        dup,
        // PDA derived from the thread's own authority and id.
        seeds = [
            SEED_THREAD,
            thread.authority.as_ref(),
            thread.id.as_slice(),
        ],
        bump = thread.bump,
        // A paused thread cannot be executed (this also gates the close path
        // in the handler, since account validation runs first).
        constraint = !thread.paused @ AntegenThreadError::ThreadPaused,
        // A thread with no fibers has nothing to execute.
        constraint = !thread.fiber_ids.is_empty() @ AntegenThreadError::InvalidThreadState,
    )]
    pub thread: Box<Account<'info, Thread>>,

    /// CHECK: Not validated by Anchor. On the normal execution path the
    /// handler compares this key against `thread.fiber_at_index(..)` and
    /// deserializes the data as `Fiber`, requiring `fiber.thread()` to equal
    /// the thread key. On the close path (`fiber_signal == Signal::Close`) the
    /// handler returns before those checks run.
    pub fiber: UncheckedAccount<'info>,

    /// Global configuration PDA (seed `SEED_CONFIG`). The handler reads
    /// `paused` and `admin` from it and uses it to calculate payments.
    #[account(
        seeds = [SEED_CONFIG],
        bump = config.bump,
    )]
    pub config: Account<'info, ThreadConfig>,

    /// CHECK: Only the address is validated: it must equal `config.admin`.
    /// Mutable because the handler passes it to `Thread::distribute_payments`.
    #[account(
        mut,
        constraint = admin.key().eq(&config.admin) @ AntegenThreadError::InvalidConfigAdmin,
    )]
    pub admin: UncheckedAccount<'info>,

    /// CHECK: Optional and not validated here. The handler forwards it to
    /// `Thread::advance_nonce_if_required`.
    /// NOTE(review): presumably validated inside that helper — confirm.
    #[account(mut)]
    pub nonce_account: Option<UncheckedAccount<'info>>,

    /// CHECK: Optional and not validated here. Forwarded to
    /// `Thread::advance_nonce_if_required` together with `nonce_account`.
    /// NOTE(review): presumably the RecentBlockhashes sysvar — confirm that the
    /// helper checks the address.
    pub recent_blockhashes: Option<UncheckedAccount<'info>>,

    /// The system program; the address is pinned explicitly in addition to the
    /// `Program<System>` type.
    #[account(address = anchor_lang::system_program::ID)]
    pub system_program: Program<'info, System>,
}
66
/// Executes one fiber of a thread via CPI, pays the executor, and applies the
/// signal returned by the fiber to the thread's state.
///
/// # Arguments
/// * `ctx` - Validated `ThreadExec` accounts. `ctx.remaining_accounts` must
///   carry every account the inner instruction needs; they are appended to the
///   named accounts and passed to `invoke_signed`.
/// * `forgo_commission` - Forwarded to `config.calculate_payments`; also
///   selects which log message is emitted after execution.
/// * `fiber_cursor` - Index of the fiber to execute. Used to derive the
///   expected fiber address; when the thread is in chained mode it also
///   overwrites `thread.fiber_cursor`.
///
/// # Flow
/// 1. Reject if the global config is paused.
/// 2. If the thread's stored signal is `Signal::Close`, run the stored
///    `close_fiber` instruction and return immediately.
/// 3. Otherwise validate the thread and its trigger (the trigger check is
///    skipped when chained), validate the fiber account, and invoke the
///    fiber's instruction signed by the thread.
/// 4. Read the returned `Signal`, distribute payments (unless chaining),
///    update cursor / pause / trigger / schedule, and bump `exec_count`.
///
/// # Errors
/// Returns `GlobalPauseActive`, `WrongFiberIndex`, `InvalidFiberAccount` or
/// `UnauthorizedWrite` from the explicit checks below, and propagates any
/// error from deserialization, the thread helper methods, or the CPI.
pub fn thread_exec<'info>(
    ctx: Context<'info, ThreadExec<'info>>,
    forgo_commission: bool,
    fiber_cursor: u8,
) -> Result<()> {
    // Account list handed to every CPI below: the named accounts first,
    // followed by all remaining accounts.
    let mut all_account_infos = ctx.accounts.to_account_infos();
    all_account_infos.extend_from_slice(ctx.remaining_accounts);

    let clock: Clock = Clock::get()?;
    let thread: &mut Box<Account<Thread>> = &mut ctx.accounts.thread;
    let config: &Account<ThreadConfig> = &ctx.accounts.config;
    let executor: &mut Signer = &mut ctx.accounts.executor;
    // Snapshot taken before any CPI so the executor's lamport delta can be
    // computed after the inner instruction has run.
    let executor_lamports_start: u64 = executor.lamports();
    let thread_pubkey = thread.key();

    // Global kill switch: no thread may execute while the config is paused.
    require!(
        !ctx.accounts.config.paused,
        AntegenThreadError::GlobalPauseActive
    );

    // Close path: a previous execution left `Signal::Close` on the thread.
    // Run the stored close instruction instead of a fiber and stop here —
    // no trigger validation, payments, or counter updates happen.
    if thread.fiber_signal == Signal::Close {
        let compiled = CompiledInstructionV0::try_from_slice(&thread.close_fiber)?;
        let mut instruction = decompile_instruction(&compiled)?;

        // Append every fiber account of this thread as a writable,
        // non-signer account of the close instruction.
        for &fiber_idx in &thread.fiber_ids {
            instruction.accounts.push(AccountMeta {
                pubkey: FiberState::pubkey(thread_pubkey, fiber_idx),
                is_signer: false,
                is_writable: true,
            });
        }

        msg!(
            "Executing close_fiber to delete thread ({} fibers)",
            thread.fiber_ids.len()
        );

        // The thread PDA signs the CPI.
        thread.sign(|seeds| invoke_signed(&instruction, &all_account_infos, &[seeds]))?;

        return Ok(());
    }

    // Chained mode: the previous fiber returned `Signal::Chain`.
    let is_chained = thread.fiber_signal.eq(&Signal::Chain);

    if is_chained {
        // In chained mode the caller-supplied cursor becomes the thread's cursor.
        thread.fiber_cursor = fiber_cursor;
    }

    thread.validate_for_execution()?;

    thread.advance_nonce_if_required(
        &thread.to_account_info(),
        &ctx.accounts.nonce_account,
        &ctx.accounts.recent_blockhashes,
    )?;

    // A chained execution skips trigger validation and is treated as having
    // waited 0 seconds; otherwise the value comes from `validate_trigger`.
    let time_since_ready = if is_chained {
        msg!("Chained execution");
        0
    } else {
        thread.validate_trigger(&clock, ctx.remaining_accounts, &thread_pubkey)?
    };

    let fiber = &ctx.accounts.fiber;

    // The supplied fiber account must be the one expected for `fiber_cursor`.
    // NOTE(review): the address is derived from the `fiber_cursor` argument,
    // not from `thread.fiber_cursor`. In the non-chained path nothing visible
    // in this function ties the two together — confirm that
    // `validate_for_execution` or `fiber_at_index` enforces it.
    let expected_fiber = thread.fiber_at_index(&thread_pubkey, fiber_cursor);
    require!(
        fiber.key().eq(&expected_fiber),
        AntegenThreadError::WrongFiberIndex
    );

    // Deserialize the fiber and make sure it belongs to this thread.
    let fiber_read = Fiber::try_deserialize(&mut &fiber.data.borrow()[..])?;
    require!(
        fiber_read.thread().eq(&thread_pubkey),
        AntegenThreadError::InvalidFiberAccount
    );
    // Build the instruction to invoke; the executor's key is passed in.
    let instruction = fiber_read.get_instruction(&executor.key())?;

    msg!(
        "invoke_signed: program={}, ix_accounts={}, remaining_accounts={}",
        instruction.program_id,
        instruction.accounts.len(),
        ctx.remaining_accounts.len()
    );

    // Diagnostics only: log each instruction account that is absent from the
    // remaining accounts. Nothing here fails the transaction; a genuinely
    // missing account surfaces as an error from `invoke_signed` below.
    for (i, acc_meta) in instruction.accounts.iter().enumerate() {
        if !ctx
            .remaining_accounts
            .iter()
            .any(|ai| ai.key.eq(&acc_meta.pubkey))
        {
            msg!("MISSING remaining_account[{}]: {}", i, acc_meta.pubkey);
        }
    }
    // Same diagnostic for the target program account itself.
    if !ctx
        .remaining_accounts
        .iter()
        .any(|ai| ai.key.eq(&instruction.program_id))
    {
        msg!(
            "MISSING program_id in remaining_accounts: {}",
            instruction.program_id
        );
    }

    // Invoke the fiber's instruction with the thread PDA as signer.
    thread.sign(|seeds| invoke_signed(&instruction, &all_account_infos, &[seeds]))?;

    // After the CPI the executor account must still hold no data; otherwise
    // the inner instruction wrote to it and the execution is rejected.
    require!(
        executor.data_is_empty(),
        AntegenThreadError::UnauthorizedWrite
    );

    // Read the signal returned by the inner instruction. Return data is only
    // honoured when it was set by the program that was just invoked; missing,
    // foreign, or undecodable return data all fall back to `Signal::None`.
    let signal: Signal = match get_return_data() {
        None => Signal::None,
        Some((program_id, return_data)) => {
            if program_id.eq(&instruction.program_id) {
                Signal::try_from_slice(return_data.as_slice()).unwrap_or(Signal::None)
            } else {
                Signal::None
            }
        }
    };

    // A `Chain` request from the last fiber in `fiber_ids` has nothing to
    // chain to, so it is downgraded to `None`. If `fiber_ids` were empty the
    // fallback makes `last_fiber == fiber_cursor`, which also downgrades.
    let last_fiber = thread.fiber_ids.last().copied().unwrap_or(fiber_cursor);
    let signal = if signal.eq(&Signal::Chain) && last_fiber.eq(&fiber_cursor) {
        Signal::None
    } else {
        signal
    };

    // Payments are settled on every execution except one that chains.
    if signal.ne(&Signal::Chain) {
        // Signed lamport delta of the executor across the inner instruction.
        let balance_change = executor.lamports() as i64 - executor_lamports_start as i64;
        let payments =
            config.calculate_payments(time_since_ready, balance_change, forgo_commission);

        // When the commission was forgone (and is therefore zero), log the
        // amount that would otherwise have been paid.
        if forgo_commission && payments.executor_commission.eq(&0) {
            let effective_commission = config.calculate_effective_commission(time_since_ready);
            let forgone = config.calculate_executor_fee(effective_commission);
            msg!(
                "Executed {}s after trigger, forgoing {} commission",
                time_since_ready,
                forgone
            );
        } else {
            msg!("Executed {}s after trigger", time_since_ready);
        }

        thread.distribute_payments(
            &thread.to_account_info(),
            &executor.to_account_info(),
            &ctx.accounts.admin.to_account_info(),
            &payments,
        )?;
    }

    // Remember which trigger fired: `Signal::Update` below may replace
    // `thread.trigger`, and the post-processing further down must act on the
    // trigger that was in effect for this execution.
    let fired_trigger = thread.trigger.clone();

    // Reset the stored signal, then apply the new one.
    thread.fiber_signal = Signal::None;
    match &signal {
        // Chain and Close are persisted so the next `thread_exec` sees them.
        Signal::Chain | Signal::Close => {
            thread.fiber_signal = signal.clone();
        }
        // Jump to an explicit fiber index.
        Signal::Next { index } => {
            thread.fiber_cursor = *index;
        }
        Signal::Update {
            paused,
            trigger,
            index,
        } => {
            if let Some(paused) = paused {
                thread.paused = *paused;
            }
            if let Some(trigger) = trigger {
                thread.trigger = trigger.clone();
            }
            // A new trigger without an explicit pause flag unpauses the thread.
            if trigger.is_some() && paused.is_none() {
                thread.paused = false;
            }
            // Explicit index wins; otherwise move on to the next fiber.
            if let Some(index) = index {
                thread.fiber_cursor = *index;
            } else {
                thread.advance_to_next_fiber();
            }
        }
        Signal::Repeat => {
            // Intentionally empty: the cursor is left where it is.
        }
        Signal::None => {
            thread.advance_to_next_fiber();
        }
    }

    // An `Immediate` trigger that did not chain marks the thread for closing,
    // overriding whatever the match above stored in `fiber_signal`.
    if matches!(fired_trigger, Trigger::Immediate { .. }) && signal != Signal::Chain {
        thread.fiber_signal = Signal::Close;
    }

    // A `Timestamp` trigger that did not chain pauses the thread, unless the
    // fiber explicitly returned `Update { paused: Some(false), .. }`.
    // NOTE(review): this also re-pauses after an `Update` that supplied a new
    // trigger with `paused: None`, undoing the unpause performed in the match
    // above — confirm that is intended.
    if matches!(fired_trigger, Trigger::Timestamp { .. }) && signal != Signal::Chain {
        let signal_unpaused = matches!(
            &signal,
            Signal::Update {
                paused: Some(false),
                ..
            }
        );
        if !signal_unpaused {
            thread.paused = true;
        }
    }

    // The schedule is left untouched while a chain is in progress.
    if signal != Signal::Chain {
        thread.update_schedule(&clock, ctx.remaining_accounts, &thread_pubkey)?;
    }

    // Bookkeeping runs for every successful execution, chained ones included.
    thread.exec_count += 1;
    thread.last_executor = executor.key();

    Ok(())
}