use crate::archive;
use crate::event::Event;
use crate::view::{ReduceFn, View, ViewOps};
use fs2::FileExt;
use notify::{EventKind, RecursiveMode, Watcher};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::Duration;
/// Boxed iterator of `(event, line hash)` pairs; this is the return type of
/// the `read_full` methods, which chain archived events with active-log events.
type FullEventIter = Box<dyn Iterator<Item = io::Result<(Event, String)>>>;
/// Locking strategy applied to the active log file when a writer is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LockMode {
/// Try to take an exclusive file lock without blocking; opening the writer
/// fails if the lock cannot be acquired. This is the default.
#[default]
Flock,
/// Open the log without taking any lock.
None,
}
/// Outcome of waiting for the active log to grow past a given offset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitResult {
/// The active log is now larger than the requested offset; carries the
/// log size in bytes that was observed.
NewData(u64),
/// The wait ended without the log being observed past the requested offset.
Timeout,
}
/// Details of a failed conditional append: what the caller expected the tail
/// of the log to look like versus what was actually found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendConflict {
/// Log size (end offset, in bytes) the caller expected.
pub expected_offset: u64,
/// Log size (in bytes) actually observed.
pub actual_offset: u64,
/// Hash the caller expected for the last line of the log.
pub expected_hash: String,
/// Hash actually read for the last line, or `None` when no hash was
/// compared (for example when the offsets already disagreed).
pub actual_hash: Option<String>,
}
/// Error type returned by conditional appends (`append_if`).
#[derive(Debug, thiserror::Error)]
pub enum ConditionalAppendError {
/// The log's current size or last-line hash did not match the caller's
/// expectation; the payload describes both sides of the mismatch.
#[error(
"conditional append conflict: expected offset {} (hash {:?}), \
actual offset {} (hash {:?})",
.0.expected_offset, .0.expected_hash, .0.actual_offset, .0.actual_hash
)]
Conflict(AppendConflict),
/// An underlying I/O operation failed.
#[error("I/O error: {0}")]
Io(#[from] io::Error),
}
/// Position and hash of a line that was just appended to the active log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendResult {
/// Byte offset in the active log at which the appended line starts.
pub start_offset: u64,
/// Byte offset immediately after the appended line's trailing newline.
pub end_offset: u64,
/// Hash of the serialized JSON line (computed without the trailing newline).
pub line_hash: String,
}
/// Appending side of the event log: owns the open handle to the active log
/// file and knows where the archive and the views directory live.
pub struct EventWriter {
/// Open handle to the active log, opened in create + append mode.
file: File,
/// Path of the active log (`<dir>/app.jsonl`).
log_path: PathBuf,
/// Path of the compressed archive (`<dir>/archive.jsonl.zst`).
archive_path: PathBuf,
/// Directory for views (`<dir>/views`).
views_dir: PathBuf,
/// Size threshold in bytes at which an append reports that rotation is
/// needed; `0` disables the check.
max_log_size: u64,
}
impl std::fmt::Debug for EventWriter {
    /// Renders the writer's paths and size limit; the open file handle is
    /// deliberately left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("EventWriter");
        out.field("log_path", &self.log_path);
        out.field("archive_path", &self.archive_path);
        out.field("views_dir", &self.views_dir);
        out.field("max_log_size", &self.max_log_size);
        out.finish()
    }
}
impl EventWriter {
    /// Opens (creating it if necessary) the event log stored in `dir`, using
    /// [`LockMode::Flock`].
    ///
    /// # Errors
    /// Returns whatever [`EventWriter::open_with_lock`] returns.
    pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
        Self::open_with_lock(dir, LockMode::Flock)
    }

    /// Opens the event log stored in `dir` with an explicit locking mode.
    ///
    /// Creates `<dir>/views` (and any missing parents) and opens
    /// `<dir>/app.jsonl` in create + append mode. With [`LockMode::Flock`] a
    /// non-blocking exclusive lock is attempted on the log file. The size
    /// limit starts at `0` (rotation signalling disabled).
    ///
    /// # Errors
    /// Returns an `AlreadyExists` error if the lock cannot be taken, or the
    /// underlying I/O error if the directory or file cannot be created.
    pub fn open_with_lock(dir: impl AsRef<Path>, lock: LockMode) -> io::Result<Self> {
        let dir = dir.as_ref();
        let views_dir = dir.join("views");
        let log_path = dir.join("app.jsonl");
        let archive_path = dir.join("archive.jsonl.zst");
        fs::create_dir_all(&views_dir)?;
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)?;
        if lock == LockMode::Flock {
            file.try_lock_exclusive().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    format!(
                        "another writer holds the lock on {}: {e}",
                        log_path.display()
                    ),
                )
            })?;
        }
        Ok(EventWriter {
            file,
            log_path,
            archive_path,
            views_dir,
            max_log_size: 0,
        })
    }

    /// Appends `event` as one JSON line and returns where it landed.
    ///
    /// The rotation hint computed by the underlying append is discarded.
    ///
    /// # Errors
    /// Returns an error if serialization, the write, or the sync fails.
    pub fn append(&mut self, event: &Event) -> io::Result<AppendResult> {
        let (result, _) = self.append_raw(event)?;
        Ok(result)
    }

    /// Appends `event` and additionally reports whether the active log has
    /// reached the configured size limit.
    ///
    /// Returns the append result together with `true` when `max_log_size` is
    /// non-zero and the active log is at least that large after the write.
    ///
    /// # Errors
    /// Serialization failures are reported as `InvalidData`; write, sync and
    /// metadata failures are propagated unchanged.
    pub(crate) fn append_raw(&mut self, event: &Event) -> io::Result<(AppendResult, bool)> {
        let start_offset = self.file.seek(SeekFrom::End(0))?;
        let mut line = serde_json::to_string(event)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        // The hash covers the JSON text only, not the line terminator.
        let hash = line_hash(line.as_bytes());
        // Hand the line and its newline to the file as a single buffer so the
        // record is not split across separate writes of payload and newline.
        line.push('\n');
        self.file.write_all(line.as_bytes())?;
        self.file.sync_data()?;
        let end_offset = start_offset + line.len() as u64;
        let needs_rotate =
            self.max_log_size > 0 && self.active_log_size()? >= self.max_log_size;
        Ok((
            AppendResult {
                start_offset,
                end_offset,
                line_hash: hash,
            },
            needs_rotate,
        ))
    }

    /// Appends `event` only if the log still ends where the caller expects.
    ///
    /// The append proceeds when the active log's size equals
    /// `expected_offset` and, for a non-empty log, the hash of the last line
    /// equals `expected_hash`.
    ///
    /// # Errors
    /// Returns [`ConditionalAppendError::Conflict`] on an offset or hash
    /// mismatch. `actual_hash` is `None` when no hash could be read or
    /// compared. I/O failures are returned as [`ConditionalAppendError::Io`].
    pub fn append_if(
        &mut self,
        event: &Event,
        expected_offset: u64,
        expected_hash: &str,
    ) -> Result<AppendResult, ConditionalAppendError> {
        let current_size = self.active_log_size()?;
        let conflict = |actual_hash: Option<String>| {
            ConditionalAppendError::Conflict(AppendConflict {
                expected_offset,
                actual_offset: current_size,
                expected_hash: expected_hash.to_string(),
                actual_hash,
            })
        };
        if current_size != expected_offset {
            return Err(conflict(None));
        }
        if expected_offset > 0 {
            // Keep the `Option`: a hash that could not be read must never be
            // treated as matching an empty expected hash.
            let actual_hash = self.reader().read_line_hash_before(expected_offset)?;
            if actual_hash.as_deref() != Some(expected_hash) {
                return Err(conflict(actual_hash));
            }
        }
        Ok(self.append(event)?)
    }

    /// Moves the active log's contents into the archive and truncates it.
    ///
    /// Steps, in order: refresh every view against `reader`; read the whole
    /// active log; if it is empty, stop; append the contents to the archive
    /// as a compressed frame; truncate the active log to zero length and
    /// sync; reset every view's offset.
    ///
    /// # Errors
    /// Stops at, and returns, the first error from any of those steps.
    pub fn rotate(
        &mut self,
        reader: &EventReader,
        views: &mut HashMap<String, Box<dyn ViewOps>>,
    ) -> io::Result<()> {
        // Views must have consumed everything in the active log before it is
        // emptied.
        for view in views.values_mut() {
            view.refresh_boxed(reader)?;
        }
        let contents = fs::read(&self.log_path)?;
        if contents.is_empty() {
            return Ok(());
        }
        archive::append_compressed_frame(&self.archive_path, &contents)?;
        self.file.set_len(0)?;
        self.file.sync_data()?;
        for view in views.values_mut() {
            view.reset_offset()?;
        }
        Ok(())
    }

    /// Creates a reader pointing at the same active log and archive.
    pub fn reader(&self) -> EventReader {
        EventReader {
            log_path: self.log_path.clone(),
            archive_path: self.archive_path.clone(),
        }
    }

    /// Directory that contains the active log.
    ///
    /// # Panics
    /// Panics if `log_path` has no parent; the constructor always builds it
    /// by joining a file name onto the directory.
    pub fn dir(&self) -> &Path {
        self.log_path
            .parent()
            .expect("log_path always has a parent directory")
    }

    /// Path of the active log file.
    pub fn log_path(&self) -> &Path {
        &self.log_path
    }

    /// Path of the compressed archive file.
    pub fn archive_path(&self) -> &Path {
        &self.archive_path
    }

    /// Path of the views directory.
    pub fn views_dir(&self) -> &Path {
        &self.views_dir
    }

    /// Current size in bytes of the active log, read from file metadata by path.
    ///
    /// # Errors
    /// Returns the error from the metadata lookup.
    pub fn active_log_size(&self) -> io::Result<u64> {
        Ok(fs::metadata(&self.log_path)?.len())
    }

    /// Sets the size threshold in bytes used for the rotation hint
    /// (`0` disables it).
    pub(crate) fn set_max_log_size(&mut self, bytes: u64) {
        self.max_log_size = bytes;
    }
}
/// Read-only handle on an event log directory; holds only paths and opens
/// files on demand in its methods.
#[derive(Debug, Clone)]
pub struct EventReader {
/// Path of the active log (`<dir>/app.jsonl`).
log_path: PathBuf,
/// Path of the compressed archive (`<dir>/archive.jsonl.zst`).
archive_path: PathBuf,
}
impl EventReader {
    /// Creates a reader for the log stored in `dir`.
    ///
    /// No file is opened or checked here; only the paths are computed.
    pub fn new(dir: impl AsRef<Path>) -> Self {
        let dir = dir.as_ref();
        EventReader {
            log_path: dir.join("app.jsonl"),
            archive_path: dir.join("archive.jsonl.zst"),
        }
    }

