1#![doc = simple_mermaid::mermaid!("../docs/mermaid/sub_state_lifecycle.mmd")]
23#![cfg_attr(not(feature = "std"), no_std)]
49
50#[cfg(test)]
51mod tests;
52
53#[cfg(feature = "runtime-benchmarks")]
54mod benchmarking;
55
56pub mod impls;
57pub mod primitives;
58pub mod traits;
59pub mod weights;
60
61use crate::{
62 primitives::{
63 CreateSubParams, OriginKind, Quote, QuoteSubParams, SubInfoRequest, SubInfoResponse,
64 SubscriptionCallData, SubscriptionMetadata,
65 },
66 traits::{
67 BalanceDirection, DepositCalculator, DiffBalance, FeesError, FeesManager,
68 Subscription as SubscriptionTrait,
69 },
70};
71use codec::{Codec, Decode, DecodeWithMemTracking, Encode, EncodeLike, MaxEncodedLen};
72use frame_support::{
73 pallet_prelude::{
74 ensure, Blake2_128Concat, DispatchError, DispatchResult, DispatchResultWithPostInfo,
75 EnsureOrigin, Hooks, IsType, OptionQuery, StorageMap, StorageValue, ValueQuery, Weight,
76 Zero,
77 },
78 traits::{
79 fungible::{hold::Mutate as HoldMutate, Inspect},
80 tokens::Precision,
81 Get,
82 },
83};
84use frame_system::{ensure_signed, pallet_prelude::OriginFor};
85use scale_info::TypeInfo;
86use sp_arithmetic::traits::Unsigned;
87use sp_core::H256;
88use sp_idn_traits::{
89 pulse::{Dispatcher, Pulse},
90 Hashable,
91};
92use sp_runtime::traits::{Saturating, TryConvert};
93use sp_std::{boxed::Box, fmt::Debug, vec, vec::Vec};
94use xcm::{
95 prelude::{
96 AllOf, Asset, AssetFilter::Wild, AssetId, BuyExecution, DepositAsset, Junction, Junctions,
97 Location, RefundSurplus, Transact, Unlimited, WildFungibility, WithdrawAsset, Xcm,
98 },
99 DoubleEncoded, VersionedLocation, VersionedXcm,
100};
101use xcm_builder::SendController;
102use xcm_executor::traits::ConvertLocation;
103
104pub use frame_system::pallet_prelude::BlockNumberFor;
105pub use pallet::*;
106pub use weights::WeightInfo;
107
/// Log target passed to every `log` macro invocation in this crate.
const LOG_TARGET: &str = "pallet-idn-manager";
109
/// Balance type of the configured `Config::Currency`.
pub type BalanceOf<T> = <<T as Config>::Currency as Inspect<AccountIdOf<T>>>::Balance;

/// Subscription metadata, parameterized by `Config::MaxMetadataLen`.
pub type MetadataOf<T> = SubscriptionMetadata<<T as Config>::MaxMetadataLen>;

/// Subscription call data, parameterized by `Config::MaxCallDataLen`.
pub type CallDataOf<T> = SubscriptionCallData<<T as Config>::MaxCallDataLen>;

/// A quote expressed in the pallet's balance type.
pub type QuoteOf<T> = Quote<BalanceOf<T>>;

/// Parameters accepted by the `quote_subscription` call.
pub type QuoteSubParamsOf<T> =
    QuoteSubParams<CreateSubParamsOf<T>, BlockNumberFor<T>, CallDataOf<T>>;

/// Request accepted by the `get_subscription_info` call.
pub type SubInfoRequestOf<T> = SubInfoRequest<SubscriptionIdOf<T>, CallDataOf<T>>;

/// The subscription identifier type configured through `Config::SubscriptionId`.
pub type SubscriptionIdOf<T> = <T as pallet::Config>::SubscriptionId;

/// The concrete [`Subscription`] type stored by the pallet for runtime `T`.
pub type SubscriptionOf<T> = Subscription<
    AccountIdOf<T>,
    BlockNumberFor<T>,
    <T as pallet::Config>::Credits,
    MetadataOf<T>,
    SubscriptionIdOf<T>,
    CallDataOf<T>,
