kaspa_notify/subscription/context.rs
1use crate::{
2 address::tracker::Tracker,
3 listener::ListenerId,
4 subscription::{
5 single::{UtxosChangedState, UtxosChangedSubscription},
6 DynSubscription,
7 },
8};
9use std::{ops::Deref, sync::Arc};
10
11#[cfg(test)]
12use kaspa_addresses::Address;
13
14#[derive(Debug)]
15pub struct SubscriptionContextInner {
16 pub address_tracker: Tracker,
17 pub utxos_changed_subscription_to_all: DynSubscription,
18}
19
20impl SubscriptionContextInner {
21 const CONTEXT_LISTENER_ID: ListenerId = ListenerId::MAX;
22
23 pub fn new() -> Self {
24 Self::with_options(None)
25 }
26
27 pub fn with_options(max_addresses: Option<usize>) -> Self {
28 let address_tracker = Tracker::new(max_addresses);
29 let utxos_changed_subscription_all =
30 Arc::new(UtxosChangedSubscription::new(UtxosChangedState::All, Self::CONTEXT_LISTENER_ID));
31 Self { address_tracker, utxos_changed_subscription_to_all: utxos_changed_subscription_all }
32 }
33
34 #[cfg(test)]
35 pub fn with_addresses(addresses: &[Address]) -> Self {
36 let address_tracker = Tracker::with_addresses(addresses);
37 let utxos_changed_subscription_all =
38 Arc::new(UtxosChangedSubscription::new(UtxosChangedState::All, Self::CONTEXT_LISTENER_ID));
39 Self { address_tracker, utxos_changed_subscription_to_all: utxos_changed_subscription_all }
40 }
41}
42
43impl Default for SubscriptionContextInner {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49#[derive(Clone, Debug, Default)]
50pub struct SubscriptionContext {
51 inner: Arc<SubscriptionContextInner>,
52}
53
54impl SubscriptionContext {
55 pub fn new() -> Self {
56 Self::with_options(None)
57 }
58
59 pub fn with_options(max_addresses: Option<usize>) -> Self {
60 let inner = Arc::new(SubscriptionContextInner::with_options(max_addresses));
61 Self { inner }
62 }
63
64 #[cfg(test)]
65 pub fn with_addresses(addresses: &[Address]) -> Self {
66 let inner = Arc::new(SubscriptionContextInner::with_addresses(addresses));
67 Self { inner }
68 }
69}
70
71impl Deref for SubscriptionContext {
72 type Target = SubscriptionContextInner;
73
74 fn deref(&self) -> &Self::Target {
75 &self.inner
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use crate::{
82 address::tracker::{CounterMap, Index, IndexSet, Indexer, RefCount},
83 subscription::SubscriptionContext,
84 };
85 use itertools::Itertools;
86 use kaspa_addresses::{Address, Prefix};
87 use kaspa_alloc::init_allocator_with_default_settings;
88 use kaspa_core::trace;
89 use kaspa_math::Uint256;
90 use std::collections::{HashMap, HashSet};
91 use workflow_perf_monitor::mem::get_process_memory_info;
92
93 fn create_addresses(count: usize) -> Vec<Address> {
94 (0..count)
95 .map(|i| Address::new(Prefix::Mainnet, kaspa_addresses::Version::PubKey, &Uint256::from_u64(i as u64).to_le_bytes()))
96 .collect()
97 }
98
99 fn measure_consumed_memory<T, F: FnOnce() -> Vec<T>, F2: FnOnce(&T) -> (usize, usize)>(
100 item_len: usize,
101 num_items: usize,
102 ctor: F,
103 length_and_capacity: F2,
104 ) -> Vec<T> {
105 let before = get_process_memory_info().unwrap();
106
107 trace!("Creating items...");
108 let items = ctor();
109
110 let after = get_process_memory_info().unwrap();
111
112 trace!("Required item length: {}", item_len);
113 trace!("Memory consumed: {}", (after.resident_set_size - before.resident_set_size) / num_items as u64);
114 trace!(
115 "Memory/idx: {}",
116 ((after.resident_set_size - before.resident_set_size) as f64 / num_items as f64 / item_len as f64 * 10.0).round() / 10.0
117 );
118
119 let (len, capacity) = length_and_capacity(&items[0]);
120 match len > 0 {
121 true => trace!(
122 "Actual item: len = {}, capacity = {}, free space = +{:.1}%",
123 len,
124 capacity,
125 (capacity - len) as f64 * 100.0 / len as f64
126 ),
127 false => trace!("Actual item: len = {}, capacity = {}", len, capacity),
128 }
129
130 items
131 }
132
133 fn init_and_measure_consumed_memory<T, F: FnOnce() -> Vec<T>, F2: FnOnce(&T) -> (usize, usize)>(
134 item_len: usize,
135 num_items: usize,
136 ctor: F,
137 length_and_capacity: F2,
138 ) -> Vec<T> {
139 init_allocator_with_default_settings();
140 kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");
141 measure_consumed_memory(item_len, num_items, ctor, length_and_capacity)
142 }
143
// ITEM = SubscriptionContext
// (measuring IndexMap<ScriptPublicKey, u16>)
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM    MEM/ADDR
// --------------------------------------------------
// 10_000_000           5   1_098_744_627       109.9
//  1_000_000          50     103_581_696       104.0
//    100_000         100       9_157_836        91.6
//     10_000       1_000         977_666        97.8
//      1_000      10_000          94_633        94.6
//        100     100_000           9_617        96.2
//         10   1_000_000           1_325       132.5
//          1  10_000_000             410       410.0
#[test]
#[ignore = "measuring consumed memory"]
fn test_subscription_context_size() {
    const ITEM_LEN: usize = 10_000_000;
    const NUM_ITEMS: usize = 5;

    // The allocator and logger are set up here, and the addresses are created,
    // before the measurement helper takes its first memory sample.
    init_allocator_with_default_settings();
    kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");

    trace!("Creating addresses...");
    let addresses = create_addresses(ITEM_LEN);

    // Every context is built from the same address list.
    let build_contexts = || (0..NUM_ITEMS).map(|_| SubscriptionContext::with_addresses(&addresses)).collect_vec();

    let _ = measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_contexts, |context| {
        (context.address_tracker.len(), context.address_tracker.capacity())
    });
}
176
// ITEM = HashMap<u32, u16>
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10     151_214_489        15.1
//  1_000_000         100      18_926_059        18.9
//    100_000       1_000       1_187_864        11.9
//     10_000      10_000         152_063        15.2
//      1_000     100_000          20_576        20.6
//        100   1_000_000           1_336        13.4
//         10  10_000_000             241        24.1
//          1  10_000_000             128       128.4
#[test]
#[ignore = "measuring consumed memory"]
fn test_hash_map_u32_u16_size() {
    const ITEM_LEN: usize = 1;
    const NUM_ITEMS: usize = 10_000_000;

    // Keys are inserted in descending order; each value is derived from its key.
    let build_map = || -> HashMap<Index, RefCount> {
        (0..ITEM_LEN as Index).rev().map(|index| (index, (ITEM_LEN as Index - index) as RefCount)).collect()
    };
    let build_maps = || (0..NUM_ITEMS).map(|_| build_map()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_maps, |map| (map.len(), map.capacity()));
}
206
// ITEM = CounterMap
// (measuring HashMap<u32, u16>)
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10     151_239_065        15.1
//  1_000_000         100      18_927_534        18.9
//    100_000       1_000       1_188_024        11.9
//     10_000      10_000         152_077        15.2
//      1_000     100_000          20_587        20.6
//        100   1_000_000           1_344        13.4
//         10  10_000_000             249        24.9
//          1  10_000_000             136       136.5
#[test]
#[ignore = "measuring consumed memory"]
fn test_counter_map_size() {
    const ITEM_LEN: usize = 10;
    const NUM_ITEMS: usize = 10_000_000;

    let build_counter_map = || {
        // Reserve the required capacity up front.
        // Note: the resulting allocated HashMap bucket count is (capacity * 8 / 7).next_power_of_two()
        let mut counters = CounterMap::with_capacity(ITEM_LEN);
        for index in 0..ITEM_LEN as Index {
            counters.insert(index);
        }
        counters
    };
    let build_counter_maps = || (0..NUM_ITEMS).map(|_| build_counter_map()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_counter_maps, |counters| {
        (counters.len(), counters.capacity())
    });
}
246
// ITEM = HashSet<u32>
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10      84_094_976         8.4
//  1_000_000         100      10_524_508        10.5
//    100_000       1_000         662_720         6.6
//     10_000      10_000          86_369         8.6
//      1_000     100_000          12_372        12.4
//        100   1_000_000             821         8.2
//         10  10_000_000             144        14.4
//          1  10_000_000             112       112.0
#[test]
#[ignore = "measuring consumed memory"]
fn test_hash_set_u32_size() {
    const ITEM_LEN: usize = 1_000_000;
    const NUM_ITEMS: usize = 100;

    // Indexes are inserted in descending order.
    let build_set = || -> HashSet<Index> { (0..ITEM_LEN as Index).rev().collect() };
    let build_sets = || (0..NUM_ITEMS).map(|_| build_set()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_sets, |set| (set.len(), set.capacity()));
}
272
// ITEM = HashSet<u32> emptied
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10      84_094_976         8.4
//  1_000_000         100      10_524_508        10.5
//    100_000       1_000         662_720         6.6
//     10_000      10_000          86_369         8.6
//      1_000     100_000          12_372        12.4
//        100   1_000_000             821         8.2
//         10  10_000_000             144        14.4
//          1  10_000_000             112       112.0
#[test]
#[ignore = "measuring consumed memory"]
fn test_emptied_hash_set_u32_size() {
    const ITEM_LEN: usize = 1_000_000;
    const NUM_ITEMS: usize = 100;

    let build_emptied_set = || {
        let mut set: HashSet<Index> = (0..ITEM_LEN as Index).rev().collect();
        let capacity_when_full = set.capacity();

        // Dropping the drain iterator right away removes every element still in the set.
        drop(set.drain());

        // The set must now be empty while having kept the capacity it had when full.
        assert!(set.is_empty());
        assert_eq!(capacity_when_full, set.capacity());
        set
    };
    let build_emptied_sets = || (0..NUM_ITEMS).map(|_| build_emptied_set()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_emptied_sets, |set| (set.len(), set.capacity()));
}
309
// ITEM = IndexSet
// (measuring HashSet<u32>)
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10      84_119_961         8.4
//  1_000_000         100      10_526_720        10.5
//    100_000       1_000         662_974         6.6
//     10_000      10_000          86_424         8.6
//      1_000     100_000          12_381        12.4
//        100   1_000_000             830         8.3
//         10  10_000_000             152        15.2
//          1  10_000_000             120       120.0
#[test]
#[ignore = "measuring consumed memory"]
fn test_index_set_size() {
    const ITEM_LEN: usize = 10_000_000;
    const NUM_ITEMS: usize = 10;

    let build_index_set = || {
        // Reserve the required capacity up front.
        // Note: the resulting allocated HashSet bucket count is (capacity * 8 / 7).next_power_of_two()
        let mut indexes = IndexSet::with_capacity(ITEM_LEN);
        for index in 0..ITEM_LEN as Index {
            indexes.insert(index);
        }
        indexes
    };
    let build_index_sets = || (0..NUM_ITEMS).map(|_| build_index_set()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_index_sets, |indexes| {
        (indexes.len(), indexes.capacity())
    });
}
349
// ITEM = Vec<u32>
//
//   ITEM_LEN   NUM_ITEMS     MEMORY/ITEM     MEM/IDX
// --------------------------------------------------
// 10_000_000          10      40_208_384         4.0
//  1_000_000         100       4_026_245         4.0
//    100_000       1_000         403_791         4.0
//     10_000      10_000          41_235         4.1
//      1_000     100_000           4_141         4.1
//        100   1_000_000             478         4.8
//         10  10_000_000              72         7.2
//          1  10_000_000              32        32.0
#[test]
#[ignore = "measuring consumed memory"]
fn test_vec_u32_size() {
    const ITEM_LEN: usize = 10_000_000;
    const NUM_ITEMS: usize = 10;

    // Each vector holds the indexes `0..ITEM_LEN` in ascending order.
    let build_vec = || -> Vec<Index> { (0..ITEM_LEN as Index).collect() };
    let build_vecs = || (0..NUM_ITEMS).map(|_| build_vec()).collect_vec();

    let _ = init_and_measure_consumed_memory(ITEM_LEN, NUM_ITEMS, build_vecs, |vec| (vec.len(), vec.capacity()));
}
375 // #[test]
376 // #[ignore = "measuring consumed memory"]
377 // // ITEM = DashSet
378 // // (measuring DashSet<u32>)
379 // //
380 // // ITEM_LEN NUM_ITEMS MEMORY/ITEM MEM/IDX
381 // // --------------------------------------------------
382 // // 10_000_000 10 96_439_500 9.6
383 // // 1_000_000 100 11_942_010 11.9
384 // // 100_000 1_000 826_400 8.3
385 // // 10_000 10_000 107_060 10.7
386 // // 1_000 100_000 19_114 19.1
387 // // 100 1_000_000 12_717 127.2
388 // // 10 1_000_000 8_865 886.5
389 // // 1 1_000_000 8_309 8309.0
390 // fn test_dash_set_size() {
391 // const ITEM_LEN: usize = 1;
392 // const NUM_ITEMS: usize = 1_000_000;
393
394 // init_allocator_with_default_settings();
395 // kaspa_core::log::try_init_logger("INFO,kaspa_notify::subscription::context=trace");
396
397 // let before = get_process_memory_info().unwrap();
398 // trace!("Creating sets...");
399 // let sets = (0..NUM_ITEMS)
400 // .map(|_| {
401 // // Rely on organic growth rather than pre-defined capacity
402 // let item = DashSet::new();
403 // (0..ITEM_LEN as Index).for_each(|x| {
404 // item.insert(x);
405 // });
406 // item
407 // })
408 // .collect_vec();
409
410 // let after = get_process_memory_info().unwrap();
411 // trace!("Set length: {}", sets[0].len());
412 // trace!("Memory consumed: {}", (after.resident_set_size - before.resident_set_size) / NUM_ITEMS as u64);
413 // }
414}