1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
use brk_cohort::{AmountBucket, ByAddrType};
use brk_types::{Cents, Sats, TypeIndex};
use rustc_hash::FxHashMap;
use crate::distribution::{
addr::{
AddrTypeToActivityCounts, AddrTypeToExposedAddrCount, AddrTypeToExposedSupply,
AddrTypeToReusedAddrCount, AddrTypeToReusedAddrEventCount, AddrTypeToVec,
},
cohorts::AddrCohorts,
};
use super::super::cache::{AddrLookup, TrackingStatus};
/// Aggregated receive data for a single address within a block.
#[derive(Default)]
struct AggregatedReceive {
total_value: Sats,
output_count: u32,
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn process_received(
received_data: AddrTypeToVec<(TypeIndex, Sats)>,
cohorts: &mut AddrCohorts,
lookup: &mut AddrLookup<'_>,
price: Cents,
addr_count: &mut ByAddrType<u64>,
empty_addr_count: &mut ByAddrType<u64>,
activity_counts: &mut AddrTypeToActivityCounts,
reused_addr_count: &mut AddrTypeToReusedAddrCount,
total_reused_addr_count: &mut AddrTypeToReusedAddrCount,
output_to_reused_addr_count: &mut AddrTypeToReusedAddrEventCount,
active_reused_addr_count: &mut AddrTypeToReusedAddrEventCount,
exposed_addr_count: &mut AddrTypeToExposedAddrCount,
total_exposed_addr_count: &mut AddrTypeToExposedAddrCount,
exposed_supply: &mut AddrTypeToExposedSupply,
) {
let max_type_len = received_data
.iter()
.map(|(_, v)| v.len())
.max()
.unwrap_or(0);
let mut aggregated: FxHashMap<TypeIndex, AggregatedReceive> =
FxHashMap::with_capacity_and_hasher(max_type_len, Default::default());
for (output_type, vec) in received_data.unwrap().into_iter() {
if vec.is_empty() {
continue;
}
// Cache mutable refs for this address type
let type_addr_count = addr_count.get_mut(output_type).unwrap();
let type_empty_count = empty_addr_count.get_mut(output_type).unwrap();
let type_activity = activity_counts.get_mut_unwrap(output_type);
let type_reused_count = reused_addr_count.get_mut(output_type).unwrap();
let type_total_reused_count = total_reused_addr_count.get_mut(output_type).unwrap();
let type_output_to_reused_count = output_to_reused_addr_count.get_mut(output_type).unwrap();
let type_active_reused_count = active_reused_addr_count.get_mut(output_type).unwrap();
let type_exposed_count = exposed_addr_count.get_mut(output_type).unwrap();
let type_total_exposed_count = total_exposed_addr_count.get_mut(output_type).unwrap();
let type_exposed_supply = exposed_supply.get_mut(output_type).unwrap();
// Aggregate receives by address - each address processed exactly once
for (type_index, value) in vec {
let entry = aggregated.entry(type_index).or_default();
entry.total_value += value;
entry.output_count += 1;
}
for (type_index, recv) in aggregated.drain() {
let (addr_data, status) = lookup.get_or_create_for_receive(output_type, type_index);
// Track receiving activity - each address in receive aggregation
type_activity.receiving += 1;
// Capture state BEFORE the receive mutates funded_txo_count
let was_funded = addr_data.is_funded();
let was_reused = addr_data.is_reused();
let funded_txo_count_before = addr_data.funded_txo_count;
let was_pubkey_exposed = addr_data.is_pubkey_exposed(output_type);
let exposed_contribution_before = addr_data.exposed_supply_contribution(output_type);
match status {
TrackingStatus::New => {
*type_addr_count += 1;
}
TrackingStatus::WasEmpty => {
*type_addr_count += 1;
*type_empty_count -= 1;
// Reactivated - was empty, now has funds
type_activity.reactivated += 1;
}
TrackingStatus::Tracked => {}
}
let is_new_entry = matches!(status, TrackingStatus::New | TrackingStatus::WasEmpty);
if is_new_entry {
// New/was-empty address - just add to cohort
addr_data.receive_outputs(recv.total_value, price, recv.output_count);
let new_bucket = AmountBucket::from(recv.total_value);
cohorts
.amount_range
.get_mut_by_bucket(new_bucket)
.state
.as_mut()
.unwrap()
.add(addr_data);
} else {
let prev_balance = addr_data.balance();
let new_balance = prev_balance + recv.total_value;
let prev_bucket = AmountBucket::from(prev_balance);
let new_bucket = AmountBucket::from(new_balance);
if let Some((old_bucket, new_bucket)) = prev_bucket.transition_to(new_bucket) {
// Crossing cohort boundary - subtract from old, add to new
let cohort_state = cohorts
.amount_range
.get_mut_by_bucket(old_bucket)
.state
.as_mut()
.unwrap();
// Debug info for tracking down underflow issues
if cohort_state.inner.supply.utxo_count < addr_data.utxo_count() as u64 {
panic!(
"process_received: cohort underflow detected!\n\
output_type={:?}, type_index={:?}\n\
prev_balance={}, new_balance={}, total_value={}\n\
Addr: {:?}",
output_type,
type_index,
prev_balance,
new_balance,
recv.total_value,
addr_data
);
}
cohort_state.subtract(addr_data);
addr_data.receive_outputs(recv.total_value, price, recv.output_count);
cohorts
.amount_range
.get_mut_by_bucket(new_bucket)
.state
.as_mut()
.unwrap()
.add(addr_data);
} else {
// Staying in same cohort - just receive
cohorts
.amount_range
.get_mut_by_bucket(new_bucket)
.state
.as_mut()
.unwrap()
.receive_outputs(addr_data, recv.total_value, price, recv.output_count);
}
}
// Update reused counts based on the post-receive state
let is_now_reused = addr_data.is_reused();
if is_now_reused && !was_reused {
// Newly crossed the reuse threshold this block
*type_reused_count += 1;
*type_total_reused_count += 1;
} else if is_now_reused && !was_funded {
// Already-reused address reactivating into the funded set
*type_reused_count += 1;
}
// Block-level "active reused address" count: each address
// is processed exactly once here (via aggregation), so we
// count it once iff it is reused after the block's receives.
// The sender-side counterpart in process_sent dedupes
// against `received_addrs` so addresses that did both
// aren't double-counted.
if is_now_reused {
*type_active_reused_count += 1;
}
// Per-block reused-use count: every individual output to this
// address counts iff, at the moment the output arrives, the
// address had already received at least one prior output
// (i.e. it is an output-level "address reuse event"). With
// aggregation, that means we skip the very first output the
// address ever sees and count every subsequent one, so
// `skipped` is `max(0, 1 - before)`.
let skipped = 1u32.saturating_sub(funded_txo_count_before);
let counted = recv.output_count.saturating_sub(skipped);
*type_output_to_reused_count += u64::from(counted);
// Update exposed counts. The address's pubkey-exposure state
// is unchanged by a receive (spent_txo_count unchanged), so we
// can use the captured `was_pubkey_exposed` for both pre and post.
// After the receive the address is always funded, so it's in the
// funded exposed set iff its pubkey is exposed.
//
// Funded exposed enters when the address wasn't funded before but
// is now AND its pubkey is exposed.
// Total exposed (pk_exposed_at_funding types only) increments on
// first-ever receive (status == TrackingStatus::New); for other
// types it's incremented in process_sent on the first spend.
if !was_funded && was_pubkey_exposed {
*type_exposed_count += 1;
}
if output_type.pubkey_exposed_at_funding() && matches!(status, TrackingStatus::New) {
*type_total_exposed_count += 1;
}
// Update exposed supply via post-receive contribution delta.
let exposed_contribution_after = addr_data.exposed_supply_contribution(output_type);
// Receives can only add to balance and membership, so the delta
// is always non-negative.
*type_exposed_supply += exposed_contribution_after - exposed_contribution_before;
}
}
}