use futures::{Future, future, pin_mut};
use serde::{Deserialize, Serialize};
use std::fmt;
use tracing::Instrument;
use super::{CallError, msg::RFnRequest};
use crate::{
RemoteSend, codec, exec,
rch::{mpsc, oneshot},
};
/// Provider side of an [`RFnMut`], returned together with it by `RFnMut::provided_int`.
///
/// It owns the sending half of a oneshot "keep" channel whose receiving half lives in
/// the task that serves the function calls. Dropping the provider without calling
/// [`RFnMutProvider::keep`] closes that channel, which makes the serving task stop.
pub struct RFnMutProvider {
/// Sender of the keep signal; it is only taken out (leaving `None`) by `keep`,
/// which consumes the provider.
keep_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl fmt::Debug for RFnMutProvider {
    /// Writes only the type name; the internal keep-signal sender is not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A field-less `debug_struct(..).finish()` emits exactly the name, so write it directly.
        f.write_str("RFnMutProvider")
    }
}
impl RFnMutProvider {
    /// Detaches the serving task from this provider.
    ///
    /// Sends the keep signal and consumes the provider. Once the serving task has
    /// received the signal it no longer stops because the provider went away.
    pub fn keep(mut self) {
        let keep_tx = self.keep_tx.take().unwrap();
        // Sending fails only if the receiving half is already gone; nothing to do then.
        let _ = keep_tx.send(());
    }