>;
141
/// A subscription as stored in the pallet's `Subscriptions` map.
#[derive(
    Encode, Decode, Clone, TypeInfo, MaxEncodedLen, Debug, PartialEq, DecodeWithMemTracking,
)]
pub struct Subscription<AccountId, BlockNumber, Credits, Metadata, SubscriptionId, CallData> {
    /// Identifier of the subscription; also the key it is stored under.
    pub id: SubscriptionId,
    /// Subscriber, delivery target, call data and origin kind of the subscription.
    pub details: SubscriptionDetails<AccountId, CallData>,
    /// Credits not yet consumed. Reduced on every pulse distribution round.
    pub credits_left: Credits,
    /// Current lifecycle state.
    pub state: SubscriptionState,
    /// Block number at which the subscription was created.
    pub created_at: BlockNumber,
    /// Block number of creation or of the most recent `update_subscription`.
    pub updated_at: BlockNumber,
    /// Credits requested at creation, or set by the most recent update.
    pub credits: Credits,
    /// Minimum number of blocks between two deliveries to this subscription.
    pub frequency: BlockNumber,
    /// Optional metadata attached by the subscriber.
    pub metadata: Option<Metadata>,
    /// Block number of the last delivery attempt, `None` until the first one.
    pub last_delivered: Option<BlockNumber>,
}
168
/// Ownership and delivery details of a [`Subscription`].
#[derive(
    Encode, Decode, Clone, TypeInfo, MaxEncodedLen, Debug, PartialEq, DecodeWithMemTracking,
)]
pub struct SubscriptionDetails<AccountId, CallData> {
    /// Account that owns the subscription; only this account may pause, update,
    /// reactivate or kill it, and fees and deposits are held on it.
    pub subscriber: AccountId,
    /// XCM location the pulses are sent to.
    pub target: Location,
    /// Call data prefix; the encoded `(pulse, sub_id)` pair is appended to it
    /// before it is dispatched through `Transact`.
    pub call: CallData,
    /// Origin kind used for the `Transact` instruction on the target.
    pub origin_kind: OriginKind,
}
186
/// The runtime's account identifier type.
pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;

/// The concrete [`SubscriptionDetails`] type for runtime `T`.
pub type SubscriptionDetailsOf<T> = SubscriptionDetails<AccountIdOf<T>, CallDataOf<T>>;

/// Parameters accepted by the `create_subscription` call.
pub type CreateSubParamsOf<T> = CreateSubParams<
    <T as pallet::Config>::Credits,
    BlockNumberFor<T>,
    MetadataOf<T>,
    SubscriptionIdOf<T>,
    CallDataOf<T>,
>;
203
/// Parameters of the `update_subscription` call.
///
/// Every field other than `sub_id` is optional; `None` leaves the corresponding
/// value of the subscription untouched.
#[derive(
    Encode, Decode, DecodeWithMemTracking, Clone, TypeInfo, MaxEncodedLen, Debug, PartialEq,
)]
pub struct UpdateSubParams<SubId, Credits, BlockNumber, Metadata> {
    /// Identifier of the subscription to update.
    pub sub_id: SubId,
    /// New amount of credits, if it should change.
    pub credits: Option<Credits>,
    /// New delivery frequency, if it should change.
    pub frequency: Option<BlockNumber>,
    /// New metadata: `None` keeps the current value, `Some(None)` clears it and
    /// `Some(Some(m))` replaces it with `m`.
    pub metadata: Option<Option<Metadata>>,
}
220
/// The concrete [`UpdateSubParams`] type for runtime `T`.
pub type UpdateSubParamsOf<T> = UpdateSubParams<
    <T as Config>::SubscriptionId,
    <T as Config>::Credits,
    BlockNumberFor<T>,
    MetadataOf<T>,