    /// Iterates over events in the active log starting at byte `offset`.
    ///
    /// Each item is `(event, offset just past the line, line hash)`. The
    /// file length is sampled once when the iterator is created.
    ///
    /// # Errors
    /// Returns an error if the log cannot be opened, seeked, or inspected.
    pub fn read_from(
        &self,
        offset: u64,
    ) -> io::Result<impl Iterator<Item = io::Result<(Event, u64, String)>>> {
        let mut file = File::open(&self.log_path)?;
        file.seek(SeekFrom::Start(offset))?;
        let file_len = file.metadata()?.len();
        Ok(LogIterator {
            lines: BufReader::new(file).lines(),
            pos: offset,
            file_len,
        })
    }

    /// Iterates over every event: archived events first (if an archive
    /// reader is available), then the events in the active log.
    ///
    /// # Errors
    /// Returns an error if the archive or the active log cannot be opened.
    pub fn read_full(&self) -> io::Result<FullEventIter> {
        let archive_iter: FullEventIter =
            match archive::open_archive_reader(&self.archive_path)? {
                Some(reader) => Box::new(EventLineIter {
                    reader,
                    buf: String::new(),
                }),
                None => Box::new(std::iter::empty()),
            };
        let active_iter: FullEventIter = Box::new(EventLineIter {
            reader: BufReader::new(File::open(&self.log_path)?),
            buf: String::new(),
        });
        Ok(Box::new(archive_iter.chain(active_iter)))
    }

