Skip to main content

rialo_subscriber_interface/
instruction.rs

1// Copyright (c) Subzero Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Instructions for the Subscriber program.
5
6use std::ops::{Range, RangeInclusive};
7
8use rialo_limits::MAX_STATIC_ACCOUNTS_PER_PACKET;
9use rialo_s_instruction::{AccountMeta, Instruction};
10use rialo_s_program::system_program;
11use rialo_s_pubkey::Pubkey;
12use rialo_types::Nonce;
13
14use crate::error::SubscriptionError;
15
16/// Seed used to generate the PDA containing the subscription data. The seed for the PDA should be
17/// `RIALO_SUBSCRIBE_SEED` + `subscriber_key` + `subscriber_nonce`.
18pub const RIALO_SUBSCRIBE_SEED: &str = "rialo_subscribe";
19
20/// Event log prefix for unsubscribe operations.
21///
22/// Logs with this prefix are filtered by the `BankMatcher` to remove active subscriptions.
23pub const RIALO_UNSUBSCRIBE_SEED: &str = "rialo_unsubscribe";
24
25/// Timestamp range, in milliseconds.
26pub type TimestampRange = (u64, u64);
27
28/// Type representing a commit index on Rialo. In Rialo there are currently 3 representations of
29/// a block height:
30/// - Round: A round is a representation of a block height in the consensus layer. Each round
31///   contains a fixed number of blocks. A round is identified by a `u32` index.
32/// - Commit index: A commit index is a representation of a block height in the execution layer.
33///   Each commit index corresponds to a DAG that is an output from consensus. A commit index is identified
34///   by a `u32` index.
35/// - Slot: A slot is a representation of a block height in the Solana layer. Each slot corresponds to a block in
36///   the Solana layer. A slot is identified by a `u32` index.
37///
38/// On Rialo we use commit indexes to represent block heights in the execution layer. This makes the value
39/// for a Commit Index and a Slot the same, but they represent different concepts. The `Commit` type
40/// used here is leverage at execution time around the Virtual Machine, thus it follows the `u64` type
41/// used in Solana for slots.
42// TODO: Update on naming sanitized for block height at execution time, along with types
43pub type Commmit = u64;
44
45/// Subscription kind
46#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
47pub enum SubscriptionKind {
48    /// Persistent subscription which fires every time a matching event is received
49    Persistent,
50    /// One-shot subscription which fires a single time when a matching event is received
51    OneShot,
52}
53
54/// Subscriber program instructions
55#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
56pub enum SubscriberInstruction {
57    /// Subscribe to events
58    Subscribe {
59        /// Nonce to differentiate handlers for the same topic
60        nonce: Nonce,
61        /// [`Subscription`] data
62        handler: Subscription,
63    },
64    /// Delete subscription
65    Unsubscribe {
66        /// Nonce to differentiate handlers for the same topic/signer pair
67        nonce: Nonce,
68    },
69    /// Update subscription
70    Update {
71        /// Nonce used during subscription to differentiate handlers for the same topic/signer pair
72        nonce: Nonce,
73        /// New [`Subscription`] data
74        handler: Subscription,
75    },
76    /// Destroy subscription account.
77    ///
78    /// The instruction transfers funds from the subscription account to the subscriber,
79    /// marking the subscription account as removable.
80    ///
81    /// This instruction does not emit any logs.
82    Destroy {
83        /// Nonce to differentiate handlers for the same topic/signer pair
84        nonce: Nonce,
85    },
86}
87
88/// Represents the predicates set by a subscriber to match the [`Subscription`] to an event.
89#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
90pub struct Predicate {
91    /// The topic the subscription listens to
92    topic: String,
93    /// The account that stores the event data. Used when
94    /// the subscription precisely targets a specific event account.
95    event_account: Option<Pubkey>,
96    /// The timestamp range to trigger the transaction, if provided.
97    /// Left boundary is inclusive, right boundary is exclusive.
98    timestamp_range: Option<TimestampRange>,
99}
100
101impl Predicate {
102    /// Instantiate a new `Predicate` with a topic.
103    ///
104    /// # Arguments
105    ///
106    /// * `topic` - The topic the subscription listens to.
107    ///
108    /// # Returns
109    ///
110    /// * `Predicate` - A new instance of `Predicate`
111    pub fn new(topic: impl Into<String>) -> Self {
112        Self {
113            topic: topic.into(),
114            event_account: None,
115            timestamp_range: None,
116        }
117    }
118
119    /// Create a new predicate with a topic and event account.
120    ///
121    /// # Arguments
122    ///
123    /// * `topic` - The topic the subscription listens to.
124    /// * `event_account` - The account that stores the event data.
125    ///
126    /// # Returns
127    ///
128    /// * `Predicate` - A new instance of `Predicate`
129    pub fn new_with_event_account(topic: impl Into<String>, event_account: Pubkey) -> Self {
130        Self {
131            topic: topic.into(),
132            event_account: Some(event_account),
133            timestamp_range: None,
134        }
135    }
136
137    /// Create a new predicate with a topic and optional event account.
138    ///
139    /// # Arguments
140    ///
141    /// * `topic` - The topic the subscription listens to.
142    /// * `event_account` - An optional account that stores the event data.
143    ///
144    /// # Returns
145    ///
146    /// * `Predicate` - A new instance of `Predicate`
147    pub fn new_with_optional_event_account(
148        topic: impl Into<String>,
149        event_account: Option<Pubkey>,
150    ) -> Self {
151        Self {
152            topic: topic.into(),
153            event_account,
154            timestamp_range: None,
155        }
156    }
157
158    /// Create a new predicate for a timestamp range.
159    ///
160    /// The topic is set to the clock topic and the event account is set to the clock
161    /// sysvar account.
162    ///
163    /// # Arguments
164    ///
165    /// * `range` - A range representing the timestamp range in milliseconds. The start is inclusive,
166    ///   and the end is exclusive.
167    ///
168    /// # Returns
169    ///
170    /// * `Predicate` - A new instance of `Predicate`
171    pub fn new_with_timestamp_range(range: Range<u64>) -> Self {
172        Self {
173            topic: rialo_events_core::types::CLOCK_TOPIC.to_string(),
174            event_account: Some(rialo_s_program::sysvar::clock::id()),
175            timestamp_range: Some((range.start, range.end)),
176        }
177    }
178
179    /// Get the topic of the predicate.
180    pub fn topic(&self) -> &str {
181        &self.topic
182    }
183
184    /// Get the event account of the predicate, if any.
185    pub fn event_account(&self) -> Option<Pubkey> {
186        self.event_account
187    }
188
189    /// Get the timestamp range of the predicate, if any.
190    pub fn timestamp_range(&self) -> Option<TimestampRange> {
191        self.timestamp_range
192    }
193}
194
195/// [`Subscription`] data stored by a user for a given topic. This is used to generate scheduled
196/// transactions to be run if the topic is triggered.
197///
198/// The instructions are a set of SVM instructions that are meant to be run when the subscription is
199/// triggered. The instruction array is converted to a `SanitizedTransaction` and executed in the VM.
200#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
201pub struct Subscription {
202    /// Subscriber key
203    pub subscriber: Pubkey,
204    /// The predicate set by the subscriber to match the event
205    pub predicate: Predicate,
206    /// Instructions to be executed
207    pub instructions: Vec<Instruction>,
208    /// Subscription kind
209    pub kind: SubscriptionKind,
210    /// Active commits for the subscription
211    pub active_commits: RangeInclusive<Commmit>,
212}
213
214impl Subscription {
215    /// Instantiate a new `Subscription`
216    ///
217    /// # Arguments
218    /// * `subscriber` - The public key of the subscriber
219    /// * `predicate` -  The predicate to match the event
220    /// * `instructions` - A vector of instructions to be executed
221    /// * `kind` - The type of subscription (persistent or one-shot)
222    /// * `active_commits` - The range of active commits for the subscription
223    ///
224    /// # Returns
225    /// * `Subscription` - A new instance of `Subscription`
226    pub fn new(
227        subscriber: Pubkey,
228        predicate: Predicate,
229        instructions: Vec<Instruction>,
230        kind: SubscriptionKind,
231        active_commits: RangeInclusive<u64>,
232    ) -> Self {
233        Self {
234            subscriber,
235            predicate,
236            instructions,
237            kind,
238            active_commits,
239        }
240    }
241
242    /// Sanitize a Subscription
243    ///
244    /// # Returns
245    ///
246    /// Result<(), SubscriptionError> - Ok if the subscription is valid, Err otherwise
247    pub fn sanitize_instructions(&self) -> Result<(), SubscriptionError> {
248        if self.instructions.is_empty() {
249            return Err(SubscriptionError::EmptyInstructions);
250        }
251
252        // Check that no account that is not the payer is a signer
253        if let Some(invalid_signer) = self
254            .instructions
255            .iter()
256            .flat_map(|itx| itx.accounts.iter())
257            .find(|account| account.is_signer && account.pubkey != self.subscriber)
258        {
259            return Err(SubscriptionError::InvalidSigner(invalid_signer.pubkey));
260        }
261
262        // Check that we are limiting the  number of accounts across all instructions to MAX_STATIC_ACCOUNTS_PER_PACKET
263        let all_pubkeys: std::collections::HashSet<_> = self
264            .instructions
265            .iter()
266            .flat_map(|itx| {
267                std::iter::once(itx.program_id).chain(itx.accounts.iter().map(|acc| acc.pubkey))
268            })
269            .collect();
270        let total_accounts = all_pubkeys.len();
271
272        if total_accounts > MAX_STATIC_ACCOUNTS_PER_PACKET as usize {
273            return Err(SubscriptionError::TooManyAccounts {
274                actual: total_accounts,
275                max_accounts: MAX_STATIC_ACCOUNTS_PER_PACKET as usize,
276            });
277        }
278
279        Ok(())
280    }
281}
282
283impl SubscriberInstruction {
284    /// Create a `SubscriberInstruction::SubscribeToEvent` `Instruction`
285    ///
286    /// # Account references
287    ///   0. `[SIGNER]` Signer (subscriber) account
288    ///   1. `[WRITE]` Subscription data account
289    ///   2. `[READONLY]` System program account
290    ///
291    /// # Arguments
292    /// * `signer` - The public key of the subscriber
293    /// * `nonce` - The nonce to differentiate handlers for the same topic/signer pair
294    /// * `handler` - The [`Subscription`] data
295    ///
296    /// # Returns
297    /// * `Instruction` - An SVM instruction to be included in a transaction
298    pub fn subscribe_to(signer: Pubkey, nonce: Nonce, mut handler: Subscription) -> Instruction {
299        // todo better error handling
300        if handler.subscriber != signer {
301            panic!("Handler subscriber must be the same as the signer");
302        }
303
304        if handler.instructions.is_empty() {
305            panic!("Handler instructions must not be empty");
306        }
307
308        let subscription_data_account = derive_subscription_address(signer, nonce);
309
310        // append a destroy instruction to one-shot subscription's instructions
311        //
312        // when the one-shot subscription is matched (`BankMatcher::generate_subscription_matches()`),
313        // it is removed from the matcher but the subscription is not automatically removed from the state
314        //
315        // add destroy instruction as the subscription's last instructions, meaning when all the user-specified
316        // instructions have been executed, remove the subscription from the state and return rent to the subscriber
317        if let SubscriptionKind::OneShot = handler.kind {
318            handler.instructions.push(Instruction::new_with_bincode(
319                crate::ID,
320                &SubscriberInstruction::Destroy { nonce },
321                vec![
322                    AccountMeta::new(signer, true),
323                    AccountMeta::new(subscription_data_account, false),
324                ],
325            ));
326        }
327
328        Instruction::new_with_bincode(
329            crate::ID,
330            &SubscriberInstruction::Subscribe { nonce, handler },
331            vec![
332                AccountMeta::new(signer, true),
333                AccountMeta::new(subscription_data_account, false),
334                AccountMeta::new_readonly(system_program::id(), false),
335            ],
336        )
337    }
338
339    /// Create a `SubscriberInstruction::Unsubscribe` `Instruction`
340    ///
341    /// # Account references
342    ///   0. `[SIGNER]` Signer (subscriber) account
343    ///   1. `[WRITE]`  Subscription data account
344    ///
345    /// # Arguments
346    /// * `signer` - The public key of the subscriber
347    /// * `nonce` - The nonce to differentiate handlers for the same topic/signer pair
348    ///
349    /// # Returns
350    /// * `Instruction` - An SVM instruction to be included in a transaction
351    pub fn unsubscribe_from(signer: Pubkey, nonce: Nonce) -> Instruction {
352        let subscription_data_key = derive_subscription_address(signer, nonce);
353
354        Instruction::new_with_bincode(
355            crate::ID,
356            &SubscriberInstruction::Unsubscribe { nonce },
357            vec![
358                AccountMeta::new(signer, true),
359                AccountMeta::new(subscription_data_key, false),
360            ],
361        )
362    }
363
364    /// Create a `SubscriberInstruction::Update` `Instruction`.
365    ///
366    /// # Account references
367    ///   0. `[SIGNER]` Signer (subscriber) account
368    ///   1. `[WRITE]`  Subscription data account
369    ///   2. `[READONLY]` System program account
370    ///
371    /// # Arguments
372    /// * `signer` - The public key of the subscriber
373    /// * `nonce` - The nonce set during subscription to differentiate handlers for the same topic/signer pair
374    /// * `handler` - The [`Subscription`] data
375    ///
376    /// # Returns
377    /// * `Instruction` - An SVM instruction to be included in a transaction
378    pub fn update(signer: Pubkey, nonce: Nonce, mut handler: Subscription) -> Instruction {
379        let subscription_data_key = derive_subscription_address(signer, nonce);
380
381        // append a destroy instruction to one-shot subscription's instructions
382        //
383        // when the one-shot subscription is matched (`BankMatcher::generate_subscription_matches()`),
384        // it is removed from the matcher but the subscription is not automatically removed from the state
385        //
386        // add destroy instruction as the subscription's last instructions, meaning when all the user-specified
387        // instructions have been executed, remove the subscription from the state and return rent to the subscriber
388        if let SubscriptionKind::OneShot = handler.kind {
389            handler.instructions.push(Instruction::new_with_bincode(
390                crate::ID,
391                &SubscriberInstruction::Destroy { nonce },
392                vec![
393                    AccountMeta::new_readonly(signer, true),
394                    AccountMeta::new(subscription_data_key, false),
395                ],
396            ));
397        }
398
399        Instruction::new_with_bincode(
400            crate::ID,
401            &SubscriberInstruction::Update { nonce, handler },
402            vec![
403                AccountMeta::new(signer, true),
404                AccountMeta::new(subscription_data_key, false),
405                AccountMeta::new_readonly(system_program::id(), false),
406            ],
407        )
408    }
409}
410
411/// Generates the program-derived address (PDA) for the subscription data account.
412///
413/// # Arguments
414///
415/// * `subscriber` - The public key of the subscriber.
416/// * `nonce` - A slice of bytes used as a seed to generate the PDA.
417///
418/// # Returns
419///
420/// The public key of the subscription data account.
421pub fn derive_subscription_address<NONCE: Into<Nonce>>(subscriber: Pubkey, nonce: NONCE) -> Pubkey {
422    let nonce = nonce.into();
423    Pubkey::find_program_address(
424        &[
425            RIALO_SUBSCRIBE_SEED.as_bytes(),
426            subscriber.as_array(),
427            nonce.as_bytes(),
428        ],
429        &crate::ID,
430    )
431    .0
432}
433
434#[cfg(test)]
435mod test {
436    use super::*;
437
438    #[test]
439    fn test_subscribe_to_instruction() {
440        let signer = Pubkey::new_unique();
441        let handler = Subscription::new(
442            signer,
443            Predicate {
444                topic: String::from("MySubscription"),
445                event_account: Some(Pubkey::new_unique()),
446                timestamp_range: Some((1337, 1338)),
447            },
448            vec![Instruction::new_with_bincode(
449                Pubkey::new_unique(),
450                &["MyData"],
451                vec![],
452            )],
453            SubscriptionKind::Persistent,
454            0..=u64::MAX,
455        );
456
457        let instruction =
458            SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
459
460        assert_eq!(
461            instruction.data,
462            bincode::serialize(&SubscriberInstruction::Subscribe {
463                nonce: Nonce::default(),
464                handler,
465            })
466            .unwrap()
467        );
468        assert_eq!(instruction.program_id, crate::ID);
469
470        let subscription_data_account = derive_subscription_address(signer, Nonce::default());
471        assert_eq!(
472            instruction.accounts,
473            vec![
474                AccountMeta::new(signer, true),
475                AccountMeta::new(subscription_data_account, false),
476                AccountMeta::new_readonly(system_program::id(), false),
477            ]
478        )
479    }
480
481    #[test]
482    fn test_delete_subscription_instruction() {
483        let signer = Pubkey::new_unique();
484
485        let instruction = SubscriberInstruction::unsubscribe_from(signer, Nonce::default());
486
487        assert_eq!(
488            instruction.data,
489            bincode::serialize(&SubscriberInstruction::Unsubscribe {
490                nonce: Nonce::default()
491            })
492            .unwrap()
493        );
494        assert_eq!(instruction.program_id, crate::ID);
495
496        let subscription_data_account = derive_subscription_address(signer, Nonce::default());
497
498        assert_eq!(
499            instruction.accounts,
500            vec![
501                AccountMeta::new(signer, true),
502                AccountMeta::new(subscription_data_account, false),
503            ]
504        )
505    }
506
507    #[test]
508    fn test_update_subscription_instruction() {
509        let signer = Pubkey::new_unique();
510        let handler = Subscription {
511            subscriber: signer,
512            predicate: Predicate {
513                topic: String::from("MySubscription"),
514                event_account: Some(Pubkey::new_unique()),
515                timestamp_range: Some((1337, 1338)),
516            },
517            instructions: vec![Instruction::new_with_bincode(
518                Pubkey::new_unique(),
519                &["MyData"],
520                vec![],
521            )],
522            kind: SubscriptionKind::Persistent,
523            active_commits: 0..=u64::MAX,
524        };
525
526        let instruction = SubscriberInstruction::update(signer, Nonce::default(), handler.clone());
527
528        assert_eq!(
529            instruction.data,
530            bincode::serialize(&SubscriberInstruction::Update {
531                nonce: Nonce::default(),
532                handler
533            })
534            .unwrap()
535        );
536        assert_eq!(instruction.program_id, crate::ID);
537
538        let subscription_data_account = derive_subscription_address(signer, Nonce::default());
539
540        assert_eq!(
541            instruction.accounts,
542            vec![
543                AccountMeta::new(signer, true),
544                AccountMeta::new(subscription_data_account, false),
545                AccountMeta::new_readonly(system_program::id(), false),
546            ]
547        )
548    }
549
550    #[test]
551    fn test_one_shot() {
552        let signer = Pubkey::new_unique();
553        let mut handler = Subscription::new(
554            signer,
555            Predicate {
556                topic: String::from("MySubscription"),
557                event_account: Some(Pubkey::new_unique()),
558                timestamp_range: None,
559            },
560            vec![Instruction::new_with_bincode(
561                Pubkey::new_unique(),
562                &["MyData"],
563                vec![],
564            )],
565            SubscriptionKind::OneShot,
566            0..=u64::MAX,
567        );
568
569        let instruction =
570            SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
571
572        handler.instructions.push(Instruction::new_with_bincode(
573            crate::ID,
574            &SubscriberInstruction::Destroy {
575                nonce: Nonce::default(),
576            },
577            vec![
578                AccountMeta::new(signer, true),
579                AccountMeta::new(derive_subscription_address(signer, Nonce::default()), false),
580            ],
581        ));
582
583        assert_eq!(
584            instruction.data,
585            bincode::serialize(&SubscriberInstruction::Subscribe {
586                nonce: Nonce::default(),
587                handler,
588            })
589            .unwrap()
590        );
591        assert_eq!(instruction.program_id, crate::ID);
592
593        let subscription_data_account = derive_subscription_address(signer, Nonce::default());
594        assert_eq!(
595            instruction.accounts,
596            vec![
597                AccountMeta::new(signer, true),
598                AccountMeta::new(subscription_data_account, false),
599                AccountMeta::new_readonly(system_program::id(), false),
600            ]
601        )
602    }
603
604    #[test]
605    fn test_one_shot_with_timestamp_range() {
606        let signer = Pubkey::new_unique();
607        let mut handler = Subscription::new(
608            signer,
609            Predicate::new_with_timestamp_range(1337..1338),
610            vec![Instruction::new_with_bincode(
611                Pubkey::new_unique(),
612                &["MyData"],
613                vec![],
614            )],
615            SubscriptionKind::OneShot,
616            0..=u64::MAX,
617        );
618
619        let instruction =
620            SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
621
622        handler.instructions.push(Instruction::new_with_bincode(
623            crate::ID,
624            &SubscriberInstruction::Destroy {
625                nonce: Nonce::default(),
626            },
627            vec![
628                AccountMeta::new(signer, true),
629                AccountMeta::new(derive_subscription_address(signer, Nonce::default()), false),
630            ],
631        ));
632
633        assert_eq!(
634            instruction.data,
635            bincode::serialize(&SubscriberInstruction::Subscribe {
636                nonce: Nonce::default(),
637                handler,
638            })
639            .unwrap()
640        );
641        assert_eq!(instruction.program_id, crate::ID);
642
643        let subscription_data_account = derive_subscription_address(signer, Nonce::default());
644        assert_eq!(
645            instruction.accounts,
646            vec![
647                AccountMeta::new(signer, true),
648                AccountMeta::new(subscription_data_account, false),
649                AccountMeta::new_readonly(system_program::id(), false),
650            ]
651        )
652    }
653}