freenet/contract/
executor.rs

1//! Executes WASM contract and delegate code within a sandboxed environment (`WasmRuntime`).
2//! Communicates with the `ContractHandler` and potentially the `OpManager` (via `ExecutorToEventLoopChannel`).
3//! See `architecture.md`.
4
5use std::collections::HashMap;
6use std::fmt::Display;
7use std::future::Future;
8use std::path::PathBuf;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::time::Instant;
14
15use futures::Stream;
16
17use either::Either;
18use freenet_stdlib::client_api::{
19    ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
20    ContractRequest, ContractResponse, DelegateError as StdDelegateError, DelegateRequest,
21    HostResponse::{self, DelegateResponse},
22    RequestError,
23};
24use freenet_stdlib::prelude::*;
25use serde::{Deserialize, Serialize};
26use tokio::sync::{mpsc, oneshot};
27
28use super::storages::Storage;
29use crate::config::Config;
30use crate::message::Transaction;
31use crate::node::OpManager;
32use crate::operations::get::GetResult;
33use crate::operations::{OpEnum, OpError};
34use crate::wasm_runtime::{
35    ContractExecError, ContractRuntimeInterface, ContractStore, DelegateRuntimeInterface,
36    DelegateStore, Runtime, SecretsStore, StateStorage, StateStore, StateStoreError,
37};
38use crate::{
39    client_events::{ClientId, HostResult},
40    operations::{self, Operation},
41};
42
43pub(super) mod init_tracker;
44pub(super) mod mock_runtime;
45#[cfg(test)]
46mod pool_tests;
47pub(super) mod runtime;
48
49pub(crate) use init_tracker::{
50    ContractInitTracker, InitCheckResult, SLOW_INIT_THRESHOLD, STALE_INIT_THRESHOLD,
51};
52pub(crate) use runtime::RuntimePool;
53
54/// Type alias for the channel used to send operation requests from executors to the event loop.
55/// Each request includes a transaction ID and a oneshot sender for the response.
56/// This sender is cloneable, allowing multiple executors to share access to the event loop.
57pub(crate) type OpRequestSender =
58    mpsc::Sender<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
59
60/// Type alias for the receiver side of the operation request channel.
61/// This is held by the event loop to receive requests from all executors.
62pub(crate) type OpRequestReceiver =
63    mpsc::Receiver<(Transaction, oneshot::Sender<Result<OpEnum, OpRequestError>>)>;
64
65/// Create a channel pair for operation requests from executors to the event loop.
66///
67/// Returns:
68/// - `OpRequestReceiver`: Held by the event loop to receive operation requests
69/// - `OpRequestSender`: Cloneable sender given to executors to send requests
70pub(crate) fn op_request_channel() -> (OpRequestReceiver, OpRequestSender) {
71    // Buffer size matches the old executor_channel for consistency
72    let (tx, rx) = mpsc::channel(1000);
73    tracing::debug!(buffer_size = 1000, "Created op_request channel");
74    (rx, tx)
75}
76
/// Error delivered in place of an operation result when a request sent over the
/// operation-request channel cannot be completed.
#[derive(Debug, Clone)]
pub enum OpRequestError {
    /// The operation failed; the payload is the failure message.
    Failed(String),
    /// The channel was closed before a response was received.
    ChannelClosed,
}

impl std::fmt::Display for OpRequestError {
    /// Renders `Operation failed: <message>` or `Channel closed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ChannelClosed => f.write_str("Channel closed"),
            Self::Failed(reason) => write!(f, "Operation failed: {reason}"),
        }
    }
}

impl std::error::Error for OpRequestError {}
96
/// Error produced by the executor.
///
/// Wraps either a client API [`RequestError`] (`Either::Left`) or any other failure as an
/// [`anyhow::Error`] (`Either::Right`), together with a flag marking the error as fatal.
#[derive(Debug)]
pub struct ExecutorError {
    /// `Left`: a request-level error; `Right`: any other error.
    inner: Either<Box<RequestError>, anyhow::Error>,
    /// Reported through `ExecutorError::is_fatal`.
    fatal: bool,
}

/// Identifies the operation that was running when a runtime error occurred, so that
/// `ExecutorError::execution` can attribute the failure to the right contract or delegate.
enum InnerOpError {
    /// The failure happened while upserting the contract with this key.
    Upsert(ContractKey),
    /// The failure happened while running the delegate with this key.
    Delegate(DelegateKey),
}

