use std::{path::PathBuf, process::Stdio, sync::Mutex};
use async_stream::stream;
use futures::{Stream, StreamExt};
use tokio::{fs::read_dir, io::BufReader, process::Command};
use tokio_stream::wrappers::ReadDirStream;
use taganak_core::prelude::*;
/// A provider executable found on disk.
///
/// Values are produced by `discover()` and turned into a runnable handle
/// with [`Provider::exec`].
#[derive(Debug, Clone)]
pub struct Provider {
    /// Location of the provider executable.
    path: PathBuf,
}
impl Provider {
    /// Creates an execution handle for this provider.
    ///
    /// The handle receives its own clone of the provider, so `self` remains
    /// usable afterwards and `exec` may be called repeatedly.
    pub fn exec(&self) -> ProviderExec {
        let provider = self.clone();
        ProviderExec::new(provider)
    }
}
/// A single prepared invocation of a [`Provider`].
///
/// The `describe` and `run` methods consume the handle, spawn the provider
/// executable and stream back the triples parsed from its standard output.
#[derive(Clone)]
pub struct ProviderExec {
    /// The provider whose executable is spawned.
    provider: Provider,
    /// Subject string of the form `<urn:uuid:…>`, generated when the handle
    /// is created and passed to the executable as its second argument.
    recv_subject: String,
}
impl ProviderExec {
    /// Wraps `provider` and assigns the invocation a fresh, random
    /// `<urn:uuid:…>` subject.
    fn new(provider: Provider) -> Self {
        let id = uuid::Uuid::new_v4();
        let recv_subject = format!("<urn:uuid:{}>", id);
        Self {
            provider,
            recv_subject,
        }
    }

    /// Invokes the provider with the `describe` operation.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised when the executable cannot be spawned.
    /// Parse failures are reported per item inside the stream.
    pub fn describe(
        self,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        self.exec("describe")
    }

    /// Invokes the provider with the `run` operation.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised when the executable cannot be spawned.
    /// Parse failures are reported per item inside the stream.
    pub fn run(
        self,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        self.exec("run")
    }

    /// Spawns the provider executable as `<path> <op> <recv_subject>` and
    /// parses its standard output as N-Triples.
    ///
    /// Every item produced by the parser is forwarded unchanged; the stream
    /// ends when the parser is exhausted.
    ///
    /// NOTE(review): only the parser is moved into the returned stream. The
    /// child handle goes out of scope when this function returns, so the
    /// provider's exit status is never inspected here — confirm that is
    /// intended.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised when the executable cannot be spawned.
    fn exec(
        self,
        op: &str,
    ) -> Result<impl Stream<Item = Result<std::sync::Arc<Triple>, SourceError>>, std::io::Error>
    {
        let mut command = Command::new(&self.provider.path);
        command
            .arg(op)
            .arg(&self.recv_subject)
            .stdout(Stdio::piped());
        let mut child = command.spawn()?;

        // Stdout was requested as a pipe above, so the handle must be present.
        let reader = BufReader::new(child.stdout.take().expect("we configured it"));
        let profile = Arc::new(Mutex::new(BasicProfile::new()));
        let mut parser =
            taganak_core::sources::parsers::n_triples::NTriplesParser::new(reader, profile.clone());

        Ok(stream! {
            loop {
                match parser.next().await {
                    Some(triple) => {
                        yield triple;
                    }
                    None => break,
                }
            }
        })
    }
}
/// Collects the directories that are searched for provider executables.
///
/// The result always starts with the built-in default directory, followed by
/// the entries of the `TINIPON_PATH` environment variable in the order they
/// appear. The variable is split with the platform's path-list separator
/// (`:` on Unix). Empty entries — from an empty variable or a stray leading,
/// trailing or doubled separator — are ignored. Every remaining entry is
/// canonicalized before it is returned.
///
/// # Errors
///
/// * Any error from canonicalizing an entry (for example, it does not exist).
/// * [`std::io::ErrorKind::NotADirectory`] when an entry resolves to
///   something that is not a directory.
fn paths() -> Result<Vec<PathBuf>, std::io::Error> {
    // NOTE(review): this default is tied to one developer's home directory.
    // It is kept as-is because the intended location cannot be determined
    // from this file; consider deriving it from the user's home directory.
    let mut paths = vec![PathBuf::from("/home/nik/.local/lib/tinipon/providers")];

    // `var_os` keeps values that are not valid Unicode instead of silently
    // dropping the whole variable.
    if let Some(var) = std::env::var_os("TINIPON_PATH") {
        for entry in std::env::split_paths(&var) {
            // Canonicalizing an empty path always fails, so skip empty
            // entries rather than failing the whole lookup.
            if entry.as_os_str().is_empty() {
                continue;
            }
            let path = entry.canonicalize()?;
            if !path.is_dir() {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::NotADirectory,
                    format!("{} is not a directory", path.display()),
                ));
            }
            paths.push(path);
        }
    }

    Ok(paths)
}
pub fn discover() -> Result<impl Stream<Item = Provider>, std::io::Error> {
Ok(stream! {
for path in paths().unwrap() {
let mut stream = ReadDirStream::new(read_dir(path).await.unwrap());
while let Some(entry) = stream.next().await {
let entry = entry.unwrap();
yield Provider { path: entry.path() };
}
}
})
}