harn-vm 0.8.43

Async bytecode virtual machine for the Harn programming language
Documentation
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use super::util::{
    dir_size_bytes, now_ms, prepare_event_after, sanitize_filename, stream_from_broadcast,
    sync_tree, write_json_atomically, BroadcastMap,
};
use super::{
    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
    EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
};

/// One line of a topic's `.jsonl` file: an event paired with the id it was
/// appended under.
///
/// serde serializes fields in declaration order, so this order is also the
/// JSON key order written to disk.
#[derive(Serialize, Deserialize)]
struct FileRecord {
    /// Per-topic id (the topic's previous latest id + 1 at append time).
    id: EventId,
    /// The stored event payload.
    event: LogEvent,
}

/// Event log that persists every topic as a JSON Lines file on disk.
///
/// Records live in `root/topics/<topic>.jsonl`; consumer cursors live in
/// `root/consumers/<topic>__<consumer>.json`.
pub struct FileEventLog {
    /// Directory that holds the `topics/` and `consumers/` subdirectories.
    root: PathBuf,
    /// Held for the duration of appends and compactions so they do not interleave.
    write_lock: Mutex<()>,
    /// Per-topic cache (keyed by topic name) of the highest id written so far.
    latest_ids: Mutex<HashMap<String, EventId>>,
    /// Live fan-out used to deliver newly appended events to subscribers.
    pub(super) broadcasts: BroadcastMap,
    /// Queue depth handed to the broadcast map; `open` clamps it to at least 1.
    pub(super) queue_depth: usize,
}

impl FileEventLog {
    pub fn open(root: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
        std::fs::create_dir_all(root.join("topics"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        std::fs::create_dir_all(root.join("consumers"))
            .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        Ok(Self {
            root,
            latest_ids: Mutex::new(HashMap::new()),
            write_lock: Mutex::new(()),
            broadcasts: BroadcastMap::default(),
            queue_depth: queue_depth.max(1),
        })
    }

    fn topic_path(&self, topic: &Topic) -> PathBuf {
        self.root
            .join("topics")
            .join(format!("{}.jsonl", topic.as_str()))
    }

    fn consumer_path(&self, topic: &Topic, consumer: &ConsumerId) -> PathBuf {
        self.root.join("consumers").join(format!(
            "{}__{}.json",
            topic.as_str(),
            sanitize_filename(consumer.as_str())
        ))
    }

    fn latest_id_for_topic(&self, topic: &Topic) -> Result<EventId, LogError> {
        if let Some(event_id) = self
            .latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .get(topic.as_str())
            .copied()
        {
            return Ok(event_id);
        }

        let mut latest = 0;
        let path = self.topic_path(topic);
        if path.is_file() {
            for record in read_file_records(&path)? {
                latest = record.id;
            }
        }
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest);
        Ok(latest)
    }

    fn read_range_sync(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(Vec::new());
        }
        let from = from.unwrap_or(0);
        let mut events = Vec::new();
        for record in read_file_records(&path)? {
            if record.id > from {
                events.push((record.id, record.event));
            }
            if events.len() >= limit {
                break;
            }
        }
        Ok(events)
    }

    pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
        let topics_dir = self.root.join("topics");
        if !topics_dir.is_dir() {
            return Ok(Vec::new());
        }
        let mut topics = Vec::new();
        for entry in std::fs::read_dir(&topics_dir)
            .map_err(|error| LogError::Io(format!("event log topics read error: {error}")))?
        {
            let entry = entry
                .map_err(|error| LogError::Io(format!("event log topic entry error: {error}")))?;
            let path = entry.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") {
                continue;
            }
            let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
                continue;
            };
            topics.push(Topic::new(stem.to_string())?);
        }
        topics.sort_by(|left, right| left.as_str().cmp(right.as_str()));
        Ok(topics)
    }

    pub(super) fn append_idempotent_by_header(
        &self,
        topic: &Topic,
        header: &str,
        value: &str,
        event: LogEvent,
    ) -> Result<AppendOutcome, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
        if let Some((event_id, existing)) = existing_events.iter().find(|(_, event)| {
            event
                .headers
                .get(header)
                .is_some_and(|found| found == value)
        }) {
            return Ok(AppendOutcome {
                event_id: *event_id,
                event: existing.clone(),
                inserted: false,
            });
        }

        let next_id = self.latest_id_for_topic(topic)? + 1;
        let previous = existing_events
            .last()
            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
        let event = prepare_event_after(topic, next_id, previous, event)?;
        self.append_record_locked(topic, next_id, event)
    }

    fn append_record_locked(
        &self,
        topic: &Topic,
        event_id: EventId,
        event: LogEvent,
    ) -> Result<AppendOutcome, LogError> {
        let record = FileRecord {
            id: event_id,
            event: event.clone(),
        };
        let path = self.topic_path(topic);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
        }
        let line = serde_json::to_string(&record)
            .map_err(|error| LogError::Serde(format!("event log encode error: {error}")))?;
        use std::io::Write as _;
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
        writeln!(file, "{line}")
            .map_err(|error| LogError::Io(format!("event log write error: {error}")))?;
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), event_id);
        self.broadcasts
            .publish(topic, self.queue_depth, (event_id, event.clone()));
        Ok(AppendOutcome {
            event_id,
            event,
            inserted: true,
        })
    }
}

