1use std::collections::HashMap;
6use std::fmt::Display;
7use std::future::Future;
8use std::path::PathBuf;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::time::Instant;
14
15use futures::Stream;
16
17use either::Either;
18use freenet_stdlib::client_api::{
19 ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
20 ContractRequest, ContractResponse, DelegateError as StdDelegateError, DelegateRequest,
21 HostResponse::{self, DelegateResponse},
22 RequestError,
23};
24use freenet_stdlib::prelude::*;
25use serde::{Deserialize, Serialize};
26use tokio::sync::{mpsc, oneshot};
27
28use super::storages::Storage;
29use crate::config::Config;
30use crate::message::Transaction;
31use crate::node::OpManager;
32use crate::operations::get::GetResult;
33use crate::operations::{OpEnum, OpError};
34use crate::wasm_runtime::{
35 ContractExecError, ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface,
36 DelegateStore, Runtime, SecretsStore, StateStorage, StateStore, StateStoreError,
37};
38use crate::{
39 client_events::{ClientId, HostResult},
40 operations::{self, Operation},
41};
42
43pub(super) mod init_tracker;
44pub(super) mod mock_runtime;
45#[cfg(test)]
46mod pool_tests;
47pub(super) mod runtime;
48
49pub(crate) use init_tracker::{
50 ContractInitTracker, InitCheckResult, SLOW_INIT_THRESHOLD, STALE_INIT_THRESHOLD,
51};
52pub(crate) use runtime::RuntimePool;
53
/// Sending half of the op-request channel.
///
/// Each message pairs a [`Transaction`] with the oneshot sender on which the resulting
/// [`OpEnum`] (or an [`OpRequestError`]) is delivered.
pub(crate) type OpRequestSender =
    mpsc::Sender<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
59
/// Receiving half of the op-request channel; carries the same
/// `(Transaction, oneshot::Sender<..>)` pairs as [`OpRequestSender`].
pub(crate) type OpRequestReceiver =
    mpsc::Receiver<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
64
65pub(crate) fn op_request_channel() -> (OpRequestReceiver, OpRequestSender) {
71 let (tx, rx) = mpsc::channel(1000);
73 tracing::debug!(buffer_size = 1000, "Created op_request channel");
74 (rx, tx)
75}
76
/// Failure reported to a caller that is waiting for the result of an operation request.
#[derive(Debug, Clone)]
pub enum OpRequestError {
    /// The request failed; the payload is a human-readable reason.
    Failed(String),
    /// A channel needed to complete the request was closed.
    ChannelClosed,
}

impl std::fmt::Display for OpRequestError {
    /// Renders the error as a short human-readable message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Failed(reason) => write!(f, "Operation failed: {reason}"),
            Self::ChannelClosed => f.write_str("Channel closed"),
        }
    }
}

