use std::sync::Arc;
use crate::iceberg::{IcebergDataFile, SnapshotTransaction};
use crate::memtable::iterator::MemEntry;
use crate::types::{
key::InternalKey,
level::{Level, ParquetFileMeta},
sequence::SeqNum,
value::Row,
MeruError, Result,
};
use tracing::{debug, info, instrument};
use crate::engine::engine::MeruEngine;
/// Flushes the oldest immutable memtable into an L0 Parquet file and commits
/// it to the catalog as a new snapshot version.
///
/// The statement order here is crash-safety critical:
///   1. write + fsync the data file (and best-effort fsync its directory),
///   2. commit the snapshot transaction under the commit lock,
///   3. install the new version,
///   4. advance the WAL flushed sequence,
///   5. drop the flushed memtable.
/// Marking the WAL or dropping the memtable before the commit is durable
/// would lose acknowledged writes on crash.
///
/// Returns `Ok(())` when there is nothing to flush; propagates codec, I/O,
/// and catalog-commit errors.
#[instrument(skip(engine), fields(op = "flush"))]
pub async fn run_flush(engine: &Arc<MeruEngine>) -> Result<()> {
    // Serialize flushes: only one memtable is flushed at a time.
    let _flush_guard = engine.flush_mutex.lock().await;
    let flush_started_at = std::time::Instant::now();

    // Nothing to do if no immutable memtable is waiting.
    let immutable = match engine.memtable.oldest_immutable() {
        Some(m) => m,
        None => return Ok(()),
    };
    let first_seq = immutable.first_seq;
    let last_seq = immutable.last_seq();
    // Read at the maximum representable sequence so every entry is visible
    // (sequence numbers occupy the upper 56 bits of the internal-key tag).
    let read_seq = SeqNum(u64::MAX >> 8);
    info!(
        first_seq = first_seq.0,
        last_seq = last_seq.0,
        "starting flush"
    );

    let entries: Vec<MemEntry> = immutable.iter(read_seq).collect();
    if entries.is_empty() {
        debug!("empty memtable, skipping flush");
        engine.memtable.drop_flushed(first_seq);
        return Ok(());
    }

    // The file's key range is bounded by the first and last keys in iteration
    // order; computing them here once avoids the per-entry key clone the loop
    // would otherwise pay just to track key_max.
    let key_min: Option<Vec<u8>> = entries.first().map(|e| e.user_key.to_vec());
    let key_max: Option<Vec<u8>> = entries.last().map(|e| e.user_key.to_vec());
    // `entries` is non-empty here, so min/max always exist; the unwrap_or(..)
    // defaults are unreachable but keep this panic-free.
    let seq_min = entries.iter().map(|e| e.seq.0).min().unwrap_or(u64::MAX);
    let seq_max = entries.iter().map(|e| e.seq.0).max().unwrap_or(0);

    let mut rows: Vec<(InternalKey, Row)> = Vec::with_capacity(entries.len());
    for entry in &entries {
        let uk = entry.user_key.to_vec();
        // Internal-key tag: inverted sequence in the high 56 bits (so newer
        // entries order first) with the op type in the low byte.
        let tag = (crate::types::sequence::SEQNUM_MAX.0 - entry.seq.0) << 8
            | (entry.entry.op_type as u64);
        let mut wire = Vec::with_capacity(uk.len() + 8);
        wire.extend_from_slice(&uk);
        wire.extend_from_slice(&tag.to_be_bytes());
        let ikey = InternalKey::decode(&wire, &engine.schema)?;
        let row = if !entry.entry.value.is_empty() {
            crate::engine::codec::decode_row(&entry.entry.value)?
        } else {
            // Value-less entry (e.g. a tombstone): synthesize a row carrying
            // only the primary-key columns so downstream code still sees the key.
            let pk_values = ikey.pk_values();
            let mut fields: Vec<Option<crate::types::value::FieldValue>> =
                vec![None; engine.schema.columns.len()];
            for (pk_idx, &col_idx) in engine.schema.primary_key.iter().enumerate() {
                if pk_idx < pk_values.len() {
                    fields[col_idx] = Some(pk_values[pk_idx].clone());
                }
            }
            Row::new(fields)
        };
        rows.push((ikey, row));
    }

    let num_rows = rows.len() as u64;
    let format = engine.config.file_format_for(Level(0));
    // Encode the rows into an in-memory L0 Parquet file.
    let (parquet_bytes, _bloom_bytes, writer_meta) = crate::parquet::writer::write_sorted_rows(
        rows,
        engine.schema.clone(),
        Level(0),
        format,
        engine.config.bloom_bits_per_key,
    )?;

    let file_id = uuid::Uuid::new_v4().to_string();
    // NOTE(review): the logical catalog path and the physical path are built
    // independently — confirm "data/L0/{id}.parquet" matches the layout
    // produced by catalog.data_file_path.
    let parquet_path = format!("data/L0/{file_id}.parquet");
    let full_path = engine.catalog.data_file_path(Level(0), &file_id);
    engine.catalog.ensure_level_dir(Level(0)).await?;
    if let Some(parent) = full_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .map_err(MeruError::Io)?;
    }

    // Write and durably sync the data file BEFORE committing metadata that
    // references it. An empty payload writes nothing.
    let file_size = if parquet_bytes.is_empty() {
        0u64
    } else {
        tokio::fs::write(&full_path, &parquet_bytes)
            .await
            .map_err(MeruError::Io)?;
        // tokio::fs::write closes the handle, so reopen to fsync contents.
        tokio::fs::File::open(&full_path)
            .await
            .map_err(MeruError::Io)?
            .sync_all()
            .await
            .map_err(MeruError::Io)?;
        // Best-effort fsync of the parent directory so the new directory
        // entry itself survives a crash.
        if let Some(parent) = full_path.parent() {
            if let Ok(dir) = tokio::fs::File::open(parent).await {
                let _ = dir.sync_all().await;
            }
        }
        parquet_bytes.len() as u64
    };

    let meta = ParquetFileMeta {
        level: Level(0),
        // Guard preserved from the original: an (unreachable in practice)
        // all-sentinel sequence set is recorded as 0.
        seq_min: if seq_min == u64::MAX { 0 } else { seq_min },
        seq_max,
        key_min: key_min.unwrap_or_default(),
        key_max: key_max.unwrap_or_default(),
        num_rows,
        file_size,
        dv_path: None,
        dv_offset: None,
        dv_length: None,
        format: Some(format),
        column_stats: writer_meta.column_stats,
    };

    let mut txn = SnapshotTransaction::new();
    txn.add_file(IcebergDataFile {
        path: parquet_path.clone(),
        file_size,
        num_rows,
        meta,
    });
    txn.set_prop("merutable.job", "flush");
    txn.set_prop("merutable.first_seq", first_seq.0.to_string());
    txn.set_prop("merutable.last_seq", last_seq.0.to_string());

    // Commit under the global commit lock so snapshot commits serialize.
    let new_version = {
        let _commit_guard = engine.commit_lock.lock().await;
        let commit_started = std::time::Instant::now();
        let v = engine.catalog.commit(&txn, engine.schema.clone()).await?;
        crate::engine::metrics::record(
            crate::engine::metrics::COMMIT_DURATION_SECONDS,
            commit_started.elapsed().as_secs_f64(),
        );
        v
    };
    engine.version_set.install(new_version);
    crate::engine::metrics::inc(crate::engine::metrics::FLUSHES_TOTAL);
    crate::engine::metrics::inc(crate::engine::metrics::SNAPSHOTS_COMMITTED_TOTAL);
    info!(
        path = %parquet_path,
        num_rows,
        "flush committed"
    );

    // Only after the commit is durable: allow the WAL to reclaim segments up
    // to last_seq, then drop the flushed memtable.
    engine.wal.lock().await.mark_flushed_seq(last_seq);
    engine.memtable.drop_flushed(first_seq);

    crate::engine::metrics::record(
        crate::engine::metrics::FLUSH_DURATION_SECONDS,
        flush_started_at.elapsed().as_secs_f64(),
    );
    crate::engine::metrics::record(crate::engine::metrics::FLUSH_OUTPUT_BYTES, file_size as f64);
    Ok(())
}