use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::Future;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use either::Either;
use freenet_stdlib::client_api::{
ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
ContractRequest, ContractResponse, DelegateError as StdDelegateError, DelegateRequest,
HostResponse::{self, DelegateResponse},
RequestError,
};
use freenet_stdlib::prelude::*;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use super::storages::Storage;
use crate::config::Config;
use crate::node::OpManager;
use crate::operations::get::GetResult;
use crate::wasm_runtime::{
ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface, DelegateStore, Runtime,
SecretsStore, StateStorage, StateStore, StateStoreError,
};
use crate::{
client_events::{ClientId, HostResult},
operations,
};
// Submodules declared by this file. The `pub(super)` ones are visible to the
// parent module; selected items are re-exported further down with `pub(crate) use`.
pub(super) mod init_tracker;
pub(super) mod mock_runtime;
pub(super) mod mock_wasm_runtime;
// `pool_tests` is private and only compiled in test builds.
#[cfg(test)]
mod pool_tests;
pub(super) mod runtime;
/// Message type carried by the delegate notification channel
/// (`DelegateNotificationSender` / `DelegateNotificationReceiver`).
///
/// NOTE(review): presumably emitted when a contract's state changes so the named
/// delegate can react — confirm against the code that sends it.
pub(crate) struct DelegateNotification {
/// Delegate this notification concerns (assumed to be the recipient — TODO confirm).
pub delegate_key: DelegateKey,
/// Instance id of the contract the notification refers to.
pub contract_id: ContractInstanceId,
/// State attached to the notification, behind an `Arc`.
pub new_state: Arc<WrappedState>,
}
// NOTE(review): the limits below are documented from their names only; the code
// that enforces them is outside this view — confirm each against its use site.

