use std::cell::Cell;
use std::collections::BTreeMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::Stream;
use freenet_stdlib::client_api::DelegateRequest;
use freenet_stdlib::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use super::ExecutorError;
use super::executor::{OpRequestSender, RuntimePool};
use super::{ContractError, executor::ContractExecutor};
use crate::client_events::ClientId;
use crate::client_events::{AuthToken, HostResult, RequestId};
use crate::config::Config;
use crate::message::{QueryResult, Transaction};
use crate::node::OpManager;
use std::num::NonZeroUsize;
/// Receiving end of the client responses channel.
///
/// Newtype over an unbounded tokio receiver that yields
/// `(ClientId, RequestId, HostResult)` tuples.
pub(crate) struct ClientResponsesReceiver(UnboundedReceiver<(ClientId, RequestId, HostResult)>);
/// Creates the unbounded channel that carries `(ClientId, RequestId, HostResult)`
/// tuples.
///
/// Returns the pair as `(receiver, sender)`, each wrapped in its newtype.
pub(crate) fn client_responses_channel() -> (ClientResponsesReceiver, ClientResponsesSender) {
    let (sender, receiver) = mpsc::unbounded_channel();
    (
        ClientResponsesReceiver(receiver),
        ClientResponsesSender(sender),
    )
}
impl std::ops::Deref for ClientResponsesReceiver {
type Target = UnboundedReceiver<(ClientId, RequestId, HostResult)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for ClientResponsesReceiver {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Sending end of the client responses channel.
///
/// Cloneable newtype over an unbounded tokio sender of
/// `(ClientId, RequestId, HostResult)` tuples.
#[derive(Clone)]
pub(crate) struct ClientResponsesSender(UnboundedSender<(ClientId, RequestId, HostResult)>);
impl std::ops::Deref for ClientResponsesSender {
type Target = UnboundedSender<(ClientId, RequestId, HostResult)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A component that owns the handler side of the contract handler channel
/// together with a contract executor.
pub(crate) trait ContractHandler {
/// Extra input consumed by [`ContractHandler::build`]; its meaning is defined
/// by each implementor.
type Builder;
/// Concrete executor type exposed through [`ContractHandler::executor`].
type ContractExecutor: ContractExecutor;
/// Asynchronously constructs the handler from its channel half, the operation
/// request sender, the shared operation manager and the implementor-specific
/// `builder` value.
///
/// The returned future must be `Send`; it resolves to an `anyhow::Result`.
fn build(
contract_handler_channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: OpRequestSender,
op_manager: Arc<OpManager>,
builder: Self::Builder,
) -> impl Future<Output = anyhow::Result<Self>> + Send
where
Self: Sized + 'static;
/// Mutable access to the handler-side channel half.
fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve>;
/// Mutable access to the contract executor.
fn executor(&mut self) -> &mut Self::ContractExecutor;
}
/// [`ContractHandler`] implementation whose executor is a [`RuntimePool`].
pub(crate) struct NetworkContractHandler {
// Pool of runtimes used as the contract executor.
executor: RuntimePool,
// Handler-side half of the contract handler channel.
channel: ContractHandlerChannel<ContractHandlerHalve>,
}
impl ContractHandler for NetworkContractHandler {
    type Builder = Arc<Config>;
    type ContractExecutor = RuntimePool;

    /// Builds the handler: sizes and creates the [`RuntimePool`], hands its
    /// state storage to the ring, loads the hosting cache (when a storage
    /// feature is enabled) and seeds `neighbor_hosting` from the hosted keys.
    ///
    /// The pool size is taken from `FREENET_RUNTIME_POOL_SIZE` when that
    /// variable parses as a `usize`; otherwise it is the available parallelism
    /// minus one. Either way the value is clamped to `1..=16`.
    ///
    /// # Errors
    /// Propagates any error returned by `RuntimePool::new`. A failure to load
    /// the hosting cache is only logged as a warning.
    async fn build(
        channel: ContractHandlerChannel<ContractHandlerHalve>,
        op_sender: OpRequestSender,
        op_manager: Arc<OpManager>,
        config: Self::Builder,
    ) -> anyhow::Result<Self>
    where
        Self: Sized + 'static,
    {
        // Fall back to 4 when the parallelism cannot be queried, then keep one
        // unit free while never dropping below a single worker.
        let detected = std::thread::available_parallelism()
            .unwrap_or(NonZeroUsize::new(4).unwrap())
            .get();
        let default_workers = detected.saturating_sub(1).max(1);

        // An explicit override wins when it parses as a number.
        let env_override = std::env::var("FREENET_RUNTIME_POOL_SIZE")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok())
            .and_then(|requested| NonZeroUsize::new(requested.clamp(1, 16)));
        let pool_size = match env_override {
            Some(size) => size,
            None => NonZeroUsize::new(default_workers.clamp(1, 16)).unwrap(),
        };
        tracing::info!(pool_size = %pool_size, "Creating RuntimePool");

        let executor =
            RuntimePool::new(config.clone(), op_sender, op_manager.clone(), pool_size).await?;

        // The ring gets its own handle to the executor's state storage.
        let storage = executor.state_store().inner().clone();
        op_manager.ring.set_hosting_storage(storage.clone());

        #[cfg(feature = "redb")]
        {
            let loaded = op_manager
                .ring
                .load_hosting_cache(&storage, |instance_id| {
                    executor.code_hash_from_id(instance_id)
                });
            if let Err(e) = loaded {
                tracing::warn!(error = %e, "Failed to load hosting cache from storage");
            }
        }
        #[cfg(all(feature = "sqlite", not(feature = "redb")))]
        {
            let loaded = op_manager
                .ring
                .load_hosting_cache(&storage, |instance_id| {
                    executor.code_hash_from_id(instance_id)
                })
                .await;
            if let Err(e) = loaded {
                tracing::warn!(error = %e, "Failed to load hosting cache from storage");
            }
        }

        // Seed neighbor hosting with the ids of every contract key the ring
        // reports as hosted.
        let hosted_keys = op_manager.ring.hosting_contract_keys();
        op_manager
            .neighbor_hosting
            .initialize_from_hosting_cache(hosted_keys.iter().map(|key| *key.id()));

        Ok(Self { executor, channel })
    }

    fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
        let Self { channel, .. } = self;
        channel
    }

    fn executor(&mut self) -> &mut Self::ContractExecutor {
        let Self { executor, .. } = self;
        executor
    }
}
/// Identifier attached to an event travelling through the contract handler
/// channel; equality and hashing are defined purely by the numeric `id`.
#[derive(Debug, Eq)]
pub(crate) struct EventId {
    pub(crate) id: u64,
}

impl PartialEq for EventId {
    /// Two ids are equal exactly when their numeric values match.
    fn eq(&self, other: &Self) -> bool {
        let (Self { id: lhs }, Self { id: rhs }) = (self, other);
        lhs == rhs
    }
}

impl Hash for EventId {
    /// Feeds only the numeric value into the hasher, matching `PartialEq`.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        Hash::hash(&self.id, state);
    }
}
/// One end of the contract handler channel.
///
/// The `End` type parameter selects which half this value represents; it is
/// restricted to the types implementing the sealed `ChannelHalve` trait.
pub(crate) struct ContractHandlerChannel<End: sealed::ChannelHalve> {
// State specific to this half of the channel.
end: End,
// Optional channel for `SessionMessage`s; created as `None` by
// `contract_handler_channel` and set via `with_session_adapter`.
session_adapter_tx: Option<mpsc::Sender<SessionMessage>>,
}
/// Handler-side half: receives events and keeps the reply channels of events
/// that have been received but not yet answered.
pub(crate) struct ContractHandlerHalve {
// Incoming events from the sender half.
event_receiver: mpsc::UnboundedReceiver<InternalCHEvent>,
// Reply senders keyed by event id; an entry is inserted when an event is
// received and removed when it is answered or explicitly dropped.
waiting_response: BTreeMap<u64, tokio::sync::oneshot::Sender<(EventId, ContractHandlerEvent)>>,
}
/// Sender-side half: submits events to the handler and registers
/// `(ClientId, WaitingTransaction)` pairs awaiting results.
pub(crate) struct SenderHalve {
// Unbounded channel carrying events towards the handler half.
event_sender: mpsc::UnboundedSender<InternalCHEvent>,
// Bounded channel (paired with `WaitingResolution`) for waiting registrations.
wait_for_res_tx: mpsc::Sender<(ClientId, WaitingTransaction)>,
}
/// What a client is waiting on: either a specific transaction or a
/// subscription to a contract instance.
#[derive(Debug, Clone, Copy)]
pub(crate) enum WaitingTransaction {
/// Waiting for the result of the given transaction.
Transaction(Transaction),
/// Waiting on a subscription to the given contract instance.
Subscription { contract_key: ContractInstanceId },
}
/// Messages sent over the session adapter channel
/// (`mpsc::Sender<SessionMessage>`).
///
/// In the code visible here only `RegisterTransaction` is constructed (by
/// `notify_session_actor`). NOTE(review): the remaining variants are
/// presumably produced and consumed by the session actor elsewhere — confirm
/// their exact semantics there.
#[allow(dead_code)]
#[derive(Debug)]
pub enum SessionMessage {
/// Carries a client's identifiers, the sender used to reach its transport,
/// and an optional auth token.
#[allow(dead_code)]
RegisterClient {
client_id: ClientId,
request_id: RequestId,
transport_tx: UnboundedSender<HostResult>,
token: Option<AuthToken>,
},
/// Associates a transaction with the client and request that initiated it.
#[allow(dead_code)]
RegisterTransaction {
#[allow(dead_code)]
tx: Transaction,
#[allow(dead_code)]
client_id: ClientId,
#[allow(dead_code)]
request_id: RequestId,
},
/// Carries a boxed query result for a transaction.
#[allow(dead_code)]
DeliverResult {
tx: Transaction,
result: Box<QueryResult>,
},
/// Carries a shared host response for a transaction.
#[allow(dead_code)]
DeliverHostResponse {
tx: Transaction,
response: Arc<HostResult>,
},
/// Same as `DeliverHostResponse` but additionally names a request id.
#[allow(dead_code)]
DeliverHostResponseWithRequestId {
tx: Transaction,
response: Arc<HostResult>,
request_id: RequestId,
},
/// Names a client that disconnected.
ClientDisconnect {
client_id: ClientId,
},
}
impl From<Transaction> for WaitingTransaction {
fn from(tx: Transaction) -> Self {
WaitingTransaction::Transaction(tx)
}
}
/// Channel half that receives the `(ClientId, WaitingTransaction)`
/// registrations sent by the sender half.
pub(crate) struct WaitingResolution {
// Receiving end paired with `SenderHalve::wait_for_res_tx`.
wait_for_res_rx: mpsc::Receiver<(ClientId, WaitingTransaction)>,
}
// Private module holding the marker trait that restricts which types may be
// used as the `End` parameter of `ContractHandlerChannel`.
mod sealed {
use super::{ContractHandlerHalve, SenderHalve, WaitingResolution};
/// Marker trait for the valid channel halves; implemented only below.
pub(crate) trait ChannelHalve {}
impl ChannelHalve for ContractHandlerHalve {}
impl ChannelHalve for SenderHalve {}
impl ChannelHalve for WaitingResolution {}
}
/// Creates the three connected halves of the contract handler channel.
///
/// Returns `(sender half, handler half, waiting-resolution half)`. Events flow
/// over an unbounded channel; waiting registrations flow over a bounded
/// channel with room for 100 entries. No half has a session adapter installed.
pub(crate) fn contract_handler_channel() -> (
    ContractHandlerChannel<SenderHalve>,
    ContractHandlerChannel<ContractHandlerHalve>,
    ContractHandlerChannel<WaitingResolution>,
) {
    let (event_sender, event_receiver) = mpsc::unbounded_channel();
    let (wait_for_res_tx, wait_for_res_rx) = mpsc::channel(100);

    let sender_side = ContractHandlerChannel {
        end: SenderHalve {
            event_sender,
            wait_for_res_tx,
        },
        session_adapter_tx: None,
    };
    let handler_side = ContractHandlerChannel {
        end: ContractHandlerHalve {
            event_receiver,
            waiting_response: BTreeMap::new(),
        },
        session_adapter_tx: None,
    };
    let resolution_side = ContractHandlerChannel {
        end: WaitingResolution { wait_for_res_rx },
        session_adapter_tx: None,
    };

    (sender_side, handler_side, resolution_side)
}
/// Size of the id range that separates the starting points of the per-thread
/// event id counters.
const EV_ID_BLOCK: u64 = 1_000_000;
// Per-thread event id counter. It is lazily initialised to
// `thread_index * EV_ID_BLOCK` and incremented by one for every event sent.
// NOTE(review): nothing here stops a thread from counting past its block, so
// ids from different threads would presumably overlap after `EV_ID_BLOCK`
// events on one thread — confirm this is acceptable.
thread_local! {
static EV_ID: Cell<u64> = {
let idx = crate::config::GlobalRng::thread_index();
Cell::new(idx * EV_ID_BLOCK)
};
}
/// Rewinds the calling thread's event id counter to the start of its block,
/// i.e. `thread_index * EV_ID_BLOCK`.
pub fn reset_event_id_counter() {
    let block_start = crate::config::GlobalRng::thread_index() * EV_ID_BLOCK;
    EV_ID.with(|counter| counter.set(block_start));
}
/// Streams the `(ClientId, WaitingTransaction)` registrations by polling the
/// underlying bounded receiver.
impl Stream for ContractHandlerChannel<WaitingResolution> {
    type Item = (ClientId, WaitingTransaction);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.end.wait_for_res_rx.poll_recv(cx)
    }
}
impl ContractHandlerChannel<SenderHalve> {
    /// Default time to wait for the handler's reply in [`Self::send_to_handler`].
    const CH_EV_RESPONSE_TIME_OUT: Duration = Duration::from_secs(300);