impl std::error::Error for ExecutorError {}
109
110impl ExecutorError {
111    pub fn other(error: impl Into<anyhow::Error>) -> Self {
112        Self {
113            inner: Either::Right(error.into()),
114            fatal: false,
115        }
116    }
117
118    /// Call this when an unreachable path is reached but need to avoid panics.
119    fn internal_error() -> Self {
120        Self {
121            inner: Either::Right(anyhow::anyhow!("internal error")),
122            fatal: false,
123        }
124    }
125
126    fn request(error: impl Into<RequestError>) -> Self {
127        Self {
128            inner: Either::Left(Box::new(error.into())),
129            fatal: false,
130        }
131    }
132
133    fn execution(
134        outer_error: crate::wasm_runtime::ContractError,
135        op: Option<InnerOpError>,
136    ) -> Self {
137        use crate::wasm_runtime::RuntimeInnerError;
138        let error = outer_error.deref();
139
140        let mut fatal = false;
141        if let RuntimeInnerError::ContractExecError(e) = error {
142            if matches!(e, ContractExecError::MaxComputeTimeExceeded) {
143                fatal = true;
144            }
145            if let Some(InnerOpError::Upsert(key)) = &op {
146                return ExecutorError::request(StdContractError::update_exec_error(*key, e));
147            }
148        }
149
150        if let RuntimeInnerError::DelegateNotFound(key) = error {
151            return ExecutorError::request(StdDelegateError::Missing(key.clone()));
152        }
153
154        if let RuntimeInnerError::DelegateExecError(e) = error {
155            return ExecutorError::request(StdDelegateError::ExecutionError(format!("{e}").into()));
156        }
157
158        if let (
159            RuntimeInnerError::SecretStoreError(
160                crate::wasm_runtime::SecretStoreError::MissingSecret(secret),
161            ),
162            Some(InnerOpError::Delegate(key)),
163        ) = (error, &op)
164        {
165            return ExecutorError::request(StdDelegateError::MissingSecret {
166                key: key.clone(),
167                secret: secret.clone(),
168            });
169        }
170
171        match error {
172            RuntimeInnerError::WasmCompileError(e) => match op {
173                Some(InnerOpError::Upsert(key)) => {
174                    return ExecutorError::request(StdContractError::update_exec_error(key, e))
175                }
176                _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
177            },
178            RuntimeInnerError::WasmExportError(e) => match op {
179                Some(InnerOpError::Upsert(key)) => {
180                    return ExecutorError::request(StdContractError::update_exec_error(key, e))
181                }
182                _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
183            },
184            RuntimeInnerError::WasmInstantiationError(e) => match op {
185                Some(InnerOpError::Upsert(key)) => {
186                    return ExecutorError::request(StdContractError::update_exec_error(key, e))
187                }
188                _ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
189            },
190            _ => {}
191        }
192
193        let mut err = ExecutorError::other(outer_error);
194        err.fatal = fatal;
195        err
196    }
197
198    pub fn is_request(&self) -> bool {
199        matches!(self.inner, Either::Left(_))
200    }
201
202    pub fn is_fatal(&self) -> bool {
203        self.fatal
204    }
205
206    pub fn unwrap_request(self) -> RequestError {
207        match self.inner {
208            Either::Left(err) => *err,
209            Either::Right(_) => panic!(),
210        }
211    }
212}
213
214impl From<RequestError> for ExecutorError {
215    fn from(value: RequestError) -> Self {
216        Self {
217            inner: Either::Left(Box::new(value)),
218            fatal: false,
219        }
220    }
221}
222
223impl Display for ExecutorError {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        match &self.inner {
226            Either::Left(l) => write!(f, "{}", &**l),
227            Either::Right(r) => write!(f, "{}", &**r),
228        }
229    }
230}
231
232impl From<Box<RequestError>> for ExecutorError {
233    fn from(value: Box<RequestError>) -> Self {
234        Self {
235            inner: Either::Left(value),
236            fatal: false,
237        }
238    }
239}
240
241type Response = Result<HostResponse, ExecutorError>;
242
243#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub enum OperationMode {
246    /// Run the node in local-only mode. Useful for development purposes.
247    Local,
248    /// Standard operation mode.
249    Network,
250}
251
252impl Display for OperationMode {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        match self {
255            OperationMode::Local => write!(f, "local"),
256            OperationMode::Network => write!(f, "network"),
257        }
258    }
259}
260
/// One end of the channel between executors and the event loop.
///
/// `End` selects which half this value represents. It must implement the
/// `sealed::ChannelHalve` marker, which this file implements for
/// `NetworkEventListenerHalve` and `Callback`.
pub struct ExecutorToEventLoopChannel<End: sealed::ChannelHalve> {
    #[allow(dead_code)] // Used for reference in callback pattern
    op_manager: Arc<OpManager>,
    /// The half-specific channel endpoints.
    end: End,
}
266
267/// Creates channels for the mediator that bridges between the new OpRequest channel
268/// (used by pooled executors) and the old ExecutorToEventLoop channel (used by the event loop).
269///
270/// Returns:
271/// - `ExecutorToEventLoopChannel<NetworkEventListenerHalve>`: For the event loop
272/// - `mpsc::Sender<Transaction>`: For the mediator to send transactions to the event loop
273/// - `mpsc::Receiver<OpEnum>`: For the mediator to receive responses from the event loop
274pub(crate) fn mediator_channels(
275    op_manager: Arc<OpManager>,
276) -> (
277    ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
278    mpsc::Sender<Transaction>,
279    mpsc::Receiver<OpEnum>,
280) {
281    let (waiting_for_op_tx, waiting_for_op_rx) = mpsc::channel(1000);
282    let (response_for_tx, response_for_rx) = mpsc::channel(1000);
283
284    tracing::debug!(buffer_size = 1000, "Created mediator channels");
285
286    let listener_halve = ExecutorToEventLoopChannel {
287        op_manager,
288        end: NetworkEventListenerHalve {
289            waiting_for_op_rx,
290            response_for_tx,
291        },
292    };
293
294    (listener_halve, waiting_for_op_tx, response_for_rx)
295}
296
297/// Maximum number of pending requests before the mediator starts rejecting new ones.
298/// This prevents unbounded memory growth if the event loop is slow or unresponsive.
299const MAX_PENDING_REQUESTS: usize = 10_000;
300
301/// How long to wait before cleaning up stale pending requests.
302/// This should be longer than OP_REQUEST_TIMEOUT to allow normal timeout handling.
303const STALE_REQUEST_THRESHOLD: Duration = Duration::from_secs(180);
304
305/// How often to run the stale request cleanup.
306const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
307
308/// Entry in the pending responses map, tracking when the request was added.
309struct PendingRequest {
310    response_tx: oneshot::Sender<Result<OpEnum, OpRequestError>>,
311    created_at: Instant,
312}
313
314/// Mediator task that bridges between the new `OpRequestReceiver` (from pooled executors)
315/// and the old event loop channels.
316///
317/// This allows multiple executors to share access to the event loop through a cloneable
318/// `OpRequestSender`, while the event loop continues to use the existing
319/// `ExecutorToEventLoopChannel<NetworkEventListenerHalve>` interface.
320///
321/// # Architecture
322///
323/// ```text
324/// ┌──────────────┐         ┌──────────────┐         ┌──────────────┐
325/// │  Executor 1  │────┐    │              │    ┌────│  Event Loop  │
326/// ├──────────────┤    │    │   Mediator   │    │    ├──────────────┤
327/// │  Executor 2  │────┼───▶│              │────┼───▶│  Operations  │
328/// ├──────────────┤    │    │  (pending    │    │    │  Processing  │
329/// │  Executor N  │────┘    │   HashMap)   │◀───┘    └──────────────┘
330/// └──────────────┘         └──────────────┘
331/// ```
332///
333/// # Workflow
334///
335/// 1. Receives (Transaction, oneshot::Sender) from executors via `op_request_receiver`
336/// 2. Forwards the Transaction to the event loop via `to_event_loop_tx`
337/// 3. Stores the oneshot sender keyed by transaction in `pending_responses`
338/// 4. Receives responses from the event loop via `from_event_loop_rx`
339/// 5. Routes responses back to the correct executor via the stored oneshot sender
340/// 6. Periodically cleans up stale pending requests to prevent memory leaks
341///
342/// # Failure Scenarios and Recovery
343///
344/// ## Executor Drops Before Response
345///
346/// **Scenario**: An executor times out or is dropped before receiving its response.
347/// **Detection**: The `oneshot::Sender::send()` returns `Err` when the receiver is dropped.
348/// **Recovery**: The mediator logs a debug message and continues. No cleanup needed since
349/// the entry is removed from `pending_responses` when the response arrives.
350///
351/// ## Event Loop Channel Closes
352///
353/// **Scenario**: The event loop crashes or its receiving channel is dropped.
354/// **Detection**: `to_event_loop_tx.send()` returns `SendError`.
355/// **Recovery**: The mediator removes the pending request and notifies the executor
356/// with `OpRequestError::ChannelClosed`. The mediator continues running to handle
357/// cleanup of remaining pending requests.
358///
359/// ## Mediator Capacity Exceeded
360///
361/// **Scenario**: More than `MAX_PENDING_REQUESTS` (10,000) concurrent requests.
362/// **Detection**: `pending_responses.len() >= MAX_PENDING_REQUESTS`
363/// **Recovery**: New requests are immediately rejected with an error. This provides
364/// backpressure to prevent unbounded memory growth. Existing requests continue processing.
365///
366/// ## Stale Request Cleanup
367///
368/// **Scenario**: Requests that have been pending longer than `STALE_REQUEST_THRESHOLD` (180s).
369/// **Detection**: Periodic cleanup runs every `CLEANUP_INTERVAL` (30s) and checks timestamps.
370/// **Recovery**: Stale entries are removed from `pending_responses` and their executors are
371/// notified with an error. This handles edge cases where responses are never received
372/// (e.g., network partitions, event loop bugs).
373///
374/// ## Unknown Transaction Response
375///
376/// **Scenario**: Response received for a transaction not in `pending_responses`.
377/// **Detection**: `pending_responses.remove()` returns `None`.
378/// **Recovery**: The mediator logs a warning. This can happen legitimately when an executor
379/// times out and its response arrives later. No action needed.
380///
381/// ## All Channels Close
382///
383/// **Scenario**: Both the executor channel and event loop channel are closed.
384/// **Detection**: `tokio::select!` returns from the `else` branch.
385/// **Recovery**: The mediator notifies all remaining pending executors with `ChannelClosed`
386/// error, then exits gracefully.
387///
388/// # Thread Safety
389///
390/// The mediator is designed to be run as a single task. It is not `Sync` because
391/// it holds mutable state (`pending_responses`). The `OpRequestSender` is cloneable
392/// and can be shared across multiple executor tasks.
393pub(crate) async fn run_op_request_mediator(
394    mut op_request_receiver: OpRequestReceiver,
395    to_event_loop_tx: mpsc::Sender<Transaction>,
396    mut from_event_loop_rx: mpsc::Receiver<OpEnum>,
397) {
398    use std::collections::BTreeMap;
399
400    let mut pending_responses: BTreeMap<Transaction, PendingRequest> = BTreeMap::new();
401    let mut cleanup_interval = tokio::time::interval(CLEANUP_INTERVAL);
402    cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
403
404    tracing::info!("Op request mediator starting");
405
406    loop {
407        tokio::select! {
408            // DST: biased; ensures deterministic branch selection order
409            biased;
410
411            // Receive new operation requests from executors
412            Some((transaction, response_tx)) = op_request_receiver.recv() => {
413                tracing::trace!(
414                    tx = %transaction,
415                    pending_count = pending_responses.len(),
416                    "Mediator received operation request"
417                );
418
419                // Check if we're at capacity
420                if pending_responses.len() >= MAX_PENDING_REQUESTS {
421                    tracing::warn!(
422                        tx = %transaction,
423                        max = MAX_PENDING_REQUESTS,
424                        "Mediator at capacity, rejecting request"
425                    );
426                    let _ = response_tx.send(Err(OpRequestError::Failed(
427                        "mediator at capacity".to_string()
428                    )));
429                    continue;
430                }
431
432                // Store the response channel with timestamp
433                pending_responses.insert(transaction, PendingRequest {
434                    response_tx,
435                    created_at: Instant::now(),
436                });
437
438                // Forward transaction to event loop
439                if let Err(e) = to_event_loop_tx.send(transaction).await {
440                    tracing::error!(
441                        tx = %transaction,
442                        error = %e,
443                        "Failed to forward transaction to event loop - channel closed"
444                    );
445                    // Remove and notify the waiting executor
446                    if let Some(pending) = pending_responses.remove(&transaction) {
447                        let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
448                    }
449                }
450            }
451
452            // Receive responses from event loop
453            Some(op_result) = from_event_loop_rx.recv() => {
454                let transaction = *op_result.id();
455                tracing::trace!(
456                    tx = %transaction,
457                    pending_count = pending_responses.len(),
458                    "Mediator received response from event loop"
459                );
460
461                // Route response to the waiting executor
462                if let Some(pending) = pending_responses.remove(&transaction) {
463                    if pending.response_tx.send(Ok(op_result)).is_err() {
464                        tracing::debug!(
465                            tx = %transaction,
466                            "Executor dropped before receiving response"
467                        );
468                    }
469                } else {
470                    tracing::warn!(
471                        tx = %transaction,
472                        "Received response for unknown transaction - executor may have timed out"
473                    );
474                }
475            }
476
477            // Periodic cleanup of stale requests
478            _ = cleanup_interval.tick() => {
479                let now = Instant::now();
480                let stale_threshold = STALE_REQUEST_THRESHOLD;
481
482                // Collect stale transaction IDs
483                let stale_txs: Vec<Transaction> = pending_responses
484                    .iter()
485                    .filter(|(_, pending)| now.duration_since(pending.created_at) > stale_threshold)
486                    .map(|(tx, _)| *tx)
487                    .collect();
488
489                if !stale_txs.is_empty() {
490                    tracing::warn!(
491                        stale_count = stale_txs.len(),
492                        pending_count = pending_responses.len(),
493                        threshold_secs = stale_threshold.as_secs(),
494                        "Cleaning up stale pending requests"
495                    );
496
497                    for tx in stale_txs {
498                        if let Some(pending) = pending_responses.remove(&tx) {
499                            tracing::debug!(
500                                tx = %tx,
501                                age_secs = now.duration_since(pending.created_at).as_secs(),
502                                "Removing stale pending request"
503                            );
504                            // Try to notify the executor (likely already timed out)
505                            let _ = pending.response_tx.send(Err(OpRequestError::Failed(
506                                "request exceeded stale threshold".to_string()
507                            )));
508                        }
509                    }
510                }
511            }
512
513            // Both channels closed - exit
514            else => {
515                tracing::info!(
516                    pending_count = pending_responses.len(),
517                    "Mediator channels closed, shutting down"
518                );
519                // Notify any remaining waiters
520                for (tx, pending) in std::mem::take(&mut pending_responses) {
521                    tracing::debug!(tx = %tx, "Notifying orphaned waiter of shutdown");
522                    let _ = pending.response_tx.send(Err(OpRequestError::ChannelClosed));
523                }
524                break;
525            }
526        }
527    }
528
529    tracing::info!("Op request mediator stopped");
530}
531
532impl Stream for ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
533    type Item = Transaction;
534
535    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
536        Pin::new(&mut self.end.waiting_for_op_rx).poll_recv(cx)
537    }
538}
539
540impl ExecutorToEventLoopChannel<Callback> {
541    pub async fn response(&mut self, result: OpEnum) {
542        let tx_id = *result.id();
543        if self.end.response_for_tx.send(result).await.is_err() {
544            tracing::debug!(
545                tx = %tx_id,
546                "Failed to send response to executor - channel closed"
547            );
548        }
549    }
550}
551
/// Channel half used to hand operation results back to the executor side; see
/// `ExecutorToEventLoopChannel<Callback>::response`.
pub(crate) struct Callback {
    /// Sends the callback response to the executor.
    response_for_tx: mpsc::Sender<OpEnum>,
}
556
/// Channel half held by the event loop (network event listener).
#[allow(dead_code)] // Used via callback pattern
pub(crate) struct NetworkEventListenerHalve {
    /// Receiving end of the executor half: the executor sends a transaction here
    /// when a callback is expected for it.
    waiting_for_op_rx: mpsc::Receiver<Transaction>,
    /// Sending end used to communicate responses back to the executor; it is cloned
    /// each time a new callback half is created.
    response_for_tx: mpsc::Sender<OpEnum>,
}
566
/// Private module holding the marker trait that bounds `ExecutorToEventLoopChannel`'s
/// `End` parameter.
mod sealed {
    use super::{Callback, NetworkEventListenerHalve};
    /// Marker for types usable as a channel half; implemented here for exactly two types.
    pub trait ChannelHalve {}
    impl ChannelHalve for NetworkEventListenerHalve {}
    impl ChannelHalve for Callback {}
}
573
/// Describes a request that can be turned into a network operation of type `Op`.
///
/// Implementors in this file build the operation with the matching `start_op` function and
/// resume it with the matching `request_*` function from `crate::operations`.
trait ComposeNetworkMessage<Op>
where
    Self: Sized,
    Op: Operation + Send + 'static,
{
    /// Consumes the request and builds the operation to run.
    fn initiate_op(self, op_manager: &OpManager) -> Op;

    /// Drives a previously built operation through `op_manager`.
    ///
    /// The returned future is `Send` and resolves to `Err(OpError)` if the request fails.
    fn resume_op(
        op: Op,
        op_manager: &OpManager,
    ) -> impl Future<Output = Result<(), OpError>> + Send;
}
586
587#[allow(unused)]
588struct GetContract {
589    instance_id: ContractInstanceId,
590    return_contract_code: bool,
591}
592
593impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
594    fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp {
595        operations::get::start_op(self.instance_id, self.return_contract_code, false)
596    }
597
598    async fn resume_op(op: operations::get::GetOp, op_manager: &OpManager) -> Result<(), OpError> {
599        let visited = operations::VisitedPeers::new(&op.id);
600        operations::get::request_get(op_manager, op, visited).await
601    }
602}
603
604#[allow(unused)]
605struct SubscribeContract {
606    instance_id: ContractInstanceId,
607}
608
609impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeContract {
610    fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp {
611        operations::subscribe::start_op(self.instance_id)
612    }
613
614    async fn resume_op(
615        op: operations::subscribe::SubscribeOp,
616        op_manager: &OpManager,
617    ) -> Result<(), OpError> {
618        operations::subscribe::request_subscribe(op_manager, op).await
619    }
620}
621
/// Request to propagate a new contract state through a network update operation.
struct UpdateContract {
    /// Key of the contract being updated.
    key: ContractKey,
    /// State that gets wrapped as `UpdateData::State` when the operation is started.
    new_state: WrappedState,
}
626
/// Outcome of `ContractExecutor::upsert_contract_state`.
#[derive(Debug)]
pub(crate) enum UpsertResult {
    /// The upsert left the contract state unchanged.
    NoChange,
    /// The state changed; presumably carries the resulting state — confirm with implementors.
    Updated(WrappedState),
}
632
633impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
634    fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
635        let UpdateContract { key, new_state } = self;
636        let related_contracts = RelatedContracts::default();
637        // Wrap the computed state as UpdateData::State for the update operation.
638        // The executor computes state without committing, expecting update_contract
639        // to handle persistence and change detection.
640        let update_data = freenet_stdlib::prelude::UpdateData::State(
641            freenet_stdlib::prelude::State::from(new_state),
642        );
643        operations::update::start_op(key, update_data, related_contracts)
644    }
645
646    async fn resume_op(
647        op: operations::update::UpdateOp,
648        op_manager: &OpManager,
649    ) -> Result<(), OpError> {
650        operations::update::request_update(op_manager, op).await
651    }
652}
653
/// Operations an executor exposes for working with contracts and delegates.
///
/// Implementors must be `Send + 'static`.
pub(crate) trait ContractExecutor: Send + 'static {
    /// Look up the full ContractKey from a ContractInstanceId.
    /// Returns None if the contract is not known to this node.
    fn lookup_key(&self, instance_id: &ContractInstanceId) -> Option<ContractKey>;

    /// Fetches the contract identified by `key`.
    ///
    /// Resolves to a `(state, contract)` pair in which either element may be `None`.
    /// Presumably the contract container is only returned when `return_contract_code`
    /// is set — confirm against implementors.
    fn fetch_contract(
        &mut self,
        key: ContractKey,
        return_contract_code: bool,
    ) -> impl Future<Output = Result<(Option<WrappedState>, Option<ContractContainer>), ExecutorError>>
           + Send;

    /// Inserts or updates the state of the contract identified by `key`.
    ///
    /// `update` is either a full state (`Either::Left`) or a delta (`Either::Right`).
    /// `code` optionally supplies the contract container; presumably needed when the
    /// contract is not stored yet — TODO confirm.
    /// Resolves to an `UpsertResult` telling whether the state changed.
    fn upsert_contract_state(
        &mut self,
        key: ContractKey,
        update: Either<WrappedState, StateDelta<'static>>,
        related_contracts: RelatedContracts<'static>,
        code: Option<ContractContainer>,
    ) -> impl Future<Output = Result<UpsertResult, ExecutorError>> + Send;

    /// Registers `notification_ch` for client `cli_id` on the contract instance `key`.
    ///
    /// `summary` is the client's optional state summary; how it is used is up to the
    /// implementor.
    fn register_contract_notifier(
        &mut self,
        key: ContractInstanceId,
        cli_id: ClientId,
        notification_ch: tokio::sync::mpsc::UnboundedSender<HostResult>,
        summary: Option<StateSummary<'_>>,
    ) -> Result<(), Box<RequestError>>;

    /// Executes a delegate request synchronously and returns the host response.
    ///
    /// `attested_contract` optionally names a contract instance associated with the
    /// request; its exact semantics are defined by the implementor.
    fn execute_delegate_request(
        &mut self,
        req: DelegateRequest<'_>,
        attested_contract: Option<&ContractInstanceId>,
    ) -> Response;

    /// Returns subscription information as a list of `SubscriptionInfo` entries.
    fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo>;
}
690
// Type alias for shared notification storage (used by RuntimePool)
// Uses RwLock<HashMap> with snapshot pattern - locks are only held briefly during clone,
// never during WASM execution (get_state_delta)
type SharedNotifications = Arc<
    std::sync::RwLock<
        HashMap<ContractInstanceId, Vec<(ClientId, mpsc::UnboundedSender<HostResult>)>>,
    >,