impl std::error::Error for OpRequestError {}
96
/// Error produced by executor operations.
///
/// Holds either a boxed [`RequestError`] (left) or any other error as an
/// [`anyhow::Error`] (right), together with a flag marking the error as fatal.
#[derive(Debug)]
pub struct ExecutorError {
    // Left: request error (see `is_request`); Right: any other error.
    inner: Either<Box<RequestError>, anyhow::Error>,
    // Exposed through `is_fatal`; only `execution` ever sets it to `true`.
    fatal: bool,
}
102
/// Identifies the operation that was running when a runtime error occurred, so
/// `ExecutorError::execution` can attach the relevant key to the error it builds.
enum InnerOpError {
    /// A contract upsert for the given contract key.
    Upsert(ContractKey),
    /// A delegate operation for the given delegate key.
    Delegate(DelegateKey),
}
107
// Marker impl: `ExecutorError` relies on the default `std::error::Error` methods.
impl std::error::Error for ExecutorError {}
109
110impl ExecutorError {
111 pub fn other(error: impl Into<anyhow::Error>) -> Self {
112 Self {
113 inner: Either::Right(error.into()),
114 fatal: false,
115 }
116 }
117
118 fn internal_error() -> Self {
120 Self {
121 inner: Either::Right(anyhow::anyhow!("internal error")),
122 fatal: false,
123 }
124 }
125
126 fn request(error: impl Into<RequestError>) -> Self {
127 Self {
128 inner: Either::Left(Box::new(error.into())),
129 fatal: false,
130 }
131 }
132
133 fn execution(
134 outer_error: crate::wasm_runtime::ContractError,
135 op: Option<InnerOpError>,
136 ) -> Self {
137 use crate::wasm_runtime::RuntimeInnerError;
138 let error = outer_error.deref();
139
140 let mut fatal = false;
141 if let RuntimeInnerError::ContractExecError(e) = error {
142 if matches!(e, ContractExecError::MaxComputeTimeExceeded) {
143 fatal = true;
144 }
145 if let Some(InnerOpError::Upsert(key)) = &op {
146 return ExecutorError::request(StdContractError::update_exec_error(*key, e));
147 }
148 }
149
150 if let RuntimeInnerError::DelegateNotFound(key) = error {
151 return ExecutorError::request(StdDelegateError::Missing(key.clone()));
152 }
153
154 if let RuntimeInnerError::DelegateExecError(e) = error {
155 return ExecutorError::request(StdDelegateError::ExecutionError(format!("{e}").into()));
156 }
157
158 if let (
159 RuntimeInnerError::SecretStoreError(
160 crate::wasm_runtime::SecretStoreError::MissingSecret(secret),
161 ),
162 Some(InnerOpError::Delegate(key)),
163 ) = (error, &op)
164 {
165 return ExecutorError::request(StdDelegateError::MissingSecret {
166 key: key.clone(),
167 secret: secret.clone(),
168 });
169 }
170
171 match error {
172 RuntimeInnerError::WasmCompileError(e) => match op {
173 Some(InnerOpError::Upsert(key)) => {
174 return ExecutorError::request(StdContractError::update_exec_error(key, e))
175 }
176 _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
177 },
178 RuntimeInnerError::WasmExportError(e) => match op {
179 Some(InnerOpError::Upsert(key)) => {
180 return ExecutorError::request(StdContractError::update_exec_error(key, e))
181 }
182 _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
183 },
184 RuntimeInnerError::WasmInstantiationError(e) => match op {
185 Some(InnerOpError::Upsert(key)) => {
186 return ExecutorError::request(StdContractError::update_exec_error(key, e))
187 }
188 _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
189 },
190 _ => {}
191 }
192
193 let mut err = ExecutorError::other(outer_error);
194 err.fatal = fatal;
195 err
196 }
197
198 pub fn is_request(&self) -> bool {
199 matches!(self.inner, Either::Left(_))
200 }
201
202 pub fn is_fatal(&self) -> bool {
203 self.fatal
204 }
205
206 pub fn unwrap_request(self) -> RequestError {
207 match self.inner {
208 Either::Left(err) => *err,
209 Either::Right(_) => panic!(),
210 }
211 }
212}
213
214impl From<RequestError> for ExecutorError {
215 fn from(value: RequestError) -> Self {
216 Self {
217 inner: Either::Left(Box::new(value)),
218 fatal: false,
219 }
220 }
221}
222
223impl Display for ExecutorError {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 match &self.inner {
226 Either::Left(l) => write!(f, "{}", &**l),
227 Either::Right(r) => write!(f, "{}", &**r),
228 }
229 }
230}
231
232impl From<Box<RequestError>> for ExecutorError {
233 fn from(value: Box<RequestError>) -> Self {
234 Self {
235 inner: Either::Left(value),
236 fatal: false,
237 }
238 }
239}
240
/// Result of handling a host request: a [`HostResponse`] or an [`ExecutorError`].
type Response = Result<HostResponse, ExecutorError>;
242
/// Mode the executor operates in. Serialized in `snake_case` and displayed as
/// `"local"` / `"network"`.
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationMode {
    /// Local mode (presumably no network interaction; confirm against callers).
    Local,
    /// Network mode.
    Network,
}
251
252impl Display for OperationMode {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 match self {
255 OperationMode::Local => write!(f, "local"),
256 OperationMode::Network => write!(f, "network"),
257 }
258 }
259}
260
/// One end of the channel pair between the executor and the event loop.
///
/// `End` is restricted to the sealed `ChannelHalve` implementors
/// ([`NetworkEventListenerHalve`] and [`Callback`]) and holds that end's channel endpoints.
pub struct ExecutorToEventLoopChannel<End: sealed::ChannelHalve> {
    // Stored with the channel; the allow suppresses the unused-field lint.
    #[allow(dead_code)] op_manager: Arc<OpManager>,
    // Channel endpoints specific to this end.
    end: End,
}
266
267pub(crate) fn mediator_channels(
275 op_manager: Arc<OpManager>,
276) -> (
277 ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
278 mpsc::Sender<Transaction>,
279 mpsc::Receiver<OpEnum>,
280) {
281 let (waiting_for_op_tx, waiting_for_op_rx) = mpsc::channel(1000);
282 let (response_for_tx, response_for_rx) = mpsc::channel(1000);
283
284 tracing::debug!(buffer_size = 1000, "Created mediator channels");
285
286 let listener_halve = ExecutorToEventLoopChannel {
287 op_manager,
288 end: NetworkEventListenerHalve {
289 waiting_for_op_rx,
290 response_for_tx,
291 },
292 };
293
294 (listener_halve, waiting_for_op_tx, response_for_rx)
295}
296
/// Maximum number of requests the mediator tracks at once; further requests are
/// rejected with `OpRequestError::Failed("mediator at capacity")`.
const MAX_PENDING_REQUESTS: usize = 10_000;

