1use std::fmt::{self, Debug, Formatter};
7#[cfg(with_metrics)]
8use std::sync::LazyLock;
9
10use futures::channel::mpsc;
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::{self, MeasureLatency as _};
13use linera_base::{
14 data_types::{Amount, ApplicationPermissions, BlobContent, Timestamp},
15 identifiers::{Account, BlobId, MessageId, Owner},
16 ownership::ChainOwnership,
17};
18use linera_views::{batch::Batch, context::Context, views::View};
19use oneshot::Sender;
20#[cfg(with_metrics)]
21use prometheus::HistogramVec;
22use reqwest::{header::CONTENT_TYPE, Client};
23
24use crate::{
25 system::{OpenChainConfig, Recipient},
26 util::RespondExt,
27 ExecutionError, ExecutionRuntimeContext, ExecutionStateView, RawExecutionOutcome,
28 RawOutgoingMessage, SystemExecutionError, SystemMessage, UserApplicationDescription,
29 UserApplicationId, UserContractCode, UserServiceCode,
30};
31
#[cfg(with_metrics)]
/// Latency histogram sampled while a `LoadContract` request is being handled.
///
/// The histogram is registered under the name `load_contract_latency`, without labels, the
/// first time this static is dereferenced.
static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "load_contract_latency";
    let description = "Load contract latency";
    // Upper bounds of the histogram buckets.
    let buckets = vec![
        0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0,
        100.0, 250.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
46
#[cfg(with_metrics)]
/// Latency histogram sampled while a `LoadService` request is being handled.
///
/// The histogram is registered under the name `load_service_latency`, without labels, the
/// first time this static is dereferenced.
static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let name = "load_service_latency";
    let description = "Load service latency";
    // Upper bounds of the histogram buckets.
    let buckets = vec![
        0.001, 0.002_5, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0,
        100.0, 250.0,
    ];
    prometheus_util::register_histogram_vec(name, description, &[], Some(buckets))
        .expect("Histogram creation should not fail")
});
61
/// Sending half of an unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
63
64impl<C> ExecutionStateView<C>
65where
66 C: Context + Clone + Send + Sync + 'static,
67 C::Extra: ExecutionRuntimeContext,
68{
69 pub(crate) async fn handle_request(
71 &mut self,
72 request: ExecutionRequest,
73 ) -> Result<(), ExecutionError> {
74 use ExecutionRequest::*;
75 match request {
76 LoadContract { id, callback } => {
77 #[cfg(with_metrics)]
78 let _latency = LOAD_CONTRACT_LATENCY.measure_latency();
79 let description = self.system.registry.describe_application(id).await?;
80 let code = self
81 .context()
82 .extra()
83 .get_user_contract(&description)
84 .await?;
85 callback.respond((code, description));
86 }
87
88 LoadService { id, callback } => {
89 #[cfg(with_metrics)]
90 let _latency = LOAD_SERVICE_LATENCY.measure_latency();
91 let description = self.system.registry.describe_application(id).await?;
92 let code = self
93 .context()
94 .extra()
95 .get_user_service(&description)
96 .await?;
97 callback.respond((code, description));
98 }
99
100 ChainBalance { callback } => {
101 let balance = *self.system.balance.get();
102 callback.respond(balance);
103 }
104
105 OwnerBalance { owner, callback } => {
106 let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
107 callback.respond(balance);
108 }
109
110 OwnerBalances { callback } => {
111 let mut balances = Vec::new();
112 self.system
113 .balances
114 .for_each_index_value(|owner, balance| {
115 balances.push((owner, balance));
116 Ok(())
117 })
118 .await?;
119 callback.respond(balances);
120 }
121
122 BalanceOwners { callback } => {
123 let owners = self.system.balances.indices().await?;
124 callback.respond(owners);
125 }
126
127 Transfer {
128 source,
129 destination,
130 amount,
131 signer,
132 callback,
133 } => {
134 let mut execution_outcome = RawExecutionOutcome::default();
135 let message = self
136 .system
137 .transfer(signer, source, Recipient::Account(destination), amount)
138 .await?;
139
140 if let Some(message) = message {
141 execution_outcome.messages.push(message);
142 }
143 callback.respond(execution_outcome);
144 }
145
146 Claim {
147 source,
148 destination,
149 amount,
150 signer,
151 callback,
152 } => {
153 let owner = source.owner.ok_or(ExecutionError::OwnerIsNone)?;
154 let mut execution_outcome = RawExecutionOutcome::default();
155 let message = self
156 .system
157 .claim(
158 signer,
159 owner,
160 source.chain_id,
161 Recipient::Account(destination),
162 amount,
163 )
164 .await?;
165
166 execution_outcome.messages.push(message);
167 callback.respond(execution_outcome);
168 }
169
170 SystemTimestamp { callback } => {
171 let timestamp = *self.system.timestamp.get();
172 callback.respond(timestamp);
173 }
174
175 ChainOwnership { callback } => {
176 let ownership = self.system.ownership.get().clone();
177 callback.respond(ownership);
178 }
179
180 ContainsKey { id, key, callback } => {
181 let view = self.users.try_load_entry(&id).await?;
182 let result = match view {
183 Some(view) => view.contains_key(&key).await?,
184 None => false,
185 };
186 callback.respond(result);
187 }
188
189 ContainsKeys { id, keys, callback } => {
190 let view = self.users.try_load_entry(&id).await?;
191 let result = match view {
192 Some(view) => view.contains_keys(keys).await?,
193 None => vec![false; keys.len()],
194 };
195 callback.respond(result);
196 }
197
198 ReadMultiValuesBytes { id, keys, callback } => {
199 let view = self.users.try_load_entry(&id).await?;
200 let values = match view {
201 Some(view) => view.multi_get(keys).await?,
202 None => vec![None; keys.len()],
203 };
204 callback.respond(values);
205 }
206
207 ReadValueBytes { id, key, callback } => {
208 let view = self.users.try_load_entry(&id).await?;
209 let result = match view {
210 Some(view) => view.get(&key).await?,
211 None => None,
212 };
213 callback.respond(result);
214 }
215
216 FindKeysByPrefix {
217 id,
218 key_prefix,
219 callback,
220 } => {
221 let view = self.users.try_load_entry(&id).await?;
222 let result = match view {
223 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
224 None => Vec::new(),
225 };
226 callback.respond(result);
227 }
228
229 FindKeyValuesByPrefix {
230 id,
231 key_prefix,
232 callback,
233 } => {
234 let view = self.users.try_load_entry(&id).await?;
235 let result = match view {
236 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
237 None => Vec::new(),
238 };
239 callback.respond(result);
240 }
241
242 WriteBatch {
243 id,
244 batch,
245 callback,
246 } => {
247 let mut view = self.users.try_load_entry_mut(&id).await?;
248 view.write_batch(batch).await?;
249 callback.respond(());
250 }
251
252 OpenChain {
253 ownership,
254 balance,
255 next_message_id,
256 application_permissions,
257 callback,
258 } => {
259 let inactive_err = || SystemExecutionError::InactiveChain;
260 let config = OpenChainConfig {
261 ownership,
262 admin_id: self.system.admin_id.get().ok_or_else(inactive_err)?,
263 epoch: self.system.epoch.get().ok_or_else(inactive_err)?,
264 committees: self.system.committees.get().clone(),
265 balance,
266 application_permissions,
267 };
268 let messages = self.system.open_chain(config, next_message_id)?;
269 callback.respond(messages)
270 }
271
272 CloseChain {
273 application_id,
274 callback,
275 } => {
276 let app_permissions = self.system.application_permissions.get();
277 if !app_permissions.can_close_chain(&application_id) {
278 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
279 } else {
280 let chain_id = self.context().extra().chain_id();
281 self.system.close_chain(chain_id).await?;
282 callback.respond(Ok(()));
283 }
284 }
285
286 FetchUrl { url, callback } => {
287 let bytes = reqwest::get(url).await?.bytes().await?.to_vec();
288 callback.respond(bytes);
289 }
290
291 HttpPost {
292 url,
293 content_type,
294 payload,
295 callback,
296 } => {
297 let res = Client::new()
298 .post(url)
299 .body(payload)
300 .header(CONTENT_TYPE, content_type)
301 .send()
302 .await?;
303 let body = res.bytes().await?;
304 let bytes = body.as_ref().to_vec();
305 callback.respond(bytes);
306 }
307
308 ReadBlobContent { blob_id, callback } => {
309 let blob = self
310 .system
311 .read_blob_content(blob_id)
312 .await
313 .map_err(|_| SystemExecutionError::BlobNotFoundOnRead(blob_id))?;
314 callback.respond(blob);
315 }
316
317 AssertBlobExists { blob_id, callback } => {
318 self.system.assert_blob_exists(blob_id).await?;
319 callback.respond(())
320 }
321 }
322
323 Ok(())
324 }
325}
326
327pub enum ExecutionRequest {
329 LoadContract {
330 id: UserApplicationId,
331 callback: Sender<(UserContractCode, UserApplicationDescription)>,
332 },
333
334 LoadService {
335 id: UserApplicationId,
336 callback: Sender<(UserServiceCode, UserApplicationDescription)>,
337 },
338
339 ChainBalance {
340 callback: Sender<Amount>,
341 },
342
343 OwnerBalance {
344 owner: Owner,
345 callback: Sender<Amount>,
346 },
347
348 OwnerBalances {
349 callback: Sender<Vec<(Owner, Amount)>>,
350 },
351
352 BalanceOwners {
353 callback: Sender<Vec<Owner>>,
354 },
355
356 Transfer {
357 source: Option<Owner>,
358 destination: Account,
359 amount: Amount,
360 signer: Option<Owner>,
361 callback: Sender<RawExecutionOutcome<SystemMessage, Amount>>,
362 },
363
364 Claim {
365 source: Account,
366 destination: Account,
367 amount: Amount,
368 signer: Option<Owner>,
369 callback: Sender<RawExecutionOutcome<SystemMessage, Amount>>,
370 },
371
372 SystemTimestamp {
373 callback: Sender<Timestamp>,
374 },
375
376 ChainOwnership {
377 callback: Sender<ChainOwnership>,
378 },
379
380 ReadValueBytes {
381 id: UserApplicationId,
382 key: Vec<u8>,
383 callback: Sender<Option<Vec<u8>>>,
384 },
385
386 ContainsKey {
387 id: UserApplicationId,
388 key: Vec<u8>,
389 callback: Sender<bool>,
390 },
391
392 ContainsKeys {
393 id: UserApplicationId,
394 keys: Vec<Vec<u8>>,
395 callback: Sender<Vec<bool>>,
396 },
397
398 ReadMultiValuesBytes {
399 id: UserApplicationId,
400 keys: Vec<Vec<u8>>,
401 callback: Sender<Vec<Option<Vec<u8>>>>,
402 },
403
404 FindKeysByPrefix {
405 id: UserApplicationId,
406 key_prefix: Vec<u8>,
407 callback: Sender<Vec<Vec<u8>>>,
408 },
409
410 FindKeyValuesByPrefix {
411 id: UserApplicationId,
412 key_prefix: Vec<u8>,
413 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
414 },
415
416 WriteBatch {
417 id: UserApplicationId,
418 batch: Batch,
419 callback: Sender<()>,
420 },
421
422 OpenChain {
423 ownership: ChainOwnership,
424 balance: Amount,
425 next_message_id: MessageId,
426 application_permissions: ApplicationPermissions,
427 callback: Sender<[RawOutgoingMessage<SystemMessage, Amount>; 2]>,
428 },
429
430 CloseChain {
431 application_id: UserApplicationId,
432 callback: oneshot::Sender<Result<(), ExecutionError>>,
433 },
434
435 FetchUrl {
436 url: String,
437 callback: Sender<Vec<u8>>,
438 },
439
440 HttpPost {
441 url: String,
442 content_type: String,
443 payload: Vec<u8>,
444 callback: oneshot::Sender<Vec<u8>>,
445 },
446
447 ReadBlobContent {
448 blob_id: BlobId,
449 callback: Sender<BlobContent>,
450 },
451
452 AssertBlobExists {
453 blob_id: BlobId,
454 callback: Sender<()>,
455 },
456}
457
458impl Debug for ExecutionRequest {
459 fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
460 match self {
461 ExecutionRequest::LoadContract { id, .. } => formatter
462 .debug_struct("ExecutionRequest::LoadContract")
463 .field("id", id)
464 .finish_non_exhaustive(),
465
466 ExecutionRequest::LoadService { id, .. } => formatter
467 .debug_struct("ExecutionRequest::LoadService")
468 .field("id", id)
469 .finish_non_exhaustive(),
470
471 ExecutionRequest::ChainBalance { .. } => formatter
472 .debug_struct("ExecutionRequest::ChainBalance")
473 .finish_non_exhaustive(),
474
475 ExecutionRequest::OwnerBalance { owner, .. } => formatter
476 .debug_struct("ExecutionRequest::OwnerBalance")
477 .field("owner", owner)
478 .finish_non_exhaustive(),
479
480 ExecutionRequest::OwnerBalances { .. } => formatter
481 .debug_struct("ExecutionRequest::OwnerBalances")
482 .finish_non_exhaustive(),
483
484 ExecutionRequest::BalanceOwners { .. } => formatter
485 .debug_struct("ExecutionRequest::BalanceOwners")
486 .finish_non_exhaustive(),
487
488 ExecutionRequest::Transfer {
489 source,
490 destination,
491 amount,
492 signer,
493 ..
494 } => formatter
495 .debug_struct("ExecutionRequest::Transfer")
496 .field("source", source)
497 .field("destination", destination)
498 .field("amount", amount)
499 .field("signer", signer)
500 .finish_non_exhaustive(),
501
502 ExecutionRequest::Claim {
503 source,
504 destination,
505 amount,
506 signer,
507 ..
508 } => formatter
509 .debug_struct("ExecutionRequest::Claim")
510 .field("source", source)
511 .field("destination", destination)
512 .field("amount", amount)
513 .field("signer", signer)
514 .finish_non_exhaustive(),
515
516 ExecutionRequest::SystemTimestamp { .. } => formatter
517 .debug_struct("ExecutionRequest::SystemTimestamp")
518 .finish_non_exhaustive(),
519
520 ExecutionRequest::ChainOwnership { .. } => formatter
521 .debug_struct("ExecutionRequest::ChainOwnership")
522 .finish_non_exhaustive(),
523
524 ExecutionRequest::ReadValueBytes { id, key, .. } => formatter
525 .debug_struct("ExecutionRequest::ReadValueBytes")
526 .field("id", id)
527 .field("key", key)
528 .finish_non_exhaustive(),
529
530 ExecutionRequest::ContainsKey { id, key, .. } => formatter
531 .debug_struct("ExecutionRequest::ContainsKey")
532 .field("id", id)
533 .field("key", key)
534 .finish_non_exhaustive(),
535
536 ExecutionRequest::ContainsKeys { id, keys, .. } => formatter
537 .debug_struct("ExecutionRequest::ContainsKeys")
538 .field("id", id)
539 .field("keys", keys)
540 .finish_non_exhaustive(),
541
542 ExecutionRequest::ReadMultiValuesBytes { id, keys, .. } => formatter
543 .debug_struct("ExecutionRequest::ReadMultiValuesBytes")
544 .field("id", id)
545 .field("keys", keys)
546 .finish_non_exhaustive(),
547
548 ExecutionRequest::FindKeysByPrefix { id, key_prefix, .. } => formatter
549 .debug_struct("ExecutionRequest::FindKeysByPrefix")
550 .field("id", id)
551 .field("key_prefix", key_prefix)
552 .finish_non_exhaustive(),
553
554 ExecutionRequest::FindKeyValuesByPrefix { id, key_prefix, .. } => formatter
555 .debug_struct("ExecutionRequest::FindKeyValuesByPrefix")
556 .field("id", id)
557 .field("key_prefix", key_prefix)
558 .finish_non_exhaustive(),
559
560 ExecutionRequest::WriteBatch { id, batch, .. } => formatter
561 .debug_struct("ExecutionRequest::WriteBatch")
562 .field("id", id)
563 .field("batch", batch)
564 .finish_non_exhaustive(),
565
566 ExecutionRequest::OpenChain { balance, .. } => formatter
567 .debug_struct("ExecutionRequest::OpenChain")
568 .field("balance", balance)
569 .finish_non_exhaustive(),
570
571 ExecutionRequest::CloseChain { application_id, .. } => formatter
572 .debug_struct("ExecutionRequest::CloseChain")
573 .field("application_id", application_id)
574 .finish_non_exhaustive(),
575
576 ExecutionRequest::FetchUrl { url, .. } => formatter
577 .debug_struct("ExecutionRequest::FetchUrl")
578 .field("url", url)
579 .finish_non_exhaustive(),
580
581 ExecutionRequest::HttpPost {
582 url, content_type, ..
583 } => formatter
584 .debug_struct("ExecutionRequest::HttpPost")
585 .field("url", url)
586 .field("content_type", content_type)
587 .finish_non_exhaustive(),
588
589 ExecutionRequest::ReadBlobContent { blob_id, .. } => formatter
590 .debug_struct("ExecutionRequest::ReadBlob")
591 .field("blob_id", blob_id)
592 .finish_non_exhaustive(),
593
594 ExecutionRequest::AssertBlobExists { blob_id, .. } => formatter
595 .debug_struct("ExecutionRequest::AssertBlobExists")
596 .field("blob_id", blob_id)
597 .finish_non_exhaustive(),
598 }
599 }
600}