datafusion_dist/
util.rs

1use std::time::{SystemTime, UNIX_EPOCH};
2
3use futures::{StreamExt, stream::BoxStream};
4use tokio::{
5    runtime::Handle,
6    sync::mpsc::{Receiver, Sender},
7    task::JoinSet,
8};
9
10use crate::{DistError, DistResult};
11
/// Current wall-clock time expressed as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics ("Time went backwards") when the system clock reads earlier than
/// the Unix epoch.
pub fn timestamp_ms() -> i64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    // `as_millis` yields a u128; an i64 can hold roughly 292 million years of
    // milliseconds, so this narrowing cast is lossless for any realistic clock.
    let millis: u128 = since_epoch.as_millis();
    millis as i64
}
18
19// This function will spawn thread to get the local IP address, so don't call it frequently
20pub fn get_local_ip() -> String {
21    local_ip_address::local_ip()
22        .expect("Failed to get local IP")
23        .to_string()
24}
25
/// Builder for a stream fed by an mpsc channel whose producers run as tokio
/// tasks tracked in a [`JoinSet`].
///
/// Obtain senders with `tx`, spawn producer tasks with `spawn_on`, then call
/// `build` to get the merged output stream.
pub struct ReceiverStreamBuilder<O> {
    /// Builder-owned sender; cloned for producers and dropped by `build`.
    tx: Sender<DistResult<O>>,
    /// Receiving half that becomes the data part of the output stream.
    rx: Receiver<DistResult<O>>,
    /// Spawned producer tasks; the built stream watches them for failures.
    join_set: JoinSet<DistResult<()>>,
}
31
32impl<O: Send + 'static> ReceiverStreamBuilder<O> {
33    /// Create new channels with the specified buffer size
34    pub fn new(capacity: usize) -> Self {
35        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
36
37        Self {
38            tx,
39            rx,
40            join_set: JoinSet::new(),
41        }
42    }
43
44    /// Get a handle for sending data to the output
45    pub fn tx(&self) -> Sender<DistResult<O>> {
46        self.tx.clone()
47    }
48
49    /// Same as [`Self::spawn`] but it spawns the task on the provided runtime
50    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
51    where
52        F: Future<Output = DistResult<()>>,
53        F: Send + 'static,
54    {
55        self.join_set.spawn_on(task, handle);
56    }
57
58    /// Create a stream of all data written to `tx`
59    pub fn build(self) -> BoxStream<'static, DistResult<O>> {
60        let Self {
61            tx,
62            rx,
63            mut join_set,
64        } = self;
65
66        // Doesn't need tx
67        drop(tx);
68
69        // future that checks the result of the join set, and propagates panic if seen
70        let check = async move {
71            while let Some(result) = join_set.join_next().await {
72                match result {
73                    Ok(task_result) => {
74                        match task_result {
75                            // Nothing to report
76                            Ok(_) => continue,
77                            // This means a blocking task error
78                            Err(error) => return Some(Err(error)),
79                        }
80                    }
81                    // This means a tokio task error, likely a panic
82                    Err(e) => {
83                        return Some(Err(DistError::internal(format!("Tokio join error: {e}"))));
84                    }
85                }
86            }
87            None
88        };
89
90        let check_stream = futures::stream::once(check)
91            // unwrap Option / only return the error
92            .filter_map(|item| async move { item });
93
94        // Convert the receiver into a stream
95        let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
96            let next_item = rx.recv().await;
97            next_item.map(|next_item| (next_item, rx))
98        });
99
100        // Merge the streams together so whichever is ready first
101        // produces the batch
102        futures::stream::select(rx_stream, check_stream).boxed()
103    }
104}