pub use ferrokinesis_core::capture::{CaptureOp, CaptureRecord, read_capture_file};
use serde::Serialize;
use std::borrow::Cow;
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
/// Borrowed, serialize-only view of a single capture record.
///
/// All string fields borrow from the caller; only `partition_key` may be owned,
/// which lets `CaptureWriter` substitute a scrubbed key without copying the
/// other fields.
// NOTE(review): presumably mirrors the owned `CaptureRecord` re-exported from
// `ferrokinesis_core::capture` so that `read_capture_file` can parse the
// output — confirm the field names stay in sync.
#[derive(Serialize)]
pub struct CaptureRecordRef<'a> {
/// Kind of operation that produced this record.
pub op: CaptureOp,
/// Timestamp of the record (unit/epoch not visible here — TODO confirm).
pub ts: u64,
/// Stream the record belongs to.
pub stream: &'a str,
/// Partition key; `Cow` so a scrubbed (owned) replacement can be stored.
pub partition_key: Cow<'a, str>,
/// Record payload as text (encoding not visible here — TODO confirm).
pub data: &'a str,
/// Optional explicit hash key; omitted from the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub explicit_hash_key: Option<&'a str>,
/// Sequence number of the record.
pub sequence_number: &'a str,
/// Identifier of the shard of the record.
pub shard_id: &'a str,
}
/// Handle for appending capture records to a file.
///
/// Cloning is cheap: every clone shares the same buffered file behind an
/// `Arc<Mutex<..>>`, so writes from all clones are serialized through one lock.
#[derive(Clone)]
pub struct CaptureWriter {
// Shared, lock-protected buffered handle to the capture file.
inner: Arc<Mutex<BufWriter<File>>>,
// When true, partition keys are replaced via `scrub_partition_key` before
// a record is serialized.
scrub: bool,
}
impl CaptureWriter {
    /// Opens `path` for appending (creating it if missing) and wraps it in a
    /// buffered, shareable writer.
    ///
    /// When `scrub` is true, partition keys are hashed with
    /// [`scrub_partition_key`] before being written.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened.
    pub fn new(path: &Path, scrub: bool) -> io::Result<Self> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(Self {
            inner: Arc::new(Mutex::new(BufWriter::new(file))),
            scrub,
        })
    }

    /// Writes a single record; shorthand for [`Self::write_records`] with a
    /// one-element slice.
    pub fn write_record(&self, record: &CaptureRecordRef<'_>) {
        self.write_records(std::slice::from_ref(record));
    }

    /// Serializes `records` as newline-delimited JSON and appends them to the
    /// capture file, flushing afterwards.
    ///
    /// Capture is best-effort: serialization, locking, write and flush failures
    /// are logged through `tracing` and never returned to the caller. A record
    /// that fails to serialize is skipped; the rest of the batch is still
    /// written.
    pub fn write_records(&self, records: &[CaptureRecordRef<'_>]) {
        // Serialize the whole batch into one contiguous buffer *before* taking
        // the lock, so the critical section is only the write and the flush.
        let mut batch: Vec<u8> = Vec::new();
        for record in records {
            let start = batch.len();
            let outcome = if self.scrub {
                serde_json::to_writer(&mut batch, &Self::scrubbed(record))
            } else {
                serde_json::to_writer(&mut batch, record)
            };
            match outcome {
                Ok(()) => batch.push(b'\n'),
                Err(e) => {
                    // Drop any partial output so a half-written line can never
                    // reach the file.
                    batch.truncate(start);
                    tracing::warn!("capture: failed to serialize record: {e}");
                }
            }
        }

        // Nothing serialized (empty input or every record failed): skip the lock.
        if batch.is_empty() {
            return;
        }

        let Ok(mut writer) = self.inner.lock() else {
            tracing::error!("capture: failed to acquire lock");
            return;
        };
        if let Err(e) = writer.write_all(&batch) {
            tracing::warn!("capture: write error: {e}");
            return;
        }
        if let Err(e) = writer.flush() {
            tracing::warn!("capture: flush error: {e}");
        }
    }

    /// Returns a copy of `record` whose partition key is replaced by its
    /// scrubbed digest; every other field is carried over unchanged.
    fn scrubbed<'a>(record: &CaptureRecordRef<'a>) -> CaptureRecordRef<'a> {
        CaptureRecordRef {
            op: record.op,
            ts: record.ts,
            stream: record.stream,
            partition_key: Cow::Owned(scrub_partition_key(&record.partition_key)),
            data: record.data,
            explicit_hash_key: record.explicit_hash_key,
            sequence_number: record.sequence_number,
            shard_id: record.shard_id,
        }
    }
}
/// Replaces a partition key with the lowercase hexadecimal MD5 digest of its
/// UTF-8 bytes.
///
/// The mapping is deterministic, so equal keys still scrub to equal values.
pub fn scrub_partition_key(key: &str) -> String {
    use md5::{Digest, Md5};
    use std::fmt::Write as _;

    let mut md5 = Md5::new();
    md5.update(key.as_bytes());
    let digest = md5.finalize();

    // Two hex characters per digest byte.
    let mut hex = String::with_capacity(32);
    for byte in digest.iter() {
        // Formatting into a `String` cannot fail; the result is ignored on purpose.
        let _ = write!(hex, "{byte:02x}");
    }
    hex
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Writing an empty batch must leave the capture file without any records.
    #[test]
    fn write_records_empty_slice_is_noop() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path();
        let writer = CaptureWriter::new(path, false).unwrap();

        writer.write_records(&[]);

        let records = read_capture_file(path).unwrap();
        assert_eq!(records.len(), 0);
    }

    /// The same key always scrubs to the same 32-character value, and that
    /// value differs from the key itself.
    #[test]
    fn scrub_is_deterministic() {
        let first = scrub_partition_key("my-key");
        let second = scrub_partition_key("my-key");

        assert_eq!(first, second);
        assert_ne!(first, "my-key");
        assert_eq!(first.len(), 32);
    }

    /// Distinct keys scrub to distinct values.
    #[test]
    fn scrub_different_keys_differ() {
        let one = scrub_partition_key("key-1");
        let two = scrub_partition_key("key-2");

        assert_ne!(one, two);
    }
}