1use crate::{
2 address::tracker::{Index, Indexes},
3 error::Result,
4 events::EventType,
5 listener::ListenerId,
6 scope::{Scope, UtxosChangedScope, VirtualChainChangedScope},
7 subscription::{
8 context::SubscriptionContext, BroadcastingSingle, Command, DynSubscription, Mutation, MutationOutcome, MutationPolicies,
9 Single, Subscription, UtxosChangedMutationPolicy,
10 },
11};
12use itertools::Itertools;
13use kaspa_addresses::{Address, Prefix};
14use kaspa_consensus_core::tx::ScriptPublicKey;
15use kaspa_core::trace;
16use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
17use std::{
18 collections::hash_set,
19 fmt::{Debug, Display},
20 hash::{Hash, Hasher},
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26
/// Subscription described only by an event type and an on/off flag.
///
/// Equality and hashing are derived, so two values are equal when both the
/// event type and the activity flag match.
#[derive(Eq, PartialEq, Hash, Clone, Debug)]
pub struct OverallSubscription {
    /// Event type this subscription applies to.
    event_type: EventType,
    /// Whether the subscription is currently active.
    active: bool,
}
35
36impl OverallSubscription {
37 pub fn new(event_type: EventType, active: bool) -> Self {
38 Self { event_type, active }
39 }
40}
41
42impl Single for OverallSubscription {
43 fn apply_mutation(
44 &self,
45 _: &Arc<dyn Single>,
46 mutation: Mutation,
47 _: MutationPolicies,
48 _: &SubscriptionContext,
49 ) -> Result<MutationOutcome> {
50 assert_eq!(self.event_type(), mutation.event_type());
51 Ok(if self.active != mutation.active() {
52 let mutated = Self::new(self.event_type, mutation.active());
53 MutationOutcome::with_mutated(Arc::new(mutated), vec![mutation])
54 } else {
55 MutationOutcome::new()
56 })
57 }
58}
59
60impl Subscription for OverallSubscription {
61 #[inline(always)]
62 fn event_type(&self) -> EventType {
63 self.event_type
64 }
65
66 #[inline(always)]
67 fn active(&self) -> bool {
68 self.active
69 }
70
71 fn scope(&self, _context: &SubscriptionContext) -> Scope {
72 self.event_type.into()
73 }
74}
75
/// Subscription to virtual-chain-changed notifications.
///
/// The derived `Default` is the inactive subscription without accepted transaction ids.
#[derive(Eq, PartialEq, Hash, Clone, Debug, Default)]
pub struct VirtualChainChangedSubscription {
    /// Whether the subscription is currently active.
    active: bool,
    /// Whether accepted transaction ids are requested with the notifications.
    include_accepted_transaction_ids: bool,
}
82
83impl VirtualChainChangedSubscription {
84 pub fn new(active: bool, include_accepted_transaction_ids: bool) -> Self {
85 Self { active, include_accepted_transaction_ids }
86 }
87 pub fn include_accepted_transaction_ids(&self) -> bool {
88 self.include_accepted_transaction_ids
89 }
90}
91
92impl Single for VirtualChainChangedSubscription {
93 fn apply_mutation(
94 &self,
95 _: &Arc<dyn Single>,
96 mutation: Mutation,
97 _: MutationPolicies,
98 _: &SubscriptionContext,
99 ) -> Result<MutationOutcome> {
100 assert_eq!(self.event_type(), mutation.event_type());
101 let result = if let Scope::VirtualChainChanged(ref scope) = mutation.scope {
102 #[allow(clippy::collapsible_else_if)]
105 if !self.active {
106 if !mutation.active() {
108 None
110 } else {
111 let mutated = Self::new(true, scope.include_accepted_transaction_ids);
114 Some((Arc::new(mutated), vec![mutation]))
115 }
116 } else if !self.include_accepted_transaction_ids {
117 if !mutation.active() {
119 let mutated = Self::new(false, false);
121 Some((Arc::new(mutated), vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(false).into())]))
122 } else if !scope.include_accepted_transaction_ids {
123 None
125 } else {
126 let mutated = Self::new(true, true);
128 Some((
129 Arc::new(mutated),
130 vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(false).into()), mutation],
131 ))
132 }
133 } else {
134 if !mutation.active() {
136 let mutated = Self::new(false, false);
138 Some((Arc::new(mutated), vec![Mutation::new(Command::Stop, VirtualChainChangedScope::new(true).into())]))
139 } else if !scope.include_accepted_transaction_ids {
140 let mutated = Self::new(true, false);
142 Some((Arc::new(mutated), vec![mutation, Mutation::new(Command::Stop, VirtualChainChangedScope::new(true).into())]))
143 } else {
144 None
146 }
147 }
148 } else {
149 None
150 };
151 let outcome = match result {
152 Some((mutated, mutations)) => MutationOutcome::with_mutated(mutated, mutations),
153 None => MutationOutcome::new(),
154 };
155 Ok(outcome)
156 }
157}
158
159impl Subscription for VirtualChainChangedSubscription {
160 #[inline(always)]
161 fn event_type(&self) -> EventType {
162 EventType::VirtualChainChanged
163 }
164
165 #[inline(always)]
166 fn active(&self) -> bool {
167 self.active
168 }
169
170 fn scope(&self, _context: &SubscriptionContext) -> Scope {
171 VirtualChainChangedScope::new(self.include_accepted_transaction_ids).into()
172 }
173}
174
/// Running count of `UtxosChangedSubscription` values, reported in trace logs.
///
/// It is incremented in the constructor and `Clone` implementation and decremented in
/// `Drop`, each time from within the arguments of a `trace!` call.
// NOTE(review): if `trace!` skips evaluating its arguments when trace logging is
// disabled, this counter is only meaningful while tracing is enabled — confirm.
static UTXOS_CHANGED_SUBSCRIPTIONS: AtomicUsize = AtomicUsize::new(0);
176
/// Kind of change requested by a UtxosChanged mutation, derived from its command and
/// from whether its address list is empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UtxosChangedMutation {
    /// `Stop` command with an empty address list.
    None,
    /// `Stop` command with a non-empty address list.
    Remove,
    /// `Start` command with a non-empty address list.
    Add,
    /// `Start` command with an empty address list.
    All,
}
184
185impl From<(Command, &UtxosChangedScope)> for UtxosChangedMutation {
186 fn from((command, scope): (Command, &UtxosChangedScope)) -> Self {
187 match (command, scope.addresses.is_empty()) {
188 (Command::Stop, true) => Self::None,
189 (Command::Stop, false) => Self::Remove,
190 (Command::Start, false) => Self::Add,
191 (Command::Start, true) => Self::All,
192 }
193 }
194}
195
/// Activity state of a UtxosChanged subscription.
#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq)]
pub enum UtxosChangedState {
    /// Inactive (the default state).
    #[default]
    None,