/// Age after which a pending request is considered stale and failed by the mediator's
/// periodic cleanup. Longer than the 120-second timeout used in `Executor::op_request`.
const STALE_REQUEST_THRESHOLD: Duration = Duration::from_secs(180);

/// How often the mediator scans for stale pending requests.
const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
307
/// A request parked in the mediator until its response arrives or it goes stale.
struct PendingRequest {
    // Where the operation result (or failure) is delivered.
    response_tx: oneshot::Sender<Result<OpEnum, OpRequestError>>,
    // When the request was registered; compared against `STALE_REQUEST_THRESHOLD`.
    created_at: Instant,
}
313
314pub(crate) async fn run_op_request_mediator(
394 mut op_request_receiver: OpRequestReceiver,
395 to_event_loop_tx: mpsc::Sender<Transaction>,
396 mut from_event_loop_rx: mpsc::Receiver<OpEnum>,
397) {
398 use std::collections::BTreeMap;
399
400 let mut pending_responses: BTreeMap<Transaction, PendingRequest> = BTreeMap::new();
401 let mut cleanup_interval = tokio::time::interval(CLEANUP_INTERVAL);
402 cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
403
404 tracing::info!("Op request mediator starting");
405
406 loop {
407 tokio::select! {
408 biased;
410
411 Some((transaction, response_tx)) = op_request_receiver.recv() => {
413 tracing::trace!(
414 tx = %transaction,
415 pending_count = pending_responses.len(),
416 "Mediator received operation request"
417 );
418
419 if pending_responses.len() >= MAX_PENDING_REQUESTS {
421 tracing::warn!(
422 tx = %transaction,
423 max = MAX_PENDING_REQUESTS,
424 "Mediator at capacity, rejecting request"
425 );
426 let _ = response_tx.send(Err(OpRequestError::Failed(
427 "mediator at capacity".to_string()
428 )));
429 continue;
430 }
431
432 pending_responses.insert(transaction, PendingRequest {
434 response_tx,
435 created_at: Instant::now(),
436 });
437
438 if let Err(e) = to_event_loop_tx.send(transaction).await {
440 tracing::error!(
441 tx = %transaction,
442 error = %e,
443 "Failed to forward transaction to event loop - channel closed"
444 );
445 if let Some(pending) = pending_responses.remove(&transaction) {
447 let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
448 }
449 }
450 }
451
452 Some(op_result) = from_event_loop_rx.recv() => {
454 let transaction = *op_result.id();
455 tracing::trace!(
456 tx = %transaction,
457 pending_count = pending_responses.len(),
458 "Mediator received response from event loop"
459 );
460
461 if let Some(pending) = pending_responses.remove(&transaction) {
463 if pending.response_tx.send(Ok(op_result)).is_err() {
464 tracing::debug!(
465 tx = %transaction,
466 "Executor dropped before receiving response"
467 );
468 }
469 } else {
470 tracing::warn!(
471 tx = %transaction,
472 "Received response for unknown transaction - executor may have timed out"
473 );
474 }
475 }
476
477 _ = cleanup_interval.tick() => {
479 let now = Instant::now();
480 let stale_threshold = STALE_REQUEST_THRESHOLD;
481
482 let stale_txs: Vec<Transaction> = pending_responses
484 .iter()
485 .filter(|(_, pending)| now.duration_since(pending.created_at) > stale_threshold)
486 .map(|(tx, _)| *tx)
487 .collect();
488
489 if !stale_txs.is_empty() {
490 tracing::warn!(
491 stale_count = stale_txs.len(),
492 pending_count = pending_responses.len(),
493 threshold_secs = stale_threshold.as_secs(),
494 "Cleaning up stale pending requests"
495 );
496
497 for tx in stale_txs {
498 if let Some(pending) = pending_responses.remove(&tx) {
499 tracing::debug!(
500 tx = %tx,
501 age_secs = now.duration_since(pending.created_at).as_secs(),
502 "Removing stale pending request"
503 );
504 let _ = pending.response_tx.send(Err(OpRequestError::Failed(
506 "request exceeded stale threshold".to_string()
507 )));
508 }
509 }
510 }
511 }
512
513 else => {
515 tracing::info!(
516 pending_count = pending_responses.len(),
517 "Mediator channels closed, shutting down"
518 );
519 for (tx, pending) in std::mem::take(&mut pending_responses) {
521 tracing::debug!(tx = %tx, "Notifying orphaned waiter of shutdown");
522 let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
523 }
524 break;
525 }
526 }
527 }
528
529 tracing::info!("Op request mediator stopped");
530}
531
532impl Stream for ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
533 type Item = Transaction;
534
535 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
536 Pin::new(&mut self.end.waiting_for_op_rx).poll_recv(cx)
537 }
538}
539
540impl ExecutorToEventLoopChannel<Callback> {
541 pub async fn response(&mut self, result: OpEnum) {
542 let tx_id = *result.id();
543 if self.end.response_for_tx.send(result).await.is_err() {
544 tracing::debug!(
545 tx = %tx_id,
546 "Failed to send response to executor - channel closed"
547 );
548 }
549 }
550}
551
/// Channel end used to send operation results back (see
/// `ExecutorToEventLoopChannel<Callback>::response`).
pub(crate) struct Callback {
    // Sender on which `OpEnum` results are delivered.
    response_for_tx: mpsc::Sender<OpEnum>,
}
556
/// Channel end held by the network event listener side.
#[allow(dead_code)] pub(crate) struct NetworkEventListenerHalve {
    // Transactions forwarded to this end; polled by the `Stream` impl.
    waiting_for_op_rx: mpsc::Receiver<Transaction>,
    // Sender for operation results going back the other way.
    response_for_tx: mpsc::Sender<OpEnum>,
}
566
// Private module holding the marker trait for channel ends. Because the module is
// private, only the two types below can implement `ChannelHalve`.
mod sealed {
    use super::{Callback, NetworkEventListenerHalve};
    /// Marker for types usable as the `End` of `ExecutorToEventLoopChannel`.
    pub trait ChannelHalve {}
    impl ChannelHalve for NetworkEventListenerHalve {}
    impl ChannelHalve for Callback {}
}
573
/// A request that can be turned into a network operation of type `Op`.
///
/// `Executor::op_request` first calls [`initiate_op`](Self::initiate_op) to build the
/// operation and then [`resume_op`](Self::resume_op) to start it.
trait ComposeNetworkMessage<Op>
where
    Self: Sized,
    Op: Operation + Send + 'static,
{
    /// Consumes the request and builds the operation.
    fn initiate_op(self, op_manager: &OpManager) -> Op;

    /// Starts the previously built operation; resolves to an error if it could not be started.
    fn resume_op(
        op: Op,
        op_manager: &OpManager,
    ) -> impl Future<Output = Result<(), OpError>> + Send;
}
586
/// Request to start a `get` operation for a contract instance.
#[allow(unused)]
struct GetContract {
    // Instance to fetch.
    instance_id: ContractInstanceId,
    // Passed through to `operations::get::start_op`.
    return_contract_code: bool,
}
592
593impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
594 fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp {
595 operations::get::start_op(self.instance_id, self.return_contract_code, false)
596 }
597
598 async fn resume_op(op: operations::get::GetOp, op_manager: &OpManager) -> Result<(), OpError> {
599 let visited = operations::VisitedPeers::new(&op.id);
600 operations::get::request_get(op_manager, op, visited).await
601 }
602}
603
/// Request to start a `subscribe` operation for a contract instance.
#[allow(unused)]
struct SubscribeContract {
    // Instance to subscribe to.
    instance_id: ContractInstanceId,
}
608
609impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeContract {
610 fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp {
611 operations::subscribe::start_op(self.instance_id)
612 }
613
614 async fn resume_op(
615 op: operations::subscribe::SubscribeOp,
616 op_manager: &OpManager,
617 ) -> Result<(), OpError> {
618 operations::subscribe::request_subscribe(op_manager, op).await
619 }
620}
621
/// Request to start an `update` operation that carries a full new state.
struct UpdateContract {
    // Contract to update.
    key: ContractKey,
    // State sent as `UpdateData::State`.
    new_state: WrappedState,
}
626
/// Outcome of `ContractExecutor::upsert_contract_state`.
#[derive(Debug)]
pub(crate) enum UpsertResult {
    /// The stored state was left unchanged.
    NoChange,
    /// The state changed; carries a state (presumably the resulting one; confirm against implementors).
    Updated(WrappedState),
}
632
633impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
634 fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
635 let UpdateContract { key, new_state } = self;
636 let related_contracts = RelatedContracts::default();
637 let update_data = freenet_stdlib::prelude::UpdateData::State(
641 freenet_stdlib::prelude::State::from(new_state),
642 );
643 operations::update::start_op(key, update_data, related_contracts)
644 }
645
646 async fn resume_op(
647 op: operations::update::UpdateOp,
648 op_manager: &OpManager,
649 ) -> Result<(), OpError> {
650 operations::update::request_update(op_manager, op).await
651 }
652}
653
/// Interface for executing contract and delegate operations.
pub(crate) trait ContractExecutor: Send + 'static {
    /// Looks up the full [`ContractKey`] for an instance id, if one is known.
    fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey>;

    /// Fetches a contract's state and, optionally, its container.
    ///
    /// Either element of the returned pair may be `None`. `return_contract_code`
    /// presumably controls whether the container is returned (confirm against implementors).
    fn fetch_contract(
        &mut self,
        key: ContractKey,
        return_contract_code: bool,
    ) -> impl Future<Output = Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError>>
           + Send;

    /// Inserts or updates a contract's state from a full state (left) or a delta (right).
    ///
    /// `code` optionally supplies the contract container. Reports via [`UpsertResult`]
    /// whether the state changed.
    fn upsert_contract_state(
        &mut self,
        key: ContractKey,
        update: Either<WrappedState, StateDelta<'static>>,
        related_contracts: RelatedContracts<'static>,
        code: Option<ContractContainer>,
    ) -> impl Future<Output = Result<UpsertResult, ExecutorError>> + Send;

    /// Registers `notification_ch` as the notification channel of client `cli_id` for
    /// the given contract instance, with an optional state summary.
    fn register_contract_notifier(
        &mut self,
        key: ContractInstanceId,
        cli_id: ClientId,
        notification_ch: tokio::sync::mpsc::UnboundedSender<HostResult>,
        summary: Option<StateSummary<'_>>,
    ) -> Result<(), Box<RequestError>>;

    /// Executes a delegate request, optionally on behalf of an attested contract.
    fn execute_delegate_request(
        &mut self,
        req: DelegateRequest<'_>,
        attested_contract: Option<&ContractInstanceId>,
    ) -> Response;

    /// Lists subscription records as [`crate::message::SubscriptionInfo`] values.
    fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo>;
}
690
/// Shareable, lock-protected map from contract instance to the
/// `(client, notification sender)` pairs registered for it.
type SharedNotifications = Arc<
    std::sync::RwLock<
        HashMap<ContractInstanceId, Vec<(ClientId, mpsc::UnboundedSender<HostResult>)>>,
    >,
