liquid_ml/liquid_ml.rs
//! This module defines the implementation of the highest level component in
//! a `liquid_ml` system.
use crate::dataframe::{Column, DistributedDataFrame, LocalDataFrame, Rower};
use crate::error::LiquidError;
use crate::kv::KVStore;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::{mpsc, mpsc::Receiver, Mutex, Notify};

/// Represents a `liquid_ml` application, an easy way to create and operate on
/// multiple [`DistributedDataFrame`]s at the same time.
///
/// This struct is the highest level component of a distributed `liquid_ml`
/// system, the application layer. For 90% of use cases, you will not need to
/// use anything else in this crate besides a `LiquidML` struct and your own
/// implementation of a [`Rower`].
///
/// Trivial and non-trivial examples that use the application can be found in
/// the `examples` directory of this crate.
///
/// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
/// [`Rower`]: dataframe/trait.Rower.html
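///
/// ## Examples
///
/// A minimal end-to-end sketch, assuming a registration `Server` is already
/// running at `"192.168.0.1:9000"`, that `examples/sor/data.sor` and the
/// `RowCounter` rower are placeholders you would swap for your own, and that
/// `LiquidML` and `LiquidError` are importable from the crate root:
///
/// ```ignore
/// use liquid_ml::{LiquidError, LiquidML};
///
/// #[tokio::main]
/// async fn main() -> Result<(), LiquidError> {
///     // Connect this node to the registration server and wait for 3 nodes.
///     let mut app =
///         LiquidML::new("192.168.0.2:9001", "192.168.0.1:9000", 3).await?;
///     // Node 1 parses the SoR file and distributes chunks to every node.
///     app.df_from_sor("my-data", "examples/sor/data.sor").await?;
///     // Run a user-defined `Rower` over every chunk; node 1 gets the joined
///     // result back, all other nodes get `None`.
///     if let Some(rower) = app.map("my-data", RowCounter::default()).await? {
///         println!("total rows: {}", rower.count);
///     }
///     Ok(())
/// }
/// ```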
pub struct LiquidML {
    /// A pointer to the `KVStore` that stores all the data for this node of
    /// the application
    pub kv: Arc<KVStore<LocalDataFrame>>,
    /// The id of this node, assigned by the registration [`Server`]
    ///
    /// [`Server`]: network/struct.Server.html
    pub node_id: usize,
    /// A receiver for blob messages that can be processed by the user for
    /// lower level access to the network
    pub blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
    /// The number of nodes in this network
    pub num_nodes: usize,
    /// A notifier that gets notified when the [`Server`] has sent a [`Kill`]
    /// message
    ///
    /// [`Server`]: network/struct.Server.html
    /// [`Kill`]: network/enum.ControlMsg.html#variant.Kill
    pub kill_notifier: Arc<Notify>,
    /// A map of a data frame's name to that `DistributedDataFrame`
    pub data_frames: HashMap<String, Arc<DistributedDataFrame>>,
    /// The `IP:Port` address of the [`Server`]
    ///
    /// [`Server`]: network/struct.Server.html
    pub server_addr: String,
    /// The `IP` of this node
    pub my_ip: String,
}

impl LiquidML {
    /// Create a new `liquid_ml` application that runs at `my_addr` and will
    /// wait to connect to `num_nodes` nodes before returning.
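    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming the registration [`Server`] is already
    /// listening at `"192.168.0.1:9000"` and that both addresses are
    /// reachable on your network:
    ///
    /// ```ignore
    /// // Bind this node to port 9001 and block until all 3 nodes have
    /// // registered with the server.
    /// let app = LiquidML::new("192.168.0.2:9001", "192.168.0.1:9000", 3).await?;
    /// println!("assigned node id: {}", app.node_id);
    /// ```
    ///
    /// [`Server`]: network/struct.Server.html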
    pub async fn new(
        my_addr: &str,
        server_addr: &str,
        num_nodes: usize,
    ) -> Result<Self, LiquidError> {
        let (blob_sender, blob_receiver) = mpsc::channel(20);
        let kill_notifier = Arc::new(Notify::new());
        let kv = KVStore::new(
            server_addr.to_string(),
            my_addr.to_string(),
            blob_sender,
            num_nodes,
        )
        .await;
        let node_id = kv.id;
        let (my_ip, _my_port) = {
            let mut iter = my_addr.split(':');
            let first = iter.next().unwrap();
            let second = iter.next().unwrap();
            (first, second)
        };

        Ok(LiquidML {
            kv,
            node_id,
            blob_receiver: Arc::new(Mutex::new(blob_receiver)),
            num_nodes,
            kill_notifier,
            data_frames: HashMap::new(),
            server_addr: server_addr.to_string(),
            my_ip: my_ip.to_string(),
        })
    }

    /// Create a new data frame with the given name. The data will be
    /// generated by calling the provided `data_generator` function on node 1,
    /// which will then distribute chunks across all of the nodes.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new
    /// [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
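    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming `app` is an already-connected `LiquidML`
    /// instance; the generator and the `Column::Int` layout shown here are
    /// illustrative stand-ins for your own data:
    ///
    /// ```ignore
    /// use liquid_ml::dataframe::Column;
    ///
    /// // The generator is only called on node 1; the resulting columns are
    /// // chunked and distributed to every node in the cluster.
    /// fn generate() -> Vec<Column> {
    ///     vec![Column::Int(vec![Some(1), Some(2), Some(3)])]
    /// }
    ///
    /// app.df_from_fn("generated-data", generate).await?;
    /// ```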
    pub async fn df_from_fn(
        &mut self,
        df_name: &str,
        data_generator: fn() -> Vec<Column>,
    ) -> Result<(), LiquidError> {
        let data = if self.node_id == 1 {
            Some(data_generator())
        } else {
            None
        };
        let ddf = DistributedDataFrame::new(
            &self.server_addr,
            &self.my_ip,
            data,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Create a new data frame with the given name. The data comes from a
    /// `SoR` file which is assumed to exist only on node 1. Node 1 will
    /// parse the file into chunks sized so that each node gets one, or at
    /// most two, chunks. Node 1 distributes these chunks to all the other
    /// nodes, sending up to 2 chunks concurrently to limit memory usage,
    /// since the chunks are large.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new
    /// [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
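    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming `app` is an already-connected `LiquidML`
    /// instance and that `data/users.sor` is a placeholder path that exists
    /// on node 1's file system:
    ///
    /// ```ignore
    /// // The file is only read on node 1; every other node just receives
    /// // its chunks over the network.
    /// app.df_from_sor("users", "data/users.sor").await?;
    /// ```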
    pub async fn df_from_sor(
        &mut self,
        df_name: &str,
        file_name: &str,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_sor(
            &self.server_addr,
            &self.my_ip,
            file_name,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Create a new data frame that consists of all the chunks yielded by
    /// `iter`, until `iter` is consumed. Node 1 will call `next` on `iter`
    /// and distribute the resulting chunks to all the other nodes, sending up
    /// to 2 chunks concurrently to limit memory usage.
    ///
    /// It would be possible to increase the concurrency of sending the
    /// chunks; this would change the API slightly, but not in a major way.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new
    /// [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
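    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming `app` is an already-connected `LiquidML`
    /// instance; the iterator and the `Column::Int` layout are illustrative
    /// stand-ins for something that lazily produces chunks (e.g. by streaming
    /// a file):
    ///
    /// ```ignore
    /// use liquid_ml::dataframe::Column;
    ///
    /// // Each item yielded by the iterator becomes one chunk of the new
    /// // data frame; only node 1 drives the iterator.
    /// let chunks =
    ///     (0i64..4).map(|i| vec![Column::Int(vec![Some(i), Some(i + 1)])]);
    /// app.df_from_iter("streamed-data", chunks).await?;
    /// ```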
    pub async fn df_from_iter(
        &mut self,
        df_name: &str,
        iter: impl Iterator<Item = Vec<Column>>,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_iter(
            &self.server_addr,
            &self.my_ip,
            Some(iter),
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Given a function, run it on this application. This function only
    /// terminates when a kill signal from the [`Server`] has been sent.
    ///
    /// ## Examples
    /// `examples/demo_client.rs` is a good starting point to see this in
    /// action.
    ///
    /// [`Server`]: network/struct.Server.html
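    ///
    /// A minimal sketch, assuming `app` is an already-connected `LiquidML`
    /// instance; the closure receives this node's `KVStore` and the call only
    /// returns once the [`Server`] sends a kill signal:
    ///
    /// ```ignore
    /// app.run(|kv| async move {
    ///     // Lower-level work against the distributed key/value store would
    ///     // go here; this trivial body ignores it.
    ///     let _ = kv;
    /// })
    /// .await;
    /// ```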
    pub async fn run<F, Fut>(self, f: F)
    where
        Fut: Future<Output = ()>,
        F: FnOnce(Arc<KVStore<LocalDataFrame>>) -> Fut,
    {
        f(self.kv.clone()).await;
        self.kill_notifier.notified().await;
    }

    /// Perform a distributed map operation on the [`DistributedDataFrame`]
    /// with the name `df_name`, using the given `rower`. Returns
    /// `Some(rower)` (holding the joined results) if the `node_id` of this
    /// [`DistributedDataFrame`] is `1`, and `None` otherwise.
    ///
    /// A local `pmap` is used on each node to map over that node's chunk.
    /// By default, each node will use the number of threads available on that
    /// machine.
    ///
    /// NOTE:
    /// There is an important design decision here with a distinct trade-off:
    /// 1. Join the last node's results with the next node's, and so on, until
    ///    the end of the chain is reached. This has reduced memory
    ///    requirements but a performance impact because of the synchronous
    ///    network calls.
    /// 2. Join all nodes' results on one node by sending network messages
    ///    concurrently to that final node. This has increased memory
    ///    requirements and greater complexity, but greater performance
    ///    because all nodes can send to one node asynchronously at the same
    ///    time.
    ///
    /// This implementation went with option 1 for simplicity.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
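    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming `app` already holds a data frame named
    /// `"users"` and that `RowCounter` is a placeholder for your own
    /// [`Rower`] implementation:
    ///
    /// ```ignore
    /// // Each node maps over its own chunks in parallel; the joined rower
    /// // only comes back on node 1.
    /// match app.map("users", RowCounter::default()).await? {
    ///     Some(counter) => println!("total rows: {}", counter.count),
    ///     None => println!("not node 1; the result was sent to node 1"),
    /// }
    /// ```
    ///
    /// [`Rower`]: dataframe/trait.Rower.html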
    pub async fn map<T: Rower + Serialize + Clone + DeserializeOwned + Send>(
        &self,
        df_name: &str,
        rower: T,
    ) -> Result<Option<T>, LiquidError> {
        let df = match self.data_frames.get(df_name) {
            Some(x) => x,
            None => return Err(LiquidError::NotPresent),
        };
        df.map(rower).await
    }

    /// Perform a distributed filter operation on the [`DistributedDataFrame`]
    /// with the name `df_name`, using the given `rower`. This function
    /// does not mutate the [`DistributedDataFrame`] in any way; instead, it
    /// creates a new [`DistributedDataFrame`] of the results. The new
    /// [`DistributedDataFrame`] is created on every node so that the results
    /// are consistent everywhere.
    ///
    /// A local `pfilter` is used on each node to filter over that node's
    /// chunks. By default, each node will use the number of threads available
    /// on that machine.
    ///
    /// It would be possible to rewrite this to use a bit map of the rows that
    /// should remain in the filtered result, but currently this just clones
    /// the rows.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
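    ///
    /// ## Examples
    ///
    /// A minimal sketch, assuming `app` already holds a data frame named
    /// `"users"` and that `AdultsOnly` is a placeholder for your own
    /// [`Rower`] that accepts exactly the rows you want to keep:
    ///
    /// ```ignore
    /// // Each node filters its own chunks; the filtered data frame is
    /// // registered in `app.data_frames` on every node.
    /// app.filter("users", AdultsOnly::default()).await?;
    /// ```
    ///
    /// [`Rower`]: dataframe/trait.Rower.html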
    pub async fn filter<
        T: Rower + Serialize + Clone + DeserializeOwned + Send,
    >(
        &mut self,
        df_name: &str,
        rower: T,
    ) -> Result<(), LiquidError> {
        let df = match self.data_frames.get(df_name) {
            Some(x) => x,
            None => return Err(LiquidError::NotPresent),
        };
        let filtered_df = df.filter(rower).await?;
        self.data_frames
            .insert(filtered_df.df_name.clone(), filtered_df);

        Ok(())
    }
}