>;
229
/// Lifecycle state of a [`Subscription`].
#[derive(
    Encode, Decode, Clone, PartialEq, TypeInfo, MaxEncodedLen, Debug, DecodeWithMemTracking,
)]
pub enum SubscriptionState {
    /// The subscription receives pulses. Initial state after creation and the
    /// state reached through `reactivate_subscription`.
    Active,
    /// The subscription receives no pulses. Reached through `pause_subscription`
    /// or automatically when fee collection or XCM dispatch fails.
    Paused,
    /// The subscription ran below its minimum credits and is waiting to be
    /// terminated by the `on_finalize` hook.
    Finalized,
}
254
255#[frame_support::pallet]
256pub mod pallet {
257 use super::*;
258
    /// The pallet type on which storage, calls and hooks are implemented.
    #[pallet::pallet]
    pub struct Pallet<T>(_);
261
    /// Configuration of the pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config {
        /// The overarching runtime event type.
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Currency on which subscription fees and storage deposits are held.
        type Currency: Inspect<<Self as frame_system::pallet::Config>::AccountId>
            + HoldMutate<
                <Self as frame_system::pallet::Config>::AccountId,
                Reason = Self::RuntimeHoldReason,
            >;

        /// The overarching hold reason; must be constructible from this pallet's [`HoldReason`].
        type RuntimeHoldReason: From<HoldReason>;

        /// Computes subscription fees and credits and collects fees from subscribers.
        type FeesManager: FeesManager<
            BalanceOf<Self>,
            Self::Credits,
            BlockNumberFor<Self>,
            BlockNumberFor<Self>,
            SubscriptionOf<Self>,
            DispatchError,
            <Self as frame_system::pallet::Config>::AccountId,
            Self::DiffBalance,
        >;

        /// Computes the storage deposit of a subscription and deposit differences.
        type DepositCalculator: DepositCalculator<
            BalanceOf<Self>,
            SubscriptionOf<Self>,
            Self::DiffBalance,
        >;

        /// The pulse type distributed to subscribers.
        type Pulse: Pulse + Encode + Decode + Debug + Clone + TypeInfo + PartialEq + Default;

        /// Weight information for the pallet's calls and hooks.
        type WeightInfo: WeightInfo;

        /// XCM controller used to send messages to subscription targets.
        type Xcm: xcm_builder::Controller<
            OriginFor<Self>,
            <Self as frame_system::Config>::RuntimeCall,
            BlockNumberFor<Self>,
        >;

        /// Type parameter bounding [`MetadataOf`].
        #[pallet::constant]
        type MaxMetadataLen: Get<u32>;

        /// Type parameter bounding [`CallDataOf`].
        #[pallet::constant]
        type MaxCallDataLen: Get<u32>;

        /// Amount of the fee asset withdrawn and offered to `BuyExecution` in every
        /// outgoing XCM message; the unused surplus is refunded and deposited back.
        #[pallet::constant]
        type MaxXcmFees: Get<u128>;

        /// The type used to count subscription credits.
        type Credits: Unsigned
            + Codec
            + TypeInfo
            + MaxEncodedLen
            + Debug
            + Saturating
            + Copy
            + PartialOrd
            + From<BlockNumberFor<Self>>;

        /// Maximum number of subscriptions that may exist at the same time.
        #[pallet::constant]
        type MaxSubscriptions: Get<u32>;

        /// Maximum number of finalized subscriptions terminated per `on_finalize`.
        #[pallet::constant]
        type MaxTerminatableSubs: Get<u32>;

        /// The subscription identifier type; must be constructible from an `H256` hash.
        type SubscriptionId: From<H256>
            + Codec
            + Copy
            + PartialEq
            + TypeInfo
            + EncodeLike
            + MaxEncodedLen
            + Debug;

        /// Describes an amount together with the direction (collect/release/none) it moves in.
        type DiffBalance: DiffBalance<BalanceOf<Self>>;

        /// Accepts the XCM origins allowed to call into the pallet and yields their location.
        type XcmOriginFilter: EnsureOrigin<Self::RuntimeOrigin, Success = Location>;

        /// Converts an accepted XCM location into the local account acting as subscriber.
        type XcmLocationToAccountId: ConvertLocation<Self::AccountId>;

        /// Converts a local origin into a location; used as the beneficiary of the
        /// refunded XCM fees.
        type LocalOriginToLocation: TryConvert<Self::RuntimeOrigin, Location>;
    }
377
    /// All existing subscriptions, keyed by subscription id.
    #[pallet::storage]
    pub(crate) type Subscriptions<T: Config> =
        StorageMap<_, Blake2_128Concat, T::SubscriptionId, SubscriptionOf<T>, OptionQuery>;
382
    /// Number of stored subscriptions: incremented when one is created and
    /// decremented when one is terminated.
    #[pallet::storage]
    pub(crate) type SubCounter<T: Config> = StorageValue<_, u32, ValueQuery>;
387
    /// Events emitted by the pallet.
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// A subscription was created and stored.
        SubscriptionCreated { sub_id: T::SubscriptionId },
        /// A subscription was terminated and removed from storage.
        SubscriptionTerminated { sub_id: T::SubscriptionId },
        /// A subscription was paused, either on request or because fee collection
        /// or XCM dispatch failed during distribution.
        SubscriptionPaused { sub_id: T::SubscriptionId },
        /// A subscription was updated.
        SubscriptionUpdated { sub_id: T::SubscriptionId },
        /// A paused subscription was reactivated.
        SubscriptionReactivated { sub_id: T::SubscriptionId },
        /// A pulse was sent to the subscription's target.
        RandomnessDistributed { sub_id: T::SubscriptionId },
        /// Fees were collected for a subscription.
        FeesCollected { sub_id: T::SubscriptionId, fees: BalanceOf<T> },
        /// A quote was sent back to the requester.
        SubQuoted { requester: Location, quote: QuoteOf<T> },
        /// Subscription information was sent in response to `get_subscription_info`.
        SubscriptionDistributed { sub_id: T::SubscriptionId },
    }
410
    /// Errors returned by the pallet.
    #[pallet::error]
    pub enum Error<T> {
        /// A subscription with the same id is already stored.
        SubscriptionAlreadyExists,
        /// No subscription is stored under the given id.
        SubscriptionDoesNotExist,
        /// The subscription is already active.
        SubscriptionAlreadyActive,
        /// The requested state change is not allowed from the current state.
        SubscriptionInvalidTransition,
        /// The subscription is neither active nor paused and cannot be updated.
        SubscriptionNotUpdatable,
        /// The caller is not the subscriber of the subscription.
        NotSubscriber,
        /// The maximum number of subscriptions has been reached.
        TooManySubscriptions,
        /// The origin's location could not be converted into an account, or it is
        /// not a sibling parachain location.
        InvalidSubscriber,
        /// The supplied parameters are invalid (e.g. zero credits).
        InvalidParams,
        /// The XCM message could not be built because the local origin could not
        /// be converted into a location.
        XcmSendError,
    }