>;

// Type alias for shared subscriber summaries (used by RuntimePool)
type SharedSummaries = Arc<
    std::sync::RwLock<
        HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
    >,
>;

/// Consumers of the executor are required to poll for new changes in order to be notified
/// of changes or can alternatively use the notification channel.
///
/// The type parameters are:
/// - `R`: The runtime type (default: `Runtime` for production, `MockRuntime` for testing)
/// - `S`: The state storage type (default: `Storage` for disk-based, can use `MockStateStorage` for in-memory)
pub struct Executor<R = Runtime, S: StateStorage = Storage> {
    /// Operation mode this executor was created with.
    mode: OperationMode,
    /// Runtime instance of type `R`.
    runtime: R,
    /// Store holding contract states, backed by storage of type `S`.
    pub state_store: StateStore<S>,
    /// Notification channels for any clients subscribed to updates for a given contract.
    /// Used when executor is standalone (not in a pool).
    update_notifications:
        HashMap<ContractInstanceId, Vec<(ClientId, mpsc::UnboundedSender<HostResult>)>>,
    /// Summaries of the state of all clients subscribed to a given contract.
    /// Used when executor is standalone (not in a pool).
    subscriber_summaries:
        HashMap<ContractInstanceId, HashMap<ClientId, Option<StateSummary<'static>>>>,
    /// Attested contract instances for a given delegate.
    delegate_attested_ids: HashMap<DelegateKey, Vec<ContractInstanceId>>,
    /// Tracks contracts that are being initialized and operations queued for them
    init_tracker: ContractInitTracker,

