rustcdc 0.6.4

Embeddable Rust CDC library focused on correctness-first capture primitives
Documentation
use std::{
    collections::HashSet,
    path::Path,
    time::{Duration, Instant},
};

/// Parsed contents of a crash-worker marker file, as produced by `read_worker_marker`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WorkerMarker {
    /// Value of the marker's `events=` line.
    pub events: usize,
    /// `true` only when the marker's `acked=` line carries exactly `1`.
    pub acked: bool,
    /// Trimmed, non-empty entries collected from the marker's comma-separated `ids=` lines.
    pub ids: HashSet<String>,
}

/// Blocks until a parseable worker marker is available at `path` or `timeout` elapses.
///
/// The marker is considered ready as soon as `read_worker_marker` succeeds on it; a
/// missing, unreadable, or malformed file counts as "not ready yet". The file is probed
/// about every 50 ms.
///
/// The marker is always probed at least once, even with a zero `timeout`, and once more
/// after the final sleep, so a marker that is already present is never reported as a
/// timeout.
///
/// # Errors
///
/// Returns `rustcdc::Error::TimeoutError` when no valid marker was observed within
/// `timeout`.
pub fn wait_for_marker(path: &Path, timeout: Duration) -> rustcdc::Result<()> {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    // Track elapsed time instead of computing `now + timeout`, which panics on overflow
    // for very large timeouts.
    let started = Instant::now();
    loop {
        // Reading a missing file fails, so no separate existence check is needed.
        if read_worker_marker(path).is_ok() {
            return Ok(());
        }

        let elapsed = started.elapsed();
        if elapsed >= timeout {
            break;
        }
        // Never sleep past the deadline; `elapsed < timeout` so the subtraction is safe.
        std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
    }

    Err(rustcdc::Error::TimeoutError(format!(
        "timed out waiting for crash worker marker at {}",
        path.display()
    )))
}

/// Reads the worker marker at `path` and returns only its `events` count.
///
/// # Errors
///
/// Propagates any error produced by `read_worker_marker`.
pub fn read_worker_batch_len(path: &Path) -> rustcdc::Result<usize> {
    read_worker_marker(path).map(|marker| marker.events)
}

pub fn read_worker_marker(path: &Path) -> rustcdc::Result<WorkerMarker> {
    let marker = std::fs::read_to_string(path).map_err(rustcdc::Error::IoError)?;
    let mut events = None;
    let mut acked = false;
    let mut ids = HashSet::new();

    for line in marker.lines() {
        if let Some(value) = line.strip_prefix("events=") {
            if value.is_empty() {
                return Err(rustcdc::Error::StateError(
                    "worker marker events field is empty".into(),
                ));
            }
            events = Some(value.parse::<usize>().map_err(|error| {
                rustcdc::Error::StateError(format!("invalid worker marker events: {error}"))
            })?);
        } else if let Some(value) = line.strip_prefix("acked=") {
            acked = value == "1";
        } else if let Some(value) = line.strip_prefix("ids=") {
            for id in value.split(',').map(str::trim).filter(|id| !id.is_empty()) {
                ids.insert(id.to_string());
            }
        }
    }

    let events = events
        .ok_or_else(|| rustcdc::Error::StateError("worker marker missing events field".into()))?;

    Ok(WorkerMarker { events, acked, ids })
}