kaspa_notify/subscription/
single.rs

1use crate::{
2    address::tracker::{Index, Indexes},
3    error::Result,
4    events::EventType,
5    listener::ListenerId,
6    scope::{Scope, UtxosChangedScope, VirtualChainChangedScope},
7    subscription::{
8        context::SubscriptionContext, BroadcastingSingle, Command, DynSubscription, Mutation, MutationOutcome, MutationPolicies,
9        Single, Subscription, UtxosChangedMutationPolicy,
10    },
11};
12use itertools::Itertools;
13use kaspa_addresses::{Address, Prefix};
14use kaspa_consensus_core::tx::ScriptPublicKey;
15use kaspa_core::trace;
16use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
17use std::{
18    collections::hash_set,
19    fmt::{Debug, Display},
20    hash::{Hash, Hasher},
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26
27/// Subscription with a all or none scope.
28///
29/// To be used by all notifications which [`Scope`] variant is fieldless.
30#[derive(Eq, PartialEq, Hash, Clone, Debug)]
31pub struct OverallSubscription {
32    event_type: EventType,
33    active: bool,
34}
35
36impl OverallSubscription {
37    pub fn new(event_type: EventType, active: bool) -> Self {
38        Self { event_type, active }
39    }
40}
41
42impl Single for OverallSubscription {
43    fn apply_mutation(
44        &self,
45        _: &Arc<dyn Single>,
46        mutation: Mutation,
47        _: MutationPolicies,
48        _: &SubscriptionContext,
49    ) -> Result<MutationOutcome> {
50        assert_eq!(self.event_type(), mutation.event_type());
51        Ok(if self.active != mutation.active() {
52            let mutated = Self::new(self.event_type, mutation.active());
53            MutationOutcome::with_mutated(Arc::new(mutated), vec![mutation])
54        } else {
55            MutationOutcome::new()
56        })
57    }
58}
59
60impl Subscription for OverallSubscription {
61    #[inline(always)]
62    fn event_type(&self) -> EventType {
63        self.event_type
64    }
65
66    #[inline(always)]
67    fn active(&self) -> bool {
68        self.active
69    }
70
71    fn scope(&self, _context: &SubscriptionContext) -> Scope {
72        self.event_type.into()
73    }
74}
75
76/// Subscription to VirtualChainChanged notifications
77#[derive(Eq, PartialEq, Hash, Clone, Debug, Default)]
78pub struct VirtualChainChangedSubscription {
79    active: bool,
80    include_accepted_transaction_ids: bool,
81}
82
83impl VirtualChainChangedSubscription {
84    pub fn new(active: bool, include_accepted_transaction_ids: bool) -> Self {
85        Self { active, include_accepted_transaction_ids }
86    }
87    pub fn include_accepted_transaction_ids(&self) -> bool {
88        self.include_accepted_transaction_ids
89    }
90}
91
92impl Single for VirtualChainChangedSubscription {
93    fn apply_mutation(
94        &self,
95        _: &Arc<dyn Single>,
96        mutation: Mutation,
97        _: MutationPolicies,
98        _: &SubscriptionContext,
99    ) -> Result<MutationOutcome> {
100        assert_eq!(self.event_type(), mutation.event_type());
101        let result = if let Scope::VirtualChainChanged(ref scope) = mutation.scope {
102            // Here we want the code to (almost) match a double entry table structure
103            // by subscription state and by mutation
104            #[allow(clippy::collapsible_else_if)]
105            if !self.active {
106                // State None
107                if !mutation.active() {
108                    // Mutation None
109                    None
110                } else {
111                    // Here is an exception to the aforementioned goal
112                    // Mutations Reduced and All
113                    let mutated = Self::new(true, scope.include_accepted_transaction_ids);
114                    Some((Arc::new(mutated), vec![mutation]))
115                }
116            } else if !self.include_accepted_transaction_ids {
117                // State Reduced
118                if !mutation.active() {
119                    // Mutation None
120                    let mutated = Self::new(false, false);
121                    Some((Arc::new(mutated), vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(false).into())]))
122                } else if !scope.include_accepted_transaction_ids {
123                    // Mutation Reduced
124                    None
125                } else {
126                    // Mutation All
127                    let mutated = Self::new(true, true);
128                    Some((
129                        Arc::new(mutated),
130                        vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(false).into()), mutation],
131                    ))
132                }
133            } else {
134                // State All
135                if !mutation.active() {
136                    // Mutation None
137                    let mutated = Self::new(false, false);
138                    Some((Arc::new(mutated), vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(true).into())]))
139                } else if !scope.include_accepted_transaction_ids {
140                    // Mutation Reduced
141                    let mutated = Self::new(true, false);
142                    Some((Arc::new(mutated), vec![mutation, Mutation::new(Command::Stop, VirtualChainChangedScope::new(true).into())]))
143                } else {
144                    // Mutation All
145                    None
146                }
147            }
148        } else {
149            None
150        };
151        let outcome = match result {
152            Some((mutated, mutations)) => MutationOutcome::with_mutated(mutated, mutations),
153            None => MutationOutcome::new(),
154        };
155        Ok(outcome)
156    }
157}
158
159impl Subscription for VirtualChainChangedSubscription {
160    #[inline(always)]
161    fn event_type(&self) -> EventType {
162        EventType::VirtualChainChanged
163    }
164
165    #[inline(always)]
166    fn active(&self) -> bool {
167        self.active
168    }
169
170    fn scope(&self, _context: &SubscriptionContext) -> Scope {
171        VirtualChainChangedScope::new(self.include_accepted_transaction_ids).into()
172    }
173}
174
175static UTXOS_CHANGED_SUBSCRIPTIONS: AtomicUsize = AtomicUsize::new(0);
176
177#[derive(Debug, Clone, Copy, PartialEq, Eq)]
178enum UtxosChangedMutation {
179    None,
180    Remove,
181    Add,
182    All,
183}
184
185impl From<(Command, &UtxosChangedScope)> for UtxosChangedMutation {
186    fn from((command, scope): (Command, &UtxosChangedScope)) -> Self {
187        match (command, scope.addresses.is_empty()) {
188            (Command::Stop, true) => Self::None,
189            (Command::Stop, false) => Self::Remove,
190            (Command::Start, false) => Self::Add,
191            (Command::Start, true) => Self::All,
192        }
193    }
194}
195
196#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq)]
197pub enum UtxosChangedState {
198    /// Inactive
199    #[default]
200    None,
201
202    /// Active on a set of selected addresses
203    Selected,
204
205    /// Active on all addresses
206    All,
207}
208
209impl UtxosChangedState {
210    pub fn active(&self) -> bool {
211        match self {
212            UtxosChangedState::None => false,
213            UtxosChangedState::Selected | UtxosChangedState::All => true,
214        }
215    }
216}
217
218impl Display for UtxosChangedState {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        match self {
221            UtxosChangedState::None => write!(f, "none"),
222            UtxosChangedState::Selected => write!(f, "selected"),
223            UtxosChangedState::All => write!(f, "all"),
224        }
225    }
226}
227
228#[derive(Debug, Clone)]
229pub struct UtxosChangedSubscriptionData {
230    /// State of the subscription
231    ///
232    /// Can be mutated without affecting neither equality nor hash of the struct
233    state: UtxosChangedState,
234
235    /// Address indexes in `SubscriptionContext`
236    ///
237    /// Can be mutated without affecting neither equality nor hash of the struct
238    indexes: Indexes,
239}
240
241impl UtxosChangedSubscriptionData {
242    fn with_capacity(state: UtxosChangedState, capacity: usize) -> Self {
243        let indexes = Indexes::with_capacity(capacity);
244        Self { state, indexes }
245    }
246
247    #[inline(always)]
248    pub fn update_state(&mut self, new_state: UtxosChangedState) {
249        self.state = new_state;
250    }
251
252    pub fn contains(&self, spk: &ScriptPublicKey, context: &SubscriptionContext) -> bool {
253        context.address_tracker.contains(&self.indexes, spk)
254    }
255
256    pub fn len(&self) -> usize {
257        self.indexes.len()
258    }
259
260    pub fn is_empty(&self) -> bool {
261        self.indexes.is_empty()
262    }
263
264    pub fn capacity(&self) -> usize {
265        self.indexes.capacity()
266    }
267
268    pub fn iter(&self) -> hash_set::Iter<'_, Index> {
269        self.indexes.iter()
270    }
271
272    pub fn contains_address(&self, address: &Address, context: &SubscriptionContext) -> bool {
273        context.address_tracker.contains_address(&self.indexes, address)
274    }
275
276    pub fn to_addresses(&self, prefix: Prefix, context: &SubscriptionContext) -> Vec<Address> {
277        self.indexes.iter().filter_map(|index| context.address_tracker.get_address_at_index(*index, prefix)).collect_vec()
278    }
279
280    pub fn register(&mut self, addresses: Vec<Address>, context: &SubscriptionContext) -> Result<Vec<Address>> {
281        Ok(context.address_tracker.register(&mut self.indexes, addresses)?)
282    }
283
284    pub fn unregister(&mut self, addresses: Vec<Address>, context: &SubscriptionContext) -> Vec<Address> {
285        context.address_tracker.unregister(&mut self.indexes, addresses)
286    }
287
288    pub fn unregister_indexes(&mut self, context: &SubscriptionContext) -> Vec<Address> {
289        // TODO: consider using a provided prefix
290        let removed = self.to_addresses(Prefix::Mainnet, context);
291        context.address_tracker.unregister_indexes(&mut self.indexes);
292        removed
293    }
294
295    pub fn to_all(&self) -> bool {
296        matches!(self.state, UtxosChangedState::All)
297    }
298}
299
300impl Display for UtxosChangedSubscriptionData {
301    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302        match self.state {
303            UtxosChangedState::None | UtxosChangedState::All => write!(f, "{}", self.state),
304            UtxosChangedState::Selected => write!(f, "{}({})", self.state, self.indexes.len()),
305        }
306    }
307}
308
309#[derive(Debug)]
310pub struct UtxosChangedSubscription {
311    /// Mutable inner data
312    data: RwLock<UtxosChangedSubscriptionData>,
313
314    /// ID of the listener owning this subscription
315    ///
316    /// This fully determines both equality and hash.
317    listener_id: ListenerId,
318}
319
320impl UtxosChangedSubscription {
321    pub fn new(state: UtxosChangedState, listener_id: ListenerId) -> Self {
322        Self::with_capacity(state, listener_id, 0)
323    }
324
325    pub fn with_capacity(state: UtxosChangedState, listener_id: ListenerId, capacity: usize) -> Self {
326        let data = RwLock::new(UtxosChangedSubscriptionData::with_capacity(state, capacity));
327        let subscription = Self { data, listener_id };
328        trace!(
329            "UtxosChangedSubscription: {} in total (new {})",
330            UTXOS_CHANGED_SUBSCRIPTIONS.fetch_add(1, Ordering::SeqCst) + 1,
331            subscription
332        );
333        subscription
334    }
335
336    #[cfg(test)]
337    pub fn with_addresses(active: bool, addresses: Vec<Address>, listener_id: ListenerId, context: &SubscriptionContext) -> Self {
338        let state = match (active, addresses.is_empty()) {
339            (false, _) => UtxosChangedState::None,
340            (true, false) => UtxosChangedState::Selected,
341            (true, true) => UtxosChangedState::All,
342        };
343        let subscription = Self::with_capacity(state, listener_id, addresses.len());
344        let _ = subscription.data_mut().register(addresses, context);
345        subscription
346    }
347
348    pub fn data(&self) -> RwLockReadGuard<UtxosChangedSubscriptionData> {
349        self.data.read()
350    }
351
352    pub fn data_mut(&self) -> RwLockWriteGuard<UtxosChangedSubscriptionData> {
353        self.data.write()
354    }
355
356    #[inline(always)]
357    pub fn state(&self) -> UtxosChangedState {
358        self.data().state
359    }
360
361    pub fn to_all(&self) -> bool {
362        matches!(self.data().state, UtxosChangedState::All)
363    }
364}
365
366impl Clone for UtxosChangedSubscription {
367    fn clone(&self) -> Self {
368        let subscription = Self { data: RwLock::new(self.data().clone()), listener_id: self.listener_id };
369        trace!(
370            "UtxosChangedSubscription: {} in total (clone {})",
371            UTXOS_CHANGED_SUBSCRIPTIONS.fetch_add(1, Ordering::SeqCst) + 1,
372            subscription
373        );
374        subscription
375    }
376}
377
378impl Display for UtxosChangedSubscription {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        write!(f, "{}", self.data())
381    }
382}
383
384impl Drop for UtxosChangedSubscription {
385    fn drop(&mut self) {
386        trace!(
387            "UtxosChangedSubscription: {} in total (drop {})",
388            UTXOS_CHANGED_SUBSCRIPTIONS.fetch_sub(1, Ordering::SeqCst) - 1,
389            self
390        );
391    }
392}
393
394impl PartialEq for UtxosChangedSubscription {
395    /// Equality is specifically bound to the listener ID
396    fn eq(&self, other: &Self) -> bool {
397        self.listener_id == other.listener_id
398    }
399}
400impl Eq for UtxosChangedSubscription {}
401
402impl Hash for UtxosChangedSubscription {
403    /// Hash is specifically bound to the listener ID
404    fn hash<H: Hasher>(&self, state: &mut H) {
405        self.listener_id.hash(state);
406    }
407}
408
409impl Single for UtxosChangedSubscription {
410    fn apply_mutation(
411        &self,
412        current: &Arc<dyn Single>,
413        mutation: Mutation,
414        policies: MutationPolicies,
415        context: &SubscriptionContext,
416    ) -> Result<MutationOutcome> {
417        assert_eq!(self.event_type(), mutation.event_type());
418        let outcome = if let Scope::UtxosChanged(scope) = mutation.scope {
419            let mut data = self.data_mut();
420            let state = data.state;
421            let mutation_type = UtxosChangedMutation::from((mutation.command, &scope));
422            match (state, mutation_type) {
423                (UtxosChangedState::None, UtxosChangedMutation::None | UtxosChangedMutation::Remove) => {
424                    // State None + Mutations None or Remove(R) => No change
425                    MutationOutcome::new()
426                }
427                (UtxosChangedState::None, UtxosChangedMutation::Add) => {
428                    // State None + Mutation Add(A) => Mutated new state Selected(A)
429                    let addresses = data.register(scope.addresses, context)?;
430                    data.update_state(UtxosChangedState::Selected);
431                    let mutations = match policies.utxo_changed {
432                        UtxosChangedMutationPolicy::AddressSet => {
433                            vec![Mutation::new(mutation.command, UtxosChangedScope::new(addresses).into())]
434                        }
435                        UtxosChangedMutationPolicy::Wildcard => {
436                            vec![Mutation::new(mutation.command, UtxosChangedScope::default().into())]
437                        }
438                    };
439                    MutationOutcome::with_mutated(current.clone(), mutations)
440                }
441                (UtxosChangedState::None, UtxosChangedMutation::All) => {
442                    // State None + Mutation All => Mutated new state All
443                    data.update_state(UtxosChangedState::All);
444                    let mutations = vec![Mutation::new(mutation.command, UtxosChangedScope::default().into())];
445                    MutationOutcome::with_mutated(current.clone(), mutations)
446                }
447                (UtxosChangedState::Selected, UtxosChangedMutation::None) => {
448                    // State Selected(S) + Mutation None => Mutated new state None
449                    data.update_state(UtxosChangedState::None);
450                    let removed = data.unregister_indexes(context);
451                    assert!(!removed.is_empty(), "state Selected implies a non empty address set");
452                    let mutations = match policies.utxo_changed {
453                        UtxosChangedMutationPolicy::AddressSet => {
454                            vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
455                        }
456                        UtxosChangedMutationPolicy::Wildcard => {
457                            vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())]
458                        }
459                    };
460                    MutationOutcome::with_mutated(current.clone(), mutations)
461                }
462                (UtxosChangedState::Selected, UtxosChangedMutation::Remove) => {
463                    // State Selected(S) + Mutation Remove(R) => Mutated state Selected(S – R) or mutated new state None or no change
464                    let removed = data.unregister(scope.addresses, context);
465                    match (removed.is_empty(), data.indexes.is_empty()) {
466                        (false, false) => {
467                            let mutations = match policies.utxo_changed {
468                                UtxosChangedMutationPolicy::AddressSet => {
469                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
470                                }
471                                UtxosChangedMutationPolicy::Wildcard => vec![],
472                            };
473                            MutationOutcome::with_mutations(mutations)
474                        }
475                        (false, true) => {
476                            data.update_state(UtxosChangedState::None);
477                            let mutations = match policies.utxo_changed {
478                                UtxosChangedMutationPolicy::AddressSet => {
479                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
480                                }
481                                UtxosChangedMutationPolicy::Wildcard => {
482                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())]
483                                }
484                            };
485                            MutationOutcome::with_mutated(current.clone(), mutations)
486                        }
487                        (true, _) => MutationOutcome::new(),
488                    }
489                }
490                (UtxosChangedState::Selected, UtxosChangedMutation::Add) => {
491                    // State Selected(S) + Mutation Add(A) => Mutated state Selected(A ∪ S)
492                    let added = data.register(scope.addresses, context)?;
493                    match added.is_empty() {
494                        false => {
495                            let mutations = match policies.utxo_changed {
496                                UtxosChangedMutationPolicy::AddressSet => {
497                                    vec![Mutation::new(Command::Start, UtxosChangedScope::new(added).into())]
498                                }
499                                UtxosChangedMutationPolicy::Wildcard => vec![],
500                            };
501                            MutationOutcome::with_mutations(mutations)
502                        }
503                        true => MutationOutcome::new(),
504                    }
505                }
506                (UtxosChangedState::Selected, UtxosChangedMutation::All) => {
507                    // State Selected(S) + Mutation All => Mutated new state All
508                    let removed = data.unregister_indexes(context);
509                    assert!(!removed.is_empty(), "state Selected implies a non empty address set");
510                    data.update_state(UtxosChangedState::All);
511                    let mutations = match policies.utxo_changed {
512                        UtxosChangedMutationPolicy::AddressSet => vec![
513                            Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into()),
514                            Mutation::new(Command::Start, UtxosChangedScope::default().into()),
515                        ],
516                        UtxosChangedMutationPolicy::Wildcard => vec![],
517                    };
518                    MutationOutcome::with_mutated(current.clone(), mutations)
519                }
520                (UtxosChangedState::All, UtxosChangedMutation::None) => {
521                    // State All + Mutation None => Mutated new state None
522                    data.update_state(UtxosChangedState::None);
523                    let mutations = vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())];
524                    MutationOutcome::with_mutated(current.clone(), mutations)
525                }
526                (UtxosChangedState::All, UtxosChangedMutation::Remove) => {
527                    // State All + Mutation Remove(R) => No change
528                    MutationOutcome::new()
529                }
530                (UtxosChangedState::All, UtxosChangedMutation::Add) => {
531                    // State All + Mutation Add(A) => Mutated new state Selectee(A)
532                    let added = data.register(scope.addresses, context)?;
533                    data.update_state(UtxosChangedState::Selected);
534                    let mutations = match policies.utxo_changed {
535                        UtxosChangedMutationPolicy::AddressSet => vec![
536                            Mutation::new(Command::Start, UtxosChangedScope::new(added).into()),
537                            Mutation::new(Command::Stop, UtxosChangedScope::default().into()),
538                        ],
539                        UtxosChangedMutationPolicy::Wildcard => vec![],
540                    };
541                    MutationOutcome::with_mutated(current.clone(), mutations)
542                }
543                (UtxosChangedState::All, UtxosChangedMutation::All) => {
544                    // State All <= Mutation All
545                    MutationOutcome::new()
546                }
547            }
548        } else {
549            MutationOutcome::new()
550        };
551        Ok(outcome)
552    }
553}
554
555impl Subscription for UtxosChangedSubscription {
556    fn event_type(&self) -> EventType {
557        EventType::UtxosChanged
558    }
559
560    fn active(&self) -> bool {
561        self.state().active()
562    }
563
564    fn scope(&self, context: &SubscriptionContext) -> Scope {
565        // TODO: consider using a provided prefix
566        UtxosChangedScope::new(self.data().to_addresses(Prefix::Mainnet, context)).into()
567    }
568}
569
570impl BroadcastingSingle for DynSubscription {
571    fn broadcasting(self, context: &SubscriptionContext) -> DynSubscription {
572        match self.event_type() {
573            EventType::UtxosChanged => {
574                let utxos_changed_subscription = self.as_any().downcast_ref::<UtxosChangedSubscription>().unwrap();
575                match utxos_changed_subscription.to_all() {
576                    true => context.utxos_changed_subscription_to_all.clone(),
577                    false => self,
578                }
579            }
580            _ => self,
581        }
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::super::*;
588    use super::*;
589    use crate::{address::test_helpers::get_3_addresses, scope::BlockAddedScope};
590    use std::collections::hash_map::DefaultHasher;
591
592    #[test]
593    fn test_subscription_hash() {
594        struct Comparison {
595            left: usize,
596            right: usize,
597            should_match: bool,
598        }
599        impl Comparison {
600            fn new(left: usize, right: usize, should_match: bool) -> Self {
601                Self { left, right, should_match }
602            }
603            fn compare(&self, name: &str, subscriptions: &[DynSubscription]) {
604                let equal = if self.should_match { "be equal" } else { "not be equal" };
605                // Compare Box dyn Single
606                #[allow(clippy::op_ref)]
607                let cmp = &subscriptions[self.left] == &subscriptions[self.right];
608                assert_eq!(
609                    cmp, self.should_match,
610                    "{name}: subscriptions should {equal}, comparing {:?} with {:?}",
611                    &subscriptions[self.left], &subscriptions[self.right],
612                );
613                // Compare Box dyn Single hash
614                assert_eq!(
615                    get_hash(&subscriptions[self.left]) == get_hash(&subscriptions[self.right]),
616                    self.should_match,
617                    "{name}: subscription hashes should {equal}, comparing {:?} => {} with {:?} => {}",
618                    &subscriptions[self.left],
619                    get_hash(&subscriptions[self.left]),
620                    &subscriptions[self.right],
621                    get_hash(&subscriptions[self.right]),
622                );
623                // Compare Arc dyn Single
624                let left_arc = subscriptions[self.left].clone();
625                let right_arc = subscriptions[self.right].clone();
626                assert_eq!(
627                    *left_arc == *right_arc,
628                    self.should_match,
629                    "{name}: subscriptions should {equal}, comparing {left_arc:?} with {right_arc:?}",
630                );
631                // Compare Arc dyn Single hash
632                assert_eq!(
633                    get_hash(&left_arc) == get_hash(&right_arc),
634                    self.should_match,
635                    "{name}: subscription hashes should {equal}, comparing {:?} => {} with {:?} => {}",
636                    left_arc,
637                    get_hash(&left_arc),
638                    right_arc,
639                    get_hash(&right_arc),
640                );
641            }
642        }
643
644        struct Test {
645            name: &'static str,
646            subscriptions: Vec<DynSubscription>,
647            comparisons: Vec<Comparison>,
648        }
649
650        let context = SubscriptionContext::new();
651        let addresses = get_3_addresses(false);
652        let mut sorted_addresses = addresses.clone();
653        sorted_addresses.sort();
654
655        let tests: Vec<Test> = vec![
656            Test {
657                name: "test basic overall subscription",
658                subscriptions: vec![
659                    Arc::new(OverallSubscription::new(EventType::BlockAdded, false)),
660                    Arc::new(OverallSubscription::new(EventType::BlockAdded, true)),
661                    Arc::new(OverallSubscription::new(EventType::BlockAdded, true)),
662                ],
663                comparisons: vec![Comparison::new(0, 1, false), Comparison::new(0, 2, false), Comparison::new(1, 2, true)],
664            },
665            Test {
666                name: "test virtual selected parent chain changed subscription",
667                subscriptions: vec![
668                    Arc::new(VirtualChainChangedSubscription::new(false, false)),
669                    Arc::new(VirtualChainChangedSubscription::new(true, false)),
670                    Arc::new(VirtualChainChangedSubscription::new(true, true)),
671                    Arc::new(VirtualChainChangedSubscription::new(true, true)),
672                ],
673                comparisons: vec![
674                    Comparison::new(0, 1, false),
675                    Comparison::new(0, 2, false),
676                    Comparison::new(0, 3, false),
677                    Comparison::new(1, 2, false),
678                    Comparison::new(1, 3, false),
679                    Comparison::new(2, 3, true),
680                ],
681            },
682            Test {
683                name: "test utxos changed subscription",
684                subscriptions: vec![
685                    Arc::new(UtxosChangedSubscription::with_addresses(false, vec![], 0, &context)),
686                    Arc::new(UtxosChangedSubscription::with_addresses(true, addresses[0..2].to_vec(), 1, &context)),
687                    Arc::new(UtxosChangedSubscription::with_addresses(true, addresses[0..3].to_vec(), 2, &context)),
688                    Arc::new(UtxosChangedSubscription::with_addresses(true, sorted_addresses[0..3].to_vec(), 2, &context)),
689                    Arc::new(UtxosChangedSubscription::with_addresses(true, vec![], 3, &context)),
690                    Arc::new(UtxosChangedSubscription::with_addresses(true, vec![], 4, &context)),
691                ],
692                comparisons: vec![
693                    Comparison::new(0, 0, true),
694                    Comparison::new(0, 1, false),
695                    Comparison::new(0, 2, false),
696                    Comparison::new(0, 3, false),
697                    Comparison::new(0, 4, false),
698                    Comparison::new(0, 5, false),
699                    Comparison::new(1, 1, true),
700                    Comparison::new(1, 2, false),
701                    Comparison::new(1, 3, false),
702                    Comparison::new(1, 4, false),
703                    Comparison::new(1, 5, false),
704                    Comparison::new(2, 2, true),
705                    Comparison::new(2, 3, true),
706                    Comparison::new(2, 4, false),
707                    Comparison::new(2, 5, false),
708                    Comparison::new(3, 3, true),
709                    Comparison::new(3, 4, false),
710                    Comparison::new(3, 5, false),
711                    Comparison::new(4, 4, true),
712                    Comparison::new(4, 5, false),
713                    Comparison::new(5, 5, true),
714                ],
715            },
716        ];
717
718        for test in tests.iter() {
719            for comparison in test.comparisons.iter() {
720                comparison.compare(test.name, &test.subscriptions);
721            }
722        }
723    }
724
725    fn get_hash<T: Hash>(item: &T) -> u64 {
726        let mut hasher = DefaultHasher::default();
727        item.hash(&mut hasher);
728        hasher.finish()
729    }
730
731    struct MutationTest {
732        name: &'static str,
733        state: DynSubscription,
734        mutation: Mutation,
735        new_state: DynSubscription,
736        outcome: MutationOutcome,
737    }
738
739    struct MutationTests {
740        tests: Vec<MutationTest>,
741    }
742
743    impl MutationTests {
744        pub const LISTENER_ID: ListenerId = 1;
745
746        fn new(tests: Vec<MutationTest>) -> Self {
747            Self { tests }
748        }
749
750        fn run(&self, context: &SubscriptionContext) {
751            for test in self.tests.iter() {
752                let mut new_state = test.state.clone();
753                let outcome = new_state.mutate(test.mutation.clone(), Default::default(), context).unwrap();
754                assert_eq!(test.new_state.active(), new_state.active(), "Testing '{}': wrong new state activity", test.name);
755                assert_eq!(*test.new_state, *new_state, "Testing '{}': wrong new state", test.name);
756                assert_eq!(test.outcome.has_new_state(), outcome.has_new_state(), "Testing '{}': wrong new state presence", test.name);
757                assert_eq!(test.outcome.mutations, outcome.mutations, "Testing '{}': wrong mutations", test.name);
758            }
759        }
760    }
761
762    #[test]
763    fn test_overall_mutation() {
764        let context = SubscriptionContext::new();
765
766        fn s(active: bool) -> DynSubscription {
767            Arc::new(OverallSubscription { event_type: EventType::BlockAdded, active })
768        }
769        fn m(command: Command) -> Mutation {
770            Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) }
771        }
772
773        // Subscriptions
774        let none = || s(false);
775        let all = || s(true);
776
777        // Mutations
778        let start_all = || m(Command::Start);
779        let stop_all = || m(Command::Stop);
780
781        // Tests
782        let tests = MutationTests::new(vec![
783            MutationTest {
784                name: "OverallSubscription None to All",
785                state: none(),
786                mutation: start_all(),
787                new_state: all(),
788                outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
789            },
790            MutationTest {
791                name: "OverallSubscription None to None",
792                state: none(),
793                mutation: stop_all(),
794                new_state: none(),
795                outcome: MutationOutcome::new(),
796            },
797            MutationTest {
798                name: "OverallSubscription All to All",
799                state: all(),
800                mutation: start_all(),
801                new_state: all(),
802                outcome: MutationOutcome::new(),
803            },
804            MutationTest {
805                name: "OverallSubscription All to None",
806                state: all(),
807                mutation: stop_all(),
808                new_state: none(),
809                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
810            },
811        ]);
812        tests.run(&context)
813    }
814
815    #[test]
816    fn test_virtual_chain_changed_mutation() {
817        let context = SubscriptionContext::new();
818
819        fn s(active: bool, include_accepted_transaction_ids: bool) -> DynSubscription {
820            Arc::new(VirtualChainChangedSubscription { active, include_accepted_transaction_ids })
821        }
822        fn m(command: Command, include_accepted_transaction_ids: bool) -> Mutation {
823            Mutation { command, scope: Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }) }
824        }
825
826        // Subscriptions
827        let none = || s(false, false);
828        let reduced = || s(true, false);
829        let all = || s(true, true);
830
831        // Mutations
832        let start_all = || m(Command::Start, true);
833        let stop_all = || m(Command::Stop, true);
834        let start_reduced = || m(Command::Start, false);
835        let stop_reduced = || m(Command::Stop, false);
836
837        // Tests
838        let tests = MutationTests::new(vec![
839            MutationTest {
840                name: "VirtualChainChangedSubscription None to All",
841                state: none(),
842                mutation: start_all(),
843                new_state: all(),
844                outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
845            },
846            MutationTest {
847                name: "VirtualChainChangedSubscription None to Reduced",
848                state: none(),
849                mutation: start_reduced(),
850                new_state: reduced(),
851                outcome: MutationOutcome::with_mutated(reduced(), vec![start_reduced()]),
852            },
853            MutationTest {
854                name: "VirtualChainChangedSubscription None to None (stop reduced)",
855                state: none(),
856                mutation: stop_reduced(),
857                new_state: none(),
858                outcome: MutationOutcome::new(),
859            },
860            MutationTest {
861                name: "VirtualChainChangedSubscription None to None (stop all)",
862                state: none(),
863                mutation: stop_all(),
864                new_state: none(),
865                outcome: MutationOutcome::new(),
866            },
867            MutationTest {
868                name: "VirtualChainChangedSubscription Reduced to All",
869                state: reduced(),
870                mutation: start_all(),
871                new_state: all(),
872                outcome: MutationOutcome::with_mutated(all(), vec![stop_reduced(), start_all()]),
873            },
874            MutationTest {
875                name: "VirtualChainChangedSubscription Reduced to Reduced",
876                state: reduced(),
877                mutation: start_reduced(),
878                new_state: reduced(),
879                outcome: MutationOutcome::new(),
880            },
881            MutationTest {
882                name: "VirtualChainChangedSubscription Reduced to None (stop reduced)",
883                state: reduced(),
884                mutation: stop_reduced(),
885                new_state: none(),
886                outcome: MutationOutcome::with_mutated(none(), vec![stop_reduced()]),
887            },
888            MutationTest {
889                name: "VirtualChainChangedSubscription Reduced to None (stop all)",
890                state: reduced(),
891                mutation: stop_all(),
892                new_state: none(),
893                outcome: MutationOutcome::with_mutated(none(), vec![stop_reduced()]),
894            },
895            MutationTest {
896                name: "VirtualChainChangedSubscription All to All",
897                state: all(),
898                mutation: start_all(),
899                new_state: all(),
900                outcome: MutationOutcome::new(),
901            },
902            MutationTest {
903                name: "VirtualChainChangedSubscription All to Reduced",
904                state: all(),
905                mutation: start_reduced(),
906                new_state: reduced(),
907                outcome: MutationOutcome::with_mutated(reduced(), vec![start_reduced(), stop_all()]),
908            },
909            MutationTest {
910                name: "VirtualChainChangedSubscription All to None (stop reduced)",
911                state: all(),
912                mutation: stop_reduced(),
913                new_state: none(),
914                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
915            },
916            MutationTest {
917                name: "VirtualChainChangedSubscription All to None (stop all)",
918                state: all(),
919                mutation: stop_all(),
920                new_state: none(),
921                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
922            },
923        ]);
924        tests.run(&context)
925    }
926
927    #[test]
928    fn test_utxos_changed_mutation() {
929        let context = SubscriptionContext::new();
930        let a_stock = get_3_addresses(true);
931
932        let av = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
933        let ah = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
934        let s = |active: bool, indexes: &[usize]| {
935            Arc::new(UtxosChangedSubscription::with_addresses(active, ah(indexes).to_vec(), MutationTests::LISTENER_ID, &context))
936                as DynSubscription
937        };
938        let m = |command: Command, indexes: &[usize]| -> Mutation {
939            Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(av(indexes))) }
940        };
941
942        // Subscriptions
943        let none = || s(false, &[]);
944        let selected_0 = || s(true, &[0]);
945        let selected_1 = || s(true, &[1]);
946        let selected_2 = || s(true, &[2]);
947        let selected_01 = || s(true, &[0, 1]);
948        let selected_02 = || s(true, &[0, 2]);
949        let selected_012 = || s(true, &[0, 1, 2]);
950        let all = || s(true, &[]);
951
952        // Mutations
953        let start_all = || m(Command::Start, &[]);
954        let stop_all = || m(Command::Stop, &[]);
955        let start_0 = || m(Command::Start, &[0]);
956        let start_1 = || m(Command::Start, &[1]);
957        let start_01 = || m(Command::Start, &[0, 1]);
958        let stop_0 = || m(Command::Stop, &[0]);
959        let stop_1 = || m(Command::Stop, &[1]);
960        let stop_01 = || m(Command::Stop, &[0, 1]);
961
962        // Tests
963        let tests = MutationTests::new(vec![
964            MutationTest {
965                name: "UtxosChangedSubscription None to All (add all)",
966                state: none(),
967                mutation: start_all(),
968                new_state: all(),
969                outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
970            },
971            MutationTest {
972                name: "UtxosChangedSubscription None to Selected 0 (add set)",
973                state: none(),
974                mutation: start_0(),
975                new_state: selected_0(),
976                outcome: MutationOutcome::with_mutated(selected_0(), vec![start_0()]),
977            },
978            MutationTest {
979                name: "UtxosChangedSubscription None to None (stop set)",
980                state: none(),
981                mutation: stop_0(),
982                new_state: none(),
983                outcome: MutationOutcome::new(),
984            },
985            MutationTest {
986                name: "UtxosChangedSubscription None to None (stop all)",
987                state: none(),
988                mutation: stop_all(),
989                new_state: none(),
990                outcome: MutationOutcome::new(),
991            },
992            MutationTest {
993                name: "UtxosChangedSubscription Selected 01 to All (add all)",
994                state: selected_01(),
995                mutation: start_all(),
996                new_state: all(),
997                outcome: MutationOutcome::with_mutated(all(), vec![stop_01(), start_all()]),
998            },
999            MutationTest {
1000                name: "UtxosChangedSubscription Selected 01 to 01 (add set with total intersection)",
1001                state: selected_01(),
1002                mutation: start_1(),
1003                new_state: selected_01(),
1004                outcome: MutationOutcome::new(),
1005            },
1006            MutationTest {
1007                name: "UtxosChangedSubscription Selected 0 to 01 (add set with partial intersection)",
1008                state: selected_0(),
1009                mutation: start_01(),
1010                new_state: selected_01(),
1011                outcome: MutationOutcome::with_mutations(vec![start_1()]),
1012            },
1013            MutationTest {
1014                name: "UtxosChangedSubscription Selected 2 to 012 (add set with no intersection)",
1015                state: selected_2(),
1016                mutation: start_01(),
1017                new_state: selected_012(),
1018                outcome: MutationOutcome::with_mutations(vec![start_01()]),
1019            },
1020            MutationTest {
1021                name: "UtxosChangedSubscription Selected 01 to None (remove superset)",
1022                state: selected_1(),
1023                mutation: stop_01(),
1024                new_state: none(),
1025                outcome: MutationOutcome::with_mutated(none(), vec![stop_1()]),
1026            },
1027            MutationTest {
1028                name: "UtxosChangedSubscription Selected 01 to None (remove set with total intersection)",
1029                state: selected_01(),
1030                mutation: stop_01(),
1031                new_state: none(),
1032                outcome: MutationOutcome::with_mutated(none(), vec![stop_01()]),
1033            },
1034            MutationTest {
1035                name: "UtxosChangedSubscription Selected 02 to 2 (remove set with partial intersection)",
1036                state: selected_02(),
1037                mutation: stop_01(),
1038                new_state: selected_2(),
1039                outcome: MutationOutcome::with_mutations(vec![stop_0()]),
1040            },
1041            MutationTest {
1042                name: "UtxosChangedSubscription Selected 02 to 02 (remove set with no intersection)",
1043                state: selected_02(),
1044                mutation: stop_1(),
1045                new_state: selected_02(),
1046                outcome: MutationOutcome::new(),
1047            },
1048            MutationTest {
1049                name: "UtxosChangedSubscription All to All (add all)",
1050                state: all(),
1051                mutation: start_all(),
1052                new_state: all(),
1053                outcome: MutationOutcome::new(),
1054            },
1055            MutationTest {
1056                name: "UtxosChangedSubscription All to Selected 01 (add set)",
1057                state: all(),
1058                mutation: start_01(),
1059                new_state: selected_01(),
1060                outcome: MutationOutcome::with_mutated(selected_01(), vec![start_01(), stop_all()]),
1061            },
1062            MutationTest {
1063                name: "UtxosChangedSubscription All to All (remove set)",
1064                state: all(),
1065                mutation: stop_01(),
1066                new_state: all(),
1067                outcome: MutationOutcome::new(),
1068            },
1069            MutationTest {
1070                name: "UtxosChangedSubscription All to None (remove all)",
1071                state: all(),
1072                mutation: stop_all(),
1073                new_state: none(),
1074                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
1075            },
1076        ]);
1077        tests.run(&context)
1078    }
1079}