use std::{
    cmp::Ordering,
    collections::HashMap,
    fmt::{self, Write},
    num::NonZeroU64,
    path::{Path, PathBuf},
    str::FromStr,
};

use dbn::{Compression, Encoding, SType, Schema};
use futures::StreamExt;
use hex::ToHex;
use reqwest::RequestBuilder;
use serde::{de, Deserialize, Deserializer};
use sha2::{Digest, Sha256};
use time::OffsetDateTime;
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use typed_builder::TypedBuilder;

use crate::{
    deserialize::{deserialize_date_time, deserialize_opt_date_time},
    historical::{check_http_error, AddToForm, Limit, ReqwestForm},
    Error, Symbols,
};

use super::{handle_response, DateTimeRange};
/// Client for the `batch.*` API endpoints (see `PATH_PREFIX` in its impl).
///
/// Holds a mutable borrow of the parent historical client, through which all
/// requests are sent.
#[derive(Debug)]
pub struct BatchClient<'a> {
pub(crate) inner: &'a mut super::Client,
}
/// Wrapper that adds the optional `split_size` form field.
///
/// When the inner value is `None` nothing is appended, so the API default applies.
struct SplitSize(Option<NonZeroU64>);

impl AddToForm<SplitSize> for ReqwestForm {
    fn add_to_form(mut self, split_size: &SplitSize) -> Self {
        // `Option` iterates over zero or one element, so this appends the pair only when set.
        self.extend(split_size.0.map(|size| ("split_size", size.to_string())));
        self
    }
}
/// Appends the `split_duration` form field using the duration's string form
/// (e.g. `"day"`).
impl AddToForm<SplitDuration> for ReqwestForm {
    fn add_to_form(mut self, split_duration: &SplitDuration) -> Self {
        let value = split_duration.as_str().to_owned();
        self.push(("split_duration", value));
        self
    }
}
impl BatchClient<'_> {
#[instrument(name = "batch.submit_job")]
pub async fn submit_job(&mut self, params: &SubmitJobParams) -> crate::Result<BatchJob> {
let form = vec![
("dataset", params.dataset.to_string()),
("schema", params.schema.to_string()),
("encoding", params.encoding.to_string()),
("compression", params.compression.to_string()),
("pretty_px", params.pretty_px.to_string()),
("pretty_ts", params.pretty_ts.to_string()),
("map_symbols", params.map_symbols.to_string()),
("split_symbols", params.split_symbols.to_string()),
("delivery", params.delivery.to_string()),
("stype_in", params.stype_in.to_string()),
("stype_out", params.stype_out.to_string()),
("symbols", params.symbols.to_api_string()),
]
.add_to_form(¶ms.date_time_range)
.add_to_form(&Limit(params.limit))
.add_to_form(&SplitSize(params.split_size))
.add_to_form(¶ms.split_duration);
let builder = self.post("submit_job")?.form(&form);
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "batch.list_jobs")]
pub async fn list_jobs(&mut self, params: &ListJobsParams) -> crate::Result<Vec<BatchJob>> {
let mut builder = self.get("list_jobs")?;
if let Some(ref states) = params.states {
let states_str = states.iter().fold(String::new(), |mut acc, s| {
if acc.is_empty() {
s.as_str().to_owned()
} else {
write!(acc, ",{}", s.as_str()).unwrap();
acc
}
});
builder = builder.query(&[("states", states_str)]);
}
if let Some(ref since) = params.since {
builder = builder.query(&[("since", &since.unix_timestamp_nanos().to_string())]);
}
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "batch.list_files")]
pub async fn list_files(&mut self, job_id: &str) -> crate::Result<Vec<BatchFileDesc>> {
let resp = self
.get("list_files")?
.query(&[("job_id", job_id)])
.send()
.await?;
handle_response(resp).await
}
#[instrument(name = "batch.download")]
pub async fn download(&mut self, params: &DownloadParams) -> crate::Result<Vec<PathBuf>> {
let job_dir = params.output_dir.join(¶ms.job_id);
if job_dir.exists() {
if !job_dir.is_dir() {
return Err(Error::bad_arg(
"output_dir",
"exists but is not a directory",
));
}
} else {
tokio::fs::create_dir_all(&job_dir).await?;
}
let job_files = self.list_files(¶ms.job_id).await?;
if let Some(filename_to_download) = params.filename_to_download.as_ref() {
let Some(file_desc) = job_files
.iter()
.find(|file| file.filename == *filename_to_download)
else {
return Err(Error::bad_arg(
"filename_to_download",
"not found for batch job",
));
};
let output_path = job_dir.join(filename_to_download);
let https_url = file_desc
.urls
.get("https")
.ok_or_else(|| Error::internal("Missing https URL for batch file"))?;
self.download_file(https_url, &output_path, &file_desc.hash, file_desc.size)
.await?;
Ok(vec![output_path])
} else {
let mut paths = Vec::with_capacity(job_files.len());
for file_desc in job_files.iter() {
let output_path = params
.output_dir
.join(¶ms.job_id)
.join(&file_desc.filename);
let https_url = file_desc
.urls
.get("https")
.ok_or_else(|| Error::internal("Missing https URL for batch file"))?;
self.download_file(https_url, &output_path, &file_desc.hash, file_desc.size)
.await?;
paths.push(output_path);
}
Ok(paths)
}
}
#[instrument(name = "batch.download_file")]
async fn download_file(
&mut self,
url: &str,
path: &Path,
hash: &str,
exp_size: u64,
) -> crate::Result<()> {
const MAX_RETRIES: usize = 5;
let url = reqwest::Url::parse(url)
.map_err(|e| Error::internal(format!("Unable to parse URL: {e:?}")))?;
let Some((hash_algo, exp_hash_hex)) = hash.split_once(':') else {
return Err(Error::internal("Unexpected hash string format {hash:?}"));
};
let mut hasher = if hash_algo == "sha256" {
Some(Sha256::new())
} else {
warn!(
hash_algo,
"Skipping checksum with unsupported hash algorithm"
);
None
};
let span = info_span!("BatchDownload", %url, path=%path.display());
async move {
let mut retries = 0;
'retry: loop {
let mut req = self.inner.get_with_path(url.path())?;
match Self::check_if_exists(path, exp_size, &mut hasher).await? {
Header::Skip => {
return Ok(());
}
Header::Range(Some((key, val))) => {
req = req.header(key, val);
}
Header::Range(None) => {}
}
let resp = req.send().await?;
let mut stream = check_http_error(resp).await?.bytes_stream();
info!("Downloading file");
let mut output = BufWriter::new(
tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(path)
.await?,
);
while let Some(chunk) = stream.next().await {
let chunk = match chunk {
Ok(chunk) => chunk,
Err(err) if retries < MAX_RETRIES => {
retries += 1;
error!(?err, retries, "Retrying download");
continue 'retry;
}
Err(err) => {
return Err(crate::Error::from(err));
}
};
if retries > 0 {
retries = 0;
info!("Resumed download");
}
if let Some(hasher) = hasher.as_mut() {
hasher.update(&chunk)
}
tokio::io::copy(&mut chunk.as_ref(), &mut output).await?;
}
debug!("Completed download");
Self::verify_hash(hasher, exp_hash_hex).await;
return Ok(());
}
}
.instrument(span)
.await
}
async fn check_if_exists(
path: &Path,
exp_size: u64,
hasher: &mut Option<Sha256>,
) -> crate::Result<Header> {
let Ok(metadata) = tokio::fs::metadata(path).await else {
return Ok(Header::Range(None));
};
let actual_size = metadata.len();
match actual_size.cmp(&exp_size) {
Ordering::Less => {
debug!(
prev_downloaded_bytes = actual_size,
total_bytes = exp_size,
"Found existing file, resuming download"
);
if let Some(hasher) = hasher {
let mut buf = vec![0; 1 << 23];
let mut file = File::open(path).await?;
loop {
let read_size = file.read(&mut buf).await?;
if read_size == 0 {
break;
}
hasher.update(&buf[..read_size]);
}
}
}
Ordering::Equal => {
debug!("Skipping download as file already exists and matches expected size");
return Ok(Header::Skip);
}
Ordering::Greater => {
return Err(crate::Error::Io(std::io::Error::other(format!(
"Batch file {} already exists with size {actual_size} which is larger than expected size {exp_size}",
path.file_name().unwrap().display(),
))));
}
}
Ok(Header::Range(Some((
"Range",
format!("bytes={}-", metadata.len()),
))))
}
async fn verify_hash(hasher: Option<Sha256>, exp_hash_hex: &str) {
let Some(hasher) = hasher else {
return;
};
let hash_hex = hasher.finalize().encode_hex::<String>();
if hash_hex != exp_hash_hex {
warn!(
hash_hex,
exp_hash_hex, "Downloaded file failed checksum verification"
);
} else {
debug!("Successfully verified checksum");
}
}
const PATH_PREFIX: &'static str = "batch";
fn get(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
self.inner.get(&format!("{}.{slug}", Self::PATH_PREFIX))
}
fn post(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
self.inner.post(&format!("{}.{slug}", Self::PATH_PREFIX))
}
}
/// The `split_duration` option of a batch job.
///
/// A `null` value from the API deserializes to [`SplitDuration::None`].
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum SplitDuration {
    /// `"day"`; the default.
    #[default]
    Day,
    /// `"week"`.
    Week,
    /// `"month"`.
    Month,
    /// `"none"`.
    None,
}
/// The `delivery` option of a batch job. Currently only `"download"` is supported.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum Delivery {
    /// `"download"`; the default.
    #[default]
    Download,
}
/// The `state` of a batch job as reported by the API.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum JobState {
    /// `"queued"`.
    Queued,
    /// `"processing"`.
    Processing,
    /// `"done"`.
    Done,
    /// `"expired"`.
    Expired,
}
/// Parameters for [`BatchClient::submit_job`].
///
/// Construct with [`SubmitJobParams::builder`]. `dataset`, `symbols`, `schema` and
/// `date_time_range` are required; every other field has a builder default. Each
/// field is sent as the form field of the same name (`date_time_range`, `limit`,
/// `split_size` and `split_duration` go through `AddToForm`).
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct SubmitJobParams {
/// Dataset code; the setter accepts anything implementing `ToString`.
#[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
pub dataset: String,
/// Symbols to request, sent via `Symbols::to_api_string`.
#[builder(setter(into))]
pub symbols: Symbols,
/// Data schema.
pub schema: Schema,
/// Requested time range.
#[builder(setter(into))]
pub date_time_range: DateTimeRange,
/// Output encoding. Defaults to [`Encoding::Dbn`].
#[builder(default = Encoding::Dbn)]
pub encoding: Encoding,
/// Output compression. Defaults to [`Compression::Zstd`].
#[builder(default = Compression::Zstd)]
pub compression: Compression,
/// Defaults to `false`.
#[builder(default)]
pub pretty_px: bool,
/// Defaults to `false`.
#[builder(default)]
pub pretty_ts: bool,
/// Defaults to `true` for any encoding other than [`Encoding::Dbn`], `false` for DBN.
// The default expression reads `encoding`; typed-builder only exposes fields declared
// earlier to `default_code`, so keep this field after `encoding`.
#[builder(default_code = "*encoding != Encoding::Dbn")]
pub map_symbols: bool,
/// Defaults to `false`.
#[builder(default)]
pub split_symbols: bool,
/// Defaults to [`SplitDuration::Day`].
#[builder(default)]
pub split_duration: SplitDuration,
/// Optional split size. The `split_size` form field is only sent when this is set.
/// The setter takes the bare `NonZeroU64` (`strip_option`).
#[builder(default, setter(strip_option))]
pub split_size: Option<NonZeroU64>,
/// Defaults to [`Delivery::Download`].
#[builder(default)]
pub delivery: Delivery,
/// Input symbology type. Defaults to [`SType::RawSymbol`].
#[builder(default = SType::RawSymbol)]
pub stype_in: SType,
/// Output symbology type. Defaults to [`SType::InstrumentId`].
#[builder(default = SType::InstrumentId)]
pub stype_out: SType,
/// Optional record limit. Unlike `split_size`, the setter takes an `Option`.
#[builder(default)]
pub limit: Option<NonZeroU64>,
}
/// Description of a batch job, as returned by [`BatchClient::submit_job`] and
/// [`BatchClient::list_jobs`].
///
/// Absent `Option` fields deserialize as `None`. That includes the optional
/// timestamps, which use a custom deserializer and therefore need an explicit
/// `#[serde(default)]`.
#[derive(Debug, Clone, Deserialize)]
pub struct BatchJob {
    pub id: String,
    pub user_id: Option<String>,
    pub cost_usd: Option<f64>,
    pub dataset: String,
    pub symbols: Symbols,
    pub stype_in: SType,
    pub stype_out: SType,
    pub schema: Schema,
    #[serde(deserialize_with = "deserialize_date_time")]
    pub start: OffsetDateTime,
    #[serde(deserialize_with = "deserialize_date_time")]
    pub end: OffsetDateTime,
    pub limit: Option<NonZeroU64>,
    pub encoding: Encoding,
    /// A `null` compression deserializes as [`Compression::None`].
    #[serde(deserialize_with = "deserialize_compression")]
    pub compression: Compression,
    pub pretty_px: bool,
    pub pretty_ts: bool,
    pub map_symbols: bool,
    pub split_symbols: bool,
    /// A `null` split duration deserializes as [`SplitDuration::None`].
    pub split_duration: SplitDuration,
    pub split_size: Option<NonZeroU64>,
    pub delivery: Delivery,
    pub record_count: Option<u64>,
    pub billed_size: Option<u64>,
    pub actual_size: Option<u64>,
    pub package_size: Option<u64>,
    pub state: JobState,
    #[serde(deserialize_with = "deserialize_date_time")]
    pub ts_received: OffsetDateTime,
    // `deserialize_with` disables serde's implicit "missing Option => None" handling,
    // so `default` is required for a missing key to parse instead of erroring.
    #[serde(default, deserialize_with = "deserialize_opt_date_time")]
    pub ts_queued: Option<OffsetDateTime>,
    #[serde(default, deserialize_with = "deserialize_opt_date_time")]
    pub ts_process_start: Option<OffsetDateTime>,
    #[serde(default, deserialize_with = "deserialize_opt_date_time")]
    pub ts_process_done: Option<OffsetDateTime>,
    #[serde(default, deserialize_with = "deserialize_opt_date_time")]
    pub ts_expiration: Option<OffsetDateTime>,
    #[serde(default)]
    pub progress: Option<u8>,
}
/// Optional filters for [`BatchClient::list_jobs`]; the default sends no filters.
#[derive(Debug, Clone, Default, TypedBuilder, PartialEq, Eq)]
pub struct ListJobsParams {
/// Sent as a comma-separated `states` query parameter when set
/// (presumably restricts results to jobs in these states; confirm against API docs).
#[builder(default, setter(strip_option))]
pub states: Option<Vec<JobState>>,
/// Sent as the `since` query parameter, in UNIX nanoseconds, when set.
#[builder(default, setter(strip_option))]
pub since: Option<OffsetDateTime>,
}
/// A file belonging to a batch job, as returned by [`BatchClient::list_files`].
#[derive(Debug, Clone, Deserialize)]
pub struct BatchFileDesc {
/// File name; downloads are written to `<output_dir>/<job_id>/<filename>`.
pub filename: String,
/// Expected size in bytes, compared with the on-disk size to resume or skip downloads.
pub size: u64,
/// Checksum in `<algorithm>:<hex digest>` form; only `sha256` is verified on download.
pub hash: String,
/// Download URLs; the entry under the `"https"` key is used for downloading
/// (keys are presumably protocol names).
pub urls: HashMap<String, String>,
}
/// Parameters for [`BatchClient::download`].
#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
pub struct DownloadParams {
/// Base directory. Files are written to `<output_dir>/<job_id>/`, which is created if
/// it does not exist.
#[builder(setter(transform = |dt: impl Into<PathBuf>| dt.into()))]
pub output_dir: PathBuf,
/// ID of the batch job whose files are downloaded.
#[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
pub job_id: String,
/// If set, only this file is downloaded; otherwise all files of the job are.
#[builder(default, setter(transform = |filename: impl ToString| Some(filename.to_string())))]
pub filename_to_download: Option<String>,
}
impl SplitDuration {
pub const fn as_str(&self) -> &'static str {
match self {
SplitDuration::Day => "day",
SplitDuration::Week => "week",
SplitDuration::Month => "month",
SplitDuration::None => "none",
}
}
}
impl fmt::Display for SplitDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl FromStr for SplitDuration {
type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"day" => Ok(SplitDuration::Day),
"week" => Ok(SplitDuration::Week),
"month" => Ok(SplitDuration::Month),
"none" => Ok(SplitDuration::None),
_ => Err(crate::Error::bad_arg(
"s",
format!(
"{s} does not correspond with any {} variant",
std::any::type_name::<Self>()
),
)),
}
}
}
impl<'de> Deserialize<'de> for SplitDuration {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let opt = Option::<String>::deserialize(deserializer)?;
match opt {
Some(str) => FromStr::from_str(&str).map_err(de::Error::custom),
None => Ok(SplitDuration::None),
}
}
}
impl Delivery {
pub const fn as_str(&self) -> &'static str {
match self {
Delivery::Download => "download",
}
}
}
impl fmt::Display for Delivery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl FromStr for Delivery {
type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"download" => Ok(Delivery::Download),
_ => Err(crate::Error::bad_arg(
"s",
format!(
"{s} does not correspond with any {} variant",
std::any::type_name::<Self>()
),
)),
}
}
}
impl<'de> Deserialize<'de> for Delivery {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(de::Error::custom)
}
}
impl JobState {
pub const fn as_str(&self) -> &'static str {
match self {
JobState::Queued => "queued",
JobState::Processing => "processing",
JobState::Done => "done",
JobState::Expired => "expired",
}
}
}
impl fmt::Display for JobState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl FromStr for JobState {
type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"queued" => Ok(JobState::Queued),
"processing" => Ok(JobState::Processing),
"done" => Ok(JobState::Done),
"expired" => Ok(JobState::Expired),
_ => Err(crate::Error::bad_arg(
"s",
format!(
"{s} does not correspond with any {} variant",
std::any::type_name::<Self>()
),
)),
}
}
}
impl<'de> Deserialize<'de> for JobState {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(de::Error::custom)
}
}
/// Deserializes an optional compression value, mapping `null` to [`Compression::None`].
fn deserialize_compression<'de, D: serde::Deserializer<'de>>(
    deserializer: D,
) -> Result<Compression, D::Error> {
    Option::<Compression>::deserialize(deserializer)
        .map(|compression| compression.unwrap_or(Compression::None))
}
/// How `BatchClient::check_if_exists` says a file download should proceed.
enum Header {
/// The file on disk already has the expected size; no request is made.
Skip,
/// Perform the request, adding this `(name, value)` header if present. It is
/// `Some` with a `Range` header when resuming a partially downloaded file.
Range(Option<(&'static str, String)>),
}
#[cfg(test)]
mod tests {
    use dbn::Dataset;
    use reqwest::StatusCode;
    use serde_json::json;
    use time::macros::datetime;
    use wiremock::{
        matchers::{basic_auth, method, path, query_param_is_missing},
        Mock, MockServer, ResponseTemplate,
    };

    use super::*;
    use crate::{
        body_contains,
        historical::test_infra::{client, API_KEY},
        historical::API_VERSION,
    };

    #[tokio::test]
    async fn test_submit_job() -> crate::Result<()> {
        const START: time::OffsetDateTime = datetime!(2023 - 06 - 14 00:00 UTC);
        const END: time::OffsetDateTime = datetime!(2023 - 06 - 17 00:00 UTC);
        const SCHEMA: Schema = Schema::Trades;
        let mock_server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/batch.submit_job")))
            .and(body_contains("dataset", "XNAS.ITCH"))
            .and(body_contains("schema", "trades"))
            .and(body_contains("symbols", "TSLA"))
            .and(body_contains(
                "start",
                START.unix_timestamp_nanos().to_string(),
            ))
            .and(body_contains("encoding", "dbn"))
            .and(body_contains("compression", "zstd"))
            .and(body_contains("map_symbols", "false"))
            .and(body_contains("end", END.unix_timestamp_nanos().to_string()))
            .and(body_contains("stype_in", "raw_symbol"))
            .and(body_contains("stype_out", "instrument_id"))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!({
                    "id": "123",
                    "user_id": "test_user",
                    "cost_usd": 10.50,
                    "dataset": "XNAS.ITCH",
                    "symbols": ["TSLA"],
                    "stype_in": "raw_symbol",
                    "stype_out": "instrument_id",
                    "schema": SCHEMA.as_str(),
                    "start": "2023-06-14T00:00:00.000000000Z",
                    "end": "2023-06-17 00:00:00.000000+00:00",
                    "limit": null,
                    "encoding": "dbn",
                    "compression": "zstd",
                    "pretty_px": false,
                    "pretty_ts": false,
                    "map_symbols": false,
                    "split_symbols": false,
                    "split_duration": "day",
                    "split_size": null,
                    "delivery": "download",
                    "state": "queued",
                    "ts_received": "2023-07-19T23:00:04.095538123Z",
                    "ts_queued": null,
                    "ts_process_start": null,
                    "ts_process_done": null,
                    "ts_expiration": null
                })),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let job_desc = target
            .batch()
            .submit_job(
                &SubmitJobParams::builder()
                    .dataset(dbn::Dataset::XnasItch)
                    .schema(SCHEMA)
                    .symbols("TSLA")
                    .date_time_range(START..END)
                    .build(),
            )
            .await?;
        assert_eq!(job_desc.dataset, dbn::Dataset::XnasItch.as_str());
        Ok(())
    }

    #[tokio::test]
    async fn test_submit_job_param_map_symbols() -> crate::Result<()> {
        const START: time::OffsetDateTime = datetime!(2023 - 06 - 14 00:00 UTC);
        const END: time::OffsetDateTime = datetime!(2023 - 06 - 17 00:00 UTC);
        let params = SubmitJobParams::builder()
            .dataset(Dataset::GlbxMdp3)
            .encoding(Encoding::Dbn)
            .symbols("ESM5")
            .schema(Schema::Mbo)
            .date_time_range(START..END)
            .build();
        assert_eq!(params.encoding, Encoding::Dbn);
        assert!(!params.map_symbols);
        let params = SubmitJobParams::builder()
            .dataset(Dataset::GlbxMdp3)
            .encoding(Encoding::Csv)
            .symbols("ESM5")
            .schema(Schema::Mbo)
            .date_time_range(START..END)
            .build();
        assert_eq!(params.encoding, Encoding::Csv);
        assert!(params.map_symbols);
        let params = SubmitJobParams::builder()
            .dataset(Dataset::GlbxMdp3)
            .encoding(Encoding::Json)
            .symbols("ESM5")
            .schema(Schema::Mbo)
            .date_time_range(START..END)
            .map_symbols(false)
            .build();
        assert_eq!(params.encoding, Encoding::Json);
        assert!(!params.map_symbols);
        Ok(())
    }

    #[tokio::test]
    async fn test_list_jobs() -> crate::Result<()> {
        const SCHEMA: Schema = Schema::Trades;
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/batch.list_jobs")))
            .and(query_param_is_missing("states"))
            .and(query_param_is_missing("since"))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!([{
                    "id": "123",
                    "user_id": "test_user",
                    "cost_usd": 10.50,
                    "dataset": "XNAS.ITCH",
                    "symbols": "TSLA",
                    "stype_in": "raw_symbol",
                    "stype_out": "instrument_id",
                    "schema": SCHEMA.as_str(),
                    "start": "2023-06-14 00:00:00+00:00",
                    "end": "2023-06-17T00:00:00.012345678Z",
                    "limit": null,
                    "encoding": "json",
                    "compression": "zstd",
                    "pretty_px": true,
                    "pretty_ts": false,
                    "map_symbols": true,
                    "split_symbols": false,
                    "split_duration": "day",
                    "split_size": null,
                    "delivery": "download",
                    "state": "processing",
                    "ts_received": "2023-07-19 23:00:04.095538+00:00",
                    "ts_queued": "2023-07-19T23:00:08.095538123Z",
                    "ts_process_start": "2023-07-19 23:01:04.000000+00:00",
                    "ts_process_done": null,
                    "ts_expiration": null
                },
                {
                    "id": "XNAS-20250602-5KM3HL5BUW",
                    "user_id": "AA89XSlBV",
                    "cost_usd": 0.0,
                    "dataset": "XNAS.ITCH",
                    "symbols": "MSFT",
                    "stype_in": "raw_symbol",
                    "stype_out": "instrument_id",
                    "schema": "trades",
                    "start": "2022-06-10T12:30:00.000000000Z",
                    "end": "2022-06-10T14:00:00.000000000Z",
                    "limit": 1000,
                    "encoding": "csv",
                    "compression": null,
                    "pretty_px": false,
                    "pretty_ts": false,
                    "map_symbols": true,
                    "split_symbols": false,
                    "split_duration": null,
                    "split_size": null,
                    "packaging": null,
                    "delivery": "download",
                    "record_count": 1000,
                    "billed_size": 48000,
                    "actual_size": 94000,
                    "package_size": 97690,
                    "state": "done",
                    "ts_received": "2025-06-02T15:51:19.251582000Z",
                    "ts_queued": "2025-06-02T15:51:20.997673000Z",
                    "ts_process_start": "2025-06-02T15:51:45.312317000Z",
                    "ts_process_done": "2025-06-02T15:51:46.324860000Z",
                    "ts_expiration": "2025-07-02T16:00:00.000000000Z",
                    "progress": 100
                }])),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let job_descs = target.batch().list_jobs(&ListJobsParams::default()).await?;
        assert_eq!(job_descs.len(), 2);
        let mut job_desc = &job_descs[0];
        assert_eq!(
            job_desc.ts_queued.unwrap(),
            datetime!(2023-07-19 23:00:08.095538123 UTC)
        );
        assert_eq!(
            job_desc.ts_process_start.unwrap(),
            datetime!(2023-07-19 23:01:04 UTC)
        );
        assert_eq!(job_desc.encoding, Encoding::Json);
        assert!(job_desc.pretty_px);
        assert!(!job_desc.pretty_ts);
        assert!(job_desc.map_symbols);
        assert_eq!(job_desc.split_duration, SplitDuration::Day);
        assert!(job_desc.progress.is_none());
        job_desc = &job_descs[1];
        assert_eq!(
            job_desc.ts_queued.unwrap(),
            datetime!(2025-06-02 15:51:20.997673000 UTC)
        );
        assert_eq!(
            job_desc.ts_process_start.unwrap(),
            datetime!(2025-06-02 15:51:45.312317000 UTC)
        );
        assert_eq!(job_desc.start, datetime!(2022-06-10 12:30:00.000000000 UTC));
        assert_eq!(job_desc.end, datetime!(2022-06-10 14:00:00.000000000 UTC));
        assert_eq!(job_desc.encoding, Encoding::Csv);
        assert!(!job_desc.pretty_px);
        assert!(!job_desc.pretty_ts);
        assert!(job_desc.map_symbols);
        assert!(!job_desc.split_symbols);
        assert_eq!(job_desc.split_duration, SplitDuration::None);
        assert_eq!(job_desc.progress, Some(100));
        Ok(())
    }

