use std::path::{Path, PathBuf};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use rsigma_eval::ProcessResult;
use crate::error::RuntimeError;
pub struct UnixSocketSink {
path: PathBuf,
stream: Option<UnixStream>,
}
impl UnixSocketSink {
pub async fn connect(path: &Path) -> std::io::Result<Self> {
let stream = UnixStream::connect(path).await?;
Ok(Self {
path: path.to_path_buf(),
stream: Some(stream),
})
}
pub async fn send(&mut self, result: &ProcessResult) -> Result<(), RuntimeError> {
if result.is_empty() {
return Ok(());
}
for m in result {
let json = serde_json::to_string(m)?;
self.write_line(&json).await?;
}
Ok(())
}
pub async fn send_raw(&mut self, json: &str) -> Result<(), RuntimeError> {
self.write_line(json).await
}
async fn write_line(&mut self, json: &str) -> Result<(), RuntimeError> {
if self.try_write(json).await.is_err() {
self.stream = None;
self.try_write(json).await?;
}
Ok(())
}
async fn try_write(&mut self, json: &str) -> Result<(), RuntimeError> {
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => {
let stream = UnixStream::connect(&self.path).await?;
self.stream.insert(stream)
}
};
stream.write_all(json.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.flush().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::UnixListener;
#[tokio::test]
async fn writes_ndjson_lines_to_collector() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("collector.sock");
let listener = UnixListener::bind(&path).unwrap();
let accept = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut lines = BufReader::new(stream).lines();
let mut out = Vec::new();
while let Some(line) = lines.next_line().await.unwrap() {
out.push(line);
if out.len() == 2 {
break;
}
}
out
});
let mut sink = UnixSocketSink::connect(&path).await.unwrap();
sink.send_raw("{\"a\":1}").await.unwrap();
sink.send_raw("{\"b\":2}").await.unwrap();
let received = accept.await.unwrap();
assert_eq!(
received,
vec!["{\"a\":1}".to_string(), "{\"b\":2}".to_string()]
);
}
}