>;
705
/// Shareable, lock-protected map from contract instance to each client's optional
/// state summary.
type SharedSummaries = Arc<
    std::sync::RwLock<
        HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
    >,
>;
712
/// Executor state, generic over the runtime `R` and the state storage backend `S`.
pub struct Executor<R = Runtime, S: StateStorage = Storage> {
    // Operation mode chosen at construction.
    mode: OperationMode,
    // Runtime supplied at construction.
    runtime: R,
    /// Store holding contract state.
    pub state_store: StateStore<S>,
    // Per-contract list of (client, notification sender) pairs.
    update_notifications:
        HashMap<ContractInstanceId, Vec<(ClientId, mpsc::UnboundedSender<HostResult>)>>,
    // Per-contract, per-client optional state summary.
    subscriber_summaries:
        HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
    // Contract instance ids recorded per delegate key (presumably attested contracts; confirm).
    delegate_attested_ids: HashMap<DelegateKey, Vec<ContractInstanceId>>,
    // Contract initialization tracker.
    init_tracker: ContractInitTracker,

    // Channel for submitting op requests; `op_request` fails when this is `None`.
    op_sender: Option<OpRequestSender>,
    // Operation manager; `op_request` fails when this is `None`.
    op_manager: Option<Arc<OpManager>>,