    #[test]
    fn test_deserialize_compression() {
        #[derive(serde::Deserialize)]
        struct Test {
            #[serde(deserialize_with = "deserialize_compression")]
            compression: Compression,
        }
        const JSON: &str =
            r#"[{"compression":null}, {"compression":"none"}, {"compression":"zstd"}]"#;
        let res: Vec<Test> = serde_json::from_str(JSON).unwrap();
        assert_eq!(
            res.into_iter().map(|t| t.compression).collect::<Vec<_>>(),
            vec![Compression::None, Compression::None, Compression::Zstd]
        );
    }

    /// `as_str` and `FromStr` must stay inverse to each other for every variant.
    #[test]
    fn test_enum_str_round_trip() {
        for split_duration in [
            SplitDuration::Day,
            SplitDuration::Week,
            SplitDuration::Month,
            SplitDuration::None,
        ] {
            assert_eq!(
                split_duration.as_str().parse::<SplitDuration>().unwrap(),
                split_duration
            );
            assert_eq!(split_duration.to_string(), split_duration.as_str());
        }
        for state in [
            JobState::Queued,
            JobState::Processing,
            JobState::Done,
            JobState::Expired,
        ] {
            assert_eq!(state.as_str().parse::<JobState>().unwrap(), state);
        }
        assert_eq!(
            Delivery::Download.as_str().parse::<Delivery>().unwrap(),
            Delivery::Download
        );
        assert!("bogus".parse::<JobState>().is_err());
        assert!("bogus".parse::<SplitDuration>().is_err());
        assert!("bogus".parse::<Delivery>().is_err());
    }
}