radix_engine/track/
track.rs

1use crate::internal_prelude::*;
2use crate::kernel::call_frame::TransientSubstates;
3use crate::track::interface::{
4    CommitableSubstateStore, IOAccess, NodeSubstates, TrackedSubstateInfo,
5};
6use crate::track::state_updates::*;
7use radix_engine_interface::types::*;
8use radix_substate_store_interface::db_key_mapper::{SpreadPrefixKeyMapper, SubstateKeyContent};
9use radix_substate_store_interface::interface::DbPartitionKey;
10use radix_substate_store_interface::{
11    db_key_mapper::DatabaseKeyMapper,
12    interface::{DbSortKey, PartitionEntry, SubstateDatabase},
13};
14use sbor::rust::collections::btree_map::Entry;
15use sbor::rust::iter::empty;
16use sbor::rust::mem;
17
18use super::interface::{CanonicalPartition, CanonicalSubstateKey, StoreCommit, StoreCommitInfo};
19
/// Errors that can be raised while finalizing a track.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrackFinalizeError {
    /// A substate that was marked as transient holds a value which owns at least one node.
    TransientSubstateOwnsNode,
}
24
25pub type Track<'s, S> = MappedTrack<'s, S, SpreadPrefixKeyMapper>;
26
27/// Transaction-wide states and side effects
28pub struct MappedTrack<'s, S: SubstateDatabase, M: DatabaseKeyMapper + 'static> {
29    /// Substate database, use `get_substate_from_db` and `list_entries_from_db` for access
30    substate_db: &'s S,
31
32    tracked_nodes: IndexMap<NodeId, TrackedNode>,
33    force_write_tracked_nodes: IndexMap<NodeId, TrackedNode>,
34    /// TODO: if time allows, consider merging into tracked nodes.
35    deleted_partitions: IndexSet<(NodeId, PartitionNumber)>,
36
37    transient_substates: TransientSubstates,
38
39    phantom_data: PhantomData<M>,
40}
41
42/// Records all the substates that have been read or written into, and all the partitions to delete.
43///
44/// `NodeId` in this struct isn't always valid.
45pub struct TrackedSubstates {
46    pub tracked_nodes: IndexMap<NodeId, TrackedNode>,
47    pub deleted_partitions: IndexSet<(NodeId, PartitionNumber)>,
48}
49
50impl TrackedSubstates {
51    pub fn to_state_updates(self) -> (IndexSet<NodeId>, StateUpdates) {
52        let mut new_nodes = index_set_new();
53        let mut state_updates = StateUpdates::empty();
54
55        for (node_id, partition_num) in self.deleted_partitions {
56            state_updates
57                .of_node(node_id)
58                .of_partition(partition_num)
59                .delete();
60        }
61
62        for (node_id, tracked_node) in self.tracked_nodes {
63            if tracked_node.is_new {
64                new_nodes.insert(node_id);
65            }
66
67            for (partition_num, tracked_partition) in tracked_node.tracked_partitions {
68                let partition_updates: IndexMap<_, _> = tracked_partition
69                    .substates
70                    .into_values()
71                    .filter_map(|tracked| {
72                        let update = match tracked.substate_value {
73                            TrackedSubstateValue::ReadOnly(..) | TrackedSubstateValue::Garbage => {
74                                return None
75                            }
76                            TrackedSubstateValue::ReadNonExistAndWrite(substate)
77                            | TrackedSubstateValue::New(substate) => {
78                                DatabaseUpdate::Set(substate.value.into())
79                            }
80                            TrackedSubstateValue::ReadExistAndWrite(_, write)
81                            | TrackedSubstateValue::WriteOnly(write) => match write {
82                                Write::Delete => DatabaseUpdate::Delete,
83                                Write::Update(substate) => {
84                                    DatabaseUpdate::Set(substate.value.into())
85                                }
86                            },
87                        };
88                        Some((tracked.substate_key, update))
89                    })
90                    .collect();
91
92                // Filter out empty partition updates, to avoid wasted work downstream (e.g. in the Merkle Tree in the node)
93                if !partition_updates.is_empty() {
94                    state_updates
95                        .of_node(node_id)
96                        .of_partition(partition_num)
97                        .mut_update_substates(partition_updates);
98                }
99            }
100        }
101
102        (new_nodes, state_updates)
103    }
104}
105
106impl<'s, S: SubstateDatabase, M: DatabaseKeyMapper> MappedTrack<'s, S, M> {
107    pub fn new(substate_db: &'s S) -> Self {
108        Self {
109            substate_db,
110            force_write_tracked_nodes: index_map_new(),
111            tracked_nodes: index_map_new(),
112            deleted_partitions: index_set_new(),
113            transient_substates: TransientSubstates::new(),
114            phantom_data: PhantomData,
115        }
116    }
117
118    // TODO cleanup interface to avoid redundant information
119    fn get_substate_from_db<E, F: FnMut(IOAccess) -> Result<(), E>>(
120        substate_db: &'s S,
121        partition_key: &DbPartitionKey,
122        sort_key: &DbSortKey,
123        on_io_access: &mut F,
124        canonical_substate_key: CanonicalSubstateKey,
125    ) -> Result<Option<IndexedScryptoValue>, E> {
126        let result = substate_db
127            .get_raw_substate_by_db_key(partition_key, sort_key)
128            .map(|e| IndexedScryptoValue::from_vec(e).expect("Failed to decode substate"));
129        if let Some(x) = &result {
130            on_io_access(IOAccess::ReadFromDb(canonical_substate_key, x.len()))?;
131        } else {
132            on_io_access(IOAccess::ReadFromDbNotFound(canonical_substate_key))?;
133        }
134        Ok(result)
135    }
136
137    // TODO cleanup interface to avoid redundant information
138    #[allow(clippy::type_complexity)]
139    fn list_entries_from_db<
140        'x,
141        E: 'x,
142        F: FnMut(IOAccess) -> Result<(), E> + 'x,
143        K: SubstateKeyContent,
144    >(
145        substate_db: &'x S,
146        partition_key: &DbPartitionKey,
147        on_io_access: &'x mut F,
148        canonical_partition: CanonicalPartition,
149    ) -> Box<dyn Iterator<Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>> + 'x>
150    {
151        struct TracedIterator<
152            'a,
153            E,
154            F: FnMut(IOAccess) -> Result<(), E>,
155            M: DatabaseKeyMapper + 'static,
156            K: SubstateKeyContent,
157        > {
158            iterator: Box<dyn Iterator<Item = PartitionEntry> + 'a>,
159            on_io_access: &'a mut F,
160            canonical_partition: CanonicalPartition,
161            errored_out: bool,
162            phantom1: PhantomData<M>,
163            phantom2: PhantomData<K>,
164        }
165
166        impl<
167                'a,
168                E,
169                F: FnMut(IOAccess) -> Result<(), E>,
170                M: DatabaseKeyMapper + 'static,
171                K: SubstateKeyContent,
172            > Iterator for TracedIterator<'a, E, F, M, K>
173        {
174            type Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>;
175
176            fn next(&mut self) -> Option<Self::Item> {
177                if self.errored_out {
178                    return None;
179                }
180
181                let result = self.iterator.next();
182                if let Some(x) = result {
183                    let substate_key = M::from_db_sort_key::<K>(&x.0);
184                    let substate_value =
185                        IndexedScryptoValue::from_vec(x.1).expect("Failed to decode substate");
186                    let io_access = IOAccess::ReadFromDb(
187                        CanonicalSubstateKey::of(self.canonical_partition, substate_key.clone()),
188                        substate_value.len(),
189                    );
190                    let result = (self.on_io_access)(io_access);
191                    match result {
192                        Ok(()) => Some(Ok((x.0, (substate_key, substate_value)))),
193                        Err(e) => {
194                            self.errored_out = true;
195                            Some(Err(e))
196                        }
197                    }
198                } else {
199                    None
200                }
201            }
202        }
203
204        Box::new(TracedIterator {
205            iterator: substate_db.list_raw_values_from_db_key(partition_key, None),
206            on_io_access,
207            canonical_partition,
208            errored_out: false,
209            phantom1: PhantomData::<M>,
210            phantom2: PhantomData::<K>,
211        })
212    }
213
214    /// Reverts all non force write changes.
215    ///
216    /// Note that dependencies will never be reverted.
217    pub fn revert_non_force_write_changes(&mut self) {
218        self.tracked_nodes
219            .retain(|_, tracked_node| !tracked_node.is_new);
220        for (_, tracked_node) in &mut self.tracked_nodes {
221            tracked_node.revert_writes();
222        }
223
224        let force_writes = mem::take(&mut self.force_write_tracked_nodes);
225
226        for (node_id, force_track_node) in force_writes {
227            for (partition_num, force_track_partition) in force_track_node.tracked_partitions {
228                for (db_sort_key, force_track_key) in force_track_partition.substates {
229                    let tracked_node = self.tracked_nodes.get_mut(&node_id).unwrap();
230                    let tracked_partition = tracked_node
231                        .tracked_partitions
232                        .get_mut(&partition_num)
233                        .unwrap();
234                    let tracked = &mut tracked_partition
235                        .substates
236                        .get_mut(&db_sort_key)
237                        .unwrap()
238                        .substate_value;
239                    *tracked = force_track_key.substate_value;
240                }
241            }
242        }
243    }
244
245    /// Finalizes changes captured by this substate store.
246    ///
247    ///  Returns the state changes and dependencies.
248    pub fn finalize(mut self) -> Result<(TrackedSubstates, &'s S), TrackFinalizeError> {
249        for (node_id, transient_substates) in self.transient_substates.transient_substates {
250            for (partition, substate_key) in transient_substates {
251                if let Some(tracked_partition) = self
252                    .tracked_nodes
253                    .get_mut(&node_id)
254                    .and_then(|tracked_node| tracked_node.tracked_partitions.get_mut(&partition))
255                {
256                    let db_sort_key = M::to_db_sort_key(&substate_key);
257                    let tracked_substate = tracked_partition.substates.remove(&db_sort_key);
258                    if let Some(substate) =
259                        tracked_substate.and_then(|s| s.substate_value.into_value())
260                    {
261                        if !substate.owned_nodes().is_empty() {
262                            return Err(TrackFinalizeError::TransientSubstateOwnsNode);
263                        }
264                    }
265                }
266            }
267        }
268
269        Ok((
270            TrackedSubstates {
271                tracked_nodes: self.tracked_nodes,
272                deleted_partitions: self.deleted_partitions,
273            },
274            self.substate_db,
275        ))
276    }
277
278    fn get_tracked_partition(
279        &mut self,
280        node_id: &NodeId,
281        partition_num: PartitionNumber,
282    ) -> &mut TrackedPartition {
283        self.tracked_nodes
284            .entry(*node_id)
285            .or_insert(TrackedNode::new(false))
286            .tracked_partitions
287            .entry(partition_num)
288            .or_default()
289    }
290
291    fn get_tracked_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
292        &mut self,
293        node_id: &NodeId,
294        partition_number: PartitionNumber,
295        substate_key: SubstateKey,
296        on_io_access: &mut F,
297    ) -> Result<&mut TrackedSubstateValue, E> {
298        let db_sort_key = M::to_db_sort_key(&substate_key);
299        let partition = &mut self
300            .tracked_nodes
301            .entry(*node_id)
302            .or_insert(TrackedNode::new(false))
303            .tracked_partitions
304            .entry(partition_number)
305            .or_default()
306            .substates;
307        let entry = partition.entry(db_sort_key.clone());
308
309        match entry {
310            Entry::Vacant(e) => {
311                if self
312                    .transient_substates
313                    .is_transient(node_id, partition_number, &substate_key)
314                {
315                    let tracked = TrackedSubstate {
316                        substate_key: substate_key.clone(),
317                        substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::NonExistent),
318                    };
319                    let new_size = Some(tracked.size());
320                    e.insert(tracked);
321
322                    on_io_access(IOAccess::TrackSubstateUpdated {
323                        canonical_substate_key: CanonicalSubstateKey {
324                            node_id: *node_id,
325                            partition_number,
326                            substate_key,
327                        },
328                        old_size: None,
329                        new_size,
330                    })?;
331                } else {
332                    let db_partition_key = M::to_db_partition_key(node_id, partition_number);
333                    let substate_value = Self::get_substate_from_db(
334                        self.substate_db,
335                        &db_partition_key,
336                        &M::to_db_sort_key(&substate_key),
337                        on_io_access,
338                        CanonicalSubstateKey {
339                            node_id: *node_id,
340                            partition_number,
341                            substate_key: substate_key.clone(),
342                        },
343                    )?;
344
345                    let new_size;
346                    if let Some(value) = substate_value {
347                        let tracked = TrackedSubstate {
348                            substate_key: substate_key.clone(),
349                            substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::Existent(
350                                RuntimeSubstate::new(value),
351                            )),
352                        };
353                        new_size = Some(tracked.size());
354                        e.insert(tracked);
355                    } else {
356                        let tracked = TrackedSubstate {
357                            substate_key: substate_key.clone(),
358                            substate_value: TrackedSubstateValue::ReadOnly(ReadOnly::NonExistent),
359                        };
360                        new_size = Some(tracked.size());
361                        e.insert(tracked);
362                    };
363
364                    // Notify upper layer
365                    on_io_access(IOAccess::TrackSubstateUpdated {
366                        canonical_substate_key: CanonicalSubstateKey {
367                            node_id: *node_id,
368                            partition_number,
369                            substate_key,
370                        },
371                        old_size: None,
372                        new_size,
373                    })?;
374                }
375            }
376            Entry::Occupied(..) => {}
377        }
378
379        Ok(&mut partition.get_mut(&db_sort_key).unwrap().substate_value)
380    }
381}
382
383impl<'s, S: SubstateDatabase, M: DatabaseKeyMapper> CommitableSubstateStore
384    for MappedTrack<'s, S, M>
385{
386    fn mark_as_transient(
387        &mut self,
388        node_id: NodeId,
389        partition_num: PartitionNumber,
390        substate_key: SubstateKey,
391    ) {
392        self.transient_substates
393            .mark_as_transient(node_id, partition_num, substate_key);
394    }
395
396    fn create_node<E, F: FnMut(IOAccess) -> Result<(), E>>(
397        &mut self,
398        node_id: NodeId,
399        node_substates: NodeSubstates,
400        on_io_access: &mut F,
401    ) -> Result<(), E> {
402        let mut tracked_partitions = index_map_new();
403
404        for (partition_number, partition) in node_substates {
405            let mut partition_substates = BTreeMap::new();
406            for (substate_key, substate_value) in partition {
407                let db_sort_key = M::to_db_sort_key(&substate_key);
408
409                let tracked = TrackedSubstate {
410                    substate_key: substate_key.clone(),
411                    substate_value: TrackedSubstateValue::New(RuntimeSubstate::new(substate_value)),
412                };
413                let new_size = Some(tracked.size());
414                let old_tracked = partition_substates.insert(db_sort_key, tracked);
415                assert!(old_tracked.is_none());
416
417                // Notify upper layer
418                on_io_access(IOAccess::TrackSubstateUpdated {
419                    canonical_substate_key: CanonicalSubstateKey {
420                        node_id,
421                        partition_number,
422                        substate_key,
423                    },
424                    old_size: None,
425                    new_size,
426                })?;
427            }
428            let tracked_partition = TrackedPartition::new_with_substates(partition_substates);
429            tracked_partitions.insert(partition_number, tracked_partition);
430        }
431
432        self.tracked_nodes.insert(
433            node_id,
434            TrackedNode {
435                tracked_partitions,
436                is_new: true,
437            },
438        );
439
440        Ok(())
441    }
442
443    fn get_tracked_substate_info(
444        &mut self,
445        node_id: &NodeId,
446        partition_num: PartitionNumber,
447        substate_key: &SubstateKey,
448    ) -> TrackedSubstateInfo {
449        let db_sort_key = M::to_db_sort_key(substate_key);
450        let info = self
451            .tracked_nodes
452            .get(node_id)
453            .and_then(|n| n.tracked_partitions.get(&partition_num))
454            .and_then(|p| p.substates.get(&db_sort_key))
455            .map(|s| match s.substate_value {
456                TrackedSubstateValue::New(..) | TrackedSubstateValue::Garbage => {
457                    TrackedSubstateInfo::New
458                }
459                TrackedSubstateValue::WriteOnly(..)
460                | TrackedSubstateValue::ReadExistAndWrite(..)
461                | TrackedSubstateValue::ReadNonExistAndWrite(..) => TrackedSubstateInfo::Updated,
462                TrackedSubstateValue::ReadOnly(..) => TrackedSubstateInfo::Unmodified,
463            })
464            .unwrap_or(TrackedSubstateInfo::Unmodified);
465
466        info
467    }
468
469    fn get_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
470        &mut self,
471        node_id: &NodeId,
472        partition_num: PartitionNumber,
473        substate_key: &SubstateKey,
474        on_io_access: &mut F,
475    ) -> Result<Option<&IndexedScryptoValue>, E> {
476        // Load the substate from state track
477        let tracked =
478            self.get_tracked_substate(node_id, partition_num, substate_key.clone(), on_io_access)?;
479
480        let value = tracked.get_runtime_substate_mut().map(|v| &v.value);
481
482        Ok(value)
483    }
484
485    fn set_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
486        &mut self,
487        node_id: NodeId,
488        partition_number: PartitionNumber,
489        substate_key: SubstateKey,
490        substate_value: IndexedScryptoValue,
491        on_io_access: &mut F,
492    ) -> Result<(), E> {
493        let tracked_partition = self
494            .tracked_nodes
495            .entry(node_id)
496            .or_insert(TrackedNode::new(false))
497            .tracked_partitions
498            .entry(partition_number)
499            .or_default();
500        let db_sort_key = M::to_db_sort_key(&substate_key);
501        let entry = tracked_partition.substates.entry(db_sort_key);
502
503        match entry {
504            Entry::Vacant(e) => {
505                let tracked = TrackedSubstate {
506                    substate_key: substate_key.clone(),
507                    substate_value: TrackedSubstateValue::WriteOnly(Write::Update(
508                        RuntimeSubstate::new(substate_value),
509                    )),
510                };
511                let new_size = Some(tracked.size());
512                e.insert(tracked);
513
514                // Notify upper layer
515                on_io_access(IOAccess::TrackSubstateUpdated {
516                    canonical_substate_key: CanonicalSubstateKey {
517                        node_id,
518                        partition_number,
519                        substate_key,
520                    },
521                    old_size: None,
522                    new_size,
523                })?;
524            }
525            Entry::Occupied(mut e) => {
526                let tracked = e.get_mut();
527
528                let old_size = Some(tracked.size());
529                tracked.substate_value.set(substate_value);
530                let new_size = Some(tracked.size());
531
532                // Notify upper layer
533                on_io_access(IOAccess::TrackSubstateUpdated {
534                    canonical_substate_key: CanonicalSubstateKey {
535                        node_id,
536                        partition_number,
537                        substate_key,
538                    },
539                    old_size,
540                    new_size,
541                })?;
542            }
543        }
544
545        Ok(())
546    }
547
548    fn force_write(
549        &mut self,
550        node_id: &NodeId,
551        partition_num: &PartitionNumber,
552        substate_key: &SubstateKey,
553    ) {
554        let tracked = self
555            .get_tracked_substate(
556                node_id,
557                *partition_num,
558                substate_key.clone(),
559                &mut |_| -> Result<(), ()> { Err(()) },
560            )
561            .expect("Should not need to go into store on close substate.");
562        let cloned_track = tracked.clone();
563
564        self.force_write_tracked_nodes
565            .entry(*node_id)
566            .or_insert(TrackedNode {
567                tracked_partitions: index_map_new(),
568                is_new: false,
569            })
570            .tracked_partitions
571            .entry(*partition_num)
572            .or_default()
573            .substates
574            .insert(
575                M::to_db_sort_key(substate_key),
576                TrackedSubstate {
577                    substate_key: substate_key.clone(),
578                    substate_value: cloned_track,
579                },
580            );
581    }
582
583    // Should not use on virtualized substates
584    fn remove_substate<E, F: FnMut(IOAccess) -> Result<(), E>>(
585        &mut self,
586        node_id: &NodeId,
587        partition_number: PartitionNumber,
588        substate_key: &SubstateKey,
589        on_io_access: &mut F,
590    ) -> Result<Option<IndexedScryptoValue>, E> {
591        let tracked = self.get_tracked_substate(
592            node_id,
593            partition_number,
594            substate_key.clone(),
595            on_io_access,
596        )?;
597
598        let old_size = Some(tracked.size());
599        let taken = tracked.take();
600        let new_size = Some(tracked.size());
601
602        // Notify upper layer
603        on_io_access(IOAccess::TrackSubstateUpdated {
604            canonical_substate_key: CanonicalSubstateKey {
605                node_id: *node_id,
606                partition_number,
607                substate_key: substate_key.clone(),
608            },
609            old_size,
610            new_size,
611        })?;
612
613        Ok(taken)
614    }
615
616    fn scan_keys<K: SubstateKeyContent, E, F: FnMut(IOAccess) -> Result<(), E>>(
617        &mut self,
618        node_id: &NodeId,
619        partition_number: PartitionNumber,
620        limit: u32,
621        on_io_access: &mut F,
622    ) -> Result<Vec<SubstateKey>, E> {
623        let limit: usize = limit.try_into().unwrap();
624        let mut items = Vec::new();
625
626        let node_updates = self.tracked_nodes.get(node_id);
627        let is_new = node_updates
628            .map(|tracked_node| tracked_node.is_new)
629            .unwrap_or(false);
630        let tracked_partition =
631            node_updates.and_then(|n| n.tracked_partitions.get(&partition_number));
632
633        if let Some(tracked_partition) = tracked_partition {
634            for tracked_substate in tracked_partition.substates.values() {
635                if items.len() == limit {
636                    return Ok(items);
637                }
638
639                // TODO: Check that substate is not write locked, before use outside of native blueprints
640                if let Some(_substate) = tracked_substate.substate_value.get() {
641                    items.push(tracked_substate.substate_key.clone());
642                }
643            }
644        }
645
646        // Optimization, no need to go into database if the node is just created
647        if items.len() == limit || is_new {
648            return Ok(items);
649        }
650
651        let db_partition_key = M::to_db_partition_key(node_id, partition_number);
652        let mut tracked_iter = IterationCountedIter::new(Self::list_entries_from_db::<E, F, K>(
653            self.substate_db,
654            &db_partition_key,
655            on_io_access,
656            CanonicalPartition {
657                node_id: *node_id,
658                partition_number,
659            },
660        ));
661
662        for result in &mut tracked_iter {
663            let (db_sort_key, (substate_key, _substate_value)) = result?;
664
665            if items.len() == limit {
666                break;
667            }
668
669            if tracked_partition
670                .map(|tracked_partition| tracked_partition.substates.contains_key(&db_sort_key))
671                .unwrap_or(false)
672            {
673                continue;
674            }
675
676            // TODO: cache read substates in Track (and notify upper layer)
677
678            items.push(substate_key);
679        }
680
681        // Update track
682        let num_iterations = tracked_iter.num_iterations;
683        let tracked_partition = self.get_tracked_partition(node_id, partition_number);
684        tracked_partition.range_read = u32::max(tracked_partition.range_read, num_iterations);
685
686        Ok(items)
687    }
688
689    fn drain_substates<K: SubstateKeyContent, E, F: FnMut(IOAccess) -> Result<(), E>>(
690        &mut self,
691        node_id: &NodeId,
692        partition_number: PartitionNumber,
693        limit: u32,
694        on_io_access: &mut F,
695    ) -> Result<Vec<(SubstateKey, IndexedScryptoValue)>, E> {
696        let limit: usize = limit.try_into().unwrap();
697        let mut items = Vec::new();
698
699        let node_updates = self.tracked_nodes.get_mut(node_id);
700        let is_new = node_updates
701            .as_ref()
702            .map(|tracked_node| tracked_node.is_new)
703            .unwrap_or(false);
704
705        // Check what we've currently got so far without going into database
706        let mut tracked_partition =
707            node_updates.and_then(|n| n.tracked_partitions.get_mut(&partition_number));
708        if let Some(tracked_partition) = tracked_partition.as_mut() {
709            for (_db_sort_key, tracked_substate) in tracked_partition.substates.iter_mut() {
710                if items.len() == limit {
711                    return Ok(items);
712                }
713
714                let old_size = Some(tracked_substate.size());
715                if let Some(value) = tracked_substate.substate_value.take() {
716                    items.push((tracked_substate.substate_key.clone(), value));
717                }
718                let new_size = Some(tracked_substate.size());
719
720                // Notify upper layer
721                on_io_access(IOAccess::TrackSubstateUpdated {
722                    canonical_substate_key: CanonicalSubstateKey {
723                        node_id: *node_id,
724                        partition_number,
725                        substate_key: tracked_substate.substate_key.clone(),
726                    },
727                    old_size,
728                    new_size,
729                })?;
730            }
731        }
732
733        // Optimization, no need to go into database if the node is just created
734        if items.len() == limit || is_new {
735            return Ok(items);
736        }
737
738        // Read from database
739        let db_partition_key = M::to_db_partition_key(node_id, partition_number);
740
741        let (new_updates, num_iterations) = {
742            let mut tracked_iter =
743                IterationCountedIter::new(Self::list_entries_from_db::<E, F, K>(
744                    self.substate_db,
745                    &db_partition_key,
746                    on_io_access,
747                    CanonicalPartition {
748                        node_id: *node_id,
749                        partition_number,
750                    },
751                ));
752            let new_updates = {
753                let mut new_updates = Vec::new();
754                for result in &mut tracked_iter {
755                    let (db_sort_key, (substate_key, substate_value)) = result?;
756
757                    if items.len() == limit {
758                        break;
759                    }
760
761                    if tracked_partition
762                        .as_ref()
763                        .map(|tracked_partition| {
764                            tracked_partition.substates.contains_key(&db_sort_key)
765                        })
766                        .unwrap_or(false)
767                    {
768                        continue;
769                    }
770
771                    let tracked = TrackedSubstate {
772                        substate_key: substate_key.clone(),
773                        substate_value: TrackedSubstateValue::ReadExistAndWrite(
774                            substate_value.clone(),
775                            Write::Delete,
776                        ),
777                    };
778                    new_updates.push((db_sort_key, tracked));
779                    items.push((substate_key, substate_value));
780                }
781                new_updates
782            };
783
784            (new_updates, tracked_iter.num_iterations)
785        };
786
787        // Update track
788        {
789            let tracked_partition = self.get_tracked_partition(node_id, partition_number);
790            tracked_partition.range_read = u32::max(tracked_partition.range_read, num_iterations);
791
792            for (db_sort_key, tracked_substate) in new_updates {
793                let substate_key = tracked_substate.substate_key.clone();
794                let new_size = Some(tracked_substate.size());
795                let old_size = tracked_partition
796                    .substates
797                    .insert(db_sort_key, tracked_substate)
798                    .map(|x| x.size());
799
800                // Notify upper layer
801                on_io_access(IOAccess::TrackSubstateUpdated {
802                    canonical_substate_key: CanonicalSubstateKey {
803                        node_id: *node_id,
804                        partition_number,
805                        substate_key,
806                    },
807                    old_size,
808                    new_size,
809                })?;
810            }
811        }
812
813        Ok(items)
814    }
815
816    #[allow(clippy::type_complexity)]
817    fn scan_sorted_substates<E, F: FnMut(IOAccess) -> Result<(), E>>(
818        &mut self,
819        node_id: &NodeId,
820        partition_number: PartitionNumber,
821        limit: u32,
822        on_io_access: &mut F,
823    ) -> Result<Vec<(SortedKey, IndexedScryptoValue)>, E> {
824        // TODO: ensure we abort if any substates are write locked.
825        let limit: usize = limit.try_into().unwrap();
826
827        // initialize the track partition, since we will definitely need it: either to read values from it OR to update the `range_read` on it
828        let tracked_node = self
829            .tracked_nodes
830            .entry(*node_id)
831            .or_insert(TrackedNode::new(false));
832        let tracked_partition = tracked_node
833            .tracked_partitions
834            .entry(partition_number)
835            .or_default();
836
837        // initialize the "from db" iterator: use `dyn`, since we want to skip it altogether if the node is marked as `is_new` in our track
838        let mut db_values_count = 0u32;
839        let raw_db_entries: Box<
840            dyn Iterator<Item = Result<(DbSortKey, (SubstateKey, IndexedScryptoValue)), E>>,
841        > = if tracked_node.is_new {
842            Box::new(empty()) // optimization: avoid touching the database altogether
843        } else {
844            let partition_key = M::to_db_partition_key(node_id, partition_number);
845            Box::new(Self::list_entries_from_db::<E, F, SortedKey>(
846                self.substate_db,
847                &partition_key,
848                on_io_access,
849                CanonicalPartition {
850                    node_id: *node_id,
851                    partition_number,
852                },
853            ))
854        };
855        let db_read_entries = raw_db_entries.inspect(|_| {
856            db_values_count += 1;
857        });
858
859        // initialize the "from track" iterator
860        let tracked_entry_changes =
861            tracked_partition
862                .substates
863                .iter()
864                .map(|(db_sort_key, tracked_substate)| {
865                    // TODO: ensure we abort if any substates are write locked.
866                    if let Some(value) = tracked_substate.substate_value.get() {
867                        (
868                            db_sort_key.clone(),
869                            Some((tracked_substate.substate_key.clone(), value.clone())),
870                        )
871                    } else {
872                        (db_sort_key.clone(), None)
873                    }
874                });
875
876        let mut items = Vec::new();
877        // construct the composite iterator, which applies changes read from our track on top of db values
878        for result in
879            OverlayingResultIterator::new(db_read_entries, tracked_entry_changes).take(limit)
880        {
881            let (_db_sort_key, (substate_key, substate_value)) = result?;
882            let sorted_key = match substate_key {
883                SubstateKey::Sorted(sorted) => sorted,
884                _ => panic!("Should be a sorted key"),
885            };
886            items.push((sorted_key, substate_value));
887        }
888
889        // Use the statistics (gathered by the `.inspect()`s above) to update the track's metadata and to return costing info
890        tracked_partition.range_read = u32::max(tracked_partition.range_read, db_values_count);
891
892        // TODO: cache read substates in Track (and notify upper layer)
893
894        Ok(items)
895    }
896
897    fn delete_partition(&mut self, node_id: &NodeId, partition_num: PartitionNumber) {
898        // This is used for transaction tracker only, for which we don't account for store access.
899
900        self.deleted_partitions.insert((*node_id, partition_num));
901    }
902
903    fn get_commit_info(&mut self) -> StoreCommitInfo {
904        let mut store_commit = Vec::new();
905
906        for (node_id, node) in &self.tracked_nodes {
907            for (partition_number, partition) in &node.tracked_partitions {
908                for (db_sort_key, substate) in &partition.substates {
909                    if self.transient_substates.is_transient(
910                        node_id,
911                        *partition_number,
912                        &substate.substate_key,
913                    ) {
914                        continue;
915                    }
916
917                    let canonical_substate_key = CanonicalSubstateKey {
918                        node_id: *node_id,
919                        partition_number: *partition_number,
920                        substate_key: substate.substate_key.clone(),
921                    };
922
923                    match &substate.substate_value {
924                        TrackedSubstateValue::New(v) => {
925                            store_commit.push(StoreCommit::Insert {
926                                canonical_substate_key,
927                                size: v.value.len(),
928                            });
929                        }
930                        TrackedSubstateValue::ReadOnly(_) => {
931                            // No op
932                        }
933                        TrackedSubstateValue::ReadExistAndWrite(old_value, write) => match write {
934                            Write::Update(x) => {
935                                store_commit.push(StoreCommit::Update {
936                                    canonical_substate_key,
937                                    size: x.value.len(),
938                                    old_size: old_value.len(),
939                                });
940                            }
941                            Write::Delete => {
942                                store_commit.push(StoreCommit::Delete {
943                                    canonical_substate_key,
944                                    old_size: old_value.len(),
945                                });
946                            }
947                        },
948                        TrackedSubstateValue::ReadNonExistAndWrite(value) => {
949                            store_commit.push(StoreCommit::Insert {
950                                canonical_substate_key,
951                                size: value.value.len(),
952                            });
953                        }
954                        TrackedSubstateValue::WriteOnly(write) => {
955                            let old_size = self
956                                .substate_db
957                                .get_raw_substate_by_db_key(
958                                    &M::to_db_partition_key(node_id, *partition_number),
959                                    db_sort_key,
960                                )
961                                .map(|x| x.len());
962
963                            match (old_size, write) {
964                                (Some(old_size), Write::Update(x)) => {
965                                    store_commit.push(StoreCommit::Update {
966                                        canonical_substate_key,
967                                        size: x.value.len(),
968                                        old_size,
969                                    });
970                                }
971                                (Some(old_size), Write::Delete) => {
972                                    store_commit.push(StoreCommit::Delete {
973                                        canonical_substate_key,
974                                        old_size,
975                                    });
976                                }
977                                (None, Write::Update(x)) => {
978                                    store_commit.push(StoreCommit::Insert {
979                                        canonical_substate_key,
980                                        size: x.value.len(),
981                                    });
982                                }
983                                (None, Write::Delete) => {
984                                    // TODO: this should never happen?
985                                }
986                            }
987                        }
988                        TrackedSubstateValue::Garbage => {
989                            // No op
990                        }
991                    }
992                }
993            }
994        }
995
996        store_commit
997    }
998}