/// Size constant for the delegate notification channel
/// (presumably the bounded `mpsc` capacity — TODO confirm at the creation site).
pub(crate) const DELEGATE_NOTIFICATION_CHANNEL_SIZE: usize = 1000;
/// Upper bound on subscribers per contract (name-derived — TODO confirm).
pub(crate) const MAX_SUBSCRIBERS_PER_CONTRACT: usize = 256;
/// Upper bound on subscriptions per client (name-derived — TODO confirm).
pub(crate) const MAX_SUBSCRIPTIONS_PER_CLIENT: usize = 50;
/// Size constant for per-subscriber notification channels (name-derived — TODO confirm).
pub(crate) const SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE: usize = 64;
/// Upper bound on delta computations per fan-out (name-derived — TODO confirm).
pub(crate) const MAX_DELTA_COMPUTATIONS_PER_FANOUT: usize = 32;
/// Fan-out size threshold associated with a warning (name-derived — TODO confirm).
pub(crate) const FANOUT_WARNING_THRESHOLD: usize = 50;
/// Upper bound on delegate creation depth (name-derived — TODO confirm).
pub(crate) const MAX_DELEGATE_CREATION_DEPTH: u32 = 4;
/// Upper bound on delegate creations per call (name-derived — TODO confirm).
pub(crate) const MAX_DELEGATE_CREATIONS_PER_CALL: u32 = 8;
/// Upper bound on created delegates per node (name-derived — TODO confirm).
pub(crate) const MAX_CREATED_DELEGATES_PER_NODE: usize = 1024;
/// Sending half of a Tokio `mpsc` channel of [`DelegateNotification`] values.
pub(crate) type DelegateNotificationSender = mpsc::Sender<DelegateNotification>;
/// Receiving half of a Tokio `mpsc` channel of [`DelegateNotification`] values.
pub(crate) type DelegateNotificationReceiver = mpsc::Receiver<DelegateNotification>;
pub(crate) use init_tracker::{
ContractInitTracker, InitCheckResult, MAX_CONCURRENT_INITIALIZATIONS,
MAX_QUEUED_OPS_PER_CONTRACT, SLOW_INIT_THRESHOLD, STALE_INIT_THRESHOLD, now_nanos,
};
pub(crate) use runtime::RuntimePool;
/// Error type returned by executor operations.
///
/// It carries either a client-facing [`RequestError`] or an arbitrary
/// [`anyhow::Error`], plus a flag exposed through `is_fatal`.
#[derive(Debug)]
pub struct ExecutorError {
/// `Left` holds a boxed `RequestError` (what `is_request` reports as true);
/// `Right` holds any other error.
inner: Either<Box<RequestError>, anyhow::Error>,
/// Value returned by `is_fatal`. Every constructor visible in this file sets it to `false`.
fatal: bool,
}
/// Identifies the operation that was running when a runtime error occurred.
///
/// `ExecutorError::execution` uses it to decide which client-facing error to
/// build and which key to attach to that error.
enum InnerOpError {
/// A contract upsert on the given contract key.
Upsert(ContractKey),
/// A delegate operation on the given delegate key.
Delegate(DelegateKey),
}
// Marker implementation: all `std::error::Error` methods keep their defaults,
// so `source()` returns `None`.
impl std::error::Error for ExecutorError {}
impl ExecutorError {
pub fn other(error: impl Into<anyhow::Error>) -> Self {
Self {
inner: Either::Right(error.into()),
fatal: false,
}
}
fn internal_error() -> Self {
Self {
inner: Either::Right(anyhow::anyhow!("internal error")),
fatal: false,
}
}
fn request(error: impl Into<RequestError>) -> Self {
Self {
inner: Either::Left(Box::new(error.into())),
fatal: false,
}
}
fn execution(
outer_error: crate::wasm_runtime::ContractError,
op: Option<InnerOpError>,
) -> Self {
use crate::wasm_runtime::RuntimeInnerError;
let error = outer_error.deref();
if let RuntimeInnerError::ContractExecError(e) = error {
if let Some(InnerOpError::Upsert(key)) = &op {
return ExecutorError::request(StdContractError::update_exec_error(*key, e));
}
}
if let RuntimeInnerError::DelegateNotFound(key) = error {
return ExecutorError::request(StdDelegateError::Missing(key.clone()));
}
if let RuntimeInnerError::DelegateExecError(e) = error {
return ExecutorError::request(StdDelegateError::ExecutionError(format!("{e}").into()));
}
if let (
RuntimeInnerError::SecretStoreError(
crate::wasm_runtime::SecretStoreError::MissingSecret(secret),
),
Some(InnerOpError::Delegate(key)),
) = (error, &op)
{
return ExecutorError::request(StdDelegateError::MissingSecret {
key: key.clone(),
secret: secret.clone(),
});
}
if let RuntimeInnerError::WasmError(e) = error {
match op {
Some(InnerOpError::Upsert(key)) => {
return ExecutorError::request(StdContractError::update_exec_error(key, e));
}
_ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
}
}
ExecutorError::other(outer_error)
}
pub fn is_request(&self) -> bool {
matches!(self.inner, Either::Left(_))
}
pub fn is_contract_exec_rejection(&self) -> bool {
match &self.inner {
Either::Left(req_err) => matches!(
req_err.as_ref(),
RequestError::ContractError(StdContractError::Update { cause, .. })
if cause.starts_with("execution error")
),
Either::Right(_) => false,
}
}
pub fn is_missing_contract_parameters(&self) -> bool {
match &self.inner {
Either::Left(req_err) => matches!(
req_err.as_ref(),
RequestError::ContractError(StdContractError::Update { cause, .. })
if cause.contains("missing contract parameters")
),
Either::Right(_) => false,
}
}
pub fn is_invalid_update_rejection(&self) -> bool {
match &self.inner {
Either::Left(req_err) => matches!(
req_err.as_ref(),
RequestError::ContractError(StdContractError::Update { cause, .. })
if cause.starts_with("execution error: invalid contract update")
),
Either::Right(_) => false,
}
}
pub fn is_fatal(&self) -> bool {
self.fatal
}
pub fn is_missing_delegate(&self) -> bool {
matches!(
&self.inner,
Either::Left(err) if matches!(
err.as_ref(),
RequestError::DelegateError(StdDelegateError::Missing(_))
)
)
}
pub fn unwrap_request(self) -> RequestError {
match self.inner {
Either::Left(err) => *err,
Either::Right(_) => unreachable!("called unwrap_request on a non-request error"),
}
}
}
impl From<RequestError> for ExecutorError {
fn from(value: RequestError) -> Self {
Self {
inner: Either::Left(Box::new(value)),
fatal: false,
}
}
}
impl Display for ExecutorError {
    /// Writes the wrapped error's own `Display` output, whichever side it is on.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.inner {
            Either::Left(ref request_err) => write!(f, "{}", &**request_err),
            Either::Right(ref other_err) => write!(f, "{}", &**other_err),
        }
    }
}
impl From<Box<RequestError>> for ExecutorError {
fn from(value: Box<RequestError>) -> Self {
Self {
inner: Either::Left(value),
fatal: false,
}
}
}
/// Result of an executor request: a [`HostResponse`] on success, an [`ExecutorError`] otherwise.
type Response = Result<HostResponse, ExecutorError>;
/// Mode the executor runs in.
///
/// Usable as a command-line value (`clap::ValueEnum`) and serialized in
/// `snake_case` by serde.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationMode {
/// Displayed as `local`. NOTE(review): presumably runs without the network — confirm.
Local,
/// Displayed as `network`. NOTE(review): presumably participates in the network — confirm.
Network,
}
impl Display for OperationMode {
    /// Writes the lowercase mode name: `local` or `network`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            OperationMode::Local => "local",
            OperationMode::Network => "network",
        };
        f.write_str(label)
    }
}
/// Stream of transactions that never yields an item.
pub(crate) struct ExecutorTransactionStream;

