1use std::{mem, vec};
5
6use futures::{FutureExt, StreamExt};
7use linera_base::{
8 data_types::{Amount, BlockHeight},
9 identifiers::{Account, AccountOwner, Destination, StreamId},
10};
11use linera_views::{
12 context::Context,
13 key_value_store_view::KeyValueStoreView,
14 map_view::MapView,
15 reentrant_collection_view::HashedReentrantCollectionView,
16 views::{ClonableView, View},
17};
18use linera_views_derive::CryptoHashView;
19#[cfg(with_testing)]
20use {
21 crate::{
22 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
23 },
24 linera_base::data_types::Blob,
25 linera_views::context::MemoryContext,
26 std::sync::Arc,
27};
28
29use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
30use crate::{
31 resources::ResourceController, system::SystemExecutionStateView, ApplicationDescription,
32 ApplicationId, ContractSyncRuntime, ExecutionError, ExecutionRuntimeConfig,
33 ExecutionRuntimeContext, Message, MessageContext, MessageKind, Operation, OperationContext,
34 OutgoingMessage, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, SystemMessage,
35 TransactionTracker,
36};
37
/// View over the execution-related state stored in a context `C`: the system state, the
/// per-application key-value stores and the per-stream event counters.
#[derive(Debug, ClonableView, CryptoHashView)]
pub struct ExecutionStateView<C> {
    /// State handled by the system part of the execution (see `SystemExecutionStateView`).
    pub system: SystemExecutionStateView<C>,
    /// One key-value store per user application, indexed by `ApplicationId`.
    pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
    /// A `u32` counter per `StreamId`.
    // NOTE(review): presumably the number of events already published on each stream — confirm.
    pub stream_event_counts: MapView<C, StreamId, u32>,
}
48
/// The two channel ends that `ExecutionStateView::query_application` uses to run a query on
/// an already-running service runtime.
pub struct ServiceRuntimeEndpoint {
    /// Execution requests coming from the service runtime; each one is passed to
    /// `handle_request` while a query is in flight.
    pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
    /// Sender used to submit `ServiceRuntimeRequest`s (such as queries) to the service runtime.
    pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
}
56
#[cfg(with_testing)]
impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
where
    MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
{
    /// Test-only helper that registers an application and runs its `Instantiate` action.
    ///
    /// The contract and service blobs are marked as used, `contract` is registered under the
    /// application ID derived from `application_description`, and the two blobs plus the
    /// application-description blob are added to the test runtime context. The instantiation
    /// then runs with a fee-less resource policy and a fresh `TransactionTracker`.
    ///
    /// # Panics
    ///
    /// Panics if the creator chain of `application_description` is not the chain of this view.
    pub async fn simulate_instantiation(
        &mut self,
        contract: UserContractCode,
        local_time: linera_base::data_types::Timestamp,
        application_description: ApplicationDescription,
        instantiation_argument: Vec<u8>,
        contract_blob: Blob,
        service_blob: Blob,
    ) -> Result<(), ExecutionError> {
        let creator_chain = application_description.creator_chain_id;
        assert_eq!(creator_chain, self.context().extra().chain_id);

        let operation_context = OperationContext {
            chain_id: creator_chain,
            authenticated_signer: None,
            authenticated_caller_id: None,
            height: application_description.block_height,
            round: None,
            index: Some(0),
        };
        let instantiate = UserAction::Instantiate(operation_context, instantiation_argument);
        let following_application_index = application_description.application_index + 1;
        let application_id = ApplicationId::from(&application_description);

        // Mark the contract blob first, then the service blob, as used.
        for blob in [&contract_blob, &service_blob] {
            self.system.used_blobs.insert(&blob.id())?;
        }

        self.context()
            .extra()
            .user_contracts()
            .insert(application_id, contract);

        let description_blob = Blob::new_application_description(&application_description);
        self.context()
            .extra()
            .add_blobs([contract_blob, service_blob, description_blob])
            .await?;

        let mut resource_controller = ResourceController {
            policy: Arc::new(ResourceControlPolicy::no_fees()),
            tracker: ResourceTracker::default(),
            account: None,
        };
        // The second `0` is the next message index: no message has been sent yet.
        let mut txn_tracker =
            TransactionTracker::new(local_time, 0, 0, following_application_index, None);
        txn_tracker.add_created_blob(Blob::new_application_description(&application_description));

        self.run_user_action(
            application_id,
            instantiate,
            operation_context.refund_grant_to(),
            None,
            &mut txn_tracker,
            &mut resource_controller,
        )
        .await
    }
}
134
/// An action to run on a user application contract, paired with the context it runs in and
/// its raw byte payload.
pub enum UserAction {
    /// Instantiate the application; the bytes are the instantiation argument.
    Instantiate(OperationContext, Vec<u8>),
    /// Execute a user operation; the bytes are the operation payload.
    Operation(OperationContext, Vec<u8>),
    /// Execute an incoming user message; the bytes are the message payload.
    Message(MessageContext, Vec<u8>),
}
140
141impl UserAction {
142 pub(crate) fn signer(&self) -> Option<AccountOwner> {
143 use UserAction::*;
144 match self {
145 Instantiate(context, _) => context.authenticated_signer,
146 Operation(context, _) => context.authenticated_signer,
147 Message(context, _) => context.authenticated_signer,
148 }
149 }
150
151 pub(crate) fn height(&self) -> BlockHeight {
152 match self {
153 UserAction::Instantiate(context, _) => context.height,
154 UserAction::Operation(context, _) => context.height,
155 UserAction::Message(context, _) => context.height,
156 }
157 }
158
159 pub(crate) fn round(&self) -> Option<u32> {
160 match self {
161 UserAction::Instantiate(context, _) => context.round,
162 UserAction::Operation(context, _) => context.round,
163 UserAction::Message(context, _) => context.round,
164 }
165 }
166}
167
168impl<C> ExecutionStateView<C>
169where
170 C: Context + Clone + Send + Sync + 'static,
171 C::Extra: ExecutionRuntimeContext,
172{
173 async fn run_user_action(
174 &mut self,
175 application_id: ApplicationId,
176 action: UserAction,
177 refund_grant_to: Option<Account>,
178 grant: Option<&mut Amount>,
179 txn_tracker: &mut TransactionTracker,
180 resource_controller: &mut ResourceController<Option<AccountOwner>>,
181 ) -> Result<(), ExecutionError> {
182 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
183 self.run_user_action_with_runtime(
184 application_id,
185 action,
186 refund_grant_to,
187 grant,
188 txn_tracker,
189 resource_controller,
190 )
191 .await
192 }
193
    /// Runs `action` on the contract of `application_id` inside a blocking runtime task,
    /// serving the runtime's execution requests from this view until it finishes.
    ///
    /// * `refund_grant_to` is handed to the contract runtime as-is.
    /// * `grant`, when present, is used together with the system state to compute the
    ///   balance available before the run and to merge the balance left after it.
    /// * `txn_tracker` is moved into the runtime for the duration of the run and written
    ///   back afterwards, with the action's result appended.
    /// * `resource_controller` provides the policy and tracker for the run; its tracker is
    ///   replaced by the one returned by the runtime.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while reading the balance, loading the contract,
    /// sending the code to the task, handling a request, joining the task or merging the
    /// balance.
    // NOTE(review): on any early return after `mem::take` below, `*txn_tracker` is left at its
    // `Default` value — confirm that callers discard the tracker when this returns an error.
    async fn run_user_action_with_runtime(
        &mut self,
        application_id: ApplicationId,
        action: UserAction,
        refund_grant_to: Option<Account>,
        grant: Option<&mut Amount>,
        txn_tracker: &mut TransactionTracker,
        resource_controller: &mut ResourceController<Option<AccountOwner>>,
    ) -> Result<(), ExecutionError> {
        let chain_id = self.context().extra().chain_id();
        // Work on a copy of the grant amount here: `grant` itself is still needed for the
        // final `with_state_and_grant` call at the end of this function.
        let mut cloned_grant = grant.as_ref().map(|x| **x);
        let initial_balance = resource_controller
            .with_state_and_grant(&mut self.system, cloned_grant.as_mut())
            .await?
            .balance()?;
        // Controller handed to the runtime: same policy and tracker as the caller's, but
        // its account is the balance snapshot taken above.
        let controller = ResourceController {
            policy: resource_controller.policy.clone(),
            tracker: resource_controller.tracker,
            account: initial_balance,
        };
        // The runtime gets the sending half; the receiving half is drained below.
        let (execution_state_sender, mut execution_state_receiver) =
            futures::channel::mpsc::unbounded();
        let (code, description) = self.load_contract(application_id, txn_tracker).await?;
        // Move the tracker into the runtime, leaving a default value behind until it is
        // restored after the task has been joined.
        let txn_tracker_moved = mem::take(txn_tracker);
        let contract_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
            let runtime = ContractSyncRuntime::new(
                execution_state_sender,
                chain_id,
                refund_grant_to,
                controller,
                &action,
                txn_tracker_moved,
            );

            async move {
                let code = codes.next().await.expect("we send this immediately below");
                runtime.preload_contract(application_id, code, description)?;
                runtime.run_action(application_id, chain_id, action)
            }
        })
        .await;

        // The contract code is delivered to the task through its input stream (`codes`).
        contract_runtime_task.send(code)?;

        // Serve the runtime's requests until the request stream ends.
        while let Some(request) = execution_state_receiver.next().await {
            self.handle_request(request, resource_controller).await?;
        }

        let (result, controller, txn_tracker_moved) = contract_runtime_task.join().await?;

        // Give the tracker back to the caller and record the result of the action.
        *txn_tracker = txn_tracker_moved;
        txn_tracker.add_operation_result(result);

        // Reconcile the balance snapshot with the balance left in the runtime's controller.
        resource_controller
            .with_state_and_grant(&mut self.system, grant)
            .await?
            .merge_balance(initial_balance, controller.balance()?)?;
        resource_controller.tracker = controller.tracker;

        Ok(())
    }
