use std::fmt::Debug;
use std::fs::OpenOptions;
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use serde::Serialize;
/// A destination that events of type `T` can be recorded into.
///
/// The `Send + Sync` supertraits mean every implementor, and therefore every
/// `dyn Sink<T>` trait object, can be shared between threads.
pub trait Sink<T>: Send + Sync {
    /// Records a single event.
    ///
    /// The receiver is `&self`, so implementors that keep state need interior
    /// mutability. Nothing is returned: this signature gives an implementor
    /// no channel for reporting a failure back to the caller.
    fn record(&self, event: &T);
}
/// A sink that is parameterised by an event type but stores no event data.
///
/// `Debug` and `Default` are implemented by hand instead of derived: the
/// derives would add `T: Debug` / `T: Default` bounds even though no `T` value
/// is ever held, which would make `NullSink<T>` unusable through those traits
/// for event types lacking the impls.
pub struct NullSink<T> {
    /// Zero-sized marker that ties the sink to its event type.
    _phantom: PhantomData<T>,
}

impl<T> Default for NullSink<T> {
    /// Builds the (stateless) sink; available for every `T`.
    fn default() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}

impl<T> std::fmt::Debug for NullSink<T> {
    /// Produces the same shape the derive did, without requiring `T: Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NullSink")
            .field("_phantom", &self._phantom)
            .finish()
    }
}
impl<T> NullSink<T> {
pub const fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}
impl<T> Sink<T> for NullSink<T>
where
    T: Send + Sync,
{
    /// Ignores the event: nothing is stored and nothing is written.
    fn record(&self, _event: &T) {}
}
/// Sink that accumulates recorded events in a growable in-memory buffer.
pub struct InMemorySink<T> {
    /// The buffered events; the mutex allows appending through `&self`.
    events: Mutex<Vec<T>>,
}
impl<T> Default for InMemorySink<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> InMemorySink<T> {
    /// Creates a sink whose buffer is empty.
    pub fn new() -> Self {
        let events = Mutex::new(Vec::new());
        Self { events }
    }

