#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
use crate::journal::{Limiter, LimitsStatus};
use alloc::vec::Vec;
use ethexe_common::{
HashOf, PromisePolicy,
gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType},
injected::Promise,
};
use ext::Ext;
use gear_core::{
code::{
CodeMetadata, InstrumentedCode, InstrumentedCodeAndMetadata, MAX_WASM_PAGES_AMOUNT,
SyscallKind,
},
gas::GasAllowanceCounter,
gas_metering::Schedule,
ids::ActorId,
message::{DispatchKind, IncomingDispatch, IncomingMessage},
rpc::ReplyInfo,
};
use gear_core_processor::{
ContextCharged, ProcessExecutionContext,
common::{ExecutableActorData, JournalNote},
configs::{BlockConfig, SyscallName},
};
use gear_lazy_pages_common::LazyPagesInterface;
use gprimitives::{H256, MessageId};
use gsys::{GasMultiplier, Percent};
use journal::RuntimeJournalHandler;
use parity_scale_codec::{Decode, Encode};
use state::{Dispatch, ProgramState, Storage};
pub use gear_core_processor::configs::BlockInfo;
pub use journal::{
NativeJournalHandler as JournalHandler, RuntimeDispatchReport, RuntimeGasBurnReport,
RuntimeQueueReport, WAIT_UP_TO_SAFE_DURATION,
};
pub use schedule::{Handler as ScheduleHandler, Restorer as ScheduleRestorer};
pub use transitions::{
FinalizedBlockTransitions, InBlockTransitions, NonFinalTransition, TransitionsConfig,
};
#[cfg(any(test, feature = "mock"))]
pub mod proptest;
pub mod state;
mod ext;
mod journal;
mod schedule;
mod transitions;
pub const RUNTIME_ID: u32 = 2;
pub const CODES_INSTRUMENTATION_VERSION: u32 = 2;
pub const MAX_OUTGOING_MESSAGES_PER_EXECUTION: u32 = 4;
pub const MAX_OUTGOING_MESSAGES_BYTES_PER_EXECUTION: u32 = 4 * 1024;
pub const MAX_OUTGOING_MESSAGES_PER_RUN: u32 = 16;
pub const MAX_OUTGOING_MESSAGES_BYTES_PER_RUN: u32 = 4 * 1024;
pub const MAX_CALL_REPLIES_PER_RUN: u32 = 1;
pub type ProgramJournals = Vec<(Vec<JournalNote>, MessageType, bool)>;
#[derive(Debug, Encode, Decode)]
pub struct ProcessQueueContext {
pub program_id: ActorId,
pub state_root: H256,
pub queue_type: MessageType,
pub gas_allowance: GasAllowanceCounter,
pub block_info: BlockInfo,
pub promise_policy: PromisePolicy,
pub code: Option<(InstrumentedCode, CodeMetadata)>,
}
pub trait RuntimeInterface: Storage {
type LazyPages: LazyPagesInterface + 'static;
fn init_lazy_pages(&self);
fn random_data(&self) -> (Vec<u8>, u32);
fn update_state_hash(&self, state_hash: &H256);
fn publish_promise(&self, promise: &Promise);
}
pub struct TransitionController<'a, S: Storage + ?Sized> {
pub storage: &'a S,
pub transitions: &'a mut InBlockTransitions,
}
impl<S: Storage + ?Sized> TransitionController<'_, S> {
pub fn update_state<T>(
&mut self,
program_id: ActorId,
f: impl FnOnce(&mut ProgramState, &S, &mut InBlockTransitions) -> T,
) -> T {
let state_hash = self
.transitions
.state_of(&program_id)
.expect("failed to find program in known states")
.hash;
let mut state = self
.storage
.program_state(state_hash)
.expect("failed to read state from storage");
let res = f(&mut state, self.storage, self.transitions);
let canonical_queue_size = state.canonical_queue.cached_queue_size;
let injected_queue_size = state.injected_queue.cached_queue_size;
let new_state_hash = self.storage.write_program_state(state);
self.transitions.modify_state(
program_id,
new_state_hash,
canonical_queue_size,
injected_queue_size,
);
res
}
}
pub fn process_queue<RI>(ctx: ProcessQueueContext, ri: &RI) -> (ProgramJournals, u64)
where
RI: RuntimeInterface + 'static,
RI::LazyPages: Send,
{
let (journals, gas_spent, _report) = process_queue_with_report(ctx, ri);
(journals, gas_spent)
}
pub fn process_queue_with_report<RI>(
mut ctx: ProcessQueueContext,
ri: &RI,
) -> (ProgramJournals, u64, RuntimeQueueReport)
where
RI: RuntimeInterface + 'static,
RI::LazyPages: Send,
{
let mut program_state = ri.program_state(ctx.state_root).unwrap();
log::trace!(
"Processing {:?} queue for program {}",
ctx.queue_type,
ctx.program_id
);
let is_queue_empty = match ctx.queue_type {
MessageType::Canonical => program_state.canonical_queue.hash.is_empty(),
MessageType::Injected => program_state.injected_queue.hash.is_empty(),
};
if is_queue_empty {
return (Vec::new(), 0, RuntimeQueueReport::default());
}
let queue = program_state
.queue_from_msg_type(ctx.queue_type)
.hash
.map(|hash| ri.message_queue(hash).expect("Cannot get message queue"))
.expect("Queue cannot be empty at this point");
let block_config = BlockConfig {
block_info: ctx.block_info,
forbidden_funcs: [
SyscallName::CreateProgramWGas, SyscallName::CreateProgram, SyscallName::ReplyDeposit, SyscallName::SignalCode, SyscallName::Random, SyscallName::ReplyCommitWGas, SyscallName::SignalFrom, SyscallName::ReplyInputWGas, SyscallName::ReplyWGas, SyscallName::ReservationReplyCommit, SyscallName::ReservationReply, SyscallName::ReservationSendCommit, SyscallName::ReservationSend, SyscallName::ReserveGas, SyscallName::SendCommitWGas, SyscallName::SendInputWGas, SyscallName::SendWGas, SyscallName::SystemReserveGas, SyscallName::UnreserveGas, SyscallName::Wait, ]
.into(),
gas_multiplier: GasMultiplier::from_value_per_gas(100),
costs: Schedule::default().process_costs(),
max_pages: MAX_WASM_PAGES_AMOUNT.into(),
outgoing_limit: MAX_OUTGOING_MESSAGES_PER_EXECUTION,
outgoing_bytes_limit: MAX_OUTGOING_MESSAGES_BYTES_PER_EXECUTION,
performance_multiplier: Percent::new(100),
existential_deposit: 0,
mailbox_threshold: 0,
max_reservations: 0,
reserve_for: 0,
};
let mut mega_journal = Vec::new();
let mut report = RuntimeQueueReport::default();
let initial_gas_allowance = ctx.gas_allowance.left();
let mut limiter = Limiter {
outgoing_messages: MAX_OUTGOING_MESSAGES_PER_RUN,
outgoing_messages_bytes: MAX_OUTGOING_MESSAGES_BYTES_PER_RUN,
call_replies: MAX_CALL_REPLIES_PER_RUN,
};
ri.init_lazy_pages();
for dispatch in queue {
let dispatch_id = dispatch.id;
let message_type = dispatch.message_type;
let call_reply = dispatch.call;
let is_first_execution = dispatch.context.is_none();
let is_promise_required = dispatch.kind.is_handle() && dispatch.message_type.is_injected();
let journal = process_dispatch(dispatch, &block_config, &program_state, &ctx, ri);
let mut handler = RuntimeJournalHandler {
storage: ri,
program_state: &mut program_state,
gas_allowance_counter: &mut ctx.gas_allowance,
gas_multiplier: &block_config.gas_multiplier,
message_type: ctx.queue_type,
is_first_execution,
stop_processing: false,
call_reply,
limiter: &mut limiter,
};
if ctx.queue_type.is_canonical() && ctx.promise_policy.is_enabled() {
debug_assert!(false, "Promise policy must be disabled for canonical queue");
}
if is_promise_required && ctx.promise_policy.is_enabled() {
parse_journal_for_injected_dispatch(ri, &journal, dispatch_id);
}
let (unhandled_journal_notes, new_state_hash, dispatch_report) =
handler.handle_journal_with_report(journal);
report.extend(dispatch_report);
mega_journal.push((unhandled_journal_notes, message_type, call_reply));
if let Some(new_state_hash) = new_state_hash {
ri.update_state_hash(&new_state_hash);
}
if handler.stop_processing {
break;
}
match limiter.status() {
LimitsStatus::WithinLimits => {}
status => {
log::trace!("Limits exceeded: {status:?}, stopping execution of the queue");
break;
}
}
}
let gas_spent = initial_gas_allowance
.checked_sub(ctx.gas_allowance.left())
.expect("cannot spend more gas than allowed");
(mega_journal, gas_spent, report)
}
fn parse_journal_for_injected_dispatch<RI>(ri: &RI, journal: &[JournalNote], dispatch_id: MessageId)
where
RI: RuntimeInterface,
{
let maybe_reply = journal.iter().find_map(|note| {
let JournalNote::SendDispatch {
message_id,
dispatch,
..
} = note
else {
return None;
};
if *message_id != dispatch_id || !dispatch.kind().is_reply() {
return None;
}
let Some(code) = dispatch.reply_details().map(|d| d.to_reply_code()) else {
log::error!(
"received reply dispatch without reply details; protocol invariant violated: \
initial_dispatch_id={dispatch_id:?}, send_dispatch={dispatch:?}"
);
return None;
};
Some(ReplyInfo {
value: dispatch.value(),
code,
payload: dispatch.message().payload_bytes().to_vec(),
})
});
if let Some(reply) = maybe_reply {
let tx_hash = unsafe { HashOf::new(dispatch_id.into_bytes().into()) };
let promise = Promise { reply, tx_hash };
ri.publish_promise(&promise);
}
}
fn process_dispatch<RI>(
dispatch: Dispatch,
block_config: &BlockConfig,
program_state: &ProgramState,
ctx: &ProcessQueueContext,
ri: &RI,
) -> Vec<JournalNote>
where
RI: RuntimeInterface + 'static,
RI::LazyPages: Send,
{
let Dispatch {
id: dispatch_id,
kind,
source,
payload,
value,
details,
context,
..
} = dispatch;
let &ProcessQueueContext {
program_id,
ref code,
..
} = ctx;
let payload = payload.query(ri).expect("failed to get payload");
let gas_limit = block_config
.gas_multiplier
.value_to_gas(program_state.executable_balance)
.min(CHUNK_PROCESSING_GAS_LIMIT);
let incoming_message =
IncomingMessage::new(dispatch_id, source, payload, gas_limit, value, details);
let dispatch = IncomingDispatch::new(kind, incoming_message, context);
let context = ContextCharged::new(program_id, dispatch, ctx.gas_allowance.left());
let context = match context.charge_for_program(block_config) {
Ok(context) => context,
Err(journal) => return journal,
};
let active_state = match &program_state.program {
state::Program::Active(state) => state,
state::Program::Terminated(program_id) => {
log::trace!("Program {program_id} has failed init");
return gear_core_processor::process_failed_init(context);
}
state::Program::Exited(program_id) => {
log::trace!("Program {program_id} has exited");
return gear_core_processor::process_program_exited(context, *program_id);
}
};
if active_state.initialized && kind == DispatchKind::Init {
unreachable!(
"Init message {dispatch_id} is sent to already initialized program {program_id}",
);
}
if !active_state.initialized && !matches!(kind, DispatchKind::Init | DispatchKind::Reply) {
log::trace!(
"Program {program_id} is not yet finished initialization, so cannot process handle message"
);
return gear_core_processor::process_uninitialized(context);
}
let context = match context.charge_for_code_metadata(block_config) {
Ok(context) => context,
Err(journal) => return journal,
};
let context = match context.charge_for_instrumented_code(block_config, 0) {
Ok(context) => context,
Err(journal) => return journal,
};
let Some((code, code_metadata)) = code else {
log::trace!(
"Missing instrumented code for program {program_id}, skipping execution of dispatch {dispatch_id} due to reinstrumentation failure"
);
return gear_core_processor::process_reinstrumentation_error(context);
};
let allocations = active_state
.allocations_hash
.map_or_default(|hash| ri.allocations(hash).expect("Cannot get allocations"));
let context = match context.charge_for_allocations(block_config, allocations.tree_len()) {
Ok(context) => context,
Err(journal) => return journal,
};
let actor_data = ExecutableActorData {
allocations: allocations.into(),
gas_reservation_map: Default::default(), memory_infix: active_state.memory_infix,
};
let context = match context.charge_for_module_instantiation(
block_config,
actor_data,
code.instantiated_section_sizes(),
code_metadata,
) {
Ok(context) => context,
Err(journal) => return journal,
};
let execution_context = ProcessExecutionContext::new(
context,
InstrumentedCodeAndMetadata {
instrumented_code: code.clone(),
metadata: code_metadata.clone(),
},
program_state.balance,
SyscallKind::Eth,
);
let random_data = ri.random_data();
gear_core_processor::process::<Ext<RI>>(block_config, execution_context, random_data)
.unwrap_or_else(|err| unreachable!("{err}"))
}
pub const fn pack_u32_to_i64(low: u32, high: u32) -> i64 {
let mut result = 0u64;
result |= (high as u64) << 32;
result |= low as u64;
result as i64
}
pub const fn unpack_i64_to_u32(val: i64) -> (u32, u32) {
let val = val as u64;
let high = (val >> 32) as u32;
let low = val as u32;
(low, high)
}
#[cfg(test)]
mod tests {
use alloc::collections::BTreeSet;
use super::*;
use crate::state::MemStorage;
use gear_core::code::{InstantiatedSectionSizes, InstrumentationStatus};
impl RuntimeInterface for MemStorage {
type LazyPages = ();
fn init_lazy_pages(&self) {}
fn random_data(&self) -> (Vec<u8>, u32) {
(Vec::new(), 0)
}
fn update_state_hash(&self, _state_hash: &H256) {}
fn publish_promise(&self, _promise: &Promise) {}
}
fn empty_queue_context(storage: &MemStorage) -> ProcessQueueContext {
ProcessQueueContext {
program_id: ActorId::from(42),
state_root: storage.write_program_state(ProgramState::zero()),
queue_type: MessageType::Canonical,
gas_allowance: GasAllowanceCounter::new(1_000_000),
block_info: BlockInfo::default(),
promise_policy: PromisePolicy::Disabled,
code: Some((
InstrumentedCode::new(Vec::new(), InstantiatedSectionSizes::new(0, 0, 0, 0, 0, 0)),
CodeMetadata::new(
0,
BTreeSet::new(),
0.into(),
None,
InstrumentationStatus::NotInstrumented,
),
)),
}
}
#[test]
fn process_queue_with_report_keeps_empty_queue_abi_compatible() {
let storage = MemStorage::default();
let (journals, gas_spent, report) =
process_queue_with_report(empty_queue_context(&storage), &storage);
let (legacy_journals, legacy_gas_spent) =
process_queue(empty_queue_context(&storage), &storage);
assert!(journals.is_empty());
assert_eq!(gas_spent, 0);
assert_eq!(report, RuntimeQueueReport::default());
assert!(legacy_journals.is_empty());
assert_eq!(legacy_gas_spent, gas_spent);
}
}