use futures::future::BoxFuture;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::RwLock;
#[async_trait::async_trait]
pub trait Dispatch<Cmd> {
async fn dispatch(&mut self, cmd: Cmd);
}
pub trait ReplyHandle<T>: Send {
fn send(self: Box<Self>, val: T);
}
impl<T: Send> ReplyHandle<T> for tokio::sync::oneshot::Sender<T> {
fn send(self: Box<Self>, val: T) {
let _ = (*self).send(val);
}
}
#[async_trait::async_trait]
pub trait Backend<T: ?Sized, C>: Send + Sync {
async fn send(&self, cmd: C);
async fn request<R, FLocal, FRemote>(&self, f_local: FLocal, f_remote: FRemote) -> R
where
R: Send + 'static,
FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send;
}
#[derive(Clone)]
pub struct Local<T: ?Sized> {
inner: Arc<RwLock<T>>,
}
impl<T: ?Sized> Local<T> {
pub fn new(inner: Arc<RwLock<T>>) -> Self {
Self { inner }
}
}
#[async_trait::async_trait]
impl<T, C> Backend<T, C> for Local<T>
where
T: ?Sized + Dispatch<C> + Send + Sync,
C: Send + 'static,
{
#[inline]
async fn send(&self, cmd: C) {
let mut guard = self.inner.write().await;
guard.dispatch(cmd).await;
}
#[inline]
async fn request<R, FLocal, FRemote>(&self, f_local: FLocal, _f_remote: FRemote) -> R
where
R: Send + 'static,
FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
{
let mut guard = self.inner.write().await;
f_local(&mut *guard).await
}
}
pub trait Transport<C>: Send + Sync + Clone {
fn send_msg(&self, cmd: C);
}
#[derive(Clone)]
pub struct Remote<Tr, C> {
transport: Tr,
_marker: PhantomData<C>,
}
#[async_trait::async_trait]
impl<T, C, Tr> Backend<T, C> for Remote<Tr, C>
where
T: ?Sized + Sync,
C: Send + Sync + 'static,
Tr: Transport<C>,
{
#[inline]
async fn send(&self, cmd: C) {
self.transport.send_msg(cmd);
}
async fn request<R, FLocal, FRemote>(&self, _f_local: FLocal, f_remote: FRemote) -> R
where
R: Send + 'static,
FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let cmd = f_remote(Box::new(tx));
self.transport.send_msg(cmd);
rx.await.expect("Remote channel closed unexpectedly")
}
}
pub trait ArrowTransport<C>: Transport<C> {
fn get_shm_handle(&self) -> String; }
#[derive(Clone)]
pub struct ArrowShm<Tr, C> {
transport: Tr,
_marker: PhantomData<C>,
}
#[async_trait::async_trait]
impl<T, C, Tr> Backend<T, C> for ArrowShm<Tr, C>
where
T: ?Sized + Sync,
C: Send + Sync + 'static, Tr: ArrowTransport<C>,
{
#[inline]
async fn send(&self, cmd: C) {
self.transport.send_msg(cmd);
}
async fn request<R, FLocal, FRemote>(&self, _f_local: FLocal, f_remote: FRemote) -> R
where
R: Send + 'static,
FLocal: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send,
FRemote: FnOnce(Box<dyn ReplyHandle<R>>) -> C + Send,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let cmd = f_remote(Box::new(tx));
self.transport.send_msg(cmd);
rx.await.expect("Arrow IPC channel closed")
}
}