    /// Upper bound on how long registration sends (waiting channel and session
    /// adapter) may block.
    const WAIT_FOR_RES_SEND_TIMEOUT: Duration = Duration::from_secs(30);

    /// Takes the next id from the calling thread's counter and advances it.
    fn next_event_id() -> u64 {
        EV_ID.with(|counter| {
            let current = counter.get();
            counter.set(current + 1);
            current
        })
    }

    /// Sends `ev` to the handler and waits for its reply using the default
    /// response timeout.
    pub async fn send_to_handler(
        &self,
        ev: ContractHandlerEvent,
    ) -> Result<ContractHandlerEvent, ContractError> {
        let default_timeout = Self::CH_EV_RESPONSE_TIME_OUT;
        self.send_to_handler_with_timeout(ev, default_timeout).await
    }

    /// Sends `ev` to the handler and waits up to `timeout` for its reply.
    ///
    /// # Errors
    /// * `ContractError::ChannelDropped` (carrying the event back) when the
    ///   event channel is closed.
    /// * `ContractError::NoEvHandlerResponse` when the reply channel is dropped
    ///   or the timeout elapses.
    pub async fn send_to_handler_with_timeout(
        &self,
        ev: ContractHandlerEvent,
        timeout: Duration,
    ) -> Result<ContractHandlerEvent, ContractError> {
        let id = Self::next_event_id();
        let (result, result_receiver) = tokio::sync::oneshot::channel();
        let request = InternalCHEvent { ev, id, result };
        if let Err(undelivered) = self.end.event_sender.send(request) {
            return Err(ContractError::ChannelDropped(Box::new(undelivered.0.ev)));
        }
        match tokio::time::timeout(timeout, result_receiver).await {
            Ok(Ok((_event_id, response))) => Ok(response),
            Ok(Err(_reply_dropped)) => Err(ContractError::NoEvHandlerResponse),
            Err(_elapsed) => Err(ContractError::NoEvHandlerResponse),
        }
    }

