use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::Future;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::Instant;
use futures::Stream;
use either::Either;
use freenet_stdlib::client_api::{
ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
ContractRequest, ContractResponse, DelegateError as StdDelegateError, DelegateRequest,
HostResponse::{self, DelegateResponse},
RequestError,
};
use freenet_stdlib::prelude::*;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use super::storages::Storage;
use crate::config::Config;
use crate::message::Transaction;
use crate::node::OpManager;
use crate::operations::get::GetResult;
use crate::operations::{OpEnum, OpError};
use crate::wasm_runtime::{
ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface, DelegateStore, Runtime,
SecretsStore, StateStorage, StateStore, StateStoreError,
};
use crate::{
client_events::{ClientId, HostResult},
operations::{self, Operation},
};
pub(super) mod init_tracker;
pub(super) mod mock_runtime;
pub(super) mod mock_wasm_runtime;
#[cfg(test)]
mod pool_tests;
pub(super) mod runtime;
/// Message type carried by the delegate notification channel
/// (`DelegateNotificationSender` / `DelegateNotificationReceiver`).
///
/// NOTE(review): presumably emitted when the state of a contract a delegate cares
/// about changes — the producer is not in this file, confirm there.
pub(crate) struct DelegateNotification {
/// Delegate the notification refers to (role inferred from the name — TODO confirm).
pub delegate_key: DelegateKey,
/// Instance id of the contract the notification refers to.
pub contract_id: ContractInstanceId,
/// Contract state carried by the notification, behind an `Arc`.
pub new_state: Arc<WrappedState>,
}
// Crate-visible limits and channel capacities. None of these constants is referenced
// in this file, so every description below is inferred from the name only —
// confirm against the use sites before relying on it.
/// Presumably the buffer size of the delegate notification channel — TODO confirm.
pub(crate) const DELEGATE_NOTIFICATION_CHANNEL_SIZE: usize = 1000;
/// Presumably the cap on subscribers registered for a single contract — TODO confirm.
pub(crate) const MAX_SUBSCRIBERS_PER_CONTRACT: usize = 256;
/// Presumably the cap on subscriptions held by a single client — TODO confirm.
pub(crate) const MAX_SUBSCRIPTIONS_PER_CLIENT: usize = 50;
/// Presumably the buffer size of each subscriber's notification channel — TODO confirm.
pub(crate) const SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE: usize = 64;
/// Presumably bounds how many deltas are computed during one notification fan-out — TODO confirm.
pub(crate) const MAX_DELTA_COMPUTATIONS_PER_FANOUT: usize = 32;
/// Presumably the fan-out size above which a warning is logged — TODO confirm.
pub(crate) const FANOUT_WARNING_THRESHOLD: usize = 50;
/// Presumably the maximum nesting depth for delegates creating delegates — TODO confirm.
pub(crate) const MAX_DELEGATE_CREATION_DEPTH: u32 = 4;
/// Presumably the maximum number of delegates one call may create — TODO confirm.
pub(crate) const MAX_DELEGATE_CREATIONS_PER_CALL: u32 = 8;
/// Presumably the node-wide cap on delegates created by other delegates — TODO confirm.
pub(crate) const MAX_CREATED_DELEGATES_PER_NODE: usize = 1024;
/// Sending half of a bounded tokio channel of `DelegateNotification`s.
pub(crate) type DelegateNotificationSender = mpsc::Sender<DelegateNotification>;
/// Receiving half of a bounded tokio channel of `DelegateNotification`s.
pub(crate) type DelegateNotificationReceiver = mpsc::Receiver<DelegateNotification>;
pub(crate) use init_tracker::{
ContractInitTracker, InitCheckResult, MAX_CONCURRENT_INITIALIZATIONS,
MAX_QUEUED_OPS_PER_CONTRACT, SLOW_INIT_THRESHOLD, STALE_INIT_THRESHOLD, now_nanos,
};
pub(crate) use runtime::RuntimePool;
/// Sending half of the op-request channel. Each message pairs a `Transaction` with the
/// oneshot sender on which the outcome (`OpEnum` or `OpRequestError`) is delivered.
pub(crate) type OpRequestSender =
mpsc::Sender<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
/// Receiving half of the op-request channel; consumed by `run_op_request_mediator`.
pub(crate) type OpRequestReceiver =
mpsc::Receiver<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
/// Creates the bounded (1000-slot) op-request channel.
///
/// Note the order of the returned pair: the receiver comes first, then the sender.
pub(crate) fn op_request_channel() -> (OpRequestReceiver, OpRequestSender) {
    let (request_sender, request_receiver) = mpsc::channel(1000);
    tracing::debug!(buffer_size = 1000, "Created op_request channel");
    (request_receiver, request_sender)
}
/// Error reported to a caller waiting for the outcome of an operation request.
#[derive(Debug, Clone)]
pub enum OpRequestError {
    /// The request failed; the payload is a human-readable reason.
    Failed(String),
    /// A channel needed to complete the request was closed.
    ChannelClosed,
}

impl std::fmt::Display for OpRequestError {
    /// Renders `Failed` as `Operation failed: <reason>` and `ChannelClosed` as `Channel closed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ChannelClosed => f.write_str("Channel closed"),
            Self::Failed(reason) => write!(f, "Operation failed: {reason}"),
        }
    }
}