    /// Returns the hash of the line that ends immediately before `offset`.
    ///
    /// `offset` is expected to point just past a line's trailing newline.
    /// Returns `Ok(None)` when `offset` is `0` or lies beyond the end of the
    /// file.
    ///
    /// The start of the line is found by scanning backwards in fixed-size
    /// chunks until a newline or the start of the file is reached, so lines
    /// longer than one chunk are hashed in full.
    ///
    /// # Errors
    /// Returns an error if the log cannot be opened or read.
    pub fn read_line_hash_before(&self, offset: u64) -> io::Result<Option<String>> {
        /// Number of bytes examined per backward step.
        const SCAN_CHUNK: u64 = 8192;

        if offset == 0 {
            return Ok(None);
        }
        let mut file = File::open(&self.log_path)?;
        if offset > file.metadata()?.len() {
            return Ok(None);
        }
        // Position of the newline that terminates the line of interest.
        let newline_pos = offset - 1;

        // Search [window_start, window_end) for the previous line's newline,
        // sliding the window towards the start of the file until one is found.
        let mut start = 0u64;
        let mut window_end = newline_pos;
        let mut chunk = Vec::new();
        while window_end > 0 {
            let window_start = window_end.saturating_sub(SCAN_CHUNK);
            // Bounded by SCAN_CHUNK, so the cast cannot truncate.
            chunk.resize((window_end - window_start) as usize, 0);
            file.seek(SeekFrom::Start(window_start))?;
            file.read_exact(&mut chunk)?;
            if let Some(pos) = chunk.iter().rposition(|&b| b == b'\n') {
                start = window_start + pos as u64 + 1;
                break;
            }
            window_end = window_start;
        }

        let line_len = usize::try_from(newline_pos - start)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        file.seek(SeekFrom::Start(start))?;
        let mut line_buf = vec![0u8; line_len];
        file.read_exact(&mut line_buf)?;
        Ok(Some(line_hash(&line_buf)))
    }

