1use std::{
4 cell::RefMut,
5 collections::HashMap,
6 io::{self, Read, BufReader, Seek, Cursor, Write, Take},
7 fmt,
8 fs::File,
9 num::NonZeroU64,
10 path::PathBuf,
11 str::FromStr,
12};
13
14use digest::Digest;
15use error_stack::{Report, Result, ResultExt, report};
16use flate2::read::ZlibDecoder;
17use libxml::{readonly::RoNode, tree::NodeType};
18use ndarray::Array2;
19use parse_int::parse as parse_auto_radix;
20use remotefs::{RemoteError, RemoteErrorType};
21use sha1::Sha1;
22use sha2::{Sha256, Sha512};
23use sha3::{Sha3_256, Sha3_512};
24use strum::{EnumString, Display, EnumVariantNames};
25use url::Url;
26use crate::error::{
27 ParseValueError,
28 ParseNodeError,
29 ParseNodeErrorKind::{self, *},
30 ReadDataBlockError,
31};
32
33mod context;
34pub use context::*;
35
36mod sub_blocks;
37use sub_blocks::*;
38
/// Description of where an XISF data block's bytes live and how they are stored.
///
/// Built from the XML attributes of an element carrying a data block
/// (see [`DataBlock::parse_node`]).
#[derive(Debug, Clone, PartialEq)]
pub struct DataBlock {
    /// Where the raw bytes are stored: inline/embedded text, an attachment inside a
    /// monolithic file, or an external `url(...)`/`path(...)` resource.
    pub location: Location,
    /// Endianness of multi-byte values; little-endian when the byteOrder attribute is absent.
    pub byte_order: ByteOrder,
    /// Optional digest used to verify the block's bytes exactly as stored.
    pub checksum: Option<Checksum>,
    /// Optional compression settings; `None` means the block is stored uncompressed.
    pub compression: Option<Compression>,
}
54impl DataBlock {
    /// Parses the data-block attributes (`location`, `byteOrder`, `checksum`,
    /// `compression`, `subblocks`) of `node`, consuming them from `attrs`.
    ///
    /// Returns `Ok(None)` when no `location` attribute is present, i.e. the element
    /// carries no data block. `tag` is only used to label error reports.
    pub(crate) fn parse_node(node: RoNode, tag: &'static str, attrs: &mut HashMap<String, String>) -> Result<Option<Self>, ParseNodeError> {
        // shorthand constructors for error contexts/reports tagged with this element's name
        let context = |kind| -> ParseNodeError {
            ParseNodeError::new(tag, kind)
        };
        let report = |kind: ParseNodeErrorKind| -> Report<ParseNodeError> {
            report!(ParseNodeError::new(tag, kind))
        };

        if let Some(location) = Location::parse_node(node, tag, attrs)? {
            // byteOrder is optional; ByteOrder::default() is little-endian
            let byte_order = match attrs.remove("byteOrder") {
                Some(byte_order) => {
                    byte_order.parse::<ByteOrder>()
                        .change_context(context(InvalidAttr))
                        .attach_printable_lazy(|| format!("Invalid byteOrder attribute: expected one of [big, little], found {byte_order}"))?
                },
                None => Default::default(),
            };

            // optional checksum of the form "algorithm:hex-digest"
            let checksum = match attrs.remove("checksum") {
                Some(checksum) => Some(
                    checksum.parse::<Checksum>()
                        .change_context(context(InvalidAttr))
                        .attach_printable("Invalid checksum attribute")?
                ),
                None => None,
            };

            // optional codec spec, e.g. "zlib:<uncompressed-size>" or "lz4+sh:<size>:<item-size>"
            let compression_attr = match attrs.remove("compression") {
                Some(compression) => Some(
                    compression.parse::<CompressionAttr>()
                        .change_context(context(InvalidAttr))
                        .attach_printable("Invalid compression attribute")?
                ),
                None => None,
            };

            // optional list of compressed-size,uncompressed-size pairs
            let sub_blocks = match attrs.remove("subblocks") {
                Some(compression) => {
                    compression.parse::<SubBlocks>()
                        .change_context(context(InvalidAttr))
                        .attach_printable("Invalid subblocks attribute")?
                },
                None => SubBlocks(vec![]),
            };
            // merge the compression and subblocks attributes into one description
            let compression = {
                match (compression_attr, sub_blocks.0.len()) {
                    (Some(attr), 0) => {
                        // no explicit sub-blocks: treat the whole payload as a single
                        // sub-block; u64::MAX stands in for the unknown compressed size
                        // (downstream it becomes an effectively unbounded `take`)
                        Some(Compression {
                            algorithm: attr.algorithm(),
                            sub_blocks: SubBlocks(vec![
                                (u64::MAX, attr.uncompressed_size()) ]),
                            byte_shuffling: attr.shuffle_item_size()
                        })
                    },
                    (Some(attr), _) => {
                        // sanity check: sub-block uncompressed sizes must add up to the
                        // total declared by the compression attribute
                        let uncompressed_size: u64 = sub_blocks.0.iter().map(|(_, un)| un).sum();
                        if uncompressed_size != attr.uncompressed_size() {
                            return Err(report(InvalidAttr))
                                .attach_printable("Compression sub-blocks must sum to the uncompressed size specified in the compression attribute")
                        }
                        Some(Compression {
                            algorithm: attr.algorithm(),
                            sub_blocks,
                            byte_shuffling: attr.shuffle_item_size()
                        })
                    },
                    (None, 0) => None,
                    (None, _) => {
                        // subblocks without compression is meaningless; warn and ignore
                        tracing::warn!("Ignoring subblocks attribute because no compression was specified");
                        None
                    }
                }
            };

            Ok(Some(DataBlock {
                location,
                byte_order,
                checksum,
                compression,
            }))
        } else {
            Ok(None)
        }
    }
145
146 pub(crate) fn verify_checksum(&self, ctx: &Context) -> Result<(), ReadDataBlockError> {
147 fn verify_checksum_impl<D: Digest + Write>(expected: &[u8], reader: &mut impl Read) -> Result<(), ReadDataBlockError> {
148 let mut hasher = D::new();
149 std::io::copy(reader, &mut hasher)
150 .change_context(ReadDataBlockError::IoError)
151 .attach_printable("Failed to calculate data block hash")?;
152 let actual = hasher.finalize();
153 if actual.as_slice() == expected {
154 Ok(())
155 } else {
156 let actual = hex_simd::encode_to_string(actual.as_slice(), hex_simd::AsciiCase::Lower);
157 let expected = hex_simd::encode_to_string(expected, hex_simd::AsciiCase::Lower);
158 Err(report!(ReadDataBlockError::DifferentChecksum))
159 .attach_printable(format!("Data block failed checksum verification: expected {expected}, found {actual}"))
160 }
161 }
162
163 if let Some(checksum) = &self.checksum {
164 let mut reader = self.location.raw_bytes(&ctx)?;
165 match checksum {
166 Checksum::Sha1(digest) => verify_checksum_impl::<Sha1>(digest, &mut reader),
167 Checksum::Sha256(digest) => verify_checksum_impl::<Sha256>(digest, &mut reader),
168 Checksum::Sha512(digest) => verify_checksum_impl::<Sha512>(digest, &mut reader),
169 Checksum::Sha3_256(digest) => verify_checksum_impl::<Sha3_256>(digest, &mut reader),
170 Checksum::Sha3_512(digest) => verify_checksum_impl::<Sha3_512>(digest, &mut reader),
171 }
172 } else {
173 Ok(())
174 }
175 }
176
177 pub(crate) fn decompressed_bytes<'a>(&self, ctx: &'a Context) -> Result<Box<dyn Read + 'a>, ReadDataBlockError> {
179 self.location.decompressed_bytes(ctx, &self.compression)
180 }
181}
182
/// Where a data block's raw bytes are stored, mirroring the XISF `location` attribute.
#[derive(Clone, Debug, PartialEq)]
pub enum Location {
    /// Bytes encoded as text, either inline (`inline:<encoding>`) or in an
    /// embedded `<Data>` child node (`embedded`).
    Text {
        /// How the text payload is encoded (base64 or hex).
        encoding: TextEncoding,
        /// The encoded payload with all whitespace stripped.
        text: String,
    },
    /// `attachment:<position>:<size>` — a byte range inside a monolithic XISF file.
    Attachment {
        /// Offset of the block's first byte from the start of the file.
        position: u64,
        /// Length of the block in bytes.
        size: u64,
    },
    /// `url(...)` — an external resource fetched over the network.
    Url {
        url: Url,
        /// Optional `:index-id` suffix; reading blocks with an index-id is not yet
        /// implemented (see `raw_bytes`).
        index_id: Option<u64>,
    },
    /// `path(...)` — an external file on the local filesystem.
    Path {
        path: PathBuf,
        /// Optional `:index-id` suffix; reading blocks with an index-id is not yet
        /// implemented (see `raw_bytes`).
        index_id: Option<u64>,
    }
}
219impl Location {
220 pub(crate) fn parse_node(node: RoNode, tag: &'static str, attrs: &mut HashMap<String, String>) -> Result<Option<Self>, ParseNodeError> {
226 let context = |kind| -> ParseNodeError {
227 ParseNodeError::new(tag, kind)
228 };
229 let report = |kind: ParseNodeErrorKind| -> Report<ParseNodeError> {
230 report!(ParseNodeError::new(tag, kind))
231 };
232
233 if let Some(attr) = attrs.remove("location") {
234 match attr.split(":").collect::<Vec<_>>().as_slice() {
235 &["inline", encoding] => {
236 let encoding = encoding.parse::<TextEncoding>()
237 .change_context(context(InvalidAttr))
238 .attach_printable("Invalid location attribute: failed to parse inline encoding")?;
239
240 match node.get_child_nodes().as_slice() {
241 [] => Err(report(MissingChild)).attach_printable("Missing child text node: required for inline data blocks"),
242 [text] if text.get_type() == Some(NodeType::TextNode) => {
243 let mut text = text.get_content();
244 text.retain(|c| !c.is_whitespace());
245 Ok(Some(
246 Self::Text {
247 encoding,
248 text,
249 }
250 ))
251 },
252 _other => Err(report(InvalidChild)).attach_printable("XISF Elements with inline data blocks are not permitted to have non-text child nodes"),
253 }
254 },
255 &["embedded"] => {
256 match node.get_child_nodes()
257 .into_iter()
258 .filter(|n| n.get_name() == "Data")
259 .collect::<Vec<_>>()
260 .as_slice()
261 {
262 [] => Err(report(MissingChild)).attach_printable("Missing embedded <Data> node: required for embedded data block location"),
263 [one] => {
264 if let Some(encoding) = one.get_attribute("encoding") {
265 let encoding = encoding.parse::<TextEncoding>()
266 .change_context(context(InvalidAttr))
267 .attach_printable("Invalid encoding attribute in embedded <Data> node")?;
268
269 match one.get_child_nodes().as_slice() {
270 [] => Err(report(MissingChild)).attach_printable("Embedded <Data> node missing child text node"),
271 [text] if text.get_type() == Some(NodeType::TextNode) => {
272 let mut text = text.get_content();
273 text.retain(|c| !c.is_whitespace());
274 Ok(Some(
275 Self::Text {
276 encoding,
277 text,
278 }
279 ))
280 },
281 _other => Err(report(InvalidChild)).attach_printable("Embedded <Data> nodes are not permitted to have non-text child nodes"),
282 }
283 } else {
284 Err(report(MissingAttr)).attach_printable("Embedded <Data> node missing encoding attribute")
285 }
286 },
287 _many => Err(report(InvalidChild)).attach_printable("Found more than one embedded <Data> node"),
288 }
289 },
290 &["attachment", position, size] => {
291 Ok(Some(Self::Attachment {
292 position: parse_auto_radix::<u64>(position.trim())
293 .change_context(context(InvalidAttr))
294 .attach_printable("Invalid location attribute: failed to parse position of attached data block")?,
295 size: parse_auto_radix::<u64>(size.trim())
296 .change_context(context(InvalidAttr))
297 .attach_printable("Invalid location attribute: failed to parse size of attached data block")?,
298 }))
299 },
300 &[url] if url.starts_with("url(") && url.ends_with(")") => {
301 Ok(Some(Self::Url {
304 url: Url::parse(&url[4..url.len()-1])
306 .change_context(context(InvalidAttr))
307 .attach_printable("Invalid location attribute: failed to parse URL of external data block")?,
308 index_id: None,
309 }))
310 },
311 &[url, index_id] if url.starts_with("url(") && url.ends_with(")") => {
312 Ok(Some(Self::Url {
315 url: Url::parse(&url[4..url.len()-1])
317 .change_context(context(InvalidAttr))
318 .attach_printable("Invalid location attribute: failed to parse URL of external data block")?,
319 index_id: Some(parse_auto_radix::<u64>(index_id.trim())
320 .change_context(context(InvalidAttr))
321 .attach_printable("Invalid location attribute: failed to parse index-id of external data block")?),
322 }))
323 },
324 &[path] if path.starts_with("path(") && path.ends_with(")") => {
325 Ok(Some(Self::Path {
328 path: PathBuf::from(&path[5..path.len()-1]),
330 index_id: None,
331 }))
332 },
333 &[path, index_id] if path.starts_with("path(") && path.ends_with(")") => {
334 Ok(Some(Self::Path {
337 path: PathBuf::from(&path[5..path.len()-1]),
339 index_id: Some(parse_auto_radix::<u64>(index_id.trim())
340 .change_context(context(InvalidAttr))
341 .attach_printable("Invalid location attribute: failed to parse index-id of external data block")?),
342 }))
343 },
344 _bad => Err(report(InvalidAttr)).attach_printable("Invalid location attribute: unrecognized pattern")
345 .attach_printable(format!("Expected one of [inline:encoding, embedded, attachment:position:size, url(...), url(...):index-id, path(...), path(...):index-id], found {attr}"))
346 }
347 } else {
348 Ok(None)
349 }
350 }
351
    /// Opens a reader over the block's bytes exactly as stored — text-decoded for
    /// inline/embedded blocks, but never decompressed (see [`Self::decompressed_bytes`]).
    ///
    /// Location support depends on the context's source:
    /// - `Text`: always available; decodes the whole payload into memory.
    /// - `Attachment`: monolithic files only; seeks to the block and limits the
    ///   reader to its size.
    /// - `Url`: distributed files only; requires the host to be trusted by `ctx`
    ///   and the matching `remote-http`/`remote-ftp` cargo feature.
    /// - `Path`: distributed files only; a leading `@header_dir/` is resolved
    ///   against the header's directory.
    ///
    /// Locations carrying an index-id are not implemented yet and will panic (`todo!`).
    pub(crate) fn raw_bytes<'a>(&self, ctx: &'a Context) -> Result<Box<dyn Read + 'a>, ReadDataBlockError> {
        match self {
            Self::Text { encoding, text } => {
                // decode the whole payload up front; the reader is a cursor over it
                let buf = match encoding {
                    TextEncoding::Hex => hex_simd::decode_to_vec(text)
                        .change_context(ReadDataBlockError::BadTextEncoding)
                        .attach_printable("Bad hex encoding")?,
                    TextEncoding::Base64 => base64_simd::STANDARD.decode_to_vec(text)
                        .change_context(ReadDataBlockError::BadTextEncoding)
                        .attach_printable("Bad Base64 encoding")?,
                };
                Ok(Box::new(Cursor::new(buf)))
            },
            Self::Attachment { position, size } => {
                if let Source::Monolithic(cell) = &ctx.source {
                    // borrow the shared file handle for the lifetime of the returned reader
                    let mut reader = cell.try_borrow_mut()
                        .change_context(ReadDataBlockError::FileInUse)?;
                    reader.seek(io::SeekFrom::Start(*position))
                        .change_context(ReadDataBlockError::IoError)?;
                    // limit the reader so it cannot run past the end of the attachment
                    Ok(Box::new(reader.take_ref_mut(*size)))
                } else {
                    Err(report!(ReadDataBlockError::UnsupportedLocation))
                        .attach_printable("Data blocks with location=\"attachment\" are only supported for monolithic files")
                }
            },
            Self::Url { url, index_id: None } => {
                if let Source::Distributed(_) = &ctx.source {
                    // refuse to touch hosts the user hasn't explicitly trusted
                    if let Some(host) = url.host() {
                        ctx.ensure_trusted(host)?;
                    }
                    match url.scheme() {
                        #[cfg(feature = "remote-http")]
                        "http" | "https" => {
                            let resp = ureq::get(url.as_str())
                                .call()
                                .change_context(ReadDataBlockError::IoError)?;
                            Ok(resp.into_reader())
                        },
                        #[cfg(feature = "remote-ftp")]
                        "ftp" => {
                            use remotefs::RemoteFs;
                            const DEFAULT_FTP_PORT: u16 = 21;
                            let host = url.host().ok_or(report!(ReadDataBlockError::MissingHost))?;
                            // credentials come straight from the URL; empty password if absent
                            let mut ftp = remotefs_ftp::FtpFs::new(
                                host.to_string(),
                                url.port().unwrap_or(DEFAULT_FTP_PORT)
                            ).username(url.username())
                                .password(url.password().unwrap_or(""));
                            match ftp.connect() {
                                Ok(_) => {},
                                // distinguish bad credentials from other connection failures
                                Err(RemoteError { kind: RemoteErrorType::AuthenticationFailed, ..}) => {
                                    return Err(report!(ReadDataBlockError::Unauthorized(url.clone())));
                                },
                                Err(_) => return Err(report!(ReadDataBlockError::IoError)).attach_printable("Failed to connect to FTP server"),
                            }
                            let file = ftp.open(url.path().as_ref())
                                .change_context(ReadDataBlockError::IoError)
                                .attach_printable("Failed to open file over FTP")?;
                            Ok(Box::new(file))
                        },
                        bad => Err(report!(ReadDataBlockError::UnsupportedScheme(bad.to_string())))
                            .attach_printable(format!("Unsupported scheme: {bad}"))
                    }
                } else {
                    Err(report!(ReadDataBlockError::UnsupportedLocation))
                        .attach_printable("Data blocks with location=\"url(...)\" are only supported for distributed files")
                }
            },
            #[allow(unused_variables)]
            Self::Url { url, index_id: Some(idx) } => {
                // TODO: index-id support for remote resources is not implemented
                todo!()
            },
            Self::Path { path, index_id: None } => {
                if let Source::Distributed(directory) = &ctx.source {
                    if path.starts_with("@header_dir/") {
                        // "@header_dir" resolves relative to the XML header's directory
                        let mut path_buf = directory.clone();
                        path_buf.push(path.strip_prefix("@header_dir/").unwrap());
                        let file = File::open(path_buf)
                            .change_context(ReadDataBlockError::IoError)?;
                        Ok(Box::new(BufReader::new(file)))
                    } else {
                        // NOTE(review): other paths are opened as-is, i.e. relative paths
                        // resolve against the process CWD, not the header — confirm intended
                        let file = File::open(path)
                            .change_context(ReadDataBlockError::IoError)?;
                        Ok(Box::new(BufReader::new(file)))
                    }
                } else {
                    Err(report!(ReadDataBlockError::UnsupportedLocation))
                        .attach_printable("Data blocks with location=\"path(...)\" are only supported for distributed files")
                }
            },
            #[allow(unused_variables)]
            Self::Path { path, index_id: Some(idx) } => {
                // TODO: index-id support for local files is not implemented
                todo!()
            },
        }
    }
