1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap},
6 mem, vec,
7};
8
9use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
10use linera_base::{
11 data_types::{Amount, BlockHeight, Timestamp},
12 identifiers::{Account, ChainId, Destination, Owner},
13};
14use linera_views::{
15 context::Context,
16 key_value_store_view::KeyValueStoreView,
17 reentrant_collection_view::HashedReentrantCollectionView,
18 views::{ClonableView, View},
19};
20use linera_views_derive::CryptoHashView;
21#[cfg(with_testing)]
22use {
23 crate::{
24 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
25 },
26 linera_base::data_types::Blob,
27 linera_views::context::MemoryContext,
28 std::sync::Arc,
29};
30
31use super::{runtime::ServiceRuntimeRequest, ExecutionRequest};
32use crate::{
33 resources::ResourceController, system::SystemExecutionStateView, ContractSyncRuntime,
34 ExecutionError, ExecutionOutcome, ExecutionRuntimeConfig, ExecutionRuntimeContext, Message,
35 MessageContext, MessageKind, Operation, OperationContext, Query, QueryContext,
36 RawExecutionOutcome, RawOutgoingMessage, Response, ServiceSyncRuntime, SystemMessage,
37 TransactionTracker, UserApplicationDescription, UserApplicationId,
38};
39
40#[derive(Debug, ClonableView, CryptoHashView)]
42pub struct ExecutionStateView<C> {
43 pub system: SystemExecutionStateView<C>,
45 pub users: HashedReentrantCollectionView<C, UserApplicationId, KeyValueStoreView<C>>,
47}
48
49pub struct ServiceRuntimeEndpoint {
51 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
53 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
55}
56
57#[cfg(with_testing)]
58impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
59where
60 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
61{
62 pub async fn simulate_instantiation(
64 &mut self,
65 contract: UserContractCode,
66 local_time: Timestamp,
67 application_description: UserApplicationDescription,
68 instantiation_argument: Vec<u8>,
69 contract_blob: Blob,
70 service_blob: Blob,
71 ) -> Result<(), ExecutionError> {
72 let chain_id = application_description.creation.chain_id;
73 let context = OperationContext {
74 chain_id,
75 authenticated_signer: None,
76 authenticated_caller_id: None,
77 height: application_description.creation.height,
78 index: Some(0),
79 };
80
81 let action = UserAction::Instantiate(context, instantiation_argument);
82 let next_message_index = application_description.creation.index + 1;
83
84 let application_id = self
85 .system
86 .registry
87 .register_application(application_description)
88 .await?;
89
90 self.context()
91 .extra()
92 .user_contracts()
93 .insert(application_id, contract);
94
95 self.context()
96 .extra()
97 .add_blobs(vec![contract_blob, service_blob]);
98
99 let tracker = ResourceTracker::default();
100 let policy = ResourceControlPolicy::default();
101 let mut resource_controller = ResourceController {
102 policy: Arc::new(policy),
103 tracker,
104 account: None,
105 };
106 let mut txn_tracker = TransactionTracker::new(next_message_index, None);
107 self.run_user_action(
108 application_id,
109 chain_id,
110 local_time,
111 action,
112 context.refund_grant_to(),
113 None,
114 &mut txn_tracker,
115 &mut resource_controller,
116 )
117 .await?;
118 self.update_execution_outcomes_with_app_registrations(&mut txn_tracker)
119 .await?;
120 Ok(())
121 }
122}
123
124pub enum UserAction {
125 Instantiate(OperationContext, Vec<u8>),
126 Operation(OperationContext, Vec<u8>),
127 Message(MessageContext, Vec<u8>),
128}
129
130impl UserAction {
131 pub(crate) fn signer(&self) -> Option<Owner> {
132 use UserAction::*;
133 match self {
134 Instantiate(context, _) => context.authenticated_signer,
135 Operation(context, _) => context.authenticated_signer,
136 Message(context, _) => context.authenticated_signer,
137 }
138 }
139
140 pub(crate) fn height(&self) -> BlockHeight {
141 match self {
142 UserAction::Instantiate(context, _) => context.height,
143 UserAction::Operation(context, _) => context.height,
144 UserAction::Message(context, _) => context.height,
145 }
146 }
147}
148
149impl<C> ExecutionStateView<C>
150where
151 C: Context + Clone + Send + Sync + 'static,
152 C::Extra: ExecutionRuntimeContext,
153{
154 #[expect(clippy::too_many_arguments)]
155 async fn run_user_action(
156 &mut self,
157 application_id: UserApplicationId,
158 chain_id: ChainId,
159 local_time: Timestamp,
160 action: UserAction,
161 refund_grant_to: Option<Account>,
162 grant: Option<&mut Amount>,
163 txn_tracker: &mut TransactionTracker,
164 resource_controller: &mut ResourceController<Option<Owner>>,
165 ) -> Result<(), ExecutionError> {
166 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
167 self.run_user_action_with_runtime(
168 application_id,
169 chain_id,
170 local_time,
171 action,
172 refund_grant_to,
173 grant,
174 txn_tracker,
175 resource_controller,
176 )
177 .await?;
178 Ok(())
179 }
180
181 #[expect(clippy::too_many_arguments)]
182 async fn run_user_action_with_runtime(
183 &mut self,
184 application_id: UserApplicationId,
185 chain_id: ChainId,
186 local_time: Timestamp,
187 action: UserAction,
188 refund_grant_to: Option<Account>,
189 grant: Option<&mut Amount>,
190 txn_tracker: &mut TransactionTracker,
191 resource_controller: &mut ResourceController<Option<Owner>>,
192 ) -> Result<(), ExecutionError> {
193 let mut cloned_grant = grant.as_ref().map(|x| **x);
194 let initial_balance = resource_controller
195 .with_state_and_grant(self, cloned_grant.as_mut())
196 .await?
197 .balance()?;
198 let controller = ResourceController {
199 policy: resource_controller.policy.clone(),
200 tracker: resource_controller.tracker,
201 account: initial_balance,
202 };
203 let (execution_state_sender, mut execution_state_receiver) =
204 futures::channel::mpsc::unbounded();
205 let txn_tracker_moved = mem::take(txn_tracker);
206 let execution_outcomes_future = linera_base::task::spawn_blocking(move || {
207 ContractSyncRuntime::run_action(
208 execution_state_sender,
209 application_id,
210 chain_id,
211 local_time,
212 refund_grant_to,
213 controller,
214 action,
215 txn_tracker_moved,
216 )
217 });
218 while let Some(request) = execution_state_receiver.next().await {
219 self.handle_request(request).await?;
220 }
221
222 let (controller, txn_tracker_moved) = execution_outcomes_future.await??;
223 *txn_tracker = txn_tracker_moved;
224 resource_controller
225 .with_state_and_grant(self, grant)
226 .await?
227 .merge_balance(initial_balance, controller.balance()?)?;
228 resource_controller.tracker = controller.tracker;
229 Ok(())
230 }
231
232 pub async fn update_execution_outcomes_with_app_registrations(
237 &self,
238 txn_tracker: &mut TransactionTracker,
239 ) -> Result<(), ExecutionError> {
240 let results = txn_tracker.outcomes_mut();
241 let user_application_outcomes = results.iter().filter_map(|outcome| match outcome {
242 ExecutionOutcome::User(application_id, result) => Some((application_id, result)),
243 _ => None,
244 });
245
246 let mut applications_to_register_per_destination = BTreeMap::<_, BTreeSet<_>>::new();
247
248 for (application_id, result) in user_application_outcomes {
249 for message in &result.messages {
250 applications_to_register_per_destination
251 .entry(&message.destination)
252 .or_default()
253 .insert(*application_id);
254 }
255 }
256
257 if applications_to_register_per_destination.is_empty() {
258 return Ok(());
259 }
260
261 let messages = applications_to_register_per_destination
262 .into_iter()
263 .map(|(destination, applications_to_describe)| async {
264 let applications = self
265 .system
266 .registry
267 .describe_applications_with_dependencies(
268 applications_to_describe.into_iter().collect(),
269 &HashMap::new(),
270 )
271 .await?;
272
273 Ok::<_, ExecutionError>(RawOutgoingMessage {
274 destination: destination.clone(),
275 authenticated: false,
276 grant: Amount::ZERO,
277 kind: MessageKind::Simple,
278 message: SystemMessage::RegisterApplications { applications },
279 })
280 })
281 .collect::<FuturesOrdered<_>>()
282 .try_collect::<Vec<_>>()
283 .await?;
284
285 let system_outcome = RawExecutionOutcome {
286 messages,
287 ..RawExecutionOutcome::default()
288 };
289
290 let index = results
292 .iter()
293 .position(|outcome| matches!(outcome, ExecutionOutcome::User(_, _)))
294 .unwrap_or(results.len());
295 results.insert(index, ExecutionOutcome::System(system_outcome));
297
298 Ok(())
299 }
300
301 pub async fn execute_operation(
302 &mut self,
303 context: OperationContext,
304 local_time: Timestamp,
305 operation: Operation,
306 txn_tracker: &mut TransactionTracker,
307 resource_controller: &mut ResourceController<Option<Owner>>,
308 ) -> Result<(), ExecutionError> {
309 assert_eq!(context.chain_id, self.context().extra().chain_id());
310 match operation {
311 Operation::System(op) => {
312 let new_application = self
313 .system
314 .execute_operation(context, op, txn_tracker)
315 .await?;
316 if let Some((application_id, argument)) = new_application {
317 let user_action = UserAction::Instantiate(context, argument);
318 self.run_user_action(
319 application_id,
320 context.chain_id,
321 local_time,
322 user_action,
323 context.refund_grant_to(),
324 None,
325 txn_tracker,
326 resource_controller,
327 )
328 .await?;
329 }
330 }
331 Operation::User {
332 application_id,
333 bytes,
334 } => {
335 self.run_user_action(
336 application_id,
337 context.chain_id,
338 local_time,
339 UserAction::Operation(context, bytes),
340 context.refund_grant_to(),
341 None,
342 txn_tracker,
343 resource_controller,
344 )
345 .await?;
346 }
347 }
348 Ok(())
349 }
350
351 pub async fn execute_message(
352 &mut self,
353 context: MessageContext,
354 local_time: Timestamp,
355 message: Message,
356 grant: Option<&mut Amount>,
357 txn_tracker: &mut TransactionTracker,
358 resource_controller: &mut ResourceController<Option<Owner>>,
359 ) -> Result<(), ExecutionError> {
360 assert_eq!(context.chain_id, self.context().extra().chain_id());
361 match message {
362 Message::System(message) => {
363 let outcome = self
364 .system
365 .execute_message(context, message, txn_tracker)
366 .await?;
367 txn_tracker.add_system_outcome(outcome)?;
368 }
369 Message::User {
370 application_id,
371 bytes,
372 } => {
373 self.run_user_action(
374 application_id,
375 context.chain_id,
376 local_time,
377 UserAction::Message(context, bytes),
378 context.refund_grant_to,
379 grant,
380 txn_tracker,
381 resource_controller,
382 )
383 .await?;
384 }
385 }
386 Ok(())
387 }
388
389 pub async fn bounce_message(
390 &self,
391 context: MessageContext,
392 grant: Amount,
393 message: Message,
394 txn_tracker: &mut TransactionTracker,
395 ) -> Result<(), ExecutionError> {
396 assert_eq!(context.chain_id, self.context().extra().chain_id());
397 match message {
398 Message::System(message) => {
399 let mut outcome = RawExecutionOutcome {
400 authenticated_signer: context.authenticated_signer,
401 refund_grant_to: context.refund_grant_to,
402 ..Default::default()
403 };
404 outcome.messages.push(RawOutgoingMessage {
405 destination: Destination::Recipient(context.message_id.chain_id),
406 authenticated: true,
407 grant,
408 kind: MessageKind::Bouncing,
409 message,
410 });
411 txn_tracker.add_system_outcome(outcome)?;
412 }
413 Message::User {
414 application_id,
415 bytes,
416 } => {
417 let mut outcome = RawExecutionOutcome {
418 authenticated_signer: context.authenticated_signer,
419 refund_grant_to: context.refund_grant_to,
420 ..Default::default()
421 };
422 outcome.messages.push(RawOutgoingMessage {
423 destination: Destination::Recipient(context.message_id.chain_id),
424 authenticated: true,
425 grant,
426 kind: MessageKind::Bouncing,
427 message: bytes,
428 });
429 txn_tracker.add_user_outcome(application_id, outcome)?;
430 }
431 }
432 Ok(())
433 }
434
435 pub async fn send_refund(
436 &self,
437 context: MessageContext,
438 amount: Amount,
439 account: Account,
440 txn_tracker: &mut TransactionTracker,
441 ) -> Result<(), ExecutionError> {
442 assert_eq!(context.chain_id, self.context().extra().chain_id());
443 let mut outcome = RawExecutionOutcome::default();
444 let message = RawOutgoingMessage {
445 destination: Destination::Recipient(account.chain_id),
446 authenticated: false,
447 grant: Amount::ZERO,
448 kind: MessageKind::Tracked,
449 message: SystemMessage::Credit {
450 amount,
451 source: context.authenticated_signer,
452 target: account.owner,
453 },
454 };
455 outcome.messages.push(message);
456 txn_tracker.add_system_outcome(outcome)?;
457 Ok(())
458 }
459
460 pub async fn query_application(
461 &mut self,
462 context: QueryContext,
463 query: Query,
464 endpoint: Option<&mut ServiceRuntimeEndpoint>,
465 ) -> Result<Response, ExecutionError> {
466 assert_eq!(context.chain_id, self.context().extra().chain_id());
467 match query {
468 Query::System(query) => {
469 let response = self.system.handle_query(context, query).await?;
470 Ok(Response::System(response))
471 }
472 Query::User {
473 application_id,
474 bytes,
475 } => {
476 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
477 let response = match endpoint {
478 Some(endpoint) => {
479 self.query_user_application_with_long_lived_service(
480 application_id,
481 context,
482 bytes,
483 &mut endpoint.incoming_execution_requests,
484 &mut endpoint.runtime_request_sender,
485 )
486 .await?
487 }
488 None => {
489 self.query_user_application(application_id, context, bytes)
490 .await?
491 }
492 };
493 Ok(Response::User(response))
494 }
495 }
496 }
497
498 async fn query_user_application(
499 &mut self,
500 application_id: UserApplicationId,
501 context: QueryContext,
502 query: Vec<u8>,
503 ) -> Result<Vec<u8>, ExecutionError> {
504 let (execution_state_sender, mut execution_state_receiver) =
505 futures::channel::mpsc::unbounded();
506 let execution_outcomes_future = linera_base::task::spawn_blocking(move || {
507 let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
508 runtime.run_query(application_id, query)
509 });
510 while let Some(request) = execution_state_receiver.next().await {
511 self.handle_request(request).await?;
512 }
513
514 let response = execution_outcomes_future.await??;
515 Ok(response)
516 }
517
518 async fn query_user_application_with_long_lived_service(
519 &mut self,
520 application_id: UserApplicationId,
521 context: QueryContext,
522 query: Vec<u8>,
523 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
524 ExecutionRequest,
525 >,
526 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
527 ) -> Result<Vec<u8>, ExecutionError> {
528 let (response_sender, response_receiver) = oneshot::channel();
529 let mut response_receiver = response_receiver.fuse();
530
531 runtime_request_sender
532 .send(ServiceRuntimeRequest::Query {
533 application_id,
534 context,
535 query,
536 callback: response_sender,
537 })
538 .expect("Service runtime thread should only stop when `request_sender` is dropped");
539
540 loop {
541 futures::select! {
542 maybe_request = incoming_execution_requests.next() => {
543 if let Some(request) = maybe_request {
544 self.handle_request(request).await?;
545 }
546 }
547 response = &mut response_receiver => {
548 return response.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
549 }
550 }
551 }
552 }
553
554 pub async fn list_applications(
555 &self,
556 ) -> Result<Vec<(UserApplicationId, UserApplicationDescription)>, ExecutionError> {
557 let mut applications = vec![];
558 for index in self.system.registry.known_applications.indices().await? {
559 let application_description =
560 self.system.registry.known_applications.get(&index).await?;
561
562 if let Some(application_description) = application_description {
563 applications.push((index, application_description));
564 }
565 }
566 Ok(applications)
567 }
568}