1use std::time::{SystemTime, UNIX_EPOCH};
2
3use futures::{StreamExt, stream::BoxStream};
4use tokio::{
5 runtime::Handle,
6 sync::mpsc::{Receiver, Sender},
7 task::JoinSet,
8};
9
10use crate::{DistError, DistResult};
11
/// Returns the current wall-clock time in milliseconds relative to the Unix epoch.
///
/// The value is derived from [`SystemTime::now`], so it follows the system clock and
/// is not guaranteed to be monotonic across calls.
///
/// When the system clock reads earlier than the Unix epoch, the result is the negated
/// distance to the epoch rather than a panic. Magnitudes that do not fit in an `i64`
/// saturate at `i64::MAX` / `i64::MIN` instead of silently wrapping.
pub fn timestamp_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; convert with a checked cast so an out-of-range
        // value saturates instead of being truncated.
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // The error carries the positive gap between the clock reading and the epoch.
        // Negating a value in `0..=i64::MAX` cannot overflow.
        Err(before_epoch) => {
            i64::try_from(before_epoch.duration().as_millis()).map_or(i64::MIN, |ms| -ms)
        }
    }
}
18
19pub fn get_local_ip() -> String {
21 local_ip_address::local_ip()
22 .expect("Failed to get local IP")
23 .to_string()
24}
25
26pub struct ReceiverStreamBuilder<O> {
27 tx: Sender<DistResult<O>>,
28 rx: Receiver<DistResult<O>>,
29 join_set: JoinSet<DistResult<()>>,
30}
31
32impl<O: Send + 'static> ReceiverStreamBuilder<O> {
33 pub fn new(capacity: usize) -> Self {
35 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
36
37 Self {
38 tx,
39 rx,
40 join_set: JoinSet::new(),
41 }
42 }
43
44 pub fn tx(&self) -> Sender<DistResult<O>> {
46 self.tx.clone()
47 }
48
49 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
51 where
52 F: Future<Output = DistResult<()>>,
53 F: Send + 'static,
54 {
55 self.join_set.spawn_on(task, handle);
56 }
57
58 pub fn build(self) -> BoxStream<'static, DistResult<O>> {
60 let Self {
61 tx,
62 rx,
63 mut join_set,
64 } = self;
65
66 drop(tx);
68
69 let check = async move {
71 while let Some(result) = join_set.join_next().await {
72 match result {
73 Ok(task_result) => {
74 match task_result {
75 Ok(_) => continue,
77 Err(error) => return Some(Err(error)),
79 }
80 }
81 Err(e) => {
83 return Some(Err(DistError::internal(format!("Tokio join error: {e}"))));
84 }
85 }
86 }
87 None
88 };
89
90 let check_stream = futures::stream::once(check)
91 .filter_map(|item| async move { item });
93
94 let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
96 let next_item = rx.recv().await;
97 next_item.map(|next_item| (next_item, rx))
98 });
99
100 futures::stream::select(rx_stream, check_stream).boxed()
103 }
104}