450
451 pub(crate) fn decompressed_bytes<'a>(&self, ctx: &'a Context, compression: &Option<Compression>) -> Result<Box<dyn Read + 'a>, ReadDataBlockError> {
453 let raw = self.raw_bytes(ctx)?;
454 if let Some(compression) = compression {
455 let uncompressed_sizes: Vec<_> = compression.sub_blocks.0.iter().map(|tup| tup.0).collect();
456 match compression.algorithm {
457 CompressionAlgorithm::Zlib => {
458 let zlib = raw.multi_take(uncompressed_sizes)
459 .map(|shared| {
460 Ok(ZlibDecoder::new(shared))
461 }).multi_chain()
462 .unwrap();
463
464 Self::unshuffle(zlib, compression)
465 },
466 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Lz4HC => {
467 let lz4 = raw.multi_take(uncompressed_sizes)
468 .map(|shared| {
469 Ok(
470 lz4::Decoder::new(shared)
471 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
472 )
473 }).multi_chain()
474 .change_context(ReadDataBlockError::IoError)
475 .attach_printable("Failed to initialize Lz4 decoder")?;
476
477 Self::unshuffle(lz4, compression)
478 },
479 CompressionAlgorithm::Zstd => {
480 let zstd = raw.multi_take(uncompressed_sizes)
481 .map(|shared| {
482 Ok(
483 zstd::Decoder::new(shared)
484 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
485 )
486 }).multi_chain()
487 .change_context(ReadDataBlockError::IoError)
488 .attach_printable("Failed to initialize Zstd decoder")?;
489
490 Self::unshuffle(zstd, compression)
491 },
492 }
493 } else {
494 Ok(raw)
495 }
496 }
497
498 fn unshuffle<'a>(mut reader: impl Read + 'a, compression: &Compression) -> Result<Box<dyn Read + 'a>, ReadDataBlockError> {
501 const ONE: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(1) };
502 match compression.byte_shuffling {
503 Some(item_size) if item_size > ONE => {
506 let item_size: u64 = item_size.into();
507 let n = compression.uncompressed_size() / item_size;
508 if n * item_size != compression.uncompressed_size() {
509 return Err(report!(ReadDataBlockError::BadByteShuffleItemSize))
510 }
511 let mut buf = Array2::<u8>::zeros([n as usize, item_size as usize]);
513 reader.read_exact(buf.as_slice_memory_order_mut().unwrap())
514 .change_context(ReadDataBlockError::IoError)
515 .attach_printable("Failed to read bytes into temporary buffer for unshuffling")?;
516 buf.swap_axes(0, 1);
517 Ok(Box::new(Cursor::new(buf.as_standard_layout().to_owned().into_raw_vec())))
518 },
519 _ => Ok(Box::new(reader))
520 }
521 }
522}
523
524pub(super) struct RefMutReader<'a, R>(RefMut<'a, R>);
527impl<'a, R> Read for RefMutReader<'a, R> where R: Read {
528 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
529 self.0.read(buf)
530 }
531}
532
533pub(super) trait ReadTakeRefExt<'a, R> {
534 fn take_ref_mut(self, limit: u64) -> Take<RefMutReader<'a, R>>;
535}
536impl<'a, R> ReadTakeRefExt<'a, R> for RefMut<'a, R> where R: Read {
537 fn take_ref_mut(self, limit: u64) -> Take<RefMutReader<'a, R>> {
538 RefMutReader(self).take(limit)
539 }
540}
541
/// Text encoding used for inline and embedded data block payloads.
#[derive(Clone, Copy, Debug, Default, Display, EnumString, EnumVariantNames, PartialEq)]
pub enum TextEncoding {
    /// Standard Base64 (the default encoding).
    #[default]
    #[strum(serialize = "base64")]
    Base64,
    /// Hexadecimal, two characters per byte.
    #[strum(serialize = "hex")]
    Hex,
}
553
/// Endianness of multi-byte values in a data block.
#[derive(Clone, Copy, Debug, Default, Display, EnumString, EnumVariantNames, PartialEq)]
pub enum ByteOrder {
    /// Most significant byte first.
    #[strum(serialize = "big")]
    Big,
    /// Least significant byte first (the default when no byteOrder attribute is given).
    #[default]
    #[strum(serialize = "little")]
    Little,
}
565
/// Digest algorithms supported for data block checksums.
///
/// The SHA-1/2 variants accept both hyphenated and plain spellings when parsed.
#[derive(Clone, Copy, Debug, Display, EnumString, EnumVariantNames, PartialEq)]
pub enum ChecksumAlgorithm {
    /// SHA-1 (160-bit digest).
    #[strum(serialize = "sha-1", serialize = "sha1")]
    Sha1,
    /// SHA-256 (256-bit digest).
    #[strum(serialize = "sha-256", serialize = "sha256")]
    Sha256,
    /// SHA-512 (512-bit digest).
    #[strum(serialize = "sha-512", serialize = "sha512")]
    Sha512,
    /// SHA3-256 (256-bit digest).
    #[strum(serialize = "sha3-256")]
    Sha3_256,
    /// SHA3-512 (512-bit digest).
    #[strum(serialize = "sha3-512")]
    Sha3_512,
}
585
/// A parsed checksum attribute: digest algorithm plus the raw digest bytes.
#[derive(Clone, Debug, PartialEq)]
pub enum Checksum {
    /// 20-byte SHA-1 digest.
    Sha1([u8; 20]),
    /// 32-byte SHA-256 digest.
    Sha256([u8; 32]),
    /// 64-byte SHA-512 digest.
    Sha512([u8; 64]),
    /// 32-byte SHA3-256 digest.
    Sha3_256([u8; 32]),
    /// 64-byte SHA3-512 digest.
    Sha3_512([u8; 64]),
}
600impl fmt::Display for Checksum {
601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 fn to_hex(digest: &[u8]) -> String {
603 hex_simd::encode_to_string(digest, hex_simd::AsciiCase::Lower)
604 }
605 match &self {
606 Self::Sha1(digest) => f.write_fmt(format_args!("sha-1:{}", to_hex(digest))),
607 Self::Sha256(digest) => f.write_fmt(format_args!("sha-256:{}", to_hex(digest))),
608 Self::Sha512(digest) => f.write_fmt(format_args!("sha-512:{}", to_hex(digest))),
609 Self::Sha3_256(digest) => f.write_fmt(format_args!("sha3-256:{}", to_hex(digest))),
610 Self::Sha3_512(digest) => f.write_fmt(format_args!("sha3-512:{}", to_hex(digest))),
611 }
612 }
613}
614impl FromStr for Checksum {
615 type Err = Report<ParseValueError>;
616 fn from_str(s: &str) -> Result<Self, ParseValueError> {
617 const CONTEXT: ParseValueError = ParseValueError("Checksum");
618
619 fn from_hex(digest: &str, out: &mut [u8]) -> Result<(), ParseValueError> {
620 use hex_simd::AsOut;
621 hex_simd::decode(digest.as_bytes(), out[..].as_out())
624 .map(|_| ())
625 .change_context(CONTEXT)
626 .attach_printable("Failed to decode checksum digest from hexadecimal")
627 }
628
629 match s.split_once(":") {
630 Some(("sha-1" | "sha1", hex_digest)) => {
631 let mut buf = [0u8; 20];
632 from_hex(hex_digest, &mut buf[..])?;
633 Ok(Self::Sha1(buf))
634 },
635 Some(("sha-256" | "sha256", hex_digest)) => {
636 let mut buf = [0u8; 32];
637 from_hex(hex_digest, &mut buf[..])?;
638 Ok(Self::Sha256(buf))
639 },
640 Some(("sha-512" | "sha512", hex_digest)) => {
641 let mut buf = [0u8; 64];
642 from_hex(hex_digest, &mut buf[..])?;
643 Ok(Self::Sha512(buf))
644 },
645 Some(("sha3-256", hex_digest)) => {
646 let mut buf = [0u8; 32];
647 from_hex(hex_digest, &mut buf[..])?;
648 Ok(Self::Sha3_256(buf))
649 },
650 Some(("sha3-512", hex_digest)) => {
651 let mut buf = [0u8; 64];
652 from_hex(hex_digest, &mut buf[..])?;
653 Ok(Self::Sha3_512(buf))
654 },
655 _bad => Err(report!(CONTEXT))
656 .attach_printable(format!("Unrecognized pattern: expected checksum-algorithm:hex-digest, found {s}"))
657 .attach_printable("Supported checksum algorithms: sha-1, sha-256, sha-512, sha3-256, sha3-512")
658 }
659 }
660}
661impl Checksum {
662 pub fn as_slice(&self) -> &[u8] {
664 match self {
665 Checksum::Sha1(digest) => &digest[..],
666 Checksum::Sha256(digest) => &digest[..],
667 Checksum::Sha512(digest) => &digest[..],
668 Checksum::Sha3_256(digest) => &digest[..],
669 Checksum::Sha3_512(digest) => &digest[..],
670 }
671 }
672}
673
/// Compression settings for a data block, merged from the XISF `compression`
/// and `subblocks` attributes.
#[derive(Clone, Debug, PartialEq)]
pub struct Compression {
    /// Which codec the block is compressed with.
    pub algorithm: CompressionAlgorithm,
    /// (compressed size, uncompressed size) pairs, one per sub-block; a single pair
    /// with compressed size `u64::MAX` when no subblocks attribute was given.
    pub(crate) sub_blocks: SubBlocks,
    /// Byte-shuffling item size in bytes, or `None` when shuffling is disabled.
    pub byte_shuffling: Option<NonZeroU64>,
}
695impl Compression {
696 pub fn uncompressed_size(&self) -> u64 {
698 self.sub_blocks.0.iter().map(|(_, un)| un).sum()
699 }
700}
701
/// Compression codecs supported for XISF data blocks.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CompressionAlgorithm {
    /// DEFLATE with zlib framing.
    Zlib,
    /// LZ4 (fast compression).
    Lz4,
    /// LZ4-HC (higher compression ratio; decodes with the same LZ4 decoder).
    Lz4HC,
    /// Zstandard.
    Zstd,
}
714
/// Direct parse of the `compression` attribute: codec, declared uncompressed size,
/// and (for the `+sh` variants) the byte-shuffling item size.
#[derive(Clone, Debug, PartialEq)]
enum CompressionAttr {
    /// `zlib:<uncompressed-size>`
    Zlib(u64),
    /// `zlib+sh:<uncompressed-size>:<item-size>`
    ZlibByteShuffling(u64, NonZeroU64),
    /// `lz4:<uncompressed-size>`
    Lz4(u64),
    /// `lz4+sh:<uncompressed-size>:<item-size>`
    Lz4ByteShuffling(u64, NonZeroU64),
    /// `lz4hc:<uncompressed-size>`
    Lz4HC(u64),
    /// `lz4hc+sh:<uncompressed-size>:<item-size>`
    Lz4HCByteShuffling(u64, NonZeroU64),
    /// `zstd:<uncompressed-size>`
    Zstd(u64),
    /// `zstd+sh:<uncompressed-size>:<item-size>`
    ZstdByteShuffling(u64, NonZeroU64),
}
728impl CompressionAttr {
729 pub fn algorithm(&self) -> CompressionAlgorithm {
730 match self {
731 Self::Zlib(_) | Self::ZlibByteShuffling(..) => CompressionAlgorithm::Zlib,
732 Self::Lz4(_) | Self::Lz4ByteShuffling(..) => CompressionAlgorithm::Lz4,
733 Self::Lz4HC(_) | Self::Lz4HCByteShuffling(..) => CompressionAlgorithm::Lz4HC,
734 Self::Zstd(_) | Self::ZstdByteShuffling(..) => CompressionAlgorithm::Zstd,
735 }
736 }
737 pub fn uncompressed_size(&self) -> u64 {
738 match self {
739 &Self::Zlib(size) => size,
740 &Self::ZlibByteShuffling(size, _) => size,
741 &Self::Lz4(size) => size,
742 &Self::Lz4ByteShuffling(size, _) => size,
743 &Self::Lz4HC(size) => size,
744 &Self::Lz4HCByteShuffling(size, _) => size,
745 &Self::Zstd(size) => size,
746 &Self::ZstdByteShuffling(size, _) => size,
747 }
748 }
749 pub fn shuffle_item_size(&self) -> Option<NonZeroU64> {
750 match self {
751 Self::Zlib(_) | Self::Lz4(_) | Self::Lz4HC(_) | Self::Zstd(_) => None,
752 &Self::ZlibByteShuffling(_, item_size) => Some(item_size),
753 &Self::Lz4ByteShuffling(_, item_size) => Some(item_size),
754 &Self::Lz4HCByteShuffling(_, item_size) => Some(item_size),
755 &Self::ZstdByteShuffling(_, item_size) => Some(item_size),
756 }
757 }
758}
759impl fmt::Display for CompressionAttr {
760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761 match self {
762 Self::Zlib(uncompressed_size) =>
763 f.write_fmt(format_args!("zlib:{uncompressed_size}")),
764 Self::ZlibByteShuffling(uncompressed_size, item_size) =>
765 f.write_fmt(format_args!("zlib+sh:{uncompressed_size}:{item_size}")),
766 Self::Lz4(uncompressed_size) =>
767 f.write_fmt(format_args!("lz4:{uncompressed_size}")),
768 Self::Lz4ByteShuffling(uncompressed_size, item_size) =>
769 f.write_fmt(format_args!("lz4+sh:{uncompressed_size}:{item_size}")),
770 Self::Lz4HC(uncompressed_size) =>
771 f.write_fmt(format_args!("lz4hc:{uncompressed_size}")),
772 Self::Lz4HCByteShuffling(uncompressed_size, item_size) =>
773 f.write_fmt(format_args!("lz4hc+sh:{uncompressed_size}:{item_size}")),
774 Self::Zstd(uncompressed_size) =>
775 f.write_fmt(format_args!("zstd:{uncompressed_size}")),
776 Self::ZstdByteShuffling(uncompressed_size, item_size) =>
777 f.write_fmt(format_args!("zstd+sh:{uncompressed_size}:{item_size}")),
778 }
779 }
780}
781impl FromStr for CompressionAttr {
782 type Err = Report<ParseValueError>;
783 fn from_str(s: &str) -> Result<Self, ParseValueError> {
784 const CONTEXT: ParseValueError = ParseValueError("Compression");
785 const UNCOMPRESSED_SIZE_ERR: &'static str = "Failed to read uncompressed size";
786 const ITEM_SIZE_ERR: &'static str = "Failed to read byte shuffling item size";
787 fn parse_u64(size: &str, err_msg: &'static str) -> Result<u64, ParseValueError> {
788 parse_auto_radix::<u64>(size.trim())
789 .change_context(CONTEXT)
790 .attach_printable(err_msg)
791 }
792 match s.split(":").collect::<Vec<_>>().as_slice() {
793 &["zlib", uncompressed_size] => Ok(Self::Zlib(
794 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?
795 )),
796 &["zlib+sh", uncompressed_size, item_size] => Ok(Self::ZlibByteShuffling(
797 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?,
798 NonZeroU64::new(parse_u64(item_size, ITEM_SIZE_ERR)?)
799 .ok_or(report!(CONTEXT))
800 .attach_printable("Byte shuffling item size cannot be zero")?
801 )),
802 &["lz4", uncompressed_size] => Ok(Self::Lz4(
803 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?
804 )),
805 &["lz4+sh", uncompressed_size, item_size] => Ok(Self::Lz4ByteShuffling(
806 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?,
807 NonZeroU64::new(parse_u64(item_size, ITEM_SIZE_ERR)?)
808 .ok_or(report!(CONTEXT))
809 .attach_printable("Byte shuffling item size cannot be zero")?
810 )),
811 &["lz4hc", uncompressed_size] => Ok(Self::Lz4HC(
812 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?
813 )),
814 &["lz4hc+sh", uncompressed_size, item_size] => Ok(Self::Lz4HCByteShuffling(
815 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?,
816 NonZeroU64::new(parse_u64(item_size, ITEM_SIZE_ERR)?)
817 .ok_or(report!(CONTEXT))
818 .attach_printable("Byte shuffling item size cannot be zero")?
819 )),
820 &["zstd", uncompressed_size] => Ok(Self::Zstd(
821 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?
822 )),
823 &["zstd+sh", uncompressed_size, item_size] => Ok(Self::ZstdByteShuffling(
824 parse_u64(uncompressed_size, UNCOMPRESSED_SIZE_ERR)?,
825 NonZeroU64::new(parse_u64(item_size, ITEM_SIZE_ERR)?)
826 .ok_or(report!(CONTEXT))
827 .attach_printable("Byte shuffling item size cannot be zero")?
828 )),
829 _bad => Err(report!(CONTEXT)).attach_printable(format!(
830 "Unrecognized pattern: expected one of [zlib:len, zlib+sh:len:item-size, lz4:len, lz4+sh:len:item-size, lz4hc:len, lz4hc+sh:len:item-size, zstd:len, zstd+sh:len:item-size], found {s}"
831 ))
832 }
833 }
834}
835
/// List of (compressed size, uncompressed size) pairs, one per compression
/// sub-block, parsed from the `subblocks` attribute.
#[derive(Clone, PartialEq)]
pub(crate) struct SubBlocks(pub(crate) Vec<(u64, u64)>);
// Debug forwards to the inner Vec so SubBlocks prints as a plain list of pairs
impl fmt::Debug for SubBlocks {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}
844impl fmt::Display for SubBlocks {
845 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
846 if self.0.len() > 0 {
847 let s = self.0.iter()
848 .map(|(uncompressed_size, item_size)| format!("{uncompressed_size},{item_size}"))
849 .reduce(|acc, next| format!("{acc}:{next}"))
850 .unwrap(); f.write_str(s.as_str())
852 } else {
853 f.write_str("")
854 }
855 }
856}
857impl FromStr for SubBlocks {
858 type Err = Report<ParseValueError>;
859 fn from_str(s: &str) -> Result<Self, ParseValueError> {
860 const CONTEXT: ParseValueError = ParseValueError("Compression Sub-Blocks");
861 let mut sub_blocks = vec![];
862 for token in s.split(":") {
863 if let Some((uncompressed_size, item_size)) = token.split_once(",") {
864 sub_blocks.push((
865 parse_auto_radix::<u64>(uncompressed_size.trim())
866 .change_context(CONTEXT)?,
867
868 parse_auto_radix::<u64>(item_size.trim())
869 .change_context(CONTEXT)?,
870 ));
871 } else {
872 return Err(report!(CONTEXT)).attach_printable(format!("Expected pattern x,i:y,j:...:z,k, found {s}"));
873 }
874 }
875 if sub_blocks.len() == 0 {
876 return Err(report!(CONTEXT)).attach_printable("Requires at least one compressed-size,uncompressed-size pair");
877 } else {
878 Ok(Self(sub_blocks))
879 }
880 }
881}
882
/// Compression level wrapper; level 0 is reserved for [`CompressionLevel::AUTO`],
/// explicit levels are constrained to `1..=100` by [`CompressionLevel::new`].
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CompressionLevel(u8);
impl CompressionLevel {
    /// Sentinel (level 0) meaning the compression level is chosen automatically.
    pub const AUTO: Self = Self(0);
}
897impl CompressionLevel {
898 pub fn new(level: u8) -> Result<Self, ParseValueError> {
902 match level {
903 val @ 1..=100 => Ok(Self(val)),
904 bad => Err(ParseValueError("CompressionLevel"))
905 .attach_printable(format!("Must be between 1 and 100, found {bad}"))
906 }
907 }
908}
909impl Default for CompressionLevel {
910 fn default() -> Self {
911 Self::AUTO
912 }
913}
914
#[cfg(test)]
mod tests {
    use ndarray::Array3;