    /// Current size in bytes of the active log.
    ///
    /// # Errors
    /// Returns the error from the metadata lookup.
    pub fn active_log_size(&self) -> io::Result<u64> {
        Ok(fs::metadata(&self.log_path)?.len())
    }

    /// Reports whether the active log is larger than `offset`.
    ///
    /// # Errors
    /// Returns the error from the metadata lookup.
    pub fn has_new_events(&self, offset: u64) -> io::Result<bool> {
        Ok(self.active_log_size()? > offset)
    }

    /// Blocks until the active log grows past `offset` or `timeout` elapses.
    ///
    /// Returns [`WaitResult::NewData`] with the observed size as soon as the
    /// log is larger than `offset` (checked before watching, and again after
    /// every file-system notification), or [`WaitResult::Timeout`] once the
    /// timeout has elapsed without that happening.
    ///
    /// The watch is placed on the log's parent directory, so notifications
    /// for other files there can arrive; they trigger a re-check and the
    /// wait resumes for the remaining time.
    ///
    /// # Errors
    /// Returns an error if the size cannot be read, the watcher cannot be
    /// created or attached, or the watcher's channel disconnects.
    pub fn wait_for_events(
        &self,
        offset: u64,
        timeout: Duration,
    ) -> io::Result<WaitResult> {
        let current_size = self.active_log_size()?;
        if current_size > offset {
            return Ok(WaitResult::NewData(current_size));
        }
        let (tx, rx) = mpsc::channel();
        let mut watcher =
            notify::recommended_watcher(move |res: Result<notify::Event, _>| {
                if let Ok(event) = res
                    && matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_))
                {
                    // The receiver may already be gone; nothing to do then.
                    let _ = tx.send(());
                }
            })
            .map_err(io::Error::other)?;
        watcher
            .watch(
                self.log_path.parent().unwrap_or(&self.log_path),
                RecursiveMode::NonRecursive,
            )
            .map_err(io::Error::other)?;

        // `None` means the deadline is not representable; in that case every
        // wait simply uses the full timeout.
        let deadline = std::time::Instant::now().checked_add(timeout);
        loop {
            // Checked on entry as well, to catch data written between the
            // first size check and the watcher being attached.
            let size = self.active_log_size()?;
            if size > offset {
                return Ok(WaitResult::NewData(size));
            }
            let remaining = match deadline {
                Some(deadline) => {
                    deadline.saturating_duration_since(std::time::Instant::now())
                }
                None => timeout,
            };
            if remaining.is_zero() {
                return Ok(WaitResult::Timeout);
            }
            match rx.recv_timeout(remaining) {
                // Something changed in the directory; loop to re-check the size.
                Ok(()) => {}
                Err(mpsc::RecvTimeoutError::Timeout) => return Ok(WaitResult::Timeout),
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    return Err(io::Error::other("file watcher disconnected"));
                }
            }
        }
    }

    /// Path of the active log file.
    pub fn log_path(&self) -> &Path {
        &self.log_path
    }

    /// Path of the compressed archive file.
    pub fn archive_path(&self) -> &Path {
        &self.archive_path
    }
}
/// Facade combining a writer, a reader on the same directory, and a set of
/// named views.
pub struct EventLog {
/// Appending side of the log.
writer: EventWriter,
/// Reading side of the log, pointing at the same files as `writer`.
reader: EventReader,
/// Registered views, keyed by view name.
views: HashMap<String, Box<dyn ViewOps>>,
}
impl std::fmt::Debug for EventLog {
    /// Renders the writer, the reader, and the number of registered views
    /// (the views themselves are not printed).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let view_count = self.views.len();
        let mut out = f.debug_struct("EventLog");
        out.field("writer", &self.writer);
        out.field("reader", &self.reader);
        out.field("view_count", &view_count);
        out.finish()
    }
}
/// One-shot constructor for a view: given the views directory, builds the
/// boxed view. Used by the builder to defer view creation until `open`.
type ViewFactory = Box<dyn FnOnce(&Path) -> Box<dyn ViewOps>>;
/// Builder for an `EventLog`: collects the directory, size limit, lock mode
/// and view definitions before anything is opened.
pub struct EventLogBuilder {
/// Directory holding the log files.
dir: PathBuf,
/// Size threshold in bytes passed to the writer; `0` disables rotation checks.
max_log_size: u64,
/// Locking strategy used when the writer is opened.
lock_mode: LockMode,
/// Deferred view constructors, run when the log is opened.
view_factories: Vec<ViewFactory>,
}
impl std::fmt::Debug for EventLogBuilder {
    /// Renders the builder's settings and how many views have been
    /// registered (the factories are closures and are not printed).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let view_count = self.view_factories.len();
        let mut out = f.debug_struct("EventLogBuilder");
        out.field("dir", &self.dir);
        out.field("max_log_size", &self.max_log_size);
        out.field("lock_mode", &self.lock_mode);
        out.field("view_count", &view_count);
        out.finish()
    }
}
impl EventLogBuilder {
pub fn max_log_size(mut self, bytes: u64) -> Self {
self.max_log_size = bytes;
self
}
pub fn lock_mode(mut self, mode: LockMode) -> Self {
self.lock_mode = mode;
self
}
pub fn view<S>(mut self, name: &str, reducer: ReduceFn<S>) -> Self
where
S: Serialize + DeserializeOwned + Default + Clone + 'static,
{
let name = name.to_string();
self.view_factories.push(Box::new(move |views_dir| {
Box::new(View::new(&name, reducer, views_dir))
}));
self
}
pub fn open(self) -> io::Result<EventLog> {
let mut writer = EventWriter::open_with_lock(&self.dir, self.lock_mode)?;
writer.set_max_log_size(self.max_log_size);
let reader = writer.reader();
let mut views = HashMap::new();
for factory in self.view_factories {
let view = factory(writer.views_dir());
views.insert(view.view_name().to_string(), view);
}
let mut log = EventLog {
writer,
reader,
views,
};
if log.writer.max_log_size > 0
&& log.reader.active_log_size()? >= log.writer.max_log_size
{
log.rotate()?;
}
Ok(log)
}
}
/// Hashes `line` with XXH64 (seed `0`) and returns the digest as a
/// 16-character, zero-padded, lowercase hexadecimal string.
pub fn line_hash(line: &[u8]) -> String {
    let digest = xxhash_rust::xxh64::xxh64(line, 0);
    format!("{digest:016x}")
}
impl EventLog {
    /// Opens the log in `dir` with default settings and no views.
    ///
    /// # Errors
    /// Returns any error from [`EventWriter::open`].
    pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
        let writer = EventWriter::open(dir)?;
        Ok(EventLog {
            reader: writer.reader(),
            writer,
            views: HashMap::new(),
        })
    }

    /// Starts a builder for the log in `dir`, with no size limit, the
    /// default lock mode and no views.
    pub fn builder(dir: impl AsRef<Path>) -> EventLogBuilder {
        let dir = dir.as_ref().to_path_buf();
        EventLogBuilder {
            dir,
            max_log_size: 0,
            lock_mode: LockMode::default(),
            view_factories: Vec::new(),
        }
    }

    /// Appends `event`, rotating afterwards if the writer reports that the
    /// size limit has been reached.
    ///
    /// # Errors
    /// Returns any error from the append or from the rotation.
    pub fn append(&mut self, event: &Event) -> io::Result<AppendResult> {
        let (outcome, rotation_due) = self.writer.append_raw(event)?;
        if rotation_due {
            self.rotate()?;
        }
        Ok(outcome)
    }

    /// Conditionally appends `event` (see [`EventWriter::append_if`]), then
    /// rotates if a non-zero size limit has been reached.
    ///
    /// # Errors
    /// Returns a conflict if the expectation does not hold, or an I/O error
    /// from the append, the size check, or the rotation.
    pub fn append_if(
        &mut self,
        event: &Event,
        expected_offset: u64,
        expected_hash: &str,
    ) -> Result<AppendResult, ConditionalAppendError> {
        let outcome = self.writer.append_if(event, expected_offset, expected_hash)?;
        let limit = self.writer.max_log_size;
        if limit > 0 && self.writer.active_log_size()? >= limit {
            self.rotate()?;
        }
        Ok(outcome)
    }

    /// Iterates over active-log events starting at byte `offset`; delegates
    /// to the reader.
    pub fn read_from(
        &self,
        offset: u64,
    ) -> io::Result<impl Iterator<Item = io::Result<(Event, u64, String)>>> {
        self.reader.read_from(offset)
    }

    /// Iterates over archived and active events; delegates to the reader.
    pub fn read_full(&self) -> io::Result<FullEventIter> {
        self.reader.read_full()
    }

    /// Rotates the active log into the archive using this log's own reader
    /// and views.
    pub fn rotate(&mut self) -> io::Result<()> {
        let EventLog {
            writer,
            reader,
            views,
        } = self;
        writer.rotate(reader, views)
    }

    /// Refreshes every registered view against the reader, stopping at the
    /// first error.
    pub fn refresh_all(&mut self) -> io::Result<()> {
        let reader = &self.reader;
        for view in self.views.values_mut() {
            view.refresh_boxed(reader)?;
        }
        Ok(())
    }

    /// Returns the current state of the view called `name`, typed as `S`.
    ///
    /// # Errors
    /// Returns `NotFound` if no view has that name, or `InvalidInput` if the
    /// view exists but is not a `View<S>`.
    pub fn view<S>(&self, name: &str) -> io::Result<&S>
    where
        S: Serialize + DeserializeOwned + Default + Clone + 'static,
    {
        let Some(view) = self.views.get(name) else {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                format!("view '{name}' not found"),
            ));
        };
        match view.as_any().downcast_ref::<View<S>>() {
            Some(typed) => Ok(typed.state()),
            None => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("view '{name}' type mismatch"),
            )),
        }
    }

    /// Returns a clone of the reader.
    pub fn reader(&self) -> EventReader {
        self.reader.clone()
    }

    /// Shared access to the underlying writer.
    pub fn writer(&self) -> &EventWriter {
        &self.writer
    }

    /// Exclusive access to the underlying writer.
    pub fn writer_mut(&mut self) -> &mut EventWriter {
        &mut self.writer
    }

    /// Directory that contains the active log.
    pub fn dir(&self) -> &Path {
        self.writer.dir()
    }

    /// Path of the active log file.
    pub fn log_path(&self) -> &Path {
        self.writer.log_path()
    }

    /// Path of the compressed archive file.
    pub fn archive_path(&self) -> &Path {
        self.writer.archive_path()
    }

    /// Path of the views directory.
    pub fn views_dir(&self) -> &Path {
        self.writer.views_dir()
    }

    /// Current size in bytes of the active log; delegates to the reader.
    pub fn active_log_size(&self) -> io::Result<u64> {
        self.reader.active_log_size()
    }

    /// Reports whether the active log is larger than `offset`; delegates to
    /// the reader.
    pub fn has_new_events(&self, offset: u64) -> io::Result<bool> {
        self.reader.has_new_events(offset)
    }

    /// Waits for the active log to grow past `offset`; delegates to the
    /// reader.
    pub fn wait_for_events(
        &self,
        offset: u64,
        timeout: Duration,
    ) -> io::Result<WaitResult> {
        self.reader.wait_for_events(offset, timeout)
    }

    /// Hash of the line ending just before `offset`; delegates to the
    /// reader.
    pub fn read_line_hash_before(&self, offset: u64) -> io::Result<Option<String>> {
        self.reader.read_line_hash_before(offset)
    }
}
/// Iterator state for reading events from the active log while tracking
/// byte offsets.
struct LogIterator<I> {
/// Source of lines (without their line terminators).
lines: I,
/// Byte offset in the file at which the next line is assumed to start.
pos: u64,
/// File length sampled when the iterator was created; used to detect a
/// final line that has no trailing newline yet.
file_len: u64,
}
impl<I: Iterator<Item = io::Result<String>>> Iterator for LogIterator<I> {
    type Item = io::Result<(Event, u64, String)>;