    /// Sends `ev` to the handler without waiting for any reply.
    ///
    /// The reply receiver is discarded immediately, so whatever the handler
    /// sends back for this event has nowhere to go.
    ///
    /// # Errors
    /// `ContractError::ChannelDropped` when the event channel is closed.
    pub fn send_to_handler_fire_and_forget(
        &self,
        ev: ContractHandlerEvent,
    ) -> Result<(), ContractError> {
        let id = Self::next_event_id();
        let (result, _) = tokio::sync::oneshot::channel();
        match self.end.event_sender.send(InternalCHEvent { ev, id, result }) {
            Ok(()) => Ok(()),
            Err(undelivered) => Err(ContractError::ChannelDropped(Box::new(undelivered.0.ev))),
        }
    }

    /// Installs the channel used to notify the session actor about
    /// registered transactions.
    pub fn with_session_adapter(&mut self, session_tx: mpsc::Sender<SessionMessage>) {
        self.session_adapter_tx = Some(session_tx);
    }

    /// Registers that `client_id` is waiting on `transaction`.
    ///
    /// The registration is pushed onto the waiting channel (bounded by
    /// `WAIT_FOR_RES_SEND_TIMEOUT`); when it names a concrete transaction the
    /// session actor is notified as well.
    ///
    /// # Errors
    /// `ContractError::NoEvHandlerResponse` when the waiting channel is closed
    /// or the send times out.
    pub async fn waiting_for_transaction_result(
        &self,
        transaction: impl Into<WaitingTransaction>,
        client_id: ClientId,
        request_id: RequestId,
    ) -> Result<(), ContractError> {
        let waiting_tx = transaction.into();
        let registration = tokio::time::timeout(
            Self::WAIT_FOR_RES_SEND_TIMEOUT,
            self.end.wait_for_res_tx.send((client_id, waiting_tx)),
        )
        .await;
        match registration {
            Err(_elapsed) => {
                tracing::error!(
                    client = %client_id,
                    request_id = %request_id,
                    channel_capacity = self.end.wait_for_res_tx.capacity(),
                    timeout_secs = Self::WAIT_FOR_RES_SEND_TIMEOUT.as_secs(),
                    "Timed out registering transaction for result delivery — \
                     event loop is not draining client transactions (Priority 7 starvation)"
                );
                Err(ContractError::NoEvHandlerResponse)
            }
            Ok(Err(_closed)) => Err(ContractError::NoEvHandlerResponse),
            Ok(Ok(())) => {
                // Subscriptions carry no transaction, so only the transaction
                // flavour is forwarded to the session actor.
                if let WaitingTransaction::Transaction(tx) = waiting_tx {
                    self.notify_session_actor(tx, client_id, request_id).await;
                }
                Ok(())
            }
        }
    }