impl futures::Stream for ExecutorTransactionStream {
    type Item = crate::message::Transaction;

    /// Always reports `Pending`. The context is ignored, so no waker is
    /// registered and nothing will ever re-poll this stream on its behalf.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        Poll::Pending
    }
}
/// Outcome of `ContractExecutor::upsert_contract_state`.
///
/// NOTE(review): variant meanings below are inferred from their names; the
/// code that produces them is outside this view — confirm.
#[derive(Debug)]
pub(crate) enum UpsertResult {
/// Presumably: the upsert left the stored state unchanged.
NoChange,
/// Presumably: the stored state was replaced; carries the resulting state.
Updated(WrappedState),
/// Presumably: the already-stored state was kept over the incoming one; carries that state.
CurrentWon(WrappedState),
}
/// Interface of a contract/delegate executor.
///
/// Implementors must be `Send + 'static`, and every future returned by the
/// async-style methods must be `Send`.
///
/// NOTE(review): only signatures are visible here; the behavioural notes below
/// are inferred from names and types — confirm against the implementations.
pub(crate) trait ContractExecutor: Send + 'static {
/// Looks up the full contract key for an instance id; `None` when there is none to return.
fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey>;
/// Resolves to an optional state and an optional contract container for `key`.
/// `return_contract_code` presumably controls whether the container is included — TODO confirm.
fn fetch_contract(
&mut self,
key: ContractKey,
return_contract_code: bool,
) -> impl Future<
Output = Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError>,
> + Send;
/// Applies `update` — either a full state (`Left`) or a delta (`Right`) — to
/// the contract `key` and resolves to an [`UpsertResult`].
fn upsert_contract_state(
&mut self,
key: ContractKey,
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> impl Future<Output = Result<UpsertResult, ExecutorError>> + Send;
/// Registers `notification_ch` for client `cli_id` on contract `key`, with an
/// optional state summary. Fails with a boxed [`RequestError`].
fn register_contract_notifier(
&mut self,
key: ContractInstanceId,
cli_id: ClientId,
notification_ch: tokio::sync::mpsc::Sender<HostResult>,
summary: Option<StateSummary<'_>>,
) -> Result<(), Box<RequestError>>;
/// Executes a delegate request, optionally tagged with an originating
/// contract and/or a calling delegate.
fn execute_delegate_request(
&mut self,
req: DelegateRequest<'_>,
origin_contract: Option<&ContractInstanceId>,
caller_delegate: Option<&DelegateKey>,
) -> impl Future<Output = Response> + Send;
/// Returns the subscriptions known to this executor.
fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo>;
/// Hook for dropping a client; the default implementation does nothing.
fn remove_client(&self, _client_id: ClientId) {}
/// Resolves to a summary of the state of contract `key`.
fn summarize_contract_state(
&mut self,
key: ContractKey,
) -> impl Future<Output = Result<StateSummary<'static>, ExecutorError>> + Send;
/// Resolves to a state delta for contract `key`, computed relative to `their_summary`.
fn get_contract_state_delta(
&mut self,
key: ContractKey,
their_summary: StateSummary<'static>,
) -> impl Future<Output = Result<StateDelta<'static>, ExecutorError>> + Send;
/// Hands out the delegate notification receiver; the default implementation returns `None`.
fn take_delegate_notification_rx(&mut self) -> Option<DelegateNotificationReceiver> {
None
}
}
/// Shared, mutex-protected set of contract keys.
/// NOTE(review): presumably the contracts currently undergoing corrupted-state
/// recovery — confirm against the code that inserts into it.
pub(crate) type CorruptedStateRecoveryGuard = Arc<std::sync::Mutex<HashSet<ContractKey>>>;
/// Shared concurrent map: contract instance id → list of (client, notification sender) pairs.
type SharedNotifications =
Arc<dashmap::DashMap<ContractInstanceId, Vec<(ClientId, mpsc::Sender<HostResult>)>>>;
/// Shared concurrent map: contract instance id → per-client optional state summary.
type SharedSummaries =
Arc<dashmap::DashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>>;
/// Shared concurrent map: client → count (presumably its number of subscriptions — TODO confirm).
type SharedClientCounts = Arc<dashmap::DashMap<ClientId, usize>>;
/// Executor state: a runtime `R`, a state store over storage `S`, and
/// subscription/caching bookkeeping.
///
/// NOTE(review): most fields are only initialized in this file; the per-field
/// notes describe their types, and intent inferred from names is marked as such.
pub struct Executor<R = Runtime, S: StateStorage = Storage> {
/// Operation mode passed to `new`.
mode: OperationMode,
/// Runtime passed to `new`.
runtime: R,
/// State store passed to `new`.
pub state_store: StateStore<S>,
/// Per-contract list of (client, notification sender); the source for `get_subscription_info`.
update_notifications: HashMap<ContractInstanceId, Vec<(ClientId, mpsc::Sender<HostResult>)>>,
/// Per-client counter (presumably number of active subscriptions — TODO confirm).
client_subscription_counts: HashMap<ClientId, usize>,
/// Per-contract, per-client optional state summary.
subscriber_summaries:
HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
/// Contract instance ids associated with each delegate key.
delegate_origin_ids: HashMap<DelegateKey, Vec<ContractInstanceId>>,
/// Contract initialization tracker, created fresh in `new`.
init_tracker: ContractInitTracker,
/// Optional handle to the operation manager.
op_manager: Option<Arc<OpManager>>,
/// Optional shared maps, all installed together by `set_shared_notifications`; `None` until then.
shared_notifications: Option<SharedNotifications>,
shared_summaries: Option<SharedSummaries>,
shared_client_counts: Option<SharedClientCounts>,
/// Recovery guard; starts as an empty set and can be replaced via `set_recovery_guard`.
pub(crate) recovery_guard: CorruptedStateRecoveryGuard,
/// LRU cache keyed by contract key, holding a `u64` and a state summary (capacity 1024).
/// NOTE(review): the `u64` is presumably a state hash or version — confirm.
summary_cache: LruCache<ContractKey, (u64, StateSummary<'static>)>,
/// LRU cache keyed by (contract key, `u64`, `u64`), holding a state delta (capacity 1024).
delta_cache: LruCache<(ContractKey, u64, u64), StateDelta<'static>>,
/// Optional delegate notification sender, installed by `set_delegate_notification_tx`.
delegate_notification_tx: Option<DelegateNotificationSender>,
}
impl<R, S> Executor<R, S>
where
    S: StateStorage + Send + 'static,
    <S as StateStorage>::Error: Into<anyhow::Error>,
{
    /// Creates an executor with empty subscription bookkeeping and empty caches.
    ///
    /// `ctrl_handler` is invoked exactly once, before the executor is built.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ctrl_handler`, if any.
    pub(crate) async fn new(
        state_store: StateStore<S>,
        ctrl_handler: impl FnOnce() -> anyhow::Result<()>,
        mode: OperationMode,
        runtime: R,
        op_manager: Option<Arc<OpManager>>,
    ) -> anyhow::Result<Self> {
        // Capacity shared by the summary and delta LRU caches.
        const CACHE_CAPACITY: usize = 1024;

        ctrl_handler()?;

        let cache_capacity =
            NonZeroUsize::new(CACHE_CAPACITY).expect("cache capacity is a non-zero constant");
        Ok(Self {
            mode,
            runtime,
            state_store,
            update_notifications: HashMap::default(),
            client_subscription_counts: HashMap::default(),
            subscriber_summaries: HashMap::default(),
            delegate_origin_ids: HashMap::default(),
            init_tracker: ContractInitTracker::new(),
            op_manager,
            shared_notifications: None,
            shared_summaries: None,
            shared_client_counts: None,
            recovery_guard: Arc::new(std::sync::Mutex::new(HashSet::new())),
            summary_cache: LruCache::new(cache_capacity),
            delta_cache: LruCache::new(cache_capacity),
            delegate_notification_tx: None,
        })
    }

    /// Returns a path under the system temp directory that is unique per call.
    ///
    /// The directory name combines `identifier`, the current process id, and a
    /// process-wide counter. The directory itself is not created.
    pub fn test_data_dir(identifier: &str) -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        static COUNTER: AtomicU64 = AtomicU64::new(0);

        let unique_id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("freenet-executor-{identifier}-{pid}-{unique_id}"))
    }

    /// Installs the three shared maps (notifications, summaries, client counts) together.
    pub(crate) fn set_shared_notifications(
        &mut self,
        notifications: SharedNotifications,
        summaries: SharedSummaries,
        client_counts: SharedClientCounts,
    ) {
        self.shared_notifications = Some(notifications);
        self.shared_summaries = Some(summaries);
        self.shared_client_counts = Some(client_counts);
    }

    /// Replaces the recovery guard created by `new` with `guard`.
    pub(crate) fn set_recovery_guard(&mut self, guard: CorruptedStateRecoveryGuard) {
        self.recovery_guard = guard;
    }

    /// Installs the sender used for delegate notifications.
    pub(crate) fn set_delegate_notification_tx(&mut self, tx: DelegateNotificationSender) {
        self.delegate_notification_tx = Some(tx);
    }

    /// Opens the database at `config.db_dir()` and builds all four stores on top of it.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or if any store fails
    /// to initialize. A state store failure is reported as an error rather than
    /// a panic.
    pub(crate) async fn get_stores(
        config: &Config,
    ) -> Result<
        (
            ContractStore,
            DelegateStore,
            SecretsStore,
            StateStore<Storage>,
        ),
        anyhow::Error,
    > {
        const MAX_MEM_CACHE: u32 = 10_000_000;

        let db = Storage::new(&config.db_dir()).await?;
        // Debug-format the error: `Debug` is the only bound known to hold here.
        let state_store = StateStore::new(db.clone(), MAX_MEM_CACHE)
            .map_err(|err| anyhow::anyhow!("failed to initialize the state store: {err:?}"))?;
        let (contract_store, delegate_store, secret_store) = Self::get_runtime_stores(config, db)?;
        Ok((contract_store, delegate_store, secret_store, state_store))
    }

    /// Builds the contract, delegate and secrets stores from the configured
    /// directories, all backed by `db`.
    ///
    /// # Errors
    ///
    /// Returns the first store construction error.
    pub(crate) fn get_runtime_stores(
        config: &Config,
        db: Storage,
    ) -> Result<(ContractStore, DelegateStore, SecretsStore), anyhow::Error> {
        // Size limit passed to the contract and delegate stores (10 MiB).
        const MAX_SIZE: u64 = 10 * 1024 * 1024;

        let contract_store = ContractStore::new(config.contracts_dir(), MAX_SIZE, db.clone())?;
        let delegate_store = DelegateStore::new(config.delegates_dir(), MAX_SIZE, db.clone())?;
        let secret_store = SecretsStore::new(config.secrets_dir(), config.secrets.clone(), db)?;
        Ok((contract_store, delegate_store, secret_store))
    }

    /// Lists one entry per (contract, client) pair found in the executor's
    /// local `update_notifications` map; `last_update` is always `None`.
    ///
    /// The shared maps installed by `set_shared_notifications` are not consulted.
    pub fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
        self.update_notifications
            .iter()
            .flat_map(|(instance_id, clients)| {
                clients
                    .iter()
                    .map(move |(client_id, _channel)| crate::message::SubscriptionInfo {
                        instance_id: *instance_id,
                        client_id: *client_id,
                        last_update: None,
                    })
            })
            .collect()
    }
}
/// Deterministic helpers for building contract keys, states, parameters and
/// deltas in tests.
#[cfg(test)]
pub(crate) mod test_fixtures {
    use freenet_stdlib::prelude::*;

