1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
// [[file:../remote.note::d03e6166][d03e6166]]
#![deny(warnings)]
//! Task for remote execution
//!
//! # Example
//!
//! ```ignore
//! let (rx, tx) = Task::new().split();
//!
//! // client side
//! tx1 = tx.clone();
//! tx2 = tx.clone();
//! let out1 = tx1.remote_compute("test input 1")?;
//! let out2 = tx2.remote_compute("test input 2")?;
//!
//! // server side
//! if let Some(RemoteIO(input, tx_out)) = rx.recv() {
//! // compute with job input
//! let output = compute_with(input)?;
//! // send job output to client side
//! tx_out.send(output)?;
//! } else {
//! // task channel closed
//! // ...
//! }
//! ```
// d03e6166 ends here
// [[file:../remote.note::475dbc7d][475dbc7d]]
use super::*;
use tokio::sync::{mpsc, oneshot};
use std::fmt::Debug;
use std::marker::Send;
// 475dbc7d ends here
// [[file:../remote.note::214790a9][214790a9]]
type Computed<O> = O;
type TxInput<I, O> = mpsc::Sender<RemoteIO<I, O>>;
/// The receiver of task for remote execution
pub type RxInput<I, O> = mpsc::Receiver<RemoteIO<I, O>>;
/// The sender of computational results.
pub type TxOutput<O> = oneshot::Sender<Computed<O>>;
/// RemoteIO contains input and output for remote execution. The first field in tuple
/// is job input, and the second is for writing job output.
#[derive(Debug)]
pub struct RemoteIO<I, O>(
/// Input data for starting computation
pub I,
/// A oneshot channel for send computational output.
pub TxOutput<O>,
);
// 214790a9 ends here
// [[file:../remote.note::b55affa9][b55affa9]]
/// The client side for remote execution
#[derive(Debug, Clone, Default)]
pub struct TaskSender<I, O> {
tx_inp: Option<TxInput<I, O>>,
}
impl<I: Debug, O: Debug + Send> TaskSender<I, O> {
/// Ask remote side compute with `input` and return the computed.
pub async fn remote_compute(&self, input: impl Into<I>) -> Result<Computed<O>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.tx_inp
.as_ref()
.expect("task input")
.send(RemoteIO(input.into(), tx))
.await
.map_err(|err| format_err!("task send error: {err:?}"))?;
let computed = rx.await?;
Ok(computed)
}
}
// b55affa9 ends here
// [[file:../remote.note::f45eafe9][f45eafe9]]
/// The server side for remote execution
#[derive(Debug)]
pub struct TaskReceiver<I, O> {
rx_inp: RxInput<I, O>,
}
impl<I, O> TaskReceiver<I, O> {
/// Receives the next task for this receiver.
pub async fn recv(&mut self) -> Option<RemoteIO<I, O>> {
self.rx_inp.recv().await
}
}
fn new_interactive_task<I, O>() -> (TaskReceiver<I, O>, TaskSender<I, O>) {
let (tx_inp, rx_inp) = tokio::sync::mpsc::channel(1);
let server = TaskReceiver { rx_inp };
let client = TaskSender { tx_inp: tx_inp.into() };
(server, client)
}
// f45eafe9 ends here
// [[file:../remote.note::3f19ae12][3f19ae12]]
/// A Task channel for remote execution (multi-producer, single-consumer)
pub struct Task<I, O> {
sender: TaskSender<I, O>,
receiver: TaskReceiver<I, O>,
}
impl<I, O> Task<I, O> {
/// Create a task channel for computation of molecule in client/server
/// architecture
pub fn new() -> Self {
let (receiver, sender) = new_interactive_task();
Self { sender, receiver }
}
/// Splits a single task into separate read and write half
pub fn split(self) -> (TaskReceiver<I, O>, TaskSender<I, O>) {
match self {
Self {
sender: tx,
receiver: rx,
} => (rx, tx),
}
}
}
// 3f19ae12 ends here