434
    /// Reasons for which this pallet places funds on hold.
    #[pallet::composite_enum]
    pub enum HoldReason {
        /// Funds held to pay for subscription fees.
        #[codec(index = 0)]
        Fees,
        /// Funds held as storage deposit for a subscription.
        #[codec(index = 1)]
        StorageDeposit,
    }
445
446 #[pallet::hooks]
447 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
448 fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
451 <T as pallet::Config>::WeightInfo::on_finalize()
452 }
453
454 fn on_finalize(_n: BlockNumberFor<T>) {
458 for (sub_id, sub) in Subscriptions::<T>::iter()
460 .filter(|(_, sub)| sub.state == SubscriptionState::Finalized)
461 .take(T::MaxTerminatableSubs::get() as usize)
462 {
463 let _ = Self::terminate_subscription(&sub, sub_id);
465 }
466 }
467 }
468
469 #[pallet::call]
470 impl<T: Config> Pallet<T> {
471 #[pallet::call_index(0)]
487 #[pallet::weight(T::WeightInfo::create_subscription())]
488 #[allow(clippy::useless_conversion)]
489 pub fn create_subscription(
490 origin: OriginFor<T>,
491 params: CreateSubParamsOf<T>,
492 ) -> DispatchResultWithPostInfo {
493 log::trace!(target: LOG_TARGET, "Create subscription request received: {:?}", params);
494 let subscriber = Self::ensure_signed_or_valid_xcm_origin(origin)?;
495
496 ensure!(params.credits != Zero::zero(), Error::<T>::InvalidParams);
497
498 ensure!(
499 SubCounter::<T>::get() < T::MaxSubscriptions::get(),
500 Error::<T>::TooManySubscriptions
501 );
502
503 let current_block = frame_system::Pallet::<T>::block_number();
504
505 let details = SubscriptionDetails {
506 subscriber: subscriber.clone(),
507 target: params.target.clone(),
508 call: params.call.clone(),
509 origin_kind: params.origin_kind.clone(),
510 };
511
512 let sub_id = params.sub_id.unwrap_or(Self::generate_sub_id(
513 &subscriber,
514 ¶ms,
515 ¤t_block,
516 ));
517
518 ensure!(
519 !Subscriptions::<T>::contains_key(sub_id),
520 Error::<T>::SubscriptionAlreadyExists
521 );
522
523 let subscription = Subscription {
524 id: sub_id,
525 state: SubscriptionState::Active,
526 credits_left: params.credits,
527 details,
528 created_at: current_block,
529 updated_at: current_block,
530 credits: params.credits,
531 frequency: params.frequency,
532 metadata: params.metadata,
533 last_delivered: None,
534 };
535
536 let fees = Self::calculate_subscription_fees(¶ms.credits);
538
539 Self::hold_fees(&subscriber, fees)?;
540
541 Self::hold_deposit(
542 &subscriber,
543 T::DepositCalculator::calculate_storage_deposit(&subscription),
544 )?;
545
546 Subscriptions::<T>::insert(sub_id, subscription);
547
548 SubCounter::<T>::mutate(|c| c.saturating_inc());
550
551 Self::deposit_event(Event::SubscriptionCreated { sub_id });
552
553 Ok(Some(T::WeightInfo::create_subscription()).into())
554 }
555
556 #[pallet::call_index(1)]
572 #[pallet::weight(T::WeightInfo::pause_subscription())]
573 pub fn pause_subscription(
574 origin: OriginFor<T>,
576 sub_id: T::SubscriptionId,
577 ) -> DispatchResult {
578 log::trace!(target: LOG_TARGET, "Pause subscription request received: {:?}", sub_id);
579 let subscriber = Self::ensure_signed_or_valid_xcm_origin(origin)?;
580 Subscriptions::<T>::try_mutate(sub_id, |maybe_sub| {
581 let sub = maybe_sub.as_mut().ok_or(Error::<T>::SubscriptionDoesNotExist)?;
582 ensure!(sub.details.subscriber == subscriber, Error::<T>::NotSubscriber);
583 ensure!(
584 sub.state == SubscriptionState::Active,
585 Error::<T>::SubscriptionInvalidTransition
586 );
587 sub.state = SubscriptionState::Paused;
588 Self::deposit_event(Event::SubscriptionPaused { sub_id });
589 Ok(())
590 })
591 }
592
593 #[pallet::call_index(2)]
608 #[pallet::weight(T::WeightInfo::kill_subscription())]
609 pub fn kill_subscription(
610 origin: OriginFor<T>,
612 sub_id: T::SubscriptionId,
613 ) -> DispatchResult {
614 log::trace!(target: LOG_TARGET, "Kill subscription request received: {:?}", sub_id);
615 let subscriber = Self::ensure_signed_or_valid_xcm_origin(origin)?;
616
617 let sub =
618 Self::get_subscription(&sub_id).ok_or(Error::<T>::SubscriptionDoesNotExist)?;
619 ensure!(sub.details.subscriber == subscriber, Error::<T>::NotSubscriber);
620
621 Self::terminate_subscription(&sub, sub_id)
622 }
623
624 #[pallet::call_index(3)]
638 #[pallet::weight(T::WeightInfo::update_subscription(T::MaxMetadataLen::get()))]
639 #[allow(clippy::useless_conversion)]
640 pub fn update_subscription(
641 origin: OriginFor<T>,
643 params: UpdateSubParamsOf<T>,
644 ) -> DispatchResultWithPostInfo {
645 log::trace!(target: LOG_TARGET, "Update subscription request received: {:?}", params);
646 let subscriber = Self::ensure_signed_or_valid_xcm_origin(origin)?;
647
648 Subscriptions::<T>::try_mutate(params.sub_id, |maybe_sub| {
649 let sub = maybe_sub.as_mut().ok_or(Error::<T>::SubscriptionDoesNotExist)?;
650 ensure!(sub.details.subscriber == subscriber, Error::<T>::NotSubscriber);
651 ensure!(
652 sub.state == SubscriptionState::Active ||
653 sub.state == SubscriptionState::Paused,
654 Error::<T>::SubscriptionNotUpdatable
655 );
656
657 let mut fees_diff = T::DiffBalance::new(Zero::zero(), BalanceDirection::None);
658
659 if let Some(credits) = params.credits {
660 fees_diff = T::FeesManager::calculate_diff_fees(&sub.credits, &credits);
661 sub.credits = credits;
662 }
663
664 if let Some(frequency) = params.frequency {
665 sub.frequency = frequency;
666 }
667
668 if let Some(metadata) = params.metadata.clone() {
669 sub.metadata = metadata;
670 }
671 sub.updated_at = frame_system::Pallet::<T>::block_number();
672
673 let old_sub = sub.clone();
674
675 let deposit_diff = T::DepositCalculator::calculate_diff_deposit(sub, &old_sub);
676
677 Self::manage_diff_fees(&subscriber, &fees_diff)?;
679 Self::manage_diff_deposit(&subscriber, &deposit_diff)?;
681
682 Self::deposit_event(Event::SubscriptionUpdated { sub_id: params.sub_id });
683 Ok::<(), DispatchError>(())
684 })?;
685
686 Ok(Some(T::WeightInfo::update_subscription(if let Some(Some(md)) = params.metadata {
687 md.len().try_into().unwrap_or(T::MaxMetadataLen::get())
688 } else {
689 0
690 }))
691 .into())
692 }
693
694 #[pallet::call_index(4)]
703 #[pallet::weight(T::WeightInfo::reactivate_subscription())]
704 pub fn reactivate_subscription(
705 origin: OriginFor<T>,
707 sub_id: T::SubscriptionId,
708 ) -> DispatchResult {
709 log::trace!(target: LOG_TARGET, "Reactivating subscription: {:?}", sub_id);
710 let subscriber = Self::ensure_signed_or_valid_xcm_origin(origin)?;
711 Subscriptions::<T>::try_mutate(sub_id, |maybe_sub| {
712 let sub = maybe_sub.as_mut().ok_or(Error::<T>::SubscriptionDoesNotExist)?;
713 ensure!(sub.details.subscriber == subscriber, Error::<T>::NotSubscriber);
714 ensure!(
715 sub.state == SubscriptionState::Paused,
716 Error::<T>::SubscriptionInvalidTransition
717 );
718 sub.state = SubscriptionState::Active;
719 Self::deposit_event(Event::SubscriptionReactivated { sub_id });
720 Ok(())
721 })
722 }
723
724 #[pallet::call_index(5)]
727 #[pallet::weight(T::WeightInfo::quote_subscription())]
728 #[allow(clippy::useless_conversion)]
729 pub fn quote_subscription(
730 origin: OriginFor<T>,
731 params: QuoteSubParamsOf<T>,
732 ) -> DispatchResultWithPostInfo {
733 log::trace!(target: LOG_TARGET, "Quote subscription request received: {:?}", params);
734 let requester: Location = T::XcmOriginFilter::ensure_origin(origin.clone())?;
736 let target = Self::extract_parachain_location(&requester)?;
737 let acc = &T::XcmLocationToAccountId::convert_location(&requester)
738 .ok_or_else(|| {
739 log::warn!(target: LOG_TARGET, "InvalidSubscriber: failed to convert XCM location to AccountId");
740 Error::<T>::InvalidSubscriber
741 })?;
742 let deposit = Self::calculate_storage_deposit_from_create_params(
743 acc,
744 ¶ms.quote_request.create_sub_params,
745 );
746
747 let credits = T::FeesManager::calculate_credits(
748 params.quote_request.create_sub_params.frequency,
749 params.quote_request.lifetime_pulses,
750 );
751
752 let fees = Self::calculate_subscription_fees(&credits);
753
754 let quote = Quote { req_ref: params.quote_request.req_ref, fees, deposit };
755
756 Self::xcm_send(
757 acc,
758 &target,
759 {
761 let mut call_data = params.call.clone().into_inner();
762 call_data.extend("e.encode());
763 call_data.into()
764 },
765 params.origin_kind,
766 )?;
767 Self::deposit_event(Event::SubQuoted { requester, quote });
768
769 Ok(Some(T::WeightInfo::quote_subscription()).into())
770 }
771
772 #[pallet::call_index(6)]
775 #[pallet::weight(T::WeightInfo::get_subscription_info())]
776 pub fn get_subscription_info(
777 origin: OriginFor<T>,
778 req: SubInfoRequestOf<T>,
779 ) -> DispatchResult {
780 log::trace!(target: LOG_TARGET, "Get subscription info request received: {:?}", req);
781 let requester: Location = T::XcmOriginFilter::ensure_origin(origin.clone())?;
783 let target = Self::extract_parachain_location(&requester)?;
784 let acc = &T::XcmLocationToAccountId::convert_location(&requester)
785 .ok_or_else(|| {
786 log::warn!(target: LOG_TARGET, "InvalidSubscriber: failed to convert XCM location to AccountId");
787 Error::<T>::InvalidSubscriber
788 })?;
789 let sub =
790 Self::get_subscription(&req.sub_id).ok_or(Error::<T>::SubscriptionDoesNotExist)?;
791
792 let response = SubInfoResponse { sub, req_ref: req.req_ref };
793 Self::xcm_send(
794 acc,
795 &target,
796 {
798 let mut call_data = req.call.clone().into_inner();
799 call_data.extend(&response.encode());
800 call_data.into()
801 },
802 req.origin_kind,
803 )?;
804 Self::deposit_event(Event::SubscriptionDistributed { sub_id: req.sub_id });
805
806 Ok(())
807 }
808 }
809}
810
811impl<T: Config> Pallet<T> {
812 pub fn calculate_subscription_fees(credits: &T::Credits) -> BalanceOf<T> {
817 T::FeesManager::calculate_subscription_fees(credits)
818 }
819
820 pub fn get_subscription(sub_id: &T::SubscriptionId) -> Option<SubscriptionOf<T>> {
824 Subscriptions::<T>::get(sub_id)
825 }
826
827 pub fn get_subscriptions_for_subscriber(subscriber: &T::AccountId) -> Vec<SubscriptionOf<T>> {
832 Subscriptions::<T>::iter()
833 .filter(|(_, sub)| &sub.details.subscriber == subscriber)
834 .map(|(_, sub)| sub)
835 .collect()
836 }
837
838 pub(crate) fn extract_parachain_location(
843 location: &Location,
844 ) -> Result<Location, DispatchError> {
845 let first_junction = match location.unpack() {
846 (1, interior) => interior.first(),
847 _ => None,
848 };
849
850 match first_junction {
851 Some(Junction::Parachain(id)) => Ok(Location::new(1, [Junction::Parachain(*id)])),
852 _ => Err(Error::<T>::InvalidSubscriber.into()),
853 }
854 }
855
856 pub(crate) fn calculate_storage_deposit_from_create_params(
860 subscriber: &T::AccountId,
861 params: &CreateSubParamsOf<T>,
862 ) -> BalanceOf<T> {
863 let current_block = frame_system::Pallet::<T>::block_number();
864
865 let details = SubscriptionDetails {
866 subscriber: subscriber.clone(),
867 target: params.target.clone(),
868 call: params.call.clone(),
869 origin_kind: OriginKind::default(),
870 };
871
872 let subscription = Subscription {
873 id: SubscriptionIdOf::<T>::from(H256::default()),
874 state: SubscriptionState::Active,
875 credits_left: params.credits,
876 details,
877 created_at: current_block,
878 updated_at: current_block,
879 credits: params.credits,
880 frequency: params.frequency,
881 metadata: params.metadata.clone(),
882 last_delivered: None,
883 };
884 T::DepositCalculator::calculate_storage_deposit(&subscription)
885 }
886
887 pub(crate) fn terminate_subscription(
889 sub: &SubscriptionOf<T>,
890 sub_id: T::SubscriptionId,
891 ) -> DispatchResult {
892 log::trace!(target: LOG_TARGET, "Terminating subscription: {:?}", sub_id);
893 let fees_diff = T::FeesManager::calculate_diff_fees(&sub.credits_left, &Zero::zero());
895 let sd = T::DepositCalculator::calculate_storage_deposit(sub);
896
897 Self::manage_diff_fees(&sub.details.subscriber, &fees_diff)?;
898 Self::release_deposit(&sub.details.subscriber, sd)?;
899
900 Subscriptions::<T>::remove(sub_id);
901
902 SubCounter::<T>::mutate(|c| c.saturating_dec());
904
905 Self::deposit_event(Event::SubscriptionTerminated { sub_id });
906 Ok(())
907 }
908
909 pub(crate) fn get_min_credits(sub: &SubscriptionOf<T>) -> T::Credits {
910 T::FeesManager::get_idle_credits(Some(sub))
911 }
912
913 fn distribute(pulse: T::Pulse) {
916 let current_block = frame_system::Pallet::<T>::block_number();
918
919 for (sub_id, mut sub) in Subscriptions::<T>::iter() {
920 if
922 sub.state == SubscriptionState::Active &&
924 (sub.last_delivered.is_none() ||
926 current_block >= sub.last_delivered.unwrap() + sub.frequency)
927 {
928 let consume_credits = T::FeesManager::get_consume_credits(Some(&sub));
930
931 if let Err(e) = Self::collect_fees(&sub, consume_credits) {
932 Self::pause_subscription_on_error(sub_id, sub, "Failed to collect fees", e);
933 continue;
934 }
935
936 sub.credits_left = sub.credits_left.saturating_sub(consume_credits);
938 sub.last_delivered = Some(current_block);
939
940 let subscriber = sub.subscriber();
941
942 if let Err(e) = Self::xcm_send(
944 subscriber,
945 &sub.details.target,
946 {
948 let mut call_data = sub.details.call.clone().into_inner();
949 call_data.extend((&pulse, &sub_id).encode());
950 call_data.into()
951 },
952 sub.details.origin_kind.clone(),
953 ) {
954 Self::pause_subscription_on_error(sub_id, sub, "Failed to dispatch XCM", e);
955 continue;
956 }
957
958 log::info!(target: LOG_TARGET, "Randomness distributed");
959 Self::deposit_event(Event::RandomnessDistributed { sub_id });
960 } else {
961 let idle_credits = T::FeesManager::get_idle_credits(Some(&sub));
962 if let Err(e) = Self::collect_fees(&sub, idle_credits) {
964 log::warn!(
965 target: LOG_TARGET,
966 "Failed to collect fees for idle subscription id = {:?}: {:?}", sub_id, e);
967 }
968 sub.credits_left = sub.credits_left.saturating_sub(idle_credits);
970 }
971
972 if sub.state != SubscriptionState::Finalized &&
974 sub.credits_left < Self::get_min_credits(&sub)
975 {
976 sub.state = SubscriptionState::Finalized;
977 }
978
979 Subscriptions::<T>::insert(sub_id, &sub);
981 }
982 }
983
984 fn pause_subscription_on_error(
986 sub_id: SubscriptionIdOf<T>,
987 mut sub: SubscriptionOf<T>,
988 message: &str,
989 err: DispatchError,
990 ) {
991 log::warn!(
992 target: LOG_TARGET,
993 "{}: subscription id = {:?}. Pausing subscription due to: {:?}",
994 message,
995 sub_id,
996 err
997 );
998 sub.state = SubscriptionState::Paused;
999 Subscriptions::<T>::insert(sub_id, &sub);
1000 Self::deposit_event(Event::SubscriptionPaused { sub_id });
1001 }
1002
1003 fn collect_fees(sub: &SubscriptionOf<T>, credits_consumed: T::Credits) -> DispatchResult {
1018 if credits_consumed.is_zero() {
1019 return Ok(());
1020 }
1021 let fees_to_collect = T::FeesManager::calculate_diff_fees(
1022 &sub.credits_left,
1023 &sub.credits_left.saturating_sub(credits_consumed),
1024 )
1025 .balance();
1026 let fees = T::FeesManager::collect_fees(&fees_to_collect, sub).map_err(|e| match e {
1027 FeesError::NotEnoughBalance { .. } => DispatchError::Other("NotEnoughBalance"),
1028 FeesError::Other(de) => de,
1029 })?;
1030 Self::deposit_event(Event::FeesCollected { sub_id: sub.id, fees });
1031 Ok(())
1032 }
1033
1034 fn generate_sub_id(
1039 subscriber: &T::AccountId,
1040 params: &CreateSubParamsOf<T>,
1041 current_block: &BlockNumberFor<T>,
1042 ) -> T::SubscriptionId {
1043 let mut salt = current_block.encode();
1044 salt.extend(subscriber.encode());
1045 params.hash(&salt).into()
1046 }
1047
1048 fn hold_fees(subscriber: &T::AccountId, fees: BalanceOf<T>) -> DispatchResult {
1053 T::Currency::hold(&HoldReason::Fees.into(), subscriber, fees)
1054 }
1055
1056 fn release_fees(subscriber: &T::AccountId, fees: BalanceOf<T>) -> DispatchResult {
1060 let _ = T::Currency::release(&HoldReason::Fees.into(), subscriber, fees, Precision::Exact)?;
1061 Ok(())
1062 }
1063
1064 fn manage_diff_fees(subscriber: &T::AccountId, diff: &T::DiffBalance) -> DispatchResult {
1069 match diff.direction() {
1070 BalanceDirection::Collect => Self::hold_fees(subscriber, diff.balance()),
1071 BalanceDirection::Release => Self::release_fees(subscriber, diff.balance()),
1072 BalanceDirection::None => Ok(()),
1073 }
1074 }
1075
1076 fn hold_deposit(subscriber: &T::AccountId, deposit: BalanceOf<T>) -> DispatchResult {
1081 T::Currency::hold(&HoldReason::StorageDeposit.into(), subscriber, deposit)
1082 }
1083
1084 fn release_deposit(subscriber: &T::AccountId, deposit: BalanceOf<T>) -> DispatchResult {
1089 let _ = T::Currency::release(
1090 &HoldReason::StorageDeposit.into(),
1091 subscriber,
1092 deposit,
1093 Precision::BestEffort,
1094 )?;
1095 Ok(())
1096 }
1097
1098 fn manage_diff_deposit(subscriber: &T::AccountId, diff: &T::DiffBalance) -> DispatchResult {
1103 match diff.direction() {
1104 BalanceDirection::Collect => Self::hold_deposit(subscriber, diff.balance()),
1105 BalanceDirection::Release => Self::release_deposit(subscriber, diff.balance()),
1106 BalanceDirection::None => Ok(()),
1107 }
1108 }
1109
1110 fn xcm_send(
1119 acc: &AccountIdOf<T>,
1120 target: &Location,
1121 call: DoubleEncoded<()>,
1122 origin_kind: OriginKind,
1123 ) -> DispatchResult {
1124 let origin = T::RuntimeOrigin::from(Some(acc.clone()).into());
1125 let fee_asset = Asset {
1126 id: AssetId(Location { parents: 1, interior: Junctions::Here }),
1127 fun: T::MaxXcmFees::get().into(),
1129 };
1130 let msg = Xcm(vec![
1131 WithdrawAsset(fee_asset.clone().into()),
1132 BuyExecution { weight_limit: Unlimited, fees: fee_asset.clone() },
1133 Transact { origin_kind: origin_kind.into(), fallback_max_weight: None, call },
1134 RefundSurplus,
1135 DepositAsset {
1136 assets: Wild(AllOf { id: fee_asset.id, fun: WildFungibility::Fungible }),
1137 beneficiary: T::LocalOriginToLocation::try_convert(origin.clone())
1139 .map_err(|_| Error::<T>::XcmSendError)?,
1140 },
1141 ]);
1142 let versioned_target: Box<VersionedLocation> =
1143 Box::new(VersionedLocation::V5(target.clone()));
1144 let versioned_msg: Box<VersionedXcm<()>> = Box::new(VersionedXcm::V5(msg.into()));
1145 let xcm_hash = T::Xcm::send(origin, versioned_target.clone(), versioned_msg.clone())?;
1146 log::trace!(
1147 target: LOG_TARGET,
1148 "XCM message sent with hash: {:?}. Target: {:?}. Msg: {:?}",
1149 xcm_hash,
1150 versioned_target,
1151 versioned_msg
1152 );
1153 Ok(())
1154 }
1155
1156 fn ensure_signed_or_valid_xcm_origin(
1168 origin: OriginFor<T>,
1169 ) -> Result<T::AccountId, DispatchError> {
1170 match T::XcmOriginFilter::ensure_origin(origin.clone()) {
1171 Ok(location) => T::XcmLocationToAccountId::convert_location(&location)
1172 .ok_or(DispatchError::from(Error::<T>::InvalidSubscriber)),
1173 Err(_) => ensure_signed(origin).map_err(|e| e.into()),
1174 }
1175 }
1176
1177 #[cfg(any(test, feature = "runtime-benchmarks"))]
1179 fn min_balance() -> BalanceOf<T> {
1180 <T::Currency as Inspect<AccountIdOf<T>>>::minimum_balance()
1182 }
1183}
1184
1185impl<T: Config> Dispatcher<T::Pulse> for Pallet<T> {
1186 fn dispatch(pulse: T::Pulse) {
1192 Pallet::<T>::distribute(pulse);
1193 }
1194
1195 fn dispatch_weight() -> Weight {
1196 T::WeightInfo::dispatch_pulse(SubCounter::<T>::get())
1197 }
1198}
1199
sp_api::decl_runtime_apis! {
    /// Runtime API exposing read-only queries about subscriptions.
    #[api_version(1)]
    pub trait IdnManagerApi<Balance, Credits, AccountId, Subscription, SubscriptionId> where
        Balance: Codec,
        Credits: Codec,
        AccountId: Codec,
        Subscription: Codec,
        SubscriptionId: Codec
    {
        /// Returns the fees for a subscription with the given amount of credits.
        fn calculate_subscription_fees(
            credits: Credits
        ) -> Balance;

        /// Returns the subscription with the given id, if any.
        fn get_subscription(
            sub_id: SubscriptionId
        ) -> Option<Subscription>;

        /// Returns all subscriptions of the given subscriber.
        fn get_subscriptions_for_subscriber(
            subscriber: AccountId
        ) -> Vec<Subscription>;
    }
}