mod sync_response;
use futures::channel::mpsc;
use linera_base::{data_types::OracleResponse, http::Response, identifiers::EventId};
pub use self::sync_response::SyncSender;
use crate::ExecutionError;
/// Extension trait with helpers for sending a `Request` that embeds the sending half of a
/// response channel.
///
/// In this file the trait is implemented for `mpsc::UnboundedSender<Request>`.
pub trait UnboundedSenderExt<Request> {
/// Sends a request created by `builder` and returns the receiver for its response.
///
/// `builder` is handed the `oneshot::Sender<Response>` half of a response channel and must
/// return the `Request` to send.
///
/// # Errors
///
/// Returns an [`ExecutionError`] if the request could not be sent; the implementation in this
/// file reports `ExecutionError::MissingRuntimeResponse`.
fn send_request<Response>(
&self,
builder: impl FnOnce(oneshot::Sender<Response>) -> Request,
) -> Result<oneshot::Receiver<Response>, ExecutionError>
where
Response: Send;
/// Sends a request created by `builder` and returns the response itself rather than a
/// receiver for it.
///
/// `builder` is handed a [`SyncSender<Response>`] and must return the `Request` to send.
///
/// # Errors
///
/// Returns an [`ExecutionError`] if the request could not be sent or no response could be
/// received; the implementation in this file reports
/// `ExecutionError::MissingRuntimeResponse` in both cases.
// NOTE(review): the reason for allowing dead code is not visible here; presumably the method
// is unused in some build configurations — confirm.
#[allow(dead_code)]
fn send_sync_request<Response>(
&self,
builder: impl FnOnce(SyncSender<Response>) -> Request,
) -> Result<Response, ExecutionError>
where
Response: Send;
}
/// Request helpers for [`mpsc::UnboundedSender`].
impl<Request> UnboundedSenderExt<Request> for mpsc::UnboundedSender<Request>
where
    Request: Send,
{
    /// Sends a request created by `builder` and returns the receiver for its response.
    ///
    /// A fresh `oneshot` channel is created; its sender is handed to `builder` to construct the
    /// request, and its receiver is returned once the request has been sent.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::MissingRuntimeResponse`] if the request could not be sent
    /// because the channel is disconnected.
    ///
    /// # Panics
    ///
    /// Panics if sending fails for any reason other than a disconnected channel.
    fn send_request<Response>(
        &self,
        builder: impl FnOnce(oneshot::Sender<Response>) -> Request,
    ) -> Result<oneshot::Receiver<Response>, ExecutionError>
    where
        Response: Send,
    {
        let (response_sender, response_receiver) = oneshot::channel();
        let request = builder(response_sender);

        self.unbounded_send(request).map_err(|send_error| {
            // Disconnection is the only send failure that is tolerated; anything else is
            // treated as a broken invariant.
            assert!(
                send_error.is_disconnected(),
                "`send_request` should only be used with unbounded senders"
            );
            ExecutionError::MissingRuntimeResponse
        })?;

        Ok(response_receiver)
    }

    /// Sends a request created by `builder` and returns the response obtained from the
    /// receiving half of a `sync_response` channel.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::MissingRuntimeResponse`] if the request could not be sent
    /// because the channel is disconnected, or if receiving the response fails.
    ///
    /// # Panics
    ///
    /// Panics if sending fails for any reason other than a disconnected channel.
    // NOTE(review): the reason for allowing dead code is not visible here; presumably the method
    // is unused in some build configurations — confirm.
    #[allow(dead_code)]
    fn send_sync_request<Response>(
        &self,
        builder: impl FnOnce(SyncSender<Response>) -> Request,
    ) -> Result<Response, ExecutionError>
    where
        Response: Send,
    {
        let (response_sender, response_receiver) = sync_response::channel();
        let request = builder(response_sender);

        self.unbounded_send(request).map_err(|send_error| {
            // Same invariant as in `send_request`; the message names this method so that a
            // panic points at the right call site.
            assert!(
                send_error.is_disconnected(),
                "`send_sync_request` should only be used with unbounded senders"
            );
            ExecutionError::MissingRuntimeResponse
        })?;

        response_receiver
            .recv()
            .map_err(|_| ExecutionError::MissingRuntimeResponse)
    }
}
/// Extension trait for receivers of a single response of type `T`.
///
/// In this file the trait is implemented for `oneshot::Receiver<T>`.
pub trait ReceiverExt<T> {
/// Consumes the receiver and returns the received response.
///
/// # Errors
///
/// Returns an [`ExecutionError`] if no response could be received; the implementation in this
/// file reports `ExecutionError::MissingRuntimeResponse`.
fn recv_response(self) -> Result<T, ExecutionError>;
}
impl<T> ReceiverExt<T> for oneshot::Receiver<T> {
    /// Consumes the receiver and returns the value produced by `recv()`.
    ///
    /// A receive error is translated into [`ExecutionError::MissingRuntimeResponse`].
    // NOTE(review): `recv()` presumably blocks the calling thread until a response arrives —
    // confirm against the `oneshot` crate documentation.
    fn recv_response(self) -> Result<T, ExecutionError> {
        match self.recv() {
            Ok(response) => Ok(response),
            Err(oneshot::RecvError) => Err(ExecutionError::MissingRuntimeResponse),
        }
    }
}
/// Extension trait for the sending half of a response channel.
///
/// In this file the trait is implemented for `oneshot::Sender` and [`SyncSender`].
pub trait RespondExt {
/// The type of the response that can be sent.
type Response;
/// Consumes the sender and sends `response` through it.
///
/// The method returns nothing, so a failed send is not reported to the caller; the
/// implementations in this file only log it at debug level.
fn respond(self, response: Self::Response);
}
impl<Response> RespondExt for oneshot::Sender<Response> {
    type Response = Response;