    /// Parameter bytes shared by every fixture contract key.
    const FIXTURE_PARAMS: [u8; 4] = [10, 20, 30, 40];

    /// Returns the default fixture key: code bytes `1..=8` with the fixture parameters.
    pub fn make_contract_key() -> ContractKey {
        make_contract_key_with_code(&[1, 2, 3, 4, 5, 6, 7, 8])
    }

    /// Returns a key derived from `code_bytes` and the fixture parameters, so
    /// different code yields a different key.
    pub fn make_contract_key_with_code(code_bytes: &[u8]) -> ContractKey {
        let code = ContractCode::from(code_bytes.to_vec());
        let params = Parameters::from(FIXTURE_PARAMS.to_vec());
        ContractKey::from_params_and_code(&params, &code)
    }

    /// Wraps a copy of `data` as a contract state.
    pub fn make_state(data: &[u8]) -> WrappedState {
        WrappedState::new(data.to_vec())
    }

    /// Wraps a copy of `data` as owned contract parameters.
    pub fn make_params(data: &[u8]) -> Parameters<'static> {
        Parameters::from(data.to_vec())
    }

    /// Wraps a copy of `data` as an owned state delta.
    pub fn make_delta(data: &[u8]) -> StateDelta<'static> {
        StateDelta::from(data.to_vec())
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Tests for `ExecutorError` constructors, conversions and classification predicates.
mod executor_error_tests {
    use super::*;