    /// Acquires the buffer lock, panicking if the mutex is poisoned.
    fn locked(&self) -> std::sync::MutexGuard<'_, Vec<T>> {
        self.events.lock().expect("InMemorySink mutex poisoned")
    }

    /// Returns a clone of every buffered event; the buffer is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn snapshot(&self) -> Vec<T>
    where
        T: Clone,
    {
        let buffered = self.locked();
        buffered.clone()
    }

    /// Moves every buffered event out, leaving an empty buffer behind.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn drain(&self) -> Vec<T> {
        let mut buffered = self.locked();
        std::mem::take(&mut *buffered)
    }

    /// Number of events currently buffered.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn len(&self) -> usize {
        let buffered = self.locked();
        buffered.len()
    }

    /// `true` when no events are buffered.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let buffered = self.locked();
        buffered.is_empty()
    }
}
impl<T> std::fmt::Debug for InMemorySink<T> {
    /// Formats the sink as `InMemorySink { len: N }`.
    ///
    /// The length is read directly from the mutex and a poisoned lock is
    /// recovered rather than unwrapped. Debug output is frequently produced
    /// while a failure is already being reported, so formatting must not
    /// itself panic just because an earlier panic poisoned the lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let len = match self.events.lock() {
            Ok(buffered) => buffered.len(),
            // Only the length is read, so the guard carried by the poison
            // error is safe to use here.
            Err(poisoned) => poisoned.into_inner().len(),
        };
        f.debug_struct("InMemorySink").field("len", &len).finish()
    }
}
impl<T: Clone + Send + Sync> Sink<T> for InMemorySink<T> {
    /// Appends a clone of `event` to the in-memory buffer.
    ///
    /// The clone is made *before* the mutex is taken. `T::clone` is arbitrary
    /// user code: running it under the lock would lengthen the critical
    /// section, and a panic inside it would poison the mutex and make every
    /// later operation on this sink panic as well.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is already poisoned.
    fn record(&self, event: &T) {
        let owned = event.clone();
        self.events
            .lock()
            .expect("InMemorySink mutex poisoned")
            .push(owned);
    }
}
/// Sink backed by a file on disk, generic over the event type `T`.
pub struct AuditFileSink<T> {
    /// Location the file was opened from.
    path: PathBuf,
    /// The open file handle; the mutex allows writing through `&self`.
    file: Mutex<std::fs::File>,
    /// Zero-sized marker tying the sink to its event type.
    _phantom: PhantomData<T>,
}
impl<T> AuditFileSink<T> {
    /// Opens `path` in append mode, creating the file if it does not exist.
    ///
    /// Any missing parent directories are created first.
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` raised while creating the parent
    /// directories or while opening the file.
    pub fn new(path: &Path) -> std::io::Result<Self> {
        match path.parent() {
            Some(dir) => std::fs::create_dir_all(dir)?,
            None => {}
        }

        let mut options = OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(path)?;

        Ok(Self {
            path: path.to_path_buf(),
            file: Mutex::new(handle),
            _phantom: PhantomData,
        })
    }

    /// The path this sink was opened with.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
impl<T> std::fmt::Debug for AuditFileSink<T> {
    /// Formats the sink showing only its `path` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("AuditFileSink");
        out.field("path", &self.path);
        out.finish()
    }
}
impl<T: Serialize + Send + Sync> Sink<T> for AuditFileSink<T> {
    /// Serializes `event` to JSON and appends it to the file as one line.
    ///
    /// `record` cannot return an error, so failures are reported on stderr
    /// and the event is dropped:
    ///
    /// * a serialization failure is logged and nothing is written;
    /// * a write failure is logged too, rather than discarded silently.
    ///
    /// The JSON text and its trailing newline are handed to the file in a
    /// single `write_all` call instead of going through `writeln!`, whose
    /// formatting machinery can hand the payload and the newline to an
    /// unbuffered `File` as separate writes.
    ///
    /// A poisoned mutex is recovered instead of causing the event to be
    /// skipped without any diagnostic.
    fn record(&self, event: &T) {
        let mut line = match serde_json::to_string(event) {
            Ok(json) => json,
            Err(err) => {
                eprintln!(
                    "[shigoto-types::sink] AuditFileSink serialize failed: {err}",
                );
                return;
            }
        };
        line.push('\n');

        let mut file = self
            .file
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Err(err) = file.write_all(line.as_bytes()) {
            eprintln!(
                "[shigoto-types::sink] AuditFileSink write to {} failed: {err}",
                self.path.display(),
            );
        }
    }
}
/// Sink that holds a list of other sinks.
pub struct MultiSink<T> {
    /// Child sinks, kept in the order they were added.
    children: Vec<Arc<dyn Sink<T>>>,
}
impl<T> MultiSink<T> {
    /// Creates a fan-out sink with no children.
    pub fn new() -> Self {
        let children = Vec::new();
        Self { children }
    }

    /// Builder-style variant of [`MultiSink::push`]: appends `child` and
    /// returns the sink so calls can be chained.
    pub fn with(mut self, child: Arc<dyn Sink<T>>) -> Self {
        self.push(child);
        self
    }

    /// Appends `child` after the children already present.
    pub fn push(&mut self, child: Arc<dyn Sink<T>>) {
        self.children.push(child);
    }

    /// Number of child sinks.
    pub fn len(&self) -> usize {
        self.children.len()
    }

    /// `true` when there are no child sinks.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl<T> Default for MultiSink<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> std::fmt::Debug for MultiSink<T> {
    /// Formats the sink as `MultiSink { children: N }`, where `N` is the
    /// number of children; the children themselves are not formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let child_count = self.children.len();
        let mut out = f.debug_struct("MultiSink");
        out.field("children", &child_count);
        out.finish()
    }
}
impl<T> Sink<T> for MultiSink<T>
where
    T: Send + Sync,
{
    /// Forwards `event` to every child, in the order the children were added.
    fn record(&self, event: &T) {
        self.children.iter().for_each(|child| child.record(event));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;
    use tempfile::TempDir;

    /// Small serializable event shared by every test in this module.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct TestEvent {
        kind: String,
        n: u32,
    }

    /// Builds a [`TestEvent`] from a kind label and a number.
    fn ev(kind: &str, n: u32) -> TestEvent {
        TestEvent {
            kind: kind.into(),
            n,
        }
    }

    #[test]
    fn null_sink_records_without_panic() {
        let sink = NullSink::<TestEvent>::new();
        sink.record(&ev("a", 1));
        sink.record(&ev("b", 2));
    }

    #[test]
    fn in_memory_sink_captures_in_order() {
        let sink = InMemorySink::<TestEvent>::new();
        assert!(sink.is_empty());
        sink.record(&ev("a", 1));
        sink.record(&ev("b", 2));
        sink.record(&ev("c", 3));
        assert_eq!(sink.len(), 3);
        // Compare whole events, not just `kind`, so payload and order are
        // both pinned.
        assert_eq!(sink.snapshot(), vec![ev("a", 1), ev("b", 2), ev("c", 3)]);
    }

    #[test]
    fn in_memory_sink_drain_clears_buffer() {
        let sink = InMemorySink::<TestEvent>::new();
        sink.record(&ev("x", 1));
        sink.record(&ev("y", 2));
        let drained = sink.drain();
        assert_eq!(drained, vec![ev("x", 1), ev("y", 2)]);
        assert!(sink.is_empty(), "drain() should clear the buffer");
    }

    #[test]
    fn in_memory_sink_snapshot_does_not_clear() {
        let sink = InMemorySink::<TestEvent>::new();
        sink.record(&ev("x", 1));
        let _ = sink.snapshot();
        assert_eq!(sink.len(), 1, "snapshot() must not clear");
    }

    #[test]
    fn audit_file_sink_appends_jsonl_lines() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("sink.jsonl");
        let sink = AuditFileSink::<TestEvent>::new(&path).unwrap();
        assert_eq!(sink.path(), path.as_path());
        sink.record(&ev("first", 1));
        sink.record(&ev("second", 2));
        sink.record(&ev("third", 3));
        drop(sink);

        let content = std::fs::read_to_string(&path).unwrap();
        // Round-trip every line and compare whole events; this also avoids
        // the truncating `usize as u32` cast on the line index.
        let parsed: Vec<TestEvent> = content
            .lines()
            .map(|line| serde_json::from_str::<TestEvent>(line).unwrap())
            .collect();
        assert_eq!(
            parsed,
            vec![ev("first", 1), ev("second", 2), ev("third", 3)]
        );
    }

    #[test]
    fn audit_file_sink_creates_parent_dirs() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nested/dir/structure/sink.jsonl");
        let sink = AuditFileSink::<TestEvent>::new(&path).unwrap();
        sink.record(&ev("ok", 1));
        drop(sink);
        assert!(path.exists());
    }

    #[test]
    fn multi_sink_fans_out_to_every_child() {
        let in_mem_a = Arc::new(InMemorySink::<TestEvent>::new());
        let in_mem_b = Arc::new(InMemorySink::<TestEvent>::new());
        let multi = MultiSink::<TestEvent>::new()
            .with(in_mem_a.clone() as Arc<dyn Sink<TestEvent>>)
            .with(in_mem_b.clone() as Arc<dyn Sink<TestEvent>>);
        assert_eq!(multi.len(), 2);
        multi.record(&ev("broadcast", 1));
        multi.record(&ev("broadcast", 2));
        assert_eq!(in_mem_a.len(), 2, "child A should have both events");
        assert_eq!(in_mem_b.len(), 2, "child B should have both events");
    }

    #[test]
    fn multi_sink_push_adds_child() {
        let mem = Arc::new(InMemorySink::<TestEvent>::new());
        let mut multi = MultiSink::<TestEvent>::new();
        multi.push(mem.clone() as Arc<dyn Sink<TestEvent>>);
        assert_eq!(multi.len(), 1);
        multi.record(&ev("pushed", 1));
        assert_eq!(mem.snapshot(), vec![ev("pushed", 1)]);
    }

    #[test]
    fn multi_sink_empty_is_no_op() {
        let multi = MultiSink::<TestEvent>::new();
        assert!(multi.is_empty());
        assert_eq!(multi.len(), 0);
        multi.record(&ev("a", 1));
    }

    #[test]
    fn dyn_sink_polymorphism_works() {
        // Keep a concretely-typed handle to the in-memory sink so the effect
        // of recording through the trait objects can actually be asserted.
        let mem = Arc::new(InMemorySink::<TestEvent>::new());
        let null = Arc::new(NullSink::<TestEvent>::new()) as Arc<dyn Sink<TestEvent>>;
        let mem_dyn = mem.clone() as Arc<dyn Sink<TestEvent>>;
        let sinks: Vec<Arc<dyn Sink<TestEvent>>> = vec![null, mem_dyn];
        for s in &sinks {
            s.record(&ev("z", 9));
        }
        assert_eq!(mem.snapshot(), vec![ev("z", 9)]);
    }

    #[test]
    fn record_through_dyn_reference() {
        let sink = InMemorySink::<TestEvent>::new();
        let dyn_ref: &dyn Sink<TestEvent> = &sink;
        dyn_ref.record(&ev("via_dyn", 42));
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.snapshot()[0].n, 42);
    }
}