impl std::error::Error for OpRequestError {}
/// Error produced by the contract executor.
///
/// It wraps either a client-facing `RequestError` (the `Left` side) or an arbitrary
/// `anyhow::Error` (the `Right` side), plus a `fatal` flag exposed through `is_fatal`.
#[derive(Debug)]
pub struct ExecutorError {
// Left: boxed request error returned to clients; Right: any other error.
inner: Either<Box<RequestError>, anyhow::Error>,
// Every constructor visible in this file sets this to `false`.
fatal: bool,
}
/// Identifies the operation that was running when a runtime error occurred, so that
/// `ExecutorError::execution` can attach the relevant key to the resulting request error.
enum InnerOpError {
/// A contract upsert for the given contract key.
Upsert(ContractKey),
/// A delegate operation for the given delegate key.
Delegate(DelegateKey),
}
impl std::error::Error for ExecutorError {}
impl ExecutorError {
pub fn other(error: impl Into<anyhow::Error>) -> Self {
Self {
inner: Either::Right(error.into()),
fatal: false,
}
}
fn internal_error() -> Self {
Self {
inner: Either::Right(anyhow::anyhow!("internal error")),
fatal: false,
}
}
fn request(error: impl Into<RequestError>) -> Self {
Self {
inner: Either::Left(Box::new(error.into())),
fatal: false,
}
}
fn execution(
outer_error: crate::wasm_runtime::ContractError,
op: Option<InnerOpError>,
) -> Self {
use crate::wasm_runtime::RuntimeInnerError;
let error = outer_error.deref();
if let RuntimeInnerError::ContractExecError(e) = error {
if let Some(InnerOpError::Upsert(key)) = &op {
return ExecutorError::request(StdContractError::update_exec_error(*key, e));
}
}
if let RuntimeInnerError::DelegateNotFound(key) = error {
return ExecutorError::request(StdDelegateError::Missing(key.clone()));
}
if let RuntimeInnerError::DelegateExecError(e) = error {
return ExecutorError::request(StdDelegateError::ExecutionError(format!("{e}").into()));
}
if let (
RuntimeInnerError::SecretStoreError(
crate::wasm_runtime::SecretStoreError::MissingSecret(secret),
),
Some(InnerOpError::Delegate(key)),
) = (error, &op)
{
return ExecutorError::request(StdDelegateError::MissingSecret {
key: key.clone(),
secret: secret.clone(),
});
}
if let RuntimeInnerError::WasmError(e) = error {
match op {
Some(InnerOpError::Upsert(key)) => {
return ExecutorError::request(StdContractError::update_exec_error(key, e));
}
_ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
}
}
ExecutorError::other(outer_error)
}
pub fn is_request(&self) -> bool {
matches!(self.inner, Either::Left(_))
}
pub fn is_contract_exec_rejection(&self) -> bool {
match &self.inner {
Either::Left(req_err) => matches!(
req_err.as_ref(),
RequestError::ContractError(StdContractError::Update { cause, .. })
if cause.starts_with("execution error")
),
Either::Right(_) => false,
}
}
pub fn is_fatal(&self) -> bool {
self.fatal
}
pub fn is_missing_delegate(&self) -> bool {
matches!(
&self.inner,
Either::Left(err) if matches!(
err.as_ref(),
RequestError::DelegateError(StdDelegateError::Missing(_))
)
)
}
pub fn unwrap_request(self) -> RequestError {
match self.inner {
Either::Left(err) => *err,
Either::Right(_) => unreachable!("called unwrap_request on a non-request error"),
}
}
}
impl From<RequestError> for ExecutorError {
fn from(value: RequestError) -> Self {
Self {
inner: Either::Left(Box::new(value)),
fatal: false,
}
}
}
impl Display for ExecutorError {
    /// Displays whichever error is wrapped, using that error's own `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.inner {
            Either::Right(other) => f.write_fmt(format_args!("{}", &**other)),
            Either::Left(request) => f.write_fmt(format_args!("{}", &**request)),
        }
    }
}
impl From<Box<RequestError>> for ExecutorError {
fn from(value: Box<RequestError>) -> Self {
Self {
inner: Either::Left(value),
fatal: false,
}
}
}
/// Outcome of an executor request: a `HostResponse` on success, an `ExecutorError` otherwise.
type Response = Result<HostResponse, ExecutorError>;
/// Mode the executor runs in. Usable as a `clap` value and (de)serialized in
/// snake case (`local` / `network`).
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationMode {
/// Local mode — presumably no network operations are issued; TODO confirm at use sites.
Local,
/// Network mode — presumably operations may go through the network; TODO confirm.
Network,
}
impl Display for OperationMode {
    /// Writes the lowercase name of the mode: `local` or `network`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            OperationMode::Network => "network",
            OperationMode::Local => "local",
        };
        f.write_str(label)
    }
}
/// One end of the executor <-> event loop channel pair.
///
/// The `End` type parameter selects which half this value represents; it is limited to
/// the types implementing the private `sealed::ChannelHalve` trait
/// (`NetworkEventListenerHalve` and `Callback`).
pub struct ExecutorToEventLoopChannel<End: sealed::ChannelHalve> {
// Not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)] op_manager: Arc<OpManager>,
// The channel endpoints belonging to this half.
end: End,
}
/// Creates the two bounded (1000-slot) channels connecting the mediator and the event loop.
///
/// Returns, in order:
/// 1. the listener half, holding the transaction receiver and the result sender;
/// 2. the sender used to forward transactions towards that listener;
/// 3. the receiver on which operation results arrive.
pub(crate) fn mediator_channels(
    op_manager: Arc<OpManager>,
) -> (
    ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
    mpsc::Sender<Transaction>,
    mpsc::Receiver<OpEnum>,
) {
    let (transaction_tx, transaction_rx) = mpsc::channel(1000);
    let (result_tx, result_rx) = mpsc::channel(1000);
    tracing::debug!(buffer_size = 1000, "Created mediator channels");
    let end = NetworkEventListenerHalve {
        waiting_for_op_rx: transaction_rx,
        response_for_tx: result_tx,
    };
    (
        ExecutorToEventLoopChannel { op_manager, end },
        transaction_tx,
        result_rx,
    )
}
/// Maximum number of requests the mediator tracks at once; further requests are rejected.
const MAX_PENDING_REQUESTS: usize = 10_000;
/// Age after which a pending request is considered stale and failed by the cleanup pass.
const STALE_REQUEST_THRESHOLD: Duration = Duration::from_secs(180);
/// How often the mediator scans for stale pending requests.
const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
/// Bookkeeping for one request waiting on a response from the event loop.
struct PendingRequest {
// Where the outcome (or failure) is delivered to the waiting caller.
response_tx: oneshot::Sender<Result<OpEnum, OpRequestError>>,
// When the request was registered; compared against `STALE_REQUEST_THRESHOLD`.
created_at: Instant,
}
pub(crate) async fn run_op_request_mediator(
mut op_request_receiver: OpRequestReceiver,
to_event_loop_tx: mpsc::Sender<Transaction>,
mut from_event_loop_rx: mpsc::Receiver<OpEnum>,
) {
use std::collections::BTreeMap;
let mut pending_responses: BTreeMap<Transaction, PendingRequest> = BTreeMap::new();
let mut cleanup_interval = tokio::time::interval(CLEANUP_INTERVAL);
cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tracing::info!("Op request mediator starting");
loop {
tokio::select! {
biased;
Some((transaction, response_tx)) = op_request_receiver.recv() => {
tracing::trace!(
tx = %transaction,
pending_count = pending_responses.len(),
"Mediator received operation request"
);
if pending_responses.len() >= MAX_PENDING_REQUESTS {
tracing::warn!(
tx = %transaction,
max = MAX_PENDING_REQUESTS,
"Mediator at capacity, rejecting request"
);
#[allow(clippy::let_underscore_must_use)]
let _ = response_tx.send(Err(OpRequestError::Failed(
"mediator at capacity".to_string()
)));
continue;
}
pending_responses.insert(transaction, PendingRequest {
response_tx,
created_at: Instant::now(),
});
if let Err(e) = to_event_loop_tx.send(transaction).await {
tracing::error!(
tx = %transaction,
error = %e,
"Failed to forward transaction to event loop - channel closed"
);
if let Some(pending) = pending_responses.remove(&transaction) {
#[allow(clippy::let_underscore_must_use)]
let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
}
}
}
Some(op_result) = from_event_loop_rx.recv() => {
let transaction = *op_result.id();
tracing::trace!(
tx = %transaction,
pending_count = pending_responses.len(),
"Mediator received response from event loop"
);
if let Some(pending) = pending_responses.remove(&transaction) {
if pending.response_tx.send(Ok(op_result)).is_err() {
tracing::debug!(
tx = %transaction,
"Executor dropped before receiving response"
);
}
} else {
tracing::warn!(
tx = %transaction,
"Received response for unknown transaction - executor may have timed out"
);
}
}
_ = cleanup_interval.tick() => {
let now = Instant::now();
let stale_threshold = STALE_REQUEST_THRESHOLD;
let stale_txs: Vec<Transaction> = pending_responses
.iter()
.filter(|(_, pending)| now.duration_since(pending.created_at) > stale_threshold)
.map(|(tx, _)| *tx)
.collect();
if !stale_txs.is_empty() {
tracing::warn!(
stale_count = stale_txs.len(),
pending_count = pending_responses.len(),
threshold_secs = stale_threshold.as_secs(),
"Cleaning up stale pending requests"
);
for tx in stale_txs {
if let Some(pending) = pending_responses.remove(&tx) {
tracing::debug!(
tx = %tx,
age_secs = now.duration_since(pending.created_at).as_secs(),
"Removing stale pending request"
);
#[allow(clippy::let_underscore_must_use)]
let _ = pending.response_tx.send(Err(OpRequestError::Failed(
"request exceeded stale threshold".to_string()
)));
}
}
}
}
else => {
tracing::info!(
pending_count = pending_responses.len(),
"Mediator channels closed, shutting down"
);
for (tx, pending) in std::mem::take(&mut pending_responses) {
tracing::debug!(tx = %tx, "Notifying orphaned waiter of shutdown");
#[allow(clippy::let_underscore_must_use)]
let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
}
break;
}
}
}
tracing::info!("Op request mediator stopped");
}
impl Stream for ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
    type Item = Transaction;

    /// Polls the underlying transaction receiver; the stream ends when that channel
    /// is closed and drained.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.end.waiting_for_op_rx.poll_recv(cx)
    }
}
impl ExecutorToEventLoopChannel<Callback> {
    /// Sends an operation result back through the callback channel.
    ///
    /// A closed channel is not treated as an error; it is only logged at debug level.
    pub async fn response(&mut self, result: OpEnum) {
        // Capture the id first: `result` is moved into the channel below.
        let tx_id = *result.id();
        match self.end.response_for_tx.send(result).await {
            Ok(()) => {}
            Err(_) => {
                tracing::debug!(
                    tx = %tx_id,
                    "Failed to send response to executor - channel closed"
                );
            }
        }
    }
}
/// Channel half used to send operation results back
/// (see `ExecutorToEventLoopChannel<Callback>::response`).
pub(crate) struct Callback {
// Sender on which operation results are pushed.
response_for_tx: mpsc::Sender<OpEnum>,
}
/// Channel half that receives transactions; `ExecutorToEventLoopChannel` exposes the
/// receiver as a `Stream` of `Transaction`s for this half.
#[allow(dead_code)] pub(crate) struct NetworkEventListenerHalve {
// Transactions forwarded towards the listener.
waiting_for_op_rx: mpsc::Receiver<Transaction>,
// Sender for operation results; not read in this file (hence `dead_code`).
response_for_tx: mpsc::Sender<OpEnum>,
}
// Private module holding the marker trait for channel halves. Because the module is
// private, code outside this file cannot name the trait to implement it for new types.
mod sealed {
use super::{Callback, NetworkEventListenerHalve};
/// Marker trait for the types usable as `ExecutorToEventLoopChannel`'s `End` parameter.
pub trait ChannelHalve {}
impl ChannelHalve for NetworkEventListenerHalve {}
impl ChannelHalve for Callback {}
}
/// Describes how a request value is turned into a network operation of type `Op`.
///
/// `Executor::op_request` first calls `initiate_op` to create the operation, registers
/// its transaction with the mediator, and then calls `resume_op` to start it.
trait ComposeNetworkMessage<Op>
where
Self: Sized,
Op: Operation + Send + 'static,
{
/// Consumes the request and builds the initial operation state.
fn initiate_op(self, op_manager: &OpManager) -> Op;
/// Starts (or continues) the operation through `op_manager`; resolves to an
/// `OpError` on failure.
fn resume_op(
op: Op,
op_manager: &OpManager,
) -> impl Future<Output = Result<(), OpError>> + Send;
}
/// Request to fetch a contract through a network GET operation.
#[allow(unused)]
struct GetContract {
    /// Instance id of the contract to fetch.
    instance_id: ContractInstanceId,
    /// Whether the contract code should be returned along with the state.
    return_contract_code: bool,
}

impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
    /// Starts a GET operation for this request; the two trailing flags passed to
    /// `start_op` are always `false`.
    fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp {
        let Self {
            instance_id,
            return_contract_code,
        } = self;
        operations::get::start_op(instance_id, return_contract_code, false, false)
    }

    /// Issues the GET request with a fresh visited-peers set derived from the operation id.
    async fn resume_op(op: operations::get::GetOp, op_manager: &OpManager) -> Result<(), OpError> {
        // Must be built before `op` is moved into `request_get`.
        let visited_peers = operations::VisitedPeers::new(&op.id);
        operations::get::request_get(op_manager, op, visited_peers).await
    }
}
/// Request to subscribe to a contract through a network SUBSCRIBE operation.
#[allow(unused)]
struct SubscribeContract {
    /// Instance id of the contract to subscribe to.
    instance_id: ContractInstanceId,
}

impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeContract {
    /// Starts a subscribe operation; the flag passed to `start_op` is always `false`.
    fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp {
        let Self { instance_id } = self;
        operations::subscribe::start_op(instance_id, false)
    }

    /// Issues the subscribe request through the op manager.
    async fn resume_op(
        op: operations::subscribe::SubscribeOp,
        op_manager: &OpManager,
    ) -> Result<(), OpError> {
        let outcome = operations::subscribe::request_subscribe(op_manager, op).await;
        outcome
    }
}
/// Request to propagate a new full state for a contract through a network UPDATE operation.
struct UpdateContract {
// Key of the contract being updated.
key: ContractKey,
// Full state sent as the update payload.
new_state: WrappedState,
}
/// Outcome of `ContractExecutor::upsert_contract_state`.
#[derive(Debug)]
pub(crate) enum UpsertResult {
/// NOTE(review): presumably the upsert left the stored state unchanged — confirm in the implementors.
NoChange,
/// NOTE(review): presumably the stored state changed; carries the resulting state — confirm.
Updated(WrappedState),
/// NOTE(review): presumably the current state was kept over the incoming one; carries that state — confirm.
CurrentWon(WrappedState),
}
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
    /// Starts an update operation carrying the full new state and no related contracts.
    fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
        let state = freenet_stdlib::prelude::State::from(self.new_state);
        let payload = freenet_stdlib::prelude::UpdateData::State(state);
        operations::update::start_op(self.key, payload, RelatedContracts::default())
    }

    /// Issues the update request through the op manager.
    async fn resume_op(
        op: operations::update::UpdateOp,
        op_manager: &OpManager,
    ) -> Result<(), OpError> {
        let outcome = operations::update::request_update(op_manager, op).await;
        outcome
    }
}
/// Interface of a contract executor: contract lookup/fetch/upsert, subscription
/// notifiers, delegate requests, and state summary/delta computation.
///
/// The implementations are not part of this file; the per-method notes below describe
/// only what the signatures establish.
pub(crate) trait ContractExecutor: Send + 'static {
/// Resolves a contract instance id to its full `ContractKey`, if one is available.
fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey>;
/// Fetches a contract by key, yielding an optional state and an optional contract
/// container. `return_contract_code` presumably controls whether the container is
/// included — TODO confirm in the implementors.
fn fetch_contract(
&mut self,
key: ContractKey,
return_contract_code: bool,
) -> impl Future<
Output = Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError>,
> + Send;
/// Applies `update` — either a full state (`Left`) or a delta (`Right`) — to the
/// contract identified by `key`, optionally supplying the contract code, and reports
/// the outcome as an `UpsertResult`.
fn upsert_contract_state(
&mut self,
key: ContractKey,
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> impl Future<Output = Result<UpsertResult, ExecutorError>> + Send;
/// Registers `notification_ch` as the notification channel of client `cli_id` for
/// the given contract, with an optional summary of the client's current state.
fn register_contract_notifier(
&mut self,
key: ContractInstanceId,
cli_id: ClientId,
notification_ch: tokio::sync::mpsc::Sender<HostResult>,
summary: Option<StateSummary<'_>>,
) -> Result<(), Box<RequestError>>;
/// Executes a delegate request. `origin_contract` and `caller_delegate` optionally
/// identify where the request came from (semantics defined by the implementors).
fn execute_delegate_request(
&mut self,
req: DelegateRequest<'_>,
origin_contract: Option<&ContractInstanceId>,
caller_delegate: Option<&DelegateKey>,
) -> impl Future<Output = Response> + Send;
/// Lists the subscriptions currently known to the executor.
fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo>;
/// Reports a subscription error for the given contract, with a textual reason.
fn notify_subscription_error(&self, key: ContractInstanceId, reason: String);
/// Hook invoked to drop a client; the default implementation does nothing.
fn remove_client(&self, _client_id: ClientId) {}
/// Computes a summary of the contract's state.
fn summarize_contract_state(
&mut self,
key: ContractKey,
) -> impl Future<Output = Result<StateSummary<'static>, ExecutorError>> + Send;
/// Computes a state delta for the contract given the other side's summary.
fn get_contract_state_delta(
&mut self,
key: ContractKey,
their_summary: StateSummary<'static>,
) -> impl Future<Output = Result<StateDelta<'static>, ExecutorError>> + Send;
/// Hands out the delegate notification receiver, if the executor has one; the
/// default implementation returns `None`.
fn take_delegate_notification_rx(&mut self) -> Option<DelegateNotificationReceiver> {
None
}
}
/// Shared, mutex-protected set of contract keys.
/// NOTE(review): by its name this tracks contracts undergoing corrupted-state recovery — confirm at the use sites.
pub(crate) type CorruptedStateRecoveryGuard = Arc<std::sync::Mutex<HashSet<ContractKey>>>;
/// Concurrent map: contract instance id -> list of (client id, notification sender).
type SharedNotifications =
Arc<dashmap::DashMap<ContractInstanceId, Vec<(ClientId, mpsc::Sender<HostResult>)>>>;
/// Concurrent map: contract instance id -> per-client optional state summary.
type SharedSummaries =
Arc<dashmap::DashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>>;
/// Concurrent map: client id -> a count (presumably that client's number of subscriptions — TODO confirm).
type SharedClientCounts = Arc<dashmap::DashMap<ClientId, usize>>;
/// Contract executor, generic over the runtime `R` and the state storage backend `S`.
///
/// Field notes describe what this file shows (types and the initial values set in
/// `Executor::new`); how each field is used is defined in other modules.
pub struct Executor<R = Runtime, S: StateStorage = Storage> {
// Operation mode the executor was created with.
mode: OperationMode,
// Runtime used to execute contracts/delegates.
runtime: R,
/// Store holding contract states.
pub state_store: StateStore<S>,
// Per-contract list of (client, notification sender); read by `get_subscription_info`.
update_notifications: HashMap<ContractInstanceId, Vec<(ClientId, mpsc::Sender<HostResult>)>>,
// Per-client count (presumably of active subscriptions — TODO confirm).
client_subscription_counts: HashMap<ClientId, usize>,
// Per-contract, per-client optional state summary.
subscriber_summaries:
HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
// Contract instance ids associated with each delegate key.
delegate_origin_ids: HashMap<DelegateKey, Vec<ContractInstanceId>>,
// Tracker for contract initialization (see the `init_tracker` module).
init_tracker: ContractInitTracker,
// Channel to the op-request mediator; `op_request` fails when this is `None`.
op_sender: Option<OpRequestSender>,
// Operation manager; `op_request` fails when this is `None`.
op_manager: Option<Arc<OpManager>>,
// Shared maps installed via `set_shared_notifications`; `None` until then.
shared_notifications: Option<SharedNotifications>,
shared_summaries: Option<SharedSummaries>,
shared_client_counts: Option<SharedClientCounts>,
// Starts as an empty set; replaceable via `set_recovery_guard`.
pub(crate) recovery_guard: CorruptedStateRecoveryGuard,
// LRU cache (capacity 1024) keyed by contract; the `u64` is presumably a state hash/version — TODO confirm.
summary_cache: LruCache<ContractKey, (u64, StateSummary<'static>)>,
// LRU cache (capacity 1024) of deltas; the two `u64` key parts are presumably state/summary hashes — TODO confirm.
delta_cache: LruCache<(ContractKey, u64, u64), StateDelta<'static>>,
// Sender for delegate notifications; `None` until `set_delegate_notification_tx` is called.
delegate_notification_tx: Option<DelegateNotificationSender>,
}
impl<R, S> Executor<R, S>
where
    S: StateStorage + Send + 'static,
    <S as StateStorage>::Error: Into<anyhow::Error>,
{
    /// Creates an executor.
    ///
    /// `ctrl_handler` is invoked once before construction and its error, if any, is
    /// returned. All maps start empty, the shared maps and the delegate notification
    /// sender start unset, and both LRU caches have a capacity of 1024 entries.
    pub(crate) async fn new(
        state_store: StateStore<S>,
        ctrl_handler: impl FnOnce() -> anyhow::Result<()>,
        mode: OperationMode,
        runtime: R,
        op_sender: Option<OpRequestSender>,
        op_manager: Option<Arc<OpManager>>,
    ) -> anyhow::Result<Self> {
        ctrl_handler()?;
        Ok(Self {
            mode,
            runtime,
            state_store,
            update_notifications: HashMap::default(),
            client_subscription_counts: HashMap::default(),
            subscriber_summaries: HashMap::default(),
            delegate_origin_ids: HashMap::default(),
            init_tracker: ContractInitTracker::new(),
            op_sender,
            op_manager,
            shared_notifications: None,
            shared_summaries: None,
            shared_client_counts: None,
            recovery_guard: Arc::new(std::sync::Mutex::new(HashSet::new())),
            // 1024 is a non-zero literal, so these `unwrap`s cannot fail.
            summary_cache: LruCache::new(NonZeroUsize::new(1024).unwrap()),
            delta_cache: LruCache::new(NonZeroUsize::new(1024).unwrap()),
            delegate_notification_tx: None,
        })
    }

    /// Returns a per-call unique directory path under the system temp dir, built from
    /// `identifier`, the process id and a process-wide counter. The directory itself
    /// is not created.
    pub fn test_data_dir(identifier: &str) -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let unique_id = COUNTER.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "freenet-executor-{identifier}-{}-{unique_id}",
            std::process::id()
        ))
    }

    /// Installs the shared notification, summary and client-count maps.
    pub(crate) fn set_shared_notifications(
        &mut self,
        notifications: SharedNotifications,
        summaries: SharedSummaries,
        client_counts: SharedClientCounts,
    ) {
        self.shared_notifications = Some(notifications);
        self.shared_summaries = Some(summaries);
        self.shared_client_counts = Some(client_counts);
    }

    /// Replaces the recovery guard with the given (typically shared) one.
    pub(crate) fn set_recovery_guard(&mut self, guard: CorruptedStateRecoveryGuard) {
        self.recovery_guard = guard;
    }

    /// Installs the sender used for delegate notifications.
    pub(crate) fn set_delegate_notification_tx(&mut self, tx: DelegateNotificationSender) {
        self.delegate_notification_tx = Some(tx);
    }

    /// Opens the database at `config.db_dir()` and builds the contract, delegate,
    /// secrets and state stores on top of it.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or if any store fails to
    /// initialize.
    pub(crate) async fn get_stores(
        config: &Config,
    ) -> Result<
        (
            ContractStore,
            DelegateStore,
            SecretsStore,
            StateStore<Storage>,
        ),
        anyhow::Error,
    > {
        const MAX_MEM_CACHE: u32 = 10_000_000;
        let db = Storage::new(&config.db_dir()).await?;
        // Report a failed state-store construction as an error rather than panicking:
        // every other failure in this function is already propagated to the caller.
        let state_store = StateStore::new(db.clone(), MAX_MEM_CACHE)
            .map_err(|err| anyhow::anyhow!("failed to create state store: {err:?}"))?;
        let (contract_store, delegate_store, secret_store) = Self::get_runtime_stores(config, db)?;
        Ok((contract_store, delegate_store, secret_store, state_store))
    }

    /// Builds the contract, delegate and secrets stores from the configured directories,
    /// all backed by `db`. The contract and delegate stores are created with a 10 MiB
    /// size parameter.
    pub(crate) fn get_runtime_stores(
        config: &Config,
        db: Storage,
    ) -> Result<(ContractStore, DelegateStore, SecretsStore), anyhow::Error> {
        const MAX_SIZE: u64 = 10 * 1024 * 1024;
        let contract_store = ContractStore::new(config.contracts_dir(), MAX_SIZE, db.clone())?;
        let delegate_store = DelegateStore::new(config.delegates_dir(), MAX_SIZE, db.clone())?;
        let secret_store = SecretsStore::new(config.secrets_dir(), config.secrets.clone(), db)?;
        Ok((contract_store, delegate_store, secret_store))
    }

    /// Runs a network operation and waits for its result.
    ///
    /// Steps: create the operation from `request`, register its transaction together
    /// with a oneshot sender on the op-request channel, start the operation, then wait
    /// up to 120 seconds for the outcome and convert it into `Op::Result`.
    ///
    /// # Errors
    ///
    /// Fails if `op_sender`/`op_manager` are not set, the op-request channel is closed,
    /// starting the operation fails, the wait times out, the response channel is
    /// dropped, the mediator reports an error, or the result has an unexpected type.
    async fn op_request<Op, M>(&mut self, request: M) -> Result<Op::Result, ExecutorError>
    where
        Op: Operation + Send + TryFrom<OpEnum, Error = OpError> + 'static,
        <Op as Operation>::Result: TryFrom<Op, Error = OpError>,
        M: ComposeNetworkMessage<Op>,
    {
        let (op_sender, op_manager) = match (&self.op_sender, &self.op_manager) {
            (Some(sender), Some(manager)) => (sender.clone(), manager.clone()),
            _ => {
                return Err(ExecutorError::other(anyhow::anyhow!(
                    "missing op_sender or op_manager"
                )));
            }
        };
        let op = request.initiate_op(&op_manager);
        let transaction = *op.id();
        let (response_tx, response_rx) = oneshot::channel();
        // Register the waiter before starting the operation so the response cannot
        // arrive before anyone is listening for it.
        op_sender
            .send((transaction, response_tx))
            .await
            .map_err(|_| ExecutorError::other(anyhow::anyhow!("event loop channel closed")))?;
        <M as ComposeNetworkMessage<Op>>::resume_op(op, &op_manager)
            .await
            .map_err(|e| {
                tracing::debug!(
                    tx = %transaction,
                    error = %e,
                    "Failed to resume operation"
                );
                ExecutorError::other(e)
            })?;
        const OP_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
        let op_result = tokio::time::timeout(OP_REQUEST_TIMEOUT, response_rx)
            .await
            .map_err(|_| {
                tracing::warn!(
                    tx = %transaction,
                    timeout_secs = OP_REQUEST_TIMEOUT.as_secs(),
                    "Network operation timed out waiting for response"
                );
                ExecutorError::other(anyhow::anyhow!(
                    "network operation timed out after {} seconds",
                    OP_REQUEST_TIMEOUT.as_secs()
                ))
            })?
            .map_err(|_| {
                ExecutorError::other(anyhow::anyhow!(
                    "response channel closed before receiving result"
                ))
            })?;
        let op_enum = op_result.map_err(|e| ExecutorError::other(anyhow::anyhow!("{}", e)))?;
        let op: Op = op_enum.try_into().map_err(|err: OpError| {
            tracing::error!(
                tx = %transaction,
                error = %err,
                "Expected message of one type but got another"
            );
            ExecutorError::other(err)
        })?;
        let result = <Op::Result>::try_from(op).map_err(|err| {
            tracing::debug!(
                tx = %transaction,
                error = %err,
                "Failed to convert operation result"
            );
            ExecutorError::other(err)
        })?;
        Ok(result)
    }

    /// Lists one `SubscriptionInfo` per (contract, client) pair in the executor's local
    /// `update_notifications` map. `last_update` is always `None`.
    pub fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
        let mut subscriptions = Vec::new();
        for (instance_id, client_list) in &self.update_notifications {
            for (client_id, _channel) in client_list {
                subscriptions.push(crate::message::SubscriptionInfo {
                    instance_id: *instance_id,
                    client_id: *client_id,
                    last_update: None,
                });
            }
        }
        subscriptions
    }
}
/// Small constructors for the stdlib types used by this module's tests.
#[cfg(test)]
pub(crate) mod test_fixtures {
    use freenet_stdlib::prelude::*;

