rialo_subscriber_interface/instruction.rs
1// Copyright (c) Subzero Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Instructions for the Subscriber program.
5
6use std::ops::{Range, RangeInclusive};
7
8use rialo_limits::MAX_STATIC_ACCOUNTS_PER_PACKET;
9use rialo_s_instruction::{AccountMeta, Instruction};
10use rialo_s_program::system_program;
11use rialo_s_pubkey::Pubkey;
12use rialo_types::Nonce;
13
14use crate::error::SubscriptionError;
15
16/// Seed used to generate the PDA containing the subscription data. The seed for the PDA should be
17/// `RIALO_SUBSCRIBE_SEED` + `subscriber_key` + `subscriber_nonce`.
18pub const RIALO_SUBSCRIBE_SEED: &str = "rialo_subscribe";
19
20/// Event log prefix for unsubscribe operations.
21///
22/// Logs with this prefix are filtered by the `BankMatcher` to remove active subscriptions.
23pub const RIALO_UNSUBSCRIBE_SEED: &str = "rialo_unsubscribe";
24
25/// Timestamp range, in milliseconds.
26pub type TimestampRange = (u64, u64);
27
28/// Type representing a commit index on Rialo. In Rialo there are currently 3 representations of
29/// a block height:
30/// - Round: A round is a representation of a block height in the consensus layer. Each round
31/// contains a fixed number of blocks. A round is identified by a `u32` index.
32/// - Commit index: A commit index is a representation of a block height in the execution layer.
33/// Each commit index corresponds to a DAG that is an output from consensus. A commit index is identified
34/// by a `u32` index.
35/// - Slot: A slot is a representation of a block height in the Solana layer. Each slot corresponds to a block in
36/// the Solana layer. A slot is identified by a `u32` index.
37///
38/// On Rialo we use commit indexes to represent block heights in the execution layer. This makes the value
39/// for a Commit Index and a Slot the same, but they represent different concepts. The `Commit` type
40/// used here is leverage at execution time around the Virtual Machine, thus it follows the `u64` type
41/// used in Solana for slots.
42// TODO: Update on naming sanitized for block height at execution time, along with types
43pub type Commmit = u64;
44
45/// Subscription kind
46#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
47pub enum SubscriptionKind {
48 /// Persistent subscription which fires every time a matching event is received
49 Persistent,
50 /// One-shot subscription which fires a single time when a matching event is received
51 OneShot,
52}
53
54/// Subscriber program instructions
55#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
56pub enum SubscriberInstruction {
57 /// Subscribe to events
58 Subscribe {
59 /// Nonce to differentiate handlers for the same topic
60 nonce: Nonce,
61 /// [`Subscription`] data
62 handler: Subscription,
63 },
64 /// Delete subscription
65 Unsubscribe {
66 /// Nonce to differentiate handlers for the same topic/signer pair
67 nonce: Nonce,
68 },
69 /// Update subscription
70 Update {
71 /// Nonce used during subscription to differentiate handlers for the same topic/signer pair
72 nonce: Nonce,
73 /// New [`Subscription`] data
74 handler: Subscription,
75 },
76 /// Destroy subscription account.
77 ///
78 /// The instruction transfers funds from the subscription account to the subscriber,
79 /// marking the subscription account as removable.
80 ///
81 /// This instruction does not emit any logs.
82 Destroy {
83 /// Nonce to differentiate handlers for the same topic/signer pair
84 nonce: Nonce,
85 },
86}
87
88/// Represents the predicates set by a subscriber to match the [`Subscription`] to an event.
89#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
90pub struct Predicate {
91 /// The topic the subscription listens to
92 topic: String,
93 /// The account that stores the event data. Used when
94 /// the subscription precisely targets a specific event account.
95 event_account: Option<Pubkey>,
96 /// The timestamp range to trigger the transaction, if provided.
97 /// Left boundary is inclusive, right boundary is exclusive.
98 timestamp_range: Option<TimestampRange>,
99}
100
101impl Predicate {
102 /// Instantiate a new `Predicate` with a topic.
103 ///
104 /// # Arguments
105 ///
106 /// * `topic` - The topic the subscription listens to.
107 ///
108 /// # Returns
109 ///
110 /// * `Predicate` - A new instance of `Predicate`
111 pub fn new(topic: impl Into<String>) -> Self {
112 Self {
113 topic: topic.into(),
114 event_account: None,
115 timestamp_range: None,
116 }
117 }
118
119 /// Create a new predicate with a topic and event account.
120 ///
121 /// # Arguments
122 ///
123 /// * `topic` - The topic the subscription listens to.
124 /// * `event_account` - The account that stores the event data.
125 ///
126 /// # Returns
127 ///
128 /// * `Predicate` - A new instance of `Predicate`
129 pub fn new_with_event_account(topic: impl Into<String>, event_account: Pubkey) -> Self {
130 Self {
131 topic: topic.into(),
132 event_account: Some(event_account),
133 timestamp_range: None,
134 }
135 }
136
137 /// Create a new predicate with a topic and optional event account.
138 ///
139 /// # Arguments
140 ///
141 /// * `topic` - The topic the subscription listens to.
142 /// * `event_account` - An optional account that stores the event data.
143 ///
144 /// # Returns
145 ///
146 /// * `Predicate` - A new instance of `Predicate`
147 pub fn new_with_optional_event_account(
148 topic: impl Into<String>,
149 event_account: Option<Pubkey>,
150 ) -> Self {
151 Self {
152 topic: topic.into(),
153 event_account,
154 timestamp_range: None,
155 }
156 }
157
158 /// Create a new predicate for a timestamp range.
159 ///
160 /// The topic is set to the clock topic and the event account is set to the clock
161 /// sysvar account.
162 ///
163 /// # Arguments
164 ///
165 /// * `range` - A range representing the timestamp range in milliseconds. The start is inclusive,
166 /// and the end is exclusive.
167 ///
168 /// # Returns
169 ///
170 /// * `Predicate` - A new instance of `Predicate`
171 pub fn new_with_timestamp_range(range: Range<u64>) -> Self {
172 Self {
173 topic: rialo_events_core::types::CLOCK_TOPIC.to_string(),
174 event_account: Some(rialo_s_program::sysvar::clock::id()),
175 timestamp_range: Some((range.start, range.end)),
176 }
177 }
178
179 /// Get the topic of the predicate.
180 pub fn topic(&self) -> &str {
181 &self.topic
182 }
183
184 /// Get the event account of the predicate, if any.
185 pub fn event_account(&self) -> Option<Pubkey> {
186 self.event_account
187 }
188
189 /// Get the timestamp range of the predicate, if any.
190 pub fn timestamp_range(&self) -> Option<TimestampRange> {
191 self.timestamp_range
192 }
193}
194
195/// [`Subscription`] data stored by a user for a given topic. This is used to generate scheduled
196/// transactions to be run if the topic is triggered.
197///
198/// The instructions are a set of SVM instructions that are meant to be run when the subscription is
199/// triggered. The instruction array is converted to a `SanitizedTransaction` and executed in the VM.
200#[derive(Clone, Debug, PartialEq, Eq, serde_derive::Deserialize, serde_derive::Serialize)]
201pub struct Subscription {
202 /// Subscriber key
203 pub subscriber: Pubkey,
204 /// The predicate set by the subscriber to match the event
205 pub predicate: Predicate,
206 /// Instructions to be executed
207 pub instructions: Vec<Instruction>,
208 /// Subscription kind
209 pub kind: SubscriptionKind,
210 /// Active commits for the subscription
211 pub active_commits: RangeInclusive<Commmit>,
212}
213
214impl Subscription {
215 /// Instantiate a new `Subscription`
216 ///
217 /// # Arguments
218 /// * `subscriber` - The public key of the subscriber
219 /// * `predicate` - The predicate to match the event
220 /// * `instructions` - A vector of instructions to be executed
221 /// * `kind` - The type of subscription (persistent or one-shot)
222 /// * `active_commits` - The range of active commits for the subscription
223 ///
224 /// # Returns
225 /// * `Subscription` - A new instance of `Subscription`
226 pub fn new(
227 subscriber: Pubkey,
228 predicate: Predicate,
229 instructions: Vec<Instruction>,
230 kind: SubscriptionKind,
231 active_commits: RangeInclusive<u64>,
232 ) -> Self {
233 Self {
234 subscriber,
235 predicate,
236 instructions,
237 kind,
238 active_commits,
239 }
240 }
241
242 /// Sanitize a Subscription
243 ///
244 /// # Returns
245 ///
246 /// Result<(), SubscriptionError> - Ok if the subscription is valid, Err otherwise
247 pub fn sanitize_instructions(&self) -> Result<(), SubscriptionError> {
248 if self.instructions.is_empty() {
249 return Err(SubscriptionError::EmptyInstructions);
250 }
251
252 // Check that no account that is not the payer is a signer
253 if let Some(invalid_signer) = self
254 .instructions
255 .iter()
256 .flat_map(|itx| itx.accounts.iter())
257 .find(|account| account.is_signer && account.pubkey != self.subscriber)
258 {
259 return Err(SubscriptionError::InvalidSigner(invalid_signer.pubkey));
260 }
261
262 // Check that we are limiting the number of accounts across all instructions to MAX_STATIC_ACCOUNTS_PER_PACKET
263 let all_pubkeys: std::collections::HashSet<_> = self
264 .instructions
265 .iter()
266 .flat_map(|itx| {
267 std::iter::once(itx.program_id).chain(itx.accounts.iter().map(|acc| acc.pubkey))
268 })
269 .collect();
270 let total_accounts = all_pubkeys.len();
271
272 if total_accounts > MAX_STATIC_ACCOUNTS_PER_PACKET as usize {
273 return Err(SubscriptionError::TooManyAccounts {
274 actual: total_accounts,
275 max_accounts: MAX_STATIC_ACCOUNTS_PER_PACKET as usize,
276 });
277 }
278
279 Ok(())
280 }
281}
282
283impl SubscriberInstruction {
284 /// Create a `SubscriberInstruction::SubscribeToEvent` `Instruction`
285 ///
286 /// # Account references
287 /// 0. `[SIGNER]` Signer (subscriber) account
288 /// 1. `[WRITE]` Subscription data account
289 /// 2. `[READONLY]` System program account
290 ///
291 /// # Arguments
292 /// * `signer` - The public key of the subscriber
293 /// * `nonce` - The nonce to differentiate handlers for the same topic/signer pair
294 /// * `handler` - The [`Subscription`] data
295 ///
296 /// # Returns
297 /// * `Instruction` - An SVM instruction to be included in a transaction
298 pub fn subscribe_to(signer: Pubkey, nonce: Nonce, mut handler: Subscription) -> Instruction {
299 // todo better error handling
300 if handler.subscriber != signer {
301 panic!("Handler subscriber must be the same as the signer");
302 }
303
304 if handler.instructions.is_empty() {
305 panic!("Handler instructions must not be empty");
306 }
307
308 let subscription_data_account = derive_subscription_address(signer, nonce);
309
310 // append a destroy instruction to one-shot subscription's instructions
311 //
312 // when the one-shot subscription is matched (`BankMatcher::generate_subscription_matches()`),
313 // it is removed from the matcher but the subscription is not automatically removed from the state
314 //
315 // add destroy instruction as the subscription's last instructions, meaning when all the user-specified
316 // instructions have been executed, remove the subscription from the state and return rent to the subscriber
317 if let SubscriptionKind::OneShot = handler.kind {
318 handler.instructions.push(Instruction::new_with_bincode(
319 crate::ID,
320 &SubscriberInstruction::Destroy { nonce },
321 vec![
322 AccountMeta::new(signer, true),
323 AccountMeta::new(subscription_data_account, false),
324 ],
325 ));
326 }
327
328 Instruction::new_with_bincode(
329 crate::ID,
330 &SubscriberInstruction::Subscribe { nonce, handler },
331 vec![
332 AccountMeta::new(signer, true),
333 AccountMeta::new(subscription_data_account, false),
334 AccountMeta::new_readonly(system_program::id(), false),
335 ],
336 )
337 }
338
339 /// Create a `SubscriberInstruction::Unsubscribe` `Instruction`
340 ///
341 /// # Account references
342 /// 0. `[SIGNER]` Signer (subscriber) account
343 /// 1. `[WRITE]` Subscription data account
344 ///
345 /// # Arguments
346 /// * `signer` - The public key of the subscriber
347 /// * `nonce` - The nonce to differentiate handlers for the same topic/signer pair
348 ///
349 /// # Returns
350 /// * `Instruction` - An SVM instruction to be included in a transaction
351 pub fn unsubscribe_from(signer: Pubkey, nonce: Nonce) -> Instruction {
352 let subscription_data_key = derive_subscription_address(signer, nonce);
353
354 Instruction::new_with_bincode(
355 crate::ID,
356 &SubscriberInstruction::Unsubscribe { nonce },
357 vec![
358 AccountMeta::new(signer, true),
359 AccountMeta::new(subscription_data_key, false),
360 ],
361 )
362 }
363
364 /// Create a `SubscriberInstruction::Update` `Instruction`.
365 ///
366 /// # Account references
367 /// 0. `[SIGNER]` Signer (subscriber) account
368 /// 1. `[WRITE]` Subscription data account
369 /// 2. `[READONLY]` System program account
370 ///
371 /// # Arguments
372 /// * `signer` - The public key of the subscriber
373 /// * `nonce` - The nonce set during subscription to differentiate handlers for the same topic/signer pair
374 /// * `handler` - The [`Subscription`] data
375 ///
376 /// # Returns
377 /// * `Instruction` - An SVM instruction to be included in a transaction
378 pub fn update(signer: Pubkey, nonce: Nonce, mut handler: Subscription) -> Instruction {
379 let subscription_data_key = derive_subscription_address(signer, nonce);
380
381 // append a destroy instruction to one-shot subscription's instructions
382 //
383 // when the one-shot subscription is matched (`BankMatcher::generate_subscription_matches()`),
384 // it is removed from the matcher but the subscription is not automatically removed from the state
385 //
386 // add destroy instruction as the subscription's last instructions, meaning when all the user-specified
387 // instructions have been executed, remove the subscription from the state and return rent to the subscriber
388 if let SubscriptionKind::OneShot = handler.kind {
389 handler.instructions.push(Instruction::new_with_bincode(
390 crate::ID,
391 &SubscriberInstruction::Destroy { nonce },
392 vec![
393 AccountMeta::new_readonly(signer, true),
394 AccountMeta::new(subscription_data_key, false),
395 ],
396 ));
397 }
398
399 Instruction::new_with_bincode(
400 crate::ID,
401 &SubscriberInstruction::Update { nonce, handler },
402 vec![
403 AccountMeta::new(signer, true),
404 AccountMeta::new(subscription_data_key, false),
405 AccountMeta::new_readonly(system_program::id(), false),
406 ],
407 )
408 }
409}
410
411/// Generates the program-derived address (PDA) for the subscription data account.
412///
413/// # Arguments
414///
415/// * `subscriber` - The public key of the subscriber.
416/// * `nonce` - A slice of bytes used as a seed to generate the PDA.
417///
418/// # Returns
419///
420/// The public key of the subscription data account.
421pub fn derive_subscription_address<NONCE: Into<Nonce>>(subscriber: Pubkey, nonce: NONCE) -> Pubkey {
422 let nonce = nonce.into();
423 Pubkey::find_program_address(
424 &[
425 RIALO_SUBSCRIBE_SEED.as_bytes(),
426 subscriber.as_array(),
427 nonce.as_bytes(),
428 ],
429 &crate::ID,
430 )
431 .0
432}
433
434#[cfg(test)]
435mod test {
436 use super::*;
437
438 #[test]
439 fn test_subscribe_to_instruction() {
440 let signer = Pubkey::new_unique();
441 let handler = Subscription::new(
442 signer,
443 Predicate {
444 topic: String::from("MySubscription"),
445 event_account: Some(Pubkey::new_unique()),
446 timestamp_range: Some((1337, 1338)),
447 },
448 vec![Instruction::new_with_bincode(
449 Pubkey::new_unique(),
450 &["MyData"],
451 vec![],
452 )],
453 SubscriptionKind::Persistent,
454 0..=u64::MAX,
455 );
456
457 let instruction =
458 SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
459
460 assert_eq!(
461 instruction.data,
462 bincode::serialize(&SubscriberInstruction::Subscribe {
463 nonce: Nonce::default(),
464 handler,
465 })
466 .unwrap()
467 );
468 assert_eq!(instruction.program_id, crate::ID);
469
470 let subscription_data_account = derive_subscription_address(signer, Nonce::default());
471 assert_eq!(
472 instruction.accounts,
473 vec![
474 AccountMeta::new(signer, true),
475 AccountMeta::new(subscription_data_account, false),
476 AccountMeta::new_readonly(system_program::id(), false),
477 ]
478 )
479 }
480
481 #[test]
482 fn test_delete_subscription_instruction() {
483 let signer = Pubkey::new_unique();
484
485 let instruction = SubscriberInstruction::unsubscribe_from(signer, Nonce::default());
486
487 assert_eq!(
488 instruction.data,
489 bincode::serialize(&SubscriberInstruction::Unsubscribe {
490 nonce: Nonce::default()
491 })
492 .unwrap()
493 );
494 assert_eq!(instruction.program_id, crate::ID);
495
496 let subscription_data_account = derive_subscription_address(signer, Nonce::default());
497
498 assert_eq!(
499 instruction.accounts,
500 vec![
501 AccountMeta::new(signer, true),
502 AccountMeta::new(subscription_data_account, false),
503 ]
504 )
505 }
506
507 #[test]
508 fn test_update_subscription_instruction() {
509 let signer = Pubkey::new_unique();
510 let handler = Subscription {
511 subscriber: signer,
512 predicate: Predicate {
513 topic: String::from("MySubscription"),
514 event_account: Some(Pubkey::new_unique()),
515 timestamp_range: Some((1337, 1338)),
516 },
517 instructions: vec![Instruction::new_with_bincode(
518 Pubkey::new_unique(),
519 &["MyData"],
520 vec![],
521 )],
522 kind: SubscriptionKind::Persistent,
523 active_commits: 0..=u64::MAX,
524 };
525
526 let instruction = SubscriberInstruction::update(signer, Nonce::default(), handler.clone());
527
528 assert_eq!(
529 instruction.data,
530 bincode::serialize(&SubscriberInstruction::Update {
531 nonce: Nonce::default(),
532 handler
533 })
534 .unwrap()
535 );
536 assert_eq!(instruction.program_id, crate::ID);
537
538 let subscription_data_account = derive_subscription_address(signer, Nonce::default());
539
540 assert_eq!(
541 instruction.accounts,
542 vec![
543 AccountMeta::new(signer, true),
544 AccountMeta::new(subscription_data_account, false),
545 AccountMeta::new_readonly(system_program::id(), false),
546 ]
547 )
548 }
549
550 #[test]
551 fn test_one_shot() {
552 let signer = Pubkey::new_unique();
553 let mut handler = Subscription::new(
554 signer,
555 Predicate {
556 topic: String::from("MySubscription"),
557 event_account: Some(Pubkey::new_unique()),
558 timestamp_range: None,
559 },
560 vec![Instruction::new_with_bincode(
561 Pubkey::new_unique(),
562 &["MyData"],
563 vec![],
564 )],
565 SubscriptionKind::OneShot,
566 0..=u64::MAX,
567 );
568
569 let instruction =
570 SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
571
572 handler.instructions.push(Instruction::new_with_bincode(
573 crate::ID,
574 &SubscriberInstruction::Destroy {
575 nonce: Nonce::default(),
576 },
577 vec![
578 AccountMeta::new(signer, true),
579 AccountMeta::new(derive_subscription_address(signer, Nonce::default()), false),
580 ],
581 ));
582
583 assert_eq!(
584 instruction.data,
585 bincode::serialize(&SubscriberInstruction::Subscribe {
586 nonce: Nonce::default(),
587 handler,
588 })
589 .unwrap()
590 );
591 assert_eq!(instruction.program_id, crate::ID);
592
593 let subscription_data_account = derive_subscription_address(signer, Nonce::default());
594 assert_eq!(
595 instruction.accounts,
596 vec![
597 AccountMeta::new(signer, true),
598 AccountMeta::new(subscription_data_account, false),
599 AccountMeta::new_readonly(system_program::id(), false),
600 ]
601 )
602 }
603
604 #[test]
605 fn test_one_shot_with_timestamp_range() {
606 let signer = Pubkey::new_unique();
607 let mut handler = Subscription::new(
608 signer,
609 Predicate::new_with_timestamp_range(1337..1338),
610 vec![Instruction::new_with_bincode(
611 Pubkey::new_unique(),
612 &["MyData"],
613 vec![],
614 )],
615 SubscriptionKind::OneShot,
616 0..=u64::MAX,
617 );
618
619 let instruction =
620 SubscriberInstruction::subscribe_to(signer, Nonce::default(), handler.clone());
621
622 handler.instructions.push(Instruction::new_with_bincode(
623 crate::ID,
624 &SubscriberInstruction::Destroy {
625 nonce: Nonce::default(),
626 },
627 vec![
628 AccountMeta::new(signer, true),
629 AccountMeta::new(derive_subscription_address(signer, Nonce::default()), false),
630 ],
631 ));
632
633 assert_eq!(
634 instruction.data,
635 bincode::serialize(&SubscriberInstruction::Subscribe {
636 nonce: Nonce::default(),
637 handler,
638 })
639 .unwrap()
640 );
641 assert_eq!(instruction.program_id, crate::ID);
642
643 let subscription_data_account = derive_subscription_address(signer, Nonce::default());
644 assert_eq!(
645 instruction.accounts,
646 vec![
647 AccountMeta::new(signer, true),
648 AccountMeta::new(subscription_data_account, false),
649 AccountMeta::new_readonly(system_program::id(), false),
650 ]
651 )
652 }
653}