Skip to main content

antegen_thread_program/state/
thread.rs

1use crate::{errors::AntegenThreadError, *};
2use anchor_lang::{prelude::*, AnchorDeserialize, AnchorSerialize};
3use std::{collections::hash_map::DefaultHasher, hash::Hasher};
4
5// Re-export types from Fiber Program
6pub use antegen_fiber_program::state::{
7    compile_instruction, decompile_instruction, CompiledInstructionData, CompiledInstructionV0,
8    SerializableAccountMeta, SerializableInstruction,
9};
10pub use antegen_fiber_program::{PAYER_PUBKEY, SEED_THREAD_FIBER};
11
/// Current version of the Thread structure.
///
/// NOTE(review): presumably the value stored in `Thread::version` when a thread is
/// created — confirm against the instruction that initializes the account.
pub const CURRENT_THREAD_VERSION: u8 = 1;
14
15/// The triggering conditions of a thread.
16#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, PartialEq, Debug)]
17pub enum Trigger {
18    /// Allows a thread to be kicked off whenever the data of an account changes.
19    Account {
20        /// The address of the account to monitor.
21        address: Pubkey,
22        /// The byte offset of the account data to monitor.
23        offset: u64,
24        /// The size of the byte slice to monitor (must be less than 1kb)
25        size: u64,
26    },
27
28    /// Allows a thread to be kicked off as soon as it's created.
29    Immediate {
30        /// Optional jitter in seconds to prevent thundering herd (0 = no jitter)
31        jitter: u64,
32    },
33
34    /// Allows a thread to be kicked off according to a unix timestamp.
35    Timestamp {
36        unix_ts: i64,
37        /// Optional jitter in seconds to spread execution across a window (0 = no jitter)
38        jitter: u64,
39    },
40
41    /// Allows a thread to be kicked off at regular intervals.
42    Interval {
43        /// Interval in seconds between executions
44        seconds: i64,
45        /// Boolean value indicating whether triggering moments may be skipped
46        skippable: bool,
47        /// Optional jitter in seconds to prevent thundering herd (0 = no jitter)
48        jitter: u64,
49    },
50
51    /// Allows a thread to be kicked off according to a one-time or recurring schedule.
52    Cron {
53        /// The schedule in cron syntax. Value must be parsable by the `antegen_cron` package.
54        #[max_len(255)]
55        schedule: String,
56
57        /// Boolean value indicating whether triggering moments may be skipped if they are missed (e.g. due to network downtime).
58        /// If false, any "missed" triggering moments will simply be executed as soon as the network comes back online.
59        skippable: bool,
60
61        /// Optional jitter in seconds to spread execution across a window (0 = no jitter)
62        jitter: u64,
63    },
64
65    /// Allows a thread to be kicked off according to a slot.
66    Slot { slot: u64 },
67
68    /// Allows a thread to be kicked off according to an epoch number.
69    Epoch { epoch: u64 },
70}
71
72/// Tracks the execution schedule - when the thread last ran and when it should run next
73/// (was: TriggerContext)
74#[derive(AnchorDeserialize, AnchorSerialize, Clone, InitSpace, Debug, PartialEq)]
75pub enum Schedule {
76    /// For Account triggers - tracks data hash for change detection
77    OnChange { prev: u64 },
78
79    /// For time-based triggers (Immediate, Timestamp, Interval, Cron)
80    Timed { prev: i64, next: i64 },
81
82    /// For block-based triggers (Slot, Epoch)
83    Block { prev: u64, next: u64 },
84}
85
86/// Signal from a fiber about what should happen after execution.
87/// Emitted via set_return_data(), received by thread program via get_return_data().
88#[derive(AnchorDeserialize, AnchorSerialize, Clone, Default, InitSpace, Debug, PartialEq)]
89pub enum Signal {
90    #[default]
91    None, // No signal - normal execution flow
92    Chain,  // Chain to next fiber (same tx)
93    Close,  // Chain to delete thread (same tx)
94    Repeat, // Repeat this fiber on next trigger (skip cursor advancement)
95    Next {
96        index: u8, // Set specific fiber to execute on next trigger
97    },
98    Update {
99        paused: Option<bool>,
100        trigger: Option<Trigger>,
101        index: Option<u8>,
102    },
103}
104
105/// Tracks the current state of a transaction thread on Solana.
106#[account]
107#[derive(Debug, InitSpace)]
108pub struct Thread {
109    // Identity
110    pub version: u8,
111    pub bump: u8,
112    pub authority: Pubkey,
113    #[max_len(32)]
114    pub id: Vec<u8>,
115    #[max_len(64)]
116    pub name: String,
117    pub created_at: i64,
118
119    // Scheduling
120    pub trigger: Trigger,
121    pub schedule: Schedule,
122
123    // Fibers (all managed by Fiber Program as external FiberState accounts)
124    #[max_len(50)]
125    pub fiber_ids: Vec<u8>,
126    pub fiber_cursor: u8,
127    pub fiber_next_id: u8,
128    pub fiber_signal: Signal,
129
130    // Lifecycle
131    pub paused: bool,
132
133    // Execution tracking
134    pub exec_count: u64,
135    pub last_executor: Pubkey,
136
137    // Nonce (for durable transactions)
138    pub nonce_account: Pubkey,
139    #[max_len(44)]
140    pub last_nonce: String,
141
142    // Pre-compiled thread_delete instruction for self-closing
143    #[max_len(256)]
144    pub close_fiber: Vec<u8>,
145}
146
147impl Thread {
148    /// Derive the pubkey of a thread account.
149    pub fn pubkey(authority: Pubkey, id: impl AsRef<[u8]>) -> Pubkey {
150        let id_bytes = id.as_ref();
151        assert!(id_bytes.len() <= 32, "Thread ID must not exceed 32 bytes");
152
153        Pubkey::find_program_address(&[SEED_THREAD, authority.as_ref(), id_bytes], &crate::ID).0
154    }
155
156    /// Check if this thread has a nonce account.
157    pub fn has_nonce_account(&self) -> bool {
158        self.nonce_account != anchor_lang::solana_program::system_program::ID
159            && self.nonce_account != crate::ID
160    }
161
162    /// Advance fiber_cursor to the next fiber in the sequence.
163    pub fn advance_to_next_fiber(&mut self) {
164        if self.fiber_ids.is_empty() {
165            self.fiber_cursor = 0;
166            return;
167        }
168
169        // Find current index position in fiber_ids vec
170        if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
171            // Move to next fiber, or wrap to beginning
172            let next_pos = (current_pos + 1) % self.fiber_ids.len();
173            self.fiber_cursor = self.fiber_ids[next_pos];
174        } else {
175            // Current fiber_cursor not found, reset to first fiber
176            self.fiber_cursor = self.fiber_ids.first().copied().unwrap_or(0);
177        }
178    }
179
180    /// Get the next fiber index in sequence (without mutating).
181    /// Used to validate Chain signals target the correct consecutive fiber.
182    pub fn next_fiber_index(&self) -> u8 {
183        if self.fiber_ids.is_empty() {
184            return 0;
185        }
186        if let Some(current_pos) = self.fiber_ids.iter().position(|&x| x == self.fiber_cursor) {
187            let next_pos = (current_pos + 1) % self.fiber_ids.len();
188            self.fiber_ids[next_pos]
189        } else {
190            self.fiber_ids.first().copied().unwrap_or(0)
191        }
192    }
193
194    /// Get the fiber PDA for the current fiber_cursor
195    pub fn fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
196        self.fiber_at_index(thread_pubkey, self.fiber_cursor)
197    }
198
199    /// Get the fiber PDA for a specific fiber_index
200    pub fn fiber_at_index(&self, thread_pubkey: &Pubkey, fiber_index: u8) -> Pubkey {
201        Pubkey::find_program_address(
202            &[SEED_THREAD_FIBER, thread_pubkey.as_ref(), &[fiber_index]],
203            &antegen_fiber_program::ID,
204        )
205        .0
206    }
207
208    /// Get the next fiber PDA (for the next fiber_cursor in the sequence)
209    pub fn next_fiber(&self, thread_pubkey: &Pubkey) -> Pubkey {
210        // Calculate next index based on fiber_ids sequence
211        let next_index = if self.fiber_ids.is_empty() {
212            0
213        } else if let Some(current_pos) =
214            self.fiber_ids.iter().position(|&x| x == self.fiber_cursor)
215        {
216            let next_pos = (current_pos + 1) % self.fiber_ids.len();
217            self.fiber_ids[next_pos]
218        } else {
219            self.fiber_ids.first().copied().unwrap_or(0)
220        };
221
222        self.fiber_at_index(thread_pubkey, next_index)
223    }
224
225    /// Check if thread is ready to execute based on schedule
226    pub fn is_ready(&self, current_slot: u64, current_timestamp: i64) -> bool {
227        match &self.schedule {
228            Schedule::Timed { next, .. } => current_timestamp >= *next,
229            Schedule::Block { next, .. } => {
230                match &self.trigger {
231                    Trigger::Slot { .. } => current_slot >= *next,
232                    Trigger::Epoch { .. } => {
233                        // For epoch triggers, we'd need epoch info
234                        // This is a simplified check
235                        false
236                    }
237                    _ => false,
238                }
239            }
240            Schedule::OnChange { .. } => {
241                // Account triggers are handled by the observer
242                false
243            }
244        }
245    }
246
247    /// Validate that the thread is ready for execution
248    pub fn validate_for_execution(&self) -> Result<()> {
249        // Check that thread has fibers
250        require!(
251            !self.fiber_ids.is_empty(),
252            crate::errors::AntegenThreadError::ThreadHasNoFibersToExecute
253        );
254
255        // Check that fiber_cursor is valid (must exist in fiber_ids)
256        require!(
257            self.fiber_ids.contains(&self.fiber_cursor),
258            crate::errors::AntegenThreadError::InvalidExecIndex
259        );
260
261        Ok(())
262    }
263}
264
265impl TryFrom<Vec<u8>> for Thread {
266    type Error = Error;
267
268    fn try_from(data: Vec<u8>) -> std::result::Result<Self, Self::Error> {
269        Thread::try_deserialize(&mut data.as_slice())
270    }
271}
272
273/// Trait for processing trigger validation and schedule updates
274pub trait TriggerProcessor {
275    fn validate_trigger(
276        &self,
277        clock: &Clock,
278        remaining_accounts: &[AccountInfo],
279        thread_pubkey: &Pubkey,
280    ) -> Result<i64>; // Returns time_since_ready (elapsed time since trigger was ready)
281
282    fn update_schedule(
283        &mut self,
284        clock: &Clock,
285        remaining_accounts: &[AccountInfo],
286        thread_pubkey: &Pubkey,
287    ) -> Result<()>; // Updates schedule for next execution
288
289    fn get_last_started_at(&self) -> i64;
290}
291
/// Access to the seeds a thread signs with.
pub trait ThreadSeeds {
    /// Owned copies of the signer seeds, in signing order.
    fn get_seed_bytes(&self) -> Vec<Vec<u8>>;