    /// Registers that `client_id` is waiting on a subscription to
    /// `contract_key`, then notifies the session actor about `tx`.
    ///
    /// # Errors
    /// `ContractError::NoEvHandlerResponse` when the waiting channel is closed
    /// or the send times out.
    pub async fn waiting_for_subscription_result(
        &self,
        tx: Transaction,
        contract_key: ContractInstanceId,
        client_id: ClientId,
        request_id: RequestId,
    ) -> Result<(), ContractError> {
        let registration = tokio::time::timeout(
            Self::WAIT_FOR_RES_SEND_TIMEOUT,
            self.end
                .wait_for_res_tx
                .send((client_id, WaitingTransaction::Subscription { contract_key })),
        )
        .await;
        match registration {
            Err(_elapsed) => {
                tracing::error!(
                    tx = %tx,
                    client = %client_id,
                    request_id = %request_id,
                    contract = %contract_key,
                    channel_capacity = self.end.wait_for_res_tx.capacity(),
                    timeout_secs = Self::WAIT_FOR_RES_SEND_TIMEOUT.as_secs(),
                    "Timed out registering subscription for result delivery — \
                     event loop is not draining client transactions (Priority 7 starvation)"
                );
                Err(ContractError::NoEvHandlerResponse)
            }
            Ok(Err(_closed)) => Err(ContractError::NoEvHandlerResponse),
            Ok(Ok(())) => {
                self.notify_session_actor(tx, client_id, request_id).await;
                Ok(())
            }
        }
    }