    #[test]
    fn test_executor_error_other_is_not_request() {
        let err = ExecutorError::other(anyhow::anyhow!("some error"));
        assert!(!err.is_request());
        assert!(!err.is_fatal());
    }

    #[test]
    fn test_executor_error_request_is_request() {
        let err = ExecutorError::request(StdContractError::Put {
            key: test_fixtures::make_contract_key(),
            cause: "test".into(),
        });
        assert!(err.is_request());
        assert!(!err.is_fatal());
    }

    #[test]
    fn test_executor_error_internal_error() {
        let err = ExecutorError::internal_error();
        assert!(!err.is_request());
        assert!(!err.is_fatal());
        assert!(err.to_string().contains("internal error"));
    }

    #[test]
    fn test_executor_error_display_left() {
        let err = ExecutorError::request(StdContractError::Put {
            key: test_fixtures::make_contract_key(),
            cause: "test cause".into(),
        });
        let display = err.to_string();
        assert!(display.contains("test cause") || display.contains("Put"));
    }

    #[test]
    fn test_executor_error_display_right() {
        let err = ExecutorError::other(anyhow::anyhow!("custom error message"));
        assert!(err.to_string().contains("custom error message"));
    }

    #[test]
    fn test_executor_error_from_request_error() {
        let request_err = RequestError::ContractError(StdContractError::Put {
            key: test_fixtures::make_contract_key(),
            cause: "from conversion".into(),
        });
        let err: ExecutorError = request_err.into();
        assert!(err.is_request());
    }