    /// Returns a contract key derived from fixed code and parameter bytes, so repeated
    /// calls yield equal keys.
    pub fn make_contract_key() -> ContractKey {
        let code = ContractCode::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let params = Parameters::from(vec![10, 20, 30, 40]);
        ContractKey::from_params_and_code(&params, &code)
    }

    /// Returns a contract key derived from the given code bytes and fixed parameters.
    pub fn make_contract_key_with_code(code_bytes: &[u8]) -> ContractKey {
        let code = ContractCode::from(code_bytes.to_vec());
        let params = Parameters::from(vec![10, 20, 30, 40]);
        ContractKey::from_params_and_code(&params, &code)
    }

    /// Wraps a copy of `data` in a `WrappedState`.
    pub fn make_state(data: &[u8]) -> WrappedState {
        WrappedState::new(data.to_vec())
    }

    /// Builds owned `Parameters` from a copy of `data`.
    pub fn make_params(data: &[u8]) -> Parameters<'static> {
        Parameters::from(data.to_vec())
    }

    /// Builds an owned `StateDelta` from a copy of `data`.
    pub fn make_delta(data: &[u8]) -> StateDelta<'static> {
        StateDelta::from(data.to_vec())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Tests for the constructors, predicates and conversions of `ExecutorError`.
    mod executor_error_tests {
        use super::*;

        /// Builds a `Put` contract error for the fixture key with the given cause.
        fn put_error(cause: &'static str) -> StdContractError {
            StdContractError::Put {
                key: test_fixtures::make_contract_key(),
                cause: cause.into(),
            }
        }

        #[test]
        fn test_executor_error_other_is_not_request() {
            let error = ExecutorError::other(anyhow::anyhow!("some error"));
            assert!(!error.is_request());
            assert!(!error.is_fatal());
        }

        #[test]
        fn test_executor_error_request_is_request() {
            let error = ExecutorError::request(put_error("test"));
            assert!(error.is_request());
            assert!(!error.is_fatal());
        }

        #[test]
        fn test_executor_error_internal_error() {
            let error = ExecutorError::internal_error();
            assert!(!error.is_request());
            assert!(!error.is_fatal());
            let rendered = error.to_string();
            assert!(rendered.contains("internal error"));
        }

        #[test]
        fn test_executor_error_display_left() {
            let rendered = ExecutorError::request(put_error("test cause")).to_string();
            assert!(rendered.contains("test cause") || rendered.contains("Put"));
        }

        #[test]
        fn test_executor_error_display_right() {
            let rendered = ExecutorError::other(anyhow::anyhow!("custom error message")).to_string();
            assert!(rendered.contains("custom error message"));
        }

        #[test]
        fn test_executor_error_from_request_error() {
            let source = RequestError::ContractError(put_error("from conversion"));
            let converted: ExecutorError = source.into();
            assert!(converted.is_request());
        }

        #[test]
        fn test_executor_error_from_boxed_request_error() {
            let source = Box::new(RequestError::ContractError(put_error("boxed")));
            let converted: ExecutorError = source.into();
            assert!(converted.is_request());
        }

        #[test]
        fn test_unwrap_request_succeeds_for_request_error() {
            let error = ExecutorError::request(put_error("unwrap test"));
            // Must not panic for a request error.
            let _request_error = error.unwrap_request();
        }

        #[test]
        #[should_panic]
        fn test_unwrap_request_panics_for_other_error() {
            let error = ExecutorError::other(anyhow::anyhow!("not a request"));
            let _request_error = error.unwrap_request();
        }

        #[test]
        fn test_contract_exec_rejection_for_update_error() {
            let error = ExecutorError::request(StdContractError::update_exec_error(
                test_fixtures::make_contract_key(),
                "New state version 100 must be higher than current version 100",
            ));
            assert!(
                error.is_contract_exec_rejection(),
                "Update exec errors should be recognized as contract exec rejections"
            );
        }

        #[test]
        fn test_contract_exec_rejection_false_for_missing_parameters() {
            let error = ExecutorError::request(StdContractError::Update {
                key: test_fixtures::make_contract_key(),
                cause: "missing contract parameters".into(),
            });
            assert!(
                !error.is_contract_exec_rejection(),
                "Missing parameters errors should NOT be recognized as exec rejections"
            );
        }

        #[test]
        fn test_contract_exec_rejection_false_for_missing_contract() {
            let contract_key = test_fixtures::make_contract_key();
            let instance_id = *contract_key.id();
            let error = ExecutorError::request(StdContractError::MissingContract { key: instance_id });
            assert!(
                !error.is_contract_exec_rejection(),
                "MissingContract errors should NOT be recognized as exec rejections"
            );
        }

        #[test]
        fn test_contract_exec_rejection_false_for_other_error() {
            let error = ExecutorError::other(anyhow::anyhow!("some other error"));
            assert!(
                !error.is_contract_exec_rejection(),
                "Non-request errors should NOT be recognized as exec rejections"
            );
        }

        #[test]
        fn test_max_compute_time_exceeded_is_not_fatal() {
            use crate::wasm_runtime::{ContractError, ContractExecError, RuntimeInnerError};
            let inner = RuntimeInnerError::ContractExecError(ContractExecError::MaxComputeTimeExceeded);
            let runtime_error: ContractError = inner.into();
            let error = ExecutorError::execution(runtime_error, None);
            assert!(
                !error.is_fatal(),
                "MaxComputeTimeExceeded must not be fatal - it would kill the entire contract handler"
            );
        }
    }

    /// Sanity checks for the helpers in `test_fixtures`.
    mod test_fixtures_tests {
        use super::*;

        #[test]
        fn test_make_contract_key_is_consistent() {
            let first = test_fixtures::make_contract_key();
            let second = test_fixtures::make_contract_key();
            assert_eq!(first, second);
        }

        #[test]
        fn test_make_contract_key_with_different_code() {
            let from_low_bytes = test_fixtures::make_contract_key_with_code(&[1, 2, 3]);
            let from_high_bytes = test_fixtures::make_contract_key_with_code(&[4, 5, 6]);
            assert_ne!(from_low_bytes, from_high_bytes);
        }

        #[test]
        fn test_make_state() {
            let wrapped = test_fixtures::make_state(&[1, 2, 3, 4]);
            assert_eq!(wrapped.as_ref(), &[1, 2, 3, 4]);
        }

        #[test]
        fn test_make_params() {
            let parameters = test_fixtures::make_params(&[10, 20]);
            assert_eq!(parameters.as_ref(), &[10, 20]);
        }

        #[test]
        fn test_make_delta() {
            let state_delta = test_fixtures::make_delta(&[100, 200]);
            assert_eq!(state_delta.as_ref(), &[100, 200]);
        }
    }
}