collective_utils/
lib.rs

1use std::path::PathBuf;
2
3use tokio::task::JoinSet;
4use tokio_stream::wrappers::ReceiverStream;
5
6pub mod discretize;
7
8// pub type SyncBoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + Sync + 'a>>;
9pub type Stream<T> = futures_util::stream::BoxStream<'static, T>;
10
11/// Yield the default value for a type that implements [`Default`].
12///
13/// This is a copy of the [`default`] function from the standard library, which is not yet stable.
14#[must_use]
15pub fn default<T: Default>() -> T {
16    T::default()
17}
18
19/// # Errors
20/// If the current directory is not inside a git repository.
21pub fn git_project_root() -> anyhow::Result<PathBuf> {
22    let mut path = std::env::current_dir()?;
23    loop {
24        if path.join(".git").exists() {
25            return Ok(path);
26        }
27        if !path.pop() {
28            return Err(anyhow::anyhow!("Could not find git project root"));
29        }
30    }
31}
32
33/// Get a directory with respect to the git project root.
34/// # Errors
35/// If the current directory is not inside a git repository.
36pub fn dir(dir: impl AsRef<str>) -> anyhow::Result<PathBuf> {
37    let mut path = git_project_root()?;
38    path.push(dir.as_ref());
39    Ok(path)
40}
41
42#[cfg(test)]
43mod tests {
44    #[test]
45    fn test_git_project_root() {
46        let path = super::git_project_root().unwrap();
47        assert!(path.join("Cargo.toml").exists());
48    }
49
50    #[test]
51    fn test_dir() {
52        let path = super::dir("Cargo.toml").unwrap();
53        assert!(path.exists());
54    }
55}
56
57pub trait JoinSetExt {
58    type Item;
59    fn into_stream(self) -> ReceiverStream<Self::Item>;
60}
61
62impl<Item: Send + 'static> JoinSetExt for JoinSet<Item> {
63    type Item = Item;
64
65    fn into_stream(mut self) -> ReceiverStream<Self::Item> {
66        let (tx, rx) = tokio::sync::mpsc::channel(20);
67        tokio::spawn(async move {
68            while let Some(Ok(result)) = self.join_next().await {
69                if tx.send(result).await.is_err() {
70                    return;
71                }
72            }
73        });
74        ReceiverStream::new(rx)
75    }
76}