255
256 pub async fn execute_operation(
257 &mut self,
258 context: OperationContext,
259 operation: Operation,
260 txn_tracker: &mut TransactionTracker,
261 resource_controller: &mut ResourceController<Option<AccountOwner>>,
262 ) -> Result<(), ExecutionError> {
263 assert_eq!(context.chain_id, self.context().extra().chain_id());
264 match operation {
265 Operation::System(op) => {
266 let new_application = self
267 .system
268 .execute_operation(context, *op, txn_tracker, resource_controller)
269 .await?;
270 if let Some((application_id, argument)) = new_application {
271 let user_action = UserAction::Instantiate(context, argument);
272 self.run_user_action(
273 application_id,
274 user_action,
275 context.refund_grant_to(),
276 None,
277 txn_tracker,
278 resource_controller,
279 )
280 .await?;
281 }
282 }
283 Operation::User {
284 application_id,
285 bytes,
286 } => {
287 self.run_user_action(
288 application_id,
289 UserAction::Operation(context, bytes),
290 context.refund_grant_to(),
291 None,
292 txn_tracker,
293 resource_controller,
294 )
295 .await?;
296 }
297 }
298 Ok(())
299 }
300
301 pub async fn execute_message(
302 &mut self,
303 context: MessageContext,
304 message: Message,
305 grant: Option<&mut Amount>,
306 txn_tracker: &mut TransactionTracker,
307 resource_controller: &mut ResourceController<Option<AccountOwner>>,
308 ) -> Result<(), ExecutionError> {
309 assert_eq!(context.chain_id, self.context().extra().chain_id());
310 match message {
311 Message::System(message) => {
312 let outcome = self.system.execute_message(context, message).await?;
313 txn_tracker.add_outgoing_messages(outcome)?;
314 }
315 Message::User {
316 application_id,
317 bytes,
318 } => {
319 self.run_user_action(
320 application_id,
321 UserAction::Message(context, bytes),
322 context.refund_grant_to,
323 grant,
324 txn_tracker,
325 resource_controller,
326 )
327 .await?;
328 }
329 }
330 Ok(())
331 }
332
333 pub async fn bounce_message(
334 &self,
335 context: MessageContext,
336 grant: Amount,
337 message: Message,
338 txn_tracker: &mut TransactionTracker,
339 ) -> Result<(), ExecutionError> {
340 assert_eq!(context.chain_id, self.context().extra().chain_id());
341 txn_tracker.add_outgoing_message(OutgoingMessage {
342 destination: Destination::Recipient(context.message_id.chain_id),
343 authenticated_signer: context.authenticated_signer,
344 refund_grant_to: context.refund_grant_to.filter(|_| !grant.is_zero()),
345 grant,
346 kind: MessageKind::Bouncing,
347 message,
348 })?;
349 Ok(())
350 }
351
352 pub async fn send_refund(
353 &self,
354 context: MessageContext,
355 amount: Amount,
356 txn_tracker: &mut TransactionTracker,
357 ) -> Result<(), ExecutionError> {
358 assert_eq!(context.chain_id, self.context().extra().chain_id());
359 if amount.is_zero() {
360 return Ok(());
361 }
362 let Some(account) = context.refund_grant_to else {
363 return Err(ExecutionError::InternalError(
364 "Messages with grants should have a non-empty `refund_grant_to`",
365 ));
366 };
367 let message = SystemMessage::Credit {
368 amount,
369 source: context.authenticated_signer.unwrap_or(AccountOwner::CHAIN),
370 target: account.owner,
371 };
372 txn_tracker.add_outgoing_message(
373 OutgoingMessage::new(account.chain_id, message).with_kind(MessageKind::Tracked),
374 )?;
375 Ok(())
376 }
377
378 pub async fn query_application(
379 &mut self,
380 context: QueryContext,
381 query: Query,
382 endpoint: Option<&mut ServiceRuntimeEndpoint>,
383 ) -> Result<QueryOutcome, ExecutionError> {
384 assert_eq!(context.chain_id, self.context().extra().chain_id());
385 match query {
386 Query::System(query) => {
387 let outcome = self.system.handle_query(context, query).await?;
388 Ok(outcome.into())
389 }
390 Query::User {
391 application_id,
392 bytes,
393 } => {
394 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
395 let outcome = match endpoint {
396 Some(endpoint) => {
397 self.query_user_application_with_long_lived_service(
398 application_id,
399 context,
400 bytes,
401 &mut endpoint.incoming_execution_requests,
402 &mut endpoint.runtime_request_sender,
403 )
404 .await?
405 }
406 None => {
407 self.query_user_application(application_id, context, bytes)
408 .await?
409 }
410 };
411 Ok(outcome.into())
412 }
413 }
414 }
415
416 async fn query_user_application(
417 &mut self,
418 application_id: ApplicationId,
419 context: QueryContext,
420 query: Vec<u8>,
421 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
422 let (execution_state_sender, mut execution_state_receiver) =
423 futures::channel::mpsc::unbounded();
424 let (code, description) = self.load_service(application_id, None).await?;
425
426 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
427 let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
428
429 async move {
430 let code = codes.next().await.expect("we send this immediately below");
431 runtime.preload_service(application_id, code, description)?;
432 runtime.run_query(application_id, query)
433 }
434 })
435 .await;
436
437 service_runtime_task.send(code)?;
438
439 while let Some(request) = execution_state_receiver.next().await {
440 self.handle_request(request, &mut ResourceController::default())
441 .await?;
442 }
443
444 service_runtime_task.join().await
445 }
446
    /// Runs `query` on the service of `application_id` using an already-running service
    /// runtime.
    ///
    /// A `ServiceRuntimeRequest::Query` carrying a one-shot callback is sent through
    /// `runtime_request_sender`. Until the callback delivers the outcome, execution requests
    /// arriving on `incoming_execution_requests` are handled by this view, each with a
    /// freshly created default `ResourceController`.
    ///
    /// # Errors
    ///
    /// Returns the error of a failed `handle_request`, the error reported by the runtime in
    /// its response, or `ExecutionError::MissingRuntimeResponse` if receiving from the
    /// callback channel fails.
    ///
    /// # Panics
    ///
    /// Panics if the query cannot be sent through `runtime_request_sender`.
    async fn query_user_application_with_long_lived_service(
        &mut self,
        application_id: ApplicationId,
        context: QueryContext,
        query: Vec<u8>,
        incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
            ExecutionRequest,
        >,
        runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
    ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
        let (outcome_sender, outcome_receiver) = oneshot::channel();
        // Fused so that the same receiver can be polled by `select!` on every loop iteration.
        let mut outcome_receiver = outcome_receiver.fuse();

        runtime_request_sender
            .send(ServiceRuntimeRequest::Query {
                application_id,
                context,
                query,
                callback: outcome_sender,
            })
            .expect("Service runtime thread should only stop when `request_sender` is dropped");

        // Alternate between serving the runtime's requests and waiting for its answer.
        loop {
            futures::select! {
                maybe_request = incoming_execution_requests.next() => {
                    // A `None` here carries no request; just keep waiting for the outcome.
                    if let Some(request) = maybe_request {
                        self.handle_request(request, &mut ResourceController::default()).await?;
                    }
                }
                outcome = &mut outcome_receiver => {
                    // The outer `Result` is the channel's; the inner one is the runtime's answer.
                    return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
                }
            }
        }
    }
482
483 pub async fn list_applications(
484 &self,
485 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
486 let mut applications = vec![];
487 for app_id in self.users.indices().await? {
488 let blob_id = app_id.description_blob_id();
489 let blob_content = self.system.read_blob_content(blob_id).await?;
490 let application_description = bcs::from_bytes(blob_content.bytes())?;
491 applications.push((app_id, application_description));
492 }
493 Ok(applications)
494 }
495}