    /// Run `f` with the seeds borrowed as `&[&[u8]]` and return its result.
    ///
    /// The owned seed bytes live only for the duration of the call, which is why the
    /// seeds are handed to a callback instead of being returned (avoids lifetime issues).
    fn sign<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&[&[u8]]) -> R,
    {
        let owned = self.get_seed_bytes();

        let mut borrowed: Vec<&[u8]> = Vec::with_capacity(owned.len());
        for seed in &owned {
            borrowed.push(seed.as_slice());
        }

        f(borrowed.as_slice())
    }
}
306
307/// Trait for handling nonce account operations
308pub trait NonceProcessor {
309    fn advance_nonce_if_required<'info>(
310        &self,
311        thread_account_info: &AccountInfo<'info>,
312        nonce_account: &Option<UncheckedAccount<'info>>,
313        recent_blockhashes: &Option<UncheckedAccount<'info>>,
314    ) -> Result<()>;
315}
316
317/// Trait for distributing payments
318pub trait PaymentDistributor {
319    fn distribute_payments<'info>(
320        &self,
321        thread_account: &AccountInfo<'info>,
322        executor: &AccountInfo<'info>,
323        admin: &AccountInfo<'info>,
324        payments: &crate::state::PaymentDetails,
325    ) -> Result<()>;
326}
327
328impl TriggerProcessor for Thread {
329    fn validate_trigger(
330        &self,
331        clock: &Clock,
332        remaining_accounts: &[AccountInfo],
333        thread_pubkey: &Pubkey,
334    ) -> Result<i64> {
335        let last_started_at = self.get_last_started_at();
336
337        // Determine trigger ready time and validate
338        let trigger_ready_time = match &self.trigger {
339            Trigger::Immediate { jitter } => {
340                let jitter_offset =
341                    crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
342                clock.unix_timestamp.saturating_add(jitter_offset)
343            }
344
345            Trigger::Timestamp { unix_ts, jitter } => {
346                let jitter_offset =
347                    crate::utils::calculate_jitter_offset(last_started_at, thread_pubkey, *jitter);
348                let trigger_time = unix_ts.saturating_add(jitter_offset);
349
350                require!(
351                    clock.unix_timestamp >= trigger_time,
352                    AntegenThreadError::TriggerConditionFailed
353                );
354                trigger_time
355            }
356
357            Trigger::Slot { slot } => {
358                require!(
359                    clock.slot >= *slot,
360                    AntegenThreadError::TriggerConditionFailed
361                );
362                // Approximate when slot was reached (assuming 400ms per slot)
363                clock.unix_timestamp - ((clock.slot - slot) as i64 * 400 / 1000)
364            }
365
366            Trigger::Epoch { epoch } => {
367                require!(
368                    clock.epoch >= *epoch,
369                    AntegenThreadError::TriggerConditionFailed
370                );
371                clock.unix_timestamp
372            }
373
374            Trigger::Interval {
375                seconds: _,
376                skippable: _,
377                jitter: _,
378            } => {
379                // schedule.next already has jitter baked in from previous execution
380                let trigger_time = match self.schedule {
381                    Schedule::Timed { next, .. } => next,
382                    _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
383                };
384
385                require!(
386                    clock.unix_timestamp >= trigger_time,
387                    AntegenThreadError::TriggerConditionFailed
388                );
389                trigger_time
390            }
391
392            Trigger::Cron {
393                schedule: _,
394                skippable: _,
395                jitter: _,
396            } => {
397                // schedule.next already has jitter baked in from previous execution
398                let trigger_time = match self.schedule {
399                    Schedule::Timed { next, .. } => next,
400                    _ => return Err(AntegenThreadError::TriggerConditionFailed.into()),
401                };
402
403                require!(
404                    clock.unix_timestamp >= trigger_time,
405                    AntegenThreadError::TriggerConditionFailed
406                );
407                trigger_time
408            }
409
410            Trigger::Account {
411                address,
412                offset,
413                size,
414            } => {
415                // Verify proof account is provided
416                let account_info = remaining_accounts
417                    .first()
418                    .ok_or(AntegenThreadError::TriggerConditionFailed)?;
419
420                // Verify it's the correct account
421                require!(
422                    address.eq(account_info.key),
423                    AntegenThreadError::TriggerConditionFailed
424                );
425
426                // Compute data hash
427                let mut hasher = DefaultHasher::new();
428                let data = &account_info.try_borrow_data()?;
429                let offset = *offset as usize;
430                let range_end = offset.checked_add(*size as usize).unwrap() as usize;
431
432                use std::hash::Hash;
433                if data.len() > range_end {
434                    data[offset..range_end].hash(&mut hasher);
435                } else {
436                    data[offset..].hash(&mut hasher);
437                }
438                let data_hash = hasher.finish();
439
440                // Verify hash changed
441                if let Schedule::OnChange { prev: prior_hash } = &self.schedule {
442                    require!(
443                        data_hash.ne(prior_hash),
444                        AntegenThreadError::TriggerConditionFailed
445                    );
446                }
447
448                clock.unix_timestamp
449            }
450        };
451
452        // Return elapsed time since trigger was ready
453        Ok(clock.unix_timestamp.saturating_sub(trigger_ready_time))
454    }
455
456    fn update_schedule(
457        &mut self,
458        clock: &Clock,
459        remaining_accounts: &[AccountInfo],
460        thread_pubkey: &Pubkey,
461    ) -> Result<()> {
462        let current_timestamp = clock.unix_timestamp;
463
464        self.schedule = match &self.trigger {
465            Trigger::Account { offset, size, .. } => {
466                // Compute data hash for Account trigger
467                let account_info = remaining_accounts
468                    .first()
469                    .ok_or(AntegenThreadError::TriggerConditionFailed)?;
470
471                let mut hasher = DefaultHasher::new();
472                let data = &account_info.try_borrow_data()?;
473                let offset = *offset as usize;
474                let range_end = offset.checked_add(*size as usize).unwrap() as usize;
475
476                use std::hash::Hash;
477                if data.len() > range_end {
478                    data[offset..range_end].hash(&mut hasher);
479                } else {
480                    data[offset..].hash(&mut hasher);
481                }
482                let data_hash = hasher.finish();
483
484                Schedule::OnChange { prev: data_hash }
485            }
486            Trigger::Cron {
487                schedule, jitter, ..
488            } => {
489                // Calculate next cron time WITH jitter baked in
490                // Use current_timestamp since this is called right after execution
491                let next_cron = crate::utils::next_timestamp(current_timestamp, schedule.clone())
492                    .ok_or(AntegenThreadError::TriggerConditionFailed)?;
493                let next_jitter = crate::utils::calculate_jitter_offset(
494                    current_timestamp,
495                    thread_pubkey,
496                    *jitter,
497                );
498                let next_trigger_time = next_cron.saturating_add(next_jitter);
499
500                Schedule::Timed {
501                    prev: current_timestamp,
502                    next: next_trigger_time,
503                }
504            }
505            Trigger::Immediate { .. } => Schedule::Timed {
506                prev: current_timestamp,
507                next: 0, // Use 0 instead of i64::MAX to avoid JSON serialization issues
508            },
509            Trigger::Slot { slot } => Schedule::Block {
510                prev: clock.slot,
511                next: *slot,
512            },
513            Trigger::Epoch { epoch } => Schedule::Block {
514                prev: clock.epoch,
515                next: *epoch,
516            },
517            Trigger::Interval {
518                seconds, jitter, ..
519            } => {
520                // Calculate next trigger time WITH jitter baked in
521                // Use current_timestamp since this is called right after execution
522                let next_base = current_timestamp.saturating_add(*seconds);
523                let next_jitter = crate::utils::calculate_jitter_offset(
524                    current_timestamp,
525                    thread_pubkey,
526                    *jitter,
527                );
528                let next_trigger_time = next_base.saturating_add(next_jitter);
529
530                Schedule::Timed {
531                    prev: current_timestamp,
532                    next: next_trigger_time,
533                }
534            }
535            Trigger::Timestamp { unix_ts, .. } => Schedule::Timed {
536                prev: current_timestamp,
537                next: *unix_ts,
538            },
539        };
540
541        Ok(())
542    }
543
544    fn get_last_started_at(&self) -> i64 {
545        match &self.schedule {
546            Schedule::Timed { prev, .. } => *prev,
547            Schedule::Block { prev, .. } => *prev as i64,
548            Schedule::OnChange { .. } => self.created_at,
549        }
550    }
551}
552
553impl ThreadSeeds for Thread {
554    fn get_seed_bytes(&self) -> Vec<Vec<u8>> {
555        vec![
556            SEED_THREAD.to_vec(),
557            self.authority.to_bytes().to_vec(),
558            self.id.clone(),
559            vec![self.bump],
560        ]
561    }
562}
563
564impl PaymentDistributor for Thread {
565    fn distribute_payments<'info>(
566        &self,
567        thread_account: &AccountInfo<'info>,
568        executor: &AccountInfo<'info>,
569        admin: &AccountInfo<'info>,
570        payments: &crate::state::PaymentDetails,
571    ) -> Result<()> {
572        use crate::utils::transfer_lamports;
573
574        // Combined payment to executor (reimbursement + commission)
575        let total_executor_payment =
576            payments.fee_payer_reimbursement + payments.executor_commission;
577
578        // Log all payments in one line for conciseness
579        if total_executor_payment > 0 || payments.core_team_fee > 0 {
580            msg!(
581                "Payments: executor {} (reimburse {}, commission {}), team {}",
582                total_executor_payment,
583                payments.fee_payer_reimbursement,
584                payments.executor_commission,
585                payments.core_team_fee
586            );
587        }
588
589        if total_executor_payment > 0 {
590            transfer_lamports(thread_account, executor, total_executor_payment)?;
591        }
592
593        // Transfer core team fee to admin
594        if payments.core_team_fee > 0 {
595            transfer_lamports(thread_account, admin, payments.core_team_fee)?;
596        }
597
598        Ok(())
599    }
600}
601
602impl NonceProcessor for Thread {
603    fn advance_nonce_if_required<'info>(
604        &self,
605        thread_account_info: &AccountInfo<'info>,
606        nonce_account: &Option<UncheckedAccount<'info>>,
607        recent_blockhashes: &Option<UncheckedAccount<'info>>,
608    ) -> Result<()> {
609        use anchor_lang::solana_program::{
610            program::invoke_signed, system_instruction::advance_nonce_account,
611        };
612
613        if !self.has_nonce_account() {
614            return Ok(());
615        }
616
617        match (nonce_account, recent_blockhashes) {
618            (Some(nonce_acc), Some(recent_bh)) => {
619                // Get thread key from account info
620                let thread_key = *thread_account_info.key;
621
622                // Use seeds with callback to handle invoke_signed
623                self.sign(|seeds| {
624                    invoke_signed(
625                        &advance_nonce_account(&nonce_acc.key(), &thread_key),
626                        &[
627                            nonce_acc.to_account_info(),
628                            recent_bh.to_account_info(),
629                            thread_account_info.clone(),
630                        ],
631                        &[seeds],
632                    )
633                })?;
634                Ok(())
635            }
636            _ => Err(AntegenThreadError::NonceRequired.into()),
637        }
638    }
639}