    /// Channel to send operation requests to the event loop (cloneable).
    op_sender: Option<OpRequestSender>,
    /// Reference to the operation manager for initiating operations.
    op_manager: Option<Arc<OpManager>>,

    /// Shared notification storage at pool level (when running in a pool).
    /// When present, this is used instead of per-executor update_notifications
    /// to ensure subscriptions registered while an executor is checked out are
    /// still notified when that executor processes updates.
    shared_notifications: Option<SharedNotifications>,
    /// Shared subscriber summaries at pool level (when running in a pool).
    shared_summaries: Option<SharedSummaries>,
}
743
744impl<R, S> Executor<R, S>
745where
746    S: StateStorage + Send + 'static,
747    <S as StateStorage>::Error: Into<anyhow::Error>,
748{
749    /// Create a new Executor with optional network operation support.
750    /// This is `pub(crate)` because the parameters involve crate-internal types.
751    pub(crate) async fn new(
752        state_store: StateStore<S>,
753        ctrl_handler: impl FnOnce() -> anyhow::Result<()>,
754        mode: OperationMode,
755        runtime: R,
756        op_sender: Option<OpRequestSender>,
757        op_manager: Option<Arc<OpManager>>,
758    ) -> anyhow::Result<Self> {
759        ctrl_handler()?;
760
761        Ok(Self {
762            mode,
763            runtime,
764            state_store,
765            update_notifications: HashMap::default(),
766            subscriber_summaries: HashMap::default(),
767            delegate_attested_ids: HashMap::default(),
768            init_tracker: ContractInitTracker::new(),
769            op_sender,
770            op_manager,
771            shared_notifications: None,
772            shared_summaries: None,
773        })
774    }
775
776    pub fn test_data_dir(identifier: &str) -> PathBuf {
777        use std::sync::atomic::{AtomicU64, Ordering};
778        static COUNTER: AtomicU64 = AtomicU64::new(0);
779        let unique_id = COUNTER.fetch_add(1, Ordering::Relaxed);
780        std::env::temp_dir().join(format!(
781            "freenet-executor-{identifier}-{}-{unique_id}",
782            std::process::id()
783        ))
784    }
785
786    /// Set shared notification storage for pool-based operation.
787    /// When set, notifications will be sent via shared storage instead of per-executor storage.
788    /// This ensures subscriptions registered while this executor is checked out are still notified.
789    pub(crate) fn set_shared_notifications(
790        &mut self,
791        notifications: SharedNotifications,
792        summaries: SharedSummaries,
793    ) {
794        self.shared_notifications = Some(notifications);
795        self.shared_summaries = Some(summaries);
796    }
797
798    /// Create all stores including StateStore. Used when creating a standalone executor.
799    pub(crate) async fn get_stores(
800        config: &Config,
801    ) -> Result<
802        (
803            ContractStore,
804            DelegateStore,
805            SecretsStore,
806            StateStore<Storage>,
807        ),
808        anyhow::Error,
809    > {
810        const MAX_MEM_CACHE: u32 = 10_000_000;
811
812        let state_store =
813            StateStore::new(Storage::new(&config.db_dir()).await?, MAX_MEM_CACHE).unwrap();
814        let (contract_store, delegate_store, secret_store) = Self::get_runtime_stores(config)?;
815
816        Ok((contract_store, delegate_store, secret_store, state_store))
817    }
818
819    /// Create only the Runtime stores (contract, delegate, secrets) without StateStore.
820    /// Used by RuntimePool to create executors that share a StateStore.
821    pub(crate) fn get_runtime_stores(
822        config: &Config,
823    ) -> Result<(ContractStore, DelegateStore, SecretsStore), anyhow::Error> {
824        const MAX_SIZE: i64 = 10 * 1024 * 1024;
825
826        let contract_store = ContractStore::new(config.contracts_dir(), MAX_SIZE)?;
827        let delegate_store = DelegateStore::new(config.delegates_dir(), MAX_SIZE)?;
828        let secret_store = SecretsStore::new(config.secrets_dir(), config.secrets.clone())?;
829
830        Ok((contract_store, delegate_store, secret_store))
831    }
832
833    async fn op_request<Op, M>(&mut self, request: M) -> Result<Op::Result, ExecutorError>
834    where
835        Op: Operation + Send + TryFrom<OpEnum, Error = OpError> + 'static,
836        <Op as Operation>::Result: TryFrom<Op, Error = OpError>,
837        M: ComposeNetworkMessage<Op>,
838    {
839        let (op_sender, op_manager) = match (&self.op_sender, &self.op_manager) {
840            (Some(sender), Some(manager)) => (sender.clone(), manager.clone()),
841            _ => {
842                return Err(ExecutorError::other(anyhow::anyhow!(
843                    "missing op_sender or op_manager"
844                )));
845            }
846        };
847
848        // Create the operation and get its transaction ID
849        let op = request.initiate_op(&op_manager);
850        let transaction = *op.id();
851
852        // Create a oneshot channel for this specific request's response
853        let (response_tx, response_rx) = oneshot::channel();
854
855        // Send the transaction and response channel to the event loop
856        op_sender
857            .send((transaction, response_tx))
858            .await
859            .map_err(|_| ExecutorError::other(anyhow::anyhow!("event loop channel closed")))?;
860
861        // Start the network operation
862        <M as ComposeNetworkMessage<Op>>::resume_op(op, &op_manager)
863            .await
864            .map_err(|e| {
865                tracing::debug!(
866                    tx = %transaction,
867                    error = %e,
868                    "Failed to resume operation"
869                );
870                ExecutorError::other(e)
871            })?;
872
873        // Wait for the response on our oneshot channel with timeout
874        const OP_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
875        let op_result = tokio::time::timeout(OP_REQUEST_TIMEOUT, response_rx)
876            .await
877            .map_err(|_| {
878                tracing::warn!(
879                    tx = %transaction,
880                    timeout_secs = OP_REQUEST_TIMEOUT.as_secs(),
881                    "Network operation timed out waiting for response"
882                );
883                ExecutorError::other(anyhow::anyhow!(
884                    "network operation timed out after {} seconds",
885                    OP_REQUEST_TIMEOUT.as_secs()
886                ))
887            })?
888            .map_err(|_| {
889                ExecutorError::other(anyhow::anyhow!(
890                    "response channel closed before receiving result"
891                ))
892            })?;
893
894        // Handle the result
895        let op_enum = op_result.map_err(|e| ExecutorError::other(anyhow::anyhow!("{}", e)))?;
896
897        // Convert to the specific operation type
898        let op: Op = op_enum.try_into().map_err(|err: OpError| {
899            tracing::error!(
900                tx = %transaction,
901                error = %err,
902                "Expected message of one type but got another"
903            );
904            ExecutorError::other(err)
905        })?;
906
907        // Convert to the result type
908        let result = <Op::Result>::try_from(op).map_err(|err| {
909            tracing::debug!(
910                tx = %transaction,
911                error = %err,
912                "Failed to convert operation result"
913            );
914            ExecutorError::other(err)
915        })?;
916
917        Ok(result)
918    }
919
920    pub fn get_subscription_info(&self) -> Vec<crate::message::SubscriptionInfo> {
921        let mut subscriptions = Vec::new();
922        for (instance_id, client_list) in &self.update_notifications {
923            for (client_id, _channel) in client_list {
924                subscriptions.push(crate::message::SubscriptionInfo {
925                    instance_id: *instance_id,
926                    client_id: *client_id,
927                    last_update: None,
928                });
929            }
930        }
931        subscriptions
932    }
933}
934
/// Test fixtures for creating contract-related test data.
///
/// Small constructors that turn raw byte slices into the contract types used
/// throughout the contract module's unit tests, so individual tests do not
/// have to repeat the conversion boilerplate.
#[cfg(test)]
pub(crate) mod test_fixtures {
    use freenet_stdlib::prelude::*;

