tinipon 0.1.0

Systems and services inventory and monitoring as Linked Data (RDF)
Documentation
//! Registry and discovery code for the tinipon runner

use std::{path::PathBuf, process::Stdio, sync::Mutex};

use async_stream::stream;
use futures::{Stream, StreamExt};
use tokio::{fs::read_dir, io::BufReader, process::Command};
use tokio_stream::wrappers::ReadDirStream;

use taganak_core::prelude::*;

/// Description of an available provider
///
/// A provider is identified solely by the location of its binary.
/// Use [Provider::exec] to prepare an execution of that binary.
#[derive(Clone, Debug)]
pub struct Provider {
    /// Path to the provider binary
    path: PathBuf,
}

impl Provider {
    /// Create a [ProviderExec] for invoking this provider's binary
    ///
    /// The provider description is cloned into the returned execution
    /// environment, so `self` remains usable afterwards.
    pub fn exec(&self) -> ProviderExec {
        let provider = self.clone();
        ProviderExec::new(provider)
    }
}

/// Prepared execution environment for a provider
///
/// This struct mainly holds a stable `recv_subject` to pass to
/// the binary: the subject is generated once when the environment is
/// created, so clones of one environment hand the same subject to the
/// binary. See the protocol described in [super::providers].
#[derive(Clone)]
pub struct ProviderExec {
    /// The provider whose binary gets executed
    provider: Provider,
    /// Subject passed to the binary as its second argument,
    /// formatted as `<urn:uuid:…>`
    recv_subject: String,
}

impl ProviderExec {
    /// Create an execution environment for `provider`
    ///
    /// A fresh random (version 4) UUID is generated and stored, wrapped as
    /// `<urn:uuid:…>`, as the subject that is later passed to the binary.
    fn new(provider: Provider) -> Self {
        let recv_subject = format!("<urn:uuid:{}>", uuid::Uuid::new_v4());
        Self {
            provider,
            recv_subject,
        }
    }

    /// Invoke the provider's `describe` operation
    ///
    /// The triples printed by the binary are returned as a stream.
    /// See [super::providers::Provider::describe] for details.
    ///
    /// # Errors
    ///
    /// Fails with the I/O error from spawning the provider binary.
    pub fn describe(
        self,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        Self::exec(self, "describe")
    }

    /// Invoke the provider's `run` operation
    ///
    /// The triples printed by the binary are returned as a stream.
    /// See [super::providers::Provider::run] for details.
    ///
    /// # Errors
    ///
    /// Fails with the I/O error from spawning the provider binary.
    pub fn run(
        self,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        Self::exec(self, "run")
    }

    /// Spawn the provider binary for a single operation
    ///
    /// The binary is started with two arguments, `op` followed by the
    /// stored `recv_subject`, and with its standard output piped. That
    /// output is fed through an N-Triples parser, whose items are
    /// forwarded unchanged as the returned stream.
    ///
    /// # Errors
    ///
    /// Fails with the I/O error from spawning the provider binary. Parse
    /// results, including errors, are delivered as items of the stream.
    fn exec(
        self,
        op: &str,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        let mut command = Command::new(&self.provider.path);
        command
            .arg(op)
            .arg(&self.recv_subject)
            .stdout(Stdio::piped());

        let mut process = command.spawn()?;
        // Stdout was requested as a pipe right above, so the handle exists.
        let output = process.stdout.take().expect("we configured it");

        // Each execution parses with its own, freshly created profile.
        let mut triples = taganak_core::sources::parsers::n_triples::NTriplesParser::new(
            BufReader::new(output),
            Arc::new(Mutex::new(BasicProfile::new())),
        );

        Ok(stream! {
            while let Some(item) = triples.next().await {
                yield item;
            }
        })
    }
}

/// Collect the directories that are searched for provider binaries
///
/// The list starts with the per-user default directory
/// `$HOME/.local/lib/tinipon/providers` (left out when `HOME` is unset or
/// empty), followed by every directory listed in the `TINIPON_PATH`
/// environment variable, in the order given. `TINIPON_PATH` uses the
/// platform's `PATH` syntax; empty entries, e.g. from a trailing
/// separator, are ignored. Entries from `TINIPON_PATH` are canonicalized,
/// the default directory is not, and it is not checked for existence.
///
/// # Errors
///
/// Returns the underlying I/O error if an entry of `TINIPON_PATH` cannot
/// be canonicalized (for example because it does not exist), and an error
/// of kind [std::io::ErrorKind::NotADirectory] if it is not a directory.
fn paths() -> Result<Vec<PathBuf>, std::io::Error> {
    let mut dirs = Vec::new();

    // Derive the default location from the current user's home directory
    // instead of baking one specific user's home into the binary.
    if let Some(home) = std::env::var_os("HOME").filter(|home| !home.is_empty()) {
        dirs.push(PathBuf::from(home).join(".local/lib/tinipon/providers"));
    }

    if let Some(var) = std::env::var_os("TINIPON_PATH") {
        for entry in std::env::split_paths(&var) {
            // An empty entry carries no directory; canonicalizing it
            // would only fail with a confusing "not found" error.
            if entry.as_os_str().is_empty() {
                continue;
            }

            let path = entry.canonicalize()?;
            if !path.is_dir() {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::NotADirectory,
                    format!("{} is not a directory", path.display()),
                ));
            }
            dirs.push(path);
        }
    }

    Ok(dirs)
}

/// Discover all provider binaries in known paths
pub fn discover() -> Result<impl Stream<Item = Provider>, std::io::Error> {
    Ok(stream! {
        for path in paths().unwrap() {
            let mut stream = ReadDirStream::new(read_dir(path).await.unwrap());
            while let Some(entry) = stream.next().await {
                let entry = entry.unwrap();
                yield Provider { path: entry.path() };
            }
        }
    })
}