1#![allow(clippy::indexing_slicing)]
4use std::{
5 cmp::Ordering,
6 mem,
7 ops::{Index, IndexMut},
8};
9
10use crate::message::{Message, SignedMessage};
11use crate::networks::ChainConfig;
12use crate::shim::{
13 address::Address,
14 econ::TokenAmount,
15 gas::{Gas, price_list_by_network_version},
16};
17use crate::{
18 blocks::{BLOCK_MESSAGE_LIMIT, Tipset},
19 shim::crypto::SignatureType,
20};
21use ahash::HashMap;
22use num_traits::Zero;
23use slotmap::{SlotMap, new_key_type};
24use tracing::warn;
25
26use super::errors::Error;
27use crate::message_pool::{
28 provider::Provider,
29 utils::{get_gas_perf, get_gas_reward},
30};
31
32new_key_type! {
33 pub struct NodeKey;
34}
35
36pub(in crate::message_pool) struct Chains {
45 pub map: SlotMap<NodeKey, MsgChainNode>,
46 pub key_vec: Vec<NodeKey>,
47}
48
49impl Chains {
50 pub(in crate::message_pool) fn sort_effective(&mut self) {
52 let mut chains = mem::take(&mut self.key_vec);
53 chains.sort_by(|a, b| {
54 let a = self.map.get(*a).unwrap();
55 let b = self.map.get(*b).unwrap();
56 a.cmp_effective(b)
57 });
58 let _ = mem::replace(&mut self.key_vec, chains);
59 }
60
61 pub(in crate::message_pool) fn sort_range_effective(
63 &mut self,
64 range: std::ops::RangeFrom<usize>,
65 ) {
66 let mut chains = mem::take(&mut self.key_vec);
67 chains[range].sort_by(|a, b| {
68 self.map
69 .get(*a)
70 .unwrap()
71 .cmp_effective(self.map.get(*b).unwrap())
72 });
73 let _ = mem::replace(&mut self.key_vec, chains);
74 }
75
76 pub(in crate::message_pool) fn get_mut_with_prev_eff(
80 &mut self,
81 k: NodeKey,
82 ) -> (Option<&mut MsgChainNode>, Option<(f64, u64)>) {
83 let node = self.map.get(k);
84 let prev = if let Some(node) = node {
85 if let Some(prev_key) = node.prev {
86 let prev_node = self.map.get(prev_key).unwrap();
87 Some((prev_node.eff_perf, prev_node.gas_limit))
88 } else {
89 None
90 }
91 } else {
92 None
93 };
94
95 let node = self.map.get_mut(k);
96 (node, prev)
97 }
98
99 pub(in crate::message_pool) fn get(&self, k: NodeKey) -> Option<&MsgChainNode> {
101 self.map.get(k)
102 }
103}
104
105impl Chains {
106 pub(in crate::message_pool) fn new() -> Self {
107 Self {
108 map: SlotMap::with_key(),
109 key_vec: vec![],
110 }
111 }
112
113 pub(in crate::message_pool) fn push_with(
116 &mut self,
117 cur_chain: MsgChainNode,
118 node_vec: &mut Vec<NodeKey>,
119 ) {
120 let key = self.map.insert(cur_chain);
121 node_vec.push(key);
122 }
123
124 pub(in crate::message_pool) fn sort(&mut self, rev: bool) {
127 let mut chains = mem::take(&mut self.key_vec);
129 chains.sort_by(|a, b| {
130 let a = self.map.get(*a).unwrap();
131 let b = self.map.get(*b).unwrap();
132 if rev { b.compare(a) } else { a.compare(b) }
133 });
134 let _ = mem::replace(&mut self.key_vec, chains);
135 }
136
137 pub(in crate::message_pool) fn get_mut(&mut self, k: NodeKey) -> Option<&mut MsgChainNode> {
139 self.map.get_mut(k)
140 }
141
142 pub(in crate::message_pool) fn get_mut_at(&mut self, i: usize) -> Option<&mut MsgChainNode> {
144 let key = self.key_vec.get(i)?;
145 self.get_mut(*key)
146 }
147
148 pub(in crate::message_pool) fn get_from(&self, i: usize, vec: &[NodeKey]) -> &MsgChainNode {
150 #[allow(clippy::indexing_slicing)]
151 self.map.get(vec[i]).unwrap()
152 }
153
154 pub(in crate::message_pool) fn get_mut_from(
156 &mut self,
157 i: usize,
158 vec: &[NodeKey],
159 ) -> &mut MsgChainNode {
160 #[allow(clippy::indexing_slicing)]
161 self.map.get_mut(vec[i]).unwrap()
162 }
163
164 pub(in crate::message_pool) fn get_key_at(&self, i: usize) -> Option<NodeKey> {
166 self.key_vec.get(i).copied()
167 }
168
169 pub(in crate::message_pool) fn get_at(&self, i: usize) -> Option<&MsgChainNode> {
171 self.map.get(self.get_key_at(i)?)
172 }
173
174 pub(in crate::message_pool) fn len(&self) -> usize {
176 self.map.len()
177 }
178
179 pub(in crate::message_pool) fn is_empty(&self) -> bool {
182 self.map.is_empty()
183 }
184
185 #[tracing::instrument(skip_all, level = "debug")]
187 pub(in crate::message_pool) fn trim_msgs_at(
188 &mut self,
189 idx: usize,
190 gas_limit: u64,
191 msg_limit: usize,
192 base_fee: &TokenAmount,
193 ) {
194 let prev = match idx {
195 0 => None,
196 _ => self
197 .get_at(idx - 1)
198 .map(|prev| (prev.eff_perf, prev.gas_limit)),
199 };
200 let chain_node = self.get_mut_at(idx).unwrap();
201 let mut i = chain_node.msgs.len() as i64 - 1;
202
203 while i >= 0
204 && (chain_node.gas_limit > gas_limit
205 || chain_node.gas_perf < 0.0
206 || i >= msg_limit as i64)
207 {
208 #[allow(clippy::indexing_slicing)]
209 let msg = &chain_node.msgs[i as usize];
210 let gas_reward = get_gas_reward(msg, base_fee);
211 chain_node.gas_reward -= gas_reward;
212 chain_node.gas_limit = chain_node.gas_limit.saturating_sub(msg.gas_limit());
213 if chain_node.gas_limit > 0 {
214 chain_node.gas_perf = get_gas_perf(&chain_node.gas_reward, chain_node.gas_limit);
215 if chain_node.bp != 0.0 {
216 chain_node.set_eff_perf(prev);
217 }
218 } else {
219 chain_node.gas_perf = 0.0;
220 chain_node.eff_perf = 0.0;
221 }
222 i -= 1;
223 }
224
225 if i < 0 {
226 chain_node.msgs.clear();
227 chain_node.valid = false;
228 } else {
229 chain_node.msgs.truncate(i as usize + 1);
230 }
231
232 let next = chain_node.next;
233 if next.is_some() {
234 self.invalidate(next);
235 }
236 }
237
238 pub(in crate::message_pool) fn invalidate(&mut self, mut key: Option<NodeKey>) {
239 let mut next_keys = vec![];
240
241 while let Some(nk) = key {
242 let chain_node = self.map.get(nk).unwrap();
243 next_keys.push(nk);
244 key = chain_node.next;
245 }
246
247 for k in next_keys.iter().rev() {
248 if let Some(node) = self.map.get_mut(*k) {
249 node.valid = false;
250 node.msgs.clear();
251 node.next = None;
252 }
253 }
254 }
255
256 pub(in crate::message_pool) fn drop_invalid(&mut self, key_vec: &mut Vec<NodeKey>) {
258 let mut valid_keys = vec![];
259 for k in key_vec.iter() {
260 if self.map.get(*k).map(|n| n.valid).unwrap() {
261 valid_keys.push(*k);
262 } else {
263 self.map.remove(*k);
264 }
265 }
266
267 *key_vec = valid_keys;
268 }
269}
270
271impl Index<usize> for Chains {
272 type Output = MsgChainNode;
273 fn index(&self, i: usize) -> &Self::Output {
274 self.get_at(i).unwrap()
275 }
276}
277
278impl IndexMut<usize> for Chains {
279 fn index_mut(&mut self, i: usize) -> &mut Self::Output {
280 #[allow(clippy::indexing_slicing)]
281 self.map.get_mut(self.key_vec[i]).unwrap()
282 }
283}
284
285#[derive(Clone, Debug)]
287pub struct MsgChainNode {
288 pub msgs: Vec<SignedMessage>,
289 pub gas_reward: TokenAmount,
290 pub gas_limit: u64,
291 pub gas_perf: f64,
292 pub eff_perf: f64,
293 pub bp: f64,
294 pub parent_offset: f64,
295 pub valid: bool,
296 pub merged: bool,
297 pub next: Option<NodeKey>,
298 pub prev: Option<NodeKey>,
299 pub sig_type: Option<SignatureType>,
300}
301
302impl MsgChainNode {
303 pub fn compare(&self, other: &Self) -> Ordering {
304 if approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Greater
305 || approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Equal
306 && self.gas_reward.cmp(&other.gas_reward) == Ordering::Greater
307 {
308 return Ordering::Greater;
309 }
310
311 Ordering::Less
312 }
313
314 pub fn set_eff_perf(&mut self, prev: Option<(f64, u64)>) {
315 let mut eff_perf = self.gas_perf * self.bp;
316 if let Some(prev) = prev
317 && eff_perf > 0.0
318 {
319 let prev_eff_perf = prev.0;
320 let prev_gas_limit = prev.1;
321 let eff_perf_with_parent = (eff_perf * self.gas_limit as f64
322 + prev_eff_perf * prev_gas_limit as f64)
323 / (self.gas_limit + prev_gas_limit) as f64;
324 self.parent_offset = eff_perf - eff_perf_with_parent;
325 eff_perf = eff_perf_with_parent;
326 }
327 self.eff_perf = eff_perf;
328 }
329}
330
331impl MsgChainNode {
332 pub(in crate::message_pool) fn cmp_effective(&self, other: &Self) -> Ordering {
333 match (self.merged, other.merged) {
338 (true, false) => return Ordering::Greater,
339 (false, true) => return Ordering::Less,
340 _ => {}
341 }
342
343 if self.gas_perf >= 0.0 && other.gas_perf < 0.0
344 || self.eff_perf > other.eff_perf
345 || (approx_cmp(self.eff_perf, other.eff_perf) == Ordering::Equal
346 && self.gas_perf > other.gas_perf)
347 || (approx_cmp(self.eff_perf, other.eff_perf) == Ordering::Equal
348 && approx_cmp(self.gas_perf, other.gas_perf) == Ordering::Equal
349 && self.gas_reward > other.gas_reward)
350 {
351 return Ordering::Greater;
352 }
353
354 Ordering::Less
355 }
356
357 pub fn set_null_effective_perf(&mut self) {
358 if self.gas_perf < 0.0 {
359 self.eff_perf = self.gas_perf;
360 } else {
361 self.eff_perf = 0.0;
362 }
363 }
364}
365
366impl std::default::Default for MsgChainNode {
367 fn default() -> Self {
368 Self {
369 msgs: vec![],
370 gas_reward: TokenAmount::zero(),
371 gas_limit: 0,
372 gas_perf: 0.0,
373 eff_perf: 0.0,
374 bp: 0.0,
375 parent_offset: 0.0,
376 valid: true,
377 merged: false,
378 next: None,
379 prev: None,
380 sig_type: None,
381 }
382 }
383}
384
385pub(in crate::message_pool) fn create_message_chains<T>(
386 api: &T,
387 actor: &Address,
388 mset: &HashMap<u64, SignedMessage>,
389 base_fee: &TokenAmount,
390 ts: &Tipset,
391 chains: &mut Chains,
392 chain_config: &ChainConfig,
393) -> Result<(), Error>
394where
395 T: Provider,
396{
397 let mut msgs: Vec<SignedMessage> = mset.values().cloned().collect();
399 msgs.sort_by_key(|v| v.sequence());
400
401 let Ok(actor_state) = api.get_actor_after(actor, ts) else {
410 tracing::warn!("failed to load actor state, not building chain for {actor}");
411 return Ok(());
412 };
413 let mut cur_seq = actor_state.sequence;
414 let mut balance: TokenAmount = TokenAmount::from(&actor_state.balance);
415
416 let mut gas_limit = 0;
417 let mut skip = 0;
418 let mut i = 0;
419 let mut rewards = Vec::with_capacity(msgs.len());
420
421 while let Some(m) = msgs.get(i) {
422 if m.sequence() < cur_seq {
423 warn!(
424 "encountered message from actor {} with nonce {} less than the current nonce {}",
425 actor,
426 m.sequence(),
427 cur_seq
428 );
429 skip += 1;
430 i += 1;
431 continue;
432 }
433
434 if m.sequence() != cur_seq {
435 break;
436 }
437 cur_seq += 1;
438
439 let network_version = chain_config.network_version(ts.epoch());
440
441 let min_gas = price_list_by_network_version(network_version)
442 .on_chain_message(m.chain_length()?)
443 .total();
444
445 if Gas::new(m.gas_limit()) < min_gas {
446 break;
447 }
448 gas_limit += m.gas_limit();
449 if gas_limit > crate::shim::econ::BLOCK_GAS_LIMIT {
450 break;
451 }
452
453 let required = m.required_funds();
454 if balance < required {
455 break;
456 }
457
458 balance -= required;
459 let value = m.value();
460 balance -= value;
461
462 let gas_reward = get_gas_reward(m, base_fee);
463 rewards.push(gas_reward);
464 i += 1;
465 }
466
467 let mut msgs = if i > skip {
469 #[allow(clippy::indexing_slicing)]
470 msgs[skip..i].to_vec()
471 } else {
472 return Ok(());
473 };
474
475 if msgs.len() > BLOCK_MESSAGE_LIMIT {
477 warn!(
478 "dropping {} messages from {actor} as they exceed the block message limit of {BLOCK_MESSAGE_LIMIT}",
479 msgs.len() - BLOCK_MESSAGE_LIMIT,
480 );
481 msgs.truncate(BLOCK_MESSAGE_LIMIT);
482 };
483
484 let mut cur_chain = MsgChainNode::default();
485 let mut node_vec = vec![];
486
487 let new_chain = |m: SignedMessage, reward: &TokenAmount| -> MsgChainNode {
488 let gl = m.gas_limit();
489 let sig_type = Some(m.signature().sig_type);
490 MsgChainNode {
491 msgs: vec![m],
492 gas_reward: reward.clone(),
493 gas_limit: gl,
494 gas_perf: get_gas_perf(reward, gl),
495 eff_perf: 0.0,
496 bp: 0.0,
497 parent_offset: 0.0,
498 valid: true,
499 merged: false,
500 prev: None,
501 next: None,
502 sig_type,
503 }
504 };
505
506 for (i, (m, reward)) in msgs.into_iter().zip(rewards.iter()).enumerate() {
509 if i == 0 {
510 cur_chain = new_chain(m, reward);
511 continue;
512 }
513
514 let gas_reward = cur_chain.gas_reward.clone() + reward;
515 let gas_limit = cur_chain.gas_limit + m.gas_limit();
516 let gas_perf = get_gas_perf(&gas_reward, gas_limit);
517
518 if gas_perf < cur_chain.gas_perf {
521 chains.push_with(cur_chain, &mut node_vec);
522 cur_chain = new_chain(m, reward);
523 } else {
524 cur_chain.msgs.push(m);
525 cur_chain.gas_reward = gas_reward;
526 cur_chain.gas_limit = gas_limit;
527 cur_chain.gas_perf = gas_perf;
528 }
529 }
530
531 chains.push_with(cur_chain, &mut node_vec);
532
533 loop {
535 let mut merged = 0;
536 for i in (1..node_vec.len()).rev() {
537 if chains.get_from(i, &node_vec).gas_perf >= chains.get_from(i - 1, &node_vec).gas_perf
538 {
539 let chain_i_msg = chains.get_from(i, &node_vec).msgs.clone();
541 chains
542 .get_mut_from(i - 1, &node_vec)
543 .msgs
544 .extend(chain_i_msg);
545
546 let chain_i_gas_reward = chains.get_from(i, &node_vec).gas_reward.clone();
548 chains.get_mut_from(i - 1, &node_vec).gas_reward += chain_i_gas_reward;
549
550 let chain_i_gas_limit = chains.get_from(i, &node_vec).gas_limit;
552 chains.get_mut_from(i - 1, &node_vec).gas_limit += chain_i_gas_limit;
553
554 let chain_i_gas_perf = get_gas_perf(
556 &chains.get_from(i - 1, &node_vec).gas_reward,
557 chains.get_from(i - 1, &node_vec).gas_limit,
558 );
559 chains.get_mut_from(i - 1, &node_vec).gas_perf = chain_i_gas_perf;
560 chains.get_mut_from(i, &node_vec).valid = false;
562 merged += 1;
563 }
564 }
565
566 if merged == 0 {
567 break;
568 }
569
570 chains.drop_invalid(&mut node_vec);
571 }
572
573 if node_vec.len() > 1 {
574 for (&k1, &k2) in node_vec.iter().zip(node_vec.iter().skip(1)) {
575 let n1 = chains
577 .get_mut(k1)
578 .ok_or_else(|| Error::Other(format!("{k1:?} should present in `chains`")))?;
579 n1.next = Some(k2);
580 let n2 = chains
584 .get_mut(k2)
585 .ok_or_else(|| Error::Other(format!("{k2:?} should present in `chains`")))?;
586 n2.prev = Some(k1);
587 }
589 }
590
591 chains.key_vec.extend(node_vec);
593
594 Ok(())
595}
596
597fn approx_cmp(a: f64, b: f64) -> Ordering {
598 if (a - b).abs() <= (a * f64::EPSILON).abs() {
599 Ordering::Equal
600 } else {
601 a.partial_cmp(&b).unwrap()
602 }
603}