radix_engine/kernel/
substate_io.rs

1use crate::kernel::call_frame::{
2    CallFrameDrainSubstatesError, CallFrameRemoveSubstateError, CallFrameScanKeysError,
3    CallFrameScanSortedSubstatesError, CallFrameSetSubstateError, CreateNodeError, DropNodeError,
4    MovePartitionError, NonGlobalNodeRefs, OpenSubstateError, PersistNodeError, TransientSubstates,
5    WriteSubstateError,
6};
7use crate::kernel::heap::{Heap, HeapRemoveNodeError};
8use crate::kernel::substate_locks::SubstateLocks;
9use crate::track::interface::{
10    CallbackError, CommitableSubstateStore, IOAccess, NodeSubstates, TrackedSubstateInfo,
11};
12use radix_common::prelude::{NodeId, PartitionNumber};
13use radix_common::types::{SortedKey, SubstateKey};
14use radix_common::ScryptoSbor;
15use radix_engine_interface::api::LockFlags;
16use radix_engine_interface::types::IndexedScryptoValue;
17use radix_substate_store_interface::db_key_mapper::SubstateKeyContent;
18use sbor::prelude::Vec;
19use sbor::rust::collections::BTreeSet;
20use sbor::rust::collections::LinkedList;
21use sbor::Sbor;
22
23#[derive(Debug, Copy, Clone, Sbor, PartialEq, Eq, PartialOrd, Ord)]
24pub enum SubstateDevice {
25    Heap,
26    Store,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct LockData {
31    pub flags: LockFlags,
32    device: SubstateDevice,
33    virtualized: Option<IndexedScryptoValue>,
34}
35
36/// Callback for store access, from SubstateIO
37pub trait IOAccessHandler<E> {
38    fn on_io_access(&mut self, heap: &Heap, io_access: IOAccess) -> Result<(), E>;
39}
40
41/// Callback for substate read, from SubstateIO
42pub trait SubstateReadHandler {
43    type Error;
44
45    fn on_read_substate(
46        &mut self,
47        heap: &Heap,
48        value: &IndexedScryptoValue,
49        location: SubstateDevice,
50    ) -> Result<(), Self::Error>;
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, ScryptoSbor)]
54pub enum ProcessSubstateIOWriteError {
55    NonGlobalRefNotAllowed(NodeId),
56    PersistNodeError(PersistNodeError),
57}
58
59pub struct SubstateIO<'g, S: CommitableSubstateStore> {
60    pub heap: Heap,
61    pub store: &'g mut S,
62    pub non_global_node_refs: NonGlobalNodeRefs,
63    pub substate_locks: SubstateLocks<LockData>,
64    pub heap_transient_substates: TransientSubstates,
65    pub pinned_to_heap: BTreeSet<NodeId>,
66}
67
68impl<'g, S: CommitableSubstateStore + 'g> SubstateIO<'g, S> {
69    /// Creates a new node with partitions/substates at the given device.
70    /// No additional node movement occurs (For example, owned nodes in node substates
71    /// are not moved and must be done manually using other interfaces)
72    pub fn create_node<E>(
73        &mut self,
74        device: SubstateDevice,
75        node_id: NodeId,
76        node_substates: NodeSubstates,
77        handler: &mut impl IOAccessHandler<E>,
78    ) -> Result<(), CallbackError<CreateNodeError, E>> {
79        match device {
80            SubstateDevice::Heap => {
81                self.heap
82                    .create_node(node_id, node_substates, &mut |heap, io_access| {
83                        handler.on_io_access(heap, io_access)
84                    })
85            }
86            SubstateDevice::Store => {
87                self.store
88                    .create_node(node_id, node_substates, &mut |io_access| {
89                        handler.on_io_access(&self.heap, io_access)
90                    })
91            }
92        }
93        .map_err(CallbackError::CallbackError)?;
94
95        Ok(())
96    }
97
98    pub fn drop_node<E>(
99        &mut self,
100        device: SubstateDevice,
101        node_id: &NodeId,
102        handler: &mut impl IOAccessHandler<E>,
103    ) -> Result<NodeSubstates, CallbackError<DropNodeError, E>> {
104        if self.substate_locks.node_is_locked(node_id) {
105            return Err(CallbackError::Error(DropNodeError::SubstateBorrowed(
106                node_id.clone().into(),
107            )));
108        }
109
110        if self.non_global_node_refs.node_is_referenced(node_id) {
111            return Err(CallbackError::Error(DropNodeError::NodeBorrowed(
112                node_id.clone().into(),
113            )));
114        }
115
116        let node_substates = match device {
117            SubstateDevice::Heap => {
118                match self.heap.remove_node(node_id, &mut |heap, io_access| {
119                    handler.on_io_access(heap, io_access)
120                }) {
121                    Ok(substates) => substates,
122                    Err(CallbackError::Error(HeapRemoveNodeError::NodeNotFound(node_id))) => {
123                        panic!("Frame owned node {:?} not found in heap", node_id)
124                    }
125                    Err(CallbackError::CallbackError(e)) => {
126                        return Err(CallbackError::CallbackError(e));
127                    }
128                }
129            }
130            SubstateDevice::Store => {
131                panic!("Node drops not supported for store")
132            }
133        };
134
135        Ok(node_substates)
136    }
137
138    pub fn move_node_from_heap_to_store<E>(
139        &mut self,
140        node_id: &NodeId,
141        handler: &mut impl IOAccessHandler<E>,
142    ) -> Result<(), CallbackError<PersistNodeError, E>> {
143        // TODO: Add locked substate checks, though this is not required since
144        // the system layer currently maintains the invariant that a call frame cannot
145        // open a substate of an owned node
146
147        let mut queue = LinkedList::new();
148        queue.push_back(node_id.clone());
149
150        while let Some(node_id) = queue.pop_front() {
151            if self.non_global_node_refs.node_is_referenced(&node_id) {
152                return Err(CallbackError::Error(PersistNodeError::NodeBorrowed(
153                    node_id.clone().into(),
154                )));
155            }
156
157            if self.pinned_to_heap.contains(&node_id) {
158                return Err(CallbackError::Error(
159                    PersistNodeError::CannotPersistPinnedNode(node_id.clone().into()),
160                ));
161            }
162
163            let node_substates = match self.heap.remove_node(&node_id, &mut |heap, io_access| {
164                handler.on_io_access(heap, io_access)
165            }) {
166                Ok(substates) => substates,
167                Err(CallbackError::Error(HeapRemoveNodeError::NodeNotFound(node_id))) => {
168                    panic!("Frame owned node {:?} not found in heap", node_id)
169                }
170                Err(CallbackError::CallbackError(e)) => {
171                    return Err(CallbackError::CallbackError(e));
172                }
173            };
174
175            for (_partition_num, module_substates) in &node_substates {
176                for (_substate_key, substate_value) in module_substates {
177                    for reference in substate_value.references() {
178                        if !reference.is_global() {
179                            return Err(CallbackError::Error(
180                                PersistNodeError::ContainsNonGlobalRef(reference.clone().into()),
181                            ));
182                        }
183                    }
184
185                    for node_id in substate_value.owned_nodes() {
186                        queue.push_back(*node_id);
187                    }
188                }
189            }
190
191            if let Some(transient_substates) = self
192                .heap_transient_substates
193                .transient_substates
194                .remove(&node_id)
195            {
196                for (partition_num, substate_key) in transient_substates {
197                    self.store
198                        .mark_as_transient(node_id, partition_num, substate_key);
199                }
200            }
201
202            self.store
203                .create_node(node_id.clone(), node_substates, &mut |io_access| {
204                    handler.on_io_access(&self.heap, io_access)
205                })
206                .map_err(CallbackError::CallbackError)?;
207        }
208
209        Ok(())
210    }
211
212    pub fn move_partition<'f, E>(
213        &mut self,
214        src_device: SubstateDevice,
215        src_node_id: &NodeId,
216        src_partition_number: PartitionNumber,
217        dest_device: SubstateDevice,
218        dest_node_id: &NodeId,
219        dest_partition_number: PartitionNumber,
220        handler: &mut impl IOAccessHandler<E>,
221    ) -> Result<(), CallbackError<MovePartitionError, E>> {
222        // TODO: Use more granular partition lock checks?
223        if self.substate_locks.node_is_locked(src_node_id) {
224            return Err(CallbackError::Error(MovePartitionError::SubstateBorrowed(
225                src_node_id.clone().into(),
226            )));
227        }
228        if self.substate_locks.node_is_locked(dest_node_id) {
229            return Err(CallbackError::Error(MovePartitionError::SubstateBorrowed(
230                dest_node_id.clone().into(),
231            )));
232        }
233
234        // Move
235        let partition_substates = match src_device {
236            SubstateDevice::Heap => self
237                .heap
238                .remove_partition(src_node_id, src_partition_number, &mut |heap, io_access| {
239                    handler.on_io_access(heap, io_access)
240                })
241                .map_err(|e| match e {
242                    CallbackError::Error(e) => {
243                        CallbackError::Error(MovePartitionError::HeapRemovePartitionError(e))
244                    }
245                    CallbackError::CallbackError(e) => CallbackError::CallbackError(e),
246                })?,
247            SubstateDevice::Store => {
248                return Err(CallbackError::Error(
249                    MovePartitionError::MoveFromStoreNotPermitted,
250                ));
251            }
252        };
253
254        for (substate_key, substate_value) in partition_substates {
255            match dest_device {
256                SubstateDevice::Heap => {
257                    self.heap
258                        .set_substate(
259                            *dest_node_id,
260                            dest_partition_number,
261                            substate_key,
262                            substate_value,
263                            &mut |heap, io_access| handler.on_io_access(heap, io_access),
264                        )
265                        .map_err(CallbackError::CallbackError)?;
266                }
267                SubstateDevice::Store => {
268                    // This should never actually occur since Vaults/Buckets which use transient substates are never globalized
269                    if let Some(transient_substates) = self
270                        .heap_transient_substates
271                        .transient_substates
272                        .get_mut(src_node_id)
273                    {
274                        if transient_substates.remove(&(src_partition_number, substate_key.clone()))
275                        {
276                            self.store.mark_as_transient(
277                                *dest_node_id,
278                                dest_partition_number,
279                                substate_key.clone(),
280                            )
281                        }
282                    }
283
284                    // Recursively move nodes to store
285                    for own in substate_value.owned_nodes() {
286                        self.move_node_from_heap_to_store(own, handler)
287                            .map_err(|e| e.map(|e| MovePartitionError::PersistNodeError(e)))?;
288                    }
289
290                    for reference in substate_value.references() {
291                        if !reference.is_global() {
292                            return Err(CallbackError::Error(
293                                MovePartitionError::NonGlobalRefNotAllowed(
294                                    reference.clone().into(),
295                                ),
296                            ));
297                        }
298                    }
299
300                    self.store
301                        .set_substate(
302                            *dest_node_id,
303                            dest_partition_number,
304                            substate_key,
305                            substate_value,
306                            &mut |io_access| handler.on_io_access(&self.heap, io_access),
307                        )
308                        .map_err(CallbackError::CallbackError)?
309                }
310            }
311        }
312
313        Ok(())
314    }
315
316    pub fn open_substate<E, D: FnOnce() -> IndexedScryptoValue>(
317        &mut self,
318        device: SubstateDevice,
319        node_id: &NodeId,
320        partition_num: PartitionNumber,
321        substate_key: &SubstateKey,
322        flags: LockFlags,
323        default: Option<D>,
324        handler: &mut impl IOAccessHandler<E>,
325    ) -> Result<(u32, &IndexedScryptoValue), CallbackError<OpenSubstateError, E>> {
326        match device {
327            SubstateDevice::Heap => {
328                if flags.contains(LockFlags::UNMODIFIED_BASE) {
329                    return Err(CallbackError::Error(
330                        OpenSubstateError::LockUnmodifiedBaseOnHeapNode,
331                    ));
332                }
333            }
334            SubstateDevice::Store => {
335                // Check substate state
336                if flags.contains(LockFlags::UNMODIFIED_BASE) {
337                    match self
338                        .store
339                        .get_tracked_substate_info(node_id, partition_num, substate_key)
340                    {
341                        TrackedSubstateInfo::New => {
342                            return Err(CallbackError::Error(
343                                OpenSubstateError::LockUnmodifiedBaseOnNewSubstate(
344                                    node_id.clone().into(),
345                                    partition_num,
346                                    substate_key.clone(),
347                                ),
348                            ));
349                        }
350                        TrackedSubstateInfo::Updated => {
351                            return Err(CallbackError::Error(
352                                OpenSubstateError::LockUnmodifiedBaseOnOnUpdatedSubstate(
353                                    node_id.clone().into(),
354                                    partition_num,
355                                    substate_key.clone(),
356                                ),
357                            ));
358                        }
359                        TrackedSubstateInfo::Unmodified => {
360                            // Okay
361                        }
362                    }
363                }
364            }
365        }
366
367        let substate_value = Self::get_substate_internal(
368            &mut self.heap,
369            &mut self.store,
370            device,
371            node_id,
372            partition_num,
373            substate_key,
374            handler,
375        )?;
376
377        let (lock_data, substate_value) = if let Some(substate_value) = substate_value {
378            let lock_data = LockData {
379                flags,
380                device,
381                virtualized: None,
382            };
383
384            (lock_data, Some(substate_value))
385        } else if let Some(compute_default) = default {
386            let default_value = compute_default();
387            if !default_value.owned_nodes().is_empty() {
388                return Err(CallbackError::Error(OpenSubstateError::InvalidDefaultValue));
389            }
390
391            let lock_data = LockData {
392                flags,
393                device,
394                virtualized: Some(default_value),
395            };
396
397            (lock_data, None)
398        } else {
399            return Err(CallbackError::Error(OpenSubstateError::SubstateFault));
400        };
401
402        let global_lock_handle = match self.substate_locks.lock(
403            node_id,
404            partition_num,
405            substate_key,
406            !flags.contains(LockFlags::MUTABLE),
407            lock_data,
408        ) {
409            Some(handle) => handle,
410            None => {
411                return Err(CallbackError::Error(OpenSubstateError::SubstateLocked(
412                    node_id.clone().into(),
413                    partition_num,
414                    substate_key.clone(),
415                )));
416            }
417        };
418
419        let substate_value = substate_value.unwrap_or_else(|| {
420            let (.., data) = self.substate_locks.get(global_lock_handle);
421            data.virtualized.as_ref().unwrap()
422        });
423
424        Ok((global_lock_handle, substate_value))
425    }
426
427    pub fn read_substate<H: SubstateReadHandler>(
428        &mut self,
429        global_lock_handle: u32,
430        handler: &mut H,
431    ) -> Result<&IndexedScryptoValue, H::Error> {
432        let (node_id, partition_num, substate_key, lock_data) =
433            self.substate_locks.get(global_lock_handle);
434
435        // If substate is current virtualized, just return it
436        if let Some(virtualized) = &lock_data.virtualized {
437            // TODO: Should we callback for costing in this case?
438            return Ok(virtualized);
439        }
440
441        let substate = match lock_data.device {
442            SubstateDevice::Heap => self
443                .heap
444                .get_substate(node_id, *partition_num, substate_key)
445                .unwrap(),
446            SubstateDevice::Store => self
447                .store
448                .get_substate(node_id, *partition_num, substate_key, &mut |_| Err(()))
449                .expect("Getting substate on handled substate should not incur a store access.")
450                .unwrap(),
451        };
452
453        handler.on_read_substate(&self.heap, substate, lock_data.device)?;
454
455        Ok(substate)
456    }
457
458    pub fn write_substate<E>(
459        &mut self,
460        global_lock_handle: u32,
461        substate: IndexedScryptoValue,
462        handler: &mut impl IOAccessHandler<E>,
463    ) -> Result<(), CallbackError<WriteSubstateError, E>> {
464        let (node_id, partition_num, substate_key, lock_data) =
465            self.substate_locks.get_mut(global_lock_handle);
466        if !lock_data.flags.contains(LockFlags::MUTABLE) {
467            return Err(CallbackError::Error(WriteSubstateError::NoWritePermission));
468        }
469
470        // Remove any virtualized state if it exists
471        let _ = lock_data.virtualized.take();
472
473        let node_id = node_id.clone();
474        let partition_num = partition_num.clone();
475        let substate_key = substate_key.clone();
476
477        match lock_data.device {
478            SubstateDevice::Heap => self.heap.set_substate(
479                node_id,
480                partition_num,
481                substate_key,
482                substate,
483                &mut |heap, io_access| handler.on_io_access(heap, io_access),
484            ),
485            SubstateDevice::Store => self.store.set_substate(
486                node_id,
487                partition_num,
488                substate_key,
489                substate,
490                &mut |io_access| handler.on_io_access(&self.heap, io_access),
491            ),
492        }
493        .map_err(|e| CallbackError::CallbackError(e))?;
494
495        Ok(())
496    }
497
498    pub fn close_substate(
499        &mut self,
500        global_lock_handle: u32,
501    ) -> (NodeId, PartitionNumber, SubstateKey, LockFlags) {
502        let (node_id, partition_num, substate_key, lock_data) =
503            self.substate_locks.unlock(global_lock_handle);
504
505        if lock_data.flags.contains(LockFlags::FORCE_WRITE) {
506            self.store
507                .force_write(&node_id, &partition_num, &substate_key);
508        }
509
510        (node_id, partition_num, substate_key, lock_data.flags)
511    }
512
513    pub fn set_substate<'f, E>(
514        &mut self,
515        device: SubstateDevice,
516        node_id: &NodeId,
517        partition_num: PartitionNumber,
518        substate_key: SubstateKey,
519        value: IndexedScryptoValue,
520        handler: &mut impl IOAccessHandler<E>,
521    ) -> Result<(), CallbackError<CallFrameSetSubstateError, E>> {
522        if self
523            .substate_locks
524            .is_locked(node_id, partition_num, &substate_key)
525        {
526            return Err(CallbackError::Error(
527                CallFrameSetSubstateError::SubstateLocked(
528                    node_id.clone().into(),
529                    partition_num,
530                    substate_key,
531                ),
532            ));
533        }
534
535        match device {
536            SubstateDevice::Heap => self.heap.set_substate(
537                *node_id,
538                partition_num,
539                substate_key,
540                value,
541                &mut |heap, io_access| handler.on_io_access(heap, io_access),
542            ),
543            SubstateDevice::Store => self.store.set_substate(
544                *node_id,
545                partition_num,
546                substate_key,
547                value,
548                &mut |io_access| handler.on_io_access(&self.heap, io_access),
549            ),
550        }
551        .map_err(CallbackError::CallbackError)?;
552
553        Ok(())
554    }
555
556    pub fn remove_substate<'f, E>(
557        &mut self,
558        device: SubstateDevice,
559        node_id: &NodeId,
560        partition_num: PartitionNumber,
561        key: &SubstateKey,
562        handler: &mut impl IOAccessHandler<E>,
563    ) -> Result<Option<IndexedScryptoValue>, CallbackError<CallFrameRemoveSubstateError, E>> {
564        if self.substate_locks.is_locked(node_id, partition_num, key) {
565            return Err(CallbackError::Error(
566                CallFrameRemoveSubstateError::SubstateLocked(
567                    node_id.clone().into(),
568                    partition_num,
569                    key.clone(),
570                ),
571            ));
572        }
573
574        let removed = match device {
575            SubstateDevice::Heap => {
576                self.heap
577                    .remove_substate(node_id, partition_num, key, &mut |heap, io_access| {
578                        handler.on_io_access(heap, io_access)
579                    })
580            }
581            SubstateDevice::Store => {
582                self.store
583                    .remove_substate(node_id, partition_num, key, &mut |io_access| {
584                        handler.on_io_access(&self.heap, io_access)
585                    })
586            }
587        }
588        .map_err(CallbackError::CallbackError)?;
589
590        Ok(removed)
591    }
592
593    pub fn scan_keys<K: SubstateKeyContent, E>(
594        &mut self,
595        device: SubstateDevice,
596        node_id: &NodeId,
597        partition_num: PartitionNumber,
598        count: u32,
599        handler: &mut impl IOAccessHandler<E>,
600    ) -> Result<Vec<SubstateKey>, CallbackError<CallFrameScanKeysError, E>> {
601        let keys = match device {
602            SubstateDevice::Heap => self.heap.scan_keys(node_id, partition_num, count),
603            SubstateDevice::Store => self
604                .store
605                .scan_keys::<K, E, _>(node_id, partition_num, count, &mut |io_access| {
606                    handler.on_io_access(&self.heap, io_access)
607                })
608                .map_err(|e| CallbackError::CallbackError(e))?,
609        };
610
611        Ok(keys)
612    }
613
614    pub fn drain_substates<K: SubstateKeyContent, E>(
615        &mut self,
616        device: SubstateDevice,
617        node_id: &NodeId,
618        partition_num: PartitionNumber,
619        count: u32,
620        handler: &mut impl IOAccessHandler<E>,
621    ) -> Result<
622        Vec<(SubstateKey, IndexedScryptoValue)>,
623        CallbackError<CallFrameDrainSubstatesError, E>,
624    > {
625        let substates = match device {
626            SubstateDevice::Heap => {
627                self.heap
628                    .drain_substates(node_id, partition_num, count, &mut |heap, io_access| {
629                        handler.on_io_access(heap, io_access)
630                    })
631            }
632            SubstateDevice::Store => self.store.drain_substates::<K, E, _>(
633                node_id,
634                partition_num,
635                count,
636                &mut |io_access| handler.on_io_access(&self.heap, io_access),
637            ),
638        }
639        .map_err(|e| CallbackError::CallbackError(e))?;
640
641        // TODO: Should check if any substate is locked
642
643        Ok(substates)
644    }
645
646    // Substate Virtualization does not apply to this call
647    // Should this be prevented at this layer?
648    pub fn scan_sorted<'f, E>(
649        &mut self,
650        device: SubstateDevice,
651        node_id: &NodeId,
652        partition_num: PartitionNumber,
653        count: u32,
654        handler: &mut impl IOAccessHandler<E>,
655    ) -> Result<
656        Vec<(SortedKey, IndexedScryptoValue)>,
657        CallbackError<CallFrameScanSortedSubstatesError, E>,
658    > {
659        let substates = match device {
660            SubstateDevice::Heap => {
661                // This should never be triggered because sorted index store is
662                // used by consensus manager only.
663                panic!("Unexpected code path")
664            }
665            SubstateDevice::Store => self
666                .store
667                .scan_sorted_substates(node_id, partition_num, count, &mut |io_access| {
668                    handler.on_io_access(&self.heap, io_access)
669                })
670                .map_err(|e| CallbackError::CallbackError(e))?,
671        };
672
673        // TODO: Should check if any substate is locked
674
675        Ok(substates)
676    }
677
678    fn get_substate_internal<'a, E>(
679        heap: &'a mut Heap,
680        store: &'a mut S,
681        location: SubstateDevice,
682        node_id: &NodeId,
683        partition_num: PartitionNumber,
684        substate_key: &SubstateKey,
685        handler: &mut impl IOAccessHandler<E>,
686    ) -> Result<Option<&'a IndexedScryptoValue>, CallbackError<OpenSubstateError, E>> {
687        let value = match location {
688            SubstateDevice::Heap => heap.get_substate(node_id, partition_num, substate_key),
689            SubstateDevice::Store => store
690                .get_substate(node_id, partition_num, substate_key, &mut |io_access| {
691                    handler.on_io_access(heap, io_access)
692                })
693                .map_err(|e| CallbackError::CallbackError(e))?,
694        };
695
696        Ok(value)
697    }
698}