kaspa_notify/subscription/
context.rs

1use crate::{
2    address::tracker::Tracker,
3    listener::ListenerId,
4    subscription::{
5        single::{UtxosChangedState, UtxosChangedSubscription},
6        DynSubscription,
7    },
8};
9use std::{ops::Deref, sync::Arc};
10
11#[cfg(test)]
12use kaspa_addresses::Address;
13
14#[derive(Debug)]
15pub struct SubscriptionContextInner {
16    pub address_tracker: Tracker,
17    pub utxos_changed_subscription_to_all: DynSubscription,
18}
19
20impl SubscriptionContextInner {
21    const CONTEXT_LISTENER_ID: ListenerId = ListenerId::MAX;
22
23    pub fn new() -> Self {
24        Self::with_options(None)
25    }
26
27    pub fn with_options(max_addresses: Option<usize>) -> Self {
28        let address_tracker = Tracker::new(max_addresses);
29        let utxos_changed_subscription_all =
30            Arc::new(UtxosChangedSubscription::new(UtxosChangedState::All, Self::CONTEXT_LISTENER_ID));
31        Self { address_tracker, utxos_changed_subscription_to_all: utxos_changed_subscription_all }
32    }
33
34    #[cfg(test)]
35    pub fn with_addresses(addresses: &[Address]) -> Self {
36        let address_tracker = Tracker::with_addresses(addresses);
37        let utxos_changed_subscription_all =
38            Arc::new(UtxosChangedSubscription::new(UtxosChangedState::All, Self::CONTEXT_LISTENER_ID));
39        Self { address_tracker, utxos_changed_subscription_to_all: utxos_changed_subscription_all }
40    }
41}
42
43impl Default for SubscriptionContextInner {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49#[derive(Clone, Debug, Default)]
50pub struct SubscriptionContext {
51    inner: Arc<SubscriptionContextInner>,
52}
53
54impl SubscriptionContext {
55    pub fn new() -> Self {
56        Self::with_options(None)
57    }
58
59    pub fn with_options(max_addresses: Option<usize>) -> Self {
60        let inner = Arc::new(SubscriptionContextInner::with_options(max_addresses));
61        Self { inner }
62    }
63
64    #[cfg(test)]
65    pub fn with_addresses(addresses: &[Address]) -> Self {
66        let inner = Arc::new(SubscriptionContextInner::with_addresses(addresses));
67        Self { inner }
68    }
69}
70
71impl Deref for SubscriptionContext {
72    type Target = SubscriptionContextInner;
73
74    fn deref(&self) -> &Self::Target {
75        &self.inner
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use crate::{
82        address::tracker::{CounterMap, Index, IndexSet, Indexer, RefCount},
83        subscription::SubscriptionContext,
84    };
85    use itertools::Itertools;
86    use kaspa_addresses::{Address, Prefix};
87    use kaspa_alloc::init_allocator_with_default_settings;
88    use kaspa_core::trace;
89    use kaspa_math::Uint256;
90    use std::collections::{HashMap, HashSet};
91    use workflow_perf_monitor::mem::get_process_memory_info;
92
93    fn create_addresses(count: usize) -> Vec<Address> {
94        (0..count)
95            .map(|i| Address::new(Prefix::Mainnet, kaspa_addresses::Version::PubKey, &Uint256::from_u64(i as u64).to_le_bytes()))
96            .collect()
97    }
98
99    fn measure_consumed_memory<T, F: FnOnce() -> Vec<T>, F2: FnOnce(&T) -> (usize, usize)>(
100        item_len: usize,
101        num_items: usize,
102        ctor: F,
103        length_and_capacity: F2,
104    ) -> Vec<T> {
105        let before = get_process_memory_info().unwrap();
106
107        trace!("Creating items...");
108        let items = ctor();
109
110        let after = get_process_memory_info().unwrap();
111
112        trace!("Required item length: {}", item_len);
113        trace!("Memory consumed: {}", (after.resident_set_size - before.resident_set_size) / num_items as u64);
114        trace!(
115            "Memory/idx: {}",
116            ((after.resident_set_size - before.resident_set_size) as f64 / num_items as f64 / item_len as f64 * 10.0).round() / 10.0
117        );
118
119        let (len, capacity) = length_and_capacity(&items[0]);
120        match len > 0 {
121            true => trace!(
122                "Actual item: len = {}, capacity = {}, free space = +{:.1}%",
123                len,
124                capacity,
125                (capacity - len) as f64 * 100.0 / len as f64
126            ),
127            false => trace!("Actual item: len = {}, capacity = {}", len, capacity),
128        }
129
130        items
131    }
132
133    fn init_and_measure_consumed_memory<T, F: FnOnce() -> Vec<T>, F2: FnOnce(&T) -> (usize, usize)>(
134        item_len: usize,
135        num_items: usize,
136        ctor: F,
137        length_and_capacity: F2,
138    ) -> Vec<T> {
139        init_allocator_with_default_settings();
140        kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");
141        measure_consumed_memory(item_len, num_items, ctor, length_and_capacity)
142    }
143
144    #[test]
145    #[ignore = "measuring consumed memory"]
146    // ITEM = SubscriptionContext
147    // (measuring IndexMap<ScriptPublicKey, u16>)
148    //
149    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM   MEM/ADDR
150    // --------------------------------------------------
151    // 10_000_000            5   1_098_744_627      109.9
152    //  1_000_000           50     103_581_696      104.0
153    //    100_000          100       9_157_836       91.6
154    //     10_000        1_000         977_666       97.8
155    //      1_000       10_000          94_633       94.6
156    //        100      100_000           9_617       96.2
157    //         10    1_000_000           1_325      132.5
158    //          1   10_000_000             410      410.0
159    fn test_subscription_context_size() {
160        const ITEM_LEN: usize = 10_000_000;
161        const NUM_ITEMS: usize = 5;
162
163        init_allocator_with_default_settings();
164        kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");
165
166        trace!("Creating addresses...");
167        let addresses = create_addresses(ITEM_LEN);
168
169        let _ = measure_consumed_memory(
170            ITEM_LEN,
171            NUM_ITEMS,
172            || (0..NUM_ITEMS).map(|_| SubscriptionContext::with_addresses(&addresses)).collect_vec(),
173            |x| (x.address_tracker.len(), x.address_tracker.capacity()),
174        );
175    }
176
177    #[test]
178    #[ignore = "measuring consumed memory"]
179    // ITEM = HashMap<u32, u16>
180    //
181    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
182    // --------------------------------------------------
183    // 10_000_000           10     151_214_489       15.1
184    //  1_000_000          100      18_926_059       18.9
185    //    100_000        1_000       1_187_864       11.9
186    //     10_000       10_000         152_063       15.2
187    //      1_000      100_000          20_576       20.6
188    //        100    1_000_000           1_336       13.4
189    //         10   10_000_000             241       24.1
190    //          1   10_000_000             128      128.4
191    fn test_hash_map_u32_u16_size() {
192        const ITEM_LEN: usize = 1;
193        const NUM_ITEMS: usize = 10_000_000;
194
195        let _ = init_and_measure_consumed_memory(
196            ITEM_LEN,
197            NUM_ITEMS,
198            || {
199                (0..NUM_ITEMS)
200                    .map(|_| (0..ITEM_LEN as Index).map(|i| (i, (ITEM_LEN as Index - i) as RefCount)).rev().collect::<HashMap<_, _>>())
201                    .collect_vec()
202            },
203            |x| (x.len(), x.capacity()),
204        );
205    }
206
207    #[test]
208    #[ignore = "measuring consumed memory"]
209    // ITEM = CounterMap
210    // (measuring HashMap<u32, u16>)
211    //
212    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
213    // --------------------------------------------------
214    // 10_000_000           10     151_239_065       15.1
215    //  1_000_000          100      18_927_534       18.9
216    //    100_000        1_000       1_188_024       11.9
217    //     10_000       10_000         152_077       15.2
218    //      1_000      100_000          20_587       20.6
219    //        100    1_000_000           1_344       13.4
220    //         10   10_000_000             249       24.9
221    //          1   10_000_000             136      136.5
222    fn test_counter_map_size() {
223        const ITEM_LEN: usize = 10;
224        const NUM_ITEMS: usize = 10_000_000;
225
226        let _ = init_and_measure_consumed_memory(
227            ITEM_LEN,
228            NUM_ITEMS,
229            || {
230                (0..NUM_ITEMS)
231                    .map(|_| {
232                        // Reserve the required capacity
233                        // Note: the resulting allocated HashMap bucket count is (capacity * 8 / 7).next_power_of_two()
234                        let mut item = CounterMap::with_capacity(ITEM_LEN);
235
236                        (0..ITEM_LEN as Index).for_each(|x| {
237                            item.insert(x);
238                        });
239                        item
240                    })
241                    .collect_vec()
242            },
243            |x| (x.len(), x.capacity()),
244        );
245    }
246
247    #[test]
248    #[ignore = "measuring consumed memory"]
249    // ITEM = HashSet<u32>
250    //
251    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
252    // --------------------------------------------------
253    // 10_000_000           10      84'094'976        8.4
254    //  1_000_000          100      10'524'508       10.5
255    //    100_000        1_000         662_720        6.6
256    //     10_000       10_000          86_369        8.6
257    //      1_000      100_000          12_372       12.4
258    //        100    1_000_000             821        8.2
259    //         10   10_000_000             144       14.4
260    //          1   10_000_000             112      112.0
261    fn test_hash_set_u32_size() {
262        const ITEM_LEN: usize = 1_000_000;
263        const NUM_ITEMS: usize = 100;
264
265        let _ = init_and_measure_consumed_memory(
266            ITEM_LEN,
267            NUM_ITEMS,
268            || (0..NUM_ITEMS).map(|_| (0..ITEM_LEN as Index).rev().collect::<HashSet<_>>()).collect_vec(),
269            |x| (x.len(), x.capacity()),
270        );
271    }
272
273    #[test]
274    #[ignore = "measuring consumed memory"]
275    // ITEM = HashSet<u32> emptied
276    //
277    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
278    // --------------------------------------------------
279    // 10_000_000           10      84'094'976        8.4
280    //  1_000_000          100      10'524'508       10.5
281    //    100_000        1_000         662_720        6.6
282    //     10_000       10_000          86_369        8.6
283    //      1_000      100_000          12_372       12.4
284    //        100    1_000_000             821        8.2
285    //         10   10_000_000             144       14.4
286    //          1   10_000_000             112      112.0
287    fn test_emptied_hash_set_u32_size() {
288        const ITEM_LEN: usize = 1_000_000;
289        const NUM_ITEMS: usize = 100;
290
291        let _ = init_and_measure_consumed_memory(
292            ITEM_LEN,
293            NUM_ITEMS,
294            || {
295                (0..NUM_ITEMS)
296                    .map(|_| {
297                        let mut set = (0..ITEM_LEN as Index).rev().collect::<HashSet<_>>();
298                        let original_capacity = set.capacity();
299                        let _ = set.drain();
300                        assert!(set.is_empty());
301                        assert_eq!(original_capacity, set.capacity());
302                        set
303                    })
304                    .collect_vec()
305            },
306            |x| (x.len(), x.capacity()),
307        );
308    }
309
310    #[test]
311    #[ignore = "measuring consumed memory"]
312    // ITEM = IndexSet
313    // (measuring HashSet<u32>)
314    //
315    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
316    // --------------------------------------------------
317    // 10_000_000           10      84_119_961        8.4
318    //  1_000_000          100      10_526_720       10.5
319    //    100_000        1_000         662_974        6.6
320    //     10_000       10_000          86_424        8.6
321    //      1_000      100_000          12_381       12.4
322    //        100    1_000_000             830        8.3
323    //         10   10_000_000             152       15.2
324    //          1   10_000_000             120      120.0
325    fn test_index_set_size() {
326        const ITEM_LEN: usize = 10_000_000;
327        const NUM_ITEMS: usize = 10;
328
329        let _ = init_and_measure_consumed_memory(
330            ITEM_LEN,
331            NUM_ITEMS,
332            || {
333                (0..NUM_ITEMS)
334                    .map(|_| {
335                        // Reserve the required capacity
336                        // Note: the resulting allocated HashSet bucket count is (capacity * 8 / 7).next_power_of_two()
337                        let mut item = IndexSet::with_capacity(ITEM_LEN);
338
339                        (0..ITEM_LEN as Index).for_each(|x| {
340                            item.insert(x);
341                        });
342                        item
343                    })
344                    .collect_vec()
345            },
346            |x| (x.len(), x.capacity()),
347        );
348    }
349
350    #[test]
351    #[ignore = "measuring consumed memory"]
352    // ITEM = Vec<u32>
353    //
354    //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
355    // --------------------------------------------------
356    // 10_000_000           10      40_208_384        4.0
357    //  1_000_000          100       4_026_245        4.0
358    //    100_000        1_000         403_791        4.0
359    //     10_000       10_000          41_235        4.1
360    //      1_000      100_000           4_141        4.1
361    //        100    1_000_000             478        4.8
362    //         10   10_000_000              72        7.2
363    //          1   10_000_000              32       32.0
364    fn test_vec_u32_size() {
365        const ITEM_LEN: usize = 10_000_000;
366        const NUM_ITEMS: usize = 10;
367
368        let _ = init_and_measure_consumed_memory(
369            ITEM_LEN,
370            NUM_ITEMS,
371            || (0..NUM_ITEMS).map(|_| (0..ITEM_LEN as Index).collect::<Vec<_>>()).collect_vec(),
372            |x| (x.len(), x.capacity()),
373        );
374    }
375    // #[test]
376    // #[ignore = "measuring consumed memory"]
377    // // ITEM = DashSet
378    // // (measuring DashSet<u32>)
379    // //
380    // //   ITEM_LEN    NUM_ITEMS     MEMORY/ITEM    MEM/IDX
381    // // --------------------------------------------------
382    // // 10_000_000           10      96_439_500        9.6
383    // //  1_000_000          100      11_942_010       11.9
384    // //    100_000        1_000         826_400        8.3
385    // //     10_000       10_000         107_060       10.7
386    // //      1_000      100_000          19_114       19.1
387    // //        100    1_000_000          12_717      127.2
388    // //         10    1_000_000           8_865      886.5
389    // //          1    1_000_000           8_309     8309.0
390    // fn test_dash_set_size() {
391    //     const ITEM_LEN: usize = 1;
392    //     const NUM_ITEMS: usize = 1_000_000;
393
394    //     init_allocator_with_default_settings();
395    //     kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");
396
397    //     let before = get_process_memory_info().unwrap();
398    //     trace!("Creating sets...");
399    //     let sets = (0..NUM_ITEMS)
400    //         .map(|_| {
401    //             // Rely on organic growth rather than pre-defined capacity
402    //             let item = DashSet::new();
403    //             (0..ITEM_LEN as Index).for_each(|x| {
404    //                 item.insert(x);
405    //             });
406    //             item
407    //         })
408    //         .collect_vec();
409
410    //     let after = get_process_memory_info().unwrap();
411    //     trace!("Set length: {}", sets[0].len());
412    //     trace!("Memory consumed: {}", (after.resident_set_size - before.resident_set_size) / NUM_ITEMS as u64);
413    // }
414}