1use std::collections::BTreeMap;
2
3use crate::errors::RuntimeError;
4use borsh::{BorshDeserialize, BorshSerialize};
5use near_parameters::config::CongestionControlConfig;
6use near_primitives_core::types::{Gas, ShardId};
7use near_schema_checker_lib::ProtocolSchema;
8use ordered_float::NotNan;
9
/// Bundles a congestion control configuration with one shard's congestion
/// info and missed chunks count.
///
/// The methods on this type derive congestion levels, gas limits and size
/// limits from these three inputs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CongestionControl {
    /// Thresholds and limits used to interpret `info` and `missed_chunks_count`.
    config: CongestionControlConfig,
    /// Delayed/buffered receipt gas, receipt bytes and the allowed shard.
    info: CongestionInfo,
    /// Number of missed chunks; measured against
    /// `config.max_congestion_missed_chunks` when computing congestion.
    missed_chunks_count: u64,
}
27
28impl CongestionControl {
29 pub fn new(
30 config: CongestionControlConfig,
31 info: CongestionInfo,
32 missed_chunks_count: u64,
33 ) -> Self {
34 Self { config, info, missed_chunks_count }
35 }
36
37 pub fn config(&self) -> &CongestionControlConfig {
38 &self.config
39 }
40
41 pub fn congestion_info(&self) -> &CongestionInfo {
42 &self.info
43 }
44
45 pub fn congestion_level(&self) -> f64 {
46 let incoming_congestion = self.incoming_congestion();
47 let outgoing_congestion = self.outgoing_congestion();
48 let memory_congestion = self.memory_congestion();
49 let missed_chunks_congestion = self.missed_chunks_congestion();
50
51 incoming_congestion
52 .max(outgoing_congestion)
53 .max(memory_congestion)
54 .max(missed_chunks_congestion)
55 }
56
57 fn incoming_congestion(&self) -> f64 {
58 self.info.incoming_congestion(&self.config)
59 }
60
61 fn outgoing_congestion(&self) -> f64 {
62 self.info.outgoing_congestion(&self.config)
63 }
64
65 fn memory_congestion(&self) -> f64 {
66 self.info.memory_congestion(&self.config)
67 }
68
69 fn missed_chunks_congestion(&self) -> f64 {
70 if self.missed_chunks_count <= 1 {
71 return 0.0;
72 }
73
74 clamped_f64_fraction(
75 self.missed_chunks_count as u128,
76 self.config.max_congestion_missed_chunks,
77 )
78 }
79
80 pub fn outgoing_gas_limit(&self, sender_shard: ShardId) -> Gas {
82 let congestion = self.congestion_level();
83
84 if Self::is_fully_congested(congestion) {
85 if sender_shard == ShardId::from(self.info.allowed_shard()) {
87 self.config.allowed_shard_outgoing_gas
88 } else {
89 Gas::ZERO
90 }
91 } else {
92 mix_gas(self.config.max_outgoing_gas, self.config.min_outgoing_gas, congestion)
93 }
94 }
95
96 pub fn is_fully_congested(congestion_level: f64) -> bool {
97 debug_assert!(congestion_level <= 1.0);
100 congestion_level == 1.0
101 }
102
103 pub fn outgoing_size_limit(&self, sender_shard: ShardId) -> u64 {
105 if sender_shard == ShardId::from(self.info.allowed_shard()) {
106 self.config.outgoing_receipts_big_size_limit
108 } else {
109 self.config.outgoing_receipts_usual_size_limit
111 }
112 }
113
114 pub fn process_tx_limit(&self) -> Gas {
117 mix_gas(self.config.max_tx_gas, self.config.min_tx_gas, self.incoming_congestion())
118 }
119
120 pub fn shard_accepts_transactions(&self) -> ShardAcceptsTransactions {
125 let incoming_congestion = self.incoming_congestion();
126 let outgoing_congestion = self.outgoing_congestion();
127 let memory_congestion = self.memory_congestion();
128 let missed_chunks_congestion = self.missed_chunks_congestion();
129
130 let congestion_level = incoming_congestion
131 .max(outgoing_congestion)
132 .max(memory_congestion)
133 .max(missed_chunks_congestion);
134
135 let congestion_level =
137 NotNan::new(congestion_level).unwrap_or_else(|_| NotNan::new(1.0).unwrap());
138 if *congestion_level < self.config.reject_tx_congestion_threshold {
139 return ShardAcceptsTransactions::Yes;
140 }
141
142 let reason = if missed_chunks_congestion >= *congestion_level {
143 RejectTransactionReason::MissedChunks { missed_chunks: self.missed_chunks_count }
144 } else if incoming_congestion >= *congestion_level {
145 RejectTransactionReason::IncomingCongestion { congestion_level }
146 } else if outgoing_congestion >= *congestion_level {
147 RejectTransactionReason::OutgoingCongestion { congestion_level }
148 } else {
149 RejectTransactionReason::MemoryCongestion { congestion_level }
150 };
151 ShardAcceptsTransactions::No(reason)
152 }
153}
154
/// Result of [`CongestionControl::shard_accepts_transactions`].
pub enum ShardAcceptsTransactions {
    /// The congestion level is below the rejection threshold.
    Yes,
    /// The congestion level reached the rejection threshold; carries the reason.
    No(RejectTransactionReason),
}
160
/// Why [`CongestionControl::shard_accepts_transactions`] returned `No`.
pub enum RejectTransactionReason {
    /// Incoming congestion reached the overall congestion level, which is
    /// carried in `congestion_level`.
    IncomingCongestion { congestion_level: NotNan<f64> },
    /// Outgoing congestion reached the overall congestion level, which is
    /// carried in `congestion_level`.
    OutgoingCongestion { congestion_level: NotNan<f64> },
    /// Fallback reason when none of the other checks matched; carries the
    /// overall congestion level.
    MemoryCongestion { congestion_level: NotNan<f64> },
    /// Missed-chunks congestion reached the overall congestion level;
    /// carries the raw missed chunks count.
    MissedChunks { missed_chunks: u64 },
}
168
/// Versioned congestion info of a shard.
///
/// Currently there is a single version, [`CongestionInfoV1`]; all accessors
/// on this type dispatch on the version.
#[derive(
    BorshSerialize,
    BorshDeserialize,
    serde::Serialize,
    serde::Deserialize,
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    ProtocolSchema,
)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub enum CongestionInfo {
    /// First version of the congestion info.
    V1(CongestionInfoV1),
}
191
192impl Default for CongestionInfo {
193 fn default() -> Self {
194 Self::V1(CongestionInfoV1::default())
195 }
196}
197
198impl CongestionInfo {
199 pub fn validate_extra_and_header(extra: &CongestionInfo, header: &CongestionInfo) -> bool {
206 match (extra, header) {
207 (CongestionInfo::V1(extra), CongestionInfo::V1(header)) => {
208 extra.delayed_receipts_gas == header.delayed_receipts_gas
209 && extra.buffered_receipts_gas == header.buffered_receipts_gas
210 && extra.receipt_bytes == header.receipt_bytes
211 && extra.allowed_shard == header.allowed_shard
212 }
213 }
214 }
215
216 pub fn delayed_receipts_gas(&self) -> u128 {
217 match self {
218 CongestionInfo::V1(inner) => inner.delayed_receipts_gas,
219 }
220 }
221
222 pub fn buffered_receipts_gas(&self) -> u128 {
223 match self {
224 CongestionInfo::V1(inner) => inner.buffered_receipts_gas,
225 }
226 }
227
228 pub fn receipt_bytes(&self) -> u64 {
229 match self {
230 CongestionInfo::V1(inner) => inner.receipt_bytes,
231 }
232 }
233
234 pub fn allowed_shard(&self) -> u16 {
235 match self {
236 CongestionInfo::V1(inner) => inner.allowed_shard,
237 }
238 }
239
240 pub fn set_allowed_shard(&mut self, allowed_shard: u16) {
241 match self {
242 CongestionInfo::V1(inner) => inner.allowed_shard = allowed_shard,
243 }
244 }
245
246 pub fn add_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
247 match self {
248 CongestionInfo::V1(inner) => {
249 inner.receipt_bytes = inner.receipt_bytes.checked_add(bytes).ok_or_else(|| {
250 RuntimeError::UnexpectedIntegerOverflow("add_receipt_bytes".into())
251 })?;
252 }
253 }
254 Ok(())
255 }
256
257 pub fn remove_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
258 match self {
259 CongestionInfo::V1(inner) => {
260 inner.receipt_bytes = inner.receipt_bytes.checked_sub(bytes).ok_or_else(|| {
261 RuntimeError::UnexpectedIntegerOverflow("remove_receipt_bytes".into())
262 })?;
263 }
264 }
265 Ok(())
266 }
267
268 pub fn add_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
269 match self {
270 CongestionInfo::V1(inner) => {
271 inner.delayed_receipts_gas = inner
272 .delayed_receipts_gas
273 .checked_add(gas.as_gas().into())
274 .ok_or_else(|| {
275 RuntimeError::UnexpectedIntegerOverflow("add_delayed_receipt_gas".into())
276 })?;
277 }
278 }
279 Ok(())
280 }
281
282 pub fn remove_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
283 match self {
284 CongestionInfo::V1(inner) => {
285 inner.delayed_receipts_gas = inner
286 .delayed_receipts_gas
287 .checked_sub(gas.as_gas().into())
288 .ok_or_else(|| {
289 RuntimeError::UnexpectedIntegerOverflow("remove_delayed_receipt_gas".into())
290 })?;
291 }
292 }
293 Ok(())
294 }
295
296 pub fn add_buffered_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
297 match self {
298 CongestionInfo::V1(inner) => {
299 inner.buffered_receipts_gas = inner
300 .buffered_receipts_gas
301 .checked_add(gas.as_gas().into())
302 .ok_or_else(|| {
303 RuntimeError::UnexpectedIntegerOverflow("add_buffered_receipt_gas".into())
304 })?;
305 }
306 }
307 Ok(())
308 }
309
310 pub fn remove_buffered_receipt_gas(&mut self, gas: u128) -> Result<(), RuntimeError> {
311 match self {
312 CongestionInfo::V1(inner) => {
313 inner.buffered_receipts_gas =
314 inner.buffered_receipts_gas.checked_sub(gas).ok_or_else(|| {
315 RuntimeError::UnexpectedIntegerOverflow(
316 "remove_buffered_receipt_gas".into(),
317 )
318 })?;
319 }
320 }
321 Ok(())
322 }
323
324 pub fn localized_congestion_level(&self, config: &CongestionControlConfig) -> f64 {
326 let incoming_congestion = self.incoming_congestion(config);
327 let outgoing_congestion = self.outgoing_congestion(config);
328 let memory_congestion = self.memory_congestion(config);
329 incoming_congestion.max(outgoing_congestion).max(memory_congestion)
330 }
331
332 pub fn incoming_congestion(&self, config: &CongestionControlConfig) -> f64 {
333 clamped_f64_fraction(
334 self.delayed_receipts_gas(),
335 config.max_congestion_incoming_gas.as_gas(),
336 )
337 }
338
339 pub fn outgoing_congestion(&self, config: &CongestionControlConfig) -> f64 {
340 clamped_f64_fraction(
341 self.buffered_receipts_gas(),
342 config.max_congestion_outgoing_gas.as_gas(),
343 )
344 }
345
346 pub fn memory_congestion(&self, config: &CongestionControlConfig) -> f64 {
347 clamped_f64_fraction(self.receipt_bytes() as u128, config.max_congestion_memory_consumption)
348 }
349
350 pub fn finalize_allowed_shard(
362 &mut self,
363 own_shard: ShardId,
364 all_shards: &[ShardId],
365 congestion_seed: u64,
366 ) {
367 let allowed_shard = Self::get_new_allowed_shard(own_shard, all_shards, congestion_seed);
368 self.set_allowed_shard(allowed_shard.into());
369 }
370
371 fn get_new_allowed_shard(
372 own_shard: ShardId,
373 all_shards: &[ShardId],
374 congestion_seed: u64,
375 ) -> ShardId {
376 if let Some(index) = congestion_seed.checked_rem(all_shards.len() as u64) {
377 return *all_shards
379 .get(index as usize)
380 .expect("`checked_rem` should have ensured array access is in bound");
381 }
382 return own_shard;
385 }
386}
387
/// Per-shard congestion information, keyed by shard id.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct BlockCongestionInfo {
    /// Extended congestion info of each shard. A `BTreeMap` keeps the
    /// entries ordered by `ShardId`.
    shards_congestion_info: BTreeMap<ShardId, ExtendedCongestionInfo>,
}
397
impl BlockCongestionInfo {
    /// Wraps an existing shard-to-congestion-info map.
    pub fn new(shards_congestion_info: BTreeMap<ShardId, ExtendedCongestionInfo>) -> Self {
        Self { shards_congestion_info }
    }

