mcl_rs/
transfer.rs

1#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
2use crate::device::DevType;
3use crate::low_level;
4use crate::low_level::ReqStatus;
5use crate::task::{TaskArg, TaskArgData};
6
7use libmcl_sys::mcl_transfer;
8
9/// Transfer can be used to create a request for data transfer from MCL.
10pub struct Transfer<'a> {
11    args: Vec<TaskArg<'a>>,
12    curr_arg: usize,
13    d_type: DevType,
14    c_handle: *mut mcl_transfer,
15}
16
17impl<'a> Transfer<'a> {
18    /// Creates a new transfer with the given parameters
19    ///
20    /// ## Arguments
21    /// * `num_args` - The number of arguments that will be transfered
22    /// * `ncopies` - The number of copies to create
23    /// * `flags` - Other related flags
24    ///
25    /// Returns a new  Transfer object
26    pub(crate) fn new(num_args: usize, ncopies: usize, flags: u64) -> Self {
27        Transfer {
28            args: vec![Default::default(); num_args],
29            curr_arg: 0,
30            d_type: DevType::ANY,
31            c_handle: low_level::transfer_create(num_args as u64, ncopies as u64, flags),
32        }
33    }
34
35    /// Adds an argument to be transferred by this request
36    ///
37    ///
38    /// Returns the Transfer object
39    ///
40    /// # Examples
41    ///```no_run     
42    ///     let mcl = mcl_rs::MclEnvBuilder::new().initialize();
43    ///     mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
44    ///     
45    ///     let data = vec![0; 4];
46    ///
47    ///     let tr = mcl.transfer(1, 1)
48    ///                 .arg(mcl_rs::TaskArg::input_slice(&data));
49    ///```
50    pub fn arg(mut self, arg: TaskArg<'a>) -> Self {
51        match &arg.data {
52            TaskArgData::Scalar(x) => {
53                low_level::transfer_set_arg(self.c_handle, self.curr_arg as u64, x, 0, arg.flags)
54            }
55            TaskArgData::Buffer(x) => {
56                low_level::transfer_set_arg(self.c_handle, self.curr_arg as u64, x, 0, arg.flags)
57            }
58            // TaskArgData::Local(x) => {
59            //     low_level::transfer_set_local(self.c_handle, self.curr_arg as u64, *x, 0, arg.flags)
60            // }
61            #[cfg(feature = "shared_mem")]
62            TaskArgData::Shared(..) => panic!("must use arg_shared api "),
63            TaskArgData::Empty => panic!("cannot have an empty arg"),
64        }
65        self.args[self.curr_arg] = arg;
66        self.curr_arg += 1;
67
68        self
69    }
70
71    /// Sets the desired device type
72    ///
73    /// ## Arguments
74    ///
75    /// * `d_type` - The device type to transfer to
76    ///
77    /// Returns the Transfer with the preference set
78    ///
79    /// # Examples
80    ///```no_run     
81    ///     let mcl = mcl_rs::MclEnvBuilder::new().initialize();
82    ///     mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
83    ///
84    ///     let data = vec![0; 4];
85    ///
86    ///     let tr = mcl.transfer(1, 1)
87    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
88    ///                 .dev(mcl_rs::DevType::CPU);
89    ///```
90    pub fn dev(mut self, d_type: DevType) -> Self {
91        self.d_type = d_type;
92        self
93    }
94
95    /// Submit the transfer request
96    /// This is an asynchronous operation, meaning that no work is actually performed until
97    /// the returned future is actually `await`ed.
98    /// While awaiting a transfer execution, the user application will not make forward progress until
99    /// the underylying device has executed the transfer.
100    /// Upon return from the await call, any data is gauranteed to be written to its approriate buffer.
101    ///
102    /// Transfer execution order can be enforced by sequentially awaiting tasks, or may be executed simultaneously
103    /// using data structures such as Join_all <https://docs.rs/futures/latest/futures/future/fn.join_all.html>
104    /// # Examples
105    ///```no_run     
106    ///     let mcl = mcl_rs::MclEnvBuilder::new().initialize();
107    ///     mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
108    ///
109    ///     let data = vec![0; 4];
110    ///
111    ///     let t_hdl = mcl.transfer(1, 1)
112    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
113    ///                 .dev(mcl_rs::DevType::CPU)
114    ///                 .exec();
115    ///     futures::executor::block_on(t_hdl);
116    ///```
117    ///```no_run
118    ///     let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
119    ///     mcl.load_prog("my_path", mcl_rs::PrgType::Src);
120    ///     let data = vec![0; 4];
121    ///     let pes: [u64; 3] = [1, 1, 1];
122    ///     let t1 =mcl.transfer(1, 1)
123    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
124    ///                 .dev(mcl_rs::DevType::CPU)
125    ///                 .exec(); //this creates a future we need to await
126    ///     let t2 = mcl.transfer(1, 1)
127    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
128    ///                 .dev(mcl_rs::DevType::CPU)
129    ///                 .exec(); //this creates a future we need to await
130    ///     let sequential_tasks = async move{
131    ///         t1.await; //task will execute before task 2 is even submitted
132    ///         t2.await;
133    ///     };
134    ///     futures::executor::block_on(sequential_tasks);
135    ///     
136    ///     let t3 = mcl.transfer(1, 1)
137    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
138    ///                 .dev(mcl_rs::DevType::CPU)
139    ///                 .exec(); //this creates a future we need to await
140    ///     let t4 = mcl.transfer(1, 1)
141    ///                 .arg(mcl_rs::TaskArg::input_slice(&data))
142    ///                 .dev(mcl_rs::DevType::CPU)
143    ///                 .exec(); //this creates a future we need to await
144    ///     let simultaneous_tasks = futures::future::join_all([t3,t4]);
145    ///     futures::executor::block_on(simultaneous_tasks); //both tasks submitted "simultaneously"
146    ///```
147    pub async fn exec(self) {
148        assert_eq!(self.curr_arg, self.args.len());
149        low_level::transfer_exec(self.c_handle, self.d_type);
150
151        while low_level::transfer_test(self.c_handle) != ReqStatus::Completed {
152            async_std::task::yield_now().await;
153        }
154    }
155}
156
157impl Drop for Transfer<'_> {
158    fn drop(&mut self) {
159        low_level::transfer_free(self.c_handle);
160    }
161}