pub struct LiquidML {
pub kv: Arc<KVStore<LocalDataFrame>>,
pub node_id: usize,
pub blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
pub num_nodes: usize,
pub kill_notifier: Arc<Notify>,
pub data_frames: HashMap<String, Arc<DistributedDataFrame>>,
pub server_addr: String,
pub my_ip: String,
}
Represents a liquid_ml application: an easy way to create and operate on multiple DistributedDataFrames at the same time.

This struct is the highest-level component of a distributed liquid_ml system, the application layer. For 90% of use cases, you will not need anything else in this crate besides a LiquidML struct and your own implementation of a Rower.

Both trivial and non-trivial examples that use the application layer can be found in the examples directory of this crate.
Fields

kv: Arc<KVStore<LocalDataFrame>>
A pointer to the KVStore that stores all the data for this node of the application

node_id: usize
The id of this node, assigned by the registration Server

blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>
A receiver for blob messages that the user can process for lower-level access to the network

num_nodes: usize
The number of nodes in this network

kill_notifier: Arc<Notify>

data_frames: HashMap<String, Arc<DistributedDataFrame>>
A map of a data frame's name to that DistributedDataFrame

server_addr: String
The IP:Port address of the Server

my_ip: String
The IP of this node
Implementations

impl LiquidML
pub async fn new(
    my_addr: &str,
    server_addr: &str,
    num_nodes: usize,
) -> Result<Self, LiquidError>
Create a new liquid_ml application that runs at my_addr and will
wait to connect to num_nodes nodes before returning.
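A minimal usage sketch, assuming a Tokio runtime and a registration Server already listening; the addresses, node count, and import path are placeholders, not values from this crate's documentation:

```rust
use liquid_ml::LiquidML; // import path is an assumption

#[tokio::main]
async fn main() {
    // Blocks until all 3 nodes have registered with the Server.
    // First argument is this node's address, second is the Server's.
    let app = LiquidML::new("127.0.0.1:9001", "127.0.0.1:9000", 3)
        .await
        .expect("failed to join the cluster");
    println!("node {} of {} ready", app.node_id, app.num_nodes);
}
```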
pub async fn df_from_fn(
    &mut self,
    df_name: &str,
    data_generator: fn() -> Vec<Column>,
) -> Result<(), LiquidError>
Create a new data frame with the given name. The data is generated by calling the provided data_generator function on node 1, which then distributes chunks across all of the nodes.

Awaiting this function blocks until the data is completely distributed to all nodes. After the data is distributed, each node of this distributed liquid_ml system will have its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.
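A sketch of a data generator; the Column import path and the Column::Int variant (holding Vec<Option<i64>>) are assumptions about this crate's column representation:

```rust
use liquid_ml::dataframe::Column; // path and variant are assumptions

// Produces one column of 1,000 non-missing ints. Runs only on node 1;
// the resulting data is then chunked and distributed.
fn generate() -> Vec<Column> {
    let ints: Vec<Option<i64>> = (0..1_000).map(Some).collect();
    vec![Column::Int(ints)]
}

// Inside an async context, with `app` a mutable LiquidML instance:
// app.df_from_fn("numbers", generate).await?;
```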
pub async fn df_from_sor(
    &mut self,
    df_name: &str,
    file_name: &str,
) -> Result<(), LiquidError>
Create a new data frame with the given name. The data comes from a SoR file which is assumed to exist only on node 1. Node 1 parses the file into chunks sized so that each node holds one, or at most two, chunks. Node 1 distributes these chunks to all the other nodes, sending up to 2 chunks concurrently so as to restrict memory usage, since the chunks are large.

Awaiting this function blocks until the data is completely distributed to all nodes. After the data is distributed, each node of this distributed liquid_ml system will have its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.
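A usage sketch; the file path is a placeholder and the file must exist on node 1:

```rust
// Inside an async context, with `app` a mutable LiquidML instance.
// Node 1 parses the file and streams chunks (at most 2 in flight) to
// the other nodes; every node returns once distribution finishes.
app.df_from_sor("sales", "/path/to/sales.sor").await?;
```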
pub async fn df_from_iter(
    &mut self,
    df_name: &str,
    iter: impl Iterator<Item = Vec<Column>>,
) -> Result<(), LiquidError>
Create a new data frame that consists of all the chunks in iter until iter is consumed. Node 1 will call next on the iter and distribute these chunks to all the other nodes, sending up to 2 chunks concurrently so as to restrict memory usage.

It would be possible to increase the concurrency of sending the chunks; this would change the API slightly, but not in a major way.

Awaiting this function blocks until the data is completely distributed to all nodes. After the data is distributed, each node of this distributed liquid_ml system will have its LiquidML struct updated with the information of the new DistributedDataFrame.

NOTE: df_name must be unique.
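A sketch of building a chunk iterator; as above, the Column import path and Column::Int variant are assumptions:

```rust
use liquid_ml::dataframe::Column; // path and variant are assumptions

// Each iterator item is one chunk: a Vec of columns. Here, four chunks
// of 100 ints each; chunks are produced lazily, only as node 1 calls
// `next` and ships them out.
let chunks = (0..4i64).map(|c| {
    let ints: Vec<Option<i64>> = (c * 100..(c + 1) * 100).map(Some).collect();
    vec![Column::Int(ints)]
});

// Inside an async context, with `app` a mutable LiquidML instance:
// app.df_from_iter("chunked", chunks).await?;
```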
pub async fn map<T: Rower + Serialize + Clone + DeserializeOwned + Send>(
    &self,
    df_name: &str,
    rower: T,
) -> Result<Option<T>, LiquidError>
Perform a distributed map operation on the DistributedDataFrame with the name df_name, using the given rower. Returns Some(rower) (containing the joined results) if the node_id of this DistributedDataFrame is 1, and None otherwise.

A local pmap is used on each node to map over that node's chunk. By default, each node will use the number of threads available on that machine.

NOTE: There is an important design decision with a distinct trade-off here:

- Join the last node with the next one until you reach the end. This reduces memory requirements but hurts performance because of the synchronous network calls.
- Join all nodes with one node by sending network messages concurrently to the final node. This increases memory requirements and complexity, but improves performance because all nodes can asynchronously send to one node at the same time.

This implementation went with option 1 for simplicity reasons.
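A sketch of a Rower that counts rows, satisfying the trait bounds in the signature. The Rower method names (`visit`, `join`) and the Row accessor used here are assumptions about this crate's Rower abstraction, not confirmed API:

```rust
use liquid_ml::dataframe::{Row, Rower}; // paths are assumptions
use serde::{Deserialize, Serialize};

// Counts rows whose first column holds a non-missing value.
#[derive(Clone, Serialize, Deserialize)]
struct IntCounter {
    count: usize,
}

impl Rower for IntCounter {
    // Called once per row of this node's chunk (method name assumed).
    fn visit(&mut self, row: &Row) -> bool {
        if row.get(0).is_ok() {
            self.count += 1;
        }
        true
    }

    // Merges the partial result from another node (method name assumed).
    fn join(mut self, other: Self) -> Self {
        self.count += other.count;
        self
    }
}

// Inside an async context:
// if let Some(r) = app.map("numbers", IntCounter { count: 0 }).await? {
//     println!("{} rows", r.count); // only node 1 receives Some
// }
```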
pub async fn filter<T: Rower + Serialize + Clone + DeserializeOwned + Send>(
    &mut self,
    df_name: &str,
    rower: T,
) -> Result<(), LiquidError>
Perform a distributed filter operation on the DistributedDataFrame with the name df_name, using the given rower. This function does not mutate the DistributedDataFrame in any way; instead, it creates a new DistributedDataFrame from the results. This new DistributedDataFrame is made available on every node so that the results are consistent everywhere.

A local pfilter is used on each node to filter over that node's chunks. By default, each node will use the number of threads available on that machine.

It would be possible to re-write this to use a bit map of the rows that should remain in the filtered result, but currently this just clones the rows.
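A sketch of a filtering Rower, assuming (an assumption about filter semantics, plus assumed trait method names and Row accessors) that visit returning true means the row is kept:

```rust
use liquid_ml::dataframe::{Row, Rower}; // paths are assumptions
use serde::{Deserialize, Serialize};

// Keeps only rows whose first column holds a non-missing value.
#[derive(Clone, Serialize, Deserialize)]
struct KeepValid;

impl Rower for KeepValid {
    fn visit(&mut self, row: &Row) -> bool {
        row.get(0).is_ok() // true => keep this row in the filtered result
    }
    fn join(self, _other: Self) -> Self {
        self // stateless rower: nothing to merge
    }
}

// Inside an async context, with `app` a mutable LiquidML instance:
// app.filter("numbers", KeepValid).await?;
// Every node then also holds the new, filtered DistributedDataFrame.
```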
Auto Trait Implementations
impl Freeze for LiquidML
impl !RefUnwindSafe for LiquidML
impl Send for LiquidML
impl Sync for LiquidML
impl Unpin for LiquidML
impl !UnwindSafe for LiquidML