use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::engine::engine::MeruEngine;
use crate::iceberg::Manifest;
use crate::store::traits::MeruStore;
use crate::types::MeruError;
use bytes::Bytes;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::options::MirrorConfig;
/// Object-store key where the mirror's low-water mark — the highest fully
/// mirrored snapshot version, written as a decimal string — is stored after
/// each successful upload.
const LOW_WATER_PATH: &str = "metadata/low_water.txt";
/// Returns the object-store key for the serialized manifest of snapshot
/// version `v`, e.g. `metadata/v7.manifest.bin`.
fn manifest_path(v: i64) -> String {
    let mut key = String::from("metadata/v");
    key.push_str(&v.to_string());
    key.push_str(".manifest.bin");
    key
}
/// How long the mirror loop sleeps between polls of the primary snapshot id
/// when neither shutdown nor an explicit wake is signalled.
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Handle to the background mirror task created by [`MirrorWorker::spawn`].
///
/// Owns the shared flags and notifiers used to observe and steer the loop;
/// call [`MirrorWorker::shutdown`] to stop the task and join it.
pub struct MirrorWorker {
// Set to true to ask the loop to exit at its next check.
shutdown_flag: Arc<AtomicBool>,
// Woken by `shutdown` so the loop leaves its sleep immediately.
shutdown_notify: Arc<Notify>,
// Join handle for the spawned loop; taken (once) by `shutdown`.
handle: Option<JoinHandle<()>>,
// Highest snapshot id successfully mirrored so far (0 = none yet).
mirror_seq: Arc<AtomicI64>,
// Unix seconds of the last successful upload (0 = never uploaded).
last_upload_unix_secs: Arc<AtomicI64>,
// Notified after each successful upload advances `mirror_seq`.
mirror_advanced: Arc<Notify>,
// Notified to make the loop poll immediately instead of waiting out
// the poll interval.
wake: Arc<Notify>,
}
impl MirrorWorker {
    /// Spawns the background mirror loop and returns the handle that owns it.
    ///
    /// The loop receives its own clones of every shared handle via
    /// `MirrorLoopState`; this struct keeps the originals so callers can
    /// observe progress and request shutdown.
    pub fn spawn(engine: Arc<MeruEngine>, config: MirrorConfig) -> Self {
        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let shutdown_notify = Arc::new(Notify::new());
        let mirror_seq = Arc::new(AtomicI64::new(0));
        let last_upload_unix_secs = Arc::new(AtomicI64::new(0));
        let mirror_advanced = Arc::new(Notify::new());
        let wake = Arc::new(Notify::new());
        let state = MirrorLoopState {
            shutdown_flag: Arc::clone(&shutdown_flag),
            shutdown_notify: Arc::clone(&shutdown_notify),
            mirror_seq: Arc::clone(&mirror_seq),
            last_upload_unix_secs: Arc::clone(&last_upload_unix_secs),
            mirror_advanced: Arc::clone(&mirror_advanced),
            wake: Arc::clone(&wake),
        };
        let handle = tokio::spawn(mirror_loop(engine, config, state));
        Self {
            shutdown_flag,
            shutdown_notify,
            handle: Some(handle),
            mirror_seq,
            last_upload_unix_secs,
            mirror_advanced,
            wake,
        }
    }

    /// Highest snapshot id the mirror has fully uploaded (0 = none yet).
    pub fn mirror_seq(&self) -> i64 {
        self.mirror_seq.load(Ordering::Relaxed)
    }