    use super::*;

    // tests/files/gradient.bin is a 250x200 RGB byte image (see check_gradient)
    const GRADIENT_SIZE: u64 = 250 * 200 * 3;
    // Asserts that `arr` holds the reference gradient pattern of gradient.bin
    fn check_gradient(arr: &Array3<u8>) {
        for ((x, y, z), v) in arr.indexed_iter() {
            match z {
                0 => assert_eq!(*v, y as u8),
                1 => assert_eq!(*v, x as u8),
                2 => assert_eq!(*v, 255 - (x as u8).min(y as u8)),
                _ => unreachable!(),
            };
        }
    }

    // Inline text blocks: hex and base64 payloads decode to the same bytes
    #[test]
    fn plaintext() {
        use hex_simd::AsciiCase;
        let data: Vec<_> = (0u8..=255u8).collect();
        let text = hex_simd::encode_to_string(&data, AsciiCase::Lower);
        let hex = DataBlock {
            location: Location::Text { encoding: TextEncoding::Hex, text },
            byte_order: ByteOrder::Little, checksum: None,
            compression: None,
        };
        let ctx = Context::distributed("tests/files/");
        let mut reader = hex.location.raw_bytes(&ctx).unwrap();
        let mut array = [0u8; 256];
        reader.read_exact(&mut array).unwrap();
        assert_eq!(array, &data[..]);

        // same payload, base64-encoded this time
        let text = base64_simd::STANDARD.encode_to_string(&data);
        let base64 = DataBlock {
            location: Location::Text { encoding: TextEncoding::Base64, text },
            ..hex
        };
        let mut reader = base64.location.raw_bytes(&ctx).unwrap();
        let mut array = [0u8; 256];
        reader.read_exact(&mut array).unwrap();
        assert_eq!(array, &data[..]);
    }

