use std::time::Duration;
use crate::broker::backend_sdk::frame_client::{FrameClient, FrameClientError};
use crate::broker::protocol::{Endpoint, Frame};
pub struct AsyncFrameClient {
inner: Option<FrameClient>,
}
impl AsyncFrameClient {
pub async fn connect(endpoint: &Endpoint) -> Result<Self, FrameClientError> {
let endpoint = endpoint.clone();
let inner = match tokio::task::spawn_blocking(move || FrameClient::connect(&endpoint)).await
{
Ok(result) => result?,
Err(join_err) => return Err(join_error_to_client(join_err)),
};
Ok(Self { inner: Some(inner) })
}
pub async fn connect_with_timeout(
endpoint: &Endpoint,
timeout: Duration,
) -> Result<Self, FrameClientError> {
match tokio::time::timeout(timeout, Self::connect(endpoint)).await {
Ok(result) => result,
Err(_) => Err(FrameClientError::Connect(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"async frame client connect timed out",
))),
}
}
pub fn from_blocking(client: FrameClient) -> Self {
Self {
inner: Some(client),
}
}
pub async fn request(
&mut self,
payload_protocol: u32,
payload: Vec<u8>,
) -> Result<Frame, FrameClientError> {
let mut client = self.inner.take().ok_or_else(|| {
FrameClientError::Framing(crate::broker::protocol::FramingError::Io(
std::io::Error::other("async frame client was poisoned by a prior request panic"),
))
})?;
let join = tokio::task::spawn_blocking(move || {
let result = client.request(payload_protocol, payload);
(client, result)
})
.await;
match join {
Ok((client, result)) => {
self.inner = Some(client);
result
}
Err(join_err) => Err(join_error_to_client(join_err)),
}
}
pub async fn request_with_timeout(
&mut self,
payload_protocol: u32,
payload: Vec<u8>,
timeout: Duration,
) -> Result<Frame, FrameClientError> {
match tokio::time::timeout(timeout, self.request(payload_protocol, payload)).await {
Ok(result) => result,
Err(_) => Err(FrameClientError::Framing(
crate::broker::protocol::FramingError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"async frame client request timed out",
)),
)),
}
}
pub fn next_request_id(&self) -> Option<u64> {
self.inner.as_ref().map(FrameClient::next_request_id)
}
pub fn into_blocking(self) -> Option<FrameClient> {
self.inner
}
}
fn join_error_to_client(join_err: tokio::task::JoinError) -> FrameClientError {
FrameClientError::Framing(crate::broker::protocol::FramingError::Io(
std::io::Error::other(format!(
"async frame client worker thread panicked or was cancelled: {join_err}"
)),
))
}