Skip to main content

linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9    crypto::CryptoHash,
10    data_types::{BlobContent, BlockHeight, StreamUpdate},
11    identifiers::{AccountOwner, BlobId, StreamId},
12    time::Instant,
13};
14use linera_views::{
15    context::Context,
16    key_value_store_view::KeyValueStoreView,
17    map_view::MapView,
18    reentrant_collection_view::HashedReentrantCollectionView,
19    views::{ClonableView, HashableView as _, ReplaceContext, View},
20    ViewError,
21};
22use linera_views_derive::HashableView;
23#[cfg(with_testing)]
24use {
25    crate::{
26        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
27    },
28    linera_base::data_types::Blob,
29    linera_views::context::MemoryContext,
30    std::sync::Arc,
31};
32
33use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
34use crate::{
35    execution_state_actor::ExecutionStateActor, resources::ResourceController,
36    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
37    Deserialize, ExecutionError, ExecutionRuntimeContext, JsVec, MessageContext, OperationContext,
38    ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize, ServiceSyncRuntime,
39    Timestamp, TransactionTracker, FLAG_ZERO_HASH,
40};
41
/// A view accessing the execution state of a chain.
///
/// The state is made of three parts: the state of the system application, one key-value
/// store per user application, and a counter of published events per stream.
#[derive(Debug, ClonableView, HashableView, Allocative)]
#[allocative(bound = "C")]
pub struct ExecutionStateView<C> {
    /// State of the system application.
    pub system: SystemExecutionStateView<C>,
    /// State of the user applications: one key-value store per `ApplicationId`.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// The number of events in the streams that this chain is writing to, keyed by `StreamId`.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
53
54impl<C> ExecutionStateView<C>
55where
56    C: Context + Clone + 'static,
57    C::Extra: ExecutionRuntimeContext,
58{
59    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
60        if self
61            .system
62            .current_committee()
63            .await?
64            .is_some_and(|(_epoch, committee)| {
65                committee
66                    .policy()
67                    .http_request_allow_list
68                    .contains(FLAG_ZERO_HASH)
69            })
70        {
71            Ok(CryptoHash::from([0; 32]))
72        } else {
73            #[derive(Serialize, Deserialize)]
74            struct ExecutionStateViewHash([u8; 32]);
75            impl BcsHashable<'_> for ExecutionStateViewHash {}
76            self.hash_mut()
77                .await
78                .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
79        }
80    }
81}
82
83impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
84    type Target = ExecutionStateView<C2>;
85
86    async fn with_context(
87        &mut self,
88        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
89    ) -> Self::Target {
90        ExecutionStateView {
91            system: self.system.with_context(ctx.clone()).await,
92            users: self.users.with_context(ctx.clone()).await,
93            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
94        }
95    }
96}
97
/// How to interact with a long-lived service runtime.
///
/// Queries are submitted through `runtime_request_sender`; while a query is running, the
/// execution requests it issues arrive on `incoming_execution_requests` and must be served
/// by the caller.
pub struct ServiceRuntimeEndpoint {
    /// How to receive requests.
    ///
    /// Receiving end of the unbounded channel carrying `ExecutionRequest`s.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// How to query the runtime.
    ///
    /// Sending end of the standard-library channel carrying `ServiceRuntimeRequest`s.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
105
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
{
    /// Simulates the instantiation of an application.
    ///
    /// Marks the description, contract and service blobs as used, installs `contract` as the
    /// application's contract code, publishes the blobs through the test runtime context, and
    /// finally runs a `UserAction::Instantiate` carrying `instantiation_argument` under a
    /// fee-free resource policy.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of this
    /// view's runtime context.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        // Record every blob the application relies on as used by this chain.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        let published_blobs = [
            contract_blob,
            service_blob,
            Blob::new_application_description(&application_description),
        ];
        self.context().extra().add_blobs(published_blobs).await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );

        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let refund_grant_to = context.refund_grant_to();
        let action = UserAction::Instantiate(context, instantiation_argument);

        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(application_id, action, refund_grant_to, None),
        )
        .await?;

        Ok(())
    }
}
178
/// An action to run on a user application, paired with the context it runs in.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Run an operation; the bytes are presumably the serialized operation (confirm
    /// against callers).
    Operation(OperationContext, Vec<u8>),
    /// Handle a message; the bytes are presumably the serialized message (confirm
    /// against callers).
    Message(MessageContext, Vec<u8>),
    /// Process a batch of stream updates.
    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
}
185
186impl UserAction {
187    pub(crate) fn signer(&self) -> Option<AccountOwner> {
188        match self {
189            UserAction::Instantiate(context, _) => context.authenticated_signer,
190            UserAction::Operation(context, _) => context.authenticated_signer,
191            UserAction::ProcessStreams(_, _) => None,
192            UserAction::Message(context, _) => context.authenticated_signer,
193        }
194    }
195
196    pub(crate) fn height(&self) -> BlockHeight {
197        match self {
198            UserAction::Instantiate(context, _) => context.height,
199            UserAction::Operation(context, _) => context.height,
200            UserAction::ProcessStreams(context, _) => context.height,
201            UserAction::Message(context, _) => context.height,
202        }
203    }
204
205    pub(crate) fn round(&self) -> Option<u32> {
206        match self {
207            UserAction::Instantiate(context, _) => context.round,
208            UserAction::Operation(context, _) => context.round,
209            UserAction::ProcessStreams(context, _) => context.round,
210            UserAction::Message(context, _) => context.round,
211        }
212    }
213
214    pub(crate) fn timestamp(&self) -> Timestamp {
215        match self {
216            UserAction::Instantiate(context, _) => context.timestamp,
217            UserAction::Operation(context, _) => context.timestamp,
218            UserAction::ProcessStreams(context, _) => context.timestamp,
219            UserAction::Message(context, _) => context.timestamp,
220        }
221    }
222}
223
224impl<C> ExecutionStateView<C>
225where
226    C: Context + Clone + 'static,
227    C::Extra: ExecutionRuntimeContext,
228{
229    pub async fn query_application(
230        &mut self,
231        context: QueryContext,
232        query: Query,
233        endpoint: Option<&mut ServiceRuntimeEndpoint>,
234    ) -> Result<QueryOutcome, ExecutionError> {
235        assert_eq!(context.chain_id, self.context().extra().chain_id());
236        match query {
237            Query::System(query) => {
238                let outcome = self.system.handle_query(context, query)?;
239                Ok(outcome.into())
240            }
241            Query::User {
242                application_id,
243                bytes,
244            } => {
245                let outcome = match endpoint {
246                    Some(endpoint) => {
247                        self.query_user_application_with_long_lived_service(
248                            application_id,
249                            context,
250                            bytes,
251                            &mut endpoint.incoming_execution_requests,
252                            &mut endpoint.runtime_request_sender,
253                        )
254                        .await?
255                    }
256                    None => {
257                        self.query_user_application(application_id, context, bytes)
258                            .await?
259                    }
260                };
261                Ok(outcome.into())
262            }
263        }
264    }
265
266    async fn query_user_application(
267        &mut self,
268        application_id: ApplicationId,
269        context: QueryContext,
270        query: Vec<u8>,
271    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
272        self.query_user_application_with_deadline(
273            application_id,
274            context,
275            query,
276            None,
277            BTreeMap::new(),
278        )
279        .await
280    }
281
282    pub(crate) async fn query_user_application_with_deadline(
283        &mut self,
284        application_id: ApplicationId,
285        context: QueryContext,
286        query: Vec<u8>,
287        deadline: Option<Instant>,
288        created_blobs: BTreeMap<BlobId, BlobContent>,
289    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
290        let (execution_state_sender, mut execution_state_receiver) =
291            futures::channel::mpsc::unbounded();
292        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
293        let mut resource_controller = ResourceController::default();
294        let thread_pool = self.context().extra().thread_pool().clone();
295        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
296
297        let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
298
299        let service_runtime_task = thread_pool
300            .run_send(JsVec(codes), move |codes| async move {
301                let mut runtime = ServiceSyncRuntime::new_with_deadline(
302                    execution_state_sender,
303                    context,
304                    deadline,
305                );
306
307                for (code, description) in codes.0.into_iter().zip(descriptions) {
308                    runtime.preload_service(
309                        ApplicationId::from(&description),
310                        code,
311                        description,
312                    )?;
313                }
314
315                runtime.run_query(application_id, query)
316            })
317            .await;
318
319        while let Some(request) = execution_state_receiver.next().await {
320            actor.handle_request(request).await?;
321        }
322
323        service_runtime_task.await?
324    }
325
326    async fn query_user_application_with_long_lived_service(
327        &mut self,
328        application_id: ApplicationId,
329        context: QueryContext,
330        query: Vec<u8>,
331        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
332            ExecutionRequest,
333        >,
334        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
335    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
336        let (outcome_sender, outcome_receiver) = oneshot::channel();
337        let mut outcome_receiver = outcome_receiver.fuse();
338
339        runtime_request_sender
340            .send(ServiceRuntimeRequest::Query {
341                application_id,
342                context,
343                query,
344                callback: outcome_sender,
345            })
346            .expect("Service runtime thread should only stop when `request_sender` is dropped");
347
348        let mut txn_tracker = TransactionTracker::default();
349        let mut resource_controller = ResourceController::default();
350        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
351
352        loop {
353            futures::select! {
354                maybe_request = incoming_execution_requests.next() => {
355                    if let Some(request) = maybe_request {
356                        actor.handle_request(request).await?;
357                    }
358                }
359                outcome = &mut outcome_receiver => {
360                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
361                }
362            }
363        }
364    }
365
366    pub async fn list_applications(
367        &self,
368    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
369        let mut applications = vec![];
370        for app_id in self.users.indices().await? {
371            let blob_id = app_id.description_blob_id();
372            let blob_content = self.system.read_blob_content(blob_id).await?;
373            let application_description = bcs::from_bytes(blob_content.bytes())?;
374            applications.push((app_id, application_description));
375        }
376        Ok(applications)
377    }
378}