    /// Active on a selected set of addresses.
    Selected,

    /// Active on all addresses.
    All,
}

impl UtxosChangedState {
    /// Returns `true` for every state except [`UtxosChangedState::None`].
    pub fn active(&self) -> bool {
        matches!(self, UtxosChangedState::Selected | UtxosChangedState::All)
    }
}

impl Display for UtxosChangedState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            UtxosChangedState::None => "none",
            UtxosChangedState::Selected => "selected",
            UtxosChangedState::All => "all",
        };
        f.write_str(label)
    }
}
227
/// Mutable content of a UtxosChanged subscription: its state and its set of address indexes.
#[derive(Debug, Clone)]
pub struct UtxosChangedSubscriptionData {
    /// Current activity state of the subscription.
    state: UtxosChangedState,

    /// Address indexes of the subscription, resolved through the address tracker of a
    /// `SubscriptionContext`.
    indexes: Indexes,
}
240
241impl UtxosChangedSubscriptionData {
242 fn with_capacity(state: UtxosChangedState, capacity: usize) -> Self {
243 let indexes = Indexes::with_capacity(capacity);
244 Self { state, indexes }
245 }
246
247 #[inline(always)]
248 pub fn update_state(&mut self, new_state: UtxosChangedState) {
249 self.state = new_state;
250 }
251
252 pub fn contains(&self, spk: &ScriptPublicKey, context: &SubscriptionContext) -> bool {
253 context.address_tracker.contains(&self.indexes, spk)
254 }
255
256 pub fn len(&self) -> usize {
257 self.indexes.len()
258 }
259
260 pub fn is_empty(&self) -> bool {
261 self.indexes.is_empty()
262 }
263
264 pub fn capacity(&self) -> usize {
265 self.indexes.capacity()
266 }
267
268 pub fn iter(&self) -> hash_set::Iter<'_, Index> {
269 self.indexes.iter()
270 }
271
272 pub fn contains_address(&self, address: &Address, context: &SubscriptionContext) -> bool {
273 context.address_tracker.contains_address(&self.indexes, address)
274 }
275
276 pub fn to_addresses(&self, prefix: Prefix, context: &SubscriptionContext) -> Vec<Address> {
277 self.indexes.iter().filter_map(|index| context.address_tracker.get_address_at_index(*index, prefix)).collect_vec()
278 }
279
280 pub fn register(&mut self, addresses: Vec<Address>, context: &SubscriptionContext) -> Result<Vec<Address>> {
281 Ok(context.address_tracker.register(&mut self.indexes, addresses)?)
282 }
283
284 pub fn unregister(&mut self, addresses: Vec<Address>, context: &SubscriptionContext) -> Vec<Address> {
285 context.address_tracker.unregister(&mut self.indexes, addresses)
286 }
287
288 pub fn unregister_indexes(&mut self, context: &SubscriptionContext) -> Vec<Address> {
289 let removed = self.to_addresses(Prefix::Mainnet, context);
291 context.address_tracker.unregister_indexes(&mut self.indexes);
292 removed
293 }
294
295 pub fn to_all(&self) -> bool {
296 matches!(self.state, UtxosChangedState::All)
297 }
298}
299
300impl Display for UtxosChangedSubscriptionData {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 match self.state {
303 UtxosChangedState::None | UtxosChangedState::All => write!(f, "{}", self.state),
304 UtxosChangedState::Selected => write!(f, "{}({})", self.state, self.indexes.len()),
305 }
306 }
307}
308
/// UtxosChanged subscription of one listener.
///
/// The content sits behind a lock so it can be mutated through a shared reference.
/// Equality and hashing only consider `listener_id`.
#[derive(Debug)]
pub struct UtxosChangedSubscription {
    /// State and address indexes, guarded by a read/write lock.
    data: RwLock<UtxosChangedSubscriptionData>,