    // Optional shared maps; `None` until `set_shared_notifications` is called.
    shared_notifications: Option<SharedNotifications>,
    shared_summaries: Option<SharedSummaries>,
}
743
744impl<R, S> Executor<R, S>
745where
746 S: StateStorage + Send + 'static,
747 <S as StateStorage>::Error: Into<anyhow::Error>,
748{
749 pub(crate) async fn new(
752 state_store: StateStore<S>,
753 ctrl_handler: impl FnOnce() -> anyhow::Result<()>,
754 mode: OperationMode,
755 runtime: R,
756 op_sender: Option<OpRequestSender>,
757 op_manager: Option<Arc<OpManager>>,
758 ) -> anyhow::Result<Self> {
759 ctrl_handler()?;
760
761 Ok(Self {
762 mode,
763 runtime,
764 state_store,
765 update_notifications: HashMap::default(),
766 subscriber_summaries: HashMap::default(),
767 delegate_attested_ids: HashMap::default(),
768 init_tracker: ContractInitTracker::new(),
769 op_sender,
770 op_manager,
771 shared_notifications: None,
772 shared_summaries: None,
773 })
774 }
775
776 pub fn test_data_dir(identifier: &str) -> PathBuf {
777 use std::sync::atomic::{AtomicU64, Ordering};
778 static COUNTER: AtomicU64 = AtomicU64::new(0);
779 let unique_id = COUNTER.fetch_add(1, Ordering::Relaxed);
780 std::env::temp_dir().join(format!(
781 "freenet-executor-{identifier}-{}-{unique_id}",
782 std::process::id()
783 ))
784 }
785
786 pub(crate) fn set_shared_notifications(
790 &mut self,
791 notifications: SharedNotifications,
792 summaries: SharedSummaries,
793 ) {
794 self.shared_notifications = Some(notifications);
795 self.shared_summaries = Some(summaries);
796 }
797
798 pub(crate) async fn get_stores(
800 config: &Config,
801 ) -> Result<
802 (
803 ContractStore,
804 DelegateStore,
805 SecretsStore,
806 StateStore<Storage>,
807 ),
808 anyhow::Error,
809 > {
810 const MAX_MEM_CACHE: u32 = 10_000_000;
811
812 let state_store =
813 StateStore::new(Storage::new(&config.db_dir()).await?, MAX_MEM_CACHE).unwrap();
814 let (contract_store, delegate_store, secret_store) = Self::get_runtime_stores(config)?;
815
816 Ok((contract_store, delegate_store, secret_store, state_store))
817 }
818
819 pub(crate) fn get_runtime_stores(
822 config: &Config,
823 ) -> Result<(ContractStore, DelegateStore, SecretsStore), anyhow::Error> {
824 const MAX_SIZE: i64 = 10 * 1024 * 1024;
825
826 let contract_store = ContractStore::new(config.contracts_dir(), MAX_SIZE)?;
827 let delegate_store = DelegateStore::new(config.delegates_dir(), MAX_SIZE)?;
828 let secret_store = SecretsStore::new(config.secrets_dir(), config.secrets.clone())?;
829
830 Ok((contract_store, delegate_store, secret_store))
831 }
832
833 async fn op_request<Op, M>(&mut self, request: M) -> Result<Op::Result, ExecutorError>
834 where
835 Op: Operation + Send + TryFrom<OpEnum, Error = OpError> + 'static,
836 <Op as Operation>::Result: TryFrom<Op, Error = OpError>,
837 M: ComposeNetworkMessage<Op>,
838 {
839 let (op_sender, op_manager) = match (&self.op_sender, &self.op_manager) {
840 (Some(sender), Some(manager)) => (sender.clone(), manager.clone()),
841 _ => {
842 return Err(ExecutorError::other(anyhow::anyhow!(
843 "missing op_sender or op_manager"
844 )));
845 }
846 };
847
848 let op = request.initiate_op(&op_manager);
850 let transaction = *op.id();
851
852 let (response_tx, response_rx) = oneshot::channel();
854
855 op_sender
857 .send((transaction, response_tx))
858 .await
859 .map_err(|_| ExecutorError::other(anyhow::anyhow!("event loop channel closed")))?;
860
861 <M as ComposeNetworkMessage<Op>>::resume_op(op, &op_manager)
863 .await
864 .map_err(|e| {
865 tracing::debug!(
866 tx = %transaction,
867 error = %e,
868 "Failed to resume operation"
869 );
870 ExecutorError::other(e)
871 })?;
872
873 const OP_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
875 let op_result = tokio::time::timeout(OP_REQUEST_TIMEOUT, response_rx)
876 .await
877 .map_err(|_| {
878 tracing::warn!(
879 tx = %transaction,
880 timeout_secs = OP_REQUEST_TIMEOUT.as_secs(),
881 "Network operation timed out waiting for response"
882 );
883 ExecutorError::other(anyhow::anyhow!(
884 "network operation timed out after {} seconds",
885 OP_REQUEST_TIMEOUT.as_secs()
886 ))
887 })?
888 .map_err(|_| {
889 ExecutorError::other(anyhow::anyhow!(
890 "response channel closed before receiving result"
891 ))
892 })?;
893
894 let op_enum = op_result.map_err(|e| ExecutorError::other(anyhow::anyhow!("{}", e)))?;
896
897 let op: Op = op_enum.try_into().map_err(|err: OpError| {
899 tracing::error!(
900 tx = %transaction,
901 error = %err,
902 "Expected message of one type but got another"
903 );
904 ExecutorError::other(err)
905 })?;
906
907 let result = <Op::Result>::try_from(op).map_err(|err| {
909 tracing::debug!(
910 tx = %transaction,
911 error = %err,
912 "Failed to convert operation result"
913 );
914 ExecutorError::other(err)
915 })?;
916
917 Ok(result)
918 }
919
920 pub fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
921 let mut subscriptions = Vec::new();
922 for (instance_id, client_list) in &self.update_notifications {
923 for (client_id, _channel) in client_list {
924 subscriptions.push(crate::message::SubscriptionInfo {
925 instance_id: *instance_id,
926 client_id: *client_id,
927 last_update: None,
928 });
929 }
930 }
931 subscriptions
932 }
933}
934
/// Small constructors for the values used by unit tests in this module tree.
#[cfg(test)]
pub(crate) mod test_fixtures {
    use freenet_stdlib::prelude::*;

