LiquidML

Struct LiquidML 

pub struct LiquidML {
    pub kv: Arc<KVStore<LocalDataFrame>>,
    pub node_id: usize,
    pub blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
    pub num_nodes: usize,
    pub kill_notifier: Arc<Notify>,
    pub data_frames: HashMap<String, Arc<DistributedDataFrame>>,
    pub server_addr: String,
    pub my_ip: String,
}

Represents a liquid_ml application, an easy way to create and operate on multiple DistributedDataFrames at the same time.

This struct is the highest-level component of a distributed liquid_ml system: the application layer. For most use cases, you will not need anything else in this crate besides a LiquidML struct and your own implementation of a Rower.

Non-trivial and trivial examples that use the application can be found in the examples directory of this crate.

Fields§

§kv: Arc<KVStore<LocalDataFrame>>

A pointer to the KVStore that stores all the data for this node of the application

§node_id: usize

The id of this node, assigned by the registration Server

§blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>

A receiver for blob messages that can be processed by the user for lower level access to the network

§num_nodes: usize

The number of nodes in this network

§kill_notifier: Arc<Notify>

A notifier that gets notified when the Server has sent a Kill message

§data_frames: HashMap<String, Arc<DistributedDataFrame>>

A map of a data frame’s name to that DistributedDataFrame

§server_addr: String

The IP:Port address of the Server

§my_ip: String

The IP of this node

Implementations§

impl LiquidML

pub async fn new( my_addr: &str, server_addr: &str, num_nodes: usize, ) -> Result<Self, LiquidError>

Create a new liquid_ml application that runs at my_addr and will wait to connect to num_nodes nodes before returning.

pub async fn df_from_fn( &mut self, df_name: &str, data_generator: fn() -> Vec<Column>, ) -> Result<(), LiquidError>

Create a new data frame with the given name. The data will be generated by calling the provided data_generator function on node 1, which will then distribute chunks across all of the nodes.

Awaiting this function blocks until the data is completely distributed on all nodes. After the data is distributed, every node of this distributed liquid_ml system has its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.

pub async fn df_from_sor( &mut self, df_name: &str, file_name: &str, ) -> Result<(), LiquidError>

Create a new data frame with the given name. The data comes from a SoR file, which is assumed to exist only on node 1. Node 1 parses the file into chunks sized so that each node holds one chunk, or at most two. Node 1 then distributes these chunks to all the other nodes, sending up to 2 chunks concurrently; the limit restricts memory usage, since the chunks are large.

Awaiting this function blocks until the data is completely distributed on all nodes. After the data is distributed, every node of this distributed liquid_ml system has its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.

pub async fn df_from_iter( &mut self, df_name: &str, iter: impl Iterator<Item = Vec<Column>>, ) -> Result<(), LiquidError>

Create a new data frame that consists of all the chunks in iter until iter is consumed. Node 1 calls next on iter and distributes the resulting chunks to all the other nodes, sending up to 2 chunks concurrently so as to restrict memory usage.

The concurrency of sending the chunks could be increased. Doing so would change the API slightly, but not in a major way.
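The bounded sending described above can be illustrated with a small standard-library sketch. It is not the crate's implementation: `distribute_sketch`, its `send` callback, the `max_in_flight` parameter, and the round-robin choice of target node are all assumptions made for illustration. It only shows how pulling chunks lazily from an iterator in small batches keeps at most `max_in_flight` chunks in memory at once.

```rust
use std::thread;

/// Hypothetical sketch: pull chunks lazily from `chunks` and hand them to
/// `send(node_id, chunk)`, never holding more than `max_in_flight` chunks.
/// Round-robin node assignment (ids starting at 1) is an assumption.
/// Returns the number of chunks sent.
pub fn distribute_sketch<C, I, S>(
    chunks: I,
    num_nodes: usize,
    max_in_flight: usize,
    send: S,
) -> usize
where
    C: Send,
    I: Iterator<Item = C>,
    S: Fn(usize, C) + Sync,
{
    assert!(num_nodes > 0 && max_in_flight > 0);
    let mut chunks = chunks;
    let mut sent = 0;
    loop {
        // Only this batch is materialised; the rest of the iterator is untouched.
        let batch: Vec<C> = chunks.by_ref().take(max_in_flight).collect();
        if batch.is_empty() {
            break;
        }
        let base = sent;
        sent += batch.len();
        // The scope returns only after every send in the batch has finished.
        thread::scope(|s| {
            for (offset, chunk) in batch.into_iter().enumerate() {
                let node = (base + offset) % num_nodes + 1;
                let send = &send;
                s.spawn(move || send(node, chunk));
            }
        });
    }
    sent
}
```

Raising `max_in_flight` trades memory for throughput, which is the kind of knob the paragraph above alludes to.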

Awaiting this function blocks until the data is completely distributed on all nodes. After the data is distributed, every node of this distributed liquid_ml system has its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.

pub async fn run<F, Fut>(self, f: F)
where Fut: Future<Output = ()>, F: FnOnce(Arc<KVStore<LocalDataFrame>>) -> Fut,

Given a function, run it on this application. run only returns once the Server has sent a kill signal.

§Examples

examples/demo_client.rs is a good starting point to see this in action.

pub async fn map<T: Rower + Serialize + Clone + DeserializeOwned + Send>( &self, df_name: &str, rower: T, ) -> Result<Option<T>, LiquidError>

Perform a distributed map operation on the DistributedDataFrame named df_name using the given rower. Returns Some(rower) (holding the joined results) if the node_id of this DistributedDataFrame is 1, and None otherwise.

A local pmap is used on each node to map over that node’s chunks. By default, each node uses as many threads as are available on that machine.
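As a rough illustration of the local parallel map, here is a standard-library sketch. The `SketchRower` trait, `SumRower`, and `pmap_sketch` are hypothetical stand-ins, not the crate's `Rower` trait or its pmap; rows are simplified to `Vec<i64>`. The idea shown is: split the local rows into one slice per available thread, give each worker its own clone of the rower, then join the workers' results.

```rust
use std::thread;

/// Hypothetical stand-in for a rower: visits rows, then merges partial states.
pub trait SketchRower: Clone + Send {
    /// Visit one row (a row is simplified to a slice of integers here).
    fn visit(&mut self, row: &[i64]);
    /// Fold the state of `other` into `self`.
    fn join(self, other: Self) -> Self;
}

/// Example rower: sums the first field of every row it visits.
#[derive(Clone, Default)]
pub struct SumRower {
    pub total: i64,
}

impl SketchRower for SumRower {
    fn visit(&mut self, row: &[i64]) {
        self.total += row.first().copied().unwrap_or(0);
    }

    fn join(mut self, other: Self) -> Self {
        self.total += other.total;
        self
    }
}

/// Map `rower` over `rows` using one worker per available hardware thread.
pub fn pmap_sketch<R: SketchRower>(rows: &[Vec<i64>], rower: R) -> R {
    let n_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Ceiling division; `max(1)` keeps `chunks` valid for empty input.
    let per_thread = ((rows.len() + n_threads - 1) / n_threads).max(1);
    thread::scope(|s| {
        // Spawn every worker first so they run concurrently.
        let handles: Vec<_> = rows
            .chunks(per_thread)
            .map(|slice| {
                let mut local = rower.clone();
                s.spawn(move || {
                    for row in slice {
                        local.visit(row);
                    }
                    local
                })
            })
            .collect();
        // Join the partial results in slice order; with no rows, return the
        // untouched rower.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .reduce(|acc, r| acc.join(r))
            .unwrap_or(rower)
    })
}
```

For instance, `pmap_sketch(&rows, SumRower::default())` returns a `SumRower` whose `total` is the sum of the first field across all rows.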

NOTE: Joining the per-node results involves an important design decision with a distinct trade-off. The options are:

  1. Join the last node with the next one until you get to the end. This reduces memory requirements but costs performance, because the network calls are synchronous.
  2. Join all nodes with one node by sending network messages concurrently to the final node. This increases memory requirements and complexity but performs better, because all nodes can send to one node asynchronously at the same time.

This implementation uses option 1 for simplicity.
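The two options can be contrasted with a toy model in which each node is a thread, each network link is a standard-library channel, and a node's partial result is a plain `u64` that is joined by addition. `chain_join` and `fan_in_join` are hypothetical names, and nothing here reflects the crate's actual networking code; it is only a sketch of the two communication shapes.

```rust
use std::sync::mpsc;
use std::thread;

/// Option 1 (chain): the last node sends its partial result to its neighbour,
/// which joins it with its own and forwards it, until the first node is done.
/// Each node holds at most its own partial plus one incoming partial.
pub fn chain_join(partials: Vec<u64>) -> Option<u64> {
    // Receiving end of the link coming from the previous hop, if any.
    let mut upstream: Option<mpsc::Receiver<u64>> = None;
    let mut workers = Vec::new();
    // Walk from the last node toward the first one.
    for local in partials.into_iter().rev() {
        let (tx, rx) = mpsc::channel();
        let incoming = upstream.replace(rx);
        workers.push(thread::spawn(move || {
            // Wait for the neighbour's result (if there is one) before forwarding.
            let joined = match incoming {
                Some(rx) => local + rx.recv().expect("neighbour hung up"),
                None => local,
            };
            tx.send(joined).expect("next hop hung up");
        }));
    }
    // The caller plays the role of whoever reads the first node's result.
    let result = upstream.and_then(|rx| rx.recv().ok());
    for w in workers {
        w.join().expect("node panicked");
    }
    result
}

/// Option 2 (fan-in): every node sends straight to one collector, which may
/// have to buffer all partials at once.
pub fn fan_in_join(partials: Vec<u64>) -> Option<u64> {
    let (tx, rx) = mpsc::channel();
    let workers: Vec<_> = partials
        .into_iter()
        .map(|local| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(local).expect("collector hung up"))
        })
        .collect();
    // Drop the original sender so `rx.iter()` ends once all nodes have sent.
    drop(tx);
    let result = rx.iter().reduce(|a, b| a + b);
    for w in workers {
        w.join().expect("node panicked");
    }
    result
}
```

Addition is commutative, so arrival order does not matter in this toy model. A join that depends on order would need extra care in the fan-in variant, which is part of the added complexity mentioned in option 2.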

pub async fn filter<T: Rower + Serialize + Clone + DeserializeOwned + Send>( &mut self, df_name: &str, rower: T, ) -> Result<(), LiquidError>

Perform a distributed filter operation on the DistributedDataFrame named df_name using the given rower. This function does not mutate the DistributedDataFrame in any way; instead, it creates a new DistributedDataFrame from the results. This new DistributedDataFrame is returned to every node so that the results are consistent everywhere.

A local pfilter is used on each node to filter that node’s chunks. By default, each node uses as many threads as are available on that machine.

It would be possible to rewrite this to use a bitmap of the rows that should remain in the filtered result, but currently it just clones the rows.
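To make the difference concrete, the sketch below contrasts the two strategies on simplified rows (`Vec<i64>`). `filter_by_clone` and `RowMask` are hypothetical names used only for illustration and are not part of the crate. The bitmap variant stores one bit per row and leaves the original rows in place.

```rust
/// Filtering by cloning: every kept row is copied into a new collection.
pub fn filter_by_clone(
    rows: &[Vec<i64>],
    keep: impl Fn(&[i64]) -> bool,
) -> Vec<Vec<i64>> {
    rows.iter().filter(|r| keep(r.as_slice())).cloned().collect()
}

/// Hypothetical bitmap alternative: one bit per row, set if the row is kept.
pub struct RowMask {
    bits: Vec<u64>,
    len: usize,
}

impl RowMask {
    /// Evaluate `keep` once per row and record the outcome as a bit.
    pub fn from_predicate(rows: &[Vec<i64>], keep: impl Fn(&[i64]) -> bool) -> Self {
        let mut bits = vec![0u64; (rows.len() + 63) / 64];
        for (i, row) in rows.iter().enumerate() {
            if keep(row.as_slice()) {
                bits[i / 64] |= 1u64 << (i % 64);
            }
        }
        RowMask { bits, len: rows.len() }
    }

    /// Whether row `i` survived the filter; out-of-range indices are not kept.
    pub fn is_set(&self, i: usize) -> bool {
        i < self.len && (self.bits[i / 64] >> (i % 64)) & 1 == 1
    }

    /// Number of rows that survived the filter.
    pub fn count(&self) -> usize {
        self.bits.iter().map(|w| w.count_ones() as usize).sum()
    }

    /// Iterate over the kept rows without copying them.
    pub fn view<'a>(
        &'a self,
        rows: &'a [Vec<i64>],
    ) -> impl Iterator<Item = &'a Vec<i64>> + 'a {
        rows.iter()
            .enumerate()
            .filter(move |(i, _)| self.is_set(*i))
            .map(|(_, r)| r)
    }
}
```

The mask costs roughly one bit per row regardless of row width, whereas cloning costs memory proportional to the size of the kept rows. The trade-off is that a masked view keeps the unfiltered data alive.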

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V