liquid_ml/
lib.rs

//! # Introduction
//! LiquidML is a platform for distributed, scalable data analysis for data
//! sets too large to fit into memory on a single machine. It aims to be
//! simple to use, allowing users to easily create their own `map` and
//! `filter` operations, leaving everything else to `liquid_ml`. It ships with
//! many example uses, including built-in decision tree and random forest
//! machine learning algorithms, showing the power and ease of use of the
//! platform.
//!
//! LiquidML is written in Rust for both performance and safety reasons,
//! allowing many optimizations to be made more easily without the risk of a
//! memory safety bug. This helps guarantee security around our clients' data,
//! as many memory safety bugs can be exploited by malicious hackers.
//!
//! LiquidML is currently in the state of an MVP. Tools on top of LiquidML can
//! be built, and several examples are included in this crate to demonstrate
//! various use cases.
//! # Architecture
//! LiquidML consists of the following architectural components:
//! - SoRer, for inferring schemas and parsing `.sor` files
//! - Networking Layer, for communication over TCP
//! - KV Store, for associating ownership of data frame chunks with nodes
//! - DataFrame, for local and distributed data frames
//! - Application Layer, for convenience of using the entire system
//!
//! ## SoRer
//! The main purpose of the `SoRer` is to handle parsing files of an unknown
//! schema and with possibly malformed rows. Schemas are inferred based on the
//! first 500 rows and any malformed rows are discarded. Specifics on the
//! architecture and implementation of `SoRer` may be found
//! [here](https://docs.rs/sorer).
//!
//! ## Networking
//! The network layer is the lowest level component of a `liquid_ml` instance.
//! It provides a simple registration [`Server`] and a network of distributed
//! [`Client`]s, each with a unique ID assigned by the [`Server`]. The
//! networking layer uses `TCP` to communicate. [`Client`]s are able to send
//! messages directly to any other [`Client`] of the same client type after
//! they register with the [`Server`].
//!
//! ## KV Store
//! The [`KVStore`] stores blobs of serialized data that are associated with
//! [`Key`]s, as well as caches deserialized values in memory. In `liquid_ml`,
//! the [`KVStore`] stores [`LocalDataFrame`]s so that chunks of data frames
//! can be associated with different nodes in the system.
//!
//! ## Data frame
//! A data frame in `liquid_ml` is lightly inspired by those found in `R` or
//! `pandas`, and supports optionally named columns. There are many provided
//! constructors which make it easy to create any kind of data frame in
//! different ways. You may analyze the data in a data frame by implementing
//! the [`Rower`] trait to perform `map` or `filter` operations. These
//! operations can be easily performed on either [`LocalDataFrame`]s for data
//! that fits in memory or [`DistributedDataFrame`]s for data that is too
//! large to fit in one machine.
//!
//! The `dataframe` module provides 2 implementations for a data frame:
//! a [`LocalDataFrame`] and a [`DistributedDataFrame`]; the differences are
//! further explained in the implementation section.
//!
//! **Note**: If you need a [`DistributedDataFrame`], it is highly recommended
//! that you check out the [`LiquidML`] struct since that provides many
//! convenient helper functions for working with [`DistributedDataFrame`]s.
//! Using a [`DistributedDataFrame`] directly is only recommended if you really
//! know what you are doing. There are also helpful examples of `map` and
//! `filter` in the [`LiquidML`] documentation.
//!
//! ## Application Layer
//! The application layer, aka the [`LiquidML`] struct, is an even higher level
//! API for writing programs to perform data analysis on the entire distributed
//! system. It allows a user to create and analyze multiple
//! [`DistributedDataFrame`]s without worrying about [`KVStore`]s, nodes,
//! networking, or any other complications of distributed systems, making it
//! very easy for a user to run `map` or `filter` operations.
//!
//! [`LiquidML`] also provides a `run` method which takes a function and
//! executes that function. The signature of this user-implemented function
//! is `KVStore -> ()`. This allows much lower-level access for more
//! advanced users so that they may have more powerful and general usage of the
//! system beyond our provided implementations of `map` and `filter`.
//!
//! See the use case section and the `examples` directory for illustrative
//! examples.
//!
//! # Implementation
//!
//! ## Networking
//!
//! The networking layer consists of [`Client`] and [`Server`] structs, as well
//! as some helper functions in the `network` module.
//!
//! The networking layer does little handling of the complicated edge cases
//! associated with distributed systems. It's assumed that most things happen
//! without any errors, although some basic error checking is done, such as
//! checking for closed connections.
//!
//! ### Client
//! A [`Client`] can be used to send a message directly to any other [`Client`]
//! of the same type at any time by using the following method:
//!
//! `client.send_msg(target_id: usize, message: Serialize)`
//!
//! When a [`Client`] is first created, it must be registered with the
//! [`Server`] and connect with all other existing [`Client`]s.
//!
//! Registration process from the [`Client`]'s perspective:
//! 1. Connect to the [`Server`]
//! 2. Send the [`Server`] a `Message<ControlMsg::Introduction>` message
//!    containing the `IP:Port` and `network_name` for this [`Client`]
//! 3. The [`Server`] will respond with a `Message<ControlMsg::Directory>`
//!    message containing the `IP:Port` of all other currently connected
//!    [`Client`]s in that network.
//! 4. The newly created [`Client`] connects to all other existing [`Client`]s.
//! 5. The [`Client`] waits for all other [`Client`]s that have not yet started
//!    to connect to it, unless it has already connected to all the nodes.
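//!
//! The registration handshake above can be sketched with self-contained
//! types. Note that the `ControlMsg` variants and fields below are
//! illustrative assumptions for steps 2 and 3, not the crate's exact
//! definitions:
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Stand-ins for the registration messages (assumed shapes).
//! #[derive(Debug, Clone, PartialEq)]
//! enum ControlMsg {
//!     // Step 2: a new client introduces itself.
//!     Introduction { address: String, network_name: String },
//!     // Step 3: the server answers with all currently registered clients.
//!     Directory { clients: HashMap<usize, String> },
//! }
//!
//! struct Server {
//!     next_id: usize,
//!     clients: HashMap<usize, String>,
//! }
//!
//! impl Server {
//!     fn new() -> Self {
//!         Server { next_id: 1, clients: HashMap::new() }
//!     }
//!
//!     // Record the new client and reply with a directory of everyone
//!     // registered before it (steps 2-3).
//!     fn register(&mut self, msg: ControlMsg) -> ControlMsg {
//!         match msg {
//!             ControlMsg::Introduction { address, .. } => {
//!                 let dir = ControlMsg::Directory { clients: self.clients.clone() };
//!                 self.clients.insert(self.next_id, address);
//!                 self.next_id += 1;
//!                 dir
//!             }
//!             _ => panic!("expected an Introduction"),
//!         }
//!     }
//! }
//!
//! let mut server = Server::new();
//! // The first client to register gets an empty directory back.
//! let first = server.register(ControlMsg::Introduction {
//!     address: "127.0.0.1:9001".to_string(),
//!     network_name: "kv".to_string(),
//! });
//! assert_eq!(first, ControlMsg::Directory { clients: HashMap::new() });
//! // The second client's directory lists the first client, which it would
//! // then connect to directly (step 4).
//! let second = server.register(ControlMsg::Introduction {
//!     address: "127.0.0.1:9002".to_string(),
//!     network_name: "kv".to_string(),
//! });
//! match second {
//!     ControlMsg::Directory { clients } => {
//!         assert_eq!(clients.get(&1), Some(&"127.0.0.1:9001".to_string()));
//!     }
//!     _ => unreachable!(),
//! }
//! ```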
//!
//! ### Server
//! The [`Server`] asynchronously registers new [`Client`]s via
//! `Server::accept_new_connections` and also allows sending any
//! `Message<ControlMsg>` type, such as `Kill` messages for orderly shutdown.
//!
//! Due to the server's fairly simple functionality, a default implementation
//! of a server comes packaged with the `LiquidML` system and can be started by
//! running the following command:
//!
//! ```text
//! cargo run --bin server -- --address <Optional IP Address>
//! ```
//!
//! If an IP is not provided, the server defaults to `127.0.0.1:9000`.
//! ## KVStore
//! Internally, [`KVStore`]s store their data in memory as serialized blobs
//! (aka a `Vec<u8>`). The [`KVStore`] caches deserialized values into their
//! type `T` on a least-recently used basis. A hard limit for the cache size is
//! set to be `1/3` the amount of total memory on the machine, though this will
//! be made configurable.
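//!
//! As an illustration of the eviction policy (not the crate's actual cache,
//! which is bounded by bytes rather than entry count), a least-recently-used
//! cache can be sketched with `std` types alone:
//!
//! ```rust
//! use std::collections::{HashMap, VecDeque};
//!
//! struct LruCache<V> {
//!     capacity: usize,
//!     map: HashMap<String, V>,
//!     order: VecDeque<String>, // front = least recently used
//! }
//!
//! impl<V> LruCache<V> {
//!     fn new(capacity: usize) -> Self {
//!         LruCache { capacity, map: HashMap::new(), order: VecDeque::new() }
//!     }
//!
//!     // Mark `key` as the most recently used entry.
//!     fn touch(&mut self, key: &str) {
//!         if let Some(pos) = self.order.iter().position(|k| k == key) {
//!             self.order.remove(pos);
//!         }
//!         self.order.push_back(key.to_string());
//!     }
//!
//!     fn get(&mut self, key: &str) -> Option<&V> {
//!         if self.map.contains_key(key) {
//!             self.touch(key);
//!             self.map.get(key)
//!         } else {
//!             None
//!         }
//!     }
//!
//!     fn put(&mut self, key: &str, value: V) {
//!         self.touch(key);
//!         self.map.insert(key.to_string(), value);
//!         // Evict least recently used entries once over capacity.
//!         while self.map.len() > self.capacity {
//!             if let Some(oldest) = self.order.pop_front() {
//!                 self.map.remove(&oldest);
//!             }
//!         }
//!     }
//! }
//!
//! let mut cache: LruCache<Vec<u8>> = LruCache::new(2);
//! cache.put("a", vec![1]);
//! cache.put("b", vec![2]);
//! cache.get("a");          // "a" is now the most recently used entry
//! cache.put("c", vec![3]); // evicts "b", the least recently used
//! assert!(cache.get("b").is_none());
//! assert!(cache.get("a").is_some());
//! ```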
//!
//! [`KVStore`]s have an internal asynchronous message processing task since
//! they use the network directly and need to communicate with other
//! [`KVStore`]s. The [`KVStore`] also provides a lower level interface for
//! network communication by exposing a method to directly send any serialized
//! blob of data to any other [`KVStore`].
//!
//! ## DataFrame
//! ### [`LocalDataFrame`]
//!
//! A [`LocalDataFrame`] implements the actual data storage and processing.
//! Data is held in columnar format with a well defined schema, and the
//! [`LocalDataFrame`] defines the actual single and multi-threaded `map` and
//! `filter` operations. It should be noted that all `map` and `filter`
//! operations process data row by row, but data is held in columnar format
//! to avoid boxed types and reduce memory usage.
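//!
//! Row-wise visitation over columnar storage can be sketched as follows (the
//! `Column` type here is a simplified stand-in, not the crate's actual
//! representation):
//!
//! ```rust
//! // Each column is a homogeneous, unboxed vector; a "row" is just an index
//! // into every column.
//! enum Column {
//!     Int(Vec<i64>),
//!     Bool(Vec<bool>),
//! }
//!
//! // Sum the `Int` column over every row whose `Bool` column is true:
//! // row-wise logic, columnar access.
//! fn sum_where(values: &Column, keep: &Column) -> i64 {
//!     match (values, keep) {
//!         (Column::Int(vals), Column::Bool(flags)) => vals
//!             .iter()
//!             .zip(flags.iter())
//!             .filter(|&(_, &f)| f)
//!             .map(|(&v, _)| v)
//!             .sum(),
//!         _ => panic!("unexpected column types"),
//!     }
//! }
//!
//! let values = Column::Int(vec![10, 20, 30]);
//! let keep = Column::Bool(vec![true, false, true]);
//! assert_eq!(sum_where(&values, &keep), 40);
//! ```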
//!
//! ### [`DistributedDataFrame`]
//!
//! A [`DistributedDataFrame`] is an abstraction over a distributed system of
//! nodes that run [`KVStore`]s which contain chunks of [`LocalDataFrame`]s.
//! Therefore each [`DistributedDataFrame`] simply holds a pointer to a
//! [`KVStore`] and a map of ranges of row indices to the [`Key`]s for the
//! chunks of data with that range of row indices. A [`DistributedDataFrame`]
//! is immutable to make it trivial for the global state of the data frame to
//! be consistent.
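//!
//! The mapping from row ranges to [`Key`]s can be sketched with a `BTreeMap`
//! keyed by each chunk's starting row index. The `Key` type below is a
//! stand-in (a name plus owning node id); the crate's actual [`Key`] may
//! differ:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! #[derive(Debug)]
//! struct Key {
//!     name: String,
//!     home_node: usize,
//! }
//!
//! struct ChunkIndex {
//!     // start row of chunk -> (number of rows in chunk, key of chunk)
//!     chunks: BTreeMap<usize, (usize, Key)>,
//! }
//!
//! impl ChunkIndex {
//!     // Find the key of the chunk whose row range contains `row`.
//!     fn key_for_row(&self, row: usize) -> Option<&Key> {
//!         // The candidate chunk is the one with the greatest start <= row.
//!         let (start, (len, key)) = self.chunks.range(..=row).next_back()?;
//!         if row < start + len { Some(key) } else { None }
//!     }
//! }
//!
//! let mut chunks = BTreeMap::new();
//! chunks.insert(0, (100, Key { name: "my-df-chunk-0".to_string(), home_node: 1 }));
//! chunks.insert(100, (100, Key { name: "my-df-chunk-1".to_string(), home_node: 2 }));
//! let index = ChunkIndex { chunks };
//! assert_eq!(index.key_for_row(42).unwrap().home_node, 1);
//! assert_eq!(index.key_for_row(150).unwrap().home_node, 2);
//! assert!(index.key_for_row(200).is_none());
//! ```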
//!
//! Because of this, the [`DistributedDataFrame`] implementation is mainly
//! concerned with networking and getting and putting chunks of different
//! [`KVStore`]s. One of the main concerns is that creating a new
//! [`DistributedDataFrame`] means distributing the [`Key`]s of all the chunks
//! to all nodes.
//!
//! Upon creation, node 1 of a [`DistributedDataFrame`] will distribute chunks
//! of data across multiple nodes from `SoR` files, iterators, and other
//! convenient ways of adding data. Note that our experimental testing found
//! that using the largest chunks possible to fit on each node increased
//! performance by over `2x`.
//!
//! Since our experimental testing found that big chunks are best for `map` and
//! `filter` performance, we cannot simply use the [`KVStore`] to support the
//! API of a [`DistributedDataFrame`] by sending chunks around: each chunk
//! would be too big to send over the network. Methods like `get` therefore
//! won't work unless each [`DistributedDataFrame`] has a way to (meaningfully)
//! talk to other [`DistributedDataFrame`]s, which means they need a [`Client`]
//! of their own.
//!
//! ## Application Layer aka `LiquidML`
//! The implementation of the [`LiquidML`] struct is quite simple since it
//! delegates most of the work to the [`DistributedDataFrame`]. All it does is
//! manage the state of its own node and allow creation and analysis of
//! multiple [`DistributedDataFrame`]s.
//!
//! # Examples and Use Cases
//! Please check the `examples/` directory for more fully featured examples.
//!
//! ## Creating a `DistributedDataFrame` With `LiquidML` and Using a Simple `Rower`
//! This example shows a trivial implementation of a [`Rower`] and using the
//! entire [`LiquidML`] system to perform a `map` operation while being
//! unaware of the distributed internals of the system.
//!
//! ```rust,no_run
//! use liquid_ml::dataframe::{Data, Rower, Row};
//! use liquid_ml::LiquidML;
//! use serde::{Serialize, Deserialize};
//!
//! #[derive(Serialize, Deserialize, Clone)]
//! struct MyRower {
//!     sum: i64
//! }
//!
//! impl Rower for MyRower {
//!     fn visit(&mut self, r: &Row) -> bool {
//!         let i = r.get(0).unwrap();
//!         match i {
//!             Data::Int(val) => {
//!                 if *val < 0 {
//!                     return false;
//!                 }
//!                 self.sum += *val;
//!                 true
//!             },
//!             _ => panic!(),
//!         }
//!     }
//!
//!     fn join(mut self, other: Self) -> Self {
//!         self.sum += other.sum;
//!         self
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // This main does the following:
//!     // 1. Creates a `LiquidML` struct, which registers with the `Server`
//!     //    running at the address "192.168.0.0:9000"
//!     // 2. Constructs a new `DistributedDataFrame` with the name "my-df". If
//!     //    we are node 1, schema-on-read and parse the file, distributing
//!     //    chunks to all the other nodes. Afterwards, all nodes will have
//!     //    an identical `LiquidML` struct and we can call `map`
//!     // 3. We call `map` and each node performs the operation on their local
//!     //    chunk(s). Since the `Rower` trait defines how to join chunks, the
//!     //    results from each node will be joined until we have a result
//!     let mut app = LiquidML::new("192.155.22.11:9000",
//!                                 "192.168.0.0:9000",
//!                                 20)
//!                                 .await
//!                                 .unwrap();
//!     app.df_from_sor("foo.sor", "my-df").await.unwrap();
//!     let r = MyRower { sum: 0 };
//!     app.map("my-df", r);
//! }
//! ```
//!
//! ## Generic, Low Level Use Case
//!
//! ```rust,no_run
//! use liquid_ml::dataframe::LocalDataFrame;
//! use liquid_ml::LiquidML;
//! use liquid_ml::kv::KVStore;
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//!
//! async fn something_complicated(kv: Arc<KVStore<LocalDataFrame>>) {
//!     println!("Use your imagination :D");
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let app =
//!         LiquidML::new("192.15.2.1:900", "192.16.0.0:900", 20).await.unwrap();
//!     app.run(something_complicated).await;
//! }
//! ```
//!
//! ## Random Forest
//!
//! A distributed random forest implementation can be found at
//! `examples/random_forest.rs`. This example demonstrates a random forest
//! built from scratch on our distributed platform.
//!
//! This is currently a very rudimentary proof of concept, as it assumes that
//! the last column is a boolean label and does not support anything other
//! than boolean labels.
//!
//! This program can be run as follows:
//! 1. Start the [`Server`] with this command: `cargo run --bin server`
//! 2. Start 3 clients, each with a different `IP:Port`, with the following
//!    command:
//!
//! ```text
//! cargo run --release --example random_forest -- -m <my IP:Port> -s <Server IP:Port> -d <path to data file>
//! ```
//!
//! A full description of the available command line arguments can be seen by
//! running the following command:
//!
//! ```text
//! cargo run --example random_forest -- --help
//! ```
//!
//! ## Degrees of Linus
//!
//! This example shows how to calculate how many people have worked within
//! `n` degrees of Linus Torvalds. It needs a very large data file to be run,
//! so the data file is not included in this repository. Please reach out to us
//! if you would like the data file.
//!
//! The code can be found in `examples/seven_degrees.rs`.
//!
//! This program can be run as follows:
//! 1. Start the [`Server`] with this command: `cargo run --bin server`
//! 2. Start 3 clients, each with a different `IP:Port`, with the following
//!    command:
//!
//! ```text
//! cargo run --release --example seven_degrees -- -m <IP:Port> -c <full path to data file>
//! ```
//!
//! A full description of the available command line arguments can be seen by
//! running the following command:
//!
//! ```text
//! cargo run --example seven_degrees -- --help
//! ```
//!
//! We found that even when using swap space, the `seven_degrees` example had
//! a peak memory usage of `18GB` and took about 5 minutes with 4 degrees of
//! Linus, using the full sized files.
//!
//! One computer was a desktop with only 8GB of RAM and an i5-4690k; the
//! other was a (plugged in) laptop with 16GB of RAM and an i7-8550u.
//!
//! ## Word Count
//! This program counts the number of times each unique word is found in a
//! given text file. The program splits words by white space and does not
//! account for punctuation (though this would be an easy change in only the
//! example itself), so "foo" and "foo," count as different words.
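//!
//! The per-chunk logic of word count, and the way per-node results are
//! joined, can be sketched with `std` alone (this is an illustration, not
//! the example's exact code):
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Count occurrences of each whitespace-separated word in one chunk.
//! fn count_words(text: &str) -> HashMap<String, usize> {
//!     let mut counts = HashMap::new();
//!     for word in text.split_whitespace() {
//!         *counts.entry(word.to_string()).or_insert(0) += 1;
//!     }
//!     counts
//! }
//!
//! // Merge two per-chunk results, as a word count `Rower`'s join would.
//! fn join(mut left: HashMap<String, usize>, right: HashMap<String, usize>)
//!     -> HashMap<String, usize>
//! {
//!     for (word, n) in right {
//!         *left.entry(word).or_insert(0) += n;
//!     }
//!     left
//! }
//!
//! // Punctuation is not stripped, so "foo" and "foo," count separately.
//! let total = join(count_words("foo bar foo,"), count_words("foo bar"));
//! assert_eq!(total["foo"], 2);
//! assert_eq!(total["foo,"], 1);
//! assert_eq!(total["bar"], 2);
//! ```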
//!
//! The code can be found in `examples/word_count.rs`.
//!
//! This program runs the word count example and can be run as follows:
//! 1. Start the [`Server`] with this command: `cargo run --bin server`
//! 2. Start 3 clients, each with a different `IP:Port`, with the following
//!    command:
//!
//! ```text
//! cargo run --release --example word_count -- -m <IP:Port>
//! ```
//!
//! A full description of the available command line arguments can be seen by
//! running the following command:
//!
//! ```text
//! cargo run --example word_count -- --help
//! ```
//!
//! ## Simple Demo
//! We implemented a trivial demo that places some numbers into the `kv`,
//! adds them, and verifies that the addition was correct.
//!
//! This program runs the demo and can be run as follows:
//! 1. Start the [`Server`] with this command: `cargo run --bin server`
//! 2. Start 3 clients, each with a different `IP:Port`, with the following
//!    command:
//!
//! ```text
//! cargo run --example demo_client -- -m <IP:Port>
//! ```
//!
//! A full description of the available command line arguments can be seen by
//! running the following command:
//!
//! ```text
//! cargo run --example demo_client -- --help
//! ```
//!
//! The third client will print `SUCCESS` at the end.
//!
//! # Road Map
//! 0. Build robust integration tests to define how the distributed system
//!    works.
//! 1. Fix up orderly shutdown of the network layer.
//!
//! [`Client`]: network/struct.Client.html
//! [`Server`]: network/struct.Server.html
//! [`Connection`]: network/struct.Connection.html
//! [`LocalDataFrame`]: dataframe/struct.LocalDataFrame.html
//! [`DistributedDataFrame`]: dataframe/struct.DistributedDataFrame.html
//! [`Rower`]: dataframe/trait.Rower.html
//! [`KVStore`]: kv/struct.KVStore.html
//! [`Key`]: kv/struct.Key.html
//! [`Value`]: kv/type.Value.html
//! [`LiquidML`]: struct.LiquidML.html
pub mod dataframe;
pub mod error;
pub mod kv;
pub mod network;

mod liquid_ml;
pub use crate::liquid_ml::LiquidML;

pub(crate) const MAX_NUM_CACHED_VALUES: usize = 10;
pub(crate) const BYTES_PER_KIB: f64 = 1_024.0;
pub(crate) const BYTES_PER_GB: f64 = 1_073_741_824.0;
pub(crate) const KV_STORE_CACHE_SIZE_FRACTION: f64 = 0.33;
pub(crate) const MAX_FRAME_LEN_FRACTION: f64 = 0.8;