1use crate::{errors::AntegenThreadError, *};
2use anchor_lang::{prelude::*, AnchorDeserialize, AnchorSerialize};
3use std::{collections::hash_map::DefaultHasher, hash::Hasher};
4
5pub use antegen_fiber_program::state::{
7 compile_instruction, decompile_instruction, CompiledInstructionData, CompiledInstructionV0,
8 SerializableAccountMeta, SerializableInstruction,
9};
10pub use antegen_fiber_program::{PAYER_PUBKEY, SEED_THREAD_FIBER};
11
12pub const CURRENT_THREAD_VERSION: u8 = 1;
14
15#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, PartialEq, Debug)]
17pub enum Trigger {
18 Account {
20 address: Pubkey,
22 offset: u64,
24 size: u64,
26 },
27
28 Immediate {
30 jitter: u64,
32 },
33
34 Timestamp {
36 unix_ts: i64,
37 jitter: u64,
39 },
40
41 Interval {
43 seconds: i64,
45 skippable: bool,
47 jitter: u64,
49 },
50
51 Cron {
53 #[max_len(255)]
55 schedule: String,
56
57 skippable: bool,
60
61 jitter: u64,
63 },
64
65 Slot { slot: u64 },
67
68 Epoch { epoch: u64 },
70}
71
72#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, Debug, PartialEq)]
75pub enum Schedule {
76 OnChange { prev: u64 },
78
79 Timed { prev: i64, next: i64 },
81
82 Block { prev: u64, next: u64 },
84}
85
86#[derive(AnchorDeserialize, AnchorSerialize, Clone, Default, InitSpace, Debug, PartialEq)]
89pub enum Signal {
90 #[default]
91 None, Chain, Close, Repeat, Next {
96 index: u8, },
98 Update {
99 paused: Option<bool>,
100 trigger: Option<Trigger>,
101 index: Option<u8>,
102 },
103}
104
105#[account]
107#[derive(Debug, InitSpace)]
108pub struct Thread {
109 pub version: u8,
111 pub bump: u8,
112 pub authority: Pubkey,
113 #[max_len(32)]
114 pub id: Vec<u8>,
115 #[max_len(64)]
116 pub name: String,
117 pub created_at: i64,
118
119 pub trigger: Trigger,
121 pub schedule: Schedule,
122
123 #[max_len(50)]
125 pub fiber_ids: Vec<u8>,
126 pub fiber_cursor: u8,
127 pub fiber_next_id: u8,
128 pub fiber_signal: Signal,
129
130 pub paused: bool,
132
133 pub exec_count: u64,
135 pub last_executor: Pubkey,
136
137 pub nonce_account: Pubkey,
139 #[max_len(44)]
140 pub last_nonce: String,
141
142 #[max_len(256)]
144 pub close_fiber: Vec<u8>,
145}
146
147impl Thread {
148 pub fn pubkey(authority: Pubkey, id: impl AsRef<[u8]>) -> Pubkey {
150 let id_bytes = id.as_ref();
151 assert!(id_bytes.len() <= 32, "Thread ID must not exceed 32 bytes");
152
153 Pubkey::find_program_address(&[SEED_THREAD, authority.as_ref(), id_bytes], &crate::ID).0
154 }
155
156 pub fn has_nonce_account(&self) -> bool {
158 self.nonce_account != anchor_lang::solana_program::system_program::ID
159 && self.nonce_account != crate::ID
160 }
161
162 pub fn advance_to_next_fiber(&mut self) {
164 if self.fiber_ids.is_empty() {
165 self.fiber_cursor = 0;
166 return;
167 }
168
169 if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
171 let next_pos = (current_pos + 1) % self.fiber_ids.len();
173 self.fiber_cursor = self.fiber_ids[next_pos];
174 } else {
175 self.fiber_cursor = self.fiber_ids.first().copied().unwrap_or(0);
177 }
178 }
179
180 pub fn next_fiber_index(&self) -> u8 {
183 if self.fiber_ids.is_empty() {
184 return 0;
185 }
186 if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
187 let next_pos = (current_pos + 1) % self.fiber_ids.len();
188 self.fiber_ids[next_pos]
189 } else {
190 self.fiber_ids.first().copied().unwrap_or(0)
191 }
192 }
193
194 pub fn fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
196 self.fiber_at_index(thread_pubkey, self.fiber_cursor)
197 }
198
199 pub fn fiber_at_index(&self, thread_pubkey: &Pubkey, fiber_index: u8) -> Pubkey {
201 Pubkey::find_program_address(
202 &[SEED_THREAD_FIBER, thread_pubkey.as_ref(), &[fiber_index]],
203 &antegen_fiber_program::ID,
204 )
205 .0
206 }
207
208 pub fn next_fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
210 let next_index = if self.fiber_ids.is_empty() {
212 0
213 } else if let Some(current_pos) =
214 self.fiber_ids.iter().position(|&x| x == self.fiber_cursor)
215 {
216 let next_pos = (current_pos + 1) % self.fiber_ids.len();
217 self.fiber_ids[next_pos]
218 } else {
219 self.fiber_ids.first().copied().unwrap_or(0)
220 };
221
222 self.fiber_at_index(thread_pubkey, next_index)
223 }
224
225 pub fn is_ready(&self, current_slot: u64, current_timestamp: i64) -> bool {
227 match &self.schedule {
228 Schedule::Timed { next, .. } => current_timestamp >= *next,
229 Schedule::Block { next, .. } => {
230 match &self.trigger {
231 Trigger::Slot { .. } => current_slot >= *next,
232 Trigger::Epoch { .. } => {
233 false
236 }
237 _ => false,
238 }
239 }
240 Schedule::OnChange { .. } => {
241 false
243 }
244 }
245 }
246
247 pub fn validate_for_execution(&self) -> Result<()> {
249 require!(
251 !self.fiber_ids.is_empty(),
252 crate::errors::AntegenThreadError::ThreadHasNoFibersToExecute
253 );
254
255 require!(
257 self.fiber_ids.contains(&self.fiber_cursor),
258 crate::errors::AntegenThreadError::InvalidExecIndex
259 );
260
261 Ok(())
262 }
263}
264
265impl TryFrom<Vec<u8>> for Thread {
266 type Error = Error;
267
268 fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
269 Thread::try_deserialize(&mut data.as_slice())
270 }
271}
272
273pub trait TriggerProcessor {
275 fn validate_trigger(
276 &self,
277 clock: &Clock,
278 remaining_accounts: &[AccountInfo],
279 thread_pubkey: &Pubkey,
280 ) -> Result<i64>; fn update_schedule(
283 &mut self,
284 clock: &Clock,
285 remaining_accounts: &[AccountInfo],
286 thread_pubkey: &Pubkey,
287 ) -> Result<()>; fn get_last_started_at(&self) -> i64;
290}
291
292pub trait ThreadSeeds {
294 fn get_seed_bytes(&self) -> Vec<Vec<u8>>;
295
296 fn sign<F, R>(&self, f: F) -> R
298 where
299 F: FnOnce(&[&[u8]]) -> R,
300 {
301 let seed_bytes = self.get_seed_bytes();
302 let seeds: Vec<&[u8]> = seed_bytes.iter().map(|s| s.as_slice()).collect();
303 f(&seeds)
304 }
305}
306
307pub trait NonceProcessor {
309 fn advance_nonce_if_required<'info>(
310 &self,
311 thread_account_info: &AccountInfo<'info>,
312 nonce_account: &Option<UncheckedAccount<'info>>,
313 recent_blockhashes: &Option<UncheckedAccount<'info>>,
314 ) -> Result<()>;
315}
316
317pub trait PaymentDistributor {
319 fn distribute_payments<'info>(
320 &self,
321 thread_account: &AccountInfo<'info>,
322 executor: &AccountInfo<'info>,
323 admin: &AccountInfo<'info>,
324 payments: &crate::state::PaymentDetails,
325 ) -> Result<()>;
326}
327
328impl TriggerProcessor for Thread {
329 fn validate_trigger(
330 &self,
331 clock: &Clock,
332 remaining_accounts: &[AccountInfo],
333 thread_pubkey: &Pubkey,
334 ) -> Result<i64> {
335 let last_started_at = self.get_last_started_at();
336
337 let trigger_ready_time = match &self.trigger {
339 Trigger::Immediate { jitter } => {
340 let jitter_offset =
341 crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
342 clock.unix_timestamp.saturating_add(jitter_offset)
343 }
344
345 Trigger::Timestamp { unix_ts, jitter } => {
346 let jitter_offset =
347 crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
348 let trigger_time = unix_ts.saturating_add(jitter_offset);
349
350 require!(
351 clock.unix_timestamp >= trigger_time,
352 AntegenThreadError::TriggerConditionFailed
353 );
354 trigger_time
355 }
356
357 Trigger::Slot { slot } => {
358 require!(
359 clock.slot >= *slot,
360 AntegenThreadError::TriggerConditionFailed
361 );
362 clock.unix_timestamp - ((clock.slot - slot) as i64 * 400 / 1000)
364 }
365
366 Trigger::Epoch { epoch } => {
367 require!(
368 clock.epoch >= *epoch,
369 AntegenThreadError::TriggerConditionFailed
370 );
371 clock.unix_timestamp
372 }
373
374 Trigger::Interval {
375 seconds: _,
376 skippable: _,
377 jitter: _,
378 } => {
379 let trigger_time = match self.schedule {
381 Schedule::Timed { next, .. } => next,
382 _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
383 };
384
385 require!(
386 clock.unix_timestamp >= trigger_time,
387 AntegenThreadError::TriggerConditionFailed
388 );
389 trigger_time
390 }
391
392 Trigger::Cron {
393 schedule: _,
394 skippable: _,
395 jitter: _,
396 } => {
397 let trigger_time = match self.schedule {
399 Schedule::Timed { next, .. } => next,
400 _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
401 };
402
403 require!(
404 clock.unix_timestamp >= trigger_time,
405 AntegenThreadError::TriggerConditionFailed
406 );
407 trigger_time
408 }
409
410 Trigger::Account {
411 address,
412 offset,
413 size,
414 } => {
415 let account_info = remaining_accounts
417 .first()
418 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
419
420 require!(
422 address.eq(account_info.key),
423 AntegenThreadError::TriggerConditionFailed
424 );
425
426 let mut hasher = DefaultHasher::new();
428 let data = &account_info.try_borrow_data()?;
429 let offset = *offset as usize;
430 let range_end = offset.checked_add(*size as usize).unwrap() as usize;
431
432 use std::hash::Hash;
433 if data.len() > range_end {
434 data[offset..range_end].hash(&mut hasher);
435 } else {
436 data[offset..].hash(&mut hasher);
437 }
438 let data_hash = hasher.finish();
439
440 if let Schedule::OnChange { prev: prior_hash } = &self.schedule {
442 require!(
443 data_hash.ne(prior_hash),
444 AntegenThreadError::TriggerConditionFailed
445 );
446 }
447
448 clock.unix_timestamp
449 }
450 };
451
452 Ok(clock.unix_timestamp.saturating_sub(trigger_ready_time))
454 }
455
456 fn update_schedule(
457 &mut self,
458 clock: &Clock,
459 remaining_accounts: &[AccountInfo],
460 thread_pubkey: &Pubkey,
461 ) -> Result<()> {
462 let current_timestamp = clock.unix_timestamp;
463
464 self.schedule = match &self.trigger {
465 Trigger::Account { offset, size, .. } => {
466 let account_info = remaining_accounts
468 .first()
469 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
470
471 let mut hasher = DefaultHasher::new();
472 let data = &account_info.try_borrow_data()?;
473 let offset = *offset as usize;
474 let range_end = offset.checked_add(*size as usize).unwrap() as usize;
475
476 use std::hash::Hash;
477 if data.len() > range_end {
478 data[offset..range_end].hash(&mut hasher);
479 } else {
480 data[offset..].hash(&mut hasher);
481 }
482 let data_hash = hasher.finish();
483
484 Schedule::OnChange { prev: data_hash }
485 }
486 Trigger::Cron {
487 schedule, jitter, ..
488 } => {
489 let next_cron = crate::utils::next_timestamp(current_timestamp, schedule.clone())
492 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
493 let next_jitter = crate::utils::calculate_jitter_offset(
494 current_timestamp,
495 thread_pubkey,
496 *jitter,
497 );
498 let next_trigger_time = next_cron.saturating_add(next_jitter);
499
500 Schedule::Timed {
501 prev: current_timestamp,
502 next: next_trigger_time,
503 }
504 }
505 Trigger::Immediate { .. } => Schedule::Timed {
506 prev: current_timestamp,
507 next: 0, },
509 Trigger::Slot { slot } => Schedule::Block {
510 prev: clock.slot,
511 next: *slot,
512 },
513 Trigger::Epoch { epoch } => Schedule::Block {
514 prev: clock.epoch,
515 next: *epoch,
516 },
517 Trigger::Interval {
518 seconds, jitter, ..
519 } => {
520 let next_base = current_timestamp.saturating_add(*seconds);
523 let next_jitter = crate::utils::calculate_jitter_offset(
524 current_timestamp,
525 thread_pubkey,
526 *jitter,
527 );
528 let next_trigger_time = next_base.saturating_add(next_jitter);
529
530 Schedule::Timed {
531 prev: current_timestamp,
532 next: next_trigger_time,
533 }
534 }
535 Trigger::Timestamp { unix_ts, .. } => Schedule::Timed {
536 prev: current_timestamp,
537 next: *unix_ts,
538 },
539 };
540
541 Ok(())
542 }
543
544 fn get_last_started_at(&self) -> i64 {
545 match &self.schedule {
546 Schedule::Timed { prev, .. } => *prev,
547 Schedule::Block { prev, .. } => *prev as i64,
548 Schedule::OnChange { .. } => self.created_at,
549 }
550 }
551}
552
553impl ThreadSeeds for Thread {
554 fn get_seed_bytes(&self) -> Vec<Vec<u8>> {
555 vec![
556 SEED_THREAD.to_vec(),
557 self.authority.to_bytes().to_vec(),
558 self.id.clone(),
559 vec![self.bump],
560 ]
561 }
562}
563
564impl PaymentDistributor for Thread {
565 fn distribute_payments<'info>(
566 &self,
567 thread_account: &AccountInfo<'info>,
568 executor: &AccountInfo<'info>,
569 admin: &AccountInfo<'info>,
570 payments: &crate::state::PaymentDetails,
571 ) -> Result<()> {
572 use crate::utils::transfer_lamports;
573
574 let total_executor_payment =
576 payments.fee_payer_reimbursement + payments.executor_commission;
577
578 if total_executor_payment > 0 || payments.core_team_fee > 0 {
580 msg!(
581 "Payments: executor {} (reimburse {}, commission {}), team {}",
582 total_executor_payment,
583 payments.fee_payer_reimbursement,
584 payments.executor_commission,
585 payments.core_team_fee
586 );
587 }
588
589 if total_executor_payment > 0 {
590 transfer_lamports(thread_account, executor, total_executor_payment)?;
591 }
592
593 if payments.core_team_fee > 0 {
595 transfer_lamports(thread_account, admin, payments.core_team_fee)?;
596 }
597
598 Ok(())
599 }
600}
601
602impl NonceProcessor for Thread {
603 fn advance_nonce_if_required<'info>(
604 &self,
605 thread_account_info: &AccountInfo<'info>,
606 nonce_account: &Option<UncheckedAccount<'info>>,
607 recent_blockhashes: &Option<UncheckedAccount<'info>>,
608 ) -> Result<()> {
609 use anchor_lang::solana_program::{
610 program::invoke_signed, system_instruction::advance_nonce_account,
611 };
612
613 if !self.has_nonce_account() {
614 return Ok(());
615 }
616
617 match (nonce_account, recent_blockhashes) {
618 (Some(nonce_acc), Some(recent_bh)) => {
619 let thread_key = *thread_account_info.key;
621
622 self.sign(|seeds| {
624 invoke_signed(
625 &advance_nonce_account(&nonce_acc.key(), &thread_key),
626 &[
627 nonce_acc.to_account_info(),
628 recent_bh.to_account_info(),
629 thread_account_info.clone(),
630 ],
631 &[seeds],
632 )
633 })?;
634 Ok(())
635 }
636 _ => Err(AntegenThreadError::NonceRequired.into()),
637 }
638 }
639}