use super::*;
impl Executor<Runtime> {
pub(super) async fn subscribe(&mut self, key: ContractKey) -> Result<(), ExecutorError> {
if self.mode == OperationMode::Local {
return Ok(());
}
let op_manager = self
.op_manager
.as_ref()
.ok_or_else(|| ExecutorError::other(anyhow::anyhow!("missing op_manager")))?;
let executor_tx = crate::message::Transaction::new::<operations::subscribe::SubscribeMsg>();
const SUBSCRIBE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
match tokio::time::timeout(
SUBSCRIBE_TIMEOUT,
operations::subscribe::run_executor_subscribe(
op_manager.clone(),
*key.id(),
executor_tx,
),
)
.await
{
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(ExecutorError::other(anyhow::anyhow!("{err}"))),
Err(_) => Err(ExecutorError::other(anyhow::anyhow!(
"executor subscribe timed out after {}s",
SUBSCRIBE_TIMEOUT.as_secs()
))),
}
}
#[inline]
pub(super) async fn local_state_or_from_network(
&mut self,
id: &ContractInstanceId,
return_contract_code: bool,
) -> Result<Either<WrappedState, operations::get::GetResult>, ExecutorError> {
if let Some(full_key) = self.lookup_key(id) {
if let Ok(state) = self.state_store.get(&full_key).await {
return Ok(Either::Left(state));
}
}
let op_manager = self
.op_manager
.as_ref()
.ok_or_else(|| ExecutorError::other(anyhow::anyhow!("missing op_manager")))?;
let (_tx, rx) =
operations::get::op_ctx_task::start_sub_op_get(op_manager, *id, return_contract_code);
const SUB_OP_FETCH_TIMEOUT: Duration = Duration::from_secs(120);
let outcome = tokio::time::timeout(SUB_OP_FETCH_TIMEOUT, rx)
.await
.map_err(|_| {
tracing::warn!(
contract = %id,
"sub-op GET timed out at executor"
);
ExecutorError::other(anyhow::anyhow!("sub-op GET timed out"))
})?
.map_err(|_| ExecutorError::other(anyhow::anyhow!("sub-op GET task dropped")))?;
match outcome {
operations::get::op_ctx_task::SubOpGetOutcome::Found(get_result) => {
Ok(Either::Right(get_result))
}
operations::get::op_ctx_task::SubOpGetOutcome::NotFound(cause) => {
Err(ExecutorError::other(anyhow::anyhow!(cause)))
}
operations::get::op_ctx_task::SubOpGetOutcome::Infra(err) => {
Err(ExecutorError::other(err))
}
}
}
}