1use std::{
4 fs::File,
5 io::{BufRead, BufReader, Write},
6 ops::Range,
7 path::Path,
8};
9
10use base64::{engine::general_purpose, read::DecoderReader};
11use memchr::memmem::Finder;
12use once_cell::sync::Lazy;
13use quick_xml::{events::Event, Reader};
14
15use crate::{
16 data::{
17 binary::Binary, binary2::Binary2, stream::Stream, tabledata::TableData, TableOrBinOrBin2,
18 },
19 error::VOTableError,
20 impls::{
21 b64::read::{
22 B64Cleaner, BulkBinaryRowDeserializer, OwnedB64Cleaner, OwnedBulkBinaryRowDeserializer,
23 },
24 mem::VoidTableDataContent,
25 Schema, VOTableValue,
26 },
27 iter::elems::{
28 Binary2RowValueIterator, BinaryRowValueIterator, DataTableRowValueIterator, RowValueIterator,
29 },
30 resource::{Resource, ResourceOrTable, ResourceSubElem},
31 table::{Table, TableElem},
32 utils::{discard_comment, discard_event, is_empty},
33 votable::{VOTable, VOTableWrapper},
34 VOTableElement,
35};
36
37pub mod elems;
38pub mod strings;
39
/// Lazily-built substring searcher for the `</TR>` closing tag
/// (used to delimit one raw `TABLEDATA` row).
static TR_END_FINDER: Lazy<Finder<'static>> = Lazy::new(|| Finder::new("</TR>"));
/// Lazily-built substring searcher for the `</STREAM>` closing tag
/// (used when copying the remaining `BINARY`/`BINARY2` data).
static STREAM_END_FINDER: Lazy<Finder<'static>> = Lazy::new(|| Finder::new("</STREAM>"));
/// Lazily-built substring searcher for the `</TABLEDATA>` closing tag
/// (used when copying the remaining `TABLEDATA` data).
static TABLEDATA_END_FINDER: Lazy<Finder<'static>> = Lazy::new(|| Finder::new("</TABLEDATA>"));
43
44pub struct TabledataRowIterator<'a, R: BufRead> {
48 reader: &'a mut Reader<R>,
49 reader_buff: &'a mut Vec<u8>,
50 has_next: bool,
51}
52
53impl<'a, R: BufRead> TabledataRowIterator<'a, R> {
54 pub fn new(reader: &'a mut Reader<R>, reader_buff: &'a mut Vec<u8>) -> Self {
56 Self {
57 reader,
58 reader_buff,
59 has_next: true,
60 }
61 }
62}
63
64impl<'a, R: BufRead> Iterator for TabledataRowIterator<'a, R> {
65 type Item = Result<Vec<u8>, VOTableError>;
66
67 fn next(&mut self) -> Option<Self::Item> {
68 next_tabledata_row(self.reader, self.reader_buff, &mut self.has_next)
69 }
70}
71
72pub struct OwnedTabledataRowIterator<R: BufRead> {
76 pub reader: Reader<R>,
77 pub reader_buff: Vec<u8>,
78 pub votable: VOTable<VoidTableDataContent>,
79 pub has_next: bool,
80}
81
82impl<R: BufRead> OwnedTabledataRowIterator<R> {
83 pub fn skip_remaining_data(&mut self) -> Result<(), VOTableError> {
84 self
85 .reader
86 .read_to_end(
87 TableData::<VoidTableDataContent>::TAG_BYTES,
88 &mut self.reader_buff,
89 )
90 .map_err(VOTableError::Read)
91 }
92
93 pub fn read_to_end(self) -> Result<VOTable<VoidTableDataContent>, VOTableError> {
94 let Self {
95 mut reader,
96 mut reader_buff,
97 mut votable,
98 has_next: _,
99 } = self;
100 votable
101 .read_from_data_end_to_end(&mut reader, &mut reader_buff)
102 .map(|()| votable)
103 }
104}
105
106impl<R: BufRead> Iterator for OwnedTabledataRowIterator<R> {
107 type Item = Result<Vec<u8>, VOTableError>;
108
109 fn next(&mut self) -> Option<Self::Item> {
110 next_tabledata_row(&mut self.reader, &mut self.reader_buff, &mut self.has_next)
111 }
112}
113
114fn next_tabledata_row<T: BufRead>(
115 reader: &mut Reader<T>,
116 reader_buff: &mut Vec<u8>,
117 has_next: &mut bool,
118) -> Option<Result<Vec<u8>, VOTableError>> {
119 if *has_next {
120 reader_buff.clear();
121 loop {
122 let event = reader.read_event(reader_buff);
123 match event {
124 Ok(Event::Start(e)) if e.name() == b"TR" => {
125 let mut raw_row: Vec<u8> = Vec::with_capacity(256);
126 return Some(
127 read_until_found(TR_END_FINDER.as_ref(), reader, &mut raw_row).map(move |_| raw_row),
128 );
129 }
130 Ok(Event::End(e)) if e.name() == TableData::<VoidTableDataContent>::TAG_BYTES => {
131 *has_next = false;
132 return None;
133 }
134 Ok(Event::Eof) => return Some(Err(VOTableError::PrematureEOF("reading rows"))),
135 Ok(Event::Text(e)) if is_empty(&e) => {}
136 Ok(Event::Comment(e)) => {
137 discard_comment(&e, reader, TableData::<VoidTableDataContent>::TAG)
138 }
139 Ok(event) => discard_event(event, TableData::<VoidTableDataContent>::TAG),
140 Err(e) => return Some(Err(VOTableError::Read(e))),
141 }
142 }
143 } else {
144 None
145 }
146}
147
148pub struct OwnedTabledataRowIteratorWithPosition<R: BufRead> {
153 pub reader: Reader<R>,
154 pub reader_buff: Vec<u8>,
155 pub votable: VOTable<VoidTableDataContent>,
156 pub has_next: bool,
157 pub n_bytes_readen_cumul: usize,
158}
159
160impl<R: BufRead> OwnedTabledataRowIteratorWithPosition<R> {
161 pub fn skip_remaining_data(&mut self) -> Result<(), VOTableError> {
162 self
163 .reader
164 .read_to_end(
165 TableData::<VoidTableDataContent>::TAG_BYTES,
166 &mut self.reader_buff,
167 )
168 .map_err(VOTableError::Read)
169 }
170
171 pub fn read_to_end(self) -> Result<VOTable<VoidTableDataContent>, VOTableError> {
172 let Self {
173 mut reader,
174 mut reader_buff,
175 mut votable,
176 has_next: _,
177 n_bytes_readen_cumul: _,
178 } = self;
179 votable
180 .read_from_data_end_to_end(&mut reader, &mut reader_buff)
181 .map(|()| votable)
182 }
183}
184
185impl<R: BufRead> Iterator for OwnedTabledataRowIteratorWithPosition<R> {
186 type Item = Result<(Range<usize>, Vec<u8>), VOTableError>;
187
188 fn next(&mut self) -> Option<Self::Item> {
189 next_tabledata_row_with_position(
190 &mut self.reader,
191 &mut self.reader_buff,
192 &mut self.has_next,
193 &mut self.n_bytes_readen_cumul,
194 )
195 }
196}
197
198fn next_tabledata_row_with_position<T: BufRead>(
203 reader: &mut Reader<T>,
204 reader_buff: &mut Vec<u8>,
205 has_next: &mut bool,
206 n_bytes_readen_cumul: &mut usize,
207) -> Option<Result<(Range<usize>, Vec<u8>), VOTableError>> {
208 if *has_next {
209 reader_buff.clear();
210 loop {
211 let pos = reader.buffer_position();
213 let event = reader.read_event(reader_buff);
214 match event {
215 Ok(Event::Start(ref e)) if e.name() == b"TR" => {
216 let mut raw_row: Vec<u8> = Vec::with_capacity(256);
217 return match read_until_found(TR_END_FINDER.as_ref(), reader, &mut raw_row) {
218 Ok(n_bytes_readen) => {
219 let start = pos + *n_bytes_readen_cumul;
220 *n_bytes_readen_cumul += n_bytes_readen;
221 let end = reader.buffer_position() + *n_bytes_readen_cumul;
222 Some(Ok((start..end, raw_row)))
223 }
224 Err(e) => Some(Err(e)),
225 };
226 }
227 Ok(Event::End(ref e)) if e.name() == TableData::<VoidTableDataContent>::TAG_BYTES => {
228 *has_next = false;
229 return None;
230 }
231 Ok(Event::Eof) => return Some(Err(VOTableError::PrematureEOF("reading rows"))),
232 Ok(Event::Text(ref e)) if is_empty(e) => {}
233 Ok(Event::Comment(ref e)) => {
234 discard_comment(e, reader, TableData::<VoidTableDataContent>::TAG)
235 }
236 Ok(event) => discard_event(event, TableData::<VoidTableDataContent>::TAG),
237 Err(e) => return Some(Err(VOTableError::Read(e))),
238 }
239 }
240 } else {
241 None
242 }
243}
244
245fn read_until_found<T: BufRead>(
250 finder: Finder<'_>,
251 reader: &mut Reader<T>,
252 buf: &mut Vec<u8>,
253) -> Result<usize, VOTableError> {
254 let needle = finder.needle();
255 let l = needle.len();
256 let r = reader.get_mut();
257 let mut ending_pattern: Option<(&[u8], &[u8])> = None;
258 let mut read = 0;
259 loop {
260 let (done, used) = {
261 let available = match r.fill_buf() {
262 Ok(n) => n,
263 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
264 Err(e) => return Err(VOTableError::Io(e)),
265 };
266 if let Some((start, end)) = ending_pattern {
267 if available.starts_with(end) {
268 r.consume(end.len());
269 read += end.len();
270 return Ok(read);
271 } else {
272 buf.extend_from_slice(start);
274 ending_pattern = None;
275 }
276 }
277 match finder.find(available) {
278 Some(i) => {
279 buf.extend_from_slice(&available[..i]);
280 (true, i + l)
281 }
282 None => {
283 let len = available.len();
284 for sub in 1..l {
285 if available.ends_with(&needle[0..sub]) {
286 ending_pattern = Some((&needle[0..sub], &needle[sub..l]));
292 buf.extend_from_slice(&available[..len - sub]);
293 break;
294 }
295 }
296 if ending_pattern.is_none() {
297 buf.extend_from_slice(available);
298 }
299 (false, len)
300 }
301 }
302 };
303 r.consume(used);
304 read += used;
305 if done || used == 0 {
306 return Ok(read);
307 }
308 }
309}
310
311fn copy_until_found<R, W>(
312 finder: Finder<'_>,
313 reader: &mut R,
314 writer: &mut W,
315) -> Result<usize, VOTableError>
316where
317 R: BufRead,
318 W: Write,
319{
320 let needle = finder.needle();
321 let l = needle.len();
322 let r = reader;
323 let mut ending_pattern: Option<(&[u8], &[u8])> = None;
324 let mut read = 0;
325 loop {
326 let (done, used) = {
327 let available = match r.fill_buf() {
328 Ok(n) => n,
329 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
330 Err(e) => return Err(VOTableError::Io(e)),
331 };
332 if let Some((start, end)) = ending_pattern {
333 if available.starts_with(end) {
334 r.consume(end.len());
335 read += end.len();
336 return Ok(read);
337 } else {
338 writer.write_all(start).map_err(VOTableError::Io)?;
340 ending_pattern = None;
341 }
342 }
343 match finder.find(available) {
344 Some(i) => {
345 writer
346 .write_all(&available[..i])
347 .map_err(VOTableError::Io)?;
348 (true, i + l)
349 }
350 None => {
351 let len = available.len();
352 for sub in 1..l {
353 if available.ends_with(&needle[0..sub]) {
354 ending_pattern = Some((&needle[0..sub], &needle[sub..l]));
355 writer
356 .write_all(&available[..len - sub])
357 .map_err(VOTableError::Io)?;
358 break;
359 }
360 }
361 if ending_pattern.is_none() {
362 writer.write_all(available).map_err(VOTableError::Io)?;
363 }
364 (false, len)
365 }
366 }
367 };
368 r.consume(used);
369 read += used;
370 if done || used == 0 {
371 return Ok(read);
372 }
373 }
374}
375
376pub struct Binary1or2RowIterator<'a, R: BufRead> {
380 reader: BulkBinaryRowDeserializer<'a, R>,
381}
382
383impl<'a, R: BufRead> Binary1or2RowIterator<'a, R> {
384 pub fn new(reader: &'a mut Reader<R>, context: &[TableElem], is_binary2: bool) -> Self {
386 let b64_cleaner = B64Cleaner::new(reader.get_mut());
387 let decoder = DecoderReader::new(b64_cleaner, &general_purpose::STANDARD);
388 let schema: Vec<Schema> = context
390 .iter()
391 .filter_map(|table_elem| match table_elem {
392 TableElem::Field(field) => Some(field.into()),
393 _ => None,
394 })
395 .collect();
396 let reader = if is_binary2 {
397 BulkBinaryRowDeserializer::new_binary2(decoder, schema.as_slice())
398 } else {
399 BulkBinaryRowDeserializer::new_binary(decoder, schema.as_slice())
400 };
401 Self { reader }
402 }
403}
404
405impl<'a, R: BufRead> Iterator for Binary1or2RowIterator<'a, R> {
406 type Item = Result<Vec<u8>, VOTableError>;
407
408 fn next(&mut self) -> Option<Self::Item> {
409 if self.reader.has_data_left().unwrap_or(false) {
410 let mut row = Vec::with_capacity(512);
411 Some(self.reader.read_raw_row(&mut row).map(|_| {
412 row.shrink_to_fit();
413 row
414 }))
415 } else {
416 None
417 }
418 }
419}
420
421pub struct OwnedBinary1or2RowIterator<R: BufRead> {
422 pub votable: VOTable<VoidTableDataContent>,
423 pub reader: OwnedBulkBinaryRowDeserializer<R>,
424 pub is_binary2: bool,
425}
426
427impl<R: BufRead> OwnedBinary1or2RowIterator<R> {
428 pub fn new(reader: Reader<R>, votable: VOTable<VoidTableDataContent>, is_binary2: bool) -> Self {
429 let b64_cleaner = OwnedB64Cleaner::new(reader.into_inner());
430 let decoder = DecoderReader::new(b64_cleaner, &general_purpose::STANDARD);
431 let schema: Vec<Schema> = votable
433 .get_first_table()
434 .unwrap() .elems
436 .iter()
437 .filter_map(|table_elem| match table_elem {
438 TableElem::Field(field) => Some(field.into()),
439 _ => None,
440 })
441 .collect();
442 let reader = if is_binary2 {
443 OwnedBulkBinaryRowDeserializer::new_binary2(decoder, schema.as_slice())
444 } else {
445 OwnedBulkBinaryRowDeserializer::new_binary(decoder, schema.as_slice())
446 };
447 Self {
448 votable,
449 reader,
450 is_binary2,
451 }
452 }
453
454 pub fn skip_remaining_data(mut self) -> Result<Self, VOTableError> {
455 match self.reader.has_data_left() {
456 Ok(true) => {
457 let Self {
458 votable,
459 reader,
460 is_binary2,
461 } = self;
462 reader.skip_remaining_data().map(|reader| Self {
463 votable,
464 reader,
465 is_binary2,
466 })
467 }
468 Ok(false) => Ok(self),
469 Err(e) => Err(e),
470 }
471 }
472
473 pub fn read_to_end(self) -> Result<VOTable<VoidTableDataContent>, VOTableError> {
474 let Self {
475 mut votable,
476 reader,
477 is_binary2,
478 } = self;
479 let mut reader = Reader::from_reader(reader.into_inner());
481 reader.check_end_names(false);
482 let mut reader_buff: Vec<u8> = Vec::with_capacity(512);
483 reader
484 .read_to_end(
485 if is_binary2 {
486 b"BINARY2".to_vec()
487 } else {
488 b"BINARY".to_vec()
489 },
490 &mut reader_buff,
491 )
492 .map_err(|e| VOTableError::Custom(format!("Reading to BINARY or BINARY2... {:?}", e)))?;
493 votable
494 .read_from_data_end_to_end(&mut reader, &mut reader_buff)
495 .map(|()| votable)
496 }
497}
498
499impl<R: BufRead> Iterator for OwnedBinary1or2RowIterator<R> {
500 type Item = Result<Vec<u8>, VOTableError>;
501
502 fn next(&mut self) -> Option<Self::Item> {
503 if self.reader.has_data_left().unwrap_or(false) {
505 let mut row = Vec::with_capacity(512);
506 Some(self.reader.read_raw_row(&mut row).map(|_| {
507 row.shrink_to_fit();
508 row
509 }))
510 } else {
511 None
512 }
513 }
514}
515
/// Entry point to iterate over the rows of the first table of a VOTable.
///
/// Holds the XML reader stopped at the beginning of the table data, the VOTable
/// structure parsed so far and the kind of data serialization that was found.
pub struct SimpleVOTableRowIterator<R: BufRead> {
  /// XML reader positioned at the data of the table.
  pub reader: Reader<R>,
  /// Scratch buffer handed to the XML reader when reading events.
  pub reader_buff: Vec<u8>,
  /// VOTable structure read before reaching the data.
  pub votable: VOTable<VoidTableDataContent>,
  /// Serialization of the data (`TableData`, `Binary` or `Binary2`; the
  /// constructor rejects FITS).
  pub data_type: TableOrBinOrBin2,
}
524
525impl SimpleVOTableRowIterator<BufReader<File>> {
526 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, VOTableError> {
530 let mut reader_buff: Vec<u8> = Vec::with_capacity(1024);
531 let (votable, resource, reader) =
532 VOTableWrapper::<VoidTableDataContent>::manual_from_ivoa_xml_file(path, &mut reader_buff)?;
533 SimpleVOTableRowIterator::<BufReader<File>>::from_votable_resource_reader(
534 votable,
535 resource,
536 reader,
537 reader_buff,
538 )
539 }
540}
541
542impl<R: BufRead> SimpleVOTableRowIterator<R> {
543 pub fn from_reader(reader: R) -> Result<Self, VOTableError> {
544 let mut reader_buff: Vec<u8> = Vec::with_capacity(1024);
545 let (votable, resource, reader) =
546 VOTable::from_reader_till_next_resource(reader, &mut reader_buff)?;
547 Self::from_votable_resource_reader(votable, resource, reader, reader_buff)
548 }
549
550 fn from_votable_resource_reader(
551 mut votable: VOTable<VoidTableDataContent>,
552 mut resource: Resource<VoidTableDataContent>,
553 mut reader: Reader<R>,
554 mut reader_buff: Vec<u8>,
555 ) -> Result<Self, VOTableError> {
556 let mut sub_elem = resource
557 .read_till_next_table_by_ref(&mut reader, &mut reader_buff)
558 .and_then(|opt_sub_elem| {
559 opt_sub_elem.ok_or_else(|| {
560 VOTableError::Custom(String::from("No table found in the VOTable resource!"))
561 })
562 })?;
563 match &mut sub_elem {
564 ResourceSubElem {
565 links: _,
566 resource_or_table: ResourceOrTable::<_>::Table(table),
567 ..
568 } => {
569 if let Some(mut data) = table.read_till_data_by_ref(&mut reader, &mut reader_buff)? {
570 match data.read_till_table_bin_or_bin2_or_fits_by_ref(&mut reader, &mut reader_buff)? {
571 Some(TableOrBinOrBin2::TableData) => {
572 table.set_data_by_ref(data);
573 resource.push_sub_elem_by_ref(sub_elem);
574 votable.push_resource_by_ref(resource);
575 Ok(SimpleVOTableRowIterator {
576 reader,
577 reader_buff,
578 votable,
579 data_type: TableOrBinOrBin2::TableData,
580 })
581 }
582 Some(TableOrBinOrBin2::Binary) => {
583 let stream = Stream::open_stream(&mut reader, &mut reader_buff)?;
584 let binary = Binary::from_stream(stream);
585 data.set_binary_by_ref(binary);
586 table.set_data_by_ref(data);
587 resource.push_sub_elem_by_ref(sub_elem);
588 votable.push_resource_by_ref(resource);
589 Ok(SimpleVOTableRowIterator {
590 reader,
591 reader_buff,
592 votable,
593 data_type: TableOrBinOrBin2::Binary,
594 })
595 }
596 Some(TableOrBinOrBin2::Binary2) => {
597 let stream = Stream::open_stream(&mut reader, &mut reader_buff)?;
598 let binary2 = Binary2::from_stream(stream);
599 data.set_binary2_by_ref(binary2);
600 table.set_data_by_ref(data);
601 resource.push_sub_elem_by_ref(sub_elem);
602 votable.push_resource_by_ref(resource);
603 Ok(SimpleVOTableRowIterator {
604 reader,
605 reader_buff,
606 votable,
607 data_type: TableOrBinOrBin2::Binary2,
608 })
609 }
610 Some(TableOrBinOrBin2::Fits(_)) => Err(VOTableError::Custom(String::from(
611 "FITS data not supported",
612 ))),
613 None => Err(VOTableError::Custom(String::from(
614 "No data found in the first VOtable table",
615 ))),
616 }
617 } else {
618 Err(VOTableError::Custom(String::from(
619 "No data found in the first VOTable table",
620 )))
621 }
622 }
623 _ => Err(VOTableError::Custom(String::from("Not a table?!"))),
624 }
625 }
626
  /// Returns the serialization kind of the table data.
  pub fn data_type(&self) -> &TableOrBinOrBin2 {
    &self.data_type
  }
630
  /// Returns the VOTable structure read so far.
  pub fn votable(&self) -> &VOTable<VoidTableDataContent> {
    &self.votable
  }
634
  /// Returns mutable references to both the XML reader and its scratch buffer,
  /// e.g. to build a borrowing row iterator on top of them.
  pub fn borrow_mut_reader_and_buff(&mut self) -> (&mut Reader<R>, &mut Vec<u8>) {
    (&mut self.reader, &mut self.reader_buff)
  }
642
643 pub fn to_row_value_iter(&mut self) -> RowValueIterator<'_, R> {
648 let table = self.votable.get_first_table_mut().unwrap();
649 let schema: Vec<Schema> = table
650 .elems
651 .iter()
652 .filter_map(|table_elem| match table_elem {
653 TableElem::Field(field) => Some(field.into()),
654 _ => None,
655 })
656 .collect();
657 match &self.data_type {
658 TableOrBinOrBin2::TableData => RowValueIterator::TableData(DataTableRowValueIterator::new(
659 &mut self.reader,
660 &mut self.reader_buff,
661 table,
662 schema,
663 )),
664 TableOrBinOrBin2::Binary => {
665 RowValueIterator::BinaryTable(BinaryRowValueIterator::new(&mut self.reader, table, schema))
666 }
667 TableOrBinOrBin2::Binary2 => RowValueIterator::Binary2Table(Binary2RowValueIterator::new(
668 &mut self.reader,
669 table,
670 schema,
671 )),
672 _ => unreachable!(),
673 }
674 }
675
676 pub fn to_owned_tabledata_row_iterator(self) -> OwnedTabledataRowIterator<R> {
678 assert!(matches!(self.data_type, TableOrBinOrBin2::TableData));
679 OwnedTabledataRowIterator {
680 reader: self.reader,
681 reader_buff: self.reader_buff,
682 votable: self.votable,
683 has_next: true,
684 }
685 }
686
687 pub fn to_owned_tabledata_row_iterator_with_position(
690 self,
691 ) -> OwnedTabledataRowIteratorWithPosition<R> {
692 assert!(matches!(self.data_type, TableOrBinOrBin2::TableData));
693 OwnedTabledataRowIteratorWithPosition {
694 reader: self.reader,
695 reader_buff: self.reader_buff,
696 votable: self.votable,
697 has_next: true,
698 n_bytes_readen_cumul: 0,
699 }
700 }
701
  /// Converts `self` into an owning iterator over the raw `BINARY` rows.
  ///
  /// # Panics
  /// If the data serialization is not `Binary`.
  pub fn to_owned_binary_row_iterator(self) -> OwnedBinary1or2RowIterator<R> {
    assert!(matches!(self.data_type, TableOrBinOrBin2::Binary));
    OwnedBinary1or2RowIterator::new(self.reader, self.votable, false)
  }
