use std::collections::{BTreeMap, BTreeSet, HashMap};
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use linera_base::{
data_types::{Amount, BlockHeight},
identifiers::{Account, ChainId, Destination, Owner},
};
use linera_views::{
common::Context,
key_value_store_view::KeyValueStoreView,
reentrant_collection_view::HashedReentrantCollectionView,
views::{View, ViewError},
};
use linera_views_derive::CryptoHashView;
#[cfg(with_testing)]
use {
crate::{
ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
},
linera_views::memory::MemoryContext,
std::sync::Arc,
};
use crate::{
resources::ResourceController, system::SystemExecutionStateView, ContractSyncRuntime,
ExecutionError, ExecutionOutcome, ExecutionRuntimeConfig, ExecutionRuntimeContext, Message,
MessageContext, MessageKind, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, Response, ServiceSyncRuntime, SystemMessage,
UserApplicationDescription, UserApplicationId,
};
#[derive(Debug, CryptoHashView)]
pub struct ExecutionStateView<C> {
pub system: SystemExecutionStateView<C>,
pub users: HashedReentrantCollectionView<C, UserApplicationId, KeyValueStoreView<C>>,
}
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
ViewError:
From<<MemoryContext<TestExecutionRuntimeContext> as linera_views::common::Context>::Error>,
{
pub async fn simulate_initialization(
&mut self,
contract: UserContractCode,
application_description: UserApplicationDescription,
initialization_argument: Vec<u8>,
) -> Result<(), ExecutionError> {
let chain_id = application_description.creation.chain_id;
let context = OperationContext {
chain_id,
authenticated_signer: None,
authenticated_caller_id: None,
height: application_description.creation.height,
index: Some(application_description.creation.index),
next_message_index: 0,
};
let action = UserAction::Initialize(context, initialization_argument);
let application_id = self
.system
.registry
.register_application(application_description)
.await?;
self.context()
.extra()
.user_contracts()
.insert(application_id, contract);
let tracker = ResourceTracker::default();
let policy = ResourceControlPolicy::default();
let mut resource_controller = ResourceController {
policy: Arc::new(policy),
tracker,
account: None,
};
self.run_user_action(
application_id,
chain_id,
action,
context.refund_grant_to(),
None,
&mut resource_controller,
)
.await?;
Ok(())
}
}
pub enum UserAction {
Initialize(OperationContext, Vec<u8>),
Operation(OperationContext, Vec<u8>),
Message(MessageContext, Vec<u8>),
}
impl UserAction {
pub(crate) fn signer(&self) -> Option<Owner> {
use UserAction::*;
match self {
Initialize(context, _) => context.authenticated_signer,
Operation(context, _) => context.authenticated_signer,
Message(context, _) => context.authenticated_signer,
}
}
pub(crate) fn height(&self) -> BlockHeight {
match self {
UserAction::Initialize(context, _) => context.height,
UserAction::Operation(context, _) => context.height,
UserAction::Message(context, _) => context.height,
}
}
pub(crate) fn next_message_index(&self) -> u32 {
match self {
UserAction::Initialize(context, _) => context.next_message_index,
UserAction::Operation(context, _) => context.next_message_index,
UserAction::Message(context, _) => context.next_message_index,
}
}
}
impl<C> ExecutionStateView<C>
where
C: Context + Clone + Send + Sync + 'static,
ViewError: From<C::Error>,
C::Extra: ExecutionRuntimeContext,
{
async fn run_user_action(
&mut self,
application_id: UserApplicationId,
chain_id: ChainId,
action: UserAction,
refund_grant_to: Option<Account>,
grant: Option<&mut Amount>,
resource_controller: &mut ResourceController<Option<Owner>>,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
let execution_outcomes = match self.context().extra().execution_runtime_config() {
ExecutionRuntimeConfig::Synchronous => {
self.run_user_action_with_synchronous_runtime(
application_id,
chain_id,
action,
refund_grant_to,
grant,
resource_controller,
)
.await?
}
};
self.update_execution_outcomes_with_app_registrations(execution_outcomes)
.await
}
async fn run_user_action_with_synchronous_runtime(
&mut self,
application_id: UserApplicationId,
chain_id: ChainId,
action: UserAction,
refund_grant_to: Option<Account>,
grant: Option<&mut Amount>,
resource_controller: &mut ResourceController<Option<Owner>>,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
let mut cloned_grant = grant.as_ref().map(|x| **x);
let initial_balance = resource_controller
.with_state_and_grant(self, cloned_grant.as_mut())
.await?
.balance()?;
let controller = ResourceController {
policy: resource_controller.policy.clone(),
tracker: resource_controller.tracker,
account: initial_balance,
};
let (execution_state_sender, mut execution_state_receiver) =
futures::channel::mpsc::unbounded();
let execution_outcomes_future = tokio::task::spawn_blocking(move || {
ContractSyncRuntime::run_action(
execution_state_sender,
application_id,
chain_id,
refund_grant_to,
controller,
action,
)
});
while let Some(request) = execution_state_receiver.next().await {
self.handle_request(request).await?;
}
let (execution_outcomes, controller) = execution_outcomes_future.await??;
resource_controller
.with_state_and_grant(self, grant)
.await?
.merge_balance(initial_balance, controller.balance()?)?;
resource_controller.tracker = controller.tracker;
Ok(execution_outcomes)
}
async fn update_execution_outcomes_with_app_registrations(
&self,
mut results: Vec<ExecutionOutcome>,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
let user_application_outcomes = results.iter().filter_map(|outcome| match outcome {
ExecutionOutcome::User(application_id, result) => Some((application_id, result)),
_ => None,
});
let mut applications_to_register_per_destination = BTreeMap::<_, BTreeSet<_>>::new();
for (application_id, result) in user_application_outcomes {
for message in &result.messages {
applications_to_register_per_destination
.entry(&message.destination)
.or_default()
.insert(*application_id);
}
}
if applications_to_register_per_destination.is_empty() {
return Ok(results);
}
let messages = applications_to_register_per_destination
.into_iter()
.map(|(destination, applications_to_describe)| async {
let applications = self
.system
.registry
.describe_applications_with_dependencies(
applications_to_describe.into_iter().collect(),
&HashMap::new(),
)
.await?;
Ok::<_, ExecutionError>(RawOutgoingMessage {
destination: destination.clone(),
authenticated: false,
grant: Amount::ZERO,
kind: MessageKind::Simple,
message: SystemMessage::RegisterApplications { applications },
})
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await?;
let system_outcome = RawExecutionOutcome {
messages,
..RawExecutionOutcome::default()
};
results.insert(0, ExecutionOutcome::System(system_outcome));
Ok(results)
}
pub async fn execute_operation(
&mut self,
context: OperationContext,
operation: Operation,
resource_controller: &mut ResourceController<Option<Owner>>,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match operation {
Operation::System(op) => {
let (mut result, new_application) =
self.system.execute_operation(context, op).await?;
result.authenticated_signer = context.authenticated_signer;
result.refund_grant_to = context.refund_grant_to();
let mut outcomes = vec![ExecutionOutcome::System(result)];
if let Some((application_id, argument)) = new_application {
let user_action = UserAction::Initialize(context, argument);
outcomes.extend(
self.run_user_action(
application_id,
context.chain_id,
user_action,
context.refund_grant_to(),
None,
resource_controller,
)
.await?,
);
}
Ok(outcomes)
}
Operation::User {
application_id,
bytes,
} => {
self.run_user_action(
application_id,
context.chain_id,
UserAction::Operation(context, bytes),
context.refund_grant_to(),
None,
resource_controller,
)
.await
}
}
}
pub async fn execute_message(
&mut self,
context: MessageContext,
message: Message,
grant: Option<&mut Amount>,
resource_controller: &mut ResourceController<Option<Owner>>,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match message {
Message::System(message) => {
let outcome = self.system.execute_message(context, message).await?;
Ok(vec![ExecutionOutcome::System(outcome)])
}
Message::User {
application_id,
bytes,
} => {
self.run_user_action(
application_id,
context.chain_id,
UserAction::Message(context, bytes),
context.refund_grant_to,
grant,
resource_controller,
)
.await
}
}
}
pub async fn bounce_message(
&self,
context: MessageContext,
grant: Amount,
refund_grant_to: Option<Account>,
message: Message,
) -> Result<Vec<ExecutionOutcome>, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match message {
Message::System(message) => {
let mut outcome = RawExecutionOutcome {
authenticated_signer: context.authenticated_signer,
refund_grant_to,
..Default::default()
};
outcome.messages.push(RawOutgoingMessage {
destination: Destination::Recipient(context.message_id.chain_id),
authenticated: true,
grant,
kind: MessageKind::Bouncing,
message,
});
Ok(vec![ExecutionOutcome::System(outcome)])
}
Message::User {
application_id,
bytes,
} => {
let mut outcome = RawExecutionOutcome {
authenticated_signer: context.authenticated_signer,
refund_grant_to,
..Default::default()
};
outcome.messages.push(RawOutgoingMessage {
destination: Destination::Recipient(context.message_id.chain_id),
authenticated: true,
grant,
kind: MessageKind::Bouncing,
message: bytes,
});
Ok(vec![ExecutionOutcome::User(application_id, outcome)])
}
}
}
pub async fn send_refund(
&self,
context: MessageContext,
amount: Amount,
account: Account,
) -> Result<ExecutionOutcome, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
let mut outcome = RawExecutionOutcome::default();
let message = RawOutgoingMessage {
destination: Destination::Recipient(account.chain_id),
authenticated: false,
grant: Amount::ZERO,
kind: MessageKind::Tracked,
message: SystemMessage::Credit {
amount,
source: context.authenticated_signer,
target: account.owner,
},
};
outcome.messages.push(message);
Ok(ExecutionOutcome::System(outcome))
}
pub async fn query_application(
&mut self,
context: QueryContext,
query: Query,
) -> Result<Response, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match query {
Query::System(query) => {
let response = self.system.handle_query(context, query).await?;
Ok(Response::System(response))
}
Query::User {
application_id,
bytes,
} => {
let response = match self.context().extra().execution_runtime_config() {
ExecutionRuntimeConfig::Synchronous => {
self.query_application_with_sync_runtime(application_id, context, bytes)
.await?
}
};
Ok(Response::User(response))
}
}
}
async fn query_application_with_sync_runtime(
&mut self,
application_id: UserApplicationId,
context: QueryContext,
query: Vec<u8>,
) -> Result<Vec<u8>, ExecutionError> {
let (execution_state_sender, mut execution_state_receiver) =
futures::channel::mpsc::unbounded();
let query_result_future = tokio::task::spawn_blocking(move || {
ServiceSyncRuntime::run_query(execution_state_sender, application_id, context, query)
});
while let Some(request) = execution_state_receiver.next().await {
self.handle_request(request).await?;
}
query_result_future.await?
}
pub async fn list_applications(
&self,
) -> Result<Vec<(UserApplicationId, UserApplicationDescription)>, ExecutionError> {
let mut applications = vec![];
for index in self.system.registry.known_applications.indices().await? {
let application_description =
self.system.registry.known_applications.get(&index).await?;
if let Some(application_description) = application_description {
applications.push((index, application_description));
}
}
Ok(applications)
}
}