    // Attached blocks are rejected for distributed sources, served for monolithic ones
    #[test]
    fn attachment() {
        let attachment = DataBlock {
            location: Location::Attachment { position: 0, size: GRADIENT_SIZE },
            byte_order: ByteOrder::Little, checksum: None,
            compression: None,
        };

        // a distributed context must refuse attachment locations
        let ctx = Context::distributed("tests/files/");
        let err = attachment.location.raw_bytes(&ctx).err().unwrap();
        assert_eq!(err.current_context(), &ReadDataBlockError::UnsupportedLocation);

        let file = File::open("tests/files/gradient.bin").unwrap();
        let buf_read = BufReader::new(file);
        let ctx = Context::monolithic(buf_read);
        let mut reader = attachment.location.raw_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }

    // path(...) blocks: CWD-relative and @header_dir-relative forms
    #[test]
    fn local_bin_file() {
        let local = DataBlock {
            location: Location::Path { path: "tests/files/gradient.bin".into(), index_id: None },
            byte_order: ByteOrder::Little, checksum: None,
            compression: None,
        };
        let ctx = Context::distributed("tests/files/");
        let mut reader = local.location.raw_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);

        // @header_dir/ resolves against the context's header directory
        let relative = DataBlock {
            location: Location::Path { path: "@header_dir/gradient.bin".into(), index_id: None },
            ..local
        };
        let mut reader = relative.location.raw_bytes(&ctx).unwrap();
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }

    // url(https) blocks require the host to be trusted first (needs network access)
    #[cfg(all(feature = "remote-http", not(docsrs)))]
    #[test]
    fn http_bin_file() {
        use url::Host;

        let http = DataBlock {
            location: Location::Url { url: "https://github.com/wrenby/xisf/raw/main/tests/files/gradient.bin".try_into().unwrap(), index_id: None },
            byte_order: ByteOrder::Little, checksum: None,
            compression: None,
        };
        let mut ctx = Context::distributed("tests/files/");
        // untrusted hosts must be rejected before any connection is made
        let untrusted = http.location.raw_bytes(&ctx).err().unwrap();
        assert_eq!(untrusted.current_context(), &ReadDataBlockError::UntrustedHost(Host::Domain("github.com".into())));

        ctx.trust_host(Host::Domain("github.com".into()));
        let mut reader = http.location.raw_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }

    // url(ftp) blocks, exercised against a throwaway FTP container (needs docker)
    #[cfg(all(feature = "remote-ftp", not(docsrs)))]
    #[test]
    fn ftp_bin_file() {
        use testcontainers::{core::WaitFor, clients::Cli, images::generic::GenericImage, RunnableImage};
        let mut server: RunnableImage<_> = GenericImage::new("delfer/alpine-ftp-server", "latest")
            .with_env_var("USERS", "computer|deactivate_iguana|/files")
            .with_wait_for(WaitFor::message_on_stderr("passwd: password for computer changed by root"))
            .into();
        server = server.with_mapped_port((2121, 21))
            .with_volume(("./tests/files", "/files"));

        // expose the server's passive-mode data ports as well
        for pasv in 21000..=21010 {
            server = server.with_mapped_port((pasv, pasv));
        }

        let docker = Cli::docker();
        let container = docker.run(server);

        let ftp = DataBlock {
            location: Location::Url { url: "ftp://computer:deactivate_iguana@localhost:2121/files/gradient.bin".try_into().unwrap(), index_id: None },
            byte_order: ByteOrder::Little, checksum: None,
            compression: None,
        };
        let ctx = Context::distributed("tests/files/");
        let mut reader = ftp.location.raw_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);

        container.stop();
    }

    // zlib-compressed attachment with a single implicit sub-block
    #[test]
    fn zlib() {
        let file = File::open("tests/files/gradient.bin.zlib").unwrap();
        let size = file.metadata().unwrap().len();
        let zlib = DataBlock {
            location: Location::Attachment { position: 0, size },
            byte_order: ByteOrder::Little, checksum: None,
            compression: Some(Compression {
                algorithm: CompressionAlgorithm::Zlib,
                // u64::MAX compressed size = read to the end of the attachment
                sub_blocks: SubBlocks(vec![(u64::MAX, GRADIENT_SIZE)]),
                byte_shuffling: None,
            }),
        };
        let ctx = Context::monolithic(BufReader::new(file));
        let mut reader = zlib.decompressed_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }

    // lz4-compressed attachment with a single implicit sub-block
    #[test]
    fn lz4() {
        let file = File::open("tests/files/gradient.bin.lz4").unwrap();
        let size = file.metadata().unwrap().len();
        let lz4 = DataBlock {
            location: Location::Attachment { position: 0, size },
            byte_order: ByteOrder::Little, checksum: None,
            compression: Some(Compression {
                algorithm: CompressionAlgorithm::Lz4,
                sub_blocks: SubBlocks(vec![(u64::MAX, GRADIENT_SIZE)]),
                byte_shuffling: None,
            }),
        };
        let ctx = Context::monolithic(BufReader::new(file));
        let mut reader = lz4.decompressed_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }

    // zstd-compressed attachment with a single implicit sub-block
    #[test]
    fn zstd() {
        let file = File::open("tests/files/gradient.bin.zst").unwrap();
        let size = file.metadata().unwrap().len();
        let zstd = DataBlock {
            location: Location::Attachment { position: 0, size },
            byte_order: ByteOrder::Little, checksum: None,
            compression: Some(Compression {
                algorithm: CompressionAlgorithm::Zstd,
                sub_blocks: SubBlocks(vec![(u64::MAX, GRADIENT_SIZE)]),
                byte_shuffling: None,
            }),
        };
        let ctx = Context::monolithic(BufReader::new(file));
        let mut reader = zstd.decompressed_bytes(&ctx).unwrap();
        let mut array: Array3<u8> = Array3::zeros((200, 250, 3));
        reader.read_exact(array.as_slice_mut().unwrap()).unwrap();
        check_gradient(&array);
    }
}
1124}