    /// Id of the owning listener; the sole input of `PartialEq` and `Hash`.
    listener_id: ListenerId,
}
319
320impl UtxosChangedSubscription {
321 pub fn new(state: UtxosChangedState, listener_id: ListenerId) -> Self {
322 Self::with_capacity(state, listener_id, 0)
323 }
324
325 pub fn with_capacity(state: UtxosChangedState, listener_id: ListenerId, capacity: usize) -> Self {
326 let data = RwLock::new(UtxosChangedSubscriptionData::with_capacity(state, capacity));
327 let subscription = Self { data, listener_id };
328 trace!(
329 "UtxosChangedSubscription: {} in total (new {})",
330 UTXOS_CHANGED_SUBSCRIPTIONS.fetch_add(1, Ordering::SeqCst) + 1,
331 subscription
332 );
333 subscription
334 }
335
336 #[cfg(test)]
337 pub fn with_addresses(active: bool, addresses: Vec<Address>, listener_id: ListenerId, context: &SubscriptionContext) -> Self {
338 let state = match (active, addresses.is_empty()) {
339 (false, _) => UtxosChangedState::None,
340 (true, false) => UtxosChangedState::Selected,
341 (true, true) => UtxosChangedState::All,
342 };
343 let subscription = Self::with_capacity(state, listener_id, addresses.len());
344 let _ = subscription.data_mut().register(addresses, context);
345 subscription
346 }
347
348 pub fn data(&self) -> RwLockReadGuard<UtxosChangedSubscriptionData> {
349 self.data.read()
350 }
351
352 pub fn data_mut(&self) -> RwLockWriteGuard<UtxosChangedSubscriptionData> {
353 self.data.write()
354 }
355
356 #[inline(always)]
357 pub fn state(&self) -> UtxosChangedState {
358 self.data().state
359 }
360
361 pub fn to_all(&self) -> bool {
362 matches!(self.data().state, UtxosChangedState::All)
363 }
364}
365
366impl Clone for UtxosChangedSubscription {
367 fn clone(&self) -> Self {
368 let subscription = Self { data: RwLock::new(self.data().clone()), listener_id: self.listener_id };
369 trace!(
370 "UtxosChangedSubscription: {} in total (clone {})",
371 UTXOS_CHANGED_SUBSCRIPTIONS.fetch_add(1, Ordering::SeqCst) + 1,
372 subscription
373 );
374 subscription
375 }
376}
377
378impl Display for UtxosChangedSubscription {
379 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380 write!(f, "{}", self.data())
381 }
382}
383
impl Drop for UtxosChangedSubscription {
    /// Traces the drop, decrementing the subscription counter as part of the trace arguments.
    fn drop(&mut self) {
        // NOTE(review): the decrement is an argument of `trace!`; whether it runs when
        // trace logging is disabled depends on how that macro evaluates its arguments — confirm.
        trace!(
            "UtxosChangedSubscription: {} in total (drop {})",
            UTXOS_CHANGED_SUBSCRIPTIONS.fetch_sub(1, Ordering::SeqCst) - 1,
            self
        );
    }
}
393
394impl PartialEq for UtxosChangedSubscription {
395 fn eq(&self, other: &Self) -> bool {
397 self.listener_id == other.listener_id
398 }
399}
400impl Eq for UtxosChangedSubscription {}
401
402impl Hash for UtxosChangedSubscription {
403 fn hash<H: Hasher>(&self, state: &mut H) {
405 self.listener_id.hash(state);
406 }
407}
408
impl Single for UtxosChangedSubscription {
    /// Applies `mutation` to this subscription in place and reports what changed.
    ///
    /// The data is modified under its write lock. Whenever the state changes, the
    /// outcome is built with a clone of `current` as the mutated subscription;
    /// otherwise it only carries mutations, or nothing at all.
    ///
    /// The mutations reported depend on `policies.utxo_changed`: under `AddressSet`
    /// they carry the addresses actually registered or unregistered, while under
    /// `Wildcard` they carry a default scope or are omitted.
    ///
    /// A mutation whose scope is not a `UtxosChanged` scope yields an empty outcome.
    ///
    /// # Errors
    ///
    /// Propagates errors returned while registering addresses.
    ///
    /// # Panics
    ///
    /// Panics if the mutation targets a different event type, or if a `Selected`
    /// subscription turns out to hold no address when all its indexes are unregistered.
    fn apply_mutation(
        &self,
        current: &Arc<dyn Single>,
        mutation: Mutation,
        policies: MutationPolicies,
        context: &SubscriptionContext,
    ) -> Result<MutationOutcome> {
        assert_eq!(self.event_type(), mutation.event_type());
        let outcome = if let Scope::UtxosChanged(scope) = mutation.scope {
            // The write guard lives until the end of this branch.
            let mut data = self.data_mut();
            let state = data.state;
            let mutation_type = UtxosChangedMutation::from((mutation.command, &scope));
            match (state, mutation_type) {
                // --- Current state: None ---
                (UtxosChangedState::None, UtxosChangedMutation::None | UtxosChangedMutation::Remove) => {
                    // Stopping an inactive subscription changes nothing
                    MutationOutcome::new()
                }
                (UtxosChangedState::None, UtxosChangedMutation::Add) => {
                    // None -> Selected: register the requested addresses
                    let addresses = data.register(scope.addresses, context)?;
                    data.update_state(UtxosChangedState::Selected);
                    let mutations = match policies.utxo_changed {
                        UtxosChangedMutationPolicy::AddressSet => {
                            vec![Mutation::new(mutation.command, UtxosChangedScope::new(addresses).into())]
                        }
                        UtxosChangedMutationPolicy::Wildcard => {
                            vec![Mutation::new(mutation.command, UtxosChangedScope::default().into())]
                        }
                    };
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                (UtxosChangedState::None, UtxosChangedMutation::All) => {
                    // None -> All
                    data.update_state(UtxosChangedState::All);
                    let mutations = vec![Mutation::new(mutation.command, UtxosChangedScope::default().into())];
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                // --- Current state: Selected ---
                (UtxosChangedState::Selected, UtxosChangedMutation::None) => {
                    // Selected -> None: drop every registered index
                    data.update_state(UtxosChangedState::None);
                    let removed = data.unregister_indexes(context);
                    assert!(!removed.is_empty(), "state Selected implies a non empty address set");
                    let mutations = match policies.utxo_changed {
                        UtxosChangedMutationPolicy::AddressSet => {
                            vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
                        }
                        UtxosChangedMutationPolicy::Wildcard => {
                            vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())]
                        }
                    };
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                (UtxosChangedState::Selected, UtxosChangedMutation::Remove) => {
                    // Remove some addresses; the follow-up depends on whether anything was
                    // removed and on whether any index is left afterwards
                    let removed = data.unregister(scope.addresses, context);
                    match (removed.is_empty(), data.indexes.is_empty()) {
                        (false, false) => {
                            // Still Selected: report the removal without a new state
                            let mutations = match policies.utxo_changed {
                                UtxosChangedMutationPolicy::AddressSet => {
                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
                                }
                                UtxosChangedMutationPolicy::Wildcard => vec![],
                            };
                            MutationOutcome::with_mutations(mutations)
                        }
                        (false, true) => {
                            // Nothing left: Selected -> None
                            data.update_state(UtxosChangedState::None);
                            let mutations = match policies.utxo_changed {
                                UtxosChangedMutationPolicy::AddressSet => {
                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into())]
                                }
                                UtxosChangedMutationPolicy::Wildcard => {
                                    vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())]
                                }
                            };
                            MutationOutcome::with_mutated(current.clone(), mutations)
                        }
                        // Nothing was removed: no change
                        (true, _) => MutationOutcome::new(),
                    }
                }
                (UtxosChangedState::Selected, UtxosChangedMutation::Add) => {
                    // Add addresses; only a non-empty registration result is reported
                    let added = data.register(scope.addresses, context)?;
                    match added.is_empty() {
                        false => {
                            let mutations = match policies.utxo_changed {
                                UtxosChangedMutationPolicy::AddressSet => {
                                    vec![Mutation::new(Command::Start, UtxosChangedScope::new(added).into())]
                                }
                                UtxosChangedMutationPolicy::Wildcard => vec![],
                            };
                            MutationOutcome::with_mutations(mutations)
                        }
                        true => MutationOutcome::new(),
                    }
                }
                (UtxosChangedState::Selected, UtxosChangedMutation::All) => {
                    // Selected -> All: drop the registered indexes, then switch state
                    let removed = data.unregister_indexes(context);
                    assert!(!removed.is_empty(), "state Selected implies a non empty address set");
                    data.update_state(UtxosChangedState::All);
                    let mutations = match policies.utxo_changed {
                        UtxosChangedMutationPolicy::AddressSet => vec![
                            Mutation::new(Command::Stop, UtxosChangedScope::new(removed).into()),
                            Mutation::new(Command::Start, UtxosChangedScope::default().into()),
                        ],
                        UtxosChangedMutationPolicy::Wildcard => vec![],
                    };
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                // --- Current state: All ---
                (UtxosChangedState::All, UtxosChangedMutation::None) => {
                    // All -> None
                    data.update_state(UtxosChangedState::None);
                    let mutations = vec![Mutation::new(Command::Stop, UtxosChangedScope::default().into())];
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                (UtxosChangedState::All, UtxosChangedMutation::Remove) => {
                    // Removing specific addresses from All is ignored
                    MutationOutcome::new()
                }
                (UtxosChangedState::All, UtxosChangedMutation::Add) => {
                    // All -> Selected: register the requested addresses
                    let added = data.register(scope.addresses, context)?;
                    data.update_state(UtxosChangedState::Selected);
                    let mutations = match policies.utxo_changed {
                        UtxosChangedMutationPolicy::AddressSet => vec![
                            Mutation::new(Command::Start, UtxosChangedScope::new(added).into()),
                            Mutation::new(Command::Stop, UtxosChangedScope::default().into()),
                        ],
                        UtxosChangedMutationPolicy::Wildcard => vec![],
                    };
                    MutationOutcome::with_mutated(current.clone(), mutations)
                }
                (UtxosChangedState::All, UtxosChangedMutation::All) => {
                    // Already All: no change
                    MutationOutcome::new()
                }
            }
        } else {
            // Not a UtxosChanged scope: nothing to apply
            MutationOutcome::new()
        };
        Ok(outcome)
    }
}
554
555impl Subscription for UtxosChangedSubscription {
556 fn event_type(&self) -> EventType {
557 EventType::UtxosChanged
558 }
559
560 fn active(&self) -> bool {
561 self.state().active()
562 }
563
564 fn scope(&self, context: &SubscriptionContext) -> Scope {
565 UtxosChangedScope::new(self.data().to_addresses(Prefix::Mainnet, context)).into()
567 }
568}
569
570impl BroadcastingSingle for DynSubscription {
571 fn broadcasting(self, context: &SubscriptionContext) -> DynSubscription {
572 match self.event_type() {
573 EventType::UtxosChanged => {
574 let utxos_changed_subscription = self.as_any().downcast_ref::<UtxosChangedSubscription>().unwrap();
575 match utxos_changed_subscription.to_all() {
576 true => context.utxos_changed_subscription_to_all.clone(),
577 false => self,
578 }
579 }
580 _ => self,
581 }
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use super::super::*;
588 use super::*;
589 use crate::{address::test_helpers::get_3_addresses, scope::BlockAddedScope};
590 use std::collections::hash_map::DefaultHasher;
591
    /// Checks that equality and hashing of subscriptions agree with each other, both
    /// through the `Arc` handles and through the pointed-to values.
    #[test]
    fn test_subscription_hash() {
        /// Expected relation between two subscriptions, referred to by index.
        struct Comparison {
            left: usize,
            right: usize,
            should_match: bool,
        }
        impl Comparison {
            fn new(left: usize, right: usize, should_match: bool) -> Self {
                Self { left, right, should_match }
            }
            /// Asserts that equality and hash equality both match `should_match`.
            fn compare(&self, name: &str, subscriptions: &[DynSubscription]) {
                let equal = if self.should_match { "be equal" } else { "not be equal" };
                // Compare the handles by reference; the explicit `&` operands are intended
                #[allow(clippy::op_ref)]
                let cmp = &subscriptions[self.left] == &subscriptions[self.right];
                assert_eq!(
                    cmp, self.should_match,
                    "{name}: subscriptions should {equal}, comparing {:?} with {:?}",
                    &subscriptions[self.left], &subscriptions[self.right],
                );
                // Hashes of the handles
                assert_eq!(
                    get_hash(&subscriptions[self.left]) == get_hash(&subscriptions[self.right]),
                    self.should_match,
                    "{name}: subscription hashes should {equal}, comparing {:?} => {} with {:?} => {}",
                    &subscriptions[self.left],
                    get_hash(&subscriptions[self.left]),
                    &subscriptions[self.right],
                    get_hash(&subscriptions[self.right]),
                );
                // Same checks on cloned handles, comparing the pointed-to values
                let left_arc = subscriptions[self.left].clone();
                let right_arc = subscriptions[self.right].clone();
                assert_eq!(
                    *left_arc == *right_arc,
                    self.should_match,
                    "{name}: subscriptions should {equal}, comparing {left_arc:?} with {right_arc:?}",
                );
                assert_eq!(
                    get_hash(&left_arc) == get_hash(&right_arc),
                    self.should_match,
                    "{name}: subscription hashes should {equal}, comparing {:?} => {} with {:?} => {}",
                    left_arc,
                    get_hash(&left_arc),
                    right_arc,
                    get_hash(&right_arc),
                );
            }
        }

        /// A named group of subscriptions and the comparisons to run on them.
        struct Test {
            name: &'static str,
            subscriptions: Vec<DynSubscription>,
            comparisons: Vec<Comparison>,
        }

        let context = SubscriptionContext::new();
        let addresses = get_3_addresses(false);
        let mut sorted_addresses = addresses.clone();
        sorted_addresses.sort();

        let tests: Vec<Test> = vec![
            Test {
                name: "test basic overall subscription",
                subscriptions: vec![
                    Arc::new(OverallSubscription::new(EventType::BlockAdded, false)),
                    Arc::new(OverallSubscription::new(EventType::BlockAdded, true)),
                    Arc::new(OverallSubscription::new(EventType::BlockAdded, true)),
                ],
                comparisons: vec![Comparison::new(0, 1, false), Comparison::new(0, 2, false), Comparison::new(1, 2, true)],
            },
            Test {
                name: "test virtual selected parent chain changed subscription",
                subscriptions: vec![
                    Arc::new(VirtualChainChangedSubscription::new(false, false)),
                    Arc::new(VirtualChainChangedSubscription::new(true, false)),
                    Arc::new(VirtualChainChangedSubscription::new(true, true)),
                    Arc::new(VirtualChainChangedSubscription::new(true, true)),
                ],
                comparisons: vec![
                    Comparison::new(0, 1, false),
                    Comparison::new(0, 2, false),
                    Comparison::new(0, 3, false),
                    Comparison::new(1, 2, false),
                    Comparison::new(1, 3, false),
                    Comparison::new(2, 3, true),
                ],
            },
            // UtxosChanged subscriptions compare by listener id only: entries 2 and 3
            // share listener id 2, every other entry has its own id
            Test {
                name: "test utxos changed subscription",
                subscriptions: vec![
                    Arc::new(UtxosChangedSubscription::with_addresses(false, vec![], 0, &context)),
                    Arc::new(UtxosChangedSubscription::with_addresses(true, addresses[0..2].to_vec(), 1, &context)),
                    Arc::new(UtxosChangedSubscription::with_addresses(true, addresses[0..3].to_vec(), 2, &context)),
                    Arc::new(UtxosChangedSubscription::with_addresses(true, sorted_addresses[0..3].to_vec(), 2, &context)),
                    Arc::new(UtxosChangedSubscription::with_addresses(true, vec![], 3, &context)),
                    Arc::new(UtxosChangedSubscription::with_addresses(true, vec![], 4, &context)),
                ],
                comparisons: vec![
                    Comparison::new(0, 0, true),
                    Comparison::new(0, 1, false),
                    Comparison::new(0, 2, false),
                    Comparison::new(0, 3, false),
                    Comparison::new(0, 4, false),
                    Comparison::new(0, 5, false),
                    Comparison::new(1, 1, true),
                    Comparison::new(1, 2, false),
                    Comparison::new(1, 3, false),
                    Comparison::new(1, 4, false),
                    Comparison::new(1, 5, false),
                    Comparison::new(2, 2, true),
                    Comparison::new(2, 3, true),
                    Comparison::new(2, 4, false),
                    Comparison::new(2, 5, false),
                    Comparison::new(3, 3, true),
                    Comparison::new(3, 4, false),
                    Comparison::new(3, 5, false),
                    Comparison::new(4, 4, true),
                    Comparison::new(4, 5, false),
                    Comparison::new(5, 5, true),
                ],
            },
        ];

        for test in tests.iter() {
            for comparison in test.comparisons.iter() {
                comparison.compare(test.name, &test.subscriptions);
            }
        }
    }
724
725 fn get_hash<T: Hash>(item: &T) -> u64 {
726 let mut hasher = DefaultHasher::default();
727 item.hash(&mut hasher);
728 hasher.finish()
729 }
730
    /// One mutation scenario: a starting subscription, the mutation applied to it and
    /// the expected results.
    struct MutationTest {
        /// Label used in assertion messages.
        name: &'static str,
        /// Subscription before the mutation.
        state: DynSubscription,
        /// Mutation to apply.
        mutation: Mutation,
        /// Expected subscription after the mutation.
        new_state: DynSubscription,
        /// Expected outcome; the runner compares its new-state presence and its mutations.
        outcome: MutationOutcome,
    }
738
739 struct MutationTests {
740 tests: Vec<MutationTest>,
741 }
742
743 impl MutationTests {
744 pub const LISTENER_ID: ListenerId = 1;
745
746 fn new(tests: Vec<MutationTest>) -> Self {
747 Self { tests }
748 }
749
750 fn run(&self, context: &SubscriptionContext) {
751 for test in self.tests.iter() {
752 let mut new_state = test.state.clone();
753 let outcome = new_state.mutate(test.mutation.clone(), Default::default(), context).unwrap();
754 assert_eq!(test.new_state.active(), new_state.active(), "Testing '{}': wrong new state activity", test.name);
755 assert_eq!(*test.new_state, *new_state, "Testing '{}': wrong new state", test.name);
756 assert_eq!(test.outcome.has_new_state(), outcome.has_new_state(), "Testing '{}': wrong new state presence", test.name);
757 assert_eq!(test.outcome.mutations, outcome.mutations, "Testing '{}': wrong mutations", test.name);
758 }
759 }
760 }
761
    /// Exercises every (state, command) combination of `OverallSubscription`.
    #[test]
    fn test_overall_mutation() {
        let context = SubscriptionContext::new();

        // Builders for a BlockAdded subscription and for a BlockAdded mutation
        fn s(active: bool) -> DynSubscription {
            Arc::new(OverallSubscription { event_type: EventType::BlockAdded, active })
        }
        fn m(command: Command) -> Mutation {
            Mutation { command, scope: Scope::BlockAdded(BlockAddedScope {}) }
        }

        // Subscriptions
        let none = || s(false);
        let all = || s(true);

        // Mutations
        let start_all = || m(Command::Start);
        let stop_all = || m(Command::Stop);

        let tests = MutationTests::new(vec![
            // Mutations from None
            MutationTest {
                name: "OverallSubscription None to All",
                state: none(),
                mutation: start_all(),
                new_state: all(),
                outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
            },
            MutationTest {
                name: "OverallSubscription None to None",
                state: none(),
                mutation: stop_all(),
                new_state: none(),
                outcome: MutationOutcome::new(),
            },
            // Mutations from All
            MutationTest {
                name: "OverallSubscription All to All",
                state: all(),
                mutation: start_all(),
                new_state: all(),
                outcome: MutationOutcome::new(),
            },
            MutationTest {
                name: "OverallSubscription All to None",
                state: all(),
                mutation: stop_all(),
                new_state: none(),
                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
            },
        ]);
        tests.run(&context)
    }
814
    /// Exercises every (state, mutation) combination of `VirtualChainChangedSubscription`.
    ///
    /// "Reduced" is an active subscription without accepted transaction ids, "all" an
    /// active subscription with them.
    #[test]
    fn test_virtual_chain_changed_mutation() {
        let context = SubscriptionContext::new();

        // Builders for a subscription and for a mutation
        fn s(active: bool, include_accepted_transaction_ids: bool) -> DynSubscription {
            Arc::new(VirtualChainChangedSubscription { active, include_accepted_transaction_ids })
        }
        fn m(command: Command, include_accepted_transaction_ids: bool) -> Mutation {
            Mutation { command, scope: Scope::VirtualChainChanged(VirtualChainChangedScope { include_accepted_transaction_ids }) }
        }

        // Subscriptions
        let none = || s(false, false);
        let reduced = || s(true, false);
        let all = || s(true, true);

        // Mutations
        let start_all = || m(Command::Start, true);
        let stop_all = || m(Command::Stop, true);
        let start_reduced = || m(Command::Start, false);
        let stop_reduced = || m(Command::Stop, false);

        let tests = MutationTests::new(vec![
            // Mutations from None
            MutationTest {
                name: "VirtualChainChangedSubscription None to All",
                state: none(),
                mutation: start_all(),
                new_state: all(),
                outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription None to Reduced",
                state: none(),
                mutation: start_reduced(),
                new_state: reduced(),
                outcome: MutationOutcome::with_mutated(reduced(), vec![start_reduced()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription None to None (stop reduced)",
                state: none(),
                mutation: stop_reduced(),
                new_state: none(),
                outcome: MutationOutcome::new(),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription None to None (stop all)",
                state: none(),
                mutation: stop_all(),
                new_state: none(),
                outcome: MutationOutcome::new(),
            },
            // Mutations from Reduced
            MutationTest {
                name: "VirtualChainChangedSubscription Reduced to All",
                state: reduced(),
                mutation: start_all(),
                new_state: all(),
                outcome: MutationOutcome::with_mutated(all(), vec![stop_reduced(), start_all()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription Reduced to Reduced",
                state: reduced(),
                mutation: start_reduced(),
                new_state: reduced(),
                outcome: MutationOutcome::new(),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription Reduced to None (stop reduced)",
                state: reduced(),
                mutation: stop_reduced(),
                new_state: none(),
                outcome: MutationOutcome::with_mutated(none(), vec![stop_reduced()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription Reduced to None (stop all)",
                state: reduced(),
                mutation: stop_all(),
                new_state: none(),
                outcome: MutationOutcome::with_mutated(none(), vec![stop_reduced()]),
            },
            // Mutations from All
            MutationTest {
                name: "VirtualChainChangedSubscription All to All",
                state: all(),
                mutation: start_all(),
                new_state: all(),
                outcome: MutationOutcome::new(),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription All to Reduced",
                state: all(),
                mutation: start_reduced(),
                new_state: reduced(),
                outcome: MutationOutcome::with_mutated(reduced(), vec![start_reduced(), stop_all()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription All to None (stop reduced)",
                state: all(),
                mutation: stop_reduced(),
                new_state: none(),
                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
            },
            MutationTest {
                name: "VirtualChainChangedSubscription All to None (stop all)",
                state: all(),
                mutation: stop_all(),
                new_state: none(),
                outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
            },
        ]);
        tests.run(&context)
    }
926
927 #[test]
928 fn test_utxos_changed_mutation() {
929 let context = SubscriptionContext::new();
930 let a_stock = get_3_addresses(true);
931
932 let av = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
933 let ah = |indexes: &[usize]| indexes.iter().map(|idx| (a_stock[*idx]).clone()).collect::<Vec<_>>();
934 let s = |active: bool, indexes: &[usize]| {
935 Arc::new(UtxosChangedSubscription::with_addresses(active, ah(indexes).to_vec(), MutationTests::LISTENER_ID, &context))
936 as DynSubscription
937 };
938 let m = |command: Command, indexes: &[usize]| -> Mutation {
939 Mutation { command, scope: Scope::UtxosChanged(UtxosChangedScope::new(av(indexes))) }
940 };
941
942 let none = || s(false, &[]);
944 let selected_0 = || s(true, &[0]);
945 let selected_1 = || s(true, &[1]);
946 let selected_2 = || s(true, &[2]);
947 let selected_01 = || s(true, &[0, 1]);
948 let selected_02 = || s(true, &[0, 2]);
949 let selected_012 = || s(true, &[0, 1, 2]);
950 let all = || s(true, &[]);
951
952 let start_all = || m(Command::Start, &[]);
954 let stop_all = || m(Command::Stop, &[]);
955 let start_0 = || m(Command::Start, &[0]);
956 let start_1 = || m(Command::Start, &[1]);
957 let start_01 = || m(Command::Start, &[0, 1]);
958 let stop_0 = || m(Command::Stop, &[0]);
959 let stop_1 = || m(Command::Stop, &[1]);
960 let stop_01 = || m(Command::Stop, &[0, 1]);
961
962 let tests = MutationTests::new(vec![
964 MutationTest {
965 name: "UtxosChangedSubscription None to All (add all)",
966 state: none(),
967 mutation: start_all(),
968 new_state: all(),
969 outcome: MutationOutcome::with_mutated(all(), vec![start_all()]),
970 },
971 MutationTest {
972 name: "UtxosChangedSubscription None to Selected 0 (add set)",
973 state: none(),
974 mutation: start_0(),
975 new_state: selected_0(),
976 outcome: MutationOutcome::with_mutated(selected_0(), vec![start_0()]),
977 },
978 MutationTest {
979 name: "UtxosChangedSubscription None to None (stop set)",
980 state: none(),
981 mutation: stop_0(),
982 new_state: none(),
983 outcome: MutationOutcome::new(),
984 },
985 MutationTest {
986 name: "UtxosChangedSubscription None to None (stop all)",
987 state: none(),
988 mutation: stop_all(),
989 new_state: none(),
990 outcome: MutationOutcome::new(),
991 },
992 MutationTest {
993 name: "UtxosChangedSubscription Selected 01 to All (add all)",
994 state: selected_01(),
995 mutation: start_all(),
996 new_state: all(),
997 outcome: MutationOutcome::with_mutated(all(), vec![stop_01(), start_all()]),
998 },
999 MutationTest {
1000 name: "UtxosChangedSubscription Selected 01 to 01 (add set with total intersection)",
1001 state: selected_01(),
1002 mutation: start_1(),
1003 new_state: selected_01(),
1004 outcome: MutationOutcome::new(),
1005 },
1006 MutationTest {
1007 name: "UtxosChangedSubscription Selected 0 to 01 (add set with partial intersection)",
1008 state: selected_0(),
1009 mutation: start_01(),
1010 new_state: selected_01(),
1011 outcome: MutationOutcome::with_mutations(vec![start_1()]),
1012 },
1013 MutationTest {
1014 name: "UtxosChangedSubscription Selected 2 to 012 (add set with no intersection)",
1015 state: selected_2(),
1016 mutation: start_01(),
1017 new_state: selected_012(),
1018 outcome: MutationOutcome::with_mutations(vec![start_01()]),
1019 },
1020 MutationTest {
1021 name: "UtxosChangedSubscription Selected 01 to None (remove superset)",
1022 state: selected_1(),
1023 mutation: stop_01(),
1024 new_state: none(),
1025 outcome: MutationOutcome::with_mutated(none(), vec![stop_1()]),
1026 },
1027 MutationTest {
1028 name: "UtxosChangedSubscription Selected 01 to None (remove set with total intersection)",
1029 state: selected_01(),
1030 mutation: stop_01(),
1031 new_state: none(),
1032 outcome: MutationOutcome::with_mutated(none(), vec![stop_01()]),
1033 },
1034 MutationTest {
1035 name: "UtxosChangedSubscription Selected 02 to 2 (remove set with partial intersection)",
1036 state: selected_02(),
1037 mutation: stop_01(),
1038 new_state: selected_2(),
1039 outcome: MutationOutcome::with_mutations(vec![stop_0()]),
1040 },
1041 MutationTest {
1042 name: "UtxosChangedSubscription Selected 02 to 02 (remove set with no intersection)",
1043 state: selected_02(),
1044 mutation: stop_1(),
1045 new_state: selected_02(),
1046 outcome: MutationOutcome::new(),
1047 },
1048 MutationTest {
1049 name: "UtxosChangedSubscription All to All (add all)",
1050 state: all(),
1051 mutation: start_all(),
1052 new_state: all(),
1053 outcome: MutationOutcome::new(),
1054 },
1055 MutationTest {
1056 name: "UtxosChangedSubscription All to Selected 01 (add set)",
1057 state: all(),
1058 mutation: start_01(),
1059 new_state: selected_01(),
1060 outcome: MutationOutcome::with_mutated(selected_01(), vec![start_01(), stop_all()]),
1061 },
1062 MutationTest {
1063 name: "UtxosChangedSubscription All to All (remove set)",
1064 state: all(),
1065 mutation: stop_01(),
1066 new_state: all(),
1067 outcome: MutationOutcome::new(),
1068 },
1069 MutationTest {
1070 name: "UtxosChangedSubscription All to None (remove all)",
1071 state: all(),
1072 mutation: stop_all(),
1073 new_state: none(),
1074 outcome: MutationOutcome::with_mutated(none(), vec![stop_all()]),
1075 },
1076 ]);
1077 tests.run(&context)
1078 }
1079}