near_primitives/
congestion_info.rs

1use std::collections::BTreeMap;
2
3use crate::errors::RuntimeError;
4use borsh::{BorshDeserialize, BorshSerialize};
5use near_parameters::config::CongestionControlConfig;
6use near_primitives_core::types::{Gas, ShardId};
7use near_schema_checker_lib::ProtocolSchema;
8use ordered_float::NotNan;
9
10/// This class combines the congestion control config, congestion info and
11/// missed chunks count. It contains the main congestion control logic and
12/// exposes methods that can be used for congestion control.
13///
14/// Use this struct to make congestion control decisions, by looking at the
15/// congestion info of a previous chunk produced on a remote shard. For building
16/// up a congestion info for the local shard, this struct should not be
17/// necessary. Use `CongestionInfo` directly.
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub struct CongestionControl {
20    config: CongestionControlConfig,
21    /// Finalized congestion info of a previous chunk.
22    info: CongestionInfo,
23    /// How many block heights had no chunk since the last successful chunk on
24    /// the respective shard.
25    missed_chunks_count: u64,
26}
27
28impl CongestionControl {
29    pub fn new(
30        config: CongestionControlConfig,
31        info: CongestionInfo,
32        missed_chunks_count: u64,
33    ) -> Self {
34        Self { config, info, missed_chunks_count }
35    }
36
37    pub fn config(&self) -> &CongestionControlConfig {
38        &self.config
39    }
40
41    pub fn congestion_info(&self) -> &CongestionInfo {
42        &self.info
43    }
44
45    pub fn congestion_level(&self) -> f64 {
46        let incoming_congestion = self.incoming_congestion();
47        let outgoing_congestion = self.outgoing_congestion();
48        let memory_congestion = self.memory_congestion();
49        let missed_chunks_congestion = self.missed_chunks_congestion();
50
51        incoming_congestion
52            .max(outgoing_congestion)
53            .max(memory_congestion)
54            .max(missed_chunks_congestion)
55    }
56
57    fn incoming_congestion(&self) -> f64 {
58        self.info.incoming_congestion(&self.config)
59    }
60
61    fn outgoing_congestion(&self) -> f64 {
62        self.info.outgoing_congestion(&self.config)
63    }
64
65    fn memory_congestion(&self) -> f64 {
66        self.info.memory_congestion(&self.config)
67    }
68
69    fn missed_chunks_congestion(&self) -> f64 {
70        if self.missed_chunks_count <= 1 {
71            return 0.0;
72        }
73
74        clamped_f64_fraction(
75            self.missed_chunks_count as u128,
76            self.config.max_congestion_missed_chunks,
77        )
78    }
79
80    /// How much gas another shard can send to us in the next block.
81    pub fn outgoing_gas_limit(&self, sender_shard: ShardId) -> Gas {
82        let congestion = self.congestion_level();
83
84        if Self::is_fully_congested(congestion) {
85            // Red traffic light: reduce to minimum speed
86            if sender_shard == ShardId::from(self.info.allowed_shard()) {
87                self.config.allowed_shard_outgoing_gas
88            } else {
89                Gas::ZERO
90            }
91        } else {
92            mix_gas(self.config.max_outgoing_gas, self.config.min_outgoing_gas, congestion)
93        }
94    }
95
96    pub fn is_fully_congested(congestion_level: f64) -> bool {
97        // note: using float equality is okay here because
98        // `clamped_f64_fraction` clamps to exactly 1.0.
99        debug_assert!(congestion_level <= 1.0);
100        congestion_level == 1.0
101    }
102
103    /// How much data another shard can send to us in the next block.
104    pub fn outgoing_size_limit(&self, sender_shard: ShardId) -> u64 {
105        if sender_shard == ShardId::from(self.info.allowed_shard()) {
106            // The allowed shard is allowed to send more data to us.
107            self.config.outgoing_receipts_big_size_limit
108        } else {
109            // Other shards have a low standard limit.
110            self.config.outgoing_receipts_usual_size_limit
111        }
112    }
113
114    /// How much gas we accept for executing new transactions going to any
115    /// uncongested shards.
116    pub fn process_tx_limit(&self) -> Gas {
117        mix_gas(self.config.max_tx_gas, self.config.min_tx_gas, self.incoming_congestion())
118    }
119
120    /// Whether we can accept new transaction with the receiver set to this shard.
121    ///
122    /// If the shard doesn't accept new transaction, provide the reason for
123    /// extra debugging information.
124    pub fn shard_accepts_transactions(&self) -> ShardAcceptsTransactions {
125        let incoming_congestion = self.incoming_congestion();
126        let outgoing_congestion = self.outgoing_congestion();
127        let memory_congestion = self.memory_congestion();
128        let missed_chunks_congestion = self.missed_chunks_congestion();
129
130        let congestion_level = incoming_congestion
131            .max(outgoing_congestion)
132            .max(memory_congestion)
133            .max(missed_chunks_congestion);
134
135        // Convert to NotNan here, if not possible, the max above is already meaningless.
136        let congestion_level =
137            NotNan::new(congestion_level).unwrap_or_else(|_| NotNan::new(1.0).unwrap());
138        if *congestion_level < self.config.reject_tx_congestion_threshold {
139            return ShardAcceptsTransactions::Yes;
140        }
141
142        let reason = if missed_chunks_congestion >= *congestion_level {
143            RejectTransactionReason::MissedChunks { missed_chunks: self.missed_chunks_count }
144        } else if incoming_congestion >= *congestion_level {
145            RejectTransactionReason::IncomingCongestion { congestion_level }
146        } else if outgoing_congestion >= *congestion_level {
147            RejectTransactionReason::OutgoingCongestion { congestion_level }
148        } else {
149            RejectTransactionReason::MemoryCongestion { congestion_level }
150        };
151        ShardAcceptsTransactions::No(reason)
152    }
153}
154
155/// Result of [`CongestionControl::shard_accepts_transactions`].
156pub enum ShardAcceptsTransactions {
157    Yes,
158    No(RejectTransactionReason),
159}
160
161/// Detailed information for why a shard rejects new transactions.
162pub enum RejectTransactionReason {
163    IncomingCongestion { congestion_level: NotNan<f64> },
164    OutgoingCongestion { congestion_level: NotNan<f64> },
165    MemoryCongestion { congestion_level: NotNan<f64> },
166    MissedChunks { missed_chunks: u64 },
167}
168
169/// Stores the congestion level of a shard.
170///
171/// The CongestionInfo is a part of the ChunkHeader. It is versioned and each
172/// version should not be changed. Rather a new version with the desired changes
173/// should be added and used in place of the old one. When adding new versions
174/// please also update the default.
175#[derive(
176    BorshSerialize,
177    BorshDeserialize,
178    serde::Serialize,
179    serde::Deserialize,
180    Debug,
181    Clone,
182    Copy,
183    PartialEq,
184    Eq,
185    ProtocolSchema,
186)]
187#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
188pub enum CongestionInfo {
189    V1(CongestionInfoV1),
190}
191
192impl Default for CongestionInfo {
193    fn default() -> Self {
194        Self::V1(CongestionInfoV1::default())
195    }
196}
197
198impl CongestionInfo {
199    // A helper method to compare the congestion info from the chunk extra of
200    // the previous chunk and the header of the current chunk. It returns true
201    // if the congestion info was correctly set in the chunk header based on the
202    // information from the chunk extra.
203    //
204    // TODO(congestion_control) validate allowed shard
205    pub fn validate_extra_and_header(extra: &CongestionInfo, header: &CongestionInfo) -> bool {
206        match (extra, header) {
207            (CongestionInfo::V1(extra), CongestionInfo::V1(header)) => {
208                extra.delayed_receipts_gas == header.delayed_receipts_gas
209                    && extra.buffered_receipts_gas == header.buffered_receipts_gas
210                    && extra.receipt_bytes == header.receipt_bytes
211                    && extra.allowed_shard == header.allowed_shard
212            }
213        }
214    }
215
216    pub fn delayed_receipts_gas(&self) -> u128 {
217        match self {
218            CongestionInfo::V1(inner) => inner.delayed_receipts_gas,
219        }
220    }
221
222    pub fn buffered_receipts_gas(&self) -> u128 {
223        match self {
224            CongestionInfo::V1(inner) => inner.buffered_receipts_gas,
225        }
226    }
227
228    pub fn receipt_bytes(&self) -> u64 {
229        match self {
230            CongestionInfo::V1(inner) => inner.receipt_bytes,
231        }
232    }
233
234    pub fn allowed_shard(&self) -> u16 {
235        match self {
236            CongestionInfo::V1(inner) => inner.allowed_shard,
237        }
238    }
239
240    pub fn set_allowed_shard(&mut self, allowed_shard: u16) {
241        match self {
242            CongestionInfo::V1(inner) => inner.allowed_shard = allowed_shard,
243        }
244    }
245
246    pub fn add_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
247        match self {
248            CongestionInfo::V1(inner) => {
249                inner.receipt_bytes = inner.receipt_bytes.checked_add(bytes).ok_or_else(|| {
250                    RuntimeError::UnexpectedIntegerOverflow("add_receipt_bytes".into())
251                })?;
252            }
253        }
254        Ok(())
255    }
256
257    pub fn remove_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
258        match self {
259            CongestionInfo::V1(inner) => {
260                inner.receipt_bytes = inner.receipt_bytes.checked_sub(bytes).ok_or_else(|| {
261                    RuntimeError::UnexpectedIntegerOverflow("remove_receipt_bytes".into())
262                })?;
263            }
264        }
265        Ok(())
266    }
267
268    pub fn add_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
269        match self {
270            CongestionInfo::V1(inner) => {
271                inner.delayed_receipts_gas = inner
272                    .delayed_receipts_gas
273                    .checked_add(gas.as_gas().into())
274                    .ok_or_else(|| {
275                        RuntimeError::UnexpectedIntegerOverflow("add_delayed_receipt_gas".into())
276                    })?;
277            }
278        }
279        Ok(())
280    }
281
282    pub fn remove_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
283        match self {
284            CongestionInfo::V1(inner) => {
285                inner.delayed_receipts_gas = inner
286                    .delayed_receipts_gas
287                    .checked_sub(gas.as_gas().into())
288                    .ok_or_else(|| {
289                        RuntimeError::UnexpectedIntegerOverflow("remove_delayed_receipt_gas".into())
290                    })?;
291            }
292        }
293        Ok(())
294    }
295
296    pub fn add_buffered_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
297        match self {
298            CongestionInfo::V1(inner) => {
299                inner.buffered_receipts_gas = inner
300                    .buffered_receipts_gas
301                    .checked_add(gas.as_gas().into())
302                    .ok_or_else(|| {
303                        RuntimeError::UnexpectedIntegerOverflow("add_buffered_receipt_gas".into())
304                    })?;
305            }
306        }
307        Ok(())
308    }
309
310    pub fn remove_buffered_receipt_gas(&mut self, gas: u128) -> Result<(), RuntimeError> {
311        match self {
312            CongestionInfo::V1(inner) => {
313                inner.buffered_receipts_gas =
314                    inner.buffered_receipts_gas.checked_sub(gas).ok_or_else(|| {
315                        RuntimeError::UnexpectedIntegerOverflow(
316                            "remove_buffered_receipt_gas".into(),
317                        )
318                    })?;
319            }
320        }
321        Ok(())
322    }
323
324    /// Congestion level ignoring the chain context (missed chunks count).
325    pub fn localized_congestion_level(&self, config: &CongestionControlConfig) -> f64 {
326        let incoming_congestion = self.incoming_congestion(config);
327        let outgoing_congestion = self.outgoing_congestion(config);
328        let memory_congestion = self.memory_congestion(config);
329        incoming_congestion.max(outgoing_congestion).max(memory_congestion)
330    }
331
332    pub fn incoming_congestion(&self, config: &CongestionControlConfig) -> f64 {
333        clamped_f64_fraction(
334            self.delayed_receipts_gas(),
335            config.max_congestion_incoming_gas.as_gas(),
336        )
337    }
338
339    pub fn outgoing_congestion(&self, config: &CongestionControlConfig) -> f64 {
340        clamped_f64_fraction(
341            self.buffered_receipts_gas(),
342            config.max_congestion_outgoing_gas.as_gas(),
343        )
344    }
345
346    pub fn memory_congestion(&self, config: &CongestionControlConfig) -> f64 {
347        clamped_f64_fraction(self.receipt_bytes() as u128, config.max_congestion_memory_consumption)
348    }
349
350    /// Computes and sets the `allowed_shard` field.
351    ///
352    /// If in a fully congested state, decide which shard of the shards is
353    /// allowed to forward gas to `own_shard` this round. In this case, we stop all
354    /// of the shards from sending anything to `own_shard`. But to guarantee
355    /// progress, we allow one shard to send `allowed_shard_outgoing_gas`
356    /// in the next chunk.
357    ///
358    /// It is also used to determine the size limit for outgoing receipts from sender shards.
359    /// Only the allowed shard can send receipts of size `outgoing_receipts_big_size_limit`.
360    /// Other shards can only send receipts of size `outgoing_receipts_usual_size_limit`.
361    pub fn finalize_allowed_shard(
362        &mut self,
363        own_shard: ShardId,
364        all_shards: &[ShardId],
365        congestion_seed: u64,
366    ) {
367        let allowed_shard = Self::get_new_allowed_shard(own_shard, all_shards, congestion_seed);
368        self.set_allowed_shard(allowed_shard.into());
369    }
370
371    fn get_new_allowed_shard(
372        own_shard: ShardId,
373        all_shards: &[ShardId],
374        congestion_seed: u64,
375    ) -> ShardId {
376        if let Some(index) = congestion_seed.checked_rem(all_shards.len() as u64) {
377            // round robin for other shards based on the seed
378            return *all_shards
379                .get(index as usize)
380                .expect("`checked_rem` should have ensured array access is in bound");
381        }
382        // checked_rem failed, hence all_shards.len() is 0
383        // own_shard is the only choice.
384        return own_shard;
385    }
386}
387
388/// The block congestion info contains the congestion info for all shards in the
389/// block extended with the missed chunks count.
390#[derive(Clone, Debug, Default, Eq, PartialEq)]
391pub struct BlockCongestionInfo {
392    /// The per shard congestion info. It's important that the data structure is
393    /// deterministic because the allowed shard id selection depends on the
394    /// order of shard ids in this map. Ideally it should also be sorted by shard id.
395    shards_congestion_info: BTreeMap<ShardId, ExtendedCongestionInfo>,
396}
397
398impl BlockCongestionInfo {
399    pub fn new(shards_congestion_info: BTreeMap<ShardId, ExtendedCongestionInfo>) -> Self {
400        Self { shards_congestion_info }
401    }
402
403    pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &ExtendedCongestionInfo)> {
404        self.shards_congestion_info.iter()
405    }
406
407    pub fn all_shards(&self) -> Vec<ShardId> {
408        self.shards_congestion_info.keys().copied().collect()
409    }
410
411    pub fn get(&self, shard_id: &ShardId) -> Option<&ExtendedCongestionInfo> {
412        self.shards_congestion_info.get(shard_id)
413    }
414
415    pub fn get_mut(&mut self, shard_id: &ShardId) -> Option<&mut ExtendedCongestionInfo> {
416        self.shards_congestion_info.get_mut(shard_id)
417    }
418
419    pub fn insert(
420        &mut self,
421        shard_id: ShardId,
422        value: ExtendedCongestionInfo,
423    ) -> Option<ExtendedCongestionInfo> {
424        self.shards_congestion_info.insert(shard_id, value)
425    }
426
427    pub fn is_empty(&self) -> bool {
428        self.shards_congestion_info.is_empty()
429    }
430}
431
432/// The extended congestion info contains the congestion info and extra
433/// information extracted from the block that is needed for congestion control.
434#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
435pub struct ExtendedCongestionInfo {
436    pub congestion_info: CongestionInfo,
437    pub missed_chunks_count: u64,
438}
439
440impl ExtendedCongestionInfo {
441    pub fn new(congestion_info: CongestionInfo, missed_chunks_count: u64) -> Self {
442        Self { congestion_info, missed_chunks_count }
443    }
444}
445
446/// Stores the congestion level of a shard.
447#[derive(
448    BorshSerialize,
449    BorshDeserialize,
450    serde::Serialize,
451    serde::Deserialize,
452    Default,
453    Debug,
454    Clone,
455    Copy,
456    PartialEq,
457    Eq,
458    ProtocolSchema,
459)]
460#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
461pub struct CongestionInfoV1 {
462    /// Sum of gas in currently delayed receipts.
463    pub delayed_receipts_gas: u128,
464    /// Sum of gas in currently buffered receipts.
465    pub buffered_receipts_gas: u128,
466    /// Size of borsh serialized receipts stored in state because they
467    /// were delayed, buffered, postponed, or yielded.
468    pub receipt_bytes: u64,
469    /// If fully congested, only this shard can forward receipts.
470    pub allowed_shard: u16,
471}
472
473/// Returns `value / max` clamped to te range [0,1].
474#[inline]
475fn clamped_f64_fraction(value: u128, max: u64) -> f64 {
476    assert!(max > 0);
477    if max as u128 <= value { 1.0 } else { value as f64 / max as f64 }
478}
479
480/// linearly interpolate between two values
481///
482/// This method treats u16 as a fraction of u16::MAX.
483/// This makes multiplication of numbers on the upper end of `u128` better behaved
484/// than using f64 which lacks precision for such high numbers and might have platform incompatibilities.
485fn mix(left: u64, right: u64, ratio: f64) -> u64 {
486    debug_assert!(ratio >= 0.0);
487    debug_assert!(ratio <= 1.0);
488
489    // Note on precision: f64 is only precise to 53 binary digits. That is
490    // enough to represent ~9 PGAS without error. Precision above that is
491    // rounded according to the IEEE 754-2008 standard which Rust's f64
492    // implements.
493    // For example, a value of 100 Pgas is rounded to steps of 8 gas.
494    let left_part = left as f64 * (1.0 - ratio);
495    let right_part = right as f64 * ratio;
496    // Accumulated error is doubled again, up to 16 gas for 100 Pgas.
497    let total = left_part + right_part;
498
499    // Conversion is save because left and right were both u64 and the result is
500    // between the two. Even with precision errors, we cannot breach the
501    // boundaries.
502    return total.round() as u64;
503}
504
505fn mix_gas(left: Gas, right: Gas, ratio: f64) -> Gas {
506    Gas::from_gas(mix(left.as_gas(), right.as_gas(), ratio))
507}
508
509impl ShardAcceptsTransactions {
510    pub fn is_yes(&self) -> bool {
511        matches!(self, ShardAcceptsTransactions::Yes)
512    }
513
514    pub fn is_no(&self) -> bool {
515        !self.is_yes()
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use itertools::Itertools;
522    use near_parameters::RuntimeConfigStore;
523    use near_primitives_core::version::PROTOCOL_VERSION;
524
525    use super::*;
526
527    fn get_config() -> CongestionControlConfig {
528        // Fix the initial configuration of congestion control for the tests.
529        let runtime_config_store = RuntimeConfigStore::new(None);
530        let runtime_config = runtime_config_store.get_config(PROTOCOL_VERSION);
531        runtime_config.congestion_control_config
532    }
533
534    #[test]
535    fn test_mix() {
536        assert_eq!(500, mix(0, 1000, 0.5));
537        assert_eq!(0, mix(0, 0, 0.3));
538        assert_eq!(1000, mix(1000, 1000, 0.1));
539        assert_eq!(60, mix(50, 80, 0.33));
540    }
541
542    #[test]
543    fn test_mix_edge_cases() {
544        // at `u64::MAX` we should see no precision errors
545        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.33));
546        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.63));
547        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.99));
548
549        // precision errors must be consistent
550        assert_eq!(u64::MAX, mix(u64::MAX - 1, u64::MAX, 0.25));
551        assert_eq!(u64::MAX, mix(u64::MAX - 255, u64::MAX, 0.25));
552        assert_eq!(u64::MAX, mix(u64::MAX - 1023, u64::MAX, 0.25));
553
554        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1024, u64::MAX, 0.25));
555        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1500, u64::MAX, 0.25));
556        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2047, u64::MAX, 0.25));
557        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2048, u64::MAX, 0.25));
558        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2049, u64::MAX, 0.25));
559        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 3000, u64::MAX, 0.25));
560
561        assert_eq!(u64::MAX - 4095, mix(u64::MAX - 4000, u64::MAX, 0.25));
562    }
563
564    #[test]
565    fn test_clamped_f64_fraction() {
566        assert_eq!(0.0, clamped_f64_fraction(0, 10));
567        assert_eq!(0.5, clamped_f64_fraction(5, 10));
568        assert_eq!(1.0, clamped_f64_fraction(10, 10));
569
570        assert_eq!(0.0, clamped_f64_fraction(0, 1));
571        assert_eq!(0.0, clamped_f64_fraction(0, u64::MAX));
572
573        assert_eq!(0.5, clamped_f64_fraction(1, 2));
574        assert_eq!(0.5, clamped_f64_fraction(100, 200));
575        assert_eq!(0.5, clamped_f64_fraction(u64::MAX as u128 / 2, u64::MAX));
576
577        // test clamp
578        assert_eq!(1.0, clamped_f64_fraction(11, 10));
579        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, 10));
580        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, u64::MAX));
581    }
582
583    /// Default congestion info should be no congestion => maximally permissive.
584    #[test]
585    fn test_default_congestion() {
586        let config = get_config();
587        let info = CongestionInfo::default();
588        let congestion_control = CongestionControl::new(config, info, 0);
589
590        assert_eq!(0.0, info.memory_congestion(&config));
591        assert_eq!(0.0, info.incoming_congestion(&config));
592        assert_eq!(0.0, info.outgoing_congestion(&config));
593        assert_eq!(0.0, info.localized_congestion_level(&config));
594
595        assert_eq!(0.0, congestion_control.memory_congestion());
596        assert_eq!(0.0, congestion_control.incoming_congestion());
597        assert_eq!(0.0, congestion_control.outgoing_congestion());
598        assert_eq!(0.0, congestion_control.congestion_level());
599
600        let acceptable_diff = Gas::from_gas(1);
601
602        let diff =
603            if config.max_outgoing_gas > congestion_control.outgoing_gas_limit(ShardId::new(0)) {
604                config
605                    .max_outgoing_gas
606                    .saturating_sub(congestion_control.outgoing_gas_limit(ShardId::new(0)))
607            } else {
608                congestion_control
609                    .outgoing_gas_limit(ShardId::new(0))
610                    .saturating_sub(config.max_outgoing_gas)
611            };
612        assert!(diff <= acceptable_diff);
613
614        let diff = if config.max_tx_gas > congestion_control.process_tx_limit() {
615            config.max_tx_gas.saturating_sub(congestion_control.process_tx_limit())
616        } else {
617            congestion_control.process_tx_limit().saturating_sub(config.max_tx_gas)
618        };
619        assert!(diff <= acceptable_diff);
620        assert!(congestion_control.shard_accepts_transactions().is_yes());
621    }
622
623    #[test]
624    fn test_memory_congestion() {
625        let config = get_config();
626        let mut info = CongestionInfo::default();
627
628        info.add_receipt_bytes(config.max_congestion_memory_consumption).unwrap();
629        info.add_receipt_bytes(500).unwrap();
630        info.remove_receipt_bytes(500).unwrap();
631
632        {
633            let control = CongestionControl::new(config, info, 0);
634            assert_eq!(1.0, control.congestion_level());
635            // fully congested, no more forwarding allowed
636            assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
637            assert!(control.shard_accepts_transactions().is_no());
638            // processing to other shards is not restricted by memory congestion
639            assert_eq!(config.max_tx_gas, control.process_tx_limit());
640        }
641
642        // Assert threshold is 80%. Change this number if the config changes
643        assert_eq!(0.8, config.reject_tx_congestion_threshold);
644
645        // reduce congestion to 80%
646        info.remove_receipt_bytes(config.max_congestion_memory_consumption / 5).unwrap();
647        {
648            let control = CongestionControl::new(config, info, 0);
649            assert_eq!(0.8, control.congestion_level());
650            assert_eq!(
651                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
652                control.outgoing_gas_limit(ShardId::new(1))
653            );
654            // at 80%, still no new transactions are allowed
655            assert!(control.shard_accepts_transactions().is_no());
656        }
657
658        // reduce congestion to 10%
659        info.remove_receipt_bytes(7 * config.max_congestion_memory_consumption / 10).unwrap();
660        {
661            let control = CongestionControl::new(config, info, 0);
662            assert_eq!(0.1, control.congestion_level());
663            assert_eq!(
664                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
665                control.outgoing_gas_limit(ShardId::new(1))
666            );
667            // at 12.5%, new transactions are allowed (threshold is 0.25)
668            assert!(control.shard_accepts_transactions().is_yes());
669        }
670    }
671
672    #[test]
673    fn test_incoming_congestion() {
674        let config = get_config();
675        let mut info = CongestionInfo::default();
676
677        info.add_delayed_receipt_gas(config.max_congestion_incoming_gas).unwrap();
678        info.add_delayed_receipt_gas(Gas::from_gas(500)).unwrap();
679        info.remove_delayed_receipt_gas(Gas::from_gas(500)).unwrap();
680
681        {
682            let control = CongestionControl::new(config, info, 0);
683            assert_eq!(1.0, control.congestion_level());
684            // fully congested, no more forwarding allowed
685            assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
686            assert!(control.shard_accepts_transactions().is_no());
687            // processing to other shards is restricted by own incoming congestion
688            assert_eq!(config.min_tx_gas, control.process_tx_limit());
689        }
690
691        // Assert threshold is 80%. Change this number if the config changes
692        assert_eq!(0.8, config.reject_tx_congestion_threshold);
693
694        // reduce congestion to 80%
695        info.remove_delayed_receipt_gas(config.max_congestion_incoming_gas.checked_div(5).unwrap())
696            .unwrap();
697        {
698            let control = CongestionControl::new(config, info, 0);
699            assert_eq!(0.8, control.congestion_level());
700            assert_eq!(
701                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
702                control.outgoing_gas_limit(ShardId::new(1))
703            );
704            // at 80%, still no new transactions are allowed
705            assert!(control.shard_accepts_transactions().is_no());
706        }
707
708        // reduce congestion to 10%
709        info.remove_delayed_receipt_gas(
710            config.max_congestion_incoming_gas.checked_mul(7).unwrap().checked_div(10).unwrap(),
711        )
712        .unwrap();
713        {
714            let control = CongestionControl::new(config, info, 0);
715            assert_eq!(0.1, control.congestion_level());
716            assert_eq!(
717                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
718                control.outgoing_gas_limit(ShardId::new(1))
719            );
720            // at 10%, new transactions are allowed (threshold is 80%)
721            assert!(control.shard_accepts_transactions().is_yes());
722        }
723    }
724
725    #[test]
726    fn test_outgoing_congestion() {
727        let config = get_config();
728        let mut info = CongestionInfo::default();
729
730        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas).unwrap();
731        info.add_buffered_receipt_gas(Gas::from_gas(500)).unwrap();
732        info.remove_buffered_receipt_gas(500).unwrap();
733
734        let control = CongestionControl::new(config, info, 0);
735        assert_eq!(1.0, control.congestion_level());
736        // fully congested, no more forwarding allowed
737        assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
738        assert!(control.shard_accepts_transactions().is_no());
739        // processing to other shards is not restricted by own outgoing congestion
740        assert_eq!(config.max_tx_gas, control.process_tx_limit());
741
742        // Assert threshold is 80%. Change this number if the config changes
743        assert_eq!(0.8, config.reject_tx_congestion_threshold);
744
745        // reduce congestion to 80%
746        let gas_diff = config.max_congestion_outgoing_gas.checked_div(5).unwrap();
747        info.remove_buffered_receipt_gas(gas_diff.as_gas().into()).unwrap();
748        let control = CongestionControl::new(config, info, 0);
749        assert_eq!(0.8, control.congestion_level());
750        assert_eq!(
751            mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
752            control.outgoing_gas_limit(ShardId::new(1))
753        );
754        // at 80%, still no new transactions to us are allowed
755        assert!(control.shard_accepts_transactions().is_no());
756
757        // reduce congestion to 10%
758        let gas_diff =
759            config.max_congestion_outgoing_gas.checked_mul(7).unwrap().checked_div(10).unwrap();
760        info.remove_buffered_receipt_gas(gas_diff.as_gas().into()).unwrap();
761        let control = CongestionControl::new(config, info, 0);
762        assert_eq!(0.1, control.congestion_level());
763        assert_eq!(
764            mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
765            control.outgoing_gas_limit(ShardId::new(1))
766        );
767        // at 10%, new transactions are allowed
768        assert!(control.shard_accepts_transactions().is_yes());
769    }
770
771    #[test]
772    fn test_missed_chunks_congestion() {
773        // The default config is quite restricting, allow more missed chunks for
774        // this test to check the middle cases.
775        let mut config = get_config();
776        config.max_congestion_missed_chunks = 10;
777
778        let info = CongestionInfo::default();
779
780        // Test missed chunks congestion without any other congestion
781        let make = |count| CongestionControl::new(config, info, count);
782
783        assert_eq!(make(0).congestion_level(), 0.0);
784        assert_eq!(make(1).congestion_level(), 0.0);
785        assert_eq!(make(2).congestion_level(), 0.2);
786        assert_eq!(make(3).congestion_level(), 0.3);
787        assert_eq!(make(10).congestion_level(), 1.0);
788        assert_eq!(make(20).congestion_level(), 1.0);
789
790        // Test missed chunks congestion with outgoing congestion
791        let mut info = CongestionInfo::default();
792        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas.checked_div(2).unwrap())
793            .unwrap();
794        let make = |count| CongestionControl::new(config, info, count);
795
796        // include missing chunks congestion
797        assert_eq!(make(0).congestion_level(), 0.5);
798        assert_eq!(make(1).congestion_level(), 0.5);
799        assert_eq!(make(2).congestion_level(), 0.5);
800        assert_eq!(make(5).congestion_level(), 0.5);
801        assert_eq!(make(6).congestion_level(), 0.6);
802        assert_eq!(make(10).congestion_level(), 1.0);
803        assert_eq!(make(20).congestion_level(), 1.0);
804
805        // exclude missing chunks congestion
806        assert_eq!(make(0).info.localized_congestion_level(&config), 0.5);
807        assert_eq!(make(1).info.localized_congestion_level(&config), 0.5);
808        assert_eq!(make(2).info.localized_congestion_level(&config), 0.5);
809        assert_eq!(make(5).info.localized_congestion_level(&config), 0.5);
810        assert_eq!(make(6).info.localized_congestion_level(&config), 0.5);
811        assert_eq!(make(10).info.localized_congestion_level(&config), 0.5);
812        assert_eq!(make(20).info.localized_congestion_level(&config), 0.5);
813    }
814
815    #[test]
816    fn test_missed_chunks_finalize() {
817        // The default config is quite restricting, allow more missed chunks for
818        // this test to check the middle cases.
819        let mut config = get_config();
820        config.max_congestion_missed_chunks = 10;
821
822        // Setup half congested congestion info.
823        let mut info = CongestionInfo::default();
824        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas.checked_div(2).unwrap())
825            .unwrap();
826
827        let shard = ShardId::new(2);
828        let all_shards = [0, 1, 2, 3, 4].into_iter().map(ShardId::new).collect_vec();
829
830        // Test without missed chunks congestion.
831
832        let missed_chunks_count = 0;
833        let mut control = CongestionControl::new(config, info, missed_chunks_count);
834        control.info.finalize_allowed_shard(shard, &all_shards, 3);
835
836        let expected_outgoing_limit = 0.5 * config.min_outgoing_gas.as_gas() as f64
837            + 0.5 * config.max_outgoing_gas.as_gas() as f64;
838        for &shard in &all_shards {
839            assert_eq!(
840                control.outgoing_gas_limit(shard),
841                Gas::from_gas(expected_outgoing_limit as u64)
842            );
843        }
844
845        // Test with some missed chunks congestion.
846
847        let missed_chunks_count = 8;
848        let mut control = CongestionControl::new(config, info, missed_chunks_count);
849        control.info.finalize_allowed_shard(shard, &all_shards, 3);
850
851        let expected_outgoing_limit =
852            mix(config.max_outgoing_gas.as_gas(), config.min_outgoing_gas.as_gas(), 0.8) as f64;
853        for &shard in &all_shards {
854            assert_eq!(
855                control.outgoing_gas_limit(shard),
856                Gas::from_gas(expected_outgoing_limit as u64)
857            );
858        }
859
860        // Test with full missed chunks congestion.
861
862        let missed_chunks_count = config.max_congestion_missed_chunks;
863        let mut control = CongestionControl::new(config, info, missed_chunks_count);
864        control.info.finalize_allowed_shard(shard, &all_shards, 3);
865
866        // Full congestion - only the allowed shard should be able to send something.
867        for shard in all_shards {
868            if shard == ShardId::from(control.info.allowed_shard()) {
869                assert_eq!(control.outgoing_gas_limit(shard), config.allowed_shard_outgoing_gas);
870            } else {
871                assert_eq!(control.outgoing_gas_limit(shard), Gas::ZERO);
872            }
873        }
874    }
875}