    #[test]
    fn test_executor_error_from_boxed_request_error() {
        let request_err = Box::new(RequestError::ContractError(StdContractError::Put {
            key: test_fixtures::make_contract_key(),
            cause: "boxed".into(),
        }));
        let err: ExecutorError = request_err.into();
        assert!(err.is_request());
    }

    #[test]
    fn test_unwrap_request_succeeds_for_request_error() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::Put {
            key,
            cause: "unwrap test".into(),
        });
        let _unwrapped = err.unwrap_request();
    }

    #[test]
    #[should_panic]
    fn test_unwrap_request_panics_for_other_error() {
        let err = ExecutorError::other(anyhow::anyhow!("not a request"));
        let _unwrapped = err.unwrap_request();
    }

    #[test]
    fn test_contract_exec_rejection_for_update_error() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            "New state version 100 must be higher than current version 100",
        ));
        assert!(
            err.is_contract_exec_rejection(),
            "Update exec errors should be recognized as contract exec rejections"
        );
    }

    #[test]
    fn test_contract_exec_rejection_false_for_missing_parameters() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::Update {
            key,
            cause: "missing contract parameters".into(),
        });
        assert!(
            !err.is_contract_exec_rejection(),
            "Missing parameters errors should NOT be recognized as exec rejections"
        );
    }

    #[test]
    fn test_contract_exec_rejection_false_for_missing_contract() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::MissingContract { key: *key.id() });
        assert!(
            !err.is_contract_exec_rejection(),
            "MissingContract errors should NOT be recognized as exec rejections"
        );
    }

    #[test]
    fn test_contract_exec_rejection_false_for_other_error() {
        let err = ExecutorError::other(anyhow::anyhow!("some other error"));
        assert!(
            !err.is_contract_exec_rejection(),
            "Non-request errors should NOT be recognized as exec rejections"
        );
    }

    #[test]
    fn test_missing_contract_parameters_for_update_error() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::Update {
            key,
            cause: "missing contract parameters".into(),
        });
        assert!(
            err.is_missing_contract_parameters(),
            "Update errors caused by missing parameters MUST be recognized"
        );
    }

    #[test]
    fn test_missing_contract_parameters_false_for_unrelated_errors() {
        let key = test_fixtures::make_contract_key();
        let exec_err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            "invalid contract update, reason: stale version",
        ));
        assert!(
            !exec_err.is_missing_contract_parameters(),
            "Exec rejections should NOT be recognized as missing parameters"
        );

        // Only request errors are inspected, whatever the message says.
        let other_err = ExecutorError::other(anyhow::anyhow!("missing contract parameters"));
        assert!(
            !other_err.is_missing_contract_parameters(),
            "Non-request errors should NOT be recognized as missing parameters"
        );
    }

    #[test]
    fn test_missing_delegate_false_for_non_delegate_errors() {
        let contract_err = ExecutorError::request(StdContractError::Put {
            key: test_fixtures::make_contract_key(),
            cause: "test".into(),
        });
        assert!(
            !contract_err.is_missing_delegate(),
            "Contract errors should NOT be recognized as a missing delegate"
        );

        let other_err = ExecutorError::other(anyhow::anyhow!("some other error"));
        assert!(
            !other_err.is_missing_delegate(),
            "Non-request errors should NOT be recognized as a missing delegate"
        );
    }

    #[test]
    fn test_invalid_update_rejection_for_invalid_update() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            "invalid contract update, reason: New state version 100 must be higher than current version 100",
        ));
        assert!(
            err.is_invalid_update_rejection(),
            "Contract InvalidUpdateWithInfo rejection MUST be recognized as benign"
        );
        assert!(
            err.is_contract_exec_rejection(),
            "The benign case must also satisfy the broader predicate (auto-fetch gate)"
        );
    }

    #[test]
    fn test_invalid_update_rejection_false_for_out_of_gas() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            "The operation ran out of gas. This might be caused by an infinite loop or an inefficient computation.",
        ));
        assert!(
            !err.is_invalid_update_rejection(),
            "Out-of-gas MUST NOT be classified as a benign invalid-update rejection (it's a real WASM fault)"
        );
        assert!(
            err.is_contract_exec_rejection(),
            "Out-of-gas IS a contract-exec error (broader predicate matches), so auto-fetch is correctly skipped"
        );
    }

    #[test]
    fn test_invalid_update_rejection_false_for_max_compute_time() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            "The operation exceeded the maximum allowed compute time",
        ));
        assert!(
            !err.is_invalid_update_rejection(),
            "Max-compute-time MUST NOT be classified as a benign invalid-update rejection"
        );
    }

    #[test]
    fn test_invalid_update_rejection_false_for_double_put() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::update_exec_error(
            key,
            format!(
                "Attempted to perform a put for an already put contract ({key}), use update instead"
            ),
        ));
        assert!(
            !err.is_invalid_update_rejection(),
            "DoublePut MUST NOT be classified as a benign invalid-update rejection"
        );
    }

    #[test]
    fn test_invalid_update_rejection_false_for_missing_parameters() {
        let key = test_fixtures::make_contract_key();
        let err = ExecutorError::request(StdContractError::Update {
            key,
            cause: "missing contract parameters".into(),
        });
        assert!(
            !err.is_invalid_update_rejection(),
            "Missing parameters is a real failure, not a benign rejection"
        );
    }

    #[test]
    fn test_max_compute_time_exceeded_is_not_fatal() {
        use crate::wasm_runtime::{ContractError, ContractExecError, RuntimeInnerError};

        let contract_err: ContractError =
            RuntimeInnerError::ContractExecError(ContractExecError::MaxComputeTimeExceeded)
                .into();
        let err = ExecutorError::execution(contract_err, None);
        assert!(
            !err.is_fatal(),
            "MaxComputeTimeExceeded must not be fatal - it would kill the entire contract handler"
        );
    }
}
/// Sanity checks for the helpers in `test_fixtures`.
mod test_fixtures_tests {
    use super::*;