    /// Code bytes behind the key returned by `make_contract_key`.
    const DEFAULT_CODE: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
    /// Parameter bytes used for every contract key built by this module.
    const DEFAULT_PARAMS: [u8; 4] = [10, 20, 30, 40];

    /// Create a test contract key with arbitrary but consistent data
    pub fn make_contract_key() -> ContractKey {
        make_contract_key_with_code(&DEFAULT_CODE)
    }

    /// Create a test contract key with custom code bytes
    /// (the parameters are always `DEFAULT_PARAMS`).
    pub fn make_contract_key_with_code(code_bytes: &[u8]) -> ContractKey {
        let params = Parameters::from(DEFAULT_PARAMS.to_vec());
        let code = ContractCode::from(code_bytes.to_owned());
        ContractKey::from_params_and_code(&params, &code)
    }

    /// Create a test wrapped state holding a copy of `data`
    pub fn make_state(data: &[u8]) -> WrappedState {
        let bytes = data.to_owned();
        WrappedState::new(bytes)
    }

    /// Create owned test parameters holding a copy of `data`
    pub fn make_params(data: &[u8]) -> Parameters<'static> {
        let bytes = data.to_owned();
        Parameters::from(bytes)
    }

    /// Create an owned test state delta holding a copy of `data`
    pub fn make_delta(data: &[u8]) -> StateDelta<'static> {
        let bytes = data.to_owned();
        StateDelta::from(bytes)
    }
}
972
973#[cfg(test)]
974mod tests {
975    use super::*;
976
977    mod executor_error_tests {
978        use super::*;
979
980        #[test]
981        fn test_executor_error_other_is_not_request() {
982            let err = ExecutorError::other(anyhow::anyhow!("some error"));
983            assert!(!err.is_request());
984            assert!(!err.is_fatal());
985        }
986
987        #[test]
988        fn test_executor_error_request_is_request() {
989            let err = ExecutorError::request(StdContractError::Put {
990                key: test_fixtures::make_contract_key(),
991                cause: "test".into(),
992            });
993            assert!(err.is_request());
994            assert!(!err.is_fatal());
995        }
996
997        #[test]
998        fn test_executor_error_internal_error() {
999            let err = ExecutorError::internal_error();
1000            assert!(!err.is_request());
1001            assert!(!err.is_fatal());
1002            assert!(err.to_string().contains("internal error"));
1003        }
1004
1005        #[test]
1006        fn test_executor_error_display_left() {
1007            let err = ExecutorError::request(StdContractError::Put {
1008                key: test_fixtures::make_contract_key(),
1009                cause: "test cause".into(),
1010            });
1011            let display = err.to_string();
1012            assert!(display.contains("test cause") || display.contains("Put"));
1013        }
1014
1015        #[test]
1016        fn test_executor_error_display_right() {
1017            let err = ExecutorError::other(anyhow::anyhow!("custom error message"));
1018            assert!(err.to_string().contains("custom error message"));
1019        }
1020
1021        #[test]
1022        fn test_executor_error_from_request_error() {
1023            let request_err = RequestError::ContractError(StdContractError::Put {
1024                key: test_fixtures::make_contract_key(),
1025                cause: "from conversion".into(),
1026            });
1027            let err: ExecutorError = request_err.into();
1028            assert!(err.is_request());
1029        }
1030
1031        #[test]
1032        fn test_executor_error_from_boxed_request_error() {
1033            let request_err = Box::new(RequestError::ContractError(StdContractError::Put {
1034                key: test_fixtures::make_contract_key(),
1035                cause: "boxed".into(),
1036            }));
1037            let err: ExecutorError = request_err.into();
1038            assert!(err.is_request());
1039        }
1040
1041        #[test]
1042        fn test_unwrap_request_succeeds_for_request_error() {
1043            let key = test_fixtures::make_contract_key();
1044            let err = ExecutorError::request(StdContractError::Put {
1045                key,
1046                cause: "unwrap test".into(),
1047            });
1048            let _unwrapped = err.unwrap_request(); // Should not panic
1049        }
1050
1051        #[test]
1052        #[should_panic]
1053        fn test_unwrap_request_panics_for_other_error() {
1054            let err = ExecutorError::other(anyhow::anyhow!("not a request"));
1055            let _unwrapped = err.unwrap_request(); // Should panic
1056        }
1057    }
1058
1059    mod test_fixtures_tests {
1060        use super::*;
1061
1062        #[test]
1063        fn test_make_contract_key_is_consistent() {
1064            let key1 = test_fixtures::make_contract_key();
1065            let key2 = test_fixtures::make_contract_key();
1066            assert_eq!(key1, key2);
1067        }
1068
1069        #[test]
1070        fn test_make_contract_key_with_different_code() {
1071            let key1 = test_fixtures::make_contract_key_with_code(&[1, 2, 3]);
1072            let key2 = test_fixtures::make_contract_key_with_code(&[4, 5, 6]);
1073            assert_ne!(key1, key2);
1074        }
1075
1076        #[test]
1077        fn test_make_state() {
1078            let state = test_fixtures::make_state(&[1, 2, 3, 4]);
1079            assert_eq!(state.as_ref(), &[1, 2, 3, 4]);
1080        }
1081
1082        #[test]
1083        fn test_make_params() {
1084            let params = test_fixtures::make_params(&[10, 20]);
1085            assert_eq!(params.as_ref(), &[10, 20]);
1086        }
1087
1088        #[test]
1089        fn test_make_delta() {
1090            let delta = test_fixtures::make_delta(&[100, 200]);
1091            assert_eq!(delta.as_ref(), &[100, 200]);
1092        }
1093    }
1094}