gosh_remote/task.rs
1// [[file:../remote.note::d03e6166][d03e6166]]
2#![deny(warnings)]
3//! Task for remote execution
4//!
5//! # Example
6//!
7//! ```ignore
8//! let (rx, tx) = Task::new().split();
9//!
10//! // client side
11//! tx_input1 = tx.clone();
12//! tx_input2 = tx.clone();
13//! let out1 = tx_input1.send("test input 1")?;
14//! let out2 = tx_input2.send("test input 2")?;
15//!
16//! // server side
17//! if let Some(RemoteIO(input, tx_out)) = rx.recv().await {
18//! // compute with job input
19//! let output = compute_with(input)?;
20//! // send job output to client side
21//! tx_out.send(output)?;
22//! } else {
23//! // task channel closed
24//! // ...
25//! }
26//! ```
27// d03e6166 ends here
28
29// [[file:../remote.note::475dbc7d][475dbc7d]]
30use super::*;
31
32use tokio::sync::{mpsc, oneshot};
33
34use std::fmt::Debug;
35use std::marker::Send;
36// 475dbc7d ends here
37
38// [[file:../remote.note::214790a9][214790a9]]
39type Computed<O> = O;
40type TxInput<I, O> = mpsc::Sender<RemoteIO<I, O>>;
41
42/// The receiver of task for remote execution
43pub type RxInput<I, O> = mpsc::Receiver<RemoteIO<I, O>>;
44/// The sender of computational results.
45pub type TxOutput<O> = oneshot::Sender<Computed<O>>;
46
47/// RemoteIO contains input and output for remote execution. The first field in tuple
48/// is job input, and the second is for writing job output.
49#[derive(Debug)]
50pub struct RemoteIO<I, O>(
51 /// Input data for starting computation
52 pub I,
53 /// A oneshot channel for send computational output.
54 pub TxOutput<O>,
55);
56// 214790a9 ends here
57
58// [[file:../remote.note::b55affa9][b55affa9]]
59/// The client side for remote execution
60#[derive(Debug, Clone, Default)]
61pub struct TaskSender<I, O> {
62 tx_inp: Option<TxInput<I, O>>,
63}
64
65impl<I: Debug, O: Debug + Send> TaskSender<I, O> {
66 #[deprecated(note = "use send instead")]
67 /// Ask remote side compute with `input` and return the computed.
68 pub async fn remote_compute(&self, input: impl Into<I>) -> Result<Computed<O>> {
69 let o = self.send(input).await?;
70 Ok(o)
71 }
72
73 /// Ask remote side compute with `input` and return the computed.
74 pub async fn send(&self, input: impl Into<I>) -> Result<Computed<O>> {
75 let (tx, rx) = tokio::sync::oneshot::channel();
76 self.tx_inp
77 .as_ref()
78 .expect("task input")
79 .send(RemoteIO(input.into(), tx))
80 .await
81 .map_err(|err| format_err!("task send error: {err:?}"))?;
82 let computed = rx.await?;
83 Ok(computed)
84 }
85}
86// b55affa9 ends here
87
88// [[file:../remote.note::f45eafe9][f45eafe9]]
89/// The server side for remote execution
90#[derive(Debug)]
91pub struct TaskReceiver<I, O> {
92 rx_inp: RxInput<I, O>,
93}
94
95impl<I, O> TaskReceiver<I, O> {
96 /// Receives the next task for this receiver.
97 pub async fn recv(&mut self) -> Option<RemoteIO<I, O>> {
98 self.rx_inp.recv().await
99 }
100}
101
102fn new_interactive_task<I, O>() -> (TaskReceiver<I, O>, TaskSender<I, O>) {
103 let (tx_inp, rx_inp) = tokio::sync::mpsc::channel(1);
104
105 let server = TaskReceiver { rx_inp };
106 let client = TaskSender { tx_inp: tx_inp.into() };
107
108 (server, client)
109}
110// f45eafe9 ends here
111
112// [[file:../remote.note::3f19ae12][3f19ae12]]
113/// A Task channel for remote execution (multi-producer, single-consumer)
114pub struct Task<I, O> {
115 sender: TaskSender<I, O>,
116 receiver: TaskReceiver<I, O>,
117}
118
119impl<I, O> Task<I, O> {
120 /// Create a task channel for computation of molecule in client/server
121 /// architecture
122 pub fn new() -> Self {
123 let (receiver, sender) = new_interactive_task();
124 Self { sender, receiver }
125 }
126
127 /// Splits a single task into separate read and write half
128 pub fn split(self) -> (TaskReceiver<I, O>, TaskSender<I, O>) {
129 match self {
130 Self {
131 sender: tx,
132 receiver: rx,
133 } => (rx, tx),
134 }
135 }
136}
137// 3f19ae12 ends here