use anyhow::Result;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use tracing::error;
use crate::actor::types::{
ActorError, ActorOperation, WasiHttpResponse, DEFAULT_OPERATION_TIMEOUT,
};
use crate::chain::ChainEvent;
use crate::metrics::ActorMetrics;
use crate::pack_bridge::{self, InterfaceHash, Value};
use super::types::{ActorControl, ActorInfo};
/// Cloneable handle holding the senders for operation, info, and control messages.
///
/// Each method on this type sends a request on one of these channels together
/// with a one-shot reply sender, then waits for the reply.
#[derive(Debug, Clone)]
pub struct ActorHandle {
    /// Sender for [`ActorOperation`] requests (function calls, HTTP requests).
    operation_tx: mpsc::Sender<ActorOperation>,
    /// Sender for [`ActorInfo`] queries (state, chain, metrics, export hashes).
    info_tx: mpsc::Sender<ActorInfo>,
    /// Sender for [`ActorControl`] commands (shutdown).
    control_tx: mpsc::Sender<ActorControl>,
}
impl ActorHandle {
pub fn new(
operation_tx: mpsc::Sender<ActorOperation>,
info_tx: mpsc::Sender<ActorInfo>,
control_tx: mpsc::Sender<ActorControl>,
) -> Self {
Self {
operation_tx,
info_tx,
control_tx,
}
}
/// Calls the function `name` with `params` and returns the decoded result.
///
/// `params` is encoded with `pack_bridge::encode_value`, sent as an
/// `ActorOperation::CallFunctionPack`, and the reply bytes are decoded with
/// `pack_bridge::decode_value`.
///
/// # Errors
///
/// - `ActorError::SerializationError` if encoding the parameters or decoding
///   the reply fails.
/// - `ActorError::ChannelClosed` if the operation cannot be sent or the reply
///   sender is dropped before answering.
/// - `ActorError::OperationTimeout` if no reply arrives within
///   `DEFAULT_OPERATION_TIMEOUT`.
/// - Any `ActorError` carried in the reply itself is returned unchanged.
pub async fn call_function(&self, name: String, params: Value) -> Result<Value, ActorError> {
    let (tx, rx) = oneshot::channel();

    // Encode by reference; a failure here means nothing is sent on the channel.
    let params_bytes = pack_bridge::encode_value(&params).map_err(|e| {
        error!("Failed to encode params: {}", e);
        ActorError::SerializationError
    })?;

    self.operation_tx
        .send(ActorOperation::CallFunctionPack {
            name,
            params: params_bytes,
            response_tx: tx,
        })
        .await
        .map_err(|e| {
            error!("Failed to send operation: {}", e);
            ActorError::ChannelClosed
        })?;

    // Outer layer: timeout; middle layer: one-shot receive; inner layer: the call's own result.
    match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
        Ok(Ok(inner)) => {
            let result_bytes = inner?;
            pack_bridge::decode_value(&result_bytes).map_err(|e| {
                error!("Failed to decode result: {}", e);
                ActorError::SerializationError
            })
        }
        Ok(Err(e)) => {
            error!("Channel closed while waiting for response: {:?}", e);
            Err(ActorError::ChannelClosed)
        }
        Err(_) => {
            error!("Operation timed out after {:?}", DEFAULT_OPERATION_TIMEOUT);
            Err(ActorError::OperationTimeout(
                DEFAULT_OPERATION_TIMEOUT.as_secs(),
            ))
        }
    }
}
pub async fn call_function_pack_void(
&self,
name: String,
params: Vec<u8>,
) -> Result<(), ActorError> {
let (tx, rx) = oneshot::channel();
self.operation_tx
.send(ActorOperation::CallFunctionPack {
name: name.clone(),
params,
response_tx: tx,
})
.await
.map_err(|e| {
error!("Failed to send operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => match result {
Ok(inner_result) => match inner_result {
Ok(_) => Ok(()),
Err(e) => {
error!("Function call '{}' failed: {:?}", name, e);
Err(e)
}
},
Err(_) => {
error!("Channel closed while waiting for function call '{}'", name);
Err(ActorError::ChannelClosed)
}
},
Err(_) => {
error!("Operation timed out after {:?}", DEFAULT_OPERATION_TIMEOUT);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn handle_wasi_http_request(
&self,
method: String,
scheme: Option<String>,
authority: Option<String>,
path_with_query: Option<String>,
headers: Vec<(String, Vec<u8>)>,
body: Vec<u8>,
) -> Result<WasiHttpResponse, ActorError> {
let (tx, rx) = oneshot::channel();
self.operation_tx
.send(ActorOperation::HandleWasiHttpRequest {
method,
scheme,
authority,
path_with_query,
headers,
body,
response_tx: tx,
})
.await
.map_err(|e| {
error!("Failed to send HandleWasiHttpRequest operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => match result {
Ok(result) => result,
Err(e) => {
error!("Channel closed while waiting for HTTP response: {:?}", e);
Err(ActorError::ChannelClosed)
}
},
Err(_) => {
error!(
"HTTP request timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn get_state(&self) -> Result<Value, ActorError> {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(ActorInfo::GetState { response_tx: tx })
.await
.map_err(|e| {
error!("Failed to send GetState operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => result.map_err(|e| {
error!(
"Channel closed while waiting for GetState response: {:?}",
e
);
ActorError::ChannelClosed
})?,
Err(_) => {
error!(
"GetState operation timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn get_chain(&self) -> Result<Vec<ChainEvent>, ActorError> {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(ActorInfo::GetChain { response_tx: tx })
.await
.map_err(|e| {
error!("Failed to send GetChain operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => result.map_err(|e| {
error!(
"Channel closed while waiting for GetChain response: {:?}",
e
);
ActorError::ChannelClosed
})?,
Err(_) => {
error!(
"GetChain operation timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn get_metrics(&self) -> Result<ActorMetrics, ActorError> {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(ActorInfo::GetMetrics { response_tx: tx })
.await
.map_err(|e| {
error!("Failed to send GetMetrics operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => result.map_err(|e| {
error!(
"Channel closed while waiting for GetMetrics response: {:?}",
e
);
ActorError::ChannelClosed
})?,
Err(_) => {
error!(
"GetMetrics operation timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn shutdown(&self) -> Result<(), ActorError> {
let (tx, rx) = oneshot::channel();
self.control_tx
.send(ActorControl::Shutdown { response_tx: tx })
.await
.map_err(|e| {
error!("Failed to send Shutdown operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => result.map_err(|e| {
error!(
"Channel closed while waiting for Shutdown response: {:?}",
e
);
ActorError::ChannelClosed
})?,
Err(_) => {
error!(
"Shutdown operation timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
pub async fn get_export_hashes(&self) -> Result<Vec<InterfaceHash>, ActorError> {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(ActorInfo::GetExportHashes { response_tx: tx })
.await
.map_err(|e| {
error!("Failed to send GetExportHashes operation: {}", e);
ActorError::ChannelClosed
})?;
match timeout(DEFAULT_OPERATION_TIMEOUT, rx).await {
Ok(result) => result.map_err(|e| {
error!(
"Channel closed while waiting for GetExportHashes response: {:?}",
e
);
ActorError::ChannelClosed
})?,
Err(_) => {
error!(
"GetExportHashes operation timed out after {:?}",
DEFAULT_OPERATION_TIMEOUT
);
Err(ActorError::OperationTimeout(
DEFAULT_OPERATION_TIMEOUT.as_secs(),
))
}
}
}
}