    /// Sends a `RegisterTransaction` message to the session actor.
    ///
    /// Best effort: a missing adapter, a closed channel or a timeout are only
    /// logged, never returned to the caller.
    async fn notify_session_actor(
        &self,
        tx: Transaction,
        client_id: ClientId,
        request_id: RequestId,
    ) {
        let session_tx = match self.session_adapter_tx.as_ref() {
            Some(session_tx) => session_tx,
            None => {
                tracing::warn!(
                    client = %client_id,
                    request_id = %request_id,
                    "Session adapter not installed — session actor will not track transaction"
                );
                return;
            }
        };
        let msg = SessionMessage::RegisterTransaction {
            tx,
            client_id,
            request_id,
        };
        let delivery =
            tokio::time::timeout(Self::WAIT_FOR_RES_SEND_TIMEOUT, session_tx.send(msg)).await;
        match delivery {
            Ok(Ok(())) => {}
            Ok(Err(e)) => {
                tracing::error!(
                    tx = %tx,
                    client = %client_id,
                    request_id = %request_id,
                    error = %e,
                    "Failed to notify session actor — receiver dropped"
                );
            }
            Err(_elapsed) => {
                tracing::error!(
                    tx = %tx,
                    client = %client_id,
                    request_id = %request_id,
                    timeout_secs = Self::WAIT_FOR_RES_SEND_TIMEOUT.as_secs(),
                    "Timed out notifying session actor — channel full, client will not receive response"
                );
            }
        }
    }
}
impl ContractHandlerChannel<ContractHandlerHalve> {
pub async fn send_to_sender(
&mut self,
id: EventId,
ev: ContractHandlerEvent,
) -> Result<(), ContractError> {
if let Some(response) = self.end.waiting_response.remove(&id.id) {
response
.send((id, ev))
.map_err(|_| ContractError::NoEvHandlerResponse)
} else {
Err(ContractError::NoEvHandlerResponse)
}
}
pub fn drop_waiting_response(&mut self, id: EventId) {
self.end.waiting_response.remove(&id.id);
}
#[cfg(test)]
pub fn has_waiting_response(&self, id: &EventId) -> bool {
self.end.waiting_response.contains_key(&id.id)
}
pub async fn recv_from_sender(
&mut self,
) -> Result<(EventId, ContractHandlerEvent), ContractError> {
if let Some(InternalCHEvent { ev, id, result }) = self.end.event_receiver.recv().await {
self.end.waiting_response.insert(id, result);
return Ok((EventId { id }, ev));
}
Err(ContractError::NoEvHandlerResponse)
}
pub fn try_recv_from_sender(
&mut self,
) -> Result<Option<(EventId, ContractHandlerEvent)>, ContractError> {
match self.end.event_receiver.try_recv() {
Ok(InternalCHEvent { ev, id, result }) => {
self.end.waiting_response.insert(id, result);
Ok(Some((EventId { id }, ev)))
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Ok(None),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
Err(ContractError::NoEvHandlerResponse)
}
}
}
}
/// Payload of a successful `GetResponse`: an optional state and an optional
/// contract container.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct StoreResponse {
/// Contract state, when present.
pub state: Option<WrappedState>,
/// Contract container, when present.
pub contract: Option<ContractContainer>,
}
/// Envelope sent from the sender half to the handler half: the event, its id,
/// and the one-shot sender on which the reply is delivered.
struct InternalCHEvent {
// The event to be handled.
ev: ContractHandlerEvent,
// Id taken from the sending thread's `EV_ID` counter.
id: u64,
// One-shot sender for the handler's reply.
result: tokio::sync::oneshot::Sender<(EventId, ContractHandlerEvent)>,
}
/// Events exchanged between the sender half and the handler half of the
/// contract handler channel.
///
/// NOTE(review): the variants appear to come in request/response pairs
/// (`*Query`/`*Request` answered by `*Response`) — confirm against the code
/// that handles them.
#[derive(Debug)]
pub(crate) enum ContractHandlerEvent {
/// A delegate request, optionally tagged with an originating contract.
DelegateRequest {
req: DelegateRequest<'static>,
origin_contract: Option<ContractInstanceId>,
},
/// Outbound delegate messages.
DelegateResponse(Vec<OutboundDelegateMsg>),
/// Put a state for `key`, optionally carrying the contract itself.
PutQuery {
key: ContractKey,
state: WrappedState,
related_contracts: RelatedContracts<'static>,
contract: Option<ContractContainer>,
},
/// Outcome of a put: the resulting state or an executor error, plus a flag
/// saying whether the state changed.
PutResponse {
new_value: Result<WrappedState, ExecutorError>,
state_changed: bool,
},
/// Get the state of `instance_id`, optionally requesting the contract code.
GetQuery {
instance_id: ContractInstanceId,
return_contract_code: bool,
},
/// Outcome of a get: optional key plus the stored data or an executor error.
GetResponse {
key: Option<ContractKey>,
response: Result<StoreResponse, ExecutorError>,
},
/// Apply `data` as an update to the contract at `key`.
UpdateQuery {
key: ContractKey,
data: UpdateData<'static>,
related_contracts: RelatedContracts<'static>,
},
/// Outcome of an update: the resulting state or an executor error, plus a
/// flag saying whether the state changed.
UpdateResponse {
new_value: Result<WrappedState, ExecutorError>,
state_changed: bool,
},
/// An update for `key` that produced no change.
UpdateNoChange {
key: ContractKey,
},
/// Register `subscriber_listener` for `client_id` on contract `key`, with an
/// optional state summary.
RegisterSubscriberListener {
key: ContractInstanceId,
client_id: ClientId,
summary: Option<StateSummary<'static>>,
subscriber_listener: mpsc::Sender<HostResult>,
},
/// Reply to `RegisterSubscriberListener`.
RegisterSubscriberListenerResponse,
/// Query subscriptions; results are sent through `callback`.
#[allow(dead_code)]
QuerySubscriptions {
callback: tokio::sync::mpsc::Sender<QueryResult>,
},
/// Reply to `QuerySubscriptions`.
#[allow(dead_code)]
QuerySubscriptionsResponse,
/// Request the state summary of the contract at `key`.
GetSummaryQuery {
key: ContractKey,
},
/// The summary for `key`, or an executor error.
GetSummaryResponse {
key: ContractKey,
summary: Result<StateSummary<'static>, ExecutorError>,
},
/// Request a delta for `key` relative to `their_summary`.
GetDeltaQuery {
key: ContractKey,
their_summary: StateSummary<'static>,
},
/// The delta for `key`, or an executor error.
GetDeltaResponse {
key: ContractKey,
delta: Result<StateDelta<'static>, ExecutorError>,
},
/// Report a subscription error for `key` with a textual reason.
NotifySubscriptionError {
key: ContractInstanceId,
reason: String,
},
/// Reply to `NotifySubscriptionError`.
NotifySubscriptionErrorResponse,
/// A client disconnected.
ClientDisconnect {
client_id: ClientId,
},
}
impl std::fmt::Display for ContractHandlerEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ContractHandlerEvent::DelegateRequest {
req,
origin_contract,
} => {
write!(
f,
"delegate request {{ key: {:?}, origin: {:?} }}",
req.key(),
origin_contract
)
}
ContractHandlerEvent::DelegateResponse(_) => {
write!(f, "delegate response")
}
ContractHandlerEvent::PutQuery { key, contract, .. } => {
if let Some(contract) = contract {
use std::fmt::Write;
let mut params = String::new();
params.push_str("0x");
for b in contract.params().as_ref().iter().take(8) {
write!(&mut params, "{b:02x}")?;
}
params.push_str("...");
write!(f, "put query {{ {key}, params: {params} }}",)
} else {
write!(f, "put query {{ {key} }}")
}
}
ContractHandlerEvent::PutResponse {
new_value,
state_changed,
} => match new_value {
Ok(v) => {
write!(
f,
"put query response {{ {v}, state_changed: {state_changed} }}",
)
}
Err(e) => {
write!(f, "put query failed {{ {e} }}",)
}
},
ContractHandlerEvent::GetQuery {
instance_id,
return_contract_code,
..
} => {
write!(
f,
"get query {{ {instance_id}, return contract code: {return_contract_code} }}",
)
}
ContractHandlerEvent::GetResponse { key, response } => match response {
Ok(_) => {
write!(f, "get query response {{ {key:?} }}",)
}
Err(_) => {
write!(f, "get query failed {{ {key:?} }}",)
}
},
ContractHandlerEvent::UpdateQuery { key, .. } => {
write!(f, "update query {{ {key} }}")
}
ContractHandlerEvent::UpdateResponse { new_value, .. } => match new_value {
Ok(v) => {
write!(f, "update query response {{ {v} }}",)
}
Err(e) => {
write!(f, "update query failed {{ {e} }}",)
}
},
ContractHandlerEvent::UpdateNoChange { key } => {
write!(f, "update query no change {{ {key} }}",)
}
ContractHandlerEvent::RegisterSubscriberListener { key, client_id, .. } => {
write!(
f,
"register subscriber listener {{ {key}, client_id: {client_id} }}",
)
}
ContractHandlerEvent::RegisterSubscriberListenerResponse => {
write!(f, "register subscriber listener response")
}
ContractHandlerEvent::QuerySubscriptions { .. } => {
write!(f, "query subscriptions")
}
ContractHandlerEvent::QuerySubscriptionsResponse => {
write!(f, "query subscriptions response")
}
ContractHandlerEvent::GetSummaryQuery { key } => {
write!(f, "get summary query {{ {key} }}")
}
ContractHandlerEvent::GetSummaryResponse { key, summary } => match summary {
Ok(_) => write!(f, "get summary response {{ {key} }}"),
Err(e) => write!(f, "get summary failed {{ {key}, error: {e} }}"),
},
ContractHandlerEvent::GetDeltaQuery { key, .. } => {
write!(f, "get delta query {{ {key} }}")
}
ContractHandlerEvent::GetDeltaResponse { key, delta } => match delta {
Ok(_) => write!(f, "get delta response {{ {key} }}"),
Err(e) => write!(f, "get delta failed {{ {key}, error: {e} }}"),
},
ContractHandlerEvent::NotifySubscriptionError { key, reason } => {
write!(f, "notify subscription error {{ {key}, reason: {reason} }}")
}
ContractHandlerEvent::NotifySubscriptionErrorResponse => {
write!(f, "notify subscription error response")
}
ContractHandlerEvent::ClientDisconnect { client_id } => {
write!(f, "client disconnect {{ {client_id} }}")
}
}
}
}
/// Tests for the contract handler channel halves.
#[cfg(test)]
pub mod test {
use freenet_stdlib::prelude::*;
use super::*;
use crate::config::GlobalExecutor;
/// Full round trip: a `PutQuery` sent from the sender half is received by the
/// handler half, answered with a `PutResponse`, and that response is what the
/// sending task gets back.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn channel_test() -> anyhow::Result<()> {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0, 1, 2, 3])),
Parameters::from(vec![4, 5]),
)));
// Sender side runs in its own task because `send_to_handler` blocks until
// the reply arrives.
let h = GlobalExecutor::spawn(async move {
send_halve
.send_to_handler(ContractHandlerEvent::PutQuery {
key: contract.key(),
state: vec![6, 7, 8].into(),
related_contracts: RelatedContracts::default(),
contract: Some(contract),
})
.await
});
let (id, ev) =
tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_sender())
.await??;
let ContractHandlerEvent::PutQuery { state, .. } = ev else {
anyhow::bail!("invalid event");
};
assert_eq!(state.as_ref(), &[6, 7, 8]);
// Reply through the handler half using the id of the received event.
tokio::time::timeout(
Duration::from_millis(100),
rcv_halve.send_to_sender(
id,
ContractHandlerEvent::PutResponse {
new_value: Ok(vec![0, 7].into()),
state_changed: true,
},
),
)
.await??;
let ContractHandlerEvent::PutResponse {
new_value,
state_changed,
} = h.await??
else {
anyhow::bail!("invalid event!");
};
let new_value = new_value.map_err(|e| anyhow::anyhow!(e))?;
assert_eq!(new_value.as_ref(), &[0, 7]);
assert!(state_changed);
Ok(())
}
/// When the task awaiting the reply is aborted, `send_to_sender` must return
/// `NoEvHandlerResponse` instead of panicking.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_to_sender_fails_gracefully_when_receiver_dropped() -> anyhow::Result<()> {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0, 1, 2, 3])),
Parameters::from(vec![4, 5]),
)));
let key = contract.key();
let h = GlobalExecutor::spawn({
async move {
send_halve
.send_to_handler(ContractHandlerEvent::PutQuery {
key,
state: vec![6, 7, 8].into(),
related_contracts: RelatedContracts::default(),
contract: None,
})
.await
}
});
let (id, ev) =
tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_sender())
.await??;
let ContractHandlerEvent::PutQuery { state, .. } = ev else {
anyhow::bail!("expected PutQuery event");
};
assert_eq!(state.as_ref(), &[6, 7, 8]);
// Abort the waiting task, then give the runtime a moment to drop its
// reply receiver before attempting to answer.
h.abort();
tokio::time::sleep(Duration::from_millis(10)).await;
let send_result = rcv_halve
.send_to_sender(
id,
ContractHandlerEvent::PutResponse {
new_value: Ok(vec![0, 7].into()),
state_changed: true,
},
)
.await;
assert!(
send_result.is_err(),
"send_to_sender should fail when receiver is dropped, got {:?}",
send_result
);
assert!(
matches!(send_result, Err(super::ContractError::NoEvHandlerResponse)),
"Expected NoEvHandlerResponse error, got {:?}",
send_result
);
Ok(())
}
/// With the waiting channel filled to its capacity of 100 and nobody
/// draining it, registering one more transaction must time out with
/// `NoEvHandlerResponse`. The test starts with time paused
/// (`start_paused = true`).
#[tokio::test(start_paused = true)]
async fn waiting_for_transaction_result_times_out_when_channel_full() {
use crate::client_events::RequestId;
use crate::message::Transaction;
use crate::operations::put::PutMsg;
let (send_halve, _rcv_halve, _wait_res) = contract_handler_channel();
// Fill the bounded channel directly, bypassing the timeout wrapper.
for _ in 0..100 {
let tx = Transaction::new::<PutMsg>();
send_halve
.end
.wait_for_res_tx
.send((ClientId::FIRST, WaitingTransaction::Transaction(tx)))
.await
.expect("channel should accept items up to capacity");
}
let tx = Transaction::new::<PutMsg>();
let result = send_halve
.waiting_for_transaction_result(tx, ClientId::FIRST, RequestId::new())
.await;
assert!(
matches!(result, Err(super::ContractError::NoEvHandlerResponse)),
"Expected NoEvHandlerResponse timeout error, got {:?}",
result
);
}
/// With a session channel of capacity 1 that is already full, a second
/// notification must still be delivered once a consumer starts draining.
#[tokio::test]
async fn notify_session_actor_delivers_under_backpressure() {
use crate::client_events::RequestId;
use crate::contract::SessionMessage;
use crate::message::Transaction;
use crate::operations::put::PutMsg;
let (mut send_halve, _rcv_halve, _wait_res) = contract_handler_channel();
let (session_tx, mut session_rx) = tokio::sync::mpsc::channel::<SessionMessage>(1);
send_halve.with_session_adapter(session_tx);
// First message occupies the single slot of the session channel.
let filler_tx = Transaction::new::<PutMsg>();
send_halve
.notify_session_actor(filler_tx, ClientId::FIRST, RequestId::new())
.await;
// Consumer task collects exactly two messages and then stops.
let drain_handle = tokio::spawn(async move {
let mut received = Vec::new();
while let Some(msg) = session_rx.recv().await {
received.push(msg);
if received.len() == 2 {
break;
}
}
received
});
let test_tx = Transaction::new::<PutMsg>();
let test_client = ClientId::FIRST;
let test_request_id = RequestId::new();
send_halve
.notify_session_actor(test_tx, test_client, test_request_id)
.await;
let received = drain_handle.await.expect("drain task should complete");
assert_eq!(received.len(), 2, "Both messages should be delivered");
// The second message must be the registration sent under backpressure.
match &received[1] {
SessionMessage::RegisterTransaction {
tx,
client_id,
request_id,
} => {
assert_eq!(*tx, test_tx);
assert_eq!(*client_id, test_client);
assert_eq!(*request_id, test_request_id);
}
other @ SessionMessage::RegisterClient { .. }
| other @ SessionMessage::DeliverResult { .. }
| other @ SessionMessage::DeliverHostResponse { .. }
| other @ SessionMessage::DeliverHostResponseWithRequestId { .. }
| other @ SessionMessage::ClientDisconnect { .. } => panic!(
"Expected RegisterTransaction, got {:?}",
std::mem::discriminant(other)
),
}
}
/// `try_recv_from_sender` yields `Ok(None)` when nothing has been sent.
#[tokio::test]
async fn try_recv_from_sender_empty_channel() {
let (_send_halve, mut rcv_halve, _) = contract_handler_channel();
let result = rcv_halve.try_recv_from_sender();
assert!(matches!(result, Ok(None)));
}
/// `try_recv_from_sender` returns a queued event and records its reply
/// channel in `waiting_response`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn try_recv_from_sender_receives_event() {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![10, 11, 12])),
Parameters::from(vec![13]),
)));
let key = contract.key();
let _h = GlobalExecutor::spawn(async move {
send_halve
.send_to_handler(ContractHandlerEvent::PutQuery {
key,
state: vec![20, 21].into(),
related_contracts: RelatedContracts::default(),
contract: None,
})
.await
});
// Give the spawned task time to enqueue the event before polling.
tokio::time::sleep(Duration::from_millis(50)).await;
let result = rcv_halve.try_recv_from_sender();
assert!(result.is_ok());
let (id, event) = result.unwrap().expect("should have received an event");
let ContractHandlerEvent::PutQuery { state, .. } = event else {
panic!("expected PutQuery event");
};
assert_eq!(state.as_ref(), &[20, 21]);
assert!(rcv_halve.end.waiting_response.contains_key(&id.id));
}
/// `try_recv_from_sender` reports `NoEvHandlerResponse` once the sender half
/// has been dropped.
#[tokio::test]
async fn try_recv_from_sender_disconnected() {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();
drop(send_halve);
let result = rcv_halve.try_recv_from_sender();
assert!(
matches!(result, Err(super::ContractError::NoEvHandlerResponse)),
"Expected NoEvHandlerResponse, got {:?}",
result
);
}
}
/// [`ContractHandler`] implementations backed by mock executors.
pub(super) mod in_memory {
use super::{
super::{
Executor, MockRuntime,
executor::{OpRequestSender, mock_wasm_runtime::MockWasmRuntime},
storages::Storage,
},
ContractHandler, ContractHandlerChannel, ContractHandlerHalve,
};
use crate::node::OpManager;
use crate::wasm_runtime::MockStateStorage;
use std::sync::Arc;
/// Handler whose executor is `Executor<MockRuntime, Storage>`.
pub(crate) struct MemoryContractHandler {
channel: ContractHandlerChannel<ContractHandlerHalve>,
runtime: Executor<MockRuntime, Storage>,
}
impl MemoryContractHandler {
/// Builds the handler around an executor created with `Executor::new_mock`.
///
/// # Panics
/// Panics when the mock executor fails to start.
pub async fn new(
channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: Option<OpRequestSender>,
op_manager: Option<Arc<OpManager>>,
identifier: &str,
) -> Self {
MemoryContractHandler {
channel,
runtime: Executor::new_mock(identifier, op_sender, op_manager)
.await
.expect("should start mock executor"),
}
}
}
/// Handler whose executor is `Executor<MockRuntime, MockStateStorage>`, built
/// on a caller-provided `MockStateStorage`.
pub(crate) struct SimulationContractHandler {
channel: ContractHandlerChannel<ContractHandlerHalve>,
runtime: Executor<MockRuntime, MockStateStorage>,
}
impl SimulationContractHandler {
/// Builds the handler around an executor created with
/// `Executor::new_mock_in_memory` using `shared_storage`.
///
/// # Panics
/// Panics when the mock in-memory executor fails to start.
pub async fn new(
channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: Option<OpRequestSender>,
op_manager: Option<Arc<OpManager>>,
identifier: &str,
shared_storage: MockStateStorage,
) -> Self {
SimulationContractHandler {
channel,
runtime: Executor::new_mock_in_memory(
identifier,
shared_storage,
op_sender,
op_manager,
)
.await
.expect("should start mock in-memory executor"),
}
}
}
impl ContractHandler for MemoryContractHandler {
// The builder value is the identifier passed to the mock executor.
type Builder = String;
type ContractExecutor = Executor<MockRuntime, Storage>;
/// Delegates to [`MemoryContractHandler::new`]; always returns `Ok` (executor
/// start-up failures panic inside `new`).
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: OpRequestSender,
op_manager: Arc<OpManager>,
identifier: Self::Builder,
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
Ok(
MemoryContractHandler::new(channel, Some(op_sender), Some(op_manager), &identifier)
.await,
)
}
fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}
fn executor(&mut self) -> &mut Self::ContractExecutor {
&mut self.runtime
}
}
/// Builder input for [`SimulationContractHandler`]: the executor identifier
/// and the storage it should use.
pub struct SimulationHandlerBuilder {
pub identifier: String,
pub shared_storage: MockStateStorage,
}
impl ContractHandler for SimulationContractHandler {
type Builder = SimulationHandlerBuilder;
type ContractExecutor = Executor<MockRuntime, MockStateStorage>;
/// Delegates to [`SimulationContractHandler::new`]; always returns `Ok`
/// (executor start-up failures panic inside `new`).
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: OpRequestSender,
op_manager: Arc<OpManager>,
builder: Self::Builder,
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
Ok(SimulationContractHandler::new(
channel,
Some(op_sender),
Some(op_manager),
&builder.identifier,
builder.shared_storage,
)
.await)
}
fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}
fn executor(&mut self) -> &mut Self::ContractExecutor {
&mut self.runtime
}
}
/// Handler whose executor is `Executor<MockWasmRuntime, MockStateStorage>`.
#[allow(dead_code)]
pub(crate) struct MockWasmContractHandler {
channel: ContractHandlerChannel<ContractHandlerHalve>,
runtime: Executor<MockWasmRuntime, MockStateStorage>,
}
/// Builder input for [`MockWasmContractHandler`]: identifier, storage, and an
/// optional in-memory contract store.
#[allow(dead_code)]
pub struct MockWasmHandlerBuilder {
pub identifier: String,
pub shared_storage: MockStateStorage,
pub contract_store: Option<crate::wasm_runtime::InMemoryContractStore>,
}
impl ContractHandler for MockWasmContractHandler {
type Builder = MockWasmHandlerBuilder;
type ContractExecutor = Executor<MockWasmRuntime, MockStateStorage>;
/// Creates the executor with `Executor::new_mock_wasm`; unlike the other
/// mock handlers, a start-up failure is returned as an error.
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
op_sender: OpRequestSender,
op_manager: Arc<OpManager>,
builder: Self::Builder,
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
let runtime = Executor::new_mock_wasm(
&builder.identifier,
builder.shared_storage,
builder.contract_store,
Some(op_sender),
Some(op_manager),
)
.await?;
Ok(Self { channel, runtime })
}
fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}
fn executor(&mut self) -> &mut Self::ContractExecutor {
&mut self.runtime
}
}
/// A `WrappedContract` generated from random bytes must survive a bincode
/// serialize/deserialize round trip with the same code and key.
#[test]
fn serialization() -> anyhow::Result<()> {
use freenet_stdlib::prelude::WrappedContract;
let bytes = crate::util::test::random_bytes_1kb();
let mut unstructured = arbitrary::Unstructured::new(&bytes);
let contract: WrappedContract = unstructured.arbitrary()?;
let serialized = bincode::serialize(&contract)?;
let deser: WrappedContract = bincode::deserialize(&serialized)?;
assert_eq!(deser.code(), contract.code());
assert_eq!(deser.key(), contract.key());
Ok(())
}
}