707
  /// Converts `self` into an owning iterator over the raw `BINARY2` rows.
  ///
  /// # Panics
  /// If the data serialization is not `Binary2`.
  pub fn to_owned_binary2_row_iterator(self) -> OwnedBinary1or2RowIterator<R> {
    assert!(matches!(self.data_type, TableOrBinOrBin2::Binary2));
    OwnedBinary1or2RowIterator::new(self.reader, self.votable, true)
  }
713
714 pub fn skip_remaining_data(&mut self) -> Result<(), VOTableError> {
719 match self.data_type {
720 TableOrBinOrBin2::TableData => self
721 .reader
722 .read_to_end(
723 TableData::<VoidTableDataContent>::TAG_BYTES,
724 &mut self.reader_buff,
725 )
726 .map_err(VOTableError::Read),
727 TableOrBinOrBin2::Binary => self
728 .reader
729 .read_to_end(
730 Stream::<VoidTableDataContent>::TAG_BYTES,
731 &mut self.reader_buff,
732 )
733 .map_err(VOTableError::Read)
734 .and_then(|_| {
735 self
736 .reader
737 .read_to_end(
738 Binary::<VoidTableDataContent>::TAG_BYTES,
739 &mut self.reader_buff,
740 )
741 .map_err(VOTableError::Read)
742 }),
743 TableOrBinOrBin2::Binary2 => self
744 .reader
745 .read_to_end(
746 Stream::<VoidTableDataContent>::TAG_BYTES,
747 &mut self.reader_buff,
748 )
749 .map_err(VOTableError::Read)
750 .and_then(|_| {
751 self
752 .reader
753 .read_to_end(
754 Binary2::<VoidTableDataContent>::TAG_BYTES,
755 &mut self.reader_buff,
756 )
757 .map_err(VOTableError::Read)
758 }),
759 _ => unreachable!(),
760 }
761 }
762
763 pub fn copy_remaining_data<W: Write>(&mut self, mut write: W) -> Result<(), VOTableError> {
768 match self.data_type {
769 TableOrBinOrBin2::TableData => copy_until_found(
770 TABLEDATA_END_FINDER.as_ref(),
771 self.reader.get_mut(),
772 &mut write,
773 ),
774 TableOrBinOrBin2::Binary => copy_until_found(
775 STREAM_END_FINDER.as_ref(),
776 self.reader.get_mut(),
777 &mut write,
778 ),
779 TableOrBinOrBin2::Binary2 => copy_until_found(
780 STREAM_END_FINDER.as_ref(),
781 self.reader.get_mut(),
782 &mut write,
783 ),
784 _ => unreachable!(),
785 }
786 .map(|_| ())
787 }
788
  /// Consumes `self` and returns the VOTable structure as it is, without reading
  /// anything more from the input.
  pub fn end_of_it(self) -> VOTable<VoidTableDataContent> {
    self.votable
  }