    /// Seconds since the last successful upload, or `None` if nothing has
    /// been uploaded yet.
    pub fn mirror_lag_secs(&self) -> Option<u64> {
        let last = self.last_upload_unix_secs.load(Ordering::Relaxed);
        if last == 0 {
            return None;
        }
        // On a clock error, fall back to `last` so the reported lag is zero
        // rather than nonsense.
        let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(d) => d.as_secs() as i64,
            Err(_) => last,
        };
        Some((now - last).max(0) as u64)
    }

    /// Shared counter behind [`Self::mirror_seq`], for crate-internal readers.
    pub(crate) fn mirror_seq_arc(&self) -> Arc<AtomicI64> {
        Arc::clone(&self.mirror_seq)
    }

    /// Notifier fired each time the mirror seq advances.
    pub(crate) fn mirror_advanced_arc(&self) -> Arc<Notify> {
        Arc::clone(&self.mirror_advanced)
    }

    /// Notifier that makes the loop poll immediately.
    pub(crate) fn wake_arc(&self) -> Arc<Notify> {
        Arc::clone(&self.wake)
    }

    /// Requests shutdown, wakes the loop, and joins the task. Safe to call
    /// more than once: the second call finds the handle already taken and
    /// returns immediately.
    pub async fn shutdown(&mut self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);
        self.shutdown_notify.notify_waiters();
        let Some(handle) = self.handle.take() else {
            return;
        };
        let _ = handle.await;
    }
}
/// Shared handles cloned into the spawned loop task; mirrors the fields of
/// [`MirrorWorker`] minus the join handle.
struct MirrorLoopState {
// Polled each iteration; true means exit the loop.
shutdown_flag: Arc<AtomicBool>,
// Interrupts the poll sleep when shutdown is requested.
shutdown_notify: Arc<Notify>,
// Written by the loop after each successful upload.
mirror_seq: Arc<AtomicI64>,
// Unix seconds of the last successful upload (0 = never).
last_upload_unix_secs: Arc<AtomicI64>,
// Notified after `mirror_seq` advances.
mirror_advanced: Arc<Notify>,
// Interrupts the poll sleep to force an immediate tick.
wake: Arc<Notify>,
}
/// Body of the background mirror task.
///
/// Each iteration:
/// 1. exits if shutdown was requested;
/// 2. if the primary snapshot id advanced past the last uploaded one,
///    mirrors it via [`mirror_snapshot`], then publishes the new mirror seq
///    and wakes waiters on `mirror_advanced`; failures are logged and
///    retried on the next tick;
/// 3. warns when upload lag exceeds `config.max_lag_alert_secs` (the mirror
///    never backpressures the primary — this is observability only);
/// 4. sleeps for `POLL_INTERVAL`, or wakes early on shutdown or `wake`.
async fn mirror_loop(engine: Arc<MeruEngine>, config: MirrorConfig, state: MirrorLoopState) {
    // Seconds since the Unix epoch; returns `fallback` if the system clock
    // reads earlier than the epoch. (Hoisted here because the loop needs it
    // in two places with different fallbacks.)
    fn unix_now_or(fallback: i64) -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs() as i64)
            .unwrap_or(fallback)
    }
    let MirrorLoopState {
        shutdown_flag,
        shutdown_notify,
        mirror_seq,
        last_upload_unix_secs,
        mirror_advanced,
        wake,
    } = state;
    info!("mirror worker started (Issue #31 Phase 2b — observe + upload)");
    let catalog_path = PathBuf::from(engine.catalog_path());
    // Highest snapshot id fully mirrored by this loop (0 = none yet).
    let mut last_uploaded: i64 = 0;
    let alert_threshold = config.max_lag_alert_secs;
    loop {
        if shutdown_flag.load(Ordering::SeqCst) {
            break;
        }
        let current = engine.current_snapshot_id();
        if current > last_uploaded && current > 0 {
            match mirror_snapshot(&engine, &catalog_path, &config, current).await {
                Ok(()) => {
                    info!(
                        snapshot_id = current,
                        previous_mirror_seq = last_uploaded,
                        "mirror worker uploaded snapshot"
                    );
                    last_uploaded = current;
                    // Publish the new seq before waking waiters so they
                    // observe the advanced value.
                    mirror_seq.store(current, Ordering::Relaxed);
                    mirror_advanced.notify_waiters();
                    last_upload_unix_secs.store(unix_now_or(0), Ordering::Relaxed);
                }
                Err(e) => {
                    // Failures never block the primary; the next tick sees
                    // `current > last_uploaded` again and retries.
                    warn!(
                        snapshot_id = current,
                        error = %e,
                        "mirror worker failed to upload snapshot — will retry next tick"
                    );
                }
            }
        } else {
            debug!(
                snapshot_id = current,
                "mirror worker tick — no new snapshot"
            );
        }
        // Lag alerting: disabled until the first successful upload
        // (last == 0) or when the threshold itself is 0.
        let last = last_upload_unix_secs.load(Ordering::Relaxed);
        if last > 0 && alert_threshold > 0 {
            // Fall back to `last` on clock error so the computed lag is 0.
            let lag = (unix_now_or(last) - last).max(0) as u64;
            if lag >= alert_threshold {
                warn!(
                    mirror_lag_secs = lag,
                    max_lag_alert_secs = alert_threshold,
                    mirror_seq = last_uploaded,
                    primary_snapshot_id = current,
                    "mirror worker: upload lag exceeded alert threshold — no backpressure, \
destination may be slow or unreachable"
                );
            }
        }
        // Sleep until the next poll, an explicit wake, or shutdown.
        tokio::select! {
            _ = tokio::time::sleep(POLL_INTERVAL) => {}
            _ = shutdown_notify.notified() => {}
            _ = wake.notified() => {}
        }
    }
    info!(last_uploaded_seq = last_uploaded, "mirror worker shut down");
}
/// Uploads every live file referenced by the engine's current manifest to
/// `config.target`, then the serialized manifest itself, then advances the
/// remote low-water mark to `version`.
///
/// Data-file and manifest writes use `put_if_absent`, so re-running for the
/// same version is idempotent; only the low-water write is an unconditional
/// put. Upload concurrency is capped by `config.mirror_parallelism`
/// (treated as at least 1).
///
/// NOTE(review): the manifest comes from `current_manifest()` and is assumed
/// to correspond to snapshot `version` — confirm callers pass the id they
/// observed alongside it.
async fn mirror_snapshot(
    engine: &MeruEngine,
    catalog_path: &std::path::Path,
    config: &MirrorConfig,
    version: i64,
) -> Result<(), MeruError> {
    let manifest: Manifest = engine.current_manifest().await;
    let limit = config.mirror_parallelism.max(1);
    let semaphore = Arc::new(tokio::sync::Semaphore::new(limit));
    let mut join = tokio::task::JoinSet::new();
    // Queue one upload per live data file, plus one for its deletion
    // vector when present.
    for entry in manifest.entries.iter().filter(|e| e.status != "deleted") {
        let paths = std::iter::once(entry.path.clone()).chain(entry.dv_path.clone());
        for rel in paths {
            spawn_upload(
                &mut join,
                Arc::clone(&semaphore),
                Arc::clone(&config.target),
                catalog_path.to_path_buf(),
                rel,
            );
        }
    }
    // Drain all upload tasks, surfacing the first failure. A panicked task
    // is converted into an ObjectStore error instead of propagating.
    while let Some(res) = join.join_next().await {
        match res {
            Ok(Ok(())) => {}
            Ok(Err(e)) => return Err(e),
            Err(join_err) => {
                return Err(MeruError::ObjectStore(format!(
                    "mirror upload task panicked: {join_err}"
                )));
            }
        }
    }
    // The manifest may already exist if a previous attempt got this far;
    // that is not an error.
    let pb_bytes = manifest.to_protobuf()?;
    if let Err(e) = config
        .target
        .put_if_absent(&manifest_path(version), Bytes::from(pb_bytes))
        .await
    {
        if !matches!(e, MeruError::AlreadyExists(_)) {
            return Err(e);
        }
    }
    config
        .target
        .put(LOW_WATER_PATH, Bytes::from(version.to_string()))
        .await?;
    Ok(())
}
/// Enqueues one upload task onto `join`: reads `catalog_path/rel_path` from
/// local disk and writes it to `target` under the same relative key.
///
/// A permit from `semaphore` bounds how many uploads run concurrently. An
/// already-present destination object (`AlreadyExists`) is treated as
/// success, which makes retries idempotent.
fn spawn_upload(
    join: &mut tokio::task::JoinSet<Result<(), MeruError>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    target: Arc<dyn MeruStore>,
    catalog_path: PathBuf,
    rel_path: String,
) {
    join.spawn(async move {
        // Held for the whole upload; released when the task ends.
        let _permit = semaphore
            .acquire_owned()
            .await
            .expect("semaphore never closed");
        let source = catalog_path.join(&rel_path);
        let body = tokio::fs::read(&source).await.map_err(MeruError::Io)?;
        let outcome = target.put_if_absent(&rel_path, Bytes::from(body)).await;
        match outcome {
            Ok(()) | Err(MeruError::AlreadyExists(_)) => Ok(()),
            Err(e) => Err(e),
        }
    });
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::config::EngineConfig;
use crate::store::local::LocalFileStore;
use crate::types::schema::{ColumnDef, ColumnType, TableSchema};
// Minimal table schema: a single non-nullable Int64 column "id" that is
// also the primary key.
fn schema() -> TableSchema {
TableSchema {
table_name: "mirror-worker-test".into(),
columns: vec![ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
}],
primary_key: vec![0],
..Default::default()
}
}
// Engine config rooted entirely inside `tmp` (catalog, object-store prefix
// and WAL all live under the temp dir), so each test stays hermetic.
fn engine_config(tmp: &tempfile::TempDir) -> EngineConfig {
EngineConfig {
schema: schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
..Default::default()
}
}
// A freshly spawned worker reports mirror_seq 0 and shuts down within a
// 5s timeout, so a hung worker fails the test instead of wedging the
// suite.
#[tokio::test]
async fn spawn_and_shutdown_cleanly() {
let tmp = tempfile::tempdir().unwrap();
let mirror_dir = tempfile::tempdir().unwrap();
let engine = MeruEngine::open(engine_config(&tmp)).await.unwrap();
let store = Arc::new(LocalFileStore::new(mirror_dir.path()).unwrap());
let cfg = MirrorConfig::new(store);
let mut worker = MirrorWorker::spawn(engine, cfg);
assert_eq!(worker.mirror_seq(), 0);
tokio::time::timeout(Duration::from_secs(5), worker.shutdown())
.await
.expect("mirror worker shutdown hung past 5s");
}
// Calling shutdown twice must not panic or hang: the second call finds
// the join handle already taken and returns immediately.
#[tokio::test]
async fn double_shutdown_is_noop() {
let tmp = tempfile::tempdir().unwrap();
let mirror_dir = tempfile::tempdir().unwrap();
let engine = MeruEngine::open(engine_config(&tmp)).await.unwrap();
let store = Arc::new(LocalFileStore::new(mirror_dir.path()).unwrap());
let mut worker = MirrorWorker::spawn(engine, MirrorConfig::new(store));
worker.shutdown().await;
worker.shutdown().await; }
// mirror_lag_secs is None before any upload (last_upload_unix_secs
// sentinel 0), and Some(near-zero) once a fresh upload timestamp is
// stored directly into the shared atomic.
#[tokio::test]
async fn mirror_lag_transitions_from_none_to_some() {
let tmp = tempfile::tempdir().unwrap();
let mirror_dir = tempfile::tempdir().unwrap();
let engine = MeruEngine::open(engine_config(&tmp)).await.unwrap();
let store = Arc::new(LocalFileStore::new(mirror_dir.path()).unwrap());
let mut worker = MirrorWorker::spawn(engine, MirrorConfig::new(store));
worker.shutdown().await;
assert_eq!(worker.mirror_lag_secs(), None);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
worker.last_upload_unix_secs.store(now, Ordering::Relaxed);
let lag = worker.mirror_lag_secs().expect("lag is Some after upload");
assert!(lag < 10, "lag should be near-zero on fresh upload: {lag}");
}
// End-to-end check of mirror_snapshot: commits one snapshot containing
// two small files, mirrors it, and verifies the destination store holds
// the file bodies, the versioned manifest, and the low-water mark.
// A second mirror of the same version must also succeed (put_if_absent
// makes the operation idempotent).
#[tokio::test]
async fn mirror_snapshot_uploads_files_manifest_and_low_water() {
use crate::iceberg::{
snapshot::{IcebergDataFile, SnapshotTransaction},
IcebergCatalog,
};
use crate::types::level::{Level, ParquetFileMeta};
let tmp = tempfile::tempdir().unwrap();
let mirror_dir = tempfile::tempdir().unwrap();
let schema = std::sync::Arc::new(schema());
let catalog = IcebergCatalog::open(tmp.path(), schema.as_ref().clone())
.await
.unwrap();
tokio::fs::create_dir_all(tmp.path().join("data/L0"))
.await
.unwrap();
let mut txn = SnapshotTransaction::new();
for i in 0..2 {
let path = format!("data/L0/f{i}.parquet");
tokio::fs::write(tmp.path().join(&path), format!("pq-body-{i}"))
.await
.unwrap();
txn.add_file(IcebergDataFile {
path,
file_size: 9,
num_rows: 100,
meta: ParquetFileMeta {
level: Level(0),
seq_min: 1,
seq_max: 10,
key_min: vec![0x01],
key_max: vec![0xFF],
num_rows: 100,
file_size: 9,
dv_path: None,
dv_offset: None,
dv_length: None,
format: None,
column_stats: None,
},
});
}
catalog.commit(&txn, schema.clone()).await.unwrap();
let engine = MeruEngine::open(engine_config(&tmp)).await.unwrap();
assert_eq!(engine.current_snapshot_id(), 1);
let store = Arc::new(LocalFileStore::new(mirror_dir.path()).unwrap());
let cfg = MirrorConfig::new(store.clone());
let catalog_path = PathBuf::from(engine.catalog_path());
super::mirror_snapshot(&engine, &catalog_path, &cfg, 1)
.await
.unwrap();
for i in 0..2 {
let path = format!("data/L0/f{i}.parquet");
let got = store.get(&path).await.unwrap();
assert_eq!(got.as_ref(), format!("pq-body-{i}").as_bytes());
}
assert!(store.exists("metadata/v1.manifest.bin").await.unwrap());
let lw = store.get("metadata/low_water.txt").await.unwrap();
assert_eq!(lw.as_ref(), b"1");
// Re-mirror the same version: must be a no-op success, not an error.
super::mirror_snapshot(&engine, &catalog_path, &cfg, 1)
.await
.unwrap();
}
}