kaspa_notify/subscription/
mod.rs

1use crate::{error::Result, events::EventType, notification::Notification, scope::Scope, subscription::context::SubscriptionContext};
2use borsh::{BorshDeserialize, BorshSerialize};
3use serde::{Deserialize, Serialize};
4use std::fmt::Display;
5use std::ops::Deref;
6use std::{
7    any::Any,
8    fmt::Debug,
9    hash::{Hash, Hasher},
10    sync::Arc,
11};
12
13pub mod array;
14pub mod compounded;
15pub mod context;
16pub mod single;
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
19#[borsh(use_discriminant = true)]
20pub enum Command {
21    Start = 0,
22    Stop = 1,
23}
24
25impl Display for Command {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        let label = match self {
28            Command::Start => "start",
29            Command::Stop => "stop",
30        };
31        write!(f, "{label}")
32    }
33}
34
35impl From<Command> for i32 {
36    fn from(item: Command) -> Self {
37        item as i32
38    }
39}
40
41impl From<i32> for Command {
42    // We make this conversion infallible by falling back to Start from any unexpected value.
43    fn from(item: i32) -> Self {
44        if item == 1 {
45            Command::Stop
46        } else {
47            Command::Start
48        }
49    }
50}
51
52/// Defines how an incoming UtxosChanged mutation must be propagated upwards
53#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
54pub enum UtxosChangedMutationPolicy {
55    /// Mutation granularity defined at address level
56    #[default]
57    AddressSet,
58
59    /// Mutation granularity reduced to all or nothing
60    Wildcard,
61}
62
63#[derive(Clone, Copy, Default, Debug)]
64pub struct MutationPolicies {
65    pub utxo_changed: UtxosChangedMutationPolicy,
66}
67
68impl MutationPolicies {
69    pub fn new(utxo_changed: UtxosChangedMutationPolicy) -> Self {
70        Self { utxo_changed }
71    }
72}
73
74/// A subscription mutation formed by a start/stop command and
75/// a notification scope.
76#[derive(Clone, Debug, PartialEq, Eq)]
77pub struct Mutation {
78    pub command: Command,
79    pub scope: Scope,
80}
81
82impl Mutation {
83    pub fn new(command: Command, scope: Scope) -> Self {
84        Self { command, scope }
85    }
86
87    #[inline(always)]
88    pub fn active(&self) -> bool {
89        self.command == Command::Start
90    }
91
92    #[inline(always)]
93    pub fn event_type(&self) -> EventType {
94        (&self.scope).into()
95    }
96}
97
98pub trait Subscription {
99    fn event_type(&self) -> EventType;
100    fn active(&self) -> bool;
101    fn scope(&self, context: &SubscriptionContext) -> Scope;
102}
103
104pub trait Compounded: Subscription + AsAny + DynEq + CompoundedClone + Debug + Send + Sync {
105    fn compound(&mut self, mutation: Mutation, context: &SubscriptionContext) -> Option<Mutation>;
106}
107
108impl PartialEq for dyn Compounded {
109    fn eq(&self, other: &dyn Compounded) -> bool {
110        DynEq::dyn_eq(self, other.as_any())
111    }
112}
113impl Eq for dyn Compounded {}
114
115pub type CompoundedSubscription = Box<dyn Compounded>;
116
117/// The result of applying a [`Mutation`] to a [`DynSubscription`]
118pub struct MutationOutcome {
119    /// Optional new mutated subscription state
120    pub mutated: Option<DynSubscription>,
121
122    /// Mutations applied to the [`DynSubscription`]
123    pub mutations: Vec<Mutation>,
124}
125
126impl MutationOutcome {
127    pub fn new() -> Self {
128        Self { mutated: None, mutations: vec![] }
129    }
130
131    pub fn with_mutations(mutations: Vec<Mutation>) -> Self {
132        Self { mutated: None, mutations }
133    }
134
135    pub fn with_mutated(mutated: DynSubscription, mutations: Vec<Mutation>) -> Self {
136        Self { mutated: Some(mutated), mutations }
137    }
138
139    /// Updates `target` to the mutated state if any, otherwise leave `target` as is.
140    pub fn apply_to(self, target: &mut DynSubscription) -> Self {
141        if let Some(ref mutated) = self.mutated {
142            *target = mutated.clone();
143        }
144        self
145    }
146
147    #[inline(always)]
148    pub fn has_new_state(&self) -> bool {
149        self.mutated.is_some()
150    }
151
152    #[inline(always)]
153    pub fn has_changes(&self) -> bool {
154        self.has_new_state() || !self.mutations.is_empty()
155    }
156}
157
158impl Default for MutationOutcome {
159    fn default() -> Self {
160        Self::new()
161    }
162}
163
164/// A single subscription (as opposed to a compounded one)
165pub trait Single: Subscription + AsAny + DynHash + DynEq + Debug + Send + Sync {
166    /// Applies a [`Mutation`] to a single subscription.
167    ///
168    /// On success, returns both an optional new state and the mutations, if any, resulting of the process.
169    ///
170    /// Implementation guidelines:
171    ///
172    /// - If the processing of the mutation yields no change, the returned outcome must have no new state and no mutations
173    ///   otherwise the outcome should contain both a new state (see next point for exception) and some mutations.
174    /// - If the subscription has inner mutability and its current state and incoming mutation do allow an inner mutation,
175    ///   the outcome new state must be empty.
176    fn apply_mutation(
177        &self,
178        arc_self: &Arc<dyn Single>,
179        mutation: Mutation,
180        policies: MutationPolicies,
181        context: &SubscriptionContext,
182    ) -> Result<MutationOutcome>;
183}
184
185pub trait MutateSingle: Deref<Target = dyn Single> {
186    /// Applies a [`Mutation`] to a single subscription.
187    ///
188    /// On success, updates `self` to the new state if any and returns both the optional new state and the mutations
189    /// resulting of the process as a [`MutationOutcome`].
190    fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, context: &SubscriptionContext) -> Result<MutationOutcome>;
191}
192
193impl MutateSingle for Arc<dyn Single> {
194    fn mutate(&mut self, mutation: Mutation, policies: MutationPolicies, context: &SubscriptionContext) -> Result<MutationOutcome> {
195        let outcome = self.apply_mutation(self, mutation, policies, context)?.apply_to(self);
196        Ok(outcome)
197    }
198}
199
200pub trait BroadcastingSingle: Deref<Target = dyn Single> {
201    /// Returns the broadcasting instance of the subscription.
202    ///
203    /// This is used for grouping all the wildcard UtxosChanged subscriptions under
204    /// the same unique instance in the broadcaster plans, allowing message optimizations
205    /// during broadcasting of the notifications.
206    fn broadcasting(self, context: &SubscriptionContext) -> DynSubscription;
207}
208
209impl Hash for dyn Single {
210    fn hash<H: Hasher>(&self, state: &mut H) {
211        self.dyn_hash(state);
212    }
213}
214impl PartialEq for dyn Single {
215    fn eq(&self, other: &dyn Single) -> bool {
216        DynEq::dyn_eq(self, other.as_any())
217    }
218}
219impl Eq for dyn Single {}
220
221pub type DynSubscription = Arc<dyn Single>;
222
223pub trait AsAny {
224    fn as_any(&self) -> &dyn Any;
225}
226impl<T: Any> AsAny for T {
227    fn as_any(&self) -> &dyn Any {
228        self
229    }
230}
231
232pub trait DynHash {
233    fn dyn_hash(&self, state: &mut dyn Hasher);
234}
235impl<H: Hash + ?Sized> DynHash for H {
236    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
237        self.hash(&mut state);
238    }
239}
240
241pub trait DynEq {
242    fn dyn_eq(&self, other: &dyn Any) -> bool;
243}
244impl<T: Eq + Any> DynEq for T {
245    fn dyn_eq(&self, other: &dyn Any) -> bool {
246        if let Some(other) = other.downcast_ref::<Self>() {
247            self == other
248        } else {
249            false
250        }
251    }
252}
253
254pub trait CompoundedClone {
255    fn clone_box(&self) -> Box<dyn Compounded>;
256}
257
258impl<T> CompoundedClone for T
259where
260    T: 'static + Compounded + Clone,
261{
262    fn clone_box(&self) -> Box<dyn Compounded> {
263        Box::new(self.clone())
264    }
265}
266
267pub trait ApplyTo {
268    fn apply_to<N: Notification>(&self, notification: &N) -> Option<N>;
269}