    /// Sends `response` through this sender, consuming it.
    ///
    /// A failed send is not an error for the caller: it is only logged at debug level.
    fn respond(self, response: Self::Response) {
        // Reduce the outcome to a flag right away so that the undelivered response is dropped
        // before anything is logged.
        let canceled = self.send(response).is_err();

        if canceled {
            tracing::debug!("Request sent to `RuntimeActor` was canceled");
        }
    }
}
impl<Response> RespondExt for SyncSender<Response> {
    type Response = Response;

    /// Sends `response` through this synchronous sender, consuming it.
    ///
    /// A failed send is not an error for the caller: it is only logged at debug level.
    fn respond(self, response: Self::Response) {
        // Reduce the outcome to a flag right away so that the result of the send is dropped
        // before anything is logged.
        let canceled = self.send(response).is_err();

        if canceled {
            tracing::debug!("Request sent to `RuntimeActor` was canceled");
        }
    }
}
/// Crate-internal helpers for extracting the payload of an oracle response when a specific
/// kind of response is expected.
///
/// In the implementation for [`OracleResponse`] in this file, every method returns
/// `ExecutionError::OracleResponseMismatch` when the response is not of the expected kind.
pub(crate) trait OracleResponseExt {
/// Returns the round stored in a `Round` response.
fn to_round(&self) -> Result<Option<u32>, ExecutionError>;
/// Returns a copy of the bytes stored in a `Service` response.
fn to_service_response(&self) -> Result<Vec<u8>, ExecutionError>;
/// Returns a copy of the HTTP response stored in an `Http` response.
fn to_http_response(&self) -> Result<Response, ExecutionError>;
/// Returns a copy of the event bytes stored in an `Event` response, provided that the
/// recorded event ID equals `event_id`.
fn to_event(&self, event_id: &EventId) -> Result<Vec<u8>, ExecutionError>;
}
impl OracleResponseExt for OracleResponse {
    /// Returns the round of an [`OracleResponse::Round`].
    ///
    /// Any other variant yields [`ExecutionError::OracleResponseMismatch`].
    fn to_round(&self) -> Result<Option<u32>, ExecutionError> {
        if let OracleResponse::Round(round) = self {
            Ok(*round)
        } else {
            Err(ExecutionError::OracleResponseMismatch)
        }
    }

    /// Returns a copy of the bytes of an [`OracleResponse::Service`].
    ///
    /// Any other variant yields [`ExecutionError::OracleResponseMismatch`].
    fn to_service_response(&self) -> Result<Vec<u8>, ExecutionError> {
        if let OracleResponse::Service(bytes) = self {
            Ok(bytes.clone())
        } else {
            Err(ExecutionError::OracleResponseMismatch)
        }
    }

    /// Returns a copy of the HTTP response of an [`OracleResponse::Http`].
    ///
    /// Any other variant yields [`ExecutionError::OracleResponseMismatch`].
    fn to_http_response(&self) -> Result<Response, ExecutionError> {
        if let OracleResponse::Http(response) = self {
            Ok(response.clone())
        } else {
            Err(ExecutionError::OracleResponseMismatch)
        }
    }

    /// Returns a copy of the event bytes of an [`OracleResponse::Event`] recorded for `event_id`.
    ///
    /// Any other variant, or an event recorded under a different ID, yields
    /// [`ExecutionError::OracleResponseMismatch`].
    fn to_event(&self, event_id: &EventId) -> Result<Vec<u8>, ExecutionError> {
        if let OracleResponse::Event(recorded_event_id, event) = self {
            // The recorded ID must be the one the caller asked for.
            if recorded_event_id == event_id {
                return Ok(event.clone());
            }
        }
        Err(ExecutionError::OracleResponseMismatch)
    }
}