liquid_ml/dataframe/distributed_dataframe.rs
//! Defines functionality for a data frame that is split across different
//! physical machines.
use crate::dataframe::{local_dataframe::LocalDataFrame, Row, Rower, Schema};
use crate::error::LiquidError;
use crate::kv::{KVStore, Key};
use crate::network::{Client, FramedStream};
use bincode::{deserialize, serialize};
use futures::stream::{SelectAll, StreamExt};
use log::{debug, info};
use rand::{self, Rng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sorer::dataframe::{Column, Data, SorTerator};
use std::cmp;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::{
    mpsc::{self, Receiver, Sender},
    Mutex, Notify, RwLock,
};

/// Represents a distributed, immutable data frame which contains data stored
/// in a columnar format and a well-defined [`Schema`]. Provides convenient
/// `map` and `filter` methods that operate on the entire distributed data
/// frame (i.e., across different machines) with a given [`Rower`].
///
/// [`Rower`]: trait.Rower.html
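///
/// A minimal usage sketch (not from the original docs; it assumes `ddf` is an
/// `Arc<DistributedDataFrame>` that was already constructed on this node):
///
/// ```ignore
/// // basic read-only accessors work the same on every node
/// println!("{} rows x {} columns", ddf.n_rows(), ddf.n_cols());
/// // fetched from the local chunk or over the network, depending on
/// // which node owns the row
/// let first_cell = ddf.get(0, 0).await?;
/// ```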
#[derive(Debug)]
pub struct DistributedDataFrame {
    /// The `Schema` of this `DistributedDataFrame`
    pub schema: Schema,
    /// The name of this `DistributedDataFrame`. Must be unique in a `LiquidML`
    /// instance
    pub df_name: String,
    /// A map of the range of row indices to the `Key`s that point to the chunk
    /// of data with those rows. Not all `Key`s in this map belong to this node
    /// of the `DistributedDataFrame`; some may belong to other nodes
    pub df_chunk_map: HashMap<Range<usize>, Key>,
    /// The number of rows in this entire `DistributedDataFrame`
    pub num_rows: usize,
    /// The id of the node this `DistributedDataFrame` is running on
    pub node_id: usize,
    /// How many nodes are there in this `DistributedDataFrame`?
    pub num_nodes: usize,
    /// What's the address of the `Server`?
    pub server_addr: String,
    /// What's my IP address?
    pub my_ip: String,
    /// Used for communication with other nodes in this `DistributedDataFrame`
    network: Arc<Mutex<Client<DistributedDFMsg>>>,
    /// The `KVStore`, which stores the serialized data owned by this
    /// `DistributedDataFrame` and deserialized cached data that may or may
    /// not belong to this node
    kv: Arc<KVStore<LocalDataFrame>>,
    /// Used for processing messages so that the asynchronous task running
    /// the `process_messages` function can notify other asynchronous tasks
    /// when the `row` of this `DistributedDataFrame` is ready to use for
    /// operations (such as returning the result to the `get_row` function)
    internal_notifier: Arc<Notify>,
    /// Is mutated by the asynchronous `process_messages` task to be a
    /// requested row when the network responds to `GetRow` requests, to
    /// enable getter methods for data such as `get_row`
    row: Arc<RwLock<Row>>,
    /// A notifier that gets notified when the `Server` has sent a `Kill`
    /// message to this `DistributedDataFrame`'s network `Client`
    _kill_notifier: Arc<Notify>,
    /// Used for lower level messages, such as sending arbitrary `Rower`s
    blob_receiver: Mutex<Receiver<Vec<u8>>>,
    /// Used for processing filter results. TODO: maybe a better way to do this
    filter_results: Mutex<Receiver<DistributedDFMsg>>,
}

/// Represents the kinds of messages sent between `DistributedDataFrame`s
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) enum DistributedDFMsg {
    /// A message used to request a `Row` with the given index from another
    /// node in a `DistributedDataFrame`
    GetRow(usize),
    /// A message used to respond to `GetRow` messages with the requested row
    Row(Row),
    /// A message used to tell the 1st node the results from using the `filter`
    /// method. If there were no rows after filtering, then `filtered_df_key`
    /// is `None` and `num_rows` is `0`.
    FilterResult {
        num_rows: usize,
        filtered_df_key: Option<Key>,
    },
    /// A message used to share random blobs of data with other nodes. This
    /// provides a lower level interface to facilitate other kinds of messages,
    /// for example sending rowers when performing `map`/`filter`.
    Blob(Vec<u8>),
    /// Used to send the other nodes in a `DistributedDataFrame` the
    /// information required to construct a new `DistributedDataFrame` struct
    /// that is consistent across all nodes.
    Initialization {
        schema: Schema,
        df_chunk_map: HashMap<Range<usize>, Key>,
    },
}

impl DistributedDataFrame {
    /// Creates a new `DistributedDataFrame` from the given file. It is
    /// assumed that node 1 contains the file with the given `file_name`.
    /// Node 1 will then parse that file and distribute chunks to other nodes
    /// over the network, so if network latency is a concern you should not
    /// use this method.
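    ///
    /// A hypothetical call sketch (the addresses, file name, and `kv` handle
    /// below are assumptions, not values from this crate):
    ///
    /// ```ignore
    /// // every node calls this with the same arguments; only node 1 needs
    /// // "data.sor" to actually exist on disk
    /// let ddf = DistributedDataFrame::from_sor(
    ///     "192.168.0.1:9000", // `Server` address
    ///     "192.168.0.2:9001", // this node's IP
    ///     "data.sor",
    ///     kv,                 // an existing `Arc<KVStore<LocalDataFrame>>`
    ///     "example-df",
    ///     3,                  // number of nodes
    /// )
    /// .await?;
    /// ```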
    pub(crate) async fn from_sor(
        server_addr: &str,
        my_ip: &str,
        file_name: &str,
        kv: Arc<KVStore<LocalDataFrame>>,
        df_name: &str,
        num_nodes: usize,
    ) -> Result<Arc<Self>, LiquidError> {
        // make a chunking iterator for the sor file
        let sor_terator = if kv.id == 1 {
            let total_newlines = count_new_lines(file_name);
            let max_rows_per_node = total_newlines / num_nodes;
            let schema = sorer::schema::infer_schema(file_name)?;
            info!(
                "Total newlines: {} max rows per node: {}",
                total_newlines, max_rows_per_node
            );
            info!("Inferred schema: {:?}", &schema);
            Some(SorTerator::new(file_name, schema, max_rows_per_node))
        } else {
            None
        };
        DistributedDataFrame::from_iter(
            server_addr,
            my_ip,
            sor_terator,
            kv,
            df_name,
            num_nodes,
        )
        .await
    }

    /// Creates a new `DistributedDataFrame` from the given iterator. The
    /// iterator is used only on node 1, which calls `next` on it and
    /// distributes chunks concurrently.
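    ///
    /// A sketch of the intended calling pattern (assuming `chunks` is a
    /// `Vec<Vec<Column>>` that was built on node 1 only):
    ///
    /// ```ignore
    /// let iter = if kv.id == 1 { Some(chunks.into_iter()) } else { None };
    /// let ddf = DistributedDataFrame::from_iter(
    ///     server_addr, my_ip, iter, kv, "example-df", num_nodes,
    /// )
    /// .await?;
    /// ```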
    pub(crate) async fn from_iter(
        server_addr: &str,
        my_ip: &str,
        iter: Option<impl Iterator<Item = Vec<Column>>>,
        kv: Arc<KVStore<LocalDataFrame>>,
        df_name: &str,
        num_nodes: usize,
    ) -> Result<Arc<Self>, LiquidError> {
        // Figure out which node we are supposed to be
        let node_id = kv.id;
        // initialize some other required fields of self so as not to
        // duplicate code in the if branches
        let (blob_sender, blob_receiver) = mpsc::channel(2);
        // used for internal message processing so that the asynchronous
        // messaging task can notify other tasks when `self.row` is ready
        let internal_notifier = Arc::new(Notify::new());
        // so that our network client can notify us when they get a Kill
        // signal
        let _kill_notifier = Arc::new(Notify::new());
        // so that our client only connects to clients for this dataframe
        let df_network_name = format!("ddf-{}", df_name);
        // for processing results when distributed filtering is performed
        // on this `DistributedDataFrame`
        let (filter_results_sender, filter_results) = mpsc::channel(num_nodes);
        let filter_results = Mutex::new(filter_results);

        let (network, mut read_streams, __kill_notifier) =
            Client::register_network(
                kv.network.clone(),
                df_network_name.to_string(),
            )
            .await?;
        assert_eq!(node_id, { network.lock().await.id });

        // Node 1 is responsible for sending out chunks
        if node_id == 1 {
            // Distribute the chunked sor file round-robin style
            let mut df_chunk_map = HashMap::new();
            let mut cur_num_rows = 0;
            let mut schema = None;
            {
                // in each iteration, spawn a task that sends a chunk to a node
                let mut chunk_idx = 0;
                for chunk in iter.unwrap().into_iter() {
                    if chunk_idx == 0 {
                        schema = Some(Schema::from(&chunk));
                    }

                    let ldf = LocalDataFrame::from(chunk);
                    if chunk_idx > 0 {
                        // assert all chunks have the same schema
                        assert_eq!(schema.as_ref(), Some(ldf.get_schema()));
                    }

                    // make the key that will be associated with this chunk
                    let key =
                        Key::generate(df_name, (chunk_idx % num_nodes) + 1);
                    // add this chunk range and key to our <range, key> map
                    df_chunk_map.insert(
                        Range {
                            start: cur_num_rows,
                            end: cur_num_rows + ldf.n_rows(),
                        },
                        key.clone(),
                    );
                    cur_num_rows += ldf.n_rows();

                    let kv_ptr = kv.clone();
                    tokio::spawn(async move {
                        kv_ptr.put(key, ldf).await.unwrap();
                    });

                    // NOTE: might need to do some tuning on when to join the
                    // futures here, possibly even dynamically figure out some
                    // value to smooth over the tradeoff between memory and
                    // speed
                    chunk_idx += 1;
                }

                // we are almost done distributing chunks
                info!("Finished distributing {} chunks", chunk_idx);
            }

            // Create an Initialization message that holds all the information
            // related to this DistributedDataFrame: the Schema and the map
            // from the range of indices that each chunk holds to the `Key`
            // associated with that chunk
            let schema = schema.unwrap();
            let intro_msg = DistributedDFMsg::Initialization {
                schema: schema.clone(),
                df_chunk_map: df_chunk_map.clone(),
            };

            // Broadcast the initialization message to all nodes
            network.lock().await.broadcast(intro_msg).await?;
            debug!("Node 1 sent the initialization message to all nodes");

            let row = Arc::new(RwLock::new(Row::new(&schema)));

            let ddf = Arc::new(DistributedDataFrame {
                schema,
                df_name: df_name.to_string(),
                df_chunk_map,
                num_rows: cur_num_rows,
                network,
                node_id,
                num_nodes,
                server_addr: server_addr.to_string(),
                my_ip: my_ip.to_string(),
                kv,
                internal_notifier,
                row,
                _kill_notifier,
                blob_receiver: Mutex::new(blob_receiver),
                filter_results,
            });

            // spawn a tokio task to process messages
            let ddf_clone = ddf.clone();
            tokio::spawn(async move {
                DistributedDataFrame::process_messages(
                    ddf_clone,
                    read_streams,
                    blob_sender,
                    filter_results_sender,
                )
                .await
                .unwrap();
            });

            Ok(ddf)
        } else {
            // Node 1 will send the initialization message to our network
            let init_msg = read_streams.next().await.unwrap()?;
            // We got a message; check it was the initialization message
            let (schema, df_chunk_map) = match init_msg.msg {
                DistributedDFMsg::Initialization {
                    schema,
                    df_chunk_map,
                } => (schema, df_chunk_map),
                _ => return Err(LiquidError::UnexpectedMessage),
            };
            debug!("Got the Initialization message from Node 1");

            let row = Arc::new(RwLock::new(Row::new(&schema)));
            let num_rows =
                df_chunk_map.iter().map(|(k, _)| k.end).max().unwrap_or(0);

            let ddf = Arc::new(DistributedDataFrame {
                schema,
                df_name: df_name.to_string(),
                df_chunk_map,
                num_rows,
                network,
                node_id,
                num_nodes,
                server_addr: server_addr.to_string(),
                my_ip: my_ip.to_string(),
                kv,
                internal_notifier,
                row,
                _kill_notifier,
                blob_receiver: Mutex::new(blob_receiver),
                filter_results,
            });

            // spawn a tokio task to process messages
            let ddf_clone = ddf.clone();
            tokio::spawn(async move {
                DistributedDataFrame::process_messages(
                    ddf_clone,
                    read_streams,
                    blob_sender,
                    filter_results_sender,
                )
                .await
                .unwrap();
            });

            Ok(ddf)
        }
    }

    // TODO: add some verification that the `data` is not jagged. A function
    // that is a no-op if it's not jagged, and otherwise inserts nulls to fix
    // it, would be nice.

    /// Creates a new `DistributedDataFrame` by chunking the given `data` into
    /// evenly sized chunks and distributing them across all nodes. Each chunk
    /// will hold the total number of rows in `data` divided by the number of
    /// nodes, since this was found to have the best performance for `map` and
    /// `filter`. Node 1 is responsible for distributing the data, and thus
    /// `data` should only be `Some` on node 1.
    ///
    /// NOTE: this function currently does not verify that `data` is not
    /// jagged, which is a required invariant of the program. There is a plan
    /// to automatically fix jagged data.
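    ///
    /// A hypothetical call sketch (assuming `data: Vec<Column>` was already
    /// built on node 1; other nodes pass `None`):
    ///
    /// ```ignore
    /// let data = if kv.id == 1 { Some(data) } else { None };
    /// let ddf = DistributedDataFrame::new(
    ///     server_addr, my_ip, data, kv, "generated-df", num_nodes,
    /// )
    /// .await?;
    /// ```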
    pub(crate) async fn new(
        server_addr: &str,
        my_ip: &str,
        data: Option<Vec<Column>>,
        kv: Arc<KVStore<LocalDataFrame>>,
        df_name: &str,
        num_nodes: usize,
    ) -> Result<Arc<Self>, LiquidError> {
        let num_rows = if let Some(d) = &data { n_rows(d) } else { 0 };
        let chunk_size = num_rows / num_nodes;
        let chunkerator = if data.is_some() {
            Some(DataChunkerator { chunk_size, data })
        } else {
            None
        };
        DistributedDataFrame::from_iter(
            server_addr,
            my_ip,
            chunkerator,
            kv,
            df_name,
            num_nodes,
        )
        .await
    }

    /// Obtains a reference to this `DistributedDataFrame`'s schema.
    pub fn get_schema(&self) -> &Schema {
        &self.schema
    }

    /// Get the `Data` at the given `col_idx`, `row_idx` offsets.
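    ///
    /// A small sketch (assuming row `42` exists and may live on another node):
    ///
    /// ```ignore
    /// let cell = ddf.get(0, 42).await?;  // a `Data` value for column 0
    /// let row = ddf.get_row(42).await?;  // a clone of the whole row
    /// ```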
    pub async fn get(
        &self,
        col_idx: usize,
        row_idx: usize,
    ) -> Result<Data, LiquidError> {
        let r = self.get_row(row_idx).await?;
        Ok(r.get(col_idx)?.clone())
    }

    /// Returns a clone of the row at the requested `index`
    pub async fn get_row(&self, index: usize) -> Result<Row, LiquidError> {
        match self.df_chunk_map.iter().find(|(k, _)| k.contains(&index)) {
            Some((range, key)) => {
                // the key is either owned by us or another node
                if key.home == self.node_id {
                    // we own it
                    let our_local_df = self.kv.get(&key).await?;
                    let mut r = Row::new(self.get_schema());
                    // TODO: is this index for fill_row correct?
                    our_local_df.fill_row(index - range.start, &mut r)?;
                    Ok(r)
                } else {
                    // owned by another node, must request over the network
                    let get_msg = DistributedDFMsg::GetRow(index);
                    {
                        self.network
                            .lock()
                            .await
                            .send_msg(key.home, get_msg)
                            .await?;
                    }
                    // wait here until we are notified the row is set by our
                    // message processing task
                    self.internal_notifier.notified().await;
                    // self.row is now set
                    Ok(self.row.read().await.clone())
                }
            }
            None => Err(LiquidError::RowIndexOutOfBounds),
        }
    }

    /// Get the index of the `Column` with the given `col_name`. Returns `Some`
    /// if a `Column` with the given name exists, or `None` otherwise.
    pub fn get_col_idx(&self, col_name: &str) -> Option<usize> {
        self.schema.col_idx(col_name)
    }

    /// Perform a distributed map operation on this `DistributedDataFrame` with
    /// the given `rower`. Returns `Some(rower)` (of the joined results) if the
    /// `node_id` of this `DistributedDataFrame` is `1`, and `None` otherwise.
    ///
    /// A local `pmap` is used on each node to map over that node's chunks.
    /// By default, each node will use the number of threads available on that
    /// machine.
    ///
    /// NOTE:
    /// There is an important design decision that comes with a distinct trade
    /// off here. The trade off is:
    /// 1. Join the last node with the next one until you get to the end. This
    ///    has reduced memory requirements but a performance impact because
    ///    of the synchronous network calls.
    /// 2. Join all nodes with one node by sending network messages
    ///    concurrently to the final node. This has increased memory
    ///    requirements and greater complexity but greater performance because
    ///    all nodes can asynchronously send to one node at the same time.
    ///
    /// This implementation went with option 1 for simplicity reasons.
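    ///
    /// A usage sketch (`IntSummer` is a hypothetical `Rower` that sums an
    /// integer column; it is not part of this crate):
    ///
    /// ```ignore
    /// let result = ddf.map(IntSummer { sum: 0 }).await?;
    /// if ddf.node_id == 1 {
    ///     // only node 1 receives the fully joined rower
    ///     let joined = result.unwrap();
    ///     println!("total: {}", joined.sum);
    /// } else {
    ///     assert!(result.is_none());
    /// }
    /// ```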
    pub async fn map<T: Rower + Clone + Send + Serialize + DeserializeOwned>(
        &self,
        mut rower: T,
    ) -> Result<Option<T>, LiquidError> {
        // get the keys for our locally owned chunks
        let my_keys: Vec<&Key> = self
            .df_chunk_map
            .iter()
            .filter(|(_, key)| key.home == self.node_id)
            .map(|(_, v)| v)
            .collect();
        // map over our chunks
        for key in my_keys {
            // TODO: shouldn't need wait_and_get here since we own that chunk
            let ldf = self.kv.wait_and_get(key).await?;
            rower = ldf.pmap(rower);
        }
        if self.node_id == self.num_nodes {
            // we are the last node
            self.send_blob(self.node_id - 1, &rower).await?;
            debug!("Last node sent its results");
            Ok(None)
        } else {
            let blob =
                { self.blob_receiver.lock().await.recv().await.unwrap() };
            let external_rower: T = deserialize(&blob[..])?;
            rower = rower.join(external_rower);
            debug!("Received a resulting rower and joined it with local rower");
            if self.node_id != 1 {
                self.send_blob(self.node_id - 1, &rower).await?;
                debug!("Forwarded the combined rower");
                Ok(None)
            } else {
                debug!("Node 1 joined the final rower; map is complete");
                Ok(Some(rower))
            }
        }
    }

    // TODO: maybe abstract this into an iterator and use the from_iter
    // function since a **lot** of code here is copy pasted from that.
    // One issue: filter needs to generate a client-type that is unique
    // to the filtered dataframe, but from_iter assumes the client-type
    // is `ddf`. We could make a private from_iter_and_type method
    // that also accepts the client-type, and then from_iter passes in
    // "ddf" while filter passes in the generated client-type

    /// Perform a distributed filter operation on this `DistributedDataFrame`.
    /// This function does not mutate the `DistributedDataFrame` in any way;
    /// instead, it creates a new `DistributedDataFrame` of the results. This
    /// new `DistributedDataFrame` is returned to every node so that the
    /// results are consistent everywhere.
    ///
    /// A local `pfilter` is used on each node to filter over that node's
    /// chunks. By default, each node will use the number of threads available
    /// on that machine.
    ///
    /// It is possible to re-write this to use a bit map of the rows that
    /// should remain in the filtered result, but currently this just clones
    /// the rows.
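    ///
    /// A usage sketch (`GreaterThan10` is a hypothetical `Rower` that keeps
    /// only rows whose first column is greater than ten):
    ///
    /// ```ignore
    /// // every node gets back the same new, filtered `DistributedDataFrame`
    /// let filtered = ddf.filter(GreaterThan10 { col: 0 }).await?;
    /// assert!(filtered.n_rows() <= ddf.n_rows());
    /// ```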
    pub async fn filter<
        T: Rower + Clone + Send + Serialize + DeserializeOwned,
    >(
        &self,
        mut rower: T,
    ) -> Result<Arc<Self>, LiquidError> {
        // so that our network client can notify us when they get a Kill
        // signal
        let _kill_notifier = Arc::new(Notify::new());
        let mut rng = rand::thread_rng();
        let r = rng.gen::<i16>();
        let new_name = format!("{}-filtered-{}", &self.df_name, r);
        let df_network_name = format!("ddf-{}", new_name);
        let (network, mut read_streams, __kill_notifier) =
            Client::register_network(
                self.kv.network.clone(),
                df_network_name.to_string(),
            )
            .await?;
        assert_eq!(self.node_id, { network.lock().await.id });

        // get the keys for our locally owned chunks
        let my_keys: Vec<&Key> = self
            .df_chunk_map
            .iter()
            .filter(|(_, key)| key.home == self.node_id)
            .map(|(_, v)| v)
            .collect();
        // NOTE: combines all chunks into one final chunk, may want to change
        // to stay 1-1
        // 1. filter over our locally owned chunks
        let mut filtered_ldf = LocalDataFrame::new(self.get_schema());
        for key in &my_keys {
            // TODO: should not really need wait_and_get here since we own
            // that chunk
            let ldf = self.kv.wait_and_get(key).await?;
            filtered_ldf = filtered_ldf.combine(ldf.pfilter(&mut rower))?;
        }

        // initialize some other required fields of self so as not to
        // duplicate code in the if branches
        let (blob_sender, blob_receiver) = mpsc::channel(2);
        // used for internal message processing so that the asynchronous
        // messaging task can notify other tasks when `self.row` is ready
        let internal_notifier = Arc::new(Notify::new());
        // for processing results of distributed filtering
        let (filter_results_sender, filter_results) =
            mpsc::channel(self.num_nodes);
        let filter_results = Mutex::new(filter_results);

        let num_rows_left = filtered_ldf.n_rows();
        info!(
            "Finished filtering {} local chunk(s), have {} rows after filter",
            my_keys.len(),
            num_rows_left
        );

        // put our result in our KVStore only if it's not empty
        let mut key = None;
        if num_rows_left > 0 {
            let k = Key::generate(&new_name, self.node_id);
            key = Some(k.clone());
            self.kv.put(k, filtered_ldf).await?;
        }

        if self.node_id == 1 {
            // 2. collect all results from other nodes (insert ours first)
            let mut df_chunk_map = HashMap::new();
            let mut cur_num_rows = 0;
            if let Some(key) = key {
                df_chunk_map.insert(
                    Range {
                        start: cur_num_rows,
                        end: cur_num_rows + num_rows_left,
                    },
                    key,
                );
                cur_num_rows += num_rows_left;
            }

            let mut results_received = 1;
            // TODO: maybe a better way to pass around these results
            {
                let mut unlocked = self.filter_results.lock().await;
                while results_received < self.num_nodes {
                    let msg = unlocked.recv().await.unwrap();
                    match msg {
                        DistributedDFMsg::FilterResult {
                            num_rows,
                            filtered_df_key,
                        } => {
                            match filtered_df_key {
                                Some(k) => {
                                    df_chunk_map.insert(
                                        Range {
                                            start: cur_num_rows,
                                            end: cur_num_rows + num_rows,
                                        },
                                        k,
                                    );
                                    cur_num_rows += num_rows;
                                }
                                None => {
                                    assert_eq!(num_rows, 0);
                                }
                            }
                            results_received += 1;
                        }
                        _ => return Err(LiquidError::UnexpectedMessage),
                    }
                }
                debug!("Got all filter results from other nodes");
            }

            // 3. broadcast initialization message

            // Create an Initialization message that holds all the information
            // related to this DistributedDataFrame: the Schema and the map
            // from the range of indices that each chunk holds to the `Key`
            // associated with that chunk
            let intro_msg = DistributedDFMsg::Initialization {
                schema: self.get_schema().clone(),
                df_chunk_map: df_chunk_map.clone(),
            };

            // Broadcast the initialization message to all nodes
            network.lock().await.broadcast(intro_msg).await?;
            debug!("Node 1 sent the initialization message to all nodes");

            // 4. initialize self
            let row = Arc::new(RwLock::new(Row::new(self.get_schema())));
            let num_rows =
                df_chunk_map.iter().map(|(k, _)| k.end).max().unwrap_or(0);

            let ddf = Arc::new(DistributedDataFrame {
                schema: self.get_schema().clone(),
                df_name: new_name,
                df_chunk_map,
                num_rows,
                network,
                node_id: self.node_id,
                num_nodes: self.num_nodes,
                server_addr: self.server_addr.clone(),
                my_ip: self.my_ip.clone(),
                kv: self.kv.clone(),
                internal_notifier,
                row,
                _kill_notifier,
                blob_receiver: Mutex::new(blob_receiver),
                filter_results,
            });

            // spawn a tokio task to process messages
            let ddf_clone = ddf.clone();
            tokio::spawn(async move {
                DistributedDataFrame::process_messages(
                    ddf_clone,
                    read_streams,
                    blob_sender,
                    filter_results_sender,
                )
                .await
                .unwrap();
            });

            Ok(ddf)
        } else {
            // send our filter results to node 1
            let results = DistributedDFMsg::FilterResult {
                num_rows: num_rows_left,
                filtered_df_key: key,
            };
            network.lock().await.send_msg(1, results).await?;
            // Node 1 will send the initialization message to our network
            let init_msg = read_streams.next().await.unwrap()?;
            // We got a message; check it was the initialization message
            let (schema, df_chunk_map) = match init_msg.msg {
                DistributedDFMsg::Initialization {
                    schema,
                    df_chunk_map,
                } => (schema, df_chunk_map),
                _ => return Err(LiquidError::UnexpectedMessage),
            };
            debug!("Got the Initialization message from Node 1");

            // 4. initialize self
            let row = Arc::new(RwLock::new(Row::new(&schema)));
            let num_rows =
                df_chunk_map.iter().map(|(k, _)| k.end).max().unwrap_or(0);

            let ddf = Arc::new(DistributedDataFrame {
                schema,
                df_name: new_name,
                df_chunk_map,
                num_rows,
                network,
                node_id: self.node_id,
                num_nodes: self.num_nodes,
                server_addr: self.server_addr.clone(),
                my_ip: self.my_ip.clone(),
                kv: self.kv.clone(),
                internal_notifier,
                row,
                _kill_notifier,
                blob_receiver: Mutex::new(blob_receiver),
                filter_results,
            });

            // spawn a tokio task to process messages
            let ddf_clone = ddf.clone();
            tokio::spawn(async move {
                DistributedDataFrame::process_messages(
                    ddf_clone,
                    read_streams,
                    blob_sender,
                    filter_results_sender,
                )
                .await
                .unwrap();
            });

            Ok(ddf)
        }
    }

    /// Return the (total) number of rows across all nodes for this
    /// `DistributedDataFrame`
    pub fn n_rows(&self) -> usize {
        self.num_rows
    }

    /// Return the number of columns in this `DistributedDataFrame`.
    pub fn n_cols(&self) -> usize {
        self.schema.width()
    }

    /// Sends the given `blob` to the `DistributedDataFrame` with the given
    /// `target_id`. This provides a lower level interface to facilitate other
    /// kinds of messages, such as sending serialized `Rower`s
    async fn send_blob<T: Serialize>(
        &self,
        target_id: usize,
        blob: &T,
    ) -> Result<(), LiquidError> {
        let blob = serialize(blob)?;
        self.network
            .lock()
            .await
            .send_msg(target_id, DistributedDFMsg::Blob(blob))
            .await
    }

    /// Processes `DistributedDFMsg` messages that arrive on the given read
    /// streams. When a message is received, a new `tokio` task is spawned to
    /// handle processing of that message so that the receiving task is not
    /// blocked and new messages can be read and processed concurrently.
    async fn process_messages(
        ddf: Arc<DistributedDataFrame>,
        mut read_streams: SelectAll<FramedStream<DistributedDFMsg>>,
        blob_sender: Sender<Vec<u8>>,
        filter_results_sender: Sender<DistributedDFMsg>,
    ) -> Result<(), LiquidError> {
        while let Some(Ok(msg)) = read_streams.next().await {
            let mut blob_sender_clone = blob_sender.clone();
            let mut filter_res_sender = filter_results_sender.clone();
            let ddf2 = ddf.clone();
            tokio::spawn(async move {
                match msg.msg {
                    DistributedDFMsg::GetRow(row_idx) => {
                        let r = ddf2.get_row(row_idx).await.unwrap();
                        {
                            ddf2.network
                                .lock()
                                .await
                                .send_msg(
                                    msg.sender_id,
                                    DistributedDFMsg::Row(r),
                                )
                                .await
                                .unwrap();
                        }
                    }
                    DistributedDFMsg::Row(row) => {
                        {
                            *ddf2.row.write().await = row;
                        }
                        ddf2.internal_notifier.notify();
                    }
                    DistributedDFMsg::Blob(blob) => {
                        blob_sender_clone.send(blob).await.unwrap();
                    }
                    DistributedDFMsg::FilterResult {
                        num_rows,
                        filtered_df_key,
                    } => {
                        filter_res_sender
                            .send(DistributedDFMsg::FilterResult {
                                num_rows,
                                filtered_df_key,
                            })
                            .await
                            .unwrap();
                    }
                    _ => panic!(
                        "Initialization messages are handled before this \
                         message processing loop is started"
                    ),
                }
            });
        }

        Ok(())
    }
}

/// A simple struct to help chunk a `Vec<Column>` by a given number of rows
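///
/// A small sketch of the chunking behavior (assuming `data: Vec<Column>`
/// holds 10 rows): with `chunk_size: 4`, the iterator yields chunks of
/// 4, 4, and 2 rows.
///
/// ```ignore
/// let chunker = DataChunkerator { chunk_size: 4, data: Some(data) };
/// let sizes: Vec<usize> = chunker.map(|chunk| n_rows(&chunk)).collect();
/// assert_eq!(sizes, vec![4, 4, 2]);
/// ```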
#[derive(Debug)]
struct DataChunkerator {
    /// how many rows in each chunk
    chunk_size: usize,
    /// Optional because it's assumed that only node 1 has the data
    data: Option<Vec<Column>>,
}

impl Iterator for DataChunkerator {
    type Item = Vec<Column>;

    /// Advances this iterator by breaking off `self.chunk_size` rows of its
    /// data until the data is empty. The last chunk may be smaller than
    /// `self.chunk_size`.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(data) = &mut self.data {
            // we are node 1 and have the data
            let cur_chunk_size = cmp::min(self.chunk_size, n_rows(&data));
            if cur_chunk_size == 0 {
                // the data has been consumed
                None
            } else {
                // there is more data to chunk
                let mut chunked_data = Vec::with_capacity(data.len());
                for col in data {
                    // `drain` would panic if `cur_chunk_size` were greater
                    // than the column length, but it is clamped to
                    // `n_rows(data)` above (assuming the data is not jagged)
                    let new_col = match col {
                        Column::Int(i) => {
                            Column::Int(i.drain(0..cur_chunk_size).collect())
                        }
                        Column::Bool(i) => {
                            Column::Bool(i.drain(0..cur_chunk_size).collect())
                        }
                        Column::Float(i) => {
                            Column::Float(i.drain(0..cur_chunk_size).collect())
                        }
                        Column::String(i) => {
                            Column::String(i.drain(0..cur_chunk_size).collect())
                        }
                    };
                    chunked_data.push(new_col);
                }
                Some(chunked_data)
            }
        } else {
            // we are not node 1, we don't have the data
            None
        }
    }
}

fn n_rows(data: &[Column]) -> usize {
    match data.get(0) {
        None => 0,
        Some(x) => match x {
            Column::Int(c) => c.len(),
            Column::Float(c) => c.len(),
            Column::Bool(c) => c.len(),
            Column::String(c) => c.len(),
        },
    }
}

fn count_new_lines(file_name: &str) -> usize {
    let mut buf_reader = BufReader::new(File::open(file_name).unwrap());
    let mut new_lines = 0;

    loop {
        let buf = buf_reader.fill_buf().unwrap();
        let len = buf.len();
        if len == 0 {
            return new_lines;
        }
        new_lines += bytecount::count(buf, b'\n');
        buf_reader.consume(len);
    }
}
902}