/// Parses every record in the JSON Lines file at `path`, in file order.
///
/// Whitespace-only lines are skipped. A final line with no trailing `\n`
/// that fails to parse is treated as a torn write and dropped. A
/// newline-terminated line that fails to parse is a hard error.
///
/// # Errors
/// `LogError::Io` on open/read failure; `LogError::Serde` on a corrupt
/// complete line.
fn read_file_records(path: &Path) -> Result<Vec<FileRecord>, LogError> {
    use std::io::BufRead as _;

    let handle = std::fs::File::open(path)
        .map_err(|error| LogError::Io(format!("event log open error: {error}")))?;
    let mut reader = std::io::BufReader::new(handle);
    let mut parsed = Vec::new();
    let mut buf = Vec::new();
    loop {
        buf.clear();
        let consumed = reader
            .read_until(b'\n', &mut buf)
            .map_err(|error| LogError::Io(format!("event log read error: {error}")))?;
        if consumed == 0 {
            return Ok(parsed);
        }
        if buf.iter().all(u8::is_ascii_whitespace) {
            continue;
        }
        match serde_json::from_slice::<FileRecord>(&buf) {
            Ok(record) => parsed.push(record),
            // Only the last line can lack a newline; a parse failure there is a
            // partially written record, so stop instead of failing the read.
            Err(_) if buf.last() != Some(&b'\n') => return Ok(parsed),
            Err(error) => {
                return Err(LogError::Serde(format!("event log parse error: {error}")));
            }
        }
    }
}

impl EventLog for FileEventLog {
    /// Reports the backend kind, root directory, on-disk size, and queue depth.
    fn describe(&self) -> EventLogDescription {
        EventLogDescription {
            backend: EventLogBackendKind::File,
            location: Some(self.root.clone()),
            size_bytes: Some(dir_size_bytes(&self.root)),
            queue_depth: self.queue_depth,
        }
    }