    /// Iterates over all `(shard id, congestion info)` pairs in shard id order.
    pub fn iter(&self) -> impl Iterator<Item = (&ShardId, &ExtendedCongestionInfo)> {
        self.shards_congestion_info.iter()
    }

    /// Returns the ids of all shards that have an entry, in ascending order.
    pub fn all_shards(&self) -> Vec<ShardId> {
        self.shards_congestion_info.keys().copied().collect()
    }

    /// Returns the congestion info of `shard_id`, if present.
    pub fn get(&self, shard_id: &ShardId) -> Option<&ExtendedCongestionInfo> {
        self.shards_congestion_info.get(shard_id)
    }

    /// Returns a mutable reference to the congestion info of `shard_id`, if present.
    pub fn get_mut(&mut self, shard_id: &ShardId) -> Option<&mut ExtendedCongestionInfo> {
        self.shards_congestion_info.get_mut(shard_id)
    }

    /// Inserts or replaces the entry for `shard_id`, returning the previous
    /// value if there was one.
    pub fn insert(
        &mut self,
        shard_id: ShardId,
        value: ExtendedCongestionInfo,
    ) -> Option<ExtendedCongestionInfo> {
        self.shards_congestion_info.insert(shard_id, value)
    }

    /// Returns `true` when no shard has an entry.
    pub fn is_empty(&self) -> bool {
        self.shards_congestion_info.is_empty()
    }
}
431
/// A [`CongestionInfo`] together with a missed chunks count — the same two
/// pieces of shard state that [`CongestionControl::new`] takes next to the
/// config.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ExtendedCongestionInfo {
    /// Congestion info of the shard.
    pub congestion_info: CongestionInfo,
    /// Number of missed chunks associated with the shard.
    pub missed_chunks_count: u64,
}
439
impl ExtendedCongestionInfo {
    /// Pairs a congestion info with a missed chunks count.
    pub fn new(congestion_info: CongestionInfo, missed_chunks_count: u64) -> Self {
        Self { congestion_info, missed_chunks_count }
    }
}
445
/// First version of the congestion info payload wrapped by
/// [`CongestionInfo::V1`].
#[derive(
    BorshSerialize,
    BorshDeserialize,
    serde::Serialize,
    serde::Deserialize,
    Default,
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    ProtocolSchema,
)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct CongestionInfoV1 {
    /// Gas of delayed receipts; the numerator of the incoming congestion.
    pub delayed_receipts_gas: u128,
    /// Gas of buffered receipts; the numerator of the outgoing congestion.
    pub buffered_receipts_gas: u128,
    /// Size of tracked receipts in bytes; the numerator of the memory congestion.
    pub receipt_bytes: u64,
    /// Raw id of the shard that `CongestionControl` treats preferentially in
    /// `outgoing_gas_limit` and `outgoing_size_limit`.
    pub allowed_shard: u16,
}
472
/// Returns `value / max` as an `f64`, capped at `1.0`.
///
/// # Panics
///
/// Panics if `max` is zero.
#[inline]
fn clamped_f64_fraction(value: u128, max: u64) -> f64 {
    assert!(max > 0);
    // Anything at or above `max` counts as fully saturated.
    let saturated = value >= max as u128;
    if saturated {
        return 1.0;
    }
    value as f64 / max as f64
}
479
/// Linearly interpolates between `left` and `right`.
///
/// A `ratio` of `0.0` yields `left`, a `ratio` of `1.0` yields `right`, and
/// values in between yield the weighted sum rounded to the nearest integer.
///
/// `ratio` is expected to lie in `[0.0, 1.0]`; this is only checked in debug
/// builds.
fn mix(left: u64, right: u64, ratio: f64) -> u64 {
    debug_assert!(ratio >= 0.0);
    debug_assert!(ratio <= 1.0);

    // The arithmetic is done in f64, so inputs above 2^53 lose precision.
    let left_part = left as f64 * (1.0 - ratio);
    let right_part = right as f64 * ratio;
    let total = left_part + right_part;

    // Float-to-integer `as` casts saturate, so a total that lands on or above
    // 2^64 due to rounding becomes `u64::MAX` instead of wrapping.
    total.round() as u64
}
504
505fn mix_gas(left: Gas, right: Gas, ratio: f64) -> Gas {
506 Gas::from_gas(mix(left.as_gas(), right.as_gas(), ratio))
507}
508
509impl ShardAcceptsTransactions {
510 pub fn is_yes(&self) -> bool {
511 matches!(self, ShardAcceptsTransactions::Yes)
512 }
513
514 pub fn is_no(&self) -> bool {
515 !self.is_yes()
516 }
517}
518
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use near_parameters::RuntimeConfigStore;
    use near_primitives_core::version::PROTOCOL_VERSION;

    use super::*;

    /// Returns the congestion control config for `PROTOCOL_VERSION`, read
    /// from a `RuntimeConfigStore` constructed with `None`.
    fn get_config() -> CongestionControlConfig {
        let runtime_config_store = RuntimeConfigStore::new(None);
        let runtime_config = runtime_config_store.get_config(PROTOCOL_VERSION);
        runtime_config.congestion_control_config
    }

    /// Basic interpolation results of `mix`.
    #[test]
    fn test_mix() {
        assert_eq!(500, mix(0, 1000, 0.5));
        assert_eq!(0, mix(0, 0, 0.3));
        assert_eq!(1000, mix(1000, 1000, 0.1));
        assert_eq!(60, mix(50, 80, 0.33));
    }

    /// `mix` near `u64::MAX`, where f64 can no longer represent every
    /// integer (adjacent f64 values just below 2^64 are 2048 apart).
    #[test]
    fn test_mix_edge_cases() {
        // Mixing a value with itself stays at that value.
        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.33));
        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.63));
        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.99));

        // Differences too small for f64 precision still yield `u64::MAX`.
        assert_eq!(u64::MAX, mix(u64::MAX - 1, u64::MAX, 0.25));
        assert_eq!(u64::MAX, mix(u64::MAX - 255, u64::MAX, 0.25));
        assert_eq!(u64::MAX, mix(u64::MAX - 1023, u64::MAX, 0.25));

        // Larger differences snap to the next lower representable value.
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1024, u64::MAX, 0.25));
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1500, u64::MAX, 0.25));
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2047, u64::MAX, 0.25));
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2048, u64::MAX, 0.25));
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2049, u64::MAX, 0.25));
        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 3000, u64::MAX, 0.25));

        assert_eq!(u64::MAX - 4095, mix(u64::MAX - 4000, u64::MAX, 0.25));
    }

    /// Fractions, boundary values and clamping of `clamped_f64_fraction`.
    #[test]
    fn test_clamped_f64_fraction() {
        assert_eq!(0.0, clamped_f64_fraction(0, 10));
        assert_eq!(0.5, clamped_f64_fraction(5, 10));
        assert_eq!(1.0, clamped_f64_fraction(10, 10));

        assert_eq!(0.0, clamped_f64_fraction(0, 1));
        assert_eq!(0.0, clamped_f64_fraction(0, u64::MAX));

        assert_eq!(0.5, clamped_f64_fraction(1, 2));
        assert_eq!(0.5, clamped_f64_fraction(100, 200));
        assert_eq!(0.5, clamped_f64_fraction(u64::MAX as u128 / 2, u64::MAX));

        // Values above `max` are clamped to 1.0.
        assert_eq!(1.0, clamped_f64_fraction(11, 10));
        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, 10));
        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, u64::MAX));
    }

    /// A default congestion info with no missed chunks is uncongested and
    /// yields (up to 1 gas of rounding) the maximum limits.
    #[test]
    fn test_default_congestion() {
        let config = get_config();
        let info = CongestionInfo::default();
        let congestion_control = CongestionControl::new(config, info, 0);

        assert_eq!(0.0, info.memory_congestion(&config));
        assert_eq!(0.0, info.incoming_congestion(&config));
        assert_eq!(0.0, info.outgoing_congestion(&config));
        assert_eq!(0.0, info.localized_congestion_level(&config));

        assert_eq!(0.0, congestion_control.memory_congestion());
        assert_eq!(0.0, congestion_control.incoming_congestion());
        assert_eq!(0.0, congestion_control.outgoing_congestion());
        assert_eq!(0.0, congestion_control.congestion_level());

        // The limits go through f64 arithmetic in `mix`, so allow a
        // difference of 1 gas in either direction.
        let acceptable_diff = Gas::from_gas(1);

        // Absolute difference between `max_outgoing_gas` and the computed limit.
        let diff =
            if config.max_outgoing_gas > congestion_control.outgoing_gas_limit(ShardId::new(0)) {
                config
                    .max_outgoing_gas
                    .saturating_sub(congestion_control.outgoing_gas_limit(ShardId::new(0)))
            } else {
                congestion_control
                    .outgoing_gas_limit(ShardId::new(0))
                    .saturating_sub(config.max_outgoing_gas)
            };
        assert!(diff <= acceptable_diff);

        // Absolute difference between `max_tx_gas` and the computed limit.
        let diff = if config.max_tx_gas > congestion_control.process_tx_limit() {
            config.max_tx_gas.saturating_sub(congestion_control.process_tx_limit())
        } else {
            congestion_control.process_tx_limit().saturating_sub(config.max_tx_gas)
        };
        assert!(diff <= acceptable_diff);
        assert!(congestion_control.shard_accepts_transactions().is_yes());
    }

    /// Walks memory congestion from 1.0 down to 0.8 and 0.1.
    #[test]
    fn test_memory_congestion() {
        let config = get_config();
        let mut info = CongestionInfo::default();

        // Fill up to the limit; the extra add/remove pair cancels out.
        info.add_receipt_bytes(config.max_congestion_memory_consumption).unwrap();
        info.add_receipt_bytes(500).unwrap();
        info.remove_receipt_bytes(500).unwrap();

        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(1.0, control.congestion_level());
            // Fully congested: shard 1 is not the allowed shard (default 0).
            assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
            assert!(control.shard_accepts_transactions().is_no());
            // `process_tx_limit` only depends on incoming congestion.
            assert_eq!(config.max_tx_gas, control.process_tx_limit());
        }

        // The steps below rely on the rejection threshold being 0.8.
        assert_eq!(0.8, config.reject_tx_congestion_threshold);

        // Remove 20% of the limit: congestion drops to exactly the threshold.
        info.remove_receipt_bytes(config.max_congestion_memory_consumption / 5).unwrap();
        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(0.8, control.congestion_level());
            assert_eq!(
                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
                control.outgoing_gas_limit(ShardId::new(1))
            );
            // At the threshold, transactions are still rejected.
            assert!(control.shard_accepts_transactions().is_no());
        }

        // Remove another 70% of the limit: congestion drops to 0.1.
        info.remove_receipt_bytes(7 * config.max_congestion_memory_consumption / 10).unwrap();
        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(0.1, control.congestion_level());
            assert_eq!(
                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
                control.outgoing_gas_limit(ShardId::new(1))
            );
            assert!(control.shard_accepts_transactions().is_yes());
        }
    }

    /// Walks incoming congestion from 1.0 down to 0.8 and 0.1.
    #[test]
    fn test_incoming_congestion() {
        let config = get_config();
        let mut info = CongestionInfo::default();

        // Fill up to the limit; the extra add/remove pair cancels out.
        info.add_delayed_receipt_gas(config.max_congestion_incoming_gas).unwrap();
        info.add_delayed_receipt_gas(Gas::from_gas(500)).unwrap();
        info.remove_delayed_receipt_gas(Gas::from_gas(500)).unwrap();

        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(1.0, control.congestion_level());
            // Fully congested: shard 1 is not the allowed shard (default 0).
            assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
            assert!(control.shard_accepts_transactions().is_no());
            // Full incoming congestion pushes the tx limit to its minimum.
            assert_eq!(config.min_tx_gas, control.process_tx_limit());
        }

        // The steps below rely on the rejection threshold being 0.8.
        assert_eq!(0.8, config.reject_tx_congestion_threshold);

        // Remove 20% of the limit: congestion drops to exactly the threshold.
        info.remove_delayed_receipt_gas(config.max_congestion_incoming_gas.checked_div(5).unwrap())
            .unwrap();
        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(0.8, control.congestion_level());
            assert_eq!(
                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
                control.outgoing_gas_limit(ShardId::new(1))
            );
            // At the threshold, transactions are still rejected.
            assert!(control.shard_accepts_transactions().is_no());
        }

        // Remove another 70% of the limit: congestion drops to 0.1.
        info.remove_delayed_receipt_gas(
            config.max_congestion_incoming_gas.checked_mul(7).unwrap().checked_div(10).unwrap(),
        )
        .unwrap();
        {
            let control = CongestionControl::new(config, info, 0);
            assert_eq!(0.1, control.congestion_level());
            assert_eq!(
                mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
                control.outgoing_gas_limit(ShardId::new(1))
            );
            assert!(control.shard_accepts_transactions().is_yes());
        }
    }

    /// Walks outgoing congestion from 1.0 down to 0.8 and 0.1.
    #[test]
    fn test_outgoing_congestion() {
        let config = get_config();
        let mut info = CongestionInfo::default();

        // Fill up to the limit; the extra add/remove pair cancels out.
        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas).unwrap();
        info.add_buffered_receipt_gas(Gas::from_gas(500)).unwrap();
        info.remove_buffered_receipt_gas(500).unwrap();

        let control = CongestionControl::new(config, info, 0);
        assert_eq!(1.0, control.congestion_level());
        // Fully congested: shard 1 is not the allowed shard (default 0).
        assert_eq!(Gas::ZERO, control.outgoing_gas_limit(ShardId::new(1)));
        assert!(control.shard_accepts_transactions().is_no());
        // `process_tx_limit` only depends on incoming congestion.
        assert_eq!(config.max_tx_gas, control.process_tx_limit());

        // The steps below rely on the rejection threshold being 0.8.
        assert_eq!(0.8, config.reject_tx_congestion_threshold);

        // Remove 20% of the limit: congestion drops to exactly the threshold.
        let gas_diff = config.max_congestion_outgoing_gas.checked_div(5).unwrap();
        info.remove_buffered_receipt_gas(gas_diff.as_gas().into()).unwrap();
        let control = CongestionControl::new(config, info, 0);
        assert_eq!(0.8, control.congestion_level());
        assert_eq!(
            mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.8),
            control.outgoing_gas_limit(ShardId::new(1))
        );
        // At the threshold, transactions are still rejected.
        assert!(control.shard_accepts_transactions().is_no());

        // Remove another 70% of the limit: congestion drops to 0.1.
        let gas_diff =
            config.max_congestion_outgoing_gas.checked_mul(7).unwrap().checked_div(10).unwrap();
        info.remove_buffered_receipt_gas(gas_diff.as_gas().into()).unwrap();
        let control = CongestionControl::new(config, info, 0);
        assert_eq!(0.1, control.congestion_level());
        assert_eq!(
            mix_gas(config.max_outgoing_gas, config.min_outgoing_gas, 0.1),
            control.outgoing_gas_limit(ShardId::new(1))
        );
        assert!(control.shard_accepts_transactions().is_yes());
    }

    /// Missed chunks contribute to `congestion_level` but not to
    /// `localized_congestion_level`.
    #[test]
    fn test_missed_chunks_congestion() {
        let mut config = get_config();
        // Pin the limit so the expected fractions below are round numbers.
        config.max_congestion_missed_chunks = 10;

        let info = CongestionInfo::default();

        // Builds a control that differs only in the missed chunks count.
        let make = |count| CongestionControl::new(config, info, count);

        // Zero or one missed chunk is not treated as congestion.
        assert_eq!(make(0).congestion_level(), 0.0);
        assert_eq!(make(1).congestion_level(), 0.0);
        assert_eq!(make(2).congestion_level(), 0.2);
        assert_eq!(make(3).congestion_level(), 0.3);
        assert_eq!(make(10).congestion_level(), 1.0);
        assert_eq!(make(20).congestion_level(), 1.0);

        // Now combine missed chunks with 50% outgoing congestion.
        let mut info = CongestionInfo::default();
        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas.checked_div(2).unwrap())
            .unwrap();
        let make = |count| CongestionControl::new(config, info, count);

        // The overall level is the larger of the two sources.
        assert_eq!(make(0).congestion_level(), 0.5);
        assert_eq!(make(1).congestion_level(), 0.5);
        assert_eq!(make(2).congestion_level(), 0.5);
        assert_eq!(make(5).congestion_level(), 0.5);
        assert_eq!(make(6).congestion_level(), 0.6);
        assert_eq!(make(10).congestion_level(), 1.0);
        assert_eq!(make(20).congestion_level(), 1.0);

        // The localized level ignores missed chunks entirely.
        assert_eq!(make(0).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(1).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(2).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(5).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(6).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(10).info.localized_congestion_level(&config), 0.5);
        assert_eq!(make(20).info.localized_congestion_level(&config), 0.5);
    }

    /// Outgoing gas limits after `finalize_allowed_shard` for increasing
    /// missed chunks counts.
    #[test]
    fn test_missed_chunks_finalize() {
        let mut config = get_config();
        // Pin the limit so the expected fractions below are round numbers.
        config.max_congestion_missed_chunks = 10;

        // Start from 50% outgoing congestion.
        let mut info = CongestionInfo::default();
        info.add_buffered_receipt_gas(config.max_congestion_outgoing_gas.checked_div(2).unwrap())
            .unwrap();

        let shard = ShardId::new(2);
        let all_shards = [0, 1, 2, 3, 4].into_iter().map(ShardId::new).collect_vec();

        // No missed chunks: the level stays at 0.5 and every shard gets the
        // same interpolated limit.
        let missed_chunks_count = 0;
        let mut control = CongestionControl::new(config, info, missed_chunks_count);
        control.info.finalize_allowed_shard(shard, &all_shards, 3);

        let expected_outgoing_limit = 0.5 * config.min_outgoing_gas.as_gas() as f64
            + 0.5 * config.max_outgoing_gas.as_gas() as f64;
        for &shard in &all_shards {
            assert_eq!(
                control.outgoing_gas_limit(shard),
                Gas::from_gas(expected_outgoing_limit as u64)
            );
        }

        // Eight missed chunks: the level rises to 0.8, still not fully
        // congested, so every shard again gets the same limit.
        let missed_chunks_count = 8;
        let mut control = CongestionControl::new(config, info, missed_chunks_count);
        control.info.finalize_allowed_shard(shard, &all_shards, 3);

        let expected_outgoing_limit =
            mix(config.max_outgoing_gas.as_gas(), config.min_outgoing_gas.as_gas(), 0.8) as f64;
        for &shard in &all_shards {
            assert_eq!(
                control.outgoing_gas_limit(shard),
                Gas::from_gas(expected_outgoing_limit as u64)
            );
        }

        // Missed chunks at the limit: fully congested, so only the allowed
        // shard may send.
        let missed_chunks_count = config.max_congestion_missed_chunks;
        let mut control = CongestionControl::new(config, info, missed_chunks_count);
        control.info.finalize_allowed_shard(shard, &all_shards, 3);

        for shard in all_shards {
            if shard == ShardId::from(control.info.allowed_shard()) {
                assert_eq!(control.outgoing_gas_limit(shard), config.allowed_shard_outgoing_gas);
            } else {
                assert_eq!(control.outgoing_gas_limit(shard), Gas::ZERO);
            }
        }
    }
}