    /// Waits until the receiving half of the keep channel has been closed.
    ///
    /// That half is owned by the serving task, so this resolves once the task has ended.
    pub async fn done(&mut self) {
        let keep_tx = self.keep_tx.as_mut().unwrap();
        keep_tx.closed().await
    }
}
// Deliberately empty destructor: dropping the provider needs no extra work because
// dropping the contained sender already closes the keep channel.
// Because the type implements `Drop`, its field cannot be moved out of it, which is
// consistent with `keep` using `Option::take`.
// NOTE(review): presumably kept so drop logic can be added later without changing what
// callers may do with the type — confirm before removing.
impl Drop for RFnMutProvider {
fn drop(&mut self) {
// Intentionally empty.
}
}
/// Calling handle for an async `FnMut`-style function that is executed by a separately
/// spawned task.
///
/// `A` is the argument type, `R` the return type and `Codec` the codec used by the
/// underlying channel. The handle is serializable and deserializable as long as the
/// argument and return types are [`RemoteSend`].
#[derive(Serialize, Deserialize)]
#[serde(bound(serialize = "A: RemoteSend, R: RemoteSend, Codec: codec::Codec"))]
#[serde(bound(deserialize = "A: RemoteSend, R: RemoteSend, Codec: codec::Codec"))]
pub struct RFnMut<A, R, Codec = codec::Default> {
/// Channel over which call requests (argument plus a result sender) are submitted.
/// NOTE(review): the const parameter `1` is presumably the channel's buffer size — confirm.
request_tx: mpsc::Sender<RFnRequest<A, R, Codec>, Codec, 1>,
}
impl<A, R, Codec> fmt::Debug for RFnMut<A, R, Codec> {
    /// Writes only the type name; the request channel is not shown, so no `Debug`
    /// bound is needed on `A`, `R` or `Codec`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A field-less `debug_struct(..).finish()` emits exactly the name, so write it directly.
        f.write_str("RFnMut")
    }
}
impl<A, R, Codec> RFnMut<A, R, Codec>
where
    A: RemoteSend,
    R: RemoteSend,
    Codec: codec::Codec,
{
    /// Wraps `fun` into an `RFnMut` whose serving task is not tied to a provider.
    ///
    /// The function is set up through [`Self::provided_int`] and `keep` is called on
    /// the returned provider right away, so the serving task only ends when the
    /// request channel is closed or fails with a final error.
    fn new_int<F, Fut>(fun: F) -> Self
    where
        F: FnMut(A) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = R> + Send,
    {
        let (rfn_mut, keeper) = Self::provided_int(fun);
        keeper.keep();
        rfn_mut
    }

    /// Spawns the task that serves calls to `fun` and returns the calling handle
    /// together with its provider.
    ///
    /// The task handles one request at a time: it awaits `fun(argument)` and sends the
    /// value back through the request's result sender, ignoring a failed send.
    /// It stops when
    /// * the provider is dropped without `keep` having been called,
    /// * the request channel reports that it is closed (`Ok(None)`), or
    /// * receiving fails with an error for which `is_final()` is true.
    ///
    /// Other receive errors are skipped and the loop continues. The provider check is
    /// only made between requests, so a call that is already running is not interrupted.
    fn provided_int<F, Fut>(mut fun: F) -> (Self, RFnMutProvider)
    where
        F: FnMut(A) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = R> + Send,
    {
        let (request_tx, request_rx) = mpsc::channel(1);
        let request_tx = request_tx.set_buffer();
        let mut request_rx = request_rx.set_buffer::<1>();

        let (keep_tx, keep_rx) = tokio::sync::oneshot::channel();

        let serve = async move {
            // Resolves when the provider is dropped without `keep`; after a keep
            // signal it never resolves.
            let provider_gone = async move {
                if keep_rx.await.is_ok() {
                    future::pending::<()>().await;
                }
            };
            pin_mut!(provider_gone);

            loop {
                tokio::select! {
                    // Check for a dropped provider before looking at new requests.
                    biased;

                    () = &mut provider_gone => break,

                    received = request_rx.recv() => {
                        match received {
                            Ok(Some(RFnRequest { argument, result_tx })) => {
                                let value = fun(argument).await;
                                // The caller may have given up waiting; that is not an error here.
                                let _ = result_tx.send(value);
                            }
                            Ok(None) => break,
                            Err(err) => {
                                if err.is_final() {
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        };
        exec::spawn(serve.in_current_span());

        let provider = RFnMutProvider { keep_tx: Some(keep_tx) };
        (Self { request_tx }, provider)
    }

    /// Submits `argument` to the serving task and waits for the result.
    ///
    /// A fresh oneshot channel is created for every call and its sender travels with
    /// the request. The outcome of submitting the request is deliberately ignored; an
    /// error while receiving the result is converted into [`CallError`].
    /// NOTE(review): a request that could not be submitted presumably surfaces as a
    /// receive error because its result sender is dropped — confirm.
    async fn try_call_int(&mut self, argument: A) -> Result<R, CallError> {
        let (result_tx, result_rx) = oneshot::channel();
        let request = RFnRequest { argument, result_tx };
        let _ = self.request_tx.send(request).await;

        Ok(result_rx.await?)
    }
}
impl<A, RT, RE, Codec> RFnMut<A, Result<RT, RE>, Codec>
where
    A: RemoteSend,
    RT: RemoteSend,
    RE: RemoteSend + From<CallError>,
    Codec: codec::Codec,
{
    /// Calls a function that itself returns a `Result`, flattening the two error layers.
    ///
    /// The function's own `Result<RT, RE>` is passed through unchanged; a [`CallError`]
    /// raised by the call machinery is converted into `RE` through its `From` impl.
    async fn call_int(&mut self, argument: A) -> Result<RT, RE> {
        match self.try_call_int(argument).await {
            Ok(outcome) => outcome,
            Err(call_err) => Err(RE::from(call_err)),
        }
    }
}
// Expands `arg_stub!` once for every supported argument count, from zero up to ten.
// Each invocation passes this type (`RFnMut`), the closure trait (`FnMut`), the provider
// type, the names `new_N` / `provided_N`, the `&mut` receiver tokens and the argument list.
// NOTE(review): `arg_stub!` is defined outside this file; presumably it generates the
// public `new_N` / `provided_N` constructors and the call methods on top of the private
// `new_int` / `provided_int` / `try_call_int` / `call_int` helpers — confirm against the
// macro definition.
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_0, provided_0, (&mut), );
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_1, provided_1, (&mut), arg1: A1);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_2, provided_2, (&mut), arg1: A1, arg2: A2);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_3, provided_3, (&mut), arg1: A1, arg2: A2, arg3: A3);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_4, provided_4, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_5, provided_5, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_6, provided_6, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, arg6: A6);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_7, provided_7, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, arg6: A6, arg7: A7);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_8, provided_8, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, arg6: A6, arg7: A7, arg8: A8);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_9, provided_9, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, arg6: A6, arg7: A7, arg8: A8, arg9: A9);
#[rustfmt::skip] arg_stub!(RFnMut, FnMut, RFnMutProvider, new_10, provided_10, (&mut), arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, arg6: A6, arg7: A7, arg8: A8, arg9: A9, arg10: A10);
// Deliberately empty destructor: dropping the handle needs no extra work beyond
// dropping the contained request sender.
// Because the type implements `Drop`, its field cannot be moved out of it.
// NOTE(review): presumably kept so drop logic can be added later without changing what
// callers may do with the type — confirm before removing.
impl<A, R, Codec> Drop for RFnMut<A, R, Codec> {
fn drop(&mut self) {
// Intentionally empty.
}
}