use anyhow::{anyhow, Context, Result};
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crate::runtime::events::Event;
/// Capacity of the bounded `mpsc` channel that feeds the writer task; once this
/// many messages are queued, `append_line` waits in `send` until a slot frees up.
const WRITER_CHANNEL_CAPACITY: usize = 1024;
/// Handle to a background writer task that appends byte payloads to one file.
///
/// Cloning the handle clones the channel sender, so all clones feed the same task.
#[derive(Clone, Debug)]
pub struct JsonlWriter {
// Sending half of the channel consumed by `writer_task`.
tx: mpsc::Sender<WriterMsg>,
}
/// One write request sent from a `JsonlWriter` handle to the writer task.
struct WriterMsg {
// Raw bytes to write; the task writes them as-is (callers add the trailing newline).
payload: Vec<u8>,
// One-shot channel on which the task reports the outcome of this write.
ack: oneshot::Sender<Result<()>>,
}
impl JsonlWriter {
    /// Creates a writer for `path` whose background task uses a fresh,
    /// never-shared [`CancellationToken`].
    ///
    /// # Panics
    ///
    /// Spawns the writer task with `tokio::spawn`, which panics when called
    /// outside of a Tokio runtime.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let own_token = CancellationToken::new();
        Self::new_with_cancel(path, own_token)
    }

    /// Creates a writer for `path` whose background task stops when
    /// `cancel_token` is cancelled.
    ///
    /// The file itself is opened by the spawned task, so an open failure is
    /// reported through [`JsonlWriter::append_line`] rather than here.
    ///
    /// # Panics
    ///
    /// Spawns the writer task with `tokio::spawn`, which panics when called
    /// outside of a Tokio runtime.
    pub fn new_with_cancel(path: impl Into<PathBuf>, cancel_token: CancellationToken) -> Self {
        let (tx, rx) = mpsc::channel::<WriterMsg>(WRITER_CHANNEL_CAPACITY);
        let target: PathBuf = path.into();
        tokio::spawn(writer_task(target, rx, cancel_token));
        Self { tx }
    }

    /// Queues `payload` for the writer task and waits for its acknowledgement.
    ///
    /// The bytes are written verbatim; no newline is added here.
    ///
    /// # Errors
    ///
    /// Returns an error when the task is no longer receiving, when it drops
    /// the acknowledgement channel without answering, or when the task
    /// reports a failure for this write.
    pub async fn append_line(&self, payload: Vec<u8>) -> Result<()> {
        let (ack, outcome) = oneshot::channel();
        let request = WriterMsg { payload, ack };

        if self.tx.send(request).await.is_err() {
            return Err(anyhow!("JsonlWriter actor has shut down before send"));
        }

        match outcome.await {
            Ok(write_result) => write_result,
            Err(_) => Err(anyhow!("JsonlWriter actor dropped ack channel")),
        }
    }
}
async fn writer_task(
path: PathBuf,
mut rx: mpsc::Receiver<WriterMsg>,
cancel_token: CancellationToken,
) {
let mut file = match tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
{
Ok(f) => f,
Err(e) => {
warn!(error = %e, path = %path.display(), "JsonlWriter failed to open file; failing all incoming writes");
while let Some(msg) = rx.recv().await {
let _ = msg.ack.send(Err(anyhow!(
"JsonlWriter could not open '{}': {}",
path.display(),
e
)));
}
return;
}
};
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
debug!(path = %path.display(), "JsonlWriter actor received cancellation");
break;
}
msg = rx.recv() => {
let Some(msg) = msg else { break; };
let result = async {
file.write_all(&msg.payload).await?;
file.flush().await?;
Ok::<_, std::io::Error>(())
}
.await
.map_err(|e| {
anyhow!(
"JsonlWriter file write failed for '{}': {}",
path.display(),
e
)
});
let _ = msg.ack.send(result);
}
}
}
debug!(path = %path.display(), "JsonlWriter actor shutting down");
}
/// Returns a copy of `event` whose payload, when present, has been replaced by
/// the output of `crate::wire::protocol::redact_wire_secrets`.
///
/// The input event is left untouched.
fn redact_event(event: &Event) -> Event {
    let mut redacted = event.clone();
    if let Some(payload) = redacted.payload.as_mut() {
        *payload = crate::wire::protocol::redact_wire_secrets(payload);
    }
    redacted
}
/// Appends events to a backend chosen at construction time: either a JSONL
/// file or the process-wide database.
#[derive(Clone, Debug)]
pub struct EventWriter {
// Destination selected by `EventWriter::backend_for_path`.
backend: EventWriterBackend,
}
/// Storage destination used by `EventWriter`.
#[derive(Clone, Debug)]
enum EventWriterBackend {
// Events are serialized as one JSON document per line and sent to a `JsonlWriter`.
File(JsonlWriter),
// Events are appended through the database's event repository.
Db {
db: crate::runtime::db::DbHandle,
// Taken from the name of the directory containing the `events.jsonl` path.
goal_id: String,
},
}
impl EventWriter {
    /// Creates an event writer for `path`.
    ///
    /// When the file backend is selected, its `JsonlWriter` is created with
    /// `JsonlWriter::new`, i.e. with its own cancellation token.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self {
            backend: Self::backend_for_path(path.into(), JsonlWriter::new),
        }
    }

    /// Creates an event writer for `path` whose file backend, if selected,
    /// stops when `cancel_token` is cancelled.
    ///
    /// The token is unused when the database backend is selected.
    pub fn new_with_cancel(path: impl Into<PathBuf>, cancel_token: CancellationToken) -> Self {
        Self {
            backend: Self::backend_for_path(path.into(), |p| {
                JsonlWriter::new_with_cancel(p, cancel_token)
            }),
        }
    }

    /// Picks the backend for `path`.
    ///
    /// The database backend is used only when a global database is available,
    /// the file name is exactly `events.jsonl`, and the parent directory has a
    /// UTF-8 name (which becomes the `goal_id`). Otherwise `make_file` is
    /// called to build the file backend.
    fn backend_for_path(
        path: PathBuf,
        make_file: impl FnOnce(PathBuf) -> JsonlWriter,
    ) -> EventWriterBackend {
        if let Some(db) = crate::runtime::db::global_db() {
            let is_event_log = path.file_name() == Some(std::ffi::OsStr::new("events.jsonl"));
            let goal_id = path
                .parent()
                .and_then(|p| p.file_name())
                .and_then(|n| n.to_str());
            if let (true, Some(goal_id)) = (is_event_log, goal_id) {
                return EventWriterBackend::Db {
                    db,
                    goal_id: goal_id.to_string(),
                };
            }
        }
        EventWriterBackend::File(make_file(path))
    }

    /// Redacts `event` and appends it to the backend.
    ///
    /// # Errors
    ///
    /// Returns an error when the event cannot be serialized or the backend
    /// rejects the write.
    pub async fn append(&self, event: &Event) -> Result<()> {
        let event = redact_event(event);
        match &self.backend {
            EventWriterBackend::File(writer) => {
                let mut buf = serde_json::to_vec(&event)
                    .with_context(|| format!("failed to serialize event {}", event.id))?;
                buf.push(b'\n');
                writer
                    .append_line(buf)
                    .await
                    .with_context(|| format!("failed to append event {}", event.id))?;
                debug!(event_id = %event.id, "Appended event");
            }
            EventWriterBackend::Db { db, goal_id } => {
                self.append_event_to_db(db, goal_id, &event).await?;
            }
        }
        Ok(())
    }

    /// Redacts and appends every event in `events`, in order.
    ///
    /// With the file backend the whole batch is serialized into one buffer and
    /// sent as a single write request. With the database backend events are
    /// appended one by one, so a failure part-way through leaves the earlier
    /// events stored. An empty slice is a no-op.
    ///
    /// # Errors
    ///
    /// Returns an error when an event cannot be serialized or the backend
    /// rejects a write.
    pub async fn append_many(&self, events: &[Event]) -> Result<()> {
        // Nothing to write: avoid a channel round-trip and a zero-byte write + flush.
        if events.is_empty() {
            return Ok(());
        }
        match &self.backend {
            EventWriterBackend::File(writer) => {
                let mut buffer = Vec::new();
                for event in events {
                    let event = redact_event(event);
                    serde_json::to_writer(&mut buffer, &event)
                        .with_context(|| format!("failed to serialize event {}", event.id))?;
                    buffer.push(b'\n');
                }
                writer.append_line(buffer).await.with_context(|| {
                    format!("failed to append batch of {} events", events.len())
                })?;
                debug!(count = events.len(), "Appended event batch");
            }
            EventWriterBackend::Db { db, goal_id } => {
                for event in events {
                    let event = redact_event(event);
                    self.append_event_to_db(db, goal_id, &event).await?;
                }
            }
        }
        Ok(())
    }

    /// Stores one (already redacted) event in the database under `goal_id`.
    ///
    /// The full event is stored as a JSON string. The `kind` column is the
    /// event kind's JSON serialization when that is a plain string, and
    /// `"unknown"` otherwise.
    async fn append_event_to_db(
        &self,
        db: &crate::runtime::db::DbHandle,
        goal_id: &str,
        event: &Event,
    ) -> Result<()> {
        use crate::runtime::db::EventRepo;
        let payload = serde_json::to_string(event)
            .with_context(|| format!("failed to serialize event {}", event.id))?;
        let kind = serde_json::to_value(&event.kind)
            .with_context(|| format!("failed to serialize kind of event {}", event.id))?
            .as_str()
            .unwrap_or("unknown")
            .to_string();
        db.event_repo()
            .append(goal_id, &kind, &payload)
            .await
            .map_err(|e| anyhow!("db error: {e}"))?;
        debug!(event_id = %event.id, goal_id = %goal_id, "Appended event to DB");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Many tasks appending large lines through one writer must each produce
    /// exactly one intact JSON line.
    #[tokio::test]
    async fn concurrent_producers_do_not_interleave_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("concurrent.jsonl");
        let writer = Arc::new(JsonlWriter::new(&path));
        const N: usize = 64;
        let payload_len = 8192;
        let mut handles = Vec::with_capacity(N);
        for i in 0..N {
            let writer = Arc::clone(&writer);
            handles.push(tokio::spawn(async move {
                let body = "x".repeat(payload_len);
                let mut line = format!("{{\"i\":{},\"body\":\"{}\"}}", i, body).into_bytes();
                line.push(b'\n');
                writer.append_line(line).await.unwrap();
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        // No sleep needed: every `append_line` above returned only after the
        // actor had written and flushed its payload.
        let contents = tokio::fs::read_to_string(&path).await.unwrap();
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), N, "expected {} lines, got {}", N, lines.len());

        let mut seen: Vec<u64> = Vec::with_capacity(N);
        for line in lines {
            let v: serde_json::Value =
                serde_json::from_str(line).expect("each line must be intact JSON");
            let i = v
                .get("i")
                .and_then(|v| v.as_u64())
                .expect("each line must carry its producer index");
            seen.push(i);
            let body = v.get("body").and_then(|v| v.as_str()).unwrap();
            assert_eq!(
                body.len(),
                payload_len,
                "body payload must not be truncated"
            );
        }

        // Every producer's line must be present exactly once.
        seen.sort_unstable();
        let expected: Vec<u64> = (0..N as u64).collect();
        assert_eq!(seen, expected, "each producer must appear exactly once");
    }

    /// When the target cannot be opened, every append, not just the first,
    /// must fail with the open error.
    #[tokio::test]
    async fn open_failure_surfaces_to_every_caller() {
        let dir = TempDir::new().unwrap();
        // A directory cannot be opened for appending, so the actor's open fails.
        let writer = JsonlWriter::new(dir.path());
        for attempt in 0..3 {
            let err = writer
                .append_line(b"hello\n".to_vec())
                .await
                .expect_err("append against a directory path must error");
            assert!(
                err.to_string().contains("could not open"),
                "attempt {attempt}: unexpected error: {err}"
            );
        }
    }
}