    /// Appends `event` to `topic` under the next id (latest + 1) and returns it.
    ///
    /// The event is passed through `prepare_event_after` together with the
    /// topic's current last event before being written.
    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let next_id = self.latest_id_for_topic(topic)? + 1;
        let existing_events = self.read_range_sync(topic, None, usize::MAX)?;
        let previous = existing_events
            .last()
            .map(|(previous_id, previous_event)| (*previous_id, previous_event));
        let event = prepare_event_after(topic, next_id, previous, event)?;
        self.append_record_locked(topic, next_id, event)
            .map(|outcome| outcome.event_id)
    }

    /// Syncs the log's directory tree to disk via `sync_tree`.
    async fn flush(&self) -> Result<(), LogError> {
        sync_tree(&self.root)
    }

    /// Returns up to `limit` events with id greater than `from`.
    async fn read_range(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
        self.read_range_sync(topic, from, limit)
    }

    /// Like `read_range`, but converts each event to `LogEventBytes`.
    async fn read_range_bytes(
        &self,
        topic: &Topic,
        from: Option<EventId>,
        limit: usize,
    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
        self.read_range_sync(topic, from, limit)?
            .into_iter()
            .map(|(event_id, event)| Ok((event_id, event.try_into()?)))
            .collect()
    }

    /// Streams stored history after `from`, followed by live events.
    ///
    /// The broadcast receiver is created before history is read so events
    /// appended in between are not missed.
    async fn subscribe(
        self: Arc<Self>,
        topic: &Topic,
        from: Option<EventId>,
    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
        let history = self.read_range_sync(topic, from, usize::MAX)?;
        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
    }

    /// Persists `consumer`'s cursor for `topic` as `up_to`, written atomically.
    async fn ack(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
        up_to: EventId,
    ) -> Result<(), LogError> {
        let path = self.consumer_path(topic, consumer);
        let payload = serde_json::json!({
            "topic": topic.as_str(),
            "consumer_id": consumer.as_str(),
            "cursor": up_to,
            "updated_at_ms": now_ms(),
        });
        write_json_atomically(&path, &payload)
    }

    /// Loads `consumer`'s acknowledged cursor for `topic`, or `None` if never acked.
    ///
    /// # Errors
    /// Fails if the cursor file is unreadable, is not JSON, or lacks a numeric
    /// `cursor` field.
    async fn consumer_cursor(
        &self,
        topic: &Topic,
        consumer: &ConsumerId,
    ) -> Result<Option<EventId>, LogError> {
        let path = self.consumer_path(topic, consumer);
        if !path.is_file() {
            return Ok(None);
        }
        let raw = std::fs::read_to_string(&path)
            .map_err(|error| LogError::Io(format!("event log consumer read error: {error}")))?;
        let payload: serde_json::Value = serde_json::from_str(&raw)
            .map_err(|error| LogError::Serde(format!("event log consumer parse error: {error}")))?;
        let cursor = payload
            .get("cursor")
            .and_then(serde_json::Value::as_u64)
            .ok_or_else(|| {
                LogError::Serde("event log consumer record missing numeric cursor".to_string())
            })?;
        Ok(Some(cursor))
    }

    /// Highest id written to `topic`, or `None` when it is 0 (nothing written).
    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
        let latest = self.latest_id_for_topic(topic)?;
        Ok((latest != 0).then_some(latest))
    }

    /// Drops every event with id `<= before` from `topic`.
    ///
    /// The topic file is rewritten atomically with the survivors, or removed
    /// if none survive. The latest-id cache is set to the last surviving id
    /// (0 if none).
    ///
    /// # Errors
    /// Fails on read/parse errors, on failure to rewrite the file, or on
    /// failure to remove it (other than it already being gone).
    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
        let _guard = self
            .write_lock
            .lock()
            .expect("file event log write lock poisoned");
        let path = self.topic_path(topic);
        if !path.is_file() {
            return Ok(CompactReport::default());
        }
        // One parse of the file serves both the retained set and the count.
        let all = self.read_range_sync(topic, None, usize::MAX)?;
        let total = all.len();
        let retained: Vec<(EventId, LogEvent)> = all
            .into_iter()
            .filter(|(event_id, _)| *event_id > before)
            .collect();
        let removed = total - retained.len();
        if retained.is_empty() {
            // A failed removal must not be ignored: the cache is reset below,
            // and appending to a file that still holds old records would reuse
            // ids that are already on disk.
            match std::fs::remove_file(&path) {
                Ok(()) => {}
                Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
                Err(error) => {
                    return Err(LogError::Io(format!(
                        "event log compact remove error: {error}"
                    )));
                }
            }
        } else {
            crate::atomic_io::atomic_write_with(&path, |writer| {
                use std::io::Write as _;
                for (event_id, event) in &retained {
                    let line = serde_json::to_string(&FileRecord {
                        id: *event_id,
                        event: event.clone(),
                    })
                    .map_err(|error| std::io::Error::other(error.to_string()))?;
                    writeln!(writer, "{line}")?;
                }
                Ok(())
            })
            .map_err(|error| LogError::Io(format!("event log compact finalize error: {error}")))?;
        }
        let latest = retained.last().map(|(event_id, _)| *event_id);
        // NOTE(review): when everything is compacted away this resets the
        // topic's latest id to 0, so later appends restart at id 1 even if
        // consumers have acked higher cursors. Confirm this is intended.
        self.latest_ids
            .lock()
            .expect("file event log latest ids poisoned")
            .insert(topic.as_str().to_string(), latest.unwrap_or(0));
        Ok(CompactReport {
            removed,
            remaining: retained.len(),
            latest,
            checkpointed: false,
        })
    }
}