use crate::dataframe::{Column, DistributedDataFrame, LocalDataFrame, Rower};
use crate::error::LiquidError;
use crate::kv::KVStore;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::{mpsc, mpsc::Receiver, Mutex, Notify};
/// Application-level handle that bundles a key-value store, this node's
/// identity and addresses, and the distributed data frames registered by name.
pub struct LiquidML {
/// Shared handle to the `KVStore` of `LocalDataFrame`s. It is cloned into
/// every data frame constructor and into the closure given to `run`.
pub kv: Arc<KVStore<LocalDataFrame>>,
/// This node's id, copied from `kv.id` in `new`.
pub node_id: usize,
/// Receiving end of the byte-blob channel whose sending end is handed to
/// the `KVStore` in `new`.
pub blob_receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
/// Node count supplied to `new`; forwarded to the `KVStore` and to each
/// data frame constructor.
pub num_nodes: usize,
/// `run` waits for a notification on this after the user closure finishes.
pub kill_notifier: Arc<Notify>,
/// Data frames keyed by name, filled in by the `df_from_*` methods and by
/// `filter`.
pub data_frames: HashMap<String, Arc<DistributedDataFrame>>,
/// Server address exactly as passed to `new`.
pub server_addr: String,
/// The part of the `my_addr` argument of `new` that precedes the first `:`.
pub my_ip: String,
}
impl LiquidML {
    /// Creates a new `LiquidML` handle for this node.
    ///
    /// A blob channel with capacity 20 is created; its sender is given to a
    /// freshly constructed `KVStore` together with `server_addr`, `my_addr`
    /// and `num_nodes`, while the receiver is kept in `blob_receiver`. The
    /// node id is read from the store's `id` field.
    ///
    /// `my_addr` is normally of the form `ip:port`. Only the text before the
    /// first `:` is kept (as `my_ip`); an address without a `:` is stored
    /// whole instead of causing a panic.
    pub async fn new(
        my_addr: &str,
        server_addr: &str,
        num_nodes: usize,
    ) -> Result<Self, LiquidError> {
        let (blob_sender, blob_receiver) = mpsc::channel(20);
        let kill_notifier = Arc::new(Notify::new());
        let kv = KVStore::new(
            server_addr.to_string(),
            my_addr.to_string(),
            blob_sender,
            num_nodes,
        )
        .await;
        let node_id = kv.id;
        // `str::split` always yields at least one item, so the fallback is
        // never taken in practice; it only exists to keep this panic-free.
        // The port component is not needed here and is therefore not parsed.
        let my_ip = my_addr.split(':').next().unwrap_or(my_addr).to_string();
        Ok(LiquidML {
            kv,
            node_id,
            blob_receiver: Arc::new(Mutex::new(blob_receiver)),
            num_nodes,
            kill_notifier,
            data_frames: HashMap::new(),
            server_addr: server_addr.to_string(),
            my_ip,
        })
    }

    /// Builds a distributed data frame from `data_generator` and registers it
    /// under `df_name`.
    ///
    /// `data_generator` is only invoked on the node whose id is 1; every
    /// other node passes `None` as the data to `DistributedDataFrame::new`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `DistributedDataFrame::new`.
    pub async fn df_from_fn(
        &mut self,
        df_name: &str,
        data_generator: fn() -> Vec<Column>,
    ) -> Result<(), LiquidError> {
        let data = if self.node_id == 1 {
            Some(data_generator())
        } else {
            None
        };
        let ddf = DistributedDataFrame::new(
            &self.server_addr,
            &self.my_ip,
            data,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Builds a distributed data frame from the SoR file `file_name` and
    /// registers it under `df_name`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `DistributedDataFrame::from_sor`.
    pub async fn df_from_sor(
        &mut self,
        df_name: &str,
        file_name: &str,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_sor(
            &self.server_addr,
            &self.my_ip,
            file_name,
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Builds a distributed data frame from `iter`, an iterator over column
    /// chunks, and registers it under `df_name`.
    ///
    /// The iterator is always forwarded as `Some(iter)`, regardless of this
    /// node's id.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `DistributedDataFrame::from_iter`.
    pub async fn df_from_iter(
        &mut self,
        df_name: &str,
        iter: impl Iterator<Item = Vec<Column>>,
    ) -> Result<(), LiquidError> {
        let ddf = DistributedDataFrame::from_iter(
            &self.server_addr,
            &self.my_ip,
            Some(iter),
            self.kv.clone(),
            df_name,
            self.num_nodes,
        )
        .await?;
        self.data_frames.insert(df_name.to_string(), ddf);
        Ok(())
    }

    /// Runs the user-supplied async function `f` with a clone of the
    /// key-value store handle, then waits until `kill_notifier` is notified.
    ///
    /// NOTE(review): nothing in this impl triggers `kill_notifier`; presumably
    /// a holder of a clone of that `Arc` does so — confirm, otherwise this
    /// future never completes.
    pub async fn run<F, Fut>(self, f: F)
    where
        Fut: Future<Output = ()>,
        F: FnOnce(Arc<KVStore<LocalDataFrame>>) -> Fut,
    {
        f(self.kv.clone()).await;
        self.kill_notifier.notified().await;
    }

    /// Applies `rower` to the data frame registered under `df_name` by
    /// delegating to that data frame's `map`, and returns its result.
    ///
    /// # Errors
    ///
    /// Returns `LiquidError::NotPresent` when no data frame named `df_name`
    /// is registered; otherwise propagates the error from the data frame's
    /// `map`.
    pub async fn map<T: Rower + Serialize + Clone + DeserializeOwned + Send>(
        &self,
        df_name: &str,
        rower: T,
    ) -> Result<Option<T>, LiquidError> {
        let df = self
            .data_frames
            .get(df_name)
            .ok_or(LiquidError::NotPresent)?;
        df.map(rower).await
    }

    /// Filters the data frame registered under `df_name` with `rower` and
    /// registers the resulting data frame under its own `df_name` field.
    ///
    /// # Errors
    ///
    /// Returns `LiquidError::NotPresent` when no data frame named `df_name`
    /// is registered; otherwise propagates the error from the data frame's
    /// `filter`.
    pub async fn filter<
        T: Rower + Serialize + Clone + DeserializeOwned + Send,
    >(
        &mut self,
        df_name: &str,
        rower: T,
    ) -> Result<(), LiquidError> {
        let df = self
            .data_frames
            .get(df_name)
            .ok_or(LiquidError::NotPresent)?;
        let filtered_df = df.filter(rower).await?;
        // Copy the name out first so the data frame itself can be moved
        // into the map.
        let filtered_name = filtered_df.df_name.clone();
        self.data_frames.insert(filtered_name, filtered_df);
        Ok(())
    }
}