1use std::{collections::BTreeMap, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8 crypto::CryptoHash,
9 data_types::{BlobContent, BlockHeight, StreamUpdate},
10 identifiers::{AccountOwner, BlobId, StreamId},
11 time::Instant,
12};
13use linera_views::{
14 context::Context,
15 key_value_store_view::KeyValueStoreView,
16 map_view::MapView,
17 reentrant_collection_view::HashedReentrantCollectionView,
18 views::{ClonableView, HashableView as _, ReplaceContext, View},
19 ViewError,
20};
21use linera_views_derive::HashableView;
22#[cfg(with_testing)]
23use {
24 crate::{
25 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
26 },
27 linera_base::data_types::Blob,
28 linera_views::context::MemoryContext,
29 std::sync::Arc,
30};
31
32use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
33use crate::{
34 execution_state_actor::ExecutionStateActor, resources::ResourceController,
35 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, BcsHashable,
36 Deserialize, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext,
37 OperationContext, ProcessStreamsContext, Query, QueryContext, QueryOutcome, Serialize,
38 ServiceSyncRuntime, Timestamp, TransactionTracker, FLAG_ZERO_HASH,
39};
40
41#[derive(Debug, ClonableView, HashableView)]
43pub struct ExecutionStateView<C> {
44 pub system: SystemExecutionStateView<C>,
46 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
48 pub stream_event_counts: MapView<C, StreamId, u32>,
50}
51
52impl<C> ExecutionStateView<C>
53where
54 C: Context + Clone + Send + Sync + 'static,
55 C::Extra: ExecutionRuntimeContext,
56{
57 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
58 if self
59 .system
60 .current_committee()
61 .is_some_and(|(_epoch, committee)| {
62 committee
63 .policy()
64 .http_request_allow_list
65 .contains(FLAG_ZERO_HASH)
66 })
67 {
68 Ok(CryptoHash::from([0; 32]))
69 } else {
70 #[derive(Serialize, Deserialize)]
71 struct ExecutionStateViewHash([u8; 32]);
72 impl BcsHashable<'_> for ExecutionStateViewHash {}
73 self.hash_mut()
74 .await
75 .map(|hash| CryptoHash::new(&ExecutionStateViewHash(hash.into())))
76 }
77 }
78}
79
80impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
81 type Target = ExecutionStateView<C2>;
82
83 async fn with_context(
84 &mut self,
85 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
86 ) -> Self::Target {
87 ExecutionStateView {
88 system: self.system.with_context(ctx.clone()).await,
89 users: self.users.with_context(ctx.clone()).await,
90 stream_event_counts: self.stream_event_counts.with_context(ctx.clone()).await,
91 }
92 }
93}
94
95pub struct ServiceRuntimeEndpoint {
97 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
99 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
101}
102
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Test helper that registers an application and runs its instantiation.
    ///
    /// In order, it:
    /// 1. marks the application-description, contract and service blobs as used in
    ///    the system state;
    /// 2. registers `contract` under the application's ID in the test runtime
    ///    context;
    /// 3. adds the three blobs to the test runtime context;
    /// 4. runs a [`UserAction::Instantiate`] with `instantiation_argument` through an
    ///    [`ExecutionStateActor`], using a no-fee policy and a fresh transaction
    ///    tracker that already records the description blob as created.
    ///
    /// The operation context uses `application_description.block_height`,
    /// `local_time` as timestamp, and no authenticated signer or round.
    ///
    /// # Panics
    ///
    /// Panics if the application's creator chain is not the chain of this view's
    /// runtime context.
    ///
    /// # Errors
    ///
    /// Returns the first [`ExecutionError`] raised by any of the steps above.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let chain_id = application_description.creator_chain_id;
        assert_eq!(chain_id, self.context().extra().chain_id);

        let application_id = ApplicationId::from(&application_description);
        let description_blob = Blob::new_application_description(&application_description);

        let context = OperationContext {
            chain_id,
            authenticated_signer: None,
            height: application_description.block_height,
            round: None,
            timestamp: local_time,
        };
        let refund_grant_to = context.refund_grant_to();
        let action = UserAction::Instantiate(context, instantiation_argument);

        let next_application_index = application_description.application_index + 1;
        let next_chain_index = 0;

        // Mark every blob the application depends on as used, description first.
        for blob_id in [
            description_blob.id(),
            contract_blob.id(),
            service_blob.id(),
        ] {
            self.system.used_blobs.insert(&blob_id)?;
        }

        self.context()
            .extra()
            .user_contracts()
            .pin()
            .insert(application_id, contract);

        self.context()
            .extra()
            .add_blobs([
                contract_blob,
                service_blob,
                Blob::new_application_description(&application_description),
            ])
            .await?;

        let mut resource_controller = ResourceController::new(
            Arc::new(ResourceControlPolicy::no_fees()),
            ResourceTracker::default(),
            None,
        );
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            0,
            next_application_index,
            next_chain_index,
            None,
            &[],
        );
        txn_tracker.add_created_blob(description_blob);

        // The future is boxed before being awaited, as in the original code.
        Box::pin(
            ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
                .run_user_action(application_id, action, refund_grant_to, None),
        )
        .await?;

        Ok(())
    }
}
175
176pub enum UserAction {
177 Instantiate(OperationContext, Vec<u8>),
178 Operation(OperationContext, Vec<u8>),
179 Message(MessageContext, Vec<u8>),
180 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
181}
182
183impl UserAction {
184 pub(crate) fn signer(&self) -> Option<AccountOwner> {
185 match self {
186 UserAction::Instantiate(context, _) => context.authenticated_signer,
187 UserAction::Operation(context, _) => context.authenticated_signer,
188 UserAction::ProcessStreams(_, _) => None,
189 UserAction::Message(context, _) => context.authenticated_signer,
190 }
191 }
192
193 pub(crate) fn height(&self) -> BlockHeight {
194 match self {
195 UserAction::Instantiate(context, _) => context.height,
196 UserAction::Operation(context, _) => context.height,
197 UserAction::ProcessStreams(context, _) => context.height,
198 UserAction::Message(context, _) => context.height,
199 }
200 }
201
202 pub(crate) fn round(&self) -> Option<u32> {
203 match self {
204 UserAction::Instantiate(context, _) => context.round,
205 UserAction::Operation(context, _) => context.round,
206 UserAction::ProcessStreams(context, _) => context.round,
207 UserAction::Message(context, _) => context.round,
208 }
209 }
210
211 pub(crate) fn timestamp(&self) -> Timestamp {
212 match self {
213 UserAction::Instantiate(context, _) => context.timestamp,
214 UserAction::Operation(context, _) => context.timestamp,
215 UserAction::ProcessStreams(context, _) => context.timestamp,
216 UserAction::Message(context, _) => context.timestamp,
217 }
218 }
219}
220
221impl<C> ExecutionStateView<C>
222where
223 C: Context + Clone + Send + Sync + 'static,
224 C::Extra: ExecutionRuntimeContext,
225{
226 pub async fn query_application(
227 &mut self,
228 context: QueryContext,
229 query: Query,
230 endpoint: Option<&mut ServiceRuntimeEndpoint>,
231 ) -> Result<QueryOutcome, ExecutionError> {
232 assert_eq!(context.chain_id, self.context().extra().chain_id());
233 match query {
234 Query::System(query) => {
235 let outcome = self.system.handle_query(context, query).await?;
236 Ok(outcome.into())
237 }
238 Query::User {
239 application_id,
240 bytes,
241 } => {
242 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
243 let outcome = match endpoint {
244 Some(endpoint) => {
245 self.query_user_application_with_long_lived_service(
246 application_id,
247 context,
248 bytes,
249 &mut endpoint.incoming_execution_requests,
250 &mut endpoint.runtime_request_sender,
251 )
252 .await?
253 }
254 None => {
255 self.query_user_application(application_id, context, bytes)
256 .await?
257 }
258 };
259 Ok(outcome.into())
260 }
261 }
262 }
263
264 async fn query_user_application(
265 &mut self,
266 application_id: ApplicationId,
267 context: QueryContext,
268 query: Vec<u8>,
269 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
270 self.query_user_application_with_deadline(
271 application_id,
272 context,
273 query,
274 None,
275 BTreeMap::new(),
276 )
277 .await
278 }
279
280 pub(crate) async fn query_user_application_with_deadline(
281 &mut self,
282 application_id: ApplicationId,
283 context: QueryContext,
284 query: Vec<u8>,
285 deadline: Option<Instant>,
286 created_blobs: BTreeMap<BlobId, BlobContent>,
287 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
288 let (execution_state_sender, mut execution_state_receiver) =
289 futures::channel::mpsc::unbounded();
290 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
291 let mut resource_controller = ResourceController::default();
292 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
293 let (code, description) = actor.load_service(application_id).await?;
294
295 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
296 let mut runtime =
297 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
298
299 async move {
300 let code = codes.next().await.expect("we send this immediately below");
301 runtime.preload_service(application_id, code, description)?;
302 runtime.run_query(application_id, query)
303 }
304 })
305 .await;
306
307 service_runtime_task.send(code)?;
308
309 while let Some(request) = execution_state_receiver.next().await {
310 actor.handle_request(request).await?;
311 }
312
313 service_runtime_task.join().await
314 }
315
316 async fn query_user_application_with_long_lived_service(
317 &mut self,
318 application_id: ApplicationId,
319 context: QueryContext,
320 query: Vec<u8>,
321 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
322 ExecutionRequest,
323 >,
324 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
325 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
326 let (outcome_sender, outcome_receiver) = oneshot::channel();
327 let mut outcome_receiver = outcome_receiver.fuse();
328
329 runtime_request_sender
330 .send(ServiceRuntimeRequest::Query {
331 application_id,
332 context,
333 query,
334 callback: outcome_sender,
335 })
336 .expect("Service runtime thread should only stop when `request_sender` is dropped");
337
338 let mut txn_tracker = TransactionTracker::default();
339 let mut resource_controller = ResourceController::default();
340 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
341
342 loop {
343 futures::select! {
344 maybe_request = incoming_execution_requests.next() => {
345 if let Some(request) = maybe_request {
346 actor.handle_request(request).await?;
347 }
348 }
349 outcome = &mut outcome_receiver => {
350 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
351 }
352 }
353 }
354 }
355
356 pub async fn list_applications(
357 &self,
358 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
359 let mut applications = vec![];
360 for app_id in self.users.indices().await? {
361 let blob_id = app_id.description_blob_id();
362 let blob_content = self.system.read_blob_content(blob_id).await?;
363 let application_description = bcs::from_bytes(blob_content.bytes())?;
364 applications.push((app_id, application_description));
365 }
366 Ok(applications)
367 }
368}