    /// The default fixture key is the same on every call.
    #[test]
    fn test_make_contract_key_is_consistent() {
        let first = test_fixtures::make_contract_key();
        let second = test_fixtures::make_contract_key();
        assert_eq!(first, second);
    }

    /// Different code bytes produce different keys.
    #[test]
    fn test_make_contract_key_with_different_code() {
        let from_low_bytes = test_fixtures::make_contract_key_with_code(&[1, 2, 3]);
        let from_high_bytes = test_fixtures::make_contract_key_with_code(&[4, 5, 6]);
        assert_ne!(from_low_bytes, from_high_bytes);
    }

    /// The state wraps exactly the bytes it was given.
    #[test]
    fn test_make_state() {
        let wrapped = test_fixtures::make_state(&[1, 2, 3, 4]);
        assert_eq!(wrapped.as_ref(), &[1, 2, 3, 4]);
    }

    /// The parameters wrap exactly the bytes they were given.
    #[test]
    fn test_make_params() {
        let wrapped = test_fixtures::make_params(&[10, 20]);
        assert_eq!(wrapped.as_ref(), &[10, 20]);
    }

    /// The delta wraps exactly the bytes it was given.
    #[test]
    fn test_make_delta() {
        let wrapped = test_fixtures::make_delta(&[100, 200]);
        assert_eq!(wrapped.as_ref(), &[100, 200]);
    }
}
}