1use crate::internal_prelude::*;
2use crate::kernel::call_frame::TransientSubstates;
3use crate::track::interface::{
4 CommitableSubstateStore, IOAccess, NodeSubstates, TrackedSubstateInfo,
5};
6use crate::track::state_updates::*;
7use radix_engine_interface::types::*;
8use radix_substate_store_interface::db_key_mapper::{SpreadPrefixKeyMapper, SubstateKeyContent};
9use radix_substate_store_interface::interface::DbPartitionKey;
10use radix_substate_store_interface::{
11 db_key_mapper::DatabaseKeyMapper,
12 interface::{DbSortKey, PartitionEntry, SubstateDatabase},
13};
14use sbor::rust::collections::btree_map::Entry;
15use sbor::rust::iter::empty;
16use sbor::rust::mem;
17
18use super::interface::{CanonicalPartition, CanonicalSubstateKey, StoreCommit, StoreCommitInfo};
19
/// Errors that can be raised while finalizing a track.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so that callers
/// can compare and duplicate the error (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrackFinalizeError {
    /// A substate that was marked as transient still owned at least one node
    /// when the track was finalized.
    TransientSubstateOwnsNode,
}
24
25pub type Track<'s, S> = MappedTrack<'s, S, SpreadPrefixKeyMapper>;
26
27pub struct MappedTrack<'s, S: SubstateDatabase, M: DatabaseKeyMapper + 'static> {
29 substate_db: &'s S,
31
32 tracked_nodes: IndexMap<NodeId, TrackedNode>,
33 force_write_tracked_nodes: IndexMap<NodeId, TrackedNode>,
34 deleted_partitions: IndexSet<(NodeId, PartitionNumber)>,
36
37 transient_substates: TransientSubstates,
38
39 phantom_data: PhantomData<M>,
40}
41
42pub struct TrackedSubstates {
46 pub tracked_nodes: IndexMap<NodeId, TrackedNode>,
47 pub deleted_partitions: IndexSet<(NodeId, PartitionNumber)>,
48}
49
50impl TrackedSubstates {
51 pub fn to_state_updates(self) -> (IndexSet<NodeId>, StateUpdates) {
52 let mut new_nodes = index_set_new();
53 let mut state_updates = StateUpdates::empty();
54
55 for (node_id, partition_num) in self.deleted_partitions {
56 state_updates
57 .of_node(node_id)
58 .of_partition(partition_num)
59 .delete();
60 }
61
62 for (node_id, tracked_node) in self.tracked_nodes {
63 if tracked_node.is_new {
64 new_nodes.insert(node_id);
65 }
66
67 for (partition_num, tracked_partition) in tracked_node.tracked_partitions {
68 let partition_updates: IndexMap<_, _> = tracked_partition
69 .substates
70 .into_values()
71 .filter_map(|tracked| {
72 let update = match tracked.substate_value {
73 TrackedSubstateValue::ReadOnly(..) | TrackedSubstateValue::Garbage => {
74 return None
75 }
76 TrackedSubstateValue::ReadNonExistAndWrite(substate)
77 | TrackedSubstateValue::New(substate) => {
78 DatabaseUpdate::Set(substate.value.into())
79 }
80 TrackedSubstateValue::ReadExistAndWrite(_, write)
81 | TrackedSubstateValue::WriteOnly(write) => match write {
82 Write::Delete => DatabaseUpdate::Delete,
83 Write::Update(substate) => {
84 DatabaseUpdate::Set(substate.value.into())
85 }
86 },
87 };
88 Some((tracked.substate_key, update))
89 })
90 .collect();
91
92 if !partition_updates.is_empty() {
94 state_updates
95 .of_node(node_id)
96 .of_partition(partition_num)
97 .mut_update_substates(partition_updates);
98 }
99 }
100 }
101
102 (new_nodes, state_updates)
103 }
104}
105
106impl<'s, S: SubstateDatabase, M: DatabaseKeyMapper> MappedTrack<'s, S, M> {
107 pub fn new(substate_db: &'s S) -> Self {
108 Self {
109 substate_db,
110 force_write_tracked_nodes: index_map_new(),
111 tracked_nodes: index_map_new(),
112 deleted_partitions: index_set_new(),
113 transient_substates: TransientSubstates::new(),
114 phantom_data: PhantomData,
115 }
116 }
117
118 fn get_substate_from_db<E, F: FnMut(IOAccess) -> Result<(), E>>(
120 substate_db: &'s S,
121 partition_key: &DbPartitionKey,
122 sort_key: &DbSortKey,
123 on_io_access: &mut F,
124 canonical_substate_key: CanonicalSubstateKey,
125 ) -> Result<Option<IndexedScryptoValue>, E> {
126 let result = substate_db
127 .get_raw_substate_by_db_key(partition_key, sort_key)
128 .map(|e| IndexedScryptoValue::from_vec(e).expect("Failed to decode substate"));
129 if let Some(x) = &result {
130 on_io_access(IOAccess::ReadFromDb(canonical_substate_key, x.len()))?;
131 } else {
132 on_io_access(IOAccess::ReadFromDbNotFound(canonical_substate_key))?;
133 }
134 Ok(result)
135 }
136
137 #[allow(clippy::type_complexity)]
139 fn list_entries_from_db<
140 'x,
141 E: 'x,
142 F: FnMut(IOAccess) -> Result<(), E> + 'x,
143 K: SubstateKeyContent,
144 >(
145 substate_db: &'x S,
146 partition_key: &DbPartitionKey,
147 on_io_access: &'x mut F,
148 canonical_partition: CanonicalPartition,
149 ) -> Box<dyn Iterator<Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>> + 'x>
150 {
151 struct TracedIterator<
152 'a,
153 E,
154 F: FnMut(IOAccess) -> Result<(), E>,
155 M: DatabaseKeyMapper + 'static,
156 K: SubstateKeyContent,
157 > {
158 iterator: Box<dyn Iterator<Item = PartitionEntry> + 'a>,
159 on_io_access: &'a mut F,
160 canonical_partition: CanonicalPartition,
161 errored_out: bool,
162 phantom1: PhantomData<M>,
163 phantom2: PhantomData<K>,
164 }
165
166 impl<
167 'a,
168 E,
169 F: FnMut(IOAccess) -> Result<(), E>,
170 M: DatabaseKeyMapper + 'static,
171 K: SubstateKeyContent,
172 > Iterator for TracedIterator<'a, E, F, M, K>
173 {
174 type Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>;
175
176 fn next(&mut self) -> Option<Self::Item> {
177 if self.errored_out {
178 return None;
179 }
180
181 let result = self.iterator.next();
182 if let Some(x) = result {
183 let substate_key = M::from_db_sort_key::<K>(&x.0);
184 let substate_value =
185 IndexedScryptoValue::from_vec(x.1).expect("Failed to decode substate");
186 let io_access = IOAccess::ReadFromDb(
187 CanonicalSubstateKey::of(self.canonical_partition, substate_key.clone()),
188 substate_value.len(),
189 );
190 let result = (self.on_io_access)(io_access);
191 match result {
192 Ok(()) => Some(Ok((x.0, (substate_key, substate_value)))),
193 Err(e) => {
194 self.errored_out = true;
195 Some(Err(e))
196 }
197 }
198 } else {
199 None
200 }
201 }
202 }
203
204 Box::new(TracedIterator {
205 iterator: substate_db.list_raw_values_from_db_key(partition_key, None),
206 on_io_access,
207 canonical_partition,
208 errored_out: false,
209 phantom1: PhantomData::<M>,
210 phantom2: PhantomData::<K>,
211 })
212 }
213
214 pub fn revert_non_force_write_changes(&mut self) {
218 self.tracked_nodes
219 .retain(|_, tracked_node| !tracked_node.is_new);
220 for (_, tracked_node) in &mut self.tracked_nodes {
221 tracked_node.revert_writes();
222 }
223
224 let force_writes = mem::take(&mut self.force_write_tracked_nodes);
225
226 for (node_id, force_track_node) in force_writes {
227 for (partition_num, force_track_partition) in force_track_node.tracked_partitions {
228 for (db_sort_key, force_track_key) in force_track_partition.substates {
229 let tracked_node = self.tracked_nodes.get_mut(&node_id).unwrap();
230 let tracked_partition = tracked_node
231 .tracked_partitions
232 .get_mut(&partition_num)
233 .unwrap();
234 let tracked = &mut tracked_partition
235 .substates
236 .get_mut(&db_sort_key)
237 .unwrap()
238 .substate_value;
239 *tracked = force_track_key.substate_value;
240 }
241 }
242 }
243 }
244
245 pub fn finalize(mut self) -> Result<(TrackedSubstates, &'s S), TrackFinalizeError> {
249 for (node_id, transient_substates) in self.transient_substates.transient_substates {
250 for (partition, substate_key) in transient_substates {
251 if let Some(tracked_partition) = self
252 .tracked_nodes
253 .get_mut(&node_id)
254 .and_then(|tracked_node| tracked_node.tracked_partitions.get_mut(&partition))
255 {
256 let db_sort_key = M::to_db_sort_key(&substate_key);
257 let tracked_substate = tracked_partition.substates.remove(&db_sort_key);
258 if let Some(substate) =
259 tracked_substate.and_then(|s| s.substate_value.into_value())
260 {
261 if !substate.owned_nodes().is_empty() {
262 return Err(TrackFinalizeError::TransientSubstateOwnsNode);
263 }
264 }
265 }
266 }
267 }
268
269 Ok((
270 TrackedSubstates {
271 tracked_nodes: self.tracked_nodes,
272 deleted_partitions: self.deleted_partitions,
273 },
274 self.substate_db,
275 ))
276 }
277
278 fn get_tracked_partition(
279 &mut self,
280 node_id: &NodeId,
281 partition_num: PartitionNumber,
282 ) -> &mut TrackedPartition {
283 self.tracked_nodes
284 .entry(*node_id)
285 .or_insert(TrackedNode::new(false))
286 .tracked_partitions
287 .entry(partition_num)
288 .or_default()
289 }
290
291 fn get_tracked_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
292 &mut self,
293 node_id: &NodeId,
294 partition_number: PartitionNumber,
295 substate_key: SubstateKey,
296 on_io_access: &mut F,
297 ) -> Result<&mut TrackedSubstateValue, E> {
298 let db_sort_key = M::to_db_sort_key(&substate_key);
299 let partition = &mut self
300 .tracked_nodes
301 .entry(*node_id)
302 .or_insert(TrackedNode::new(false))
303 .tracked_partitions
304 .entry(partition_number)
305 .or_default()
306 .substates;
307 let entry = partition.entry(db_sort_key.clone());
308
309 match entry {
310 Entry::Vacant(e) => {
311 if self
312 .transient_substates
313 .is_transient(node_id, partition_number, &substate_key)
314 {
315 let tracked = TrackedSubstate {
316 substate_key: substate_key.clone(),
317 substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::NonExistent),
318 };
319 let new_size = Some(tracked.size());
320 e.insert(tracked);
321
322 on_io_access(IOAccess::TrackSubstateUpdated {
323 canonical_substate_key: CanonicalSubstateKey {
324 node_id: *node_id,
325 partition_number,
326 substate_key,
327 },
328 old_size: None,
329 new_size,
330 })?;
331 } else {
332 let db_partition_key = M::to_db_partition_key(node_id, partition_number);
333 let substate_value = Self::get_substate_from_db(
334 self.substate_db,
335 &db_partition_key,
336 &M::to_db_sort_key(&substate_key),
337 on_io_access,
338 CanonicalSubstateKey {
339 node_id: *node_id,
340 partition_number,
341 substate_key: substate_key.clone(),
342 },
343 )?;
344
345 let new_size;
346 if let Some(value) = substate_value {
347 let tracked = TrackedSubstate {
348 substate_key: substate_key.clone(),
349 substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::Existent(
350 RuntimeSubstate::new(value),
351 )),
352 };
353 new_size = Some(tracked.size());
354 e.insert(tracked);
355 } else {
356 let tracked = TrackedSubstate {
357 substate_key: substate_key.clone(),
358 substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::NonExistent),
359 };
360 new_size = Some(tracked.size());
361 e.insert(tracked);
362 };
363
364 on_io_access(IOAccess::TrackSubstateUpdated {
366 canonical_substate_key: CanonicalSubstateKey {
367 node_id: *node_id,
368 partition_number,
369 substate_key,
370 },
371 old_size: None,
372 new_size,
373 })?;
374 }
375 }
376 Entry::Occupied(..) => {}
377 }
378
379 Ok(&mut partition.get_mut(&db_sort_key).unwrap().substate_value)
380 }
381}
382
383impl<'s, S: SubstateDatabase, M: DatabaseKeyMapper> CommitableSubstateStore
384 for MappedTrack<'s, S, M>
385{
386 fn mark_as_transient(
387 &mut self,
388 node_id: NodeId,
389 partition_num: PartitionNumber,
390 substate_key: SubstateKey,
391 ) {
392 self.transient_substates
393 .mark_as_transient(node_id, partition_num, substate_key);
394 }
395
396 fn create_node<E, F: FnMut(IOAccess) -> Result<(), E>>(
397 &mut self,
398 node_id: NodeId,
399 node_substates: NodeSubstates,
400 on_io_access: &mut F,
401 ) -> Result<(), E> {
402 let mut tracked_partitions = index_map_new();
403
404 for (partition_number, partition) in node_substates {
405 let mut partition_substates = BTreeMap::new();
406 for (substate_key, substate_value) in partition {
407 let db_sort_key = M::to_db_sort_key(&substate_key);
408
409 let tracked = TrackedSubstate {
410 substate_key: substate_key.clone(),
411 substate_value: TrackedSubstateValue::New(RuntimeSubstate::new(substate_value)),
412 };
413 let new_size = Some(tracked.size());
414 let old_tracked = partition_substates.insert(db_sort_key, tracked);
415 assert!(old_tracked.is_none());
416
417 on_io_access(IOAccess::TrackSubstateUpdated {
419 canonical_substate_key: CanonicalSubstateKey {
420 node_id,
421 partition_number,
422 substate_key,
423 },
424 old_size: None,
425 new_size,
426 })?;
427 }
428 let tracked_partition = TrackedPartition::new_with_substates(partition_substates);
429 tracked_partitions.insert(partition_number, tracked_partition);
430 }
431
432 self.tracked_nodes.insert(
433 node_id,
434 TrackedNode {
435 tracked_partitions,
436 is_new: true,
437 },
438 );
439
440 Ok(())
441 }
442
443 fn get_tracked_substate_info(
444 &mut self,
445 node_id: &NodeId,
446 partition_num: PartitionNumber,
447 substate_key: &SubstateKey,
448 ) -> TrackedSubstateInfo {
449 let db_sort_key = M::to_db_sort_key(substate_key);
450 let info = self
451 .tracked_nodes
452 .get(node_id)
453 .and_then(|n| n.tracked_partitions.get(&partition_num))
454 .and_then(|p| p.substates.get(&db_sort_key))
455 .map(|s| match s.substate_value {
456 TrackedSubstateValue::New(..) | TrackedSubstateValue::Garbage => {
457 TrackedSubstateInfo::New
458 }
459 TrackedSubstateValue::WriteOnly(..)
460 | TrackedSubstateValue::ReadExistAndWrite(..)
461 | TrackedSubstateValue::ReadNonExistAndWrite(..) => TrackedSubstateInfo::Updated,
462 TrackedSubstateValue::ReadOnly(..) => TrackedSubstateInfo::Unmodified,
463 })
464 .unwrap_or(TrackedSubstateInfo::Unmodified);
465
466 info
467 }
468
469 fn get_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
470 &mut self,
471 node_id: &NodeId,
472 partition_num: PartitionNumber,
473 substate_key: &SubstateKey,
474 on_io_access: &mut F,
475 ) -> Result<Option<&IndexedScryptoValue>, E> {
476 let tracked =
478 self.get_tracked_substate(node_id, partition_num, substate_key.clone(), on_io_access)?;
479
480 let value = tracked.get_runtime_substate_mut().map(|v| &v.value);
481
482 Ok(value)
483 }
484
485 fn set_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
486 &mut self,
487 node_id: NodeId,
488 partition_number: PartitionNumber,
489 substate_key: SubstateKey,
490 substate_value: IndexedScryptoValue,
491 on_io_access: &mut F,
492 ) -> Result<(), E> {
493 let tracked_partition = self
494 .tracked_nodes
495 .entry(node_id)
496 .or_insert(TrackedNode::new(false))
497 .tracked_partitions
498 .entry(partition_number)
499 .or_default();
500 let db_sort_key = M::to_db_sort_key(&substate_key);
501 let entry = tracked_partition.substates.entry(db_sort_key);
502
503 match entry {
504 Entry::Vacant(e) => {
505 let tracked = TrackedSubstate {
506 substate_key: substate_key.clone(),
507 substate_value: TrackedSubstateValue::WriteOnly(Write::Update(
508 RuntimeSubstate::new(substate_value),
509 )),
510 };
511 let new_size = Some(tracked.size());
512 e.insert(tracked);
513
514 on_io_access(IOAccess::TrackSubstateUpdated {
516 canonical_substate_key: CanonicalSubstateKey {
517 node_id,
518 partition_number,
519 substate_key,
520 },
521 old_size: None,
522 new_size,
523 })?;
524 }
525 Entry::Occupied(mut e) => {
526 let tracked = e.get_mut();
527
528 let old_size = Some(tracked.size());
529 tracked.substate_value.set(substate_value);
530 let new_size = Some(tracked.size());
531
532 on_io_access(IOAccess::TrackSubstateUpdated {
534 canonical_substate_key: CanonicalSubstateKey {
535 node_id,
536 partition_number,
537 substate_key,
538 },
539 old_size,
540 new_size,
541 })?;
542 }
543 }
544
545 Ok(())
546 }
547
548 fn force_write(
549 &mut self,
550 node_id: &NodeId,
551 partition_num: &PartitionNumber,
552 substate_key: &SubstateKey,
553 ) {
554 let tracked = self
555 .get_tracked_substate(
556 node_id,
557 *partition_num,
558 substate_key.clone(),
559 &mut |_| -> Result<(), ()> { Err(()) },
560 )
561 .expect("Should not need to go into store on close substate.");
562 let cloned_track = tracked.clone();
563
564 self.force_write_tracked_nodes
565 .entry(*node_id)
566 .or_insert(TrackedNode {
567 tracked_partitions: index_map_new(),
568 is_new: false,
569 })
570 .tracked_partitions
571 .entry(*partition_num)
572 .or_default()
573 .substates
574 .insert(
575 M::to_db_sort_key(substate_key),
576 TrackedSubstate {
577 substate_key: substate_key.clone(),
578 substate_value: cloned_track,
579 },
580 );
581 }
582
583 fn remove_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
585 &mut self,
586 node_id: &NodeId,
587 partition_number: PartitionNumber,
588 substate_key: &SubstateKey,
589 on_io_access: &mut F,
590 ) -> Result<Option<IndexedScryptoValue>, E> {
591 let tracked = self.get_tracked_substate(
592 node_id,
593 partition_number,
594 substate_key.clone(),
595 on_io_access,
596 )?;
597
598 let old_size = Some(tracked.size());
599 let taken = tracked.take();
600 let new_size = Some(tracked.size());
601
602 on_io_access(IOAccess::TrackSubstateUpdated {
604 canonical_substate_key: CanonicalSubstateKey {
605 node_id: *node_id,
606 partition_number,
607 substate_key: substate_key.clone(),
608 },
609 old_size,
610 new_size,
611 })?;
612
613 Ok(taken)
614 }
615
616 fn scan_keys<K: SubstateKeyContent, E, F: FnMut(IOAccess) -> Result<(), E>>(
617 &mut self,
618 node_id: &NodeId,
619 partition_number: PartitionNumber,
620 limit: u32,
621 on_io_access: &mut F,
622 ) -> Result<Vec<SubstateKey>, E> {
623 let limit: usize = limit.try_into().unwrap();
624 let mut items = Vec::new();
625
626 let node_updates = self.tracked_nodes.get(node_id);
627 let is_new = node_updates
628 .map(|tracked_node| tracked_node.is_new)
629 .unwrap_or(false);
630 let tracked_partition =
631 node_updates.and_then(|n| n.tracked_partitions.get(&partition_number));
632
633 if let Some(tracked_partition) = tracked_partition {
634 for tracked_substate in tracked_partition.substates.values() {
635 if items.len() == limit {
636 return Ok(items);
637 }
638
639 if let Some(_substate) = tracked_substate.substate_value.get() {
641 items.push(tracked_substate.substate_key.clone());
642 }
643 }
644 }
645
646 if items.len() == limit || is_new {
648 return Ok(items);
649 }
650
651 let db_partition_key = M::to_db_partition_key(node_id, partition_number);
652 let mut tracked_iter = IterationCountedIter::new(Self::list_entries_from_db::<E, F, K>(
653 self.substate_db,
654 &db_partition_key,
655 on_io_access,
656 CanonicalPartition {
657 node_id: *node_id,
658 partition_number,
659 },
660 ));
661
662 for result in &mut tracked_iter {
663 let (db_sort_key, (substate_key, _substate_value)) = result?;
664
665 if items.len() == limit {
666 break;
667 }
668
669 if tracked_partition
670 .map(|tracked_partition| tracked_partition.substates.contains_key(&db_sort_key))
671 .unwrap_or(false)
672 {
673 continue;
674 }
675
676 items.push(substate_key);
679 }
680
681 let num_iterations = tracked_iter.num_iterations;
683 let tracked_partition = self.get_tracked_partition(node_id, partition_number);
684 tracked_partition.range_read = u32::max(tracked_partition.range_read, num_iterations);
685
686 Ok(items)
687 }
688
689 fn drain_substates<K: SubstateKeyContent, E, F: FnMut(IOAccess) -> Result<(), E>>(
690 &mut self,
691 node_id: &NodeId,
692 partition_number: PartitionNumber,
693 limit: u32,
694 on_io_access: &mut F,
695 ) -> Result<Vec<(SubstateKey, IndexedScryptoValue)>, E> {
696 let limit: usize = limit.try_into().unwrap();
697 let mut items = Vec::new();
698
699 let node_updates = self.tracked_nodes.get_mut(node_id);
700 let is_new = node_updates
701 .as_ref()
702 .map(|tracked_node| tracked_node.is_new)
703 .unwrap_or(false);
704
705 let mut tracked_partition =
707 node_updates.and_then(|n| n.tracked_partitions.get_mut(&partition_number));
708 if let Some(tracked_partition) = tracked_partition.as_mut() {
709 for (_db_sort_key, tracked_substate) in tracked_partition.substates.iter_mut() {
710 if items.len() == limit {
711 return Ok(items);
712 }
713
714 let old_size = Some(tracked_substate.size());
715 if let Some(value) = tracked_substate.substate_value.take() {
716 items.push((tracked_substate.substate_key.clone(), value));
717 }
718 let new_size = Some(tracked_substate.size());
719
720 on_io_access(IOAccess::TrackSubstateUpdated {
722 canonical_substate_key: CanonicalSubstateKey {
723 node_id: *node_id,
724 partition_number,
725 substate_key: tracked_substate.substate_key.clone(),
726 },
727 old_size,
728 new_size,
729 })?;
730 }
731 }
732
733 if items.len() == limit || is_new {
735 return Ok(items);
736 }
737
738 let db_partition_key = M::to_db_partition_key(node_id, partition_number);
740
741 let (new_updates, num_iterations) = {
742 let mut tracked_iter =
743 IterationCountedIter::new(Self::list_entries_from_db::<E, F, K>(
744 self.substate_db,
745 &db_partition_key,
746 on_io_access,
747 CanonicalPartition {
748 node_id: *node_id,
749 partition_number,
750 },
751 ));
752 let new_updates = {
753 let mut new_updates = Vec::new();
754 for result in &mut tracked_iter {
755 let (db_sort_key, (substate_key, substate_value)) = result?;
756
757 if items.len() == limit {
758 break;
759 }
760
761 if tracked_partition
762 .as_ref()
763 .map(|tracked_partition| {
764 tracked_partition.substates.contains_key(&db_sort_key)
765 })
766 .unwrap_or(false)
767 {
768 continue;
769 }
770
771 let tracked = TrackedSubstate {
772 substate_key: substate_key.clone(),
773 substate_value: TrackedSubstateValue::ReadExistAndWrite(
774 substate_value.clone(),
775 Write::Delete,
776 ),
777 };
778 new_updates.push((db_sort_key, tracked));
779 items.push((substate_key, substate_value));
780 }
781 new_updates
782 };
783
784 (new_updates, tracked_iter.num_iterations)
785 };
786
787 {
789 let tracked_partition = self.get_tracked_partition(node_id, partition_number);
790 tracked_partition.range_read = u32::max(tracked_partition.range_read, num_iterations);
791
792 for (db_sort_key, tracked_substate) in new_updates {
793 let substate_key = tracked_substate.substate_key.clone();
794 let new_size = Some(tracked_substate.size());
795 let old_size = tracked_partition
796 .substates
797 .insert(db_sort_key, tracked_substate)
798 .map(|x| x.size());
799
800 on_io_access(IOAccess::TrackSubstateUpdated {
802 canonical_substate_key: CanonicalSubstateKey {
803 node_id: *node_id,
804 partition_number,
805 substate_key,
806 },
807 old_size,
808 new_size,
809 })?;
810 }
811 }
812
813 Ok(items)
814 }
815
816 #[allow(clippy::type_complexity)]
817 fn scan_sorted_substates<E, F: FnMut(IOAccess) -> Result<(), E>>(
818 &mut self,
819 node_id: &NodeId,
820 partition_number: PartitionNumber,
821 limit: u32,
822 on_io_access: &mut F,
823 ) -> Result<Vec<(SortedKey, IndexedScryptoValue)>, E> {
824 let limit: usize = limit.try_into().unwrap();
826
827 let tracked_node = self
829 .tracked_nodes
830 .entry(*node_id)
831 .or_insert(TrackedNode::new(false));
832 let tracked_partition = tracked_node
833 .tracked_partitions
834 .entry(partition_number)
835 .or_default();
836
837 let mut db_values_count = 0u32;
839 let raw_db_entries: Box<
840 dyn Iterator<Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>>,
841 > = if tracked_node.is_new {
842 Box::new(empty()) } else {
844 let partition_key = M::to_db_partition_key(node_id, partition_number);
845 Box::new(Self::list_entries_from_db::<E, F, SortedKey>(
846 self.substate_db,
847 &partition_key,
848 on_io_access,
849 CanonicalPartition {
850 node_id: *node_id,
851 partition_number,
852 },
853 ))
854 };
855 let db_read_entries = raw_db_entries.inspect(|_| {
856 db_values_count += 1;
857 });
858
859 let tracked_entry_changes =
861 tracked_partition
862 .substates
863 .iter()
864 .map(|(db_sort_key, tracked_substate)| {
865 if let Some(value) = tracked_substate.substate_value.get() {
867 (
868 db_sort_key.clone(),
869 Some((tracked_substate.substate_key.clone(), value.clone())),
870 )
871 } else {
872 (db_sort_key.clone(), None)
873 }
874 });
875
876 let mut items = Vec::new();
877 for result in
879 OverlayingResultIterator::new(db_read_entries, tracked_entry_changes).take(limit)
880 {
881 let (_db_sort_key, (substate_key, substate_value)) = result?;
882 let sorted_key = match substate_key {
883 SubstateKey::Sorted(sorted) => sorted,
884 _ => panic!("Should be a sorted key"),
885 };
886 items.push((sorted_key, substate_value));
887 }
888
889 tracked_partition.range_read = u32::max(tracked_partition.range_read, db_values_count);
891
892 Ok(items)
895 }
896
897 fn delete_partition(&mut self, node_id: &NodeId, partition_num: PartitionNumber) {
898 self.deleted_partitions.insert((*node_id, partition_num));
901 }
902
903 fn get_commit_info(&mut self) -> StoreCommitInfo {
904 let mut store_commit = Vec::new();
905
906 for (node_id, node) in &self.tracked_nodes {
907 for (partition_number, partition) in &node.tracked_partitions {
908 for (db_sort_key, substate) in &partition.substates {
909 if self.transient_substates.is_transient(
910 node_id,
911 *partition_number,
912 &substate.substate_key,
913 ) {
914 continue;
915 }
916
917 let canonical_substate_key = CanonicalSubstateKey {
918 node_id: *node_id,
919 partition_number: *partition_number,
920 substate_key: substate.substate_key.clone(),
921 };
922
923 match &substate.substate_value {
924 TrackedSubstateValue::New(v) => {
925 store_commit.push(StoreCommit::Insert {
926 canonical_substate_key,
927 size: v.value.len(),
928 });
929 }
930 TrackedSubstateValue::ReadOnly(_) => {
931 }
933 TrackedSubstateValue::ReadExistAndWrite(old_value, write) => match write {
934 Write::Update(x) => {
935 store_commit.push(StoreCommit::Update {
936 canonical_substate_key,
937 size: x.value.len(),
938 old_size: old_value.len(),
939 });
940 }
941 Write::Delete => {
942 store_commit.push(StoreCommit::Delete {
943 canonical_substate_key,
944 old_size: old_value.len(),
945 });
946 }
947 },
948 TrackedSubstateValue::ReadNonExistAndWrite(value) => {
949 store_commit.push(StoreCommit::Insert {
950 canonical_substate_key,
951 size: value.value.len(),
952 });
953 }
954 TrackedSubstateValue::WriteOnly(write) => {
955 let old_size = self
956 .substate_db
957 .get_raw_substate_by_db_key(
958 &M::to_db_partition_key(node_id, *partition_number),
959 db_sort_key,
960 )
961 .map(|x| x.len());
962
963 match (old_size, write) {
964 (Some(old_size), Write::Update(x)) => {
965 store_commit.push(StoreCommit::Update {
966 canonical_substate_key,
967 size: x.value.len(),
968 old_size,
969 });
970 }
971 (Some(old_size), Write::Delete) => {
972 store_commit.push(StoreCommit::Delete {
973 canonical_substate_key,
974 old_size,
975 });
976 }
977 (None, Write::Update(x)) => {
978 store_commit.push(StoreCommit::Insert {
979 canonical_substate_key,
980 size: x.value.len(),
981 });
982 }
983 (None, Write::Delete) => {
984 }
986 }
987 }
988 TrackedSubstateValue::Garbage => {
989 }
991 }
992 }
993 }
994 }
995
996 store_commit
997 }
998}