    /// Returns the same contract key on every call (fixed code and parameters).
    pub fn make_contract_key() -> ContractKey {
        make_contract_key_with_code(&[1, 2, 3, 4, 5, 6, 7, 8])
    }

    /// Returns a contract key for the given code bytes, using fixed parameters.
    pub fn make_contract_key_with_code(code_bytes: &[u8]) -> ContractKey {
        let code = ContractCode::from(code_bytes.to_vec());
        let params = Parameters::from(vec![10, 20, 30, 40]);
        ContractKey::from_params_and_code(&params, &code)
    }

    /// Wraps `data` in a [`WrappedState`].
    pub fn make_state(data: &[u8]) -> WrappedState {
        WrappedState::new(data.to_vec())
    }

    /// Builds owned [`Parameters`] from `data`.
    pub fn make_params(data: &[u8]) -> Parameters<'static> {
        Parameters::from(data.to_vec())
    }

    /// Builds an owned [`StateDelta`] from `data`.
    pub fn make_delta(data: &[u8]) -> StateDelta<'static> {
        StateDelta::from(data.to_vec())
    }
}
972
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructor and accessor behaviour of `ExecutorError`.
    mod executor_error_tests {
        use super::*;

        /// A `Put` contract error for the fixture key with the given cause.
        fn put_error(cause: &'static str) -> StdContractError {
            StdContractError::Put {
                key: test_fixtures::make_contract_key(),
                cause: cause.into(),
            }
        }

        #[test]
        fn test_executor_error_other_is_not_request() {
            let error = ExecutorError::other(anyhow::anyhow!("some error"));
            assert!(!error.is_request());
            assert!(!error.is_fatal());
        }

        #[test]
        fn test_executor_error_request_is_request() {
            let error = ExecutorError::request(put_error("test"));
            assert!(error.is_request());
            assert!(!error.is_fatal());
        }

        #[test]
        fn test_executor_error_internal_error() {
            let error = ExecutorError::internal_error();
            assert!(!error.is_request());
            assert!(!error.is_fatal());
            assert!(error.to_string().contains("internal error"));
        }

        #[test]
        fn test_executor_error_display_left() {
            let rendered = ExecutorError::request(put_error("test cause")).to_string();
            assert!(rendered.contains("test cause") || rendered.contains("Put"));
        }

        #[test]
        fn test_executor_error_display_right() {
            let rendered = ExecutorError::other(anyhow::anyhow!("custom error message")).to_string();
            assert!(rendered.contains("custom error message"));
        }

        #[test]
        fn test_executor_error_from_request_error() {
            let source = RequestError::ContractError(put_error("from conversion"));
            let converted: ExecutorError = source.into();
            assert!(converted.is_request());
        }

        #[test]
        fn test_executor_error_from_boxed_request_error() {
            let source = Box::new(RequestError::ContractError(put_error("boxed")));
            let converted: ExecutorError = source.into();
            assert!(converted.is_request());
        }

        #[test]
        fn test_unwrap_request_succeeds_for_request_error() {
            let error = ExecutorError::request(put_error("unwrap test"));
            // Must not panic for a request error.
            let _unwrapped = error.unwrap_request();
        }

        #[test]
        #[should_panic]
        fn test_unwrap_request_panics_for_other_error() {
            let error = ExecutorError::other(anyhow::anyhow!("not a request"));
            // Must panic: there is no request error to extract.
            let _unwrapped = error.unwrap_request();
        }
    }

    /// Sanity checks for the shared test fixtures.
    mod test_fixtures_tests {
        use super::*;

        #[test]
        fn test_make_contract_key_is_consistent() {
            let first = test_fixtures::make_contract_key();
            let second = test_fixtures::make_contract_key();
            assert_eq!(first, second);
        }

        #[test]
        fn test_make_contract_key_with_different_code() {
            let from_low = test_fixtures::make_contract_key_with_code(&[1, 2, 3]);
            let from_high = test_fixtures::make_contract_key_with_code(&[4, 5, 6]);
            assert_ne!(from_low, from_high);
        }

        #[test]
        fn test_make_state() {
            let wrapped = test_fixtures::make_state(&[1, 2, 3, 4]);
            assert_eq!(wrapped.as_ref(), &[1, 2, 3, 4]);
        }

        #[test]
        fn test_make_params() {
            let parameters = test_fixtures::make_params(&[10, 20]);
            assert_eq!(parameters.as_ref(), &[10, 20]);
        }

        #[test]
        fn test_make_delta() {
            let state_delta = test_fixtures::make_delta(&[100, 200]);
            assert_eq!(state_delta.as_ref(), &[100, 200]);
        }
    }
}