1use crate::{errors::AntegenThreadError, *};
2use anchor_lang::{
3 prelude::*,
4 solana_program::instruction::{AccountMeta, Instruction},
5 AnchorDeserialize, AnchorSerialize,
6};
7use std::{collections::hash_map::DefaultHasher, collections::HashMap, hash::Hasher};
8
9pub const CURRENT_THREAD_VERSION: u8 = 1;
11
12pub const PAYER_PUBKEY: Pubkey = pubkey!("AntegenPayer1111111111111111111111111111111");
15
16#[derive(AnchorDeserialize, AnchorSerialize, Clone, Debug)]
18pub struct SerializableInstruction {
19 pub program_id: Pubkey,
20 pub accounts: Vec<SerializableAccountMeta>,
21 pub data: Vec<u8>,
22}
23
24#[derive(AnchorDeserialize, AnchorSerialize, Clone, Debug)]
26pub struct SerializableAccountMeta {
27 pub pubkey: Pubkey,
28 pub is_signer: bool,
29 pub is_writable: bool,
30}
31
32impl From<Instruction> for SerializableInstruction {
33 fn from(ix: Instruction) -> Self {
34 SerializableInstruction {
35 program_id: ix.program_id,
36 accounts: ix
37 .accounts
38 .into_iter()
39 .map(|acc| SerializableAccountMeta {
40 pubkey: acc.pubkey,
41 is_signer: acc.is_signer,
42 is_writable: acc.is_writable,
43 })
44 .collect(),
45 data: ix.data,
46 }
47 }
48}
49
50impl From<SerializableInstruction> for Instruction {
51 fn from(ix: SerializableInstruction) -> Self {
52 Instruction {
53 program_id: ix.program_id,
54 accounts: ix
55 .accounts
56 .into_iter()
57 .map(|acc| AccountMeta {
58 pubkey: acc.pubkey,
59 is_signer: acc.is_signer,
60 is_writable: acc.is_writable,
61 })
62 .collect(),
63 data: ix.data,
64 }
65 }
66}
67
68#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, PartialEq, Debug)]
70pub enum Trigger {
71 Account {
73 address: Pubkey,
75 offset: u64,
77 size: u64,
79 },
80
81 Immediate {
83 jitter: u64,
85 },
86
87 Timestamp {
89 unix_ts: i64,
90 jitter: u64,
92 },
93
94 Interval {
96 seconds: i64,
98 skippable: bool,
100 jitter: u64,
102 },
103
104 Cron {
106 #[max_len(255)]
108 schedule: String,
109
110 skippable: bool,
113
114 jitter: u64,
116 },
117
118 Slot { slot: u64 },
120
121 Epoch { epoch: u64 },
123}
124
125#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, Debug, PartialEq)]
128pub enum Schedule {
129 OnChange { prev: u64 },
131
132 Timed { prev: i64, next: i64 },
134
135 Block { prev: u64, next: u64 },
137}
138
139#[derive(AnchorDeserialize, AnchorSerialize, Clone, Default, InitSpace, Debug, PartialEq)]
142pub enum Signal {
143 #[default]
144 None, Chain, Close, Repeat, Next {
149 index: u8, },
151 UpdateTrigger {
152 trigger: Trigger, },
154}
155
156#[derive(AnchorDeserialize, AnchorSerialize, Clone, Debug)]
158pub struct CompiledInstructionData {
159 pub program_id_index: u8,
160 pub accounts: Vec<u8>,
161 pub data: Vec<u8>,
162}
163
164#[derive(AnchorDeserialize, AnchorSerialize, Clone, Debug)]
166pub struct CompiledInstructionV0 {
167 pub num_ro_signers: u8,
168 pub num_rw_signers: u8,
169 pub num_rw: u8,
170 pub instructions: Vec<CompiledInstructionData>,
171 pub signer_seeds: Vec<Vec<Vec<u8>>>,
172 pub accounts: Vec<Pubkey>,
173}
174
175#[account]
177#[derive(Debug, InitSpace)]
178pub struct Thread {
179 pub version: u8,
181 pub bump: u8,
182 pub authority: Pubkey,
183 #[max_len(32)]
184 pub id: Vec<u8>,
185 #[max_len(64)]
186 pub name: String,
187 pub created_at: i64,
188
189 pub trigger: Trigger,
191 pub schedule: Schedule,
192
193 #[max_len(1024)]
195 pub default_fiber: Option<Vec<u8>>,
196 pub default_fiber_priority_fee: u64,
197
198 #[max_len(50)]
200 pub fiber_ids: Vec<u8>,
201 pub fiber_cursor: u8,
202 pub fiber_next_id: u8,
203 pub fiber_signal: Signal,
204
205 pub paused: bool,
207
208 pub exec_count: u64,
210 pub last_executor: Pubkey,
211 pub last_error_time: Option<i64>,
212
213 pub nonce_account: Pubkey,
215 #[max_len(44)]
216 pub last_nonce: String,
217
218 #[max_len(256)]
220 pub close_fiber: Vec<u8>,
221}
222
223impl Thread {
224 pub fn pubkey(authority: Pubkey, id: impl AsRef<[u8]>) -> Pubkey {
226 let id_bytes = id.as_ref();
227 assert!(id_bytes.len() <= 32, "Thread ID must not exceed 32 bytes");
228
229 Pubkey::find_program_address(&[SEED_THREAD, authority.as_ref(), id_bytes], &crate::ID).0
230 }
231
232 pub fn has_nonce_account(&self) -> bool {
234 self.nonce_account != anchor_lang::solana_program::system_program::ID
235 && self.nonce_account != crate::ID
236 }
237
238 pub fn advance_to_next_fiber(&mut self) {
240 if self.fiber_ids.is_empty() {
241 self.fiber_cursor = 0;
242 return;
243 }
244
245 if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
247 let next_pos = (current_pos + 1) % self.fiber_ids.len();
249 self.fiber_cursor = self.fiber_ids[next_pos];
250 } else {
251 self.fiber_cursor = self.fiber_ids.first().copied().unwrap_or(0);
253 }
254 }
255
256 pub fn next_fiber_index(&self) -> u8 {
259 if self.fiber_ids.is_empty() {
260 return 0;
261 }
262 if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
263 let next_pos = (current_pos + 1) % self.fiber_ids.len();
264 self.fiber_ids[next_pos]
265 } else {
266 self.fiber_ids.first().copied().unwrap_or(0)
267 }
268 }
269
270 pub fn fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
272 self.fiber_at_index(thread_pubkey, self.fiber_cursor)
273 }
274
275 pub fn fiber_at_index(&self, thread_pubkey: &Pubkey, fiber_index: u8) -> Pubkey {
277 Pubkey::find_program_address(
278 &[b"thread_fiber", thread_pubkey.as_ref(), &[fiber_index]],
279 &crate::ID,
280 )
281 .0
282 }
283
284 pub fn next_fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
286 let next_index = if self.fiber_ids.is_empty() {
288 0
289 } else if let Some(current_pos) =
290 self.fiber_ids.iter().position(|&x| x == self.fiber_cursor)
291 {
292 let next_pos = (current_pos + 1) % self.fiber_ids.len();
293 self.fiber_ids[next_pos]
294 } else {
295 self.fiber_ids.first().copied().unwrap_or(0)
296 };
297
298 self.fiber_at_index(thread_pubkey, next_index)
299 }
300
301 pub fn is_ready(&self, current_slot: u64, current_timestamp: i64) -> bool {
303 match &self.schedule {
304 Schedule::Timed { next, .. } => current_timestamp >= *next,
305 Schedule::Block { next, .. } => {
306 match &self.trigger {
307 Trigger::Slot { .. } => current_slot >= *next,
308 Trigger::Epoch { .. } => {
309 false
312 }
313 _ => false,
314 }
315 }
316 Schedule::OnChange { .. } => {
317 false
319 }
320 }
321 }
322
323 pub fn validate_for_execution(&self) -> Result<()> {
325 require!(
327 !self.fiber_ids.is_empty(),
328 crate::errors::AntegenThreadError::ThreadHasNoFibersToExecute
329 );
330
331 if self.fiber_cursor == 0 {
333 require!(
335 self.default_fiber.is_some() || self.fiber_ids.contains(&0),
336 crate::errors::AntegenThreadError::InvalidExecIndex
337 );
338 } else {
339 require!(
341 self.fiber_ids.contains(&self.fiber_cursor),
342 crate::errors::AntegenThreadError::InvalidExecIndex
343 );
344 }
345
346 Ok(())
347 }
348}
349
350impl TryFrom<Vec<u8>> for Thread {
351 type Error = Error;
352
353 fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
354 Thread::try_deserialize(&mut data.as_slice())
355 }
356}
357
358pub fn compile_instruction(
360 instruction: Instruction,
361 signer_seeds: Vec<Vec<Vec<u8>>>,
362) -> Result<CompiledInstructionV0> {
363 let mut pubkeys_to_metadata: HashMap<Pubkey, AccountMeta> = HashMap::new();
364
365 pubkeys_to_metadata.insert(
367 instruction.program_id,
368 AccountMeta {
369 pubkey: instruction.program_id,
370 is_signer: false,
371 is_writable: false,
372 },
373 );
374
375 for acc in &instruction.accounts {
377 let entry = pubkeys_to_metadata
378 .entry(acc.pubkey)
379 .or_insert(AccountMeta {
380 pubkey: acc.pubkey,
381 is_signer: false,
382 is_writable: false,
383 });
384 entry.is_signer |= acc.is_signer;
385 entry.is_writable |= acc.is_writable;
386 }
387
388 let mut sorted_accounts: Vec<Pubkey> = pubkeys_to_metadata.keys().cloned().collect();
390 sorted_accounts.sort_by(|a, b| {
391 let a_meta = &pubkeys_to_metadata[a];
392 let b_meta = &pubkeys_to_metadata[b];
393
394 fn get_priority(meta: &AccountMeta) -> u8 {
395 match (meta.is_signer, meta.is_writable) {
396 (true, true) => 0,
397 (true, false) => 1,
398 (false, true) => 2,
399 (false, false) => 3,
400 }
401 }
402
403 get_priority(a_meta).cmp(&get_priority(b_meta))
404 });
405
406 let mut num_rw_signers = 0u8;
408 let mut num_ro_signers = 0u8;
409 let mut num_rw = 0u8;
410
411 for pubkey in &sorted_accounts {
412 let meta = &pubkeys_to_metadata[pubkey];
413 if meta.is_signer && meta.is_writable {
414 num_rw_signers += 1;
415 } else if meta.is_signer && !meta.is_writable {
416 num_ro_signers += 1;
417 } else if meta.is_writable {
418 num_rw += 1;
419 }
420 }
421
422 let accounts_to_index: HashMap<Pubkey, u8> = sorted_accounts
424 .iter()
425 .enumerate()
426 .map(|(i, k)| (*k, i as u8))
427 .collect();
428
429 let compiled_instruction = CompiledInstructionData {
431 program_id_index: *accounts_to_index.get(&instruction.program_id).unwrap(),
432 accounts: instruction
433 .accounts
434 .iter()
435 .map(|acc| *accounts_to_index.get(&acc.pubkey).unwrap())
436 .collect(),
437 data: instruction.data,
438 };
439
440 Ok(CompiledInstructionV0 {
441 num_ro_signers,
442 num_rw_signers,
443 num_rw,
444 instructions: vec![compiled_instruction],
445 signer_seeds,
446 accounts: sorted_accounts,
447 })
448}
449
450pub fn decompile_instruction(compiled: &CompiledInstructionV0) -> Result<Instruction> {
452 if compiled.instructions.is_empty() {
453 return Err(ProgramError::InvalidInstructionData.into());
454 }
455
456 let ix = &compiled.instructions[0];
457 let program_id = compiled.accounts[ix.program_id_index as usize];
458
459 let accounts: Vec<AccountMeta> = ix
460 .accounts
461 .iter()
462 .enumerate()
463 .map(|(_i, &idx)| {
464 let pubkey = compiled.accounts[idx as usize];
465 let is_writable = if idx < compiled.num_rw_signers {
466 true
467 } else if idx < compiled.num_rw_signers + compiled.num_ro_signers {
468 false
469 } else if idx < compiled.num_rw_signers + compiled.num_ro_signers + compiled.num_rw {
470 true
471 } else {
472 false
473 };
474 let is_signer = idx < compiled.num_rw_signers + compiled.num_ro_signers;
475
476 AccountMeta {
477 pubkey,
478 is_signer,
479 is_writable,
480 }
481 })
482 .collect();
483
484 Ok(Instruction {
485 program_id,
486 accounts,
487 data: ix.data.clone(),
488 })
489}
490
491pub trait TriggerProcessor {
493 fn validate_trigger(
494 &self,
495 clock: &Clock,
496 remaining_accounts: &[AccountInfo],
497 thread_pubkey: &Pubkey,
498 ) -> Result<i64>; fn update_schedule(
501 &mut self,
502 clock: &Clock,
503 remaining_accounts: &[AccountInfo],
504 thread_pubkey: &Pubkey,
505 ) -> Result<()>; fn get_last_started_at(&self) -> i64;
508}
509
510pub trait ThreadSeeds {
512 fn get_seed_bytes(&self) -> Vec<Vec<u8>>;
513
514 fn sign<F, R>(&self, f: F) -> R
516 where
517 F: FnOnce(&[&[u8]]) -> R,
518 {
519 let seed_bytes = self.get_seed_bytes();
520 let seeds: Vec<&[u8]> = seed_bytes.iter().map(|s| s.as_slice()).collect();
521 f(&seeds)
522 }
523}
524
525pub trait NonceProcessor {
527 fn advance_nonce_if_required<'info>(
528 &self,
529 thread_account_info: &AccountInfo<'info>,
530 nonce_account: &Option<UncheckedAccount<'info>>,
531 recent_blockhashes: &Option<UncheckedAccount<'info>>,
532 ) -> Result<()>;
533}
534
535pub trait PaymentDistributor {
537 fn distribute_payments<'info>(
538 &self,
539 thread_account: &AccountInfo<'info>,
540 executor: &AccountInfo<'info>,
541 admin: &AccountInfo<'info>,
542 payments: &crate::state::PaymentDetails,
543 ) -> Result<()>;
544}
545
546impl TriggerProcessor for Thread {
547 fn validate_trigger(
548 &self,
549 clock: &Clock,
550 remaining_accounts: &[AccountInfo],
551 thread_pubkey: &Pubkey,
552 ) -> Result<i64> {
553 let last_started_at = self.get_last_started_at();
554
555 let trigger_ready_time = match &self.trigger {
557 Trigger::Immediate { jitter } => {
558 let jitter_offset =
559 crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
560 clock.unix_timestamp.saturating_add(jitter_offset)
561 }
562
563 Trigger::Timestamp { unix_ts, jitter } => {
564 let jitter_offset =
565 crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
566 let trigger_time = unix_ts.saturating_add(jitter_offset);
567
568 require!(
569 clock.unix_timestamp >= trigger_time,
570 AntegenThreadError::TriggerConditionFailed
571 );
572 trigger_time
573 }
574
575 Trigger::Slot { slot } => {
576 require!(
577 clock.slot >= *slot,
578 AntegenThreadError::TriggerConditionFailed
579 );
580 clock.unix_timestamp - ((clock.slot - slot) as i64 * 400 / 1000)
582 }
583
584 Trigger::Epoch { epoch } => {
585 require!(
586 clock.epoch >= *epoch,
587 AntegenThreadError::TriggerConditionFailed
588 );
589 clock.unix_timestamp
590 }
591
592 Trigger::Interval {
593 seconds: _,
594 skippable: _,
595 jitter: _,
596 } => {
597 let trigger_time = match self.schedule {
599 Schedule::Timed { next, .. } => next,
600 _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
601 };
602
603 require!(
604 clock.unix_timestamp >= trigger_time,
605 AntegenThreadError::TriggerConditionFailed
606 );
607 trigger_time
608 }
609
610 Trigger::Cron {
611 schedule: _,
612 skippable: _,
613 jitter: _,
614 } => {
615 let trigger_time = match self.schedule {
617 Schedule::Timed { next, .. } => next,
618 _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
619 };
620
621 require!(
622 clock.unix_timestamp >= trigger_time,
623 AntegenThreadError::TriggerConditionFailed
624 );
625 trigger_time
626 }
627
628 Trigger::Account {
629 address,
630 offset,
631 size,
632 } => {
633 let account_info = remaining_accounts
635 .first()
636 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
637
638 require!(
640 address.eq(account_info.key),
641 AntegenThreadError::TriggerConditionFailed
642 );
643
644 let mut hasher = DefaultHasher::new();
646 let data = &account_info.try_borrow_data()?;
647 let offset = *offset as usize;
648 let range_end = offset.checked_add(*size as usize).unwrap() as usize;
649
650 use std::hash::Hash;
651 if data.len() > range_end {
652 data[offset..range_end].hash(&mut hasher);
653 } else {
654 data[offset..].hash(&mut hasher);
655 }
656 let data_hash = hasher.finish();
657
658 if let Schedule::OnChange { prev: prior_hash } = &self.schedule {
660 require!(
661 data_hash.ne(prior_hash),
662 AntegenThreadError::TriggerConditionFailed
663 );
664 }
665
666 clock.unix_timestamp
667 }
668 };
669
670 Ok(clock.unix_timestamp.saturating_sub(trigger_ready_time))
672 }
673
674 fn update_schedule(
675 &mut self,
676 clock: &Clock,
677 remaining_accounts: &[AccountInfo],
678 thread_pubkey: &Pubkey,
679 ) -> Result<()> {
680 let current_timestamp = clock.unix_timestamp;
681
682 self.schedule = match &self.trigger {
683 Trigger::Account { offset, size, .. } => {
684 let account_info = remaining_accounts
686 .first()
687 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
688
689 let mut hasher = DefaultHasher::new();
690 let data = &account_info.try_borrow_data()?;
691 let offset = *offset as usize;
692 let range_end = offset.checked_add(*size as usize).unwrap() as usize;
693
694 use std::hash::Hash;
695 if data.len() > range_end {
696 data[offset..range_end].hash(&mut hasher);
697 } else {
698 data[offset..].hash(&mut hasher);
699 }
700 let data_hash = hasher.finish();
701
702 Schedule::OnChange { prev: data_hash }
703 }
704 Trigger::Cron {
705 schedule, jitter, ..
706 } => {
707 let next_cron = crate::utils::next_timestamp(current_timestamp, schedule.clone())
710 .ok_or(AntegenThreadError::TriggerConditionFailed)?;
711 let next_jitter = crate::utils::calculate_jitter_offset(
712 current_timestamp,
713 thread_pubkey,
714 *jitter,
715 );
716 let next_trigger_time = next_cron.saturating_add(next_jitter);
717
718 Schedule::Timed {
719 prev: current_timestamp,
720 next: next_trigger_time,
721 }
722 }
723 Trigger::Immediate { .. } => Schedule::Timed {
724 prev: current_timestamp,
725 next: 0, },
727 Trigger::Slot { slot } => Schedule::Block {
728 prev: clock.slot,
729 next: *slot,
730 },
731 Trigger::Epoch { epoch } => Schedule::Block {
732 prev: clock.epoch,
733 next: *epoch,
734 },
735 Trigger::Interval {
736 seconds, jitter, ..
737 } => {
738 let next_base = current_timestamp.saturating_add(*seconds);
741 let next_jitter = crate::utils::calculate_jitter_offset(
742 current_timestamp,
743 thread_pubkey,
744 *jitter,
745 );
746 let next_trigger_time = next_base.saturating_add(next_jitter);
747
748 Schedule::Timed {
749 prev: current_timestamp,
750 next: next_trigger_time,
751 }
752 }
753 Trigger::Timestamp { unix_ts, .. } => Schedule::Timed {
754 prev: current_timestamp,
755 next: *unix_ts,
756 },
757 };
758
759 Ok(())
760 }
761
762 fn get_last_started_at(&self) -> i64 {
763 match &self.schedule {
764 Schedule::Timed { prev, .. } => *prev,
765 Schedule::Block { prev, .. } => *prev as i64,
766 Schedule::OnChange { .. } => self.created_at,
767 }
768 }
769}
770
771impl ThreadSeeds for Thread {
772 fn get_seed_bytes(&self) -> Vec<Vec<u8>> {
773 vec![
774 SEED_THREAD.to_vec(),
775 self.authority.to_bytes().to_vec(),
776 self.id.clone(),
777 vec![self.bump],
778 ]
779 }
780}
781
782impl PaymentDistributor for Thread {
783 fn distribute_payments<'info>(
784 &self,
785 thread_account: &AccountInfo<'info>,
786 executor: &AccountInfo<'info>,
787 admin: &AccountInfo<'info>,
788 payments: &crate::state::PaymentDetails,
789 ) -> Result<()> {
790 use crate::utils::transfer_lamports;
791
792 let total_executor_payment =
794 payments.fee_payer_reimbursement + payments.executor_commission;
795
796 if total_executor_payment > 0 || payments.core_team_fee > 0 {
798 msg!(
799 "Payments: executor {} (reimburse {}, commission {}), team {}",
800 total_executor_payment,
801 payments.fee_payer_reimbursement,
802 payments.executor_commission,
803 payments.core_team_fee
804 );
805 }
806
807 if total_executor_payment > 0 {
808 transfer_lamports(thread_account, executor, total_executor_payment)?;
809 }
810
811 if payments.core_team_fee > 0 {
813 transfer_lamports(thread_account, admin, payments.core_team_fee)?;
814 }
815
816 Ok(())
817 }
818}
819
820impl NonceProcessor for Thread {
821 fn advance_nonce_if_required<'info>(
822 &self,
823 thread_account_info: &AccountInfo<'info>,
824 nonce_account: &Option<UncheckedAccount<'info>>,
825 recent_blockhashes: &Option<UncheckedAccount<'info>>,
826 ) -> Result<()> {
827 use anchor_lang::solana_program::{
828 program::invoke_signed, system_instruction::advance_nonce_account,
829 };
830
831 if !self.has_nonce_account() {
832 return Ok(());
833 }
834
835 match (nonce_account, recent_blockhashes) {
836 (Some(nonce_acc), Some(recent_bh)) => {
837 let thread_key = *thread_account_info.key;
839
840 self.sign(|seeds| {
842 invoke_signed(
843 &advance_nonce_account(&nonce_acc.key(), &thread_key),
844 &[
845 nonce_acc.to_account_info(),
846 recent_bh.to_account_info(),
847 thread_account_info.clone(),
848 ],
849 &[seeds],
850 )
851 })?;
852 Ok(())
853 }
854 _ => Err(AntegenThreadError::NonceRequired.into()),
855 }
856 }
857}