#![allow(deprecated)]
use std::{
fs::File,
io::{self, BufReader},
path::Path,
};
use crate::{Compression, HasRType, Metadata, RecordRef, VersionUpgradePolicy};
use super::{
dbn, dbz, private, zstd, DbnMetadata, DecodeRecord, DecodeRecordRef, DecodeStream,
StreamIterDecoder,
};
pub struct DynDecoder<'a, R>(DynDecoderImpl<'a, R>)
where
R: io::BufRead;
enum DynDecoderImpl<'a, R>
where
R: io::BufRead,
{
Dbn(dbn::Decoder<R>),
ZstdDbn(dbn::Decoder<::zstd::stream::Decoder<'a, R>>),
LegacyDbz(dbz::Decoder<R>),
}
impl<R> DynDecoder<'_, BufReader<R>>
where
R: io::Read,
{
pub fn new(
reader: R,
compression: Compression,
upgrade_policy: VersionUpgradePolicy,
) -> crate::Result<Self> {
Self::with_buffer(BufReader::new(reader), compression, upgrade_policy)
}
pub fn new_inferred(reader: R, upgrade_policy: VersionUpgradePolicy) -> crate::Result<Self> {
Self::inferred_with_buffer(BufReader::new(reader), upgrade_policy)
}
}
impl<R> DynDecoder<'_, R>
where
R: io::BufRead,
{
pub fn with_buffer(
reader: R,
compression: Compression,
upgrade_policy: VersionUpgradePolicy,
) -> crate::Result<Self> {
match compression {
Compression::None => Ok(Self(DynDecoderImpl::Dbn(
dbn::Decoder::with_upgrade_policy(reader, upgrade_policy)?,
))),
Compression::Zstd => Ok(Self(DynDecoderImpl::ZstdDbn(
dbn::Decoder::with_upgrade_policy(
::zstd::stream::Decoder::with_buffer(reader)
.map_err(|e| crate::Error::io(e, "creating zstd decoder"))?,
upgrade_policy,
)?,
))),
}
}
pub fn inferred_with_buffer(
mut reader: R,
upgrade_policy: VersionUpgradePolicy,
) -> crate::Result<Self> {
let first_bytes = reader
.fill_buf()
.map_err(|e| crate::Error::io(e, "creating buffer to infer encoding"))?;
if dbz::starts_with_prefix(first_bytes) {
Ok(Self(DynDecoderImpl::LegacyDbz(
dbz::Decoder::with_upgrade_policy(reader, upgrade_policy)?,
)))
} else if dbn::starts_with_prefix(first_bytes) {
Ok(Self(DynDecoderImpl::Dbn(
dbn::Decoder::with_upgrade_policy(reader, upgrade_policy)?,
)))
} else if zstd::starts_with_prefix(first_bytes) {
Ok(Self(DynDecoderImpl::ZstdDbn(
dbn::Decoder::with_upgrade_policy(
::zstd::stream::Decoder::with_buffer(reader)
.map_err(|e| crate::Error::io(e, "creating zstd decoder"))?,
upgrade_policy,
)?,
)))
} else {
Err(crate::Error::decode("unable to determine encoding"))
}
}
}
impl DynDecoder<'_, BufReader<File>> {
pub fn from_file(
path: impl AsRef<Path>,
upgrade_policy: VersionUpgradePolicy,
) -> crate::Result<Self> {
let file = File::open(path.as_ref()).map_err(|e| {
crate::Error::io(
e,
format!(
"opening file to decode at path '{}'",
path.as_ref().display()
),
)
})?;
DynDecoder::new_inferred(file, upgrade_policy)
}
}
impl<R> DecodeRecordRef for DynDecoder<'_, R>
where
R: io::BufRead,
{
fn decode_record_ref(&mut self) -> crate::Result<Option<RecordRef<'_>>> {
match &mut self.0 {
DynDecoderImpl::Dbn(decoder) => decoder.decode_record_ref(),
DynDecoderImpl::ZstdDbn(decoder) => decoder.decode_record_ref(),
DynDecoderImpl::LegacyDbz(decoder) => decoder.decode_record_ref(),
}
}
}
impl<R> DbnMetadata for DynDecoder<'_, R>
where
R: io::BufRead,
{
fn metadata(&self) -> &Metadata {
match &self.0 {
DynDecoderImpl::Dbn(decoder) => decoder.metadata(),
DynDecoderImpl::ZstdDbn(decoder) => decoder.metadata(),
DynDecoderImpl::LegacyDbz(decoder) => decoder.metadata(),
}
}
fn metadata_mut(&mut self) -> &mut Metadata {
match &mut self.0 {
DynDecoderImpl::Dbn(decoder) => decoder.metadata_mut(),
DynDecoderImpl::ZstdDbn(decoder) => decoder.metadata_mut(),
DynDecoderImpl::LegacyDbz(decoder) => decoder.metadata_mut(),
}
}
}
impl<R> DecodeRecord for DynDecoder<'_, R>
where
R: io::BufRead,
{
fn decode_record<T: HasRType>(&mut self) -> crate::Result<Option<&T>> {
match &mut self.0 {
DynDecoderImpl::Dbn(decoder) => decoder.decode_record(),
DynDecoderImpl::ZstdDbn(decoder) => decoder.decode_record(),
DynDecoderImpl::LegacyDbz(decoder) => decoder.decode_record(),
}
}
}
impl<R> DecodeStream for DynDecoder<'_, R>
where
R: io::BufRead,
{
fn decode_stream<T: HasRType>(self) -> StreamIterDecoder<Self, T>
where
Self: Sized,
{
StreamIterDecoder::new(self)
}
}
impl<R> private::LastRecord for DynDecoder<'_, R>
where
R: io::BufRead,
{
fn last_record(&self) -> Option<RecordRef<'_>> {
match &self.0 {
DynDecoderImpl::Dbn(decoder) => decoder.last_record(),
DynDecoderImpl::ZstdDbn(decoder) => decoder.last_record(),
DynDecoderImpl::LegacyDbz(decoder) => decoder.last_record(),
}
}
}
#[cfg(test)]
mod tests {
use std::io::Read;
use crate::{decode::tests::TEST_DATA_PATH, enums::VersionUpgradePolicy};
use super::*;
#[test]
fn test_detects_any_dbn_version_as_dbn() {
let mut buf = Vec::new();
let mut file = File::open(format!("{TEST_DATA_PATH}/test_data.mbo.v3.dbn")).unwrap();
file.read_to_end(&mut buf).unwrap();
buf[3] = crate::DBN_VERSION + 1;
let res = DynDecoder::new_inferred(io::Cursor::new(buf), VersionUpgradePolicy::default());
assert!(matches!(res, Err(e) if e
.to_string()
.contains("can't decode newer version of DBN")));
}
}