mcl_rs/transfer.rs
1#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
2use crate::device::DevType;
3use crate::low_level;
4use crate::low_level::ReqStatus;
5use crate::task::{TaskArg, TaskArgData};
6
7use libmcl_sys::mcl_transfer;
8
9/// Transfer can be used to create a request for data transfer from MCL.
10pub struct Transfer<'a> {
11 args: Vec<TaskArg<'a>>,
12 curr_arg: usize,
13 d_type: DevType,
14 c_handle: *mut mcl_transfer,
15}
16
17impl<'a> Transfer<'a> {
18 /// Creates a new transfer with the given parameters
19 ///
20 /// ## Arguments
21 /// * `num_args` - The number of arguments that will be transfered
22 /// * `ncopies` - The number of copies to create
23 /// * `flags` - Other related flags
24 ///
25 /// Returns a new Transfer object
26 pub(crate) fn new(num_args: usize, ncopies: usize, flags: u64) -> Self {
27 Transfer {
28 args: vec![Default::default(); num_args],
29 curr_arg: 0,
30 d_type: DevType::ANY,
31 c_handle: low_level::transfer_create(num_args as u64, ncopies as u64, flags),
32 }
33 }
34
35 /// Adds an argument to be transferred by this request
36 ///
37 ///
38 /// Returns the Transfer object
39 ///
40 /// # Examples
41 ///```no_run
42 /// let mcl = mcl_rs::MclEnvBuilder::new().initialize();
43 /// mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
44 ///
45 /// let data = vec![0; 4];
46 ///
47 /// let tr = mcl.transfer(1, 1)
48 /// .arg(mcl_rs::TaskArg::input_slice(&data));
49 ///```
50 pub fn arg(mut self, arg: TaskArg<'a>) -> Self {
51 match &arg.data {
52 TaskArgData::Scalar(x) => {
53 low_level::transfer_set_arg(self.c_handle, self.curr_arg as u64, x, 0, arg.flags)
54 }
55 TaskArgData::Buffer(x) => {
56 low_level::transfer_set_arg(self.c_handle, self.curr_arg as u64, x, 0, arg.flags)
57 }
58 // TaskArgData::Local(x) => {
59 // low_level::transfer_set_local(self.c_handle, self.curr_arg as u64, *x, 0, arg.flags)
60 // }
61 #[cfg(feature = "shared_mem")]
62 TaskArgData::Shared(..) => panic!("must use arg_shared api "),
63 TaskArgData::Empty => panic!("cannot have an empty arg"),
64 }
65 self.args[self.curr_arg] = arg;
66 self.curr_arg += 1;
67
68 self
69 }
70
71 /// Sets the desired device type
72 ///
73 /// ## Arguments
74 ///
75 /// * `d_type` - The device type to transfer to
76 ///
77 /// Returns the Transfer with the preference set
78 ///
79 /// # Examples
80 ///```no_run
81 /// let mcl = mcl_rs::MclEnvBuilder::new().initialize();
82 /// mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
83 ///
84 /// let data = vec![0; 4];
85 ///
86 /// let tr = mcl.transfer(1, 1)
87 /// .arg(mcl_rs::TaskArg::input_slice(&data))
88 /// .dev(mcl_rs::DevType::CPU);
89 ///```
90 pub fn dev(mut self, d_type: DevType) -> Self {
91 self.d_type = d_type;
92 self
93 }
94
95 /// Submit the transfer request
96 /// This is an asynchronous operation, meaning that no work is actually performed until
97 /// the returned future is actually `await`ed.
98 /// While awaiting a transfer execution, the user application will not make forward progress until
99 /// the underylying device has executed the transfer.
100 /// Upon return from the await call, any data is gauranteed to be written to its approriate buffer.
101 ///
102 /// Transfer execution order can be enforced by sequentially awaiting tasks, or may be executed simultaneously
103 /// using data structures such as Join_all <https://docs.rs/futures/latest/futures/future/fn.join_all.html>
104 /// # Examples
105 ///```no_run
106 /// let mcl = mcl_rs::MclEnvBuilder::new().initialize();
107 /// mcl.load_prog("my_prog",mcl_rs::PrgType::Src);
108 ///
109 /// let data = vec![0; 4];
110 ///
111 /// let t_hdl = mcl.transfer(1, 1)
112 /// .arg(mcl_rs::TaskArg::input_slice(&data))
113 /// .dev(mcl_rs::DevType::CPU)
114 /// .exec();
115 /// futures::executor::block_on(t_hdl);
116 ///```
117 ///```no_run
118 /// let mcl = mcl_rs::MclEnvBuilder::new().num_workers(10).initialize();
119 /// mcl.load_prog("my_path", mcl_rs::PrgType::Src);
120 /// let data = vec![0; 4];
121 /// let pes: [u64; 3] = [1, 1, 1];
122 /// let t1 =mcl.transfer(1, 1)
123 /// .arg(mcl_rs::TaskArg::input_slice(&data))
124 /// .dev(mcl_rs::DevType::CPU)
125 /// .exec(); //this creates a future we need to await
126 /// let t2 = mcl.transfer(1, 1)
127 /// .arg(mcl_rs::TaskArg::input_slice(&data))
128 /// .dev(mcl_rs::DevType::CPU)
129 /// .exec(); //this creates a future we need to await
130 /// let sequential_tasks = async move{
131 /// t1.await; //task will execute before task 2 is even submitted
132 /// t2.await;
133 /// };
134 /// futures::executor::block_on(sequential_tasks);
135 ///
136 /// let t3 = mcl.transfer(1, 1)
137 /// .arg(mcl_rs::TaskArg::input_slice(&data))
138 /// .dev(mcl_rs::DevType::CPU)
139 /// .exec(); //this creates a future we need to await
140 /// let t4 = mcl.transfer(1, 1)
141 /// .arg(mcl_rs::TaskArg::input_slice(&data))
142 /// .dev(mcl_rs::DevType::CPU)
143 /// .exec(); //this creates a future we need to await
144 /// let simultaneous_tasks = futures::future::join_all([t3,t4]);
145 /// futures::executor::block_on(simultaneous_tasks); //both tasks submitted "simultaneously"
146 ///```
147 pub async fn exec(self) {
148 assert_eq!(self.curr_arg, self.args.len());
149 low_level::transfer_exec(self.c_handle, self.d_type);
150
151 while low_level::transfer_test(self.c_handle) != ReqStatus::Completed {
152 async_std::task::yield_now().await;
153 }
154 }
155}
156
157impl Drop for Transfer<'_> {
158 fn drop(&mut self) {
159 low_level::transfer_free(self.c_handle);
160 }
161}