linera_execution/
execution.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8    crypto::CryptoHash,
9    data_types::{BlobContent, BlockHeight, StreamUpdate},
10    identifiers::{AccountOwner, BlobId, StreamId},
11    time::Instant,
12};
13use linera_views::{
14    context::Context,
15    key_value_store_view::KeyValueStoreView,
16    map_view::MapView,
17    reentrant_collection_view::HashedReentrantCollectionView,
18    views::{ClonableView, HashableView as _, ReplaceContext, View},
19    ViewError,
20};
21use linera_views_derive::HashableView;
22#[cfg(with_testing)]
23use {
24    crate::{
25        ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
26    },
27    linera_base::data_types::Blob,
28    linera_views::context::MemoryContext,
29    std::sync::Arc,
30};
31
32use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
33use crate::{
34    execution_state_actor::ExecutionStateActor, resources::ResourceController,
35    system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
36    Deserialize, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext,
37    OperationContext, ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize,
38    ServiceSyncRuntime, Timestamp, TransactionTracker, FLAG_ZERO_HASH,
39};
40
41/// A view accessing the execution state of a chain.
42#[derive(Debug, ClonableView, HashableView)]
43pub struct ExecutionStateView<C> {
44    /// System application.
45    pub system: SystemExecutionStateView<C>,
46    /// User applications.
47    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
48    /// The number of events in the streams that this chain is writing to.
49    pub stream_event_counts: MapView<C, StreamId, u32>,
50}
51
52impl<C> ExecutionStateView<C>
53where
54    C: Context + Clone + Send + Sync + 'static,
55    C::Extra: ExecutionRuntimeContext,
56{
57    pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
58        if self
59            .system
60            .current_committee()
61            .is_some_and(|(_epoch, committee)| {
62                committee
63                    .policy()
64                    .http_request_allow_list
65                    .contains(FLAG_ZERO_HASH)
66            })
67        {
68            Ok(CryptoHash::from([0; 32]))
69        } else {
70            #[derive(Serialize, Deserialize)]
71            struct ExecutionStateViewHash([u8; 32]);
72            impl BcsHashable<'_> for ExecutionStateViewHash {}
73            self.hash_mut()
74                .await
75                .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
76        }
77    }
78}
79
80impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
81    type Target = ExecutionStateView<C2>;
82
83    async fn with_context(
84        &mut self,
85        ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
86    ) -> Self::Target {
87        ExecutionStateView {
88            system: self.system.with_context(ctx.clone()).await,
89            users: self.users.with_context(ctx.clone()).await,
90            stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
91        }
92    }
93}
94
95/// How to interact with a long-lived service runtime.
96pub struct ServiceRuntimeEndpoint {
97    /// How to receive requests.
98    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
99    /// How to query the runtime.
100    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
101}
102
103#[cfg(with_testing)]
104impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
105where
106    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
107{
108    /// Simulates the instantiation of an application.
109    pub async fn simulate_instantiation(
110        &mut self,
111        contract: UserContractCode,
112        local_time: linera_base::data_types::Timestamp,
113        application_description: ApplicationDescription,
114        instantiation_argument: Vec<u8>,
115        contract_blob: Blob,
116        service_blob: Blob,
117    ) -> Result<(), ExecutionError> {
118        let chain_id = application_description.creator_chain_id;
119        assert_eq!(chain_id, self.context().extra().chain_id);
120        let context = OperationContext {
121            chain_id,
122            authenticated_signer: None,
123            height: application_description.block_height,
124            round: None,
125            timestamp: local_time,
126        };
127
128        let action = UserAction::Instantiate(context, instantiation_argument);
129        let next_application_index = application_description.application_index + 1;
130        let next_chain_index = 0;
131
132        let application_id = From::from(&application_description);
133        let blob = Blob::new_application_description(&application_description);
134
135        self.system.used_blobs.insert(&blob.id())?;
136        self.system.used_blobs.insert(&contract_blob.id())?;
137        self.system.used_blobs.insert(&service_blob.id())?;
138
139        self.context()
140            .extra()
141            .user_contracts()
142            .pin()
143            .insert(application_id, contract);
144
145        self.context()
146            .extra()
147            .add_blobs([
148                contract_blob,
149                service_blob,
150                Blob::new_application_description(&application_description),
151            ])
152            .await?;
153
154        let tracker = ResourceTracker::default();
155        let policy = ResourceControlPolicy::no_fees();
156        let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
157        let mut txn_tracker = TransactionTracker::new(
158            local_time,
159            0,
160            next_application_index,
161            next_chain_index,
162            None,
163            &[],
164        );
165        txn_tracker.add_created_blob(blob);
166        Box::pin(
167            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
168                .run_user_action(application_id, action, context.refund_grant_to(), None),
169        )
170        .await?;
171
172        Ok(())
173    }
174}
175
176pub enum UserAction {
177    Instantiate(OperationContext, Vec<u8>),
178    Operation(OperationContext, Vec<u8>),
179    Message(MessageContext, Vec<u8>),
180    ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
181}
182
183impl UserAction {
184    pub(crate) fn signer(&self) -> Option<AccountOwner> {
185        match self {
186            UserAction::Instantiate(context, _) => context.authenticated_signer,
187            UserAction::Operation(context, _) => context.authenticated_signer,
188            UserAction::ProcessStreams(_, _) => None,
189            UserAction::Message(context, _) => context.authenticated_signer,
190        }
191    }
192
193    pub(crate) fn height(&self) -> BlockHeight {
194        match self {
195            UserAction::Instantiate(context, _) => context.height,
196            UserAction::Operation(context, _) => context.height,
197            UserAction::ProcessStreams(context, _) => context.height,
198            UserAction::Message(context, _) => context.height,
199        }
200    }
201
202    pub(crate) fn round(&self) -> Option<u32> {
203        match self {
204            UserAction::Instantiate(context, _) => context.round,
205            UserAction::Operation(context, _) => context.round,
206            UserAction::ProcessStreams(context, _) => context.round,
207            UserAction::Message(context, _) => context.round,
208        }
209    }
210
211    pub(crate) fn timestamp(&self) -> Timestamp {
212        match self {
213            UserAction::Instantiate(context, _) => context.timestamp,
214            UserAction::Operation(context, _) => context.timestamp,
215            UserAction::ProcessStreams(context, _) => context.timestamp,
216            UserAction::Message(context, _) => context.timestamp,
217        }
218    }
219}
220
221impl<C> ExecutionStateView<C>
222where
223    C: Context + Clone + Send + Sync + 'static,
224    C::Extra: ExecutionRuntimeContext,
225{
226    pub async fn query_application(
227        &mut self,
228        context: QueryContext,
229        query: Query,
230        endpoint: Option<&mut ServiceRuntimeEndpoint>,
231    ) -> Result<QueryOutcome, ExecutionError> {
232        assert_eq!(context.chain_id, self.context().extra().chain_id());
233        match query {
234            Query::System(query) => {
235                let outcome = self.system.handle_query(context, query).await?;
236                Ok(outcome.into())
237            }
238            Query::User {
239                application_id,
240                bytes,
241            } => {
242                let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
243                let outcome = match endpoint {
244                    Some(endpoint) => {
245                        self.query_user_application_with_long_lived_service(
246                            application_id,
247                            context,
248                            bytes,
249                            &mut endpoint.incoming_execution_requests,
250                            &mut endpoint.runtime_request_sender,
251                        )
252                        .await?
253                    }
254                    None => {
255                        self.query_user_application(application_id, context, bytes)
256                            .await?
257                    }
258                };
259                Ok(outcome.into())
260            }
261        }
262    }
263
264    async fn query_user_application(
265        &mut self,
266        application_id: ApplicationId,
267        context: QueryContext,
268        query: Vec<u8>,
269    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
270        self.query_user_application_with_deadline(
271            application_id,
272            context,
273            query,
274            None,
275            BTreeMap::new(),
276        )
277        .await
278    }
279
280    pub(crate) async fn query_user_application_with_deadline(
281        &mut self,
282        application_id: ApplicationId,
283        context: QueryContext,
284        query: Vec<u8>,
285        deadline: Option<Instant>,
286        created_blobs: BTreeMap<BlobId, BlobContent>,
287    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
288        let (execution_state_sender, mut execution_state_receiver) =
289            futures::channel::mpsc::unbounded();
290        let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
291        let mut resource_controller = ResourceController::default();
292        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
293        let (code, description) = actor.load_service(application_id).await?;
294
295        let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
296            let mut runtime =
297                ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
298
299            async move {
300                let code = codes.next().await.expect("we send this immediately below");
301                runtime.preload_service(application_id, code, description)?;
302                runtime.run_query(application_id, query)
303            }
304        })
305        .await;
306
307        service_runtime_task.send(code)?;
308
309        while let Some(request) = execution_state_receiver.next().await {
310            actor.handle_request(request).await?;
311        }
312
313        service_runtime_task.join().await
314    }
315
316    async fn query_user_application_with_long_lived_service(
317        &mut self,
318        application_id: ApplicationId,
319        context: QueryContext,
320        query: Vec<u8>,
321        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
322            ExecutionRequest,
323        >,
324        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
325    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
326        let (outcome_sender, outcome_receiver) = oneshot::channel();
327        let mut outcome_receiver = outcome_receiver.fuse();
328
329        runtime_request_sender
330            .send(ServiceRuntimeRequest::Query {
331                application_id,
332                context,
333                query,
334                callback: outcome_sender,
335            })
336            .expect("Service runtime thread should only stop when `request_sender` is dropped");
337
338        let mut txn_tracker = TransactionTracker::default();
339        let mut resource_controller = ResourceController::default();
340        let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
341
342        loop {
343            futures::select! {
344                maybe_request = incoming_execution_requests.next() => {
345                    if let Some(request) = maybe_request {
346                        actor.handle_request(request).await?;
347                    }
348                }
349                outcome = &mut outcome_receiver => {
350                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
351                }
352            }
353        }
354    }
355
356    pub async fn list_applications(
357        &self,
358    ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
359        let mut applications = vec![];
360        for app_id in self.users.indices().await? {
361            let blob_id = app_id.description_blob_id();
362            let blob_content = self.system.read_blob_content(blob_id).await?;
363            let application_description = bcs::from_bytes(blob_content.bytes())?;
364            applications.push((app_id, application_description));
365        }
366        Ok(applications)
367    }
368}