use std::{
    collections::{hash_map, BTreeMap, HashMap, HashSet},
    mem,
    sync::{Arc, Mutex},
};
use custom_debug_derive::Debug;
use linera_base::{
    data_types::{
        Amount, ApplicationPermissions, ArithmeticError, BlockHeight, Resources,
        SendMessageRequest, Timestamp,
    },
    ensure,
    identifiers::{Account, ChainId, ChannelName, MessageId, Owner},
    ownership::ChainOwnership,
};
use linera_views::batch::Batch;
use oneshot::Receiver;
use crate::{
    execution::UserAction,
    execution_state_actor::{ExecutionStateSender, Request},
    resources::ResourceController,
    util::{ReceiverExt, UnboundedSenderExt},
    BaseRuntime, ContractRuntime, ExecutionError, ExecutionOutcome, FinalizeContext,
    MessageContext, OperationContext, RawExecutionOutcome, ServiceRuntime,
    UserApplicationDescription, UserApplicationId, UserContractInstance, UserServiceInstance,
};
#[cfg(test)]
#[path = "unit_tests/runtime_tests.rs"]
mod tests;
#[derive(Debug)]
pub struct SyncRuntime<UserInstance>(Arc<Mutex<SyncRuntimeInternal<UserInstance>>>);
pub type ContractSyncRuntime = SyncRuntime<UserContractInstance>;
pub type ServiceSyncRuntime = SyncRuntime<UserServiceInstance>;
#[derive(Debug)]
pub struct SyncRuntimeInternal<UserInstance> {
    chain_id: ChainId,
    height: BlockHeight,
    authenticated_signer: Option<Owner>,
    executing_message: Option<ExecutingMessage>,
    next_message_index: u32,
    execution_state_sender: ExecutionStateSender,
    is_finalizing: bool,
    applications_to_finalize: Vec<UserApplicationId>,
    loaded_applications: HashMap<UserApplicationId, LoadedApplication<UserInstance>>,
    call_stack: Vec<ApplicationStatus>,
    active_applications: HashSet<UserApplicationId>,
    execution_outcomes: Vec<ExecutionOutcome>,
    simple_user_states: BTreeMap<UserApplicationId, SimpleUserState>,
    view_user_states: BTreeMap<UserApplicationId, ViewUserState>,
    refund_grant_to: Option<Account>,
    resource_controller: ResourceController,
}
impl<UserInstance> SyncRuntimeInternal<UserInstance> {
    fn next_message_index(&self) -> Result<u32, ArithmeticError> {
        let mut index = self.next_message_index;
        for outcome in &self.execution_outcomes {
            let len = match outcome {
                ExecutionOutcome::System(outcome) => outcome.messages.len(),
                ExecutionOutcome::User(_, outcome) => outcome.messages.len(),
            };
            let len = u32::try_from(len).map_err(|_| ArithmeticError::Overflow)?;
            index = index.checked_add(len).ok_or(ArithmeticError::Overflow)?;
        }
        Ok(index)
    }
}
#[derive(Debug)]
struct ApplicationStatus {
    caller_id: Option<UserApplicationId>,
    id: UserApplicationId,
    parameters: Vec<u8>,
    signer: Option<Owner>,
    outcome: RawExecutionOutcome<Vec<u8>>,
}
#[derive(Debug)]
struct LoadedApplication<Instance> {
    instance: Arc<Mutex<Instance>>,
    parameters: Vec<u8>,
}
impl<Instance> LoadedApplication<Instance> {
    fn new(instance: Instance, description: UserApplicationDescription) -> Self {
        LoadedApplication {
            instance: Arc::new(Mutex::new(instance)),
            parameters: description.parameters,
        }
    }
}
impl<Instance> Clone for LoadedApplication<Instance> {
    fn clone(&self) -> Self {
        LoadedApplication {
            instance: self.instance.clone(),
            parameters: self.parameters.clone(),
        }
    }
}
#[derive(Debug)]
enum Promise<T> {
    Ready(T),
    Pending(Receiver<T>),
}
impl<T> Promise<T> {
    fn force(&mut self) -> Result<(), ExecutionError> {
        if let Promise::Pending(receiver) = self {
            let value = receiver
                .recv_ref()
                .map_err(|oneshot::RecvError| ExecutionError::MissingRuntimeResponse)?;
            *self = Promise::Ready(value);
        }
        Ok(())
    }
    fn read(self) -> Result<T, ExecutionError> {
        match self {
            Promise::Pending(receiver) => {
                let value = receiver.recv_response()?;
                Ok(value)
            }
            Promise::Ready(value) => Ok(value),
        }
    }
}
#[derive(Debug, Default)]
struct SimpleUserState {
    pending_query: Option<Receiver<Vec<u8>>>,
}
#[derive(Debug, Default)]
struct QueryManager<T> {
    pending_queries: BTreeMap<u32, Promise<T>>,
    query_count: u32,
    active_query_count: u32,
}
impl<T> QueryManager<T> {
    fn register(&mut self, receiver: Receiver<T>) -> Result<u32, ExecutionError> {
        let id = self.query_count;
        self.pending_queries.insert(id, Promise::Pending(receiver));
        self.query_count = self
            .query_count
            .checked_add(1)
            .ok_or(ArithmeticError::Overflow)?;
        self.active_query_count = self
            .active_query_count
            .checked_add(1)
            .ok_or(ArithmeticError::Overflow)?;
        Ok(id)
    }
    fn wait(&mut self, id: u32) -> Result<T, ExecutionError> {
        let promise = self
            .pending_queries
            .remove(&id)
            .ok_or(ExecutionError::InvalidPromise)?;
        let value = promise.read()?;
        self.active_query_count -= 1;
        Ok(value)
    }
    fn force_all(&mut self) -> Result<(), ExecutionError> {
        for promise in self.pending_queries.values_mut() {
            promise.force()?;
        }
        Ok(())
    }
}
type Keys = Vec<Vec<u8>>;
type Value = Vec<u8>;
type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
#[derive(Debug, Default)]
struct ViewUserState {
    contains_key_queries: QueryManager<bool>,
    read_value_queries: QueryManager<Option<Value>>,
    read_multi_values_queries: QueryManager<Vec<Option<Value>>>,
    find_keys_queries: QueryManager<Keys>,
    find_key_values_queries: QueryManager<KeyValues>,
}
impl ViewUserState {
    fn force_all_pending_queries(&mut self) -> Result<(), ExecutionError> {
        self.contains_key_queries.force_all()?;
        self.read_value_queries.force_all()?;
        self.read_multi_values_queries.force_all()?;
        self.find_keys_queries.force_all()?;
        self.find_key_values_queries.force_all()?;
        Ok(())
    }
}
impl<UserInstance> SyncRuntimeInternal<UserInstance> {
    #[allow(clippy::too_many_arguments)]
    fn new(
        chain_id: ChainId,
        height: BlockHeight,
        authenticated_signer: Option<Owner>,
        next_message_index: u32,
        executing_message: Option<ExecutingMessage>,
        execution_state_sender: ExecutionStateSender,
        refund_grant_to: Option<Account>,
        resource_controller: ResourceController,
    ) -> Self {
        Self {
            chain_id,
            height,
            authenticated_signer,
            next_message_index,
            executing_message,
            execution_state_sender,
            is_finalizing: false,
            applications_to_finalize: Vec::new(),
            loaded_applications: HashMap::new(),
            call_stack: Vec::new(),
            active_applications: HashSet::new(),
            execution_outcomes: Vec::default(),
            simple_user_states: BTreeMap::default(),
            view_user_states: BTreeMap::default(),
            refund_grant_to,
            resource_controller,
        }
    }
    fn current_application(&mut self) -> &ApplicationStatus {
        self.call_stack
            .last()
            .expect("Call stack is unexpectedly empty")
    }
    fn current_application_mut(&mut self) -> &mut ApplicationStatus {
        self.call_stack
            .last_mut()
            .expect("Call stack is unexpectedly empty")
    }
    fn push_application(&mut self, status: ApplicationStatus) {
        self.active_applications.insert(status.id);
        self.call_stack.push(status);
    }
    fn pop_application(&mut self) -> ApplicationStatus {
        let status = self
            .call_stack
            .pop()
            .expect("Can't remove application from empty call stack");
        assert!(self.active_applications.remove(&status.id));
        status
    }
    fn check_for_reentrancy(
        &mut self,
        application_id: UserApplicationId,
    ) -> Result<(), ExecutionError> {
        ensure!(
            !self.active_applications.contains(&application_id),
            ExecutionError::ReentrantCall(application_id)
        );
        Ok(())
    }
}
impl SyncRuntimeInternal<UserContractInstance> {
    fn load_contract_instance(
        &mut self,
        this: Arc<Mutex<Self>>,
        id: UserApplicationId,
    ) -> Result<LoadedApplication<UserContractInstance>, ExecutionError> {
        match self.loaded_applications.entry(id) {
            hash_map::Entry::Vacant(entry) => {
                let (code, description) = self
                    .execution_state_sender
                    .send_request(|callback| Request::LoadContract { id, callback })?
                    .recv_response()?;
                let instance = code.instantiate(SyncRuntime(this))?;
                self.applications_to_finalize.push(id);
                Ok(entry
                    .insert(LoadedApplication::new(instance, description))
                    .clone())
            }
            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
        }
    }
    fn prepare_for_call(
        &mut self,
        this: Arc<Mutex<Self>>,
        authenticated: bool,
        callee_id: UserApplicationId,
    ) -> Result<(Arc<Mutex<UserContractInstance>>, OperationContext), ExecutionError> {
        self.check_for_reentrancy(callee_id)?;
        ensure!(
            !self.is_finalizing,
            ExecutionError::CrossApplicationCallInFinalize {
                caller_id: Box::new(self.current_application().id),
                callee_id: Box::new(callee_id),
            }
        );
        let application = self.load_contract_instance(this, callee_id)?;
        let caller = self.current_application();
        let caller_id = caller.id;
        let caller_signer = caller.signer;
        let authenticated_signer = match caller_signer {
            Some(signer) if authenticated => Some(signer),
            _ => None,
        };
        let authenticated_caller_id = authenticated.then_some(caller_id);
        let callee_context = OperationContext {
            chain_id: self.chain_id,
            authenticated_signer,
            authenticated_caller_id,
            height: self.height,
            index: None,
            next_message_index: self.next_message_index,
        };
        self.push_application(ApplicationStatus {
            caller_id: authenticated_caller_id,
            id: callee_id,
            parameters: application.parameters,
            signer: authenticated_signer,
            outcome: RawExecutionOutcome::default(),
        });
        Ok((application.instance, callee_context))
    }
    fn finish_call(&mut self) -> Result<(), ExecutionError> {
        let ApplicationStatus {
            id: callee_id,
            signer,
            outcome,
            ..
        } = self.pop_application();
        self.handle_outcome(outcome, signer, callee_id)?;
        Ok(())
    }
    fn handle_outcome(
        &mut self,
        raw_outcome: RawExecutionOutcome<Vec<u8>, Resources>,
        signer: Option<Owner>,
        application_id: UserApplicationId,
    ) -> Result<(), ExecutionError> {
        let outcome = raw_outcome
            .with_refund_grant_to(self.refund_grant_to)
            .with_authenticated_signer(signer)
            .into_priced(&self.resource_controller.policy)?;
        for message in &outcome.messages {
            self.resource_controller.track_grant(message.grant)?;
        }
        self.execution_outcomes
            .push(ExecutionOutcome::User(application_id, outcome));
        Ok(())
    }
}
impl SyncRuntimeInternal<UserServiceInstance> {
    fn load_service_instance(
        &mut self,
        this: Arc<Mutex<Self>>,
        id: UserApplicationId,
    ) -> Result<LoadedApplication<UserServiceInstance>, ExecutionError> {
        match self.loaded_applications.entry(id) {
            hash_map::Entry::Vacant(entry) => {
                let (code, description) = self
                    .execution_state_sender
                    .send_request(|callback| Request::LoadService { id, callback })?
                    .recv_response()?;
                let instance = code.instantiate(SyncRuntime(this))?;
                Ok(entry
                    .insert(LoadedApplication::new(instance, description))
                    .clone())
            }
            hash_map::Entry::Occupied(entry) => Ok(entry.get().clone()),
        }
    }
}
impl<UserInstance> SyncRuntime<UserInstance> {
    fn new(runtime: SyncRuntimeInternal<UserInstance>) -> Self {
        SyncRuntime(Arc::new(Mutex::new(runtime)))
    }
    fn into_inner(self) -> Option<SyncRuntimeInternal<UserInstance>> {
        let runtime = Arc::into_inner(self.0)?
            .into_inner()
            .expect("thread should not have panicked");
        Some(runtime)
    }
    fn inner(&mut self) -> std::sync::MutexGuard<'_, SyncRuntimeInternal<UserInstance>> {
        self.0
            .try_lock()
            .expect("Synchronous runtimes run on a single execution thread")
    }
}
impl<UserInstance> BaseRuntime for SyncRuntime<UserInstance> {
    type Read = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::Read;
    type ReadValueBytes = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ReadValueBytes;
    type ContainsKey = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ContainsKey;
    type ReadMultiValuesBytes =
        <SyncRuntimeInternal<UserInstance> as BaseRuntime>::ReadMultiValuesBytes;
    type FindKeysByPrefix = <SyncRuntimeInternal<UserInstance> as BaseRuntime>::FindKeysByPrefix;
    type FindKeyValuesByPrefix =
        <SyncRuntimeInternal<UserInstance> as BaseRuntime>::FindKeyValuesByPrefix;
    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
        self.inner().chain_id()
    }
    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
        self.inner().block_height()
    }
    fn application_id(&mut self) -> Result<UserApplicationId, ExecutionError> {
        self.inner().application_id()
    }
    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
        self.inner().application_parameters()
    }
    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
        self.inner().read_system_timestamp()
    }
    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
        self.inner().read_chain_balance()
    }
    fn read_owner_balance(&mut self, owner: Owner) -> Result<Amount, ExecutionError> {
        self.inner().read_owner_balance(owner)
    }
    fn read_owner_balances(&mut self) -> Result<Vec<(Owner, Amount)>, ExecutionError> {
        self.inner().read_owner_balances()
    }
    fn read_balance_owners(&mut self) -> Result<Vec<Owner>, ExecutionError> {
        self.inner().read_balance_owners()
    }
    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
        self.inner().chain_ownership()
    }
    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
        self.inner().write_batch(batch)
    }
    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
        self.inner().contains_key_new(key)
    }
    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
        self.inner().contains_key_wait(promise)
    }
    fn read_multi_values_bytes_new(
        &mut self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
        self.inner().read_multi_values_bytes_new(keys)
    }
    fn read_multi_values_bytes_wait(
        &mut self,
        promise: &Self::ReadMultiValuesBytes,
    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
        self.inner().read_multi_values_bytes_wait(promise)
    }
    fn read_value_bytes_new(
        &mut self,
        key: Vec<u8>,
    ) -> Result<Self::ReadValueBytes, ExecutionError> {
        self.inner().read_value_bytes_new(key)
    }
    fn read_value_bytes_wait(
        &mut self,
        promise: &Self::ReadValueBytes,
    ) -> Result<Option<Vec<u8>>, ExecutionError> {
        self.inner().read_value_bytes_wait(promise)
    }
    fn find_keys_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
        self.inner().find_keys_by_prefix_new(key_prefix)
    }
    fn find_keys_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeysByPrefix,
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
        self.inner().find_keys_by_prefix_wait(promise)
    }
    fn find_key_values_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
        self.inner().find_key_values_by_prefix_new(key_prefix)
    }
    fn find_key_values_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeyValuesByPrefix,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
        self.inner().find_key_values_by_prefix_wait(promise)
    }
}
impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
    type Read = ();
    type ReadValueBytes = u32;
    type ContainsKey = u32;
    type ReadMultiValuesBytes = u32;
    type FindKeysByPrefix = u32;
    type FindKeyValuesByPrefix = u32;
    fn chain_id(&mut self) -> Result<ChainId, ExecutionError> {
        Ok(self.chain_id)
    }
    fn block_height(&mut self) -> Result<BlockHeight, ExecutionError> {
        Ok(self.height)
    }
    fn application_id(&mut self) -> Result<UserApplicationId, ExecutionError> {
        Ok(self.current_application().id)
    }
    fn application_parameters(&mut self) -> Result<Vec<u8>, ExecutionError> {
        Ok(self.current_application().parameters.clone())
    }
    fn read_system_timestamp(&mut self) -> Result<Timestamp, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::SystemTimestamp { callback })?
            .recv_response()
    }
    fn read_chain_balance(&mut self) -> Result<Amount, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::ChainBalance { callback })?
            .recv_response()
    }
    fn read_owner_balance(&mut self, owner: Owner) -> Result<Amount, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::OwnerBalance { owner, callback })?
            .recv_response()
    }
    fn read_owner_balances(&mut self) -> Result<Vec<(Owner, Amount)>, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::OwnerBalances { callback })?
            .recv_response()
    }
    fn read_balance_owners(&mut self) -> Result<Vec<Owner>, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::BalanceOwners { callback })?
            .recv_response()
    }
    fn chain_ownership(&mut self) -> Result<ChainOwnership, ExecutionError> {
        self.execution_state_sender
            .send_request(|callback| Request::ChainOwnership { callback })?
            .recv_response()
    }
    fn write_batch(&mut self, batch: Batch) -> Result<(), ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        state.force_all_pending_queries()?;
        self.resource_controller.track_write_operations(
            batch
                .num_operations()
                .try_into()
                .map_err(|_| ExecutionError::from(ArithmeticError::Overflow))?,
        )?;
        self.resource_controller
            .track_bytes_written(batch.size() as u64)?;
        self.execution_state_sender
            .send_request(|callback| Request::WriteBatch {
                id,
                batch,
                callback,
            })?
            .recv_response()?;
        Ok(())
    }
    fn contains_key_new(&mut self, key: Vec<u8>) -> Result<Self::ContainsKey, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        self.resource_controller.track_read_operations(1)?;
        let receiver = self
            .execution_state_sender
            .send_request(move |callback| Request::ContainsKey { id, key, callback })?;
        state.contains_key_queries.register(receiver)
    }
    fn contains_key_wait(&mut self, promise: &Self::ContainsKey) -> Result<bool, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        let value = state.contains_key_queries.wait(*promise)?;
        Ok(value)
    }
    fn read_multi_values_bytes_new(
        &mut self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Self::ReadMultiValuesBytes, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        self.resource_controller.track_read_operations(1)?;
        let receiver = self
            .execution_state_sender
            .send_request(move |callback| Request::ReadMultiValuesBytes { id, keys, callback })?;
        state.read_multi_values_queries.register(receiver)
    }
    fn read_multi_values_bytes_wait(
        &mut self,
        promise: &Self::ReadMultiValuesBytes,
    ) -> Result<Vec<Option<Vec<u8>>>, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        let values = state.read_multi_values_queries.wait(*promise)?;
        for value in &values {
            if let Some(value) = &value {
                self.resource_controller
                    .track_bytes_read(value.len() as u64)?;
            }
        }
        Ok(values)
    }
    fn read_value_bytes_new(
        &mut self,
        key: Vec<u8>,
    ) -> Result<Self::ReadValueBytes, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        self.resource_controller.track_read_operations(1)?;
        let receiver = self
            .execution_state_sender
            .send_request(move |callback| Request::ReadValueBytes { id, key, callback })?;
        state.read_value_queries.register(receiver)
    }
    fn read_value_bytes_wait(
        &mut self,
        promise: &Self::ReadValueBytes,
    ) -> Result<Option<Vec<u8>>, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        let value = state.read_value_queries.wait(*promise)?;
        if let Some(value) = &value {
            self.resource_controller
                .track_bytes_read(value.len() as u64)?;
        }
        Ok(value)
    }
    fn find_keys_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeysByPrefix, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        self.resource_controller.track_read_operations(1)?;
        let receiver = self.execution_state_sender.send_request(move |callback| {
            Request::FindKeysByPrefix {
                id,
                key_prefix,
                callback,
            }
        })?;
        state.find_keys_queries.register(receiver)
    }
    fn find_keys_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeysByPrefix,
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        let keys = state.find_keys_queries.wait(*promise)?;
        let mut read_size = 0;
        for key in &keys {
            read_size += key.len();
        }
        self.resource_controller
            .track_bytes_read(read_size as u64)?;
        Ok(keys)
    }
    fn find_key_values_by_prefix_new(
        &mut self,
        key_prefix: Vec<u8>,
    ) -> Result<Self::FindKeyValuesByPrefix, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        self.resource_controller.track_read_operations(1)?;
        let receiver = self.execution_state_sender.send_request(move |callback| {
            Request::FindKeyValuesByPrefix {
                id,
                key_prefix,
                callback,
            }
        })?;
        state.find_key_values_queries.register(receiver)
    }
    fn find_key_values_by_prefix_wait(
        &mut self,
        promise: &Self::FindKeyValuesByPrefix,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ExecutionError> {
        let id = self.application_id()?;
        let state = self.view_user_states.entry(id).or_default();
        let key_values = state.find_key_values_queries.wait(*promise)?;
        let mut read_size = 0;
        for (key, value) in &key_values {
            read_size += key.len() + value.len();
        }
        self.resource_controller
            .track_bytes_read(read_size as u64)?;
        Ok(key_values)
    }
}
impl<UserInstance> Clone for SyncRuntime<UserInstance> {
    fn clone(&self) -> Self {
        SyncRuntime(self.0.clone())
    }
}
impl ContractSyncRuntime {
    pub(crate) fn run_action(
        execution_state_sender: ExecutionStateSender,
        application_id: UserApplicationId,
        chain_id: ChainId,
        refund_grant_to: Option<Account>,
        resource_controller: ResourceController,
        action: UserAction,
    ) -> Result<(Vec<ExecutionOutcome>, ResourceController), ExecutionError> {
        let executing_message = match &action {
            UserAction::Message(context, _) => Some(context.into()),
            _ => None,
        };
        let signer = action.signer();
        let height = action.height();
        let next_message_index = action.next_message_index();
        let mut runtime = ContractSyncRuntime::new(SyncRuntimeInternal::new(
            chain_id,
            height,
            signer,
            next_message_index,
            executing_message,
            execution_state_sender,
            refund_grant_to,
            resource_controller,
        ));
        let finalize_context = FinalizeContext {
            authenticated_signer: signer,
            chain_id,
            height,
            next_message_index,
        };
        runtime.execute(application_id, signer, move |code| match action {
            UserAction::Initialize(context, argument) => code.initialize(context, argument),
            UserAction::Operation(context, operation) => {
                code.execute_operation(context, operation).map(|_| ())
            }
            UserAction::Message(context, message) => code.execute_message(context, message),
        })?;
        runtime.finalize(finalize_context)?;
        let runtime = runtime
            .into_inner()
            .expect("Runtime clones should have been freed by now");
        Ok((runtime.execution_outcomes, runtime.resource_controller))
    }
    fn finalize(&mut self, context: FinalizeContext) -> Result<(), ExecutionError> {
        let applications = mem::take(&mut self.inner().applications_to_finalize)
            .into_iter()
            .rev();
        self.inner().is_finalizing = true;
        for application in applications {
            self.execute(application, context.authenticated_signer, |contract| {
                contract.finalize(context)
            })?;
        }
        self.inner().loaded_applications.clear();
        Ok(())
    }
    fn execute(
        &mut self,
        application_id: UserApplicationId,
        signer: Option<Owner>,
        closure: impl FnOnce(&mut UserContractInstance) -> Result<(), ExecutionError>,
    ) -> Result<(), ExecutionError> {
        match self.try_execute(application_id, signer, closure) {
            Ok(()) => Ok(()),
            Err(error) => {
                self.inner().loaded_applications.clear();
                Err(error)
            }
        }
    }
    fn try_execute(
        &mut self,
        application_id: UserApplicationId,
        signer: Option<Owner>,
        closure: impl FnOnce(&mut UserContractInstance) -> Result<(), ExecutionError>,
    ) -> Result<(), ExecutionError> {
        let contract = {
            let cloned_runtime = self.0.clone();
            let mut runtime = self.inner();
            let application = runtime.load_contract_instance(cloned_runtime, application_id)?;
            let status = ApplicationStatus {
                caller_id: None,
                id: application_id,
                parameters: application.parameters.clone(),
                signer,
                outcome: RawExecutionOutcome::default(),
            };
            runtime.push_application(status);
            application
        };
        closure(
            &mut contract
                .instance
                .try_lock()
                .expect("Application should not be already executing"),
        )?;
        let mut runtime = self.inner();
        let application_status = runtime.pop_application();
        assert_eq!(application_status.caller_id, None);
        assert_eq!(application_status.id, application_id);
        assert_eq!(application_status.parameters, contract.parameters);
        assert_eq!(application_status.signer, signer);
        assert!(runtime.call_stack.is_empty());
        runtime.handle_outcome(application_status.outcome, signer, application_id)?;
        Ok(())
    }
}
impl ContractRuntime for ContractSyncRuntime {
    fn authenticated_signer(&mut self) -> Result<Option<Owner>, ExecutionError> {
        Ok(self.inner().authenticated_signer)
    }
    fn message_id(&mut self) -> Result<Option<MessageId>, ExecutionError> {
        Ok(self.inner().executing_message.map(|metadata| metadata.id))
    }
    fn message_is_bouncing(&mut self) -> Result<Option<bool>, ExecutionError> {
        Ok(self
            .inner()
            .executing_message
            .map(|metadata| metadata.is_bouncing))
    }
    fn authenticated_caller_id(&mut self) -> Result<Option<UserApplicationId>, ExecutionError> {
        let mut this = self.inner();
        if this.call_stack.len() <= 1 {
            return Ok(None);
        }
        Ok(this.current_application().caller_id)
    }
    fn remaining_fuel(&mut self) -> Result<u64, ExecutionError> {
        Ok(self.inner().resource_controller.remaining_fuel())
    }
    fn consume_fuel(&mut self, fuel: u64) -> Result<(), ExecutionError> {
        let mut this = self.inner();
        this.resource_controller.track_fuel(fuel)
    }
    fn send_message(&mut self, message: SendMessageRequest<Vec<u8>>) -> Result<(), ExecutionError> {
        let mut this = self.inner();
        let application = this.current_application_mut();
        application.outcome.messages.push(message.into());
        Ok(())
    }
    fn subscribe(&mut self, chain: ChainId, channel: ChannelName) -> Result<(), ExecutionError> {
        let mut this = self.inner();
        let application = this.current_application_mut();
        application.outcome.subscribe.push((channel, chain));
        Ok(())
    }
    fn unsubscribe(&mut self, chain: ChainId, channel: ChannelName) -> Result<(), ExecutionError> {
        let mut this = self.inner();
        let application = this.current_application_mut();
        application.outcome.unsubscribe.push((channel, chain));
        Ok(())
    }
    fn transfer(
        &mut self,
        source: Option<Owner>,
        destination: Account,
        amount: Amount,
    ) -> Result<(), ExecutionError> {
        let signer = self.inner().current_application().signer;
        let execution_outcome = self
            .inner()
            .execution_state_sender
            .send_request(|callback| Request::Transfer {
                source,
                destination,
                amount,
                signer,
                callback,
            })?
            .recv_response()?;
        self.inner()
            .execution_outcomes
            .push(ExecutionOutcome::System(execution_outcome));
        Ok(())
    }
    fn claim(
        &mut self,
        source: Account,
        destination: Account,
        amount: Amount,
    ) -> Result<(), ExecutionError> {
        let signer = self.inner().current_application().signer;
        let execution_outcome = self
            .inner()
            .execution_state_sender
            .send_request(|callback| Request::Claim {
                source,
                destination,
                amount,
                signer,
                callback,
            })?
            .recv_response()?
            .with_authenticated_signer(signer);
        self.inner()
            .execution_outcomes
            .push(ExecutionOutcome::System(execution_outcome));
        Ok(())
    }
    fn try_call_application(
        &mut self,
        authenticated: bool,
        callee_id: UserApplicationId,
        argument: Vec<u8>,
    ) -> Result<Vec<u8>, ExecutionError> {
        let cloned_self = self.clone().0;
        let (contract, context) =
            self.inner()
                .prepare_for_call(cloned_self, authenticated, callee_id)?;
        let value = contract
            .try_lock()
            .expect("Applications should not have reentrant calls")
            .execute_operation(context, argument)?;
        self.inner().finish_call()?;
        Ok(value)
    }
    fn open_chain(
        &mut self,
        ownership: ChainOwnership,
        balance: Amount,
    ) -> Result<ChainId, ExecutionError> {
        let mut this = self.inner();
        let id = this.current_application().id;
        let next_message_id = MessageId {
            chain_id: this.chain_id,
            height: this.height,
            index: this.next_message_index()?,
        };
        let chain_id = ChainId::child(next_message_id);
        let application_permissions = ApplicationPermissions::new_single(id);
        let [open_chain_message, subscribe_message] = this
            .execution_state_sender
            .send_request(|callback| Request::OpenChain {
                ownership,
                balance,
                next_message_id,
                application_permissions,
                callback,
            })?
            .recv_response()?;
        let outcome = RawExecutionOutcome::default()
            .with_message(open_chain_message)
            .with_message(subscribe_message);
        this.execution_outcomes
            .push(ExecutionOutcome::System(outcome));
        Ok(chain_id)
    }
    fn close_chain(&mut self) -> Result<(), ExecutionError> {
        let mut this = self.inner();
        let application_id = this.current_application().id;
        this.execution_state_sender
            .send_request(|callback| Request::CloseChain {
                application_id,
                callback,
            })?
            .recv_response()?
    }
}
impl ServiceSyncRuntime {
    pub(crate) fn run_query(
        execution_state_sender: ExecutionStateSender,
        application_id: UserApplicationId,
        context: crate::QueryContext,
        query: Vec<u8>,
    ) -> Result<Vec<u8>, ExecutionError> {
        let runtime_internal = SyncRuntimeInternal::new(
            context.chain_id,
            context.next_block_height,
            None,
            0,
            None,
            execution_state_sender,
            None,
            ResourceController::default(),
        );
        let mut runtime = ServiceSyncRuntime::new(runtime_internal);
        let result = runtime.try_query_application(application_id, query);
        runtime.inner().loaded_applications.clear();
        result
    }
}
impl ServiceRuntime for ServiceSyncRuntime {
    fn try_query_application(
        &mut self,
        queried_id: UserApplicationId,
        argument: Vec<u8>,
    ) -> Result<Vec<u8>, ExecutionError> {
        let (query_context, service) = {
            let cloned_self = self.clone().0;
            let mut this = self.inner();
            let application = this.load_service_instance(cloned_self, queried_id)?;
            let query_context = crate::QueryContext {
                chain_id: this.chain_id,
                next_block_height: this.height,
            };
            this.push_application(ApplicationStatus {
                caller_id: None,
                id: queried_id,
                parameters: application.parameters,
                signer: None,
                outcome: RawExecutionOutcome::default(),
            });
            (query_context, application.instance)
        };
        let response = service
            .try_lock()
            .expect("Applications should not have reentrant calls")
            .handle_query(query_context, argument)?;
        self.inner().pop_application();
        Ok(response)
    }
}
#[derive(Clone, Copy, Debug)]
struct ExecutingMessage {
    id: MessageId,
    is_bouncing: bool,
}
impl From<&MessageContext> for ExecutingMessage {
    fn from(context: &MessageContext) -> Self {
        ExecutingMessage {
            id: context.message_id,
            is_bouncing: context.is_bouncing,
        }
    }
}