gosh_remote/
task.rs

1// [[file:../remote.note::d03e6166][d03e6166]]
2#![deny(warnings)]
//! Task for remote execution
//!
//! # Example
//!
//! ```ignore
//! let (mut rx, tx) = Task::new().split();
//!
//! // client side
//! let tx_input1 = tx.clone();
//! let tx_input2 = tx.clone();
//! let out1 = tx_input1.send("test input 1").await?;
//! let out2 = tx_input2.send("test input 2").await?;
//!
//! // server side
//! if let Some(RemoteIO(input, tx_out)) = rx.recv().await {
//!     // compute with job input
//!     let output = compute_with(input)?;
//!     // send job output to client side
//!     tx_out.send(output)?;
//! } else {
//!     // task channel closed
//!     // ...
//! }
//! ```
27// d03e6166 ends here
28
29// [[file:../remote.note::475dbc7d][475dbc7d]]
30use super::*;
31
32use tokio::sync::{mpsc, oneshot};
33
34use std::fmt::Debug;
35use std::marker::Send;
36// 475dbc7d ends here
37
38// [[file:../remote.note::214790a9][214790a9]]
39type Computed<O> = O;
40type TxInput<I, O> = mpsc::Sender<RemoteIO<I, O>>;
41
42/// The receiver of task for remote execution
43pub type RxInput<I, O> = mpsc::Receiver<RemoteIO<I, O>>;
44/// The sender of computational results.
45pub type TxOutput<O> = oneshot::Sender<Computed<O>>;
46
47/// RemoteIO contains input and output for remote execution. The first field in tuple
48/// is job input, and the second is for writing job output.
49#[derive(Debug)]
50pub struct RemoteIO<I, O>(
51    /// Input data for starting computation
52    pub I,
53    /// A oneshot channel for send computational output.
54    pub TxOutput<O>,
55);
56// 214790a9 ends here
57
58// [[file:../remote.note::b55affa9][b55affa9]]
59/// The client side for remote execution
60#[derive(Debug, Clone, Default)]
61pub struct TaskSender<I, O> {
62    tx_inp: Option<TxInput<I, O>>,
63}
64
65impl<I: Debug, O: Debug + Send> TaskSender<I, O> {
66    #[deprecated(note = "use send instead")]
67    /// Ask remote side compute with `input` and return the computed.
68    pub async fn remote_compute(&self, input: impl Into<I>) -> Result<Computed<O>> {
69        let o = self.send(input).await?;
70        Ok(o)
71    }
72
73    /// Ask remote side compute with `input` and return the computed.
74    pub async fn send(&self, input: impl Into<I>) -> Result<Computed<O>> {
75        let (tx, rx) = tokio::sync::oneshot::channel();
76        self.tx_inp
77            .as_ref()
78            .expect("task input")
79            .send(RemoteIO(input.into(), tx))
80            .await
81            .map_err(|err| format_err!("task send error: {err:?}"))?;
82        let computed = rx.await?;
83        Ok(computed)
84    }
85}
86// b55affa9 ends here
87
88// [[file:../remote.note::f45eafe9][f45eafe9]]
89/// The server side for remote execution
90#[derive(Debug)]
91pub struct TaskReceiver<I, O> {
92    rx_inp: RxInput<I, O>,
93}
94
95impl<I, O> TaskReceiver<I, O> {
96    /// Receives the next task for this receiver.
97    pub async fn recv(&mut self) -> Option<RemoteIO<I, O>> {
98        self.rx_inp.recv().await
99    }
100}
101
102fn new_interactive_task<I, O>() -> (TaskReceiver<I, O>, TaskSender<I, O>) {
103    let (tx_inp, rx_inp) = tokio::sync::mpsc::channel(1);
104
105    let server = TaskReceiver { rx_inp };
106    let client = TaskSender { tx_inp: tx_inp.into() };
107
108    (server, client)
109}
110// f45eafe9 ends here
111
112// [[file:../remote.note::3f19ae12][3f19ae12]]
113/// A Task channel for remote execution (multi-producer, single-consumer)
114pub struct Task<I, O> {
115    sender: TaskSender<I, O>,
116    receiver: TaskReceiver<I, O>,
117}
118
119impl<I, O> Task<I, O> {
120    /// Create a task channel for computation of molecule in client/server
121    /// architecture
122    pub fn new() -> Self {
123        let (receiver, sender) = new_interactive_task();
124        Self { sender, receiver }
125    }
126
127    /// Splits a single task into separate read and write half
128    pub fn split(self) -> (TaskReceiver<I, O>, TaskSender<I, O>) {
129        match self {
130            Self {
131                sender: tx,
132                receiver: rx,
133            } => (rx, tx),
134        }
135    }
136}
137// 3f19ae12 ends here