
//! This module defines the implementation of the highest level component in
//! a `liquid_ml` system.
use crate::dataframe::{Column, DistributedDataFrame, LocalDataFrame, Rower};
use crate::error::LiquidError;
use crate::kv::KVStore;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::{mpsc, mpsc::Receiver, Mutex, Notify};

/// Represents a `liquid_ml` application, an easy way to create and operate on
/// multiple [`DistributedDataFrame`]s at the same time.
///
/// This struct is the highest level component of a distributed `liquid_ml`
/// system, the application layer. For 90% of use cases, you will not need to
/// use anything else in this crate besides a `LiquidML` struct and your own
/// implementation of a [`Rower`].
///
/// Non-trivial and trivial examples that use the application can be found in
/// the examples directory of this crate.
///
/// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
/// [`Rower`]: dataframe/trait.Rower.html
pub struct LiquidML {
    /// A pointer to the `KVStore` that stores all the data for this node of
    /// the application
    pub kv: Arc<KVStore<LocalDataFrame>>,
    /// The id of this node, assigned by the registration [`Server`]
    ///
    /// [`Server`]: network/struct.Server.html
    pub node_id: usize,
    /// A receiver for blob messages that can be processed by the user for
    /// lower level access to the network
    pub blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
    /// The number of nodes in this network
    pub num_nodes: usize,
    /// A notifier that gets notified when the [`Server`] has sent a [`Kill`]
    /// message
    ///
    /// [`Server`]: network/struct.Server.html
    /// [`Kill`]: network/enum.ControlMsg.html#variant.Kill
    pub kill_notifier: Arc<Notify>,
    /// A map of a data frame's name to that `DistributedDataFrame`
    pub data_frames: HashMap<String, Arc<DistributedDataFrame>>,
    /// The `IP:Port` address of the [`Server`]
    ///
    /// [`Server`]: network/struct.Server.html
    pub server_addr: String,
    /// The `IP` of this node
    pub my_ip: String,
}

impl LiquidML {
    /// Create a new `liquid_ml` application that runs at `my_addr` and will
    /// wait to connect to `num_nodes` nodes before returning.
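    ///
    /// ## Examples
    /// A minimal sketch; the addresses and node count below are placeholders:
    ///
    /// ```ignore
    /// // Does not return until `num_nodes` nodes have registered with the
    /// // Server running at the given address.
    /// let app = LiquidML::new("192.168.0.2:9000", "192.168.0.1:8080", 3).await?;
    /// ```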
    pub async fn new(
        my_addr: &str,
        server_addr: &str,
        num_nodes: usize,
    ) -> Result<Self, LiquidError> {
        let (blob_sender, blob_receiver) = mpsc::channel(20);
        let kill_notifier = Arc::new(Notify::new());
        let kv = KVStore::new(
            server_addr.to_string(),
            my_addr.to_string(),
            blob_sender,
            num_nodes,
        )
        .await;
        let node_id = kv.id;
        let (my_ip, _my_port) = {
            let mut iter = my_addr.split(':');
            let first = iter.next().unwrap();
            let second = iter.next().unwrap();
            (first, second)
        };

        Ok(LiquidML {
            kv,
            node_id,
            blob_receiver: Arc::new(Mutex::new(blob_receiver)),
            num_nodes,
            kill_notifier,
            data_frames: HashMap::new(),
            server_addr: server_addr.to_string(),
            my_ip: my_ip.to_string(),
        })
    }

    /// Create a new data frame with the given name. The data will be generated
    /// by calling the provided `data_generator` function on node 1, which
    /// then distributes chunks across all of the nodes.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
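    ///
    /// ## Examples
    /// A minimal sketch; `generate_data` is a hypothetical generator and
    /// `app` is assumed to be an already-connected `LiquidML`:
    ///
    /// ```ignore
    /// // Only node 1 actually invokes the generator; the other nodes just
    /// // wait to receive their chunks.
    /// fn generate_data() -> Vec<Column> {
    ///     // build and return the full data set as columns
    ///     unimplemented!()
    /// }
    ///
    /// app.df_from_fn("generated", generate_data).await?;
    /// ```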
    pub async fn df_from_fn(
        &mut self,
        df_name: &str,
        data_generator: fn() -> Vec<Column>,
    ) -> Result<(), LiquidError> {
        let data = if self.node_id == 1 {
            Some(data_generator())
        } else {
            None
        };
        let ddf = DistributedDataFrame::new(
            &self.server_addr,
            &self.my_ip,
            data,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Create a new data frame with the given name. The data comes from a
    /// `SoR` file that is assumed to exist only on node 1. Node 1 will
    /// parse the file into chunks sized so that each node holds one or at
    /// most two chunks, then distribute these chunks to all the other nodes,
    /// sending at most two chunks concurrently to limit memory usage, since
    /// the chunks are large.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
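    ///
    /// ## Examples
    /// A minimal sketch; `"users.sor"` is a placeholder path that only needs
    /// to exist on node 1, and `app` is assumed to be an already-connected
    /// `LiquidML`:
    ///
    /// ```ignore
    /// app.df_from_sor("users", "users.sor").await?;
    /// // every node can now look up the new data frame by name
    /// let users_df = app.data_frames.get("users").unwrap();
    /// ```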
    pub async fn df_from_sor(
        &mut self,
        df_name: &str,
        file_name: &str,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_sor(
            &self.server_addr,
            &self.my_ip,
            file_name,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Create a new data frame that consists of all the chunks yielded by
    /// `iter` until `iter` is consumed. Node 1 calls `next` on `iter` and
    /// distributes the resulting chunks to all the other nodes, sending at
    /// most two chunks concurrently to limit memory usage.
    ///
    /// It would be possible to increase the concurrency of sending the
    /// chunks; this would change the API slightly, but not in a major way.
    ///
    /// `await`ing this function will block until the data is completely
    /// distributed on all nodes. After the data is distributed, each node
    /// of this distributed `liquid_ml` system will have its `LiquidML`
    /// struct updated with the information of the new [`DistributedDataFrame`].
    ///
    /// **NOTE**: `df_name` must be unique.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
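    ///
    /// ## Examples
    /// A minimal sketch; `chunks` is a hypothetical `Vec<Vec<Column>>` that
    /// already holds the data split into chunks, and `app` is assumed to be
    /// an already-connected `LiquidML`:
    ///
    /// ```ignore
    /// // Node 1 drains the iterator and ships each chunk to its home node.
    /// app.df_from_iter("streamed", chunks.into_iter()).await?;
    /// ```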
    pub async fn df_from_iter(
        &mut self,
        df_name: &str,
        iter: impl Iterator<Item = Vec<Column>>,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_iter(
            &self.server_addr,
            &self.my_ip,
            Some(iter),
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Given a function, run it on this application. This function only
    /// returns after the [`Server`] has sent a kill signal.
    ///
    /// ## Examples
    /// `examples/demo_client.rs` is a good starting point to see this in
    /// action.
    ///
    /// [`Server`]: network/struct.Server.html
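    ///
    /// A minimal sketch of the call shape; the closure body is a placeholder:
    ///
    /// ```ignore
    /// // The closure receives this node's `KVStore`; after it completes,
    /// // `run` waits for the Server's Kill message before returning.
    /// app.run(|kv| async move {
    ///     // lower-level work against the key/value store goes here
    ///     let _ = kv;
    /// })
    /// .await;
    /// ```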
    pub async fn run<F, Fut>(self, f: F)
    where
        Fut: Future<Output = ()>,
        F: FnOnce(Arc<KVStore<LocalDataFrame>>) -> Fut,
    {
        f(self.kv.clone()).await;
        self.kill_notifier.notified().await;
    }

    /// Perform a distributed map operation on the [`DistributedDataFrame`]
    /// with the name `df_name`, using the given `rower`. Returns
    /// `Some(rower)` containing the joined results if the `node_id` of this
    /// [`DistributedDataFrame`] is `1`, and `None` otherwise.
    ///
    /// A local `pmap` is used on each node to map over that node's chunk.
    /// By default, each node will use the number of threads available on that
    /// machine.
    ///
    /// **NOTE**: There is an important design decision with a distinct
    /// trade-off here:
    /// 1. Join the last node with the next one until you get to the end. This
    ///    has reduced memory requirements but a performance impact because of
    ///    the synchronous network calls.
    /// 2. Join all nodes with one node by sending network messages
    ///    concurrently to the final node. This has increased memory
    ///    requirements and greater complexity but greater performance because
    ///    all nodes can asynchronously send to one node at the same time.
    ///
    /// This implementation went with option 1 for simplicity reasons.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
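    ///
    /// ## Examples
    /// A minimal sketch; `WordCounter` is a hypothetical type implementing
    /// [`Rower`] (plus `Serialize`, `DeserializeOwned`, `Clone`, and `Send`),
    /// and `app` is assumed to be an already-connected `LiquidML`:
    ///
    /// ```ignore
    /// match app.map("users", WordCounter::default()).await? {
    ///     Some(joined) => { /* node 1: inspect the joined results */ }
    ///     None => { /* every other node gets `None` */ }
    /// }
    /// ```
    ///
    /// [`Rower`]: dataframe/trait.Rower.html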
    pub async fn map<T: Rower + Serialize + Clone + DeserializeOwned + Send>(
        &self,
        df_name: &str,
        rower: T,
    ) -> Result<Option<T>, LiquidError> {
        let df = match self.data_frames.get(df_name) {
            Some(x) => x,
            None => return Err(LiquidError::NotPresent),
        };
        df.map(rower).await
    }

    /// Perform a distributed filter operation on the [`DistributedDataFrame`]
    /// with the name `df_name`, using the given `rower`. This function does
    /// not mutate the [`DistributedDataFrame`] in any way; instead, it
    /// creates a new [`DistributedDataFrame`] of the results. The new
    /// [`DistributedDataFrame`] is available on every node so that the
    /// results are consistent everywhere.
    ///
    /// A local `pfilter` is used on each node to filter over that node's
    /// chunks. By default, each node will use the number of threads available
    /// on that machine.
    ///
    /// It is possible to re-write this to use a bit map of the rows that
    /// should remain in the filtered result, but currently this just clones
    /// the rows.
    ///
    /// [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
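    ///
    /// ## Examples
    /// A minimal sketch; `ActiveUsers` is a hypothetical type implementing
    /// [`Rower`] whose per-row logic decides which rows to keep, and `app`
    /// is assumed to be an already-connected `LiquidML`:
    ///
    /// ```ignore
    /// app.filter("users", ActiveUsers::default()).await?;
    /// // the filtered result is registered in `app.data_frames` under the
    /// // new data frame's own name on every node
    /// ```
    ///
    /// [`Rower`]: dataframe/trait.Rower.html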
    pub async fn filter<
        T: Rower + Serialize + Clone + DeserializeOwned + Send,
    >(
        &mut self,
        df_name: &str,
        rower: T,
    ) -> Result<(), LiquidError> {
        let df = match self.data_frames.get(df_name) {
            Some(x) => x,
            None => return Err(LiquidError::NotPresent),
        };
        let filtered_df = df.filter(rower).await?;
        self.data_frames
            .insert(filtered_df.df_name.clone(), filtered_df);

        Ok(())
    }
}