corevm_engine/
accumulate.rs

1use crate::initial_state_hash;
2use alloc::{borrow::Cow, vec::Vec};
3use codec::{Decode, DecodeAll, Encode};
4use corevm_host::{
5	fs, CoreVmInstruction, CoreVmOutput, PageAddr, PageInfo, Range, RangeSet, StorageKey, VmSpec,
6};
7use jam_types::{AccumulateItem, Hash, Memo, SegmentTreeRoot, ServiceId, WorkError, WorkOutput};
8use log::debug;
9
10/// Operations that [`AccumulateEngine`] uses during accumulation.
11///
12/// Each operation corresponds to a JAM host-call.
13pub trait AccumulateOps {
14	/// Implemenatation-specific error type.
15	type Error: core::fmt::Debug;
16
17	/// Read value under the specified key from the storage.
18	fn get(&self, key: &StorageKey) -> Option<Cow<'_, [u8]>>;
19
20	/// Write value under the specified key to the storage.
21	fn set(&mut self, key: StorageKey, value: Cow<'_, [u8]>) -> Result<(), Self::Error>;
22
23	/// Read value under the specified key from the storage.
24	fn remove(&mut self, key: &StorageKey);
25
26	/// Same as [`set`](Self::set) but encodes the provided value using JAM codec.
27	fn set_typed(&mut self, key: StorageKey, value: &impl Encode) -> Result<(), Self::Error> {
28		value.using_encoded(|bytes| self.set(key, bytes.into()))
29	}
30
31	/// Same as [`get`](Self::get) but decodes the retrieved value using JAM codec.
32	fn get_typed<T: Decode>(&self, key: &StorageKey) -> Result<Option<T>, codec::Error> {
33		self.get(key).map(|bytes| T::decode_all(&mut bytes.as_ref())).transpose()
34	}
35}
36
37impl<A: AccumulateOps + ?Sized> AccumulateOps for &mut A {
38	type Error = A::Error;
39
40	fn get(&self, key: &StorageKey) -> Option<Cow<'_, [u8]>> {
41		AccumulateOps::get(*self, key)
42	}
43
44	fn set(&mut self, key: StorageKey, value: Cow<'_, [u8]>) -> Result<(), Self::Error> {
45		AccumulateOps::set(*self, key, value)
46	}
47
48	fn remove(&mut self, key: &StorageKey) {
49		AccumulateOps::remove(*self, key)
50	}
51}
52
53/// An engine that drives execution of `accumulate` and `on_transfer` CoreVM service entry points.
54pub struct AccumulateEngine<A: AccumulateOps> {
55	ops: A,
56}
57
58impl<A: AccumulateOps> AccumulateEngine<A> {
59	/// Create new engine with the specified accumulation API implementation.
60	pub fn new(ops: A) -> Self {
61		Self { ops }
62	}
63
64	/// Accumulate the specified items.
65	///
66	/// Returns accumulation result for each item.
67	pub fn run(
68		&mut self,
69		items: &[AccumulateItem],
70	) -> Vec<Result<(), AccumulationError<A::Error>>> {
71		let results = items
72			.iter()
73			.map(|item| match item {
74				AccumulateItem::WorkItem(item) =>
75					self.accumulate(item.result.as_ref(), item.exports_root),
76				AccumulateItem::Transfer(item) => self.transfer(item.source, item.memo),
77			})
78			.collect();
79		results
80	}
81
82	/// Get the underlying accumulation API implementation.
83	pub fn into_inner(self) -> A {
84		self.ops
85	}
86
87	fn accumulate(
88		&mut self,
89		result: Result<&WorkOutput, &WorkError>,
90		exports_root: SegmentTreeRoot,
91	) -> Result<(), AccumulationError<A::Error>> {
92		use AccumulationError::*;
93		let output = result.map_err(|e| Work(e.clone()))?;
94		let output = CoreVmOutput::decode_all(&mut &output[..])?;
95
96		if self.ops.get_typed::<fs::BlockRef>(&StorageKey::ExecEnvRef)? != Some(output.exec_ref) {
97			return Err(ExecEnvRefMismatch);
98		}
99
100		// Update VM state hash.
101		if self.ops.get_typed::<Hash>(&StorageKey::StateHash)? != Some(output.old_hash) {
102			return Err(PriorStateMismatch);
103		}
104		self.set_typed(StorageKey::StateHash, &output.new_hash)?;
105		let mut resident_pages =
106			self.ops.get_typed::<RangeSet>(&StorageKey::ResidentPages)?.unwrap_or_default();
107		// Check imported pages' hashes.
108		for (address, our_hash) in output.touched_imported_pages.iter() {
109			let key = StorageKey::PageInfo(PageAddr(*address));
110			let their_hash = self
111				.ops
112				.get_typed::<PageInfo>(&key)?
113				.ok_or(NoSuchPage(PageAddr(*address)))?
114				.hash;
115			if &their_hash != our_hash {
116				return Err(PageHashMismatch);
117			}
118		}
119		// Update memory pages' metadata.
120		let mut export_index = 0;
121		for (address, hash) in output.updated_pages.iter() {
122			let key = StorageKey::PageInfo(PageAddr(*address));
123			if hash == &[0; 32] {
124				// The page was freed.
125				self.ops.get_typed::<PageInfo>(&key)?.ok_or(NoSuchPage(PageAddr(*address)))?;
126				resident_pages.remove(&Range::new(*address, address + 1));
127				self.ops.remove(&key);
128			} else {
129				resident_pages.insert(Range::new(*address, address + 1));
130				let info = PageInfo { hash: *hash, exports_root, export_index };
131				self.set_typed(key, &info)?;
132				export_index += 1;
133			}
134		}
135		self.set_typed(StorageKey::ResidentPages, &resident_pages)?;
136		// Update video mode.
137		if let Some(ref mode) = output.vm_state.video {
138			self.set_typed(StorageKey::VideoMode, mode)?;
139		}
140		// Update audio mode.
141		if let Some(ref mode) = output.vm_state.audio {
142			self.set_typed(StorageKey::AudioMode, mode)?;
143		}
144		// Store VM output specification.
145		let spec = VmSpec { exports_root, output: output.vm_output, state: output.vm_state };
146		self.set_typed(StorageKey::VmSpec, &spec)?;
147
148		Ok(())
149	}
150
151	fn transfer(
152		&mut self,
153		source: ServiceId,
154		memo: Memo,
155	) -> Result<(), AccumulationError<A::Error>> {
156		use AccumulationError::*;
157		let instr = CoreVmInstruction::decode(&mut &memo[..])?;
158		let owner = self.ops.get_typed::<ServiceId>(&StorageKey::Owner)?;
159		if let Some(owner) = owner {
160			if owner != source {
161				return Err(NotTheOwner(owner, source));
162			}
163		}
164		match instr {
165			CoreVmInstruction::Reset { gas, exec_ref } => {
166				if owner.is_none() {
167					debug!("Setting owner to {source:x}");
168					self.set_typed(StorageKey::Owner, &source)?;
169				}
170				debug!("Resetting VM: gas = {gas}, exec = {exec_ref}");
171				self.set_typed(StorageKey::Gas, &gas)?;
172				self.set_typed(StorageKey::ExecEnvRef, &exec_ref)?;
173				self.set_typed(StorageKey::StateHash, &initial_state_hash())?;
174				// Now clear memory pages.
175				if let Some(spec) = self.ops.get_typed::<VmSpec>(&StorageKey::VmSpec)? {
176					for range in spec.state.resident_pages.as_slice().iter() {
177						for address in range.start..range.end {
178							self.ops.remove(&StorageKey::PageInfo(PageAddr(address)));
179						}
180					}
181				}
182				// Clear the VM state.
183				self.ops.remove(&StorageKey::VmSpec);
184				self.ops.remove(&StorageKey::VideoMode);
185				self.ops.remove(&StorageKey::AudioMode);
186			},
187			CoreVmInstruction::SetOwner(owner) => {
188				debug!("Setting owner to {owner:x}");
189				self.set_typed(StorageKey::Owner, &owner)?;
190			},
191		}
192		Ok(())
193	}
194
195	fn set_typed(
196		&mut self,
197		key: StorageKey,
198		value: &impl Encode,
199	) -> Result<(), AccumulationError<A::Error>> {
200		self.ops.set_typed(key, value).map_err(AccumulationError::Api)
201	}
202}
203
204/// Accumulation error.
205#[derive(Debug, thiserror::Error)]
206pub enum AccumulationError<E: core::fmt::Debug> {
207	#[error("Accumulation API error: {0:?}")]
208	Api(E),
209	#[error("Work error: {0:?}")]
210	Work(WorkError),
211	#[error("JAM codec error: {0:?}")]
212	Codec(codec::Error),
213	#[error("Prior VM state mismatch")]
214	PriorStateMismatch,
215	#[error("Page {0} not found in storage")]
216	NoSuchPage(PageAddr),
217	#[error("Page hash mismatch")]
218	PageHashMismatch,
219	#[error("ExecEnvRef mismatch")]
220	ExecEnvRefMismatch,
221	#[error("Transfer source is not the owner of the service: actual owner {0:x}, transfer source {1:x}")]
222	NotTheOwner(ServiceId, ServiceId),
223}
224
225impl<E: core::fmt::Debug> From<codec::Error> for AccumulationError<E> {
226	fn from(e: codec::Error) -> Self {
227		Self::Codec(e)
228	}
229}