use std::collections::BTreeMap;
use std::sync::RwLock;
use std::time::Duration;
use dashmap::DashMap;
use tokio::time::Instant;
use freenet_stdlib::prelude::*;
use crate::ring::PeerKeyLocation;
use crate::topology::rate::Rate;
use super::running_average::RunningAverage;
const DEFAULT_USAGE_PERCENTILE: f64 = 0.5;
const ESTIMATED_USAGE_RATE_CACHE_TIME: Duration = Duration::from_secs(60);
pub(crate) const MAX_ATTRIBUTION_SOURCES: usize = 4096;
pub(crate) const ATTRIBUTION_SOURCE_TTL: Duration = Duration::from_secs(15 * 60);
pub(crate) struct Meter {
attribution_meters: AttributionMeters,
running_average_window_size: usize,
cached_estimated_usage_rate: RwLock<BTreeMap<ResourceType, (Rate, Instant)>>,
}
impl Meter {
pub fn new_with_window_size(running_average_window_size: usize) -> Self {
Meter {
attribution_meters: DashMap::new(),
running_average_window_size,
cached_estimated_usage_rate: RwLock::new(BTreeMap::new()),
}
}
pub(crate) fn attributed_usage_rate(
&self,
attribution: &AttributionSource,
resource: &ResourceType,
at_time: Instant,
) -> Option<Rate> {
match self.attribution_meters.get(attribution) {
Some(attribution_meters) => {
match attribution_meters.map.get(resource) {
Some(meter) => {
meter.get_rate_at_time(at_time)
}
None => Some(Rate::new(0.0, Duration::from_secs(1))), }
}
None => None, }
}
pub(crate) fn get_adjusted_usage_rate(
&mut self,
resource: &ResourceType,
at_time: Instant,
) -> Option<Rate> {
{
let cache = self.cached_estimated_usage_rate.read().unwrap();
if let Some((cached_rate, cached_time)) = cache.get(resource) {
if at_time - *cached_time <= ESTIMATED_USAGE_RATE_CACHE_TIME {
return Some(*cached_rate);
}
}
}
match self.calculate_estimated_usage_rate(resource, at_time) {
Some(estimated_usage_rate) => {
let mut cache = self.cached_estimated_usage_rate.write().unwrap();
cache.insert(*resource, (estimated_usage_rate, at_time));
Some(estimated_usage_rate)
}
None => None,
}
}
pub(crate) fn get_usage_rates(
&self,
resource: &ResourceType,
at_time: Instant,
) -> BTreeMap<AttributionSource, Rate> {
let mut rates = BTreeMap::new();
for entry in self.attribution_meters.iter() {
if let Some(meter) = entry.value().map.get(resource) {
if let Some(rate) = meter.get_rate_at_time(at_time) {
rates.insert(entry.key().clone(), rate);
}
}
}
rates
}
fn calculate_estimated_usage_rate(
&self,
resource: &ResourceType,
at_time: Instant,
) -> Option<Rate> {
let rates: Vec<Rate> = self
.attribution_meters
.iter()
.filter_map(|t| {
t.value()
.map
.get(resource)
.and_then(|m| m.get_rate_at_time(at_time))
})
.collect();
if rates.is_empty() {
return None;
}
let mut sorted_rates = rates;
sorted_rates.sort_unstable();
let percentile_index =
(DEFAULT_USAGE_PERCENTILE * sorted_rates.len() as f64).round() as usize;
let estimated_index = percentile_index.min(sorted_rates.len().saturating_sub(1));
sorted_rates.get(estimated_index).cloned()
}
pub(crate) fn retain_peer_sources(&self, live: &std::collections::HashSet<PeerKeyLocation>) {
self.attribution_meters.retain(|source, _| match source {
AttributionSource::Peer(peer) => live.contains(peer),
AttributionSource::Delegate(_) | AttributionSource::Contract(_) => true,
});
}
pub(crate) fn report(
&self,
attribution: &AttributionSource,
resource: ResourceType,
value: f64,
at_time: Instant,
) {
use dashmap::mapref::entry::Entry;
match self.attribution_meters.entry(attribution.clone()) {
Entry::Occupied(mut occupied) => {
let totals = occupied.get_mut();
totals.last_reported = totals.last_reported.max(at_time);
totals
.map
.entry(resource)
.or_insert_with(|| RunningAverage::new(self.running_average_window_size))
.insert_with_time(at_time, value);
return;
}
Entry::Vacant(_) => {}
}
self.evict_if_full(at_time);
let mut totals = self
.attribution_meters
.entry(attribution.clone())
.or_insert_with(|| ResourceTotals::new(at_time));
totals.last_reported = totals.last_reported.max(at_time);
totals
.map
.entry(resource)
.or_insert_with(|| RunningAverage::new(self.running_average_window_size))
.insert_with_time(at_time, value);
}
fn evict_if_full(&self, now: Instant) {
self.attribution_meters.retain(|_, totals| {
now.saturating_duration_since(totals.last_reported) < ATTRIBUTION_SOURCE_TTL
});
if self.attribution_meters.len() < MAX_ATTRIBUTION_SOURCES {
return;
}
let oldest = self
.attribution_meters
.iter()
.min_by_key(|entry| entry.value().last_reported)
.map(|entry| entry.key().clone());
if let Some(key) = oldest {
self.attribution_meters.remove(&key);
}
}
}
#[allow(dead_code)] #[derive(Eq, Hash, PartialEq, Clone, Debug)]
pub(crate) enum AttributionSource {
Peer(PeerKeyLocation),
Delegate(DelegateKey),
Contract(ContractInstanceId),
}
impl PartialOrd for AttributionSource {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl AttributionSource {
pub(crate) fn contributes_to(&self, resource: &ResourceType) -> bool {
use AttributionSource::*;
use ResourceType::*;
match (self, resource) {
(Peer(_), InboundBandwidthBytes) => true,
(Peer(_), OutboundBandwidthBytes) => true,
(Peer(_), ExecCpuMicros) => false,
(Peer(_), ExecFuelUnits) => false,
(Peer(_), StateBytesWritten) => false,
(Peer(_), BroadcastFanoutCost) => false,
(Delegate(_), InboundBandwidthBytes) => true,
(Delegate(_), OutboundBandwidthBytes) => true,
(Delegate(_), ExecCpuMicros) => false,
(Delegate(_), ExecFuelUnits) => false,
(Delegate(_), StateBytesWritten) => false,
(Delegate(_), BroadcastFanoutCost) => false,
(Contract(_), InboundBandwidthBytes) => false,
(Contract(_), OutboundBandwidthBytes) => false,
(Contract(_), ExecCpuMicros) => true,
(Contract(_), ExecFuelUnits) => true,
(Contract(_), StateBytesWritten) => true,
(Contract(_), BroadcastFanoutCost) => true,
}
}
}
impl Ord for AttributionSource {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
fn rank(source: &AttributionSource) -> u8 {
match source {
AttributionSource::Peer(_) => 0,
AttributionSource::Delegate(_) => 1,
AttributionSource::Contract(_) => 2,
}
}
match (self, other) {
(AttributionSource::Peer(a), AttributionSource::Peer(b)) => a.cmp(b),
(AttributionSource::Delegate(a), AttributionSource::Delegate(b)) => {
format!("{:?}", a).cmp(&format!("{:?}", b))
}
(AttributionSource::Contract(a), AttributionSource::Contract(b)) => a.cmp(b),
(a, b) => rank(a).cmp(&rank(b)),
}
}
}
#[derive(Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Copy, Debug)]
pub(crate) enum ResourceType {
InboundBandwidthBytes,
OutboundBandwidthBytes,
ExecCpuMicros,
ExecFuelUnits,
StateBytesWritten,
BroadcastFanoutCost,
}
impl ResourceType {
pub(crate) fn all() -> [ResourceType; 2] {
[
ResourceType::InboundBandwidthBytes,
ResourceType::OutboundBandwidthBytes,
]
}
#[allow(dead_code)] pub(crate) fn all_tracked() -> [ResourceType; 6] {
[
ResourceType::InboundBandwidthBytes,
ResourceType::OutboundBandwidthBytes,
ResourceType::ExecCpuMicros,
ResourceType::ExecFuelUnits,
ResourceType::StateBytesWritten,
ResourceType::BroadcastFanoutCost,
]
}
}
type AttributionMeters = DashMap<AttributionSource, ResourceTotals>;
struct ResourceTotals {
pub map: BTreeMap<ResourceType, RunningAverage>,
last_reported: Instant,
}
impl ResourceTotals {
fn new(at_time: Instant) -> Self {
ResourceTotals {
map: BTreeMap::new(),
last_reported: at_time,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_meter() {
let meter = Meter::new_with_window_size(100);
assert!(
meter
.attributed_usage_rate(
&AttributionSource::Peer(PeerKeyLocation::random()),
&ResourceType::InboundBandwidthBytes,
Instant::now(),
)
.is_none()
);
assert!(meter.attribution_meters.is_empty());
}
fn contract_source(byte: u8) -> AttributionSource {
AttributionSource::Contract(ContractInstanceId::new([byte; 32]))
}
#[test]
fn test_meter_attributed_usage() {
let meter = Meter::new_with_window_size(100);
let attribution = AttributionSource::Peer(PeerKeyLocation::random());
assert!(
meter
.attributed_usage_rate(
&attribution,
&ResourceType::InboundBandwidthBytes,
Instant::now()
)
.is_none()
);
assert!(
meter
.attributed_usage_rate(
&attribution,
&ResourceType::OutboundBandwidthBytes,
Instant::now()
)
.is_none()
);
meter.report(
&attribution,
ResourceType::InboundBandwidthBytes,
100.0,
Instant::now(),
);
assert_eq!(
meter
.attributed_usage_rate(
&attribution,
&ResourceType::InboundBandwidthBytes,
Instant::now()
)
.unwrap()
.per_second(),
100.0
);
}
#[test]
fn test_meter_report() -> anyhow::Result<()> {
let meter = Meter::new_with_window_size(100);
let attribution = AttributionSource::Peer(PeerKeyLocation::random());
meter.report(
&attribution,
ResourceType::InboundBandwidthBytes,
100.0,
Instant::now(),
);
assert_eq!(
meter
.attributed_usage_rate(
&attribution,
&ResourceType::InboundBandwidthBytes,
Instant::now()
)
.unwrap()
.per_second(),
100.0
);
meter.report(
&attribution,
ResourceType::InboundBandwidthBytes,
200.0,
Instant::now(),
);
assert_eq!(
meter
.attributed_usage_rate(
&attribution,
&ResourceType::InboundBandwidthBytes,
Instant::now()
)
.unwrap()
.per_second(),
300.0
);
let other_attribution = AttributionSource::Peer(PeerKeyLocation::random());
meter.report(
&other_attribution,
ResourceType::InboundBandwidthBytes,
150.0,
Instant::now(),
);
assert_eq!(
meter
.attributed_usage_rate(
&other_attribution,
&ResourceType::InboundBandwidthBytes,
Instant::now()
)
.unwrap()
.per_second(),
150.0
);
Ok(())
}
#[test]
fn test_eviction_skipped_below_cap() {
let meter = Meter::new_with_window_size(100);
let now = Instant::now();
for i in 0..8u8 {
meter.report(
&contract_source(i),
ResourceType::StateBytesWritten,
1.0,
now,
);
}
assert_eq!(meter.attribution_meters.len(), 8);
for i in 0..8u8 {
assert!(
meter
.attributed_usage_rate(
&contract_source(i),
&ResourceType::StateBytesWritten,
now
)
.is_some()
);
}
}
#[test]
fn test_ttl_evicts_stale_source_on_insert() {
let meter = Meter::new_with_window_size(100);
let t0 = Instant::now();
let stale = contract_source(1);
meter.report(&stale, ResourceType::StateBytesWritten, 1.0, t0);
assert_eq!(meter.attribution_meters.len(), 1);
let later = t0 + ATTRIBUTION_SOURCE_TTL + Duration::from_secs(1);
let fresh = contract_source(2);
meter.report(&fresh, ResourceType::StateBytesWritten, 1.0, later);
assert!(!meter.attribution_meters.contains_key(&stale));
assert!(meter.attribution_meters.contains_key(&fresh));
assert_eq!(meter.attribution_meters.len(), 1);
}
#[test]
fn test_ttl_refreshed_by_repeated_reports() {
let meter = Meter::new_with_window_size(100);
let t0 = Instant::now();
let kept = contract_source(1);
meter.report(&kept, ResourceType::StateBytesWritten, 1.0, t0);
let refresh = t0 + ATTRIBUTION_SOURCE_TTL - Duration::from_secs(1);
meter.report(&kept, ResourceType::StateBytesWritten, 1.0, refresh);
let later = refresh + Duration::from_secs(2);
meter.report(
&contract_source(2),
ResourceType::StateBytesWritten,
1.0,
later,
);
assert!(meter.attribution_meters.contains_key(&kept));
}
#[test]
fn test_cap_enforced_via_lru_eviction() {
let meter = Meter::new_with_window_size(100);
let base = Instant::now();
for i in 0..MAX_ATTRIBUTION_SOURCES {
let src = AttributionSource::Contract(ContractInstanceId::new(id_bytes(i as u32)));
let at = base + Duration::from_millis(i as u64);
meter.report(&src, ResourceType::StateBytesWritten, 1.0, at);
}
assert_eq!(meter.attribution_meters.len(), MAX_ATTRIBUTION_SOURCES);
let oldest = AttributionSource::Contract(ContractInstanceId::new(id_bytes(0)));
let newcomer = AttributionSource::Contract(ContractInstanceId::new(id_bytes(
MAX_ATTRIBUTION_SOURCES as u32,
)));
let at = base + Duration::from_millis(MAX_ATTRIBUTION_SOURCES as u64);
meter.report(&newcomer, ResourceType::StateBytesWritten, 1.0, at);
assert_eq!(meter.attribution_meters.len(), MAX_ATTRIBUTION_SOURCES);
assert!(!meter.attribution_meters.contains_key(&oldest));
assert!(meter.attribution_meters.contains_key(&newcomer));
}
#[test]
fn test_combined_phase_ttl_prune_avoids_lru() {
let meter = Meter::new_with_window_size(100);
let base = Instant::now();
for i in 0..MAX_ATTRIBUTION_SOURCES {
let src = AttributionSource::Contract(ContractInstanceId::new(id_bytes(i as u32)));
meter.report(&src, ResourceType::StateBytesWritten, 1.0, base);
}
assert_eq!(meter.attribution_meters.len(), MAX_ATTRIBUTION_SOURCES);
let later = base + ATTRIBUTION_SOURCE_TTL + Duration::from_secs(1);
let newcomer = AttributionSource::Contract(ContractInstanceId::new(id_bytes(
MAX_ATTRIBUTION_SOURCES as u32,
)));
meter.report(&newcomer, ResourceType::StateBytesWritten, 1.0, later);
assert_eq!(
meter.attribution_meters.len(),
1,
"TTL prune should have dropped all stale entries before LRU ran"
);
assert!(meter.attribution_meters.contains_key(&newcomer));
}
fn id_bytes(i: u32) -> [u8; 32] {
let mut bytes = [0u8; 32];
bytes[0..4].copy_from_slice(&i.to_le_bytes());
bytes
}
}