    /// Yields the next event as `(event, offset just past its line, hash)`.
    ///
    /// Empty lines are skipped. Iteration ends when the line source is
    /// exhausted or when a line would reach the sampled end of file without
    /// room for its newline (an incomplete trailing line). A line that is
    /// not valid JSON yields an `InvalidData` error; the tracked offset is
    /// still advanced past it so later items keep correct offsets.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let line = match self.lines.next()? {
                Ok(line) => line,
                Err(e) => return Some(Err(e)),
            };
            // NOTE(review): assumes a one-byte `\n` terminator, which is
            // what the writer in this file emits; CRLF input would skew
            // the offsets by one byte per line.
            let line_end = self.pos + line.len() as u64;
            if line_end >= self.file_len {
                // No room left for a newline: the line is still being written.
                return None;
            }
            // Advance before parsing so a malformed line cannot desynchronize
            // the offsets reported for the lines that follow it.
            self.pos = line_end + 1;
            if line.is_empty() {
                continue;
            }
            let hash = line_hash(line.as_bytes());
            return Some(match serde_json::from_str::<Event>(&line) {
                Ok(event) => Ok((event, self.pos, hash)),
                Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
            });
        }
    }
}
/// Iterator state for reading events line by line from a buffered reader,
/// without tracking offsets.
struct EventLineIter<R> {
/// Buffered source of JSON lines.
reader: R,
/// Line buffer reused across iterations.
buf: String,
}
impl<R: BufRead> Iterator for EventLineIter<R> {
    type Item = io::Result<(Event, String)>;

    /// Yields the next event together with the hash of its line.
    ///
    /// Blank lines are skipped. Iteration ends at end of input or at a final
    /// line that lacks a trailing newline. Read errors are passed through;
    /// lines that are not valid JSON yield an `InvalidData` error.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.buf.clear();
            let bytes_read = match self.reader.read_line(&mut self.buf) {
                Ok(n) => n,
                Err(e) => return Some(Err(e)),
            };
            // End of input, or a trailing line whose newline is not there yet.
            if bytes_read == 0 || !self.buf.ends_with('\n') {
                return None;
            }
            let line = self.buf.trim_end_matches('\n').trim_end_matches('\r');
            if line.is_empty() {
                continue;
            }
            let hash = line_hash(line.as_bytes());
            let item = serde_json::from_str::<Event>(line)
                .map(|event| (event, hash))
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
            return Some(item);
        }
    }
}