use std::{
collections::{HashMap, hash_map::Entry},
fmt,
fs::File,
hash::{DefaultHasher, Hash, Hasher},
io::{BufRead, BufReader, BufWriter, Write},
path::{Path, PathBuf},
};
use eyre::{Context, Result};
use serde::{
Deserialize,
de::{DeserializeSeed, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor},
};
use super::model::CompareTransaction;
use crate::output::{
NdjsonScan, SimulationMetadata, SimulationSummary, SolChange, TokenChange,
missing_metadata_envelope, scan_ndjson_events,
};
pub(super) const TARGET_PARTITION_BYTES: u64 = 256 * 1024 * 1024;
const MAX_PARTITIONS: usize = 128;
const SPILL_WRITE_BUFFER_BYTES: usize = 64 * 1024;
pub(super) struct LoadedSide {
pub(super) partitions: Vec<PathBuf>,
pub(super) total_transactions: usize,
}
struct RecordSink {
paths: Vec<PathBuf>,
writers: Vec<BufWriter<File>>,
}
impl RecordSink {
fn new(partitions: usize, spill: &Path, label: &str) -> Result<Self> {
let mut paths = Vec::with_capacity(partitions);
let mut writers = Vec::with_capacity(partitions);
for i in 0..partitions {
let path = spill.join(format!("{label}-{i}.ndjson"));
let file = File::create(&path)
.with_context(|| format!("failed to create spill file {}", path.display()))?;
paths.push(path);
writers.push(BufWriter::with_capacity(SPILL_WRITE_BUFFER_BYTES, file));
}
Ok(Self { paths, writers })
}
fn push(&mut self, tx: &CompareTransaction) -> Result<()> {
let mut hasher = DefaultHasher::new();
tx.signature.hash(&mut hasher);
let index = (hasher.finish() % self.writers.len() as u64) as usize;
let writer = &mut self.writers[index];
serde_json::to_writer(&mut *writer, tx)?;
writer.write_all(b"\n")?;
Ok(())
}
fn finish(self) -> Result<Vec<PathBuf>> {
for mut writer in self.writers {
writer.flush()?;
}
Ok(self.paths)
}
}
pub(super) fn partition_count(baseline: &Path, experiment: &Path) -> Result<usize> {
let len = |p: &Path| -> Result<u64> {
Ok(std::fs::metadata(p)
.with_context(|| format!("failed to stat {}", p.display()))?
.len())
};
let max_len = len(baseline)?.max(len(experiment)?);
Ok(usize::try_from(max_len.div_ceil(TARGET_PARTITION_BYTES))
.unwrap_or(MAX_PARTITIONS)
.clamp(1, MAX_PARTITIONS))
}
#[derive(Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "snake_case")]
enum SlimEvent {
Metadata(#[allow(dead_code)] SimulationMetadata),
SessionStarted(IgnoredAny),
Tx(SlimTransaction),
Diff(IgnoredAny),
Summary(SimulationSummary),
}
#[derive(Deserialize)]
struct SlimTransaction {
slot: u64,
signature: String,
success: bool,
error: Option<String>,
logs: LogCount,
sol_changes: Vec<SolChange>,
token_changes: Vec<TokenChange>,
}
impl From<SlimTransaction> for CompareTransaction {
fn from(tx: SlimTransaction) -> Self {
Self {
slot: tx.slot,
signature: tx.signature,
success: tx.success,
error: tx.error,
log_count: tx.logs.0,
sol_changes: tx.sol_changes,
token_changes: tx.token_changes,
}
}
}
struct LogCount(usize);
impl<'de> Deserialize<'de> for LogCount {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct CountVisitor;
impl<'de> Visitor<'de> for CountVisitor {
type Value = LogCount;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("an array of log lines")
}
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<LogCount, A::Error> {
let mut count = 0;
while seq.next_element::<IgnoredAny>()?.is_some() {
count += 1;
}
Ok(LogCount(count))
}
}
deserializer.deserialize_seq(CountVisitor)
}
}
pub(super) fn load_side(
path: &Path,
partitions: usize,
spill: &Path,
label: &str,
) -> Result<LoadedSide> {
let mut sink = RecordSink::new(partitions, spill, label)?;
let file = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;
let mut tx_count = 0_usize;
let mut seen_metadata = false;
let mut summary_total: Option<usize> = None;
let scan = scan_ndjson_events(BufReader::new(file), path, &mut |event| {
match event {
SlimEvent::Metadata(_) => seen_metadata = true,
SlimEvent::Tx(tx) => {
tx_count += 1;
sink.push(&CompareTransaction::from(tx))?;
}
SlimEvent::Summary(summary) => summary_total = Some(summary.total_transactions),
SlimEvent::SessionStarted(_) | SlimEvent::Diff(_) => {}
}
Ok(())
})?;
if let NdjsonScan::NotAnEnvelopeStream { .. } = scan {
return load_legacy_side(path, sink);
}
if !seen_metadata {
return Err(missing_metadata_envelope(path));
}
Ok(LoadedSide {
partitions: sink.finish()?,
total_transactions: summary_total.unwrap_or(tx_count),
})
}
fn load_legacy_side(path: &Path, mut sink: RecordSink) -> Result<LoadedSide> {
let file = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;
let mut deserializer = serde_json::Deserializer::from_reader(BufReader::new(file));
let total_transactions = deserializer
.deserialize_map(LegacyBlobVisitor { sink: &mut sink })
.with_context(|| {
format!(
"{} is neither an NDJSON event stream nor a single-blob `SimulationOutput` JSON file",
path.display()
)
})?;
Ok(LoadedSide {
partitions: sink.finish()?,
total_transactions,
})
}
struct LegacyBlobVisitor<'a> {
sink: &'a mut RecordSink,
}
impl<'de> Visitor<'de> for LegacyBlobVisitor<'_> {
type Value = usize;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("a `SimulationOutput` object")
}
fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<usize, A::Error> {
let mut seen_metadata = false;
let mut seen_transactions = false;
let mut total: Option<usize> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"metadata" => {
if seen_metadata {
return Err(serde::de::Error::duplicate_field("metadata"));
}
seen_metadata = true;
map.next_value::<SimulationMetadata>()?;
}
"transactions" => {
if seen_transactions {
return Err(serde::de::Error::duplicate_field("transactions"));
}
seen_transactions = true;
map.next_value_seed(SpillTransactions {
sink: &mut *self.sink,
})?;
}
"summary" => {
if total.is_some() {
return Err(serde::de::Error::duplicate_field("summary"));
}
total = Some(map.next_value::<SimulationSummary>()?.total_transactions);
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
if !seen_metadata {
return Err(serde::de::Error::missing_field("metadata"));
}
if !seen_transactions {
return Err(serde::de::Error::missing_field("transactions"));
}
total.ok_or_else(|| serde::de::Error::missing_field("summary"))
}
}
struct SpillTransactions<'a> {
sink: &'a mut RecordSink,
}
impl<'de> DeserializeSeed<'de> for SpillTransactions<'_> {
type Value = ();
fn deserialize<D: Deserializer<'de>>(self, deserializer: D) -> Result<(), D::Error> {
deserializer.deserialize_seq(self)
}
}
impl<'de> Visitor<'de> for SpillTransactions<'_> {
type Value = ();
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("an array of transactions")
}
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<(), A::Error> {
while let Some(tx) = seq.next_element::<SlimTransaction>()? {
self.sink
.push(&CompareTransaction::from(tx))
.map_err(serde::de::Error::custom)?;
}
Ok(())
}
}
pub(super) fn read_partition(path: &Path) -> Result<HashMap<String, CompareTransaction>> {
let file = File::open(path).with_context(|| format!("failed to read {}", path.display()))?;
let mut reader = BufReader::new(file);
let mut line = String::new();
let mut records: HashMap<String, CompareTransaction> = HashMap::new();
loop {
line.clear();
if reader.read_line(&mut line)? == 0 {
return Ok(records);
}
let tx: CompareTransaction = serde_json::from_str(&line)?;
match records.entry(tx.signature.clone()) {
Entry::Vacant(vacant) => {
vacant.insert(tx);
}
Entry::Occupied(mut occupied) => {
if tx.slot >= occupied.get().slot {
occupied.insert(tx);
}
}
}
}
}