1use crate::initial_state_hash;
2use alloc::{borrow::Cow, vec::Vec};
3use codec::{Decode, DecodeAll, Encode};
4use corevm_host::{
5 fs, CoreVmInstruction, CoreVmOutput, PageAddr, PageInfo, Range, RangeSet, StorageKey, VmSpec,
6};
7use jam_types::{AccumulateItem, Hash, Memo, SegmentTreeRoot, ServiceId, WorkError, WorkOutput};
8use log::debug;
9
10pub trait AccumulateOps {
14 type Error: core::fmt::Debug;
16
17 fn get(&self, key: &StorageKey) -> Option<Cow<'_, [u8]>>;
19
20 fn set(&mut self, key: StorageKey, value: Cow<'_, [u8]>) -> Result<(), Self::Error>;
22
23 fn remove(&mut self, key: &StorageKey);
25
26 fn set_typed(&mut self, key: StorageKey, value: &impl Encode) -> Result<(), Self::Error> {
28 value.using_encoded(|bytes| self.set(key, bytes.into()))
29 }
30
31 fn get_typed<T: Decode>(&self, key: &StorageKey) -> Result<Option<T>, codec::Error> {
33 self.get(key).map(|bytes| T::decode_all(&mut bytes.as_ref())).transpose()
34 }
35}
36
37impl<A: AccumulateOps + ?Sized> AccumulateOps for &mut A {
38 type Error = A::Error;
39
40 fn get(&self, key: &StorageKey) -> Option<Cow<'_, [u8]>> {
41 AccumulateOps::get(*self, key)
42 }
43
44 fn set(&mut self, key: StorageKey, value: Cow<'_, [u8]>) -> Result<(), Self::Error> {
45 AccumulateOps::set(*self, key, value)
46 }
47
48 fn remove(&mut self, key: &StorageKey) {
49 AccumulateOps::remove(*self, key)
50 }
51}
52
53pub struct AccumulateEngine<A: AccumulateOps> {
55 ops: A,
56}
57
58impl<A: AccumulateOps> AccumulateEngine<A> {
59 pub fn new(ops: A) -> Self {
61 Self { ops }
62 }
63
64 pub fn run(
68 &mut self,
69 items: &[AccumulateItem],
70 ) -> Vec<Result<(), AccumulationError<A::Error>>> {
71 let results = items
72 .iter()
73 .map(|item| match item {
74 AccumulateItem::WorkItem(item) =>
75 self.accumulate(item.result.as_ref(), item.exports_root),
76 AccumulateItem::Transfer(item) => self.transfer(item.source, item.memo),
77 })
78 .collect();
79 results
80 }
81
82 pub fn into_inner(self) -> A {
84 self.ops
85 }
86
87 fn accumulate(
88 &mut self,
89 result: Result<&WorkOutput, &WorkError>,
90 exports_root: SegmentTreeRoot,
91 ) -> Result<(), AccumulationError<A::Error>> {
92 use AccumulationError::*;
93 let output = result.map_err(|e| Work(e.clone()))?;
94 let output = CoreVmOutput::decode_all(&mut &output[..])?;
95
96 if self.ops.get_typed::<fs::BlockRef>(&StorageKey::ExecEnvRef)? != Some(output.exec_ref) {
97 return Err(ExecEnvRefMismatch);
98 }
99
100 if self.ops.get_typed::<Hash>(&StorageKey::StateHash)? != Some(output.old_hash) {
102 return Err(PriorStateMismatch);
103 }
104 self.set_typed(StorageKey::StateHash, &output.new_hash)?;
105 let mut resident_pages =
106 self.ops.get_typed::<RangeSet>(&StorageKey::ResidentPages)?.unwrap_or_default();
107 for (address, our_hash) in output.touched_imported_pages.iter() {
109 let key = StorageKey::PageInfo(PageAddr(*address));
110 let their_hash = self
111 .ops
112 .get_typed::<PageInfo>(&key)?
113 .ok_or(NoSuchPage(PageAddr(*address)))?
114 .hash;
115 if &their_hash != our_hash {
116 return Err(PageHashMismatch);
117 }
118 }
119 let mut export_index = 0;
121 for (address, hash) in output.updated_pages.iter() {
122 let key = StorageKey::PageInfo(PageAddr(*address));
123 if hash == &[0; 32] {
124 self.ops.get_typed::<PageInfo>(&key)?.ok_or(NoSuchPage(PageAddr(*address)))?;
126 resident_pages.remove(&Range::new(*address, address + 1));
127 self.ops.remove(&key);
128 } else {
129 resident_pages.insert(Range::new(*address, address + 1));
130 let info = PageInfo { hash: *hash, exports_root, export_index };
131 self.set_typed(key, &info)?;
132 export_index += 1;
133 }
134 }
135 self.set_typed(StorageKey::ResidentPages, &resident_pages)?;
136 if let Some(ref mode) = output.vm_state.video {
138 self.set_typed(StorageKey::VideoMode, mode)?;
139 }
140 if let Some(ref mode) = output.vm_state.audio {
142 self.set_typed(StorageKey::AudioMode, mode)?;
143 }
144 let spec = VmSpec { exports_root, output: output.vm_output, state: output.vm_state };
146 self.set_typed(StorageKey::VmSpec, &spec)?;
147
148 Ok(())
149 }
150
151 fn transfer(
152 &mut self,
153 source: ServiceId,
154 memo: Memo,
155 ) -> Result<(), AccumulationError<A::Error>> {
156 use AccumulationError::*;
157 let instr = CoreVmInstruction::decode(&mut &memo[..])?;
158 let owner = self.ops.get_typed::<ServiceId>(&StorageKey::Owner)?;
159 if let Some(owner) = owner {
160 if owner != source {
161 return Err(NotTheOwner(owner, source));
162 }
163 }
164 match instr {
165 CoreVmInstruction::Reset { gas, exec_ref } => {
166 if owner.is_none() {
167 debug!("Setting owner to {source:x}");
168 self.set_typed(StorageKey::Owner, &source)?;
169 }
170 debug!("Resetting VM: gas = {gas}, exec = {exec_ref}");
171 self.set_typed(StorageKey::Gas, &gas)?;
172 self.set_typed(StorageKey::ExecEnvRef, &exec_ref)?;
173 self.set_typed(StorageKey::StateHash, &initial_state_hash())?;
174 if let Some(spec) = self.ops.get_typed::<VmSpec>(&StorageKey::VmSpec)? {
176 for range in spec.state.resident_pages.as_slice().iter() {
177 for address in range.start..range.end {
178 self.ops.remove(&StorageKey::PageInfo(PageAddr(address)));
179 }
180 }
181 }
182 self.ops.remove(&StorageKey::VmSpec);
184 self.ops.remove(&StorageKey::VideoMode);
185 self.ops.remove(&StorageKey::AudioMode);
186 },
187 CoreVmInstruction::SetOwner(owner) => {
188 debug!("Setting owner to {owner:x}");
189 self.set_typed(StorageKey::Owner, &owner)?;
190 },
191 }
192 Ok(())
193 }
194
195 fn set_typed(
196 &mut self,
197 key: StorageKey,
198 value: &impl Encode,
199 ) -> Result<(), AccumulationError<A::Error>> {
200 self.ops.set_typed(key, value).map_err(AccumulationError::Api)
201 }
202}
203
204#[derive(Debug, thiserror::Error)]
206pub enum AccumulationError<E: core::fmt::Debug> {
207 #[error("Accumulation API error: {0:?}")]
208 Api(E),
209 #[error("Work error: {0:?}")]
210 Work(WorkError),
211 #[error("JAM codec error: {0:?}")]
212 Codec(codec::Error),
213 #[error("Prior VM state mismatch")]
214 PriorStateMismatch,
215 #[error("Page {0} not found in storage")]
216 NoSuchPage(PageAddr),
217 #[error("Page hash mismatch")]
218 PageHashMismatch,
219 #[error("ExecEnvRef mismatch")]
220 ExecEnvRefMismatch,
221 #[error("Transfer source is not the owner of the service: actual owner {0:x}, transfer source {1:x}")]
222 NotTheOwner(ServiceId, ServiceId),
223}
224
225impl<E: core::fmt::Debug> From<codec::Error> for AccumulationError<E> {
226 fn from(e: codec::Error) -> Self {
227 Self::Codec(e)
228 }
229}