792
793 pub fn read_to_end(self) -> Result<VOTable<VoidTableDataContent>, VOTableError> {
794 let Self {
795 mut reader,
796 mut reader_buff,
797 mut votable,
798 data_type: _,
799 } = self;
800 votable
801 .read_from_data_end_to_end(&mut reader, &mut reader_buff)
802 .map(|()| votable)
803 }
804}
805
/// Iterator over the rows of one table, each row being a vector of `VOTableValue`.
pub trait TableIter: Iterator<Item = Result<Vec<VOTableValue>, VOTableError>> {
  /// Gives mutable access to the table the rows belong to.
  fn table(&mut self) -> &mut Table<VoidTableDataContent>;
  /// Consumes the iterator.
  // NOTE(review): presumably reads (discarding) the rows not yet consumed up to
  // the end of the data; confirm with the implementors.
  fn read_to_end(self) -> Result<(), VOTableError>;
}
813
/// Iterates over the tables of a VOTable, providing for each table with row data
/// an iterator over its rows, while building the VOTable structure on the fly.
pub struct VOTableIterator<R: BufRead> {
  /// XML reader the document is read from.
  reader: Reader<R>,
  /// Scratch buffer handed to the XML reader when reading events.
  reader_buff: Vec<u8>,
  /// VOTable structure built so far.
  votable: VOTable<VoidTableDataContent>,
  /// Resources currently being read (not yet attached to `votable`).
  resource_stack: Vec<Resource<VoidTableDataContent>>,
  /// Resource sub-elements (sub-resources or tables) currently being read
  /// (not yet attached to their parent).
  resource_sub_elems_stack: Vec<ResourceSubElem<VoidTableDataContent>>,
}
827
828impl VOTableIterator<BufReader<File>> {
829 pub fn from_file<P: AsRef<Path>>(
830 path: P,
831 ) -> Result<VOTableIterator<BufReader<File>>, VOTableError> {
832 let mut reader_buff: Vec<u8> = Vec::with_capacity(1024);
833 let (votable, resource, reader) =
834 VOTableWrapper::<VoidTableDataContent>::manual_from_ivoa_xml_file(path, &mut reader_buff)?;
835 let mut resource_stack = Vec::with_capacity(4);
836 resource_stack.push(resource);
837 Ok(VOTableIterator::<BufReader<File>> {
838 reader,
839 reader_buff,
840 votable,
841 resource_stack,
842 resource_sub_elems_stack: Vec::with_capacity(10),
843 })
844 }
845
846 pub fn end_of_it(self) -> VOTable<VoidTableDataContent> {
847 self.votable
848 }
849}
850
851impl<R: BufRead> VOTableIterator<R> {
852 pub fn from_reader(reader: R) -> Result<Self, VOTableError> {
853 let mut reader_buff: Vec<u8> = Vec::with_capacity(1024);
854 let (votable, resource, reader) =
855 VOTable::from_reader_till_next_resource(reader, &mut reader_buff)?;
856 let mut resource_stack = Vec::with_capacity(4);
857 resource_stack.push(resource);
858 Ok(VOTableIterator::<R> {
859 reader,
860 reader_buff,
861 votable,
862 resource_stack,
863 resource_sub_elems_stack: Vec::with_capacity(10),
864 })
865 }
866
867 pub fn read_all_skipping_data(mut self) -> Result<VOTable<VoidTableDataContent>, VOTableError> {
868 while let Some(table_it) = self.next_table_row_value_iter()? {
869 table_it.read_to_end()?;
870 }
871 assert!(self.resource_sub_elems_stack.is_empty());
872 assert!(self.resource_stack.is_empty());
873 Ok(self.votable)
874 }
875
  /// Continues the parsing until the next table holding `TABLEDATA`, `BINARY` or
  /// `BINARY2` data, and returns an iterator over its rows.
  ///
  /// The method works with two stacks: `resource_stack` (resources being read)
  /// and `resource_sub_elems_stack` (sub-resources or tables being read). An
  /// element is attached to its parent once it has been read; a resource with no
  /// more sub-element is attached to the VOTable.
  ///
  /// Tables without `DATA` and tables with FITS data are attached to their
  /// parent and the search goes on.
  ///
  /// # Returns
  /// * `Ok(Some(iterator))` when a table with row data is found;
  /// * `Ok(None)` when there is no more resource to read.
  ///
  /// # Errors
  /// * parsing errors raised by the underlying readers;
  /// * `VOTableError::Custom` on an empty `DATA` element, or if an element has
  ///   to be attached while both stacks are empty.
  pub fn next_table_row_value_iter(
    &mut self,
  ) -> Result<Option<RowValueIterator<'_, R>>, VOTableError> {
    loop {
      // 1 - First deal with the innermost element being read, if any.
      if let Some(mut sub_resource) = self.resource_sub_elems_stack.pop() {
        match &mut sub_resource.resource_or_table {
          ResourceOrTable::<_>::Resource(r) => {
            match r
              .read_till_next_resource_or_table_by_ref(&mut self.reader, &mut self.reader_buff)?
            {
              Some(s) => {
                // The sub-resource contains a new element: put both back on the
                // stack, the new element on top.
                self.resource_sub_elems_stack.push(sub_resource);
                self.resource_sub_elems_stack.push(s);
              }
              None => {
                // The sub-resource is complete: attach it to its parent.
                if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                  last.push_sub_elem_by_ref(sub_resource)?;
                } else if let Some(last) = self.resource_stack.last_mut() {
                  last.push_sub_elem_by_ref(sub_resource);
                } else {
                  return Err(VOTableError::Custom(String::from(
                    "No more RESOURCE in the stack :o/",
                  )));
                }
              }
            }
          }
          ResourceOrTable::<_>::Table(table) => {
            if let Some(mut data) =
              table.read_till_data_by_ref(&mut self.reader, &mut self.reader_buff)?
            {
              match data.read_till_table_bin_or_bin2_or_fits_by_ref(
                &mut self.reader,
                &mut self.reader_buff,
              )? {
                Some(TableOrBinOrBin2::TableData) => {
                  table.set_data_by_ref(data);

                  // Row schema: one entry per FIELD of the table.
                  let schema: Vec<Schema> = table
                    .elems
                    .iter()
                    .filter_map(|table_elem| match table_elem {
                      TableElem::Field(field) => Some(field.into()),
                      _ => None,
                    })
                    .collect();

                  // Attach the table to its parent before iterating over the rows.
                  if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource)?;
                  } else if let Some(last) = self.resource_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource);
                  } else {
                    return Err(VOTableError::Custom(String::from(
                      "No more RESOURCE in the stack :o/",
                    )));
                  }

                  // NOTE(review): the table is looked up in the last element of
                  // `resource_stack`, even when it has just been attached to the
                  // last element of `resource_sub_elems_stack` (nested resource):
                  // in that case the `unwrap` calls may panic or return another
                  // table; to be confirmed.
                  let row_it = DataTableRowValueIterator::new(
                    &mut self.reader,
                    &mut self.reader_buff,
                    self
                      .resource_stack
                      .last_mut()
                      .unwrap()
                      .get_last_table_mut()
                      .unwrap(),
                    schema,
                  );
                  return Ok(Some(RowValueIterator::TableData(row_it)));
                }
                Some(TableOrBinOrBin2::Binary) => {
                  let stream = Stream::open_stream(&mut self.reader, &mut self.reader_buff)?;
                  let binary = Binary::from_stream(stream);
                  data.set_binary_by_ref(binary);
                  table.set_data_by_ref(data);

                  // Row schema: one entry per FIELD of the table.
                  let schema: Vec<Schema> = table
                    .elems
                    .iter()
                    .filter_map(|table_elem| match table_elem {
                      TableElem::Field(field) => Some(field.into()),
                      _ => None,
                    })
                    .collect();

                  // Attach the table to its parent before iterating over the rows.
                  if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource)?;
                  } else if let Some(last) = self.resource_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource);
                  } else {
                    return Err(VOTableError::Custom(String::from(
                      "No more RESOURCE in the stack :o/",
                    )));
                  }

                  // NOTE(review): same remark as for TABLEDATA about the table lookup.
                  let row_it = BinaryRowValueIterator::new(
                    &mut self.reader,
                    self
                      .resource_stack
                      .last_mut()
                      .unwrap()
                      .get_last_table_mut()
                      .unwrap(),
                    schema,
                  );
                  return Ok(Some(RowValueIterator::BinaryTable(row_it)));
                }
                Some(TableOrBinOrBin2::Binary2) => {
                  let stream = Stream::open_stream(&mut self.reader, &mut self.reader_buff)?;
                  let binary2 = Binary2::from_stream(stream);
                  data.set_binary2_by_ref(binary2);
                  table.set_data_by_ref(data);

                  // Row schema: one entry per FIELD of the table.
                  let schema: Vec<Schema> = table
                    .elems
                    .iter()
                    .filter_map(|table_elem| match table_elem {
                      TableElem::Field(field) => Some(field.into()),
                      _ => None,
                    })
                    .collect();

                  // Attach the table to its parent before iterating over the rows.
                  if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource)?;
                  } else if let Some(last) = self.resource_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource);
                  } else {
                    return Err(VOTableError::Custom(String::from(
                      "No more RESOURCE in the stack :o/",
                    )));
                  }

                  // NOTE(review): same remark as for TABLEDATA about the table lookup.
                  let row_it = Binary2RowValueIterator::new(
                    &mut self.reader,
                    self
                      .resource_stack
                      .last_mut()
                      .unwrap()
                      .get_last_table_mut()
                      .unwrap(),
                    schema,
                  );
                  return Ok(Some(RowValueIterator::Binary2Table(row_it)));
                }
                Some(TableOrBinOrBin2::Fits(fits)) => {
                  // No row to iterate on here: store the FITS element, attach
                  // the table to its parent and go on with the next element.
                  data.set_fits_by_ref(fits);
                  table.set_data_by_ref(data);

                  if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource)?;
                  } else if let Some(last) = self.resource_stack.last_mut() {
                    last.push_sub_elem_by_ref(sub_resource);
                  } else {
                    return Err(VOTableError::Custom(String::from(
                      "No more RESOURCE in the stack :o/",
                    )));
                  }
                }
                None => {
                  return Err(VOTableError::Custom(String::from("Unexpected empty DATA")));
                }
              }
            } else {
              // Table without DATA: attach it to its parent and go on.
              if let Some(last) = self.resource_sub_elems_stack.last_mut() {
                last.push_sub_elem_by_ref(sub_resource)?;
              } else if let Some(last) = self.resource_stack.last_mut() {
                last.push_sub_elem_by_ref(sub_resource);
              } else {
                return Err(VOTableError::Custom(String::from(
                  "No more RESOURCE in the stack :o/",
                )));
              }
            }
          }
        }
      } else if let Some(mut resource) = self.resource_stack.pop() {
        // 2 - No sub-element being read: continue reading the current resource.
        match resource
          .read_till_next_resource_or_table_by_ref(&mut self.reader, &mut self.reader_buff)?
        {
          Some(resource_sub_elem) => {
            // New sub-element found: it is handled at the next loop iteration.
            self.resource_sub_elems_stack.push(resource_sub_elem);
            self.resource_stack.push(resource);
          }
          // The resource is complete: attach it to the VOTable.
          None => self.votable.push_resource_by_ref(resource),
        }
      } else {
        // 3 - Both stacks are empty: look for the next resource of the VOTable.
        match self
          .votable
          .read_till_next_resource_by_ref(&mut self.reader, &mut self.reader_buff)?
        {
          Some(resource) => self.resource_stack.push(resource),
          None => return Ok(None),
        }
      }
    }
  }
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084 use std::io::Cursor;
1085
1086 use serde::{de::DeserializeSeed, Deserializer};
1087
1088 use crate::{
1089 data::TableOrBinOrBin2,
1090 impls::{
1091 b64::read::BinaryDeserializer, visitors::FixedLengthArrayVisitor, Schema, VOTableValue,
1092 },
1093 iter::{Binary1or2RowIterator, SimpleVOTableRowIterator, TabledataRowIterator},
1094 table::TableElem,
1095 };
1096
1097 #[test]
1098 fn test_simple_votable_read_iter_tabledata() {
1099 println!();
1100 println!("-- next_table_row_value_iter dss12.vot --");
1101 println!();
1102
1103 let mut svor = SimpleVOTableRowIterator::from_file("resources/sdss12.vot").unwrap();
1104 assert!(matches!(svor.data_type(), &TableOrBinOrBin2::TableData));
1105
1106 let raw_row_it = TabledataRowIterator::new(&mut svor.reader, &mut svor.reader_buff);
1108 for raw_row_res in raw_row_it {
1109 eprintln!(
1110 "ROW: {:?}",
1111 std::str::from_utf8(&raw_row_res.unwrap()).unwrap()
1112 );
1113 }
1114 let votable = svor.read_to_end().unwrap();
1115 println!("VOTable: {}", votable.wrap().to_toml_string(true).unwrap());
1116
1117 assert!(true)
1118 }
1119
1120 #[test]
1121 fn test_simple_votable_read_iter_tabledata_owned() {
1122 println!();
1123 println!("-- next_table_row_value_iter sdss12.vot --");
1124 println!();
1125
1126 let svor = SimpleVOTableRowIterator::from_file("resources/sdss12.vot").unwrap();
1127 assert!(matches!(svor.data_type(), &TableOrBinOrBin2::TableData));
1128 let mut raw_row_it = svor.to_owned_tabledata_row_iterator();
1129 let mut n_row = 0_u32;
1130 while let Some(raw_row_res) = raw_row_it.next() {
1131 eprintln!(
1132 "ROW: {:?}",
1133 std::str::from_utf8(&raw_row_res.unwrap()).unwrap()
1134 );
1135 n_row += 1;
1136 }
1137 let votable = raw_row_it.read_to_end().unwrap();
1138 println!("VOTable: {}", votable.wrap().to_toml_string(true).unwrap());
1139
1140 assert_eq!(n_row, 50)
1141 }
1142
1143 #[test]
1144 fn test_simple_votable_read_iter_binary1() {
1145 println!();
1146 println!("-- next_table_row_value_iter binary.b64 --");
1147 println!();
1148
1149 let mut svor = SimpleVOTableRowIterator::from_file("resources/binary.b64").unwrap();
1150 assert!(matches!(svor.data_type(), &TableOrBinOrBin2::Binary));
1151
1152 let context = svor.votable.get_first_table().unwrap().elems.as_slice();
1153 let raw_row_it = Binary1or2RowIterator::new(&mut svor.reader, context, false);
1155 let schema: Vec<Schema> = context
1156 .iter()
1157 .filter_map(|table_elem| match table_elem {
1158 TableElem::Field(field) => Some(field.into()),
1159 _ => None,
1160 })
1161 .collect();
1162 let schema_len = schema.len();
1163 for raw_row_res in raw_row_it {
1164 eprintln!(
1169 "ROW: {:?}",
1170 raw_row_res.map(|row| {
1171 let mut binary_deser = BinaryDeserializer::new(Cursor::new(row));
1172 let mut row: Vec<VOTableValue> = Vec::with_capacity(schema_len);
1173 for field_schema in schema.iter() {
1174 let field = field_schema.deserialize(&mut binary_deser).unwrap();
1175 row.push(field);
1176 }
1177 row
1178 })
1179 );
1180 }
1181 let votable = svor.read_to_end().unwrap();
1182 println!("VOTable: {}", votable.wrap().to_toml_string(true).unwrap());
1183 }
1184
1185 #[test]
1186 fn test_simple_votable_read_iter_binary2() {
1187 println!();
1188 println!("-- next_table_row_value_iter gaia_dr3.b264 --");
1189 println!();
1190
1191 let mut svor = SimpleVOTableRowIterator::from_file("resources/gaia_dr3.b264").unwrap();
1192 assert!(matches!(svor.data_type(), &TableOrBinOrBin2::Binary2));
1193
1194 let context = svor.votable.resources[0]
1195 .get_first_table()
1196 .unwrap()
1197 .elems
1198 .as_slice();
1199 let schema: Vec<Schema> = context
1201 .iter()
1202 .filter_map(|table_elem| match table_elem {
1203 TableElem::Field(field) => Some(field.into()),
1204 _ => None,
1205 })
1206 .collect();
1207 let schema_len = schema.len();
1208 let n_bytes = (schema.len() + 7) / 8;
1209 let raw_row_it = Binary1or2RowIterator::new(&mut svor.reader, context, true);
1210 for raw_row_res in raw_row_it {
1211 eprintln!(
1216 "ROW: {:?}",
1217 raw_row_res.map(|row| {
1218 let mut binary_deser = BinaryDeserializer::new(Cursor::new(row));
1219 let mut row: Vec<VOTableValue> = Vec::with_capacity(schema_len);
1220 let bytes_visitor = FixedLengthArrayVisitor::new(n_bytes);
1221 let null_flags: Vec<u8> = (&mut binary_deser)
1222 .deserialize_tuple(n_bytes, bytes_visitor)
1223 .unwrap();
1224 for (i_col, field_schema) in schema.iter().enumerate() {
1225 let field = field_schema.deserialize(&mut binary_deser).unwrap();
1226 let is_null = (null_flags[i_col >> 3] & (128_u8 >> (i_col & 7))) != 0;
1227 if is_null {
1228 row.push(VOTableValue::Null)
1229 } else {
1230 row.push(field)
1231 };
1232 }
1233 row
1234 })
1235 );
1236 }
1237 let votable = svor.read_to_end().unwrap();
1238 println!("VOTable: {}", votable.wrap().to_toml_string(true).unwrap());
1239
1240 assert!(true)
1241 }
1242
1243 }