use std::{collections::BTreeMap, vec};
use allocative::Allocative;
use futures::{FutureExt, StreamExt};
use linera_base::{
crypto::CryptoHash,
data_types::{BlobContent, BlockHeight, StreamUpdate},
identifiers::{AccountOwner, BlobId, StreamId},
time::Instant,
};
use linera_views::{
context::Context,
key_value_store_view::KeyValueStoreView,
map_view::MapView,
reentrant_collection_view::HashedReentrantCollectionView,
views::{ClonableView, HashableView as _, ReplaceContext, View},
ViewError,
};
use linera_views_derive::HashableView;
#[cfg(with_testing)]
use {
crate::{
ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
},
linera_base::data_types::Blob,
linera_views::context::MemoryContext,
std::sync::Arc,
};
use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
use crate::{
execution_state_actor::ExecutionStateActor, resources::ResourceController,
system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
Deserialize, ExecutionError, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize, ServiceSyncRuntime,
Timestamp, TransactionTracker, FLAG_ZERO_HASH,
};
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
pub system: SystemExecutionStateView<C>,
pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
pub stream_event_counts: MapView<C, StreamId, u32>,
}
impl<C> ExecutionStateView<C>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
if self
.system
.current_committee()
.await?
.is_some_and(|(_epoch, committee)| {
committee
.policy()
.http_request_allow_list
.contains(FLAG_ZERO_HASH)
})
{
Ok(CryptoHash::from([0; 32]))
} else {
#[derive(Serialize, Deserialize)]
struct ExecutionStateViewHash([u8; 32]);
impl BcsHashable<'_> for ExecutionStateViewHash {}
self.hash_mut()
.await
.map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
}
}
}
impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
type Target = ExecutionStateView<C2>;
async fn with_context(
&mut self,
ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
) -> Self::Target {
ExecutionStateView {
system: self.system.with_context(ctx.clone()).await,
users: self.users.with_context(ctx.clone()).await,
stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
}
}
}
pub struct ServiceRuntimeEndpoint {
pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
{
pub async fn simulate_instantiation(
&mut self,
contract: UserContractCode,
local_time: linera_base::data_types::Timestamp,
application_description: ApplicationDescription,
instantiation_argument: Vec<u8>,
contract_blob: Blob,
service_blob: Blob,
) -> Result<(), ExecutionError> {
let chain_id = application_description.creator_chain_id;
assert_eq!(chain_id, self.context().extra().chain_id);
let context = OperationContext {
chain_id,
authenticated_signer: None,
height: application_description.block_height,
round: None,
timestamp: local_time,
};
let action = UserAction::Instantiate(context, instantiation_argument);
let next_application_index = application_description.application_index + 1;
let next_chain_index = 0;
let application_id = From::from(&application_description);
let blob = Blob::new_application_description(&application_description);
self.system.used_blobs.insert(&blob.id())?;
self.system.used_blobs.insert(&contract_blob.id())?;
self.system.used_blobs.insert(&service_blob.id())?;
self.context()
.extra()
.user_contracts()
.pin()
.insert(application_id, contract);
self.context()
.extra()
.add_blobs([
contract_blob,
service_blob,
Blob::new_application_description(&application_description),
])
.await?;
let tracker = ResourceTracker::default();
let policy = ResourceControlPolicy::no_fees();
let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
let mut txn_tracker = TransactionTracker::new(
local_time,
0,
next_application_index,
next_chain_index,
None,
&[],
);
txn_tracker.add_created_blob(blob);
Box::pin(
ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
.run_user_action(application_id, action, context.refund_grant_to(), None),
)
.await?;
Ok(())
}
}
pub enum UserAction {
Instantiate(OperationContext, Vec<u8>),
Operation(OperationContext, Vec<u8>),
Message(MessageContext, Vec<u8>),
ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
impl UserAction {
pub(crate) fn signer(&self) -> Option<AccountOwner> {
match self {
UserAction::Instantiate(context, _) => context.authenticated_signer,
UserAction::Operation(context, _) => context.authenticated_signer,
UserAction::ProcessStreams(_, _) => None,
UserAction::Message(context, _) => context.authenticated_signer,
}
}
pub(crate) fn height(&self) -> BlockHeight {
match self {
UserAction::Instantiate(context, _) => context.height,
UserAction::Operation(context, _) => context.height,
UserAction::ProcessStreams(context, _) => context.height,
UserAction::Message(context, _) => context.height,
}
}
pub(crate) fn round(&self) -> Option<u32> {
match self {
UserAction::Instantiate(context, _) => context.round,
UserAction::Operation(context, _) => context.round,
UserAction::ProcessStreams(context, _) => context.round,
UserAction::Message(context, _) => context.round,
}
}
pub(crate) fn timestamp(&self) -> Timestamp {
match self {
UserAction::Instantiate(context, _) => context.timestamp,
UserAction::Operation(context, _) => context.timestamp,
UserAction::ProcessStreams(context, _) => context.timestamp,
UserAction::Message(context, _) => context.timestamp,
}
}
}
impl<C> ExecutionStateView<C>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
pub async fn query_application(
&mut self,
context: QueryContext,
query: Query,
endpoint: Option<&mut ServiceRuntimeEndpoint>,
) -> Result<QueryOutcome, ExecutionError> {
assert_eq!(context.chain_id, self.context().extra().chain_id());
match query {
Query::System(query) => {
let outcome = self.system.handle_query(context, query)?;
Ok(outcome.into())
}
Query::User {
application_id,
bytes,
} => {
let outcome = match endpoint {
Some(endpoint) => {
self.query_user_application_with_long_lived_service(
application_id,
context,
bytes,
&mut endpoint.incoming_execution_requests,
&mut endpoint.runtime_request_sender,
)
.await?
}
None => {
self.query_user_application(application_id, context, bytes)
.await?
}
};
Ok(outcome.into())
}
}
}
async fn query_user_application(
&mut self,
application_id: ApplicationId,
context: QueryContext,
query: Vec<u8>,
) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
self.query_user_application_with_deadline(
application_id,
context,
query,
None,
BTreeMap::new(),
)
.await
}
pub(crate) async fn query_user_application_with_deadline(
&mut self,
application_id: ApplicationId,
context: QueryContext,
query: Vec<u8>,
deadline: Option<Instant>,
created_blobs: BTreeMap<BlobId, BlobContent>,
) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
let (execution_state_sender, mut execution_state_receiver) =
futures::channel::mpsc::unbounded();
let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
let mut resource_controller = ResourceController::default();
let thread_pool = self.context().extra().thread_pool().clone();
let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
let service_runtime_task = thread_pool
.run_send(JsVec(codes), move |codes| async move {
let mut runtime = ServiceSyncRuntime::new_with_deadline(
execution_state_sender,
context,
deadline,
);
for (code, description) in codes.0.into_iter().zip(descriptions) {
runtime.preload_service(
ApplicationId::from(&description),
code,
description,
)?;
}
runtime.run_query(application_id, query)
})
.await;
while let Some(request) = execution_state_receiver.next().await {
actor.handle_request(request).await?;
}
service_runtime_task.await?
}
async fn query_user_application_with_long_lived_service(
&mut self,
application_id: ApplicationId,
context: QueryContext,
query: Vec<u8>,
incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
ExecutionRequest,
>,
runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
let (outcome_sender, outcome_receiver) = oneshot::channel();
let mut outcome_receiver = outcome_receiver.fuse();
runtime_request_sender
.send(ServiceRuntimeRequest::Query {
application_id,
context,
query,
callback: outcome_sender,
})
.expect("Service runtime thread should only stop when `request_sender` is dropped");
let mut txn_tracker = TransactionTracker::default();
let mut resource_controller = ResourceController::default();
let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
loop {
futures::select! {
maybe_request = incoming_execution_requests.next() => {
if let Some(request) = maybe_request {
actor.handle_request(request).await?;
}
}
outcome = &mut outcome_receiver => {
return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
}
}
}
}
pub async fn list_applications(
&self,
) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
let mut applications = vec![];
for app_id in self.users.indices().await? {
let blob_id = app_id.description_blob_id();
let blob_content = self.system.read_blob_content(blob_id).await?;
let application_description = bcs::from_bytes(blob_content.bytes())?;
applications.push((app_id, application_description));
}
Ok(applications)
}
}