1use std::collections::BTreeMap;
15use std::ops::Range;
16#[cfg(feature = "async")]
17use std::sync::MutexGuard;
18use std::sync::{Arc, Mutex, OnceLock};
19
20use bytes::Bytes;
21use object_store::path::Path as ObjectPath;
22use object_store::{ObjectStore, ObjectStoreExt};
23use url::Url;
24
25use crate::decode::DecodeOptions;
26use crate::error::{Result, TensogramError};
27use crate::framing;
28use crate::metadata;
29use crate::types::{DataObjectDescriptor, GlobalMetadata, IndexFrame};
30use crate::wire::{
31 DATA_OBJECT_FOOTER_SIZE, DataObjectFlags, FRAME_END, FRAME_HEADER_SIZE, FrameHeader, FrameType,
32 MAGIC, MessageFlags, POSTAMBLE_SIZE, PREAMBLE_SIZE, Postamble, Preamble,
33};
34
/// URL schemes that [`is_remote_url`] treats as remote sources.
///
/// The comparison in `is_remote_url` is ASCII case-insensitive, so the
/// entries are listed in lowercase only.
const REMOTE_SCHEMES: &[&str] = &["s3", "s3a", "gs", "az", "azure", "http", "https"];
38
/// Lazily initialised Tokio runtime shared by the blocking remote I/O helpers.
///
/// The cell stores either the built runtime or the stringified build error,
/// so a failed initialisation is reported again on every later access instead
/// of being retried.
static SHARED_RUNTIME: OnceLock<std::result::Result<tokio::runtime::Runtime, String>> =
    OnceLock::new();
47
48fn shared_runtime() -> Result<&'static tokio::runtime::Runtime> {
49 SHARED_RUNTIME
50 .get_or_init(|| {
51 tokio::runtime::Builder::new_multi_thread()
52 .worker_threads(2)
53 .enable_all()
54 .thread_name("tensogram-remote-io")
55 .build()
56 .map_err(|e| format!("tokio runtime: {e}"))
57 })
58 .as_ref()
59 .map_err(|e| TensogramError::Remote(e.clone()))
60}
61
62fn block_on_shared<T, Fut>(future: Fut) -> Result<T>
80where
81 T: Send,
82 Fut: std::future::Future<Output = std::result::Result<T, object_store::Error>> + Send,
83{
84 let rt = shared_runtime()?;
85 let handle = rt.handle().clone();
86
87 match tokio::runtime::Handle::try_current() {
88 Err(_) => {
89 handle
91 .block_on(future)
92 .map_err(|e| TensogramError::Remote(e.to_string()))
93 }
94 Ok(current) if current.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread => {
95 tokio::task::block_in_place(|| {
98 handle
99 .block_on(future)
100 .map_err(|e| TensogramError::Remote(e.to_string()))
101 })
102 }
103 Ok(_) => {
104 std::thread::scope(|s| {
107 match s
108 .spawn(|| {
109 handle
110 .block_on(future)
111 .map_err(|e| TensogramError::Remote(e.to_string()))
112 })
113 .join()
114 {
115 Ok(result) => result,
116 Err(_) => Err(TensogramError::Remote(
117 "remote I/O thread panicked".to_string(),
118 )),
119 }
120 })
121 }
122 }
123}
124
125pub fn is_remote_url(source: &str) -> bool {
126 match source.find("://") {
127 Some(pos) => {
128 let scheme = &source[..pos];
129 REMOTE_SCHEMES
130 .iter()
131 .any(|s| s.eq_ignore_ascii_case(scheme))
132 }
133 None => false,
134 }
135}
136
/// Cached location and lazily discovered contents of one message in the
/// remote object.
#[derive(Debug, Clone)]
struct MessageLayout {
    /// Byte offset of the message start within the remote object.
    offset: u64,
    /// Length of the message in bytes.
    length: u64,
    /// Preamble parsed when the message was discovered.
    preamble: Preamble,
    /// Index frame, `None` until a header or footer index has been parsed.
    index: Option<IndexFrame>,
    /// Global metadata, `None` until a header or footer metadata frame has
    /// been parsed.
    global_metadata: Option<GlobalMetadata>,
}
147
/// Reader for a message file stored in a remote object store.
pub(crate) struct RemoteBackend {
    /// Source string exactly as passed to `open` / `open_async`.
    source_url: String,
    /// Object store client resolved from the source URL.
    store: Arc<dyn ObjectStore>,
    /// Path of the object within `store`.
    path: ObjectPath,
    /// Object size in bytes as reported by the store's `head` call at open time.
    file_size: u64,
    /// Scan progress and cached message layouts, guarded by a mutex.
    state: Mutex<RemoteState>,
}
157
/// Mutable scan state of a [`RemoteBackend`].
#[derive(Debug, Default)]
struct RemoteState {
    /// Messages discovered so far, in the order they were scanned.
    layouts: Vec<MessageLayout>,
    /// Byte offset at which the next scan step starts.
    next_scan_offset: u64,
    /// Set once scanning has stopped; no further scan steps do any work.
    scan_complete: bool,
}
164
165impl std::fmt::Debug for RemoteBackend {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 f.debug_struct("RemoteBackend")
168 .field("source", &self.source_url)
169 .field("file_size", &self.file_size)
170 .field(
171 "messages",
172 &self
173 .state
174 .lock()
175 .map(|state| state.layouts.len())
176 .unwrap_or(0),
177 )
178 .finish()
179 }
180}
181
182impl RemoteBackend {
    /// Returns the source string this backend was opened with.
    pub(crate) fn source_url(&self) -> &str {
        &self.source_url
    }
186
187 #[cfg(feature = "async")]
188 fn lock_state(&self) -> Result<MutexGuard<'_, RemoteState>> {
189 self.state
190 .lock()
191 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))
192 }
193
194 pub(crate) fn open(source: &str, storage_options: &BTreeMap<String, String>) -> Result<Self> {
195 let url = Url::parse(source)
196 .map_err(|e| TensogramError::Remote(format!("invalid URL '{source}': {e}")))?;
197
198 let mut opts: Vec<(String, String)> = storage_options
199 .iter()
200 .map(|(k, v)| (k.clone(), v.clone()))
201 .collect();
202 if url.scheme() == "http" && !opts.iter().any(|(k, _)| k == "allow_http") {
203 opts.push(("allow_http".to_string(), "true".to_string()));
204 }
205 let (store, path) = object_store::parse_url_opts(&url, opts)
206 .map_err(|e| TensogramError::Remote(format!("cannot open '{source}': {e}")))?;
207
208 let store: Arc<dyn ObjectStore> = Arc::from(store);
209
210 let head_store = store.clone();
211 let head_path = path.clone();
212 let meta = block_on_shared(async move { head_store.head(&head_path).await })?;
213
214 let file_size = meta.size;
215 if file_size < (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64 {
216 return Err(TensogramError::Remote(format!(
217 "remote file too small ({file_size} bytes)"
218 )));
219 }
220
221 let backend = RemoteBackend {
222 source_url: source.to_string(),
223 store,
224 path,
225 file_size,
226 state: Mutex::new(RemoteState::default()),
227 };
228 {
229 let mut state = backend
230 .state
231 .lock()
232 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
233 backend.scan_next_locked(&mut state)?;
234 if state.layouts.is_empty() {
235 return Err(TensogramError::Remote(
236 "no valid messages found in remote file".to_string(),
237 ));
238 }
239 }
240 Ok(backend)
241 }
242
    /// Fetches the bytes in `range` of the remote object, blocking the caller.
    ///
    /// The store handle and path are cloned so the future passed to
    /// `block_on_shared` owns everything it uses.
    fn get_range(&self, range: Range<u64>) -> Result<Bytes> {
        let store = self.store.clone();
        let path = self.path.clone();
        block_on_shared(async move { store.get_range(&path, range).await })
    }
250
251 fn scan_next_locked(&self, state: &mut RemoteState) -> Result<()> {
254 if state.scan_complete {
255 return Ok(());
256 }
257 let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
258 let pos = state.next_scan_offset;
259
260 if pos + min_message_size > self.file_size {
261 state.scan_complete = true;
262 return Ok(());
263 }
264
265 let preamble_bytes = self.get_range(pos..pos + PREAMBLE_SIZE as u64)?;
266 if &preamble_bytes[..MAGIC.len()] != MAGIC {
267 state.scan_complete = true;
268 return Ok(());
269 }
270
271 let preamble = match Preamble::read_from(&preamble_bytes) {
272 Ok(p) => p,
273 Err(_) => {
274 state.scan_complete = true;
275 return Ok(());
276 }
277 };
278
279 let msg_len = preamble.total_length;
280
281 if msg_len == 0 {
282 let remaining = self.file_size - pos;
283 if remaining < min_message_size {
284 state.scan_complete = true;
285 return Ok(());
286 }
287 let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
288 let end_bytes =
289 self.get_range(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)?;
290 if &end_bytes[..] != crate::wire::END_MAGIC {
291 state.scan_complete = true;
292 return Ok(());
293 }
294 state.layouts.push(MessageLayout {
295 offset: pos,
296 length: remaining,
297 preamble,
298 index: None,
299 global_metadata: None,
300 });
301 state.scan_complete = true;
302 return Ok(());
303 }
304
305 let msg_end = match pos.checked_add(msg_len) {
306 Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
307 _ => {
308 state.scan_complete = true;
309 return Ok(());
310 }
311 };
312
313 state.layouts.push(MessageLayout {
314 offset: pos,
315 length: msg_len,
316 preamble,
317 index: None,
318 global_metadata: None,
319 });
320 state.next_scan_offset = msg_end;
321 Ok(())
322 }
323
324 fn ensure_message_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
325 while msg_idx >= state.layouts.len() && !state.scan_complete {
326 self.scan_next_locked(state)?;
327 }
328 if msg_idx >= state.layouts.len() {
329 return Err(TensogramError::Framing(format!(
330 "message index {} out of range (count={})",
331 msg_idx,
332 state.layouts.len()
333 )));
334 }
335 Ok(())
336 }
337
338 fn scan_all_locked(&self, state: &mut RemoteState) -> Result<()> {
339 while !state.scan_complete {
340 self.scan_next_locked(state)?;
341 }
342 Ok(())
343 }
344
345 fn scan_and_discover_next_locked(&self, state: &mut RemoteState) -> Result<()> {
346 if state.scan_complete {
347 return Ok(());
348 }
349 let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
350 let pos = state.next_scan_offset;
351
352 if pos + min_message_size > self.file_size {
353 state.scan_complete = true;
354 return Ok(());
355 }
356
357 let chunk_size = (self.file_size - pos).min(256 * 1024);
358 let chunk = self.get_range(pos..pos + chunk_size)?;
359
360 if chunk.len() < PREAMBLE_SIZE || &chunk[..MAGIC.len()] != MAGIC {
361 state.scan_complete = true;
362 return Ok(());
363 }
364
365 let preamble = match Preamble::read_from(&chunk[..PREAMBLE_SIZE]) {
366 Ok(p) => p,
367 Err(_) => {
368 state.scan_complete = true;
369 return Ok(());
370 }
371 };
372
373 let msg_len = preamble.total_length;
374
375 if msg_len == 0 {
376 let remaining = self.file_size - pos;
377 if remaining < min_message_size {
378 state.scan_complete = true;
379 return Ok(());
380 }
381 let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
382 let end_bytes =
383 self.get_range(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)?;
384 if &end_bytes[..] != crate::wire::END_MAGIC {
385 state.scan_complete = true;
386 return Ok(());
387 }
388 state.layouts.push(MessageLayout {
389 offset: pos,
390 length: remaining,
391 preamble,
392 index: None,
393 global_metadata: None,
394 });
395 state.scan_complete = true;
396 return Ok(());
397 }
398
399 let msg_end = match pos.checked_add(msg_len) {
400 Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
401 _ => {
402 state.scan_complete = true;
403 return Ok(());
404 }
405 };
406
407 let flags = preamble.flags;
408 let msg_idx = state.layouts.len();
409
410 state.layouts.push(MessageLayout {
411 offset: pos,
412 length: msg_len,
413 preamble,
414 index: None,
415 global_metadata: None,
416 });
417 state.next_scan_offset = msg_end;
418
419 if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
420 let chunk_end = (msg_len as usize).min(chunk.len());
421 Self::parse_header_frames(state, msg_idx, &chunk[..chunk_end])?;
422 } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
423 {
424 self.discover_footer_layout_from_suffix_locked(state, msg_idx)?;
425 }
426
427 Ok(())
428 }
429
430 fn discover_footer_layout_from_suffix_locked(
439 &self,
440 state: &mut RemoteState,
441 msg_idx: usize,
442 ) -> Result<()> {
443 let msg_offset = state.layouts[msg_idx].offset;
444 let msg_len = state.layouts[msg_idx].length;
445
446 let suffix_size = msg_len.min(256 * 1024);
447 let msg_end = msg_offset
448 .checked_add(msg_len)
449 .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
450 let suffix_start = msg_end - suffix_size;
451 let suffix = self.get_range(suffix_start..msg_end)?;
452
453 if suffix.len() < POSTAMBLE_SIZE {
454 return Err(TensogramError::Remote(
455 "suffix too short for postamble".to_string(),
456 ));
457 }
458
459 let pa_bytes = &suffix[suffix.len() - POSTAMBLE_SIZE..];
460 let postamble = Postamble::read_from(pa_bytes)?;
461
462 if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
463 return Err(TensogramError::Remote(format!(
464 "first_footer_offset ({}) is before preamble end ({PREAMBLE_SIZE})",
465 postamble.first_footer_offset
466 )));
467 }
468
469 let footer_abs_start = msg_offset
470 .checked_add(postamble.first_footer_offset)
471 .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
472 let footer_abs_end = msg_end - POSTAMBLE_SIZE as u64;
473
474 if footer_abs_start >= footer_abs_end {
475 return Err(TensogramError::Remote(
476 "first_footer_offset points at or past postamble".to_string(),
477 ));
478 }
479
480 if footer_abs_start >= suffix_start {
481 let local_start = (footer_abs_start - suffix_start) as usize;
482 let local_end = suffix.len() - POSTAMBLE_SIZE;
483 Self::parse_footer_frames(state, msg_idx, &suffix[local_start..local_end])
484 } else {
485 let footer_bytes = self.get_range(footer_abs_start..footer_abs_end)?;
486 Self::parse_footer_frames(state, msg_idx, &footer_bytes)
487 }
488 }
489
490 fn ensure_layout_eager_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
491 while msg_idx >= state.layouts.len() && !state.scan_complete {
492 self.scan_and_discover_next_locked(state)?;
493 }
494 if msg_idx >= state.layouts.len() {
495 return Err(TensogramError::Framing(format!(
496 "message index {} out of range (count={})",
497 msg_idx,
498 state.layouts.len()
499 )));
500 }
501 if state.layouts[msg_idx].global_metadata.is_some()
502 && state.layouts[msg_idx].index.is_some()
503 {
504 return Ok(());
505 }
506 self.ensure_layout_locked(state, msg_idx)
507 }
508
509 fn ensure_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
512 self.ensure_message_locked(state, msg_idx)?;
513 if state.layouts[msg_idx].global_metadata.is_some()
514 && state.layouts[msg_idx].index.is_some()
515 {
516 return Ok(());
517 }
518
519 let flags = state.layouts[msg_idx].preamble.flags;
520
521 if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
522 self.discover_header_layout_locked(state, msg_idx)?;
523 } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
524 {
525 self.discover_footer_layout_locked(state, msg_idx)?;
526 } else {
527 return Err(TensogramError::Remote(
528 "remote access requires header-indexed or footer-indexed messages".to_string(),
529 ));
530 }
531
532 Ok(())
533 }
534
535 fn discover_footer_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
536 let msg_offset = state.layouts[msg_idx].offset;
537 let msg_len = state.layouts[msg_idx].length;
538
539 let pa_offset = msg_offset
540 .checked_add(msg_len)
541 .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
542 .ok_or_else(|| TensogramError::Remote("postamble offset overflow".to_string()))?;
543 let pa_bytes = self.get_range(pa_offset..pa_offset + POSTAMBLE_SIZE as u64)?;
544 let postamble = Postamble::read_from(&pa_bytes)?;
545
546 if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
547 return Err(TensogramError::Remote(format!(
548 "first_footer_offset ({}) is before preamble end ({})",
549 postamble.first_footer_offset, PREAMBLE_SIZE
550 )));
551 }
552 let footer_start = msg_offset
553 .checked_add(postamble.first_footer_offset)
554 .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
555 let footer_end = pa_offset;
556 if footer_start >= footer_end {
557 return Err(TensogramError::Remote(
558 "first_footer_offset points at or past postamble".to_string(),
559 ));
560 }
561 let footer_bytes = self.get_range(footer_start..footer_end)?;
562
563 Self::parse_footer_frames(state, msg_idx, &footer_bytes)
564 }
565
    /// Loads header metadata and index for message `msg_idx` by fetching the
    /// start of the message and parsing its header frames.
    ///
    /// Remote read errors and `parse_header_frames` errors are propagated.
    fn discover_header_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
        let layout = &state.layouts[msg_idx];
        let msg_offset = layout.offset;
        let msg_len = layout.length;

        // Read at most 256 KiB from the message start; header frames beyond
        // that window are not seen by the parser.
        let chunk_size = msg_len.min(256 * 1024);
        let header_bytes = self.get_range(msg_offset..msg_offset + chunk_size)?;

        Self::parse_header_frames(state, msg_idx, &header_bytes)
    }
578
    /// Walks the frames at the start of a message and caches the header
    /// metadata and header index for message `msg_idx`.
    ///
    /// `buf` must begin at the message start: parsing starts right after the
    /// preamble, at offset `PREAMBLE_SIZE`. The walk stops at the first data
    /// object or preceder-metadata frame, or when a frame would extend past
    /// the end of `buf`.
    ///
    /// # Errors
    /// Returns `TensogramError::Remote` for a frame shorter than the minimum
    /// size, a frame without the `FRAME_END` trailer, or when no metadata or
    /// no index frame was found. Frame header and CBOR decoding errors are
    /// propagated.
    fn parse_header_frames(state: &mut RemoteState, msg_idx: usize, buf: &[u8]) -> Result<()> {
        let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
        let mut pos = PREAMBLE_SIZE;

        while pos + FRAME_HEADER_SIZE <= buf.len() {
            // Skip forward byte by byte until the "FR" frame marker is found.
            if &buf[pos..pos + 2] != b"FR" {
                pos += 1;
                continue;
            }
            let fh = FrameHeader::read_from(&buf[pos..])?;
            let frame_total = usize::try_from(fh.total_length).map_err(|_| {
                TensogramError::Remote("frame total_length does not fit in usize".to_string())
            })?;

            if frame_total < min_frame_size {
                return Err(TensogramError::Remote(format!(
                    "frame total_length ({frame_total}) smaller than minimum ({min_frame_size})"
                )));
            }
            // A frame that runs past the buffer ends the walk without an error
            // here; the missing-frame checks below report what was not found.
            let frame_end = match pos.checked_add(frame_total) {
                Some(end) if end <= buf.len() => end,
                _ => break,
            };

            if &buf[frame_end - FRAME_END.len()..frame_end] != FRAME_END {
                return Err(TensogramError::Remote(
                    "frame missing ENDF trailer".to_string(),
                ));
            }

            // Payload is everything between the frame header and the trailer.
            let payload = &buf[pos + FRAME_HEADER_SIZE..frame_end - FRAME_END.len()];

            match fh.frame_type {
                FrameType::HeaderMetadata => {
                    let meta = metadata::cbor_to_global_metadata(payload)?;
                    state.layouts[msg_idx].global_metadata = Some(meta);
                }
                FrameType::HeaderIndex => {
                    let idx = metadata::cbor_to_index(payload)?;
                    state.layouts[msg_idx].index = Some(idx);
                }
                // Stop the walk at the first of these frame types.
                FrameType::DataObject | FrameType::PrecederMetadata => {
                    break;
                }
                _ => {}
            }

            // Continue at the next multiple of 8 (relative to the start of
            // `buf`), clamped to the buffer length.
            let aligned = (frame_end.saturating_add(7)) & !7;
            pos = aligned.min(buf.len());
        }

        if state.layouts[msg_idx].global_metadata.is_none() {
            return Err(TensogramError::Remote(
                "header region did not contain a metadata frame".to_string(),
            ));
        }
        if state.layouts[msg_idx].index.is_none() {
            return Err(TensogramError::Remote(
                "header region did not contain an index frame (header chunk may be too small)"
                    .to_string(),
            ));
        }

        Ok(())
    }
644
    /// Walks the frames of a footer region and caches the footer metadata and
    /// footer index for message `msg_idx`.
    ///
    /// `buf` is parsed from offset 0. Frames of other types are skipped; the
    /// walk ends when a frame would extend past the end of `buf` or the
    /// buffer is exhausted.
    ///
    /// # Errors
    /// Returns `TensogramError::Remote` for a frame shorter than the minimum
    /// size, a frame without the `FRAME_END` trailer, or when no metadata or
    /// no index frame was found. Frame header and CBOR decoding errors are
    /// propagated.
    fn parse_footer_frames(state: &mut RemoteState, msg_idx: usize, buf: &[u8]) -> Result<()> {
        let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
        let mut pos = 0;

        while pos + FRAME_HEADER_SIZE <= buf.len() {
            // Skip forward byte by byte until the "FR" frame marker is found.
            if &buf[pos..pos + 2] != b"FR" {
                pos += 1;
                continue;
            }
            let fh = FrameHeader::read_from(&buf[pos..])?;
            let frame_total = usize::try_from(fh.total_length).map_err(|_| {
                TensogramError::Remote(
                    "footer frame total_length does not fit in usize".to_string(),
                )
            })?;

            if frame_total < min_frame_size {
                return Err(TensogramError::Remote(format!(
                    "footer frame total_length ({frame_total}) smaller than minimum ({min_frame_size})"
                )));
            }
            // A frame that runs past the buffer ends the walk without an error
            // here; the missing-frame checks below report what was not found.
            let frame_end = match pos.checked_add(frame_total) {
                Some(end) if end <= buf.len() => end,
                _ => break,
            };

            if &buf[frame_end - FRAME_END.len()..frame_end] != FRAME_END {
                return Err(TensogramError::Remote(
                    "footer frame missing ENDF trailer".to_string(),
                ));
            }

            // Payload is everything between the frame header and the trailer.
            let payload = &buf[pos + FRAME_HEADER_SIZE..frame_end - FRAME_END.len()];

            match fh.frame_type {
                FrameType::FooterMetadata => {
                    let meta = metadata::cbor_to_global_metadata(payload)?;
                    state.layouts[msg_idx].global_metadata = Some(meta);
                }
                FrameType::FooterIndex => {
                    let idx = metadata::cbor_to_index(payload)?;
                    state.layouts[msg_idx].index = Some(idx);
                }
                _ => {}
            }

            // Continue at the next multiple of 8 (relative to the start of
            // `buf`), clamped to the buffer length.
            let aligned = (frame_end.saturating_add(7)) & !7;
            pos = aligned.min(buf.len());
        }

        if state.layouts[msg_idx].global_metadata.is_none() {
            return Err(TensogramError::Remote(
                "footer region did not contain a metadata frame".to_string(),
            ));
        }
        if state.layouts[msg_idx].index.is_none() {
            return Err(TensogramError::Remote(
                "footer region did not contain an index frame".to_string(),
            ));
        }

        Ok(())
    }
708
709 pub(crate) fn message_count(&self) -> Result<usize> {
712 let mut state = self
713 .state
714 .lock()
715 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
716 self.scan_all_locked(&mut state)?;
717 Ok(state.layouts.len())
718 }
719
720 pub(crate) fn read_message(&self, msg_idx: usize) -> Result<Vec<u8>> {
721 let (offset, length) = {
722 let mut state = self
723 .state
724 .lock()
725 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
726 self.ensure_message_locked(&mut state, msg_idx)?;
727 let layout = &state.layouts[msg_idx];
728 (layout.offset, layout.length)
729 };
730 let bytes = self.get_range(offset..offset + length)?;
731 Ok(bytes.to_vec())
732 }
733
734 pub(crate) fn read_metadata(&self, msg_idx: usize) -> Result<GlobalMetadata> {
735 let mut state = self
736 .state
737 .lock()
738 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
739 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
740 state.layouts[msg_idx]
741 .global_metadata
742 .clone()
743 .ok_or_else(|| TensogramError::Remote("metadata not found".to_string()))
744 }
745
746 pub(crate) fn read_descriptors(
747 &self,
748 msg_idx: usize,
749 ) -> Result<(GlobalMetadata, Vec<DataObjectDescriptor>)> {
750 let layout = {
751 let mut state = self
752 .state
753 .lock()
754 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
755 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
756 state.layouts[msg_idx].clone()
757 };
758 let msg_offset = layout.offset;
759
760 if let Some(ref index) = layout.index {
761 if index.offsets.len() != index.lengths.len() {
762 return Err(TensogramError::Remote(format!(
763 "corrupt index: offsets.len()={} != lengths.len()={}",
764 index.offsets.len(),
765 index.lengths.len()
766 )));
767 }
768
769 let meta = layout
770 .global_metadata
771 .clone()
772 .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
773
774 let msg_length = layout.length;
775 let mut descriptors = Vec::with_capacity(index.offsets.len());
776 for i in 0..index.offsets.len() {
777 let desc = self.read_descriptor_only(
778 msg_offset,
779 msg_length,
780 index.offsets[i],
781 index.lengths[i],
782 )?;
783 descriptors.push(desc);
784 }
785 Ok((meta, descriptors))
786 } else {
787 let msg_bytes = self.read_message(msg_idx)?;
788 crate::decode::decode_descriptors(&msg_bytes)
789 }
790 }
791
    /// Reads the descriptor of one data object frame without necessarily
    /// downloading the frame's payload.
    ///
    /// `frame_offset_in_msg` and `frame_length` locate the frame relative to
    /// the message at `msg_offset` / `msg_length`. Frames of at most 64 KiB
    /// are fetched whole and decoded. For larger frames only the frame header,
    /// the frame footer and the CBOR descriptor bytes are fetched.
    ///
    /// # Errors
    /// Returns `TensogramError::Remote` when the frame range is invalid, the
    /// frame is not a data object frame, the footer is short or lacks the
    /// `FRAME_END` trailer, or `cbor_offset` is inconsistent with the frame
    /// bounds. Remote read and decoding errors are propagated.
    fn read_descriptor_only(
        &self,
        msg_offset: u64,
        msg_length: u64,
        frame_offset_in_msg: u64,
        frame_length: u64,
    ) -> Result<DataObjectDescriptor> {
        // Frames up to this size are simply downloaded in full.
        const DESCRIPTOR_PREFIX_THRESHOLD: u64 = 64 * 1024;

        let range =
            Self::checked_frame_range(msg_offset, msg_length, frame_offset_in_msg, frame_length)?;

        if frame_length <= DESCRIPTOR_PREFIX_THRESHOLD {
            let frame_bytes = self.get_range(range.clone())?;
            let (desc, _payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
            return Ok(desc);
        }

        let frame_start = range.start;
        let frame_end = range.end;

        let header_bytes = self.get_range(frame_start..frame_start + FRAME_HEADER_SIZE as u64)?;
        let fh = FrameHeader::read_from(&header_bytes)?;

        if fh.frame_type != FrameType::DataObject {
            return Err(TensogramError::Remote(format!(
                "expected DataObject frame, got {:?}",
                fh.frame_type
            )));
        }

        // The footer occupies the last DATA_OBJECT_FOOTER_SIZE bytes of the frame.
        let footer_start = frame_end - DATA_OBJECT_FOOTER_SIZE as u64;
        let footer_bytes = self.get_range(footer_start..frame_end)?;

        if footer_bytes.len() < DATA_OBJECT_FOOTER_SIZE {
            return Err(TensogramError::Remote("frame footer too short".to_string()));
        }
        // Footer layout as read here: 8 bytes of big-endian cbor_offset
        // followed by the FRAME_END trailer.
        if &footer_bytes[8..] != FRAME_END {
            return Err(TensogramError::Remote(
                "frame missing ENDF trailer".to_string(),
            ));
        }

        let cbor_offset = u64::from_be_bytes(
            footer_bytes[..8]
                .try_into()
                .map_err(|_| TensogramError::Remote("footer cbor_offset truncated".to_string()))?,
        );

        if cbor_offset < FRAME_HEADER_SIZE as u64 {
            return Err(TensogramError::Remote(format!(
                "cbor_offset ({cbor_offset}) below frame header size ({FRAME_HEADER_SIZE})"
            )));
        }

        // cbor_offset is relative to the start of the frame.
        let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
        let cbor_start = frame_start
            .checked_add(cbor_offset)
            .ok_or_else(|| TensogramError::Remote("cbor_start overflow".to_string()))?;

        if cbor_after {
            // Descriptor spans from cbor_start up to the footer.
            if cbor_start >= footer_start {
                return Err(TensogramError::Remote(
                    "cbor_offset points at or past footer".to_string(),
                ));
            }
            let cbor_bytes = self.get_range(cbor_start..footer_start)?;
            metadata::cbor_to_object_descriptor(&cbor_bytes)
        } else {
            if cbor_start >= footer_start {
                return Err(TensogramError::Remote(
                    "cbor_offset beyond frame body".to_string(),
                ));
            }
            // Descriptor starts at cbor_start, but where it ends is not
            // computed here: read a prefix starting at 8 KiB and double it
            // after each failed decode, until decoding succeeds or the whole
            // region up to the footer has been tried.
            let max_cbor_len = footer_start - cbor_start;
            let mut prefix_size: u64 = 8192;
            loop {
                let read_end = (cbor_start + prefix_size).min(footer_start);
                let prefix_bytes = self.get_range(cbor_start..read_end)?;
                match metadata::cbor_to_object_descriptor(&prefix_bytes) {
                    Ok(desc) => return Ok(desc),
                    Err(_) if prefix_size < max_cbor_len => {
                        prefix_size = (prefix_size * 2).min(max_cbor_len);
                    }
                    Err(e) => return Err(e),
                }
            }
        }
    }
888
889 pub(crate) fn read_object(
890 &self,
891 msg_idx: usize,
892 obj_idx: usize,
893 options: &DecodeOptions,
894 ) -> Result<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)> {
895 let layout = {
896 let mut state = self
897 .state
898 .lock()
899 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
900 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
901 state.layouts[msg_idx].clone()
902 };
903 let msg_offset = layout.offset;
904
905 if let Some(ref index) = layout.index {
906 Self::validate_index_access(index, obj_idx)?;
907
908 let meta = layout
909 .global_metadata
910 .clone()
911 .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
912
913 let range = Self::checked_frame_range(
914 msg_offset,
915 layout.length,
916 index.offsets[obj_idx],
917 index.lengths[obj_idx],
918 )?;
919 let frame_bytes = self.get_range(range)?;
920
921 let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
922
923 let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
924
925 Ok((meta, desc, decoded))
926 } else {
927 let msg_bytes = self.read_message(msg_idx)?;
929 crate::decode::decode_object(&msg_bytes, obj_idx, options)
930 }
931 }
932
933 pub(crate) fn read_range_batch(
934 &self,
935 msg_indices: &[usize],
936 obj_idx: usize,
937 ranges: &[(u64, u64)],
938 options: &DecodeOptions,
939 ) -> Result<Vec<(DataObjectDescriptor, Vec<Vec<u8>>)>> {
940 let mut byte_ranges = Vec::with_capacity(msg_indices.len());
941 {
942 let mut state = self
943 .state
944 .lock()
945 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
946 for &msg_idx in msg_indices {
947 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
948 }
949 for &msg_idx in msg_indices {
950 let layout = &state.layouts[msg_idx];
951 if let Some(ref index) = layout.index {
952 Self::validate_index_access(index, obj_idx)?;
953 byte_ranges.push(Self::checked_frame_range(
954 layout.offset,
955 layout.length,
956 index.offsets[obj_idx],
957 index.lengths[obj_idx],
958 )?);
959 } else {
960 return Err(TensogramError::Remote(format!(
961 "message {} has no index frame; batch requires indexed messages",
962 msg_idx
963 )));
964 }
965 }
966 }
967
968 let store = self.store.clone();
969 let path = self.path.clone();
970 let all_bytes =
971 block_on_shared(async move { store.get_ranges(&path, &byte_ranges).await })?;
972
973 let mut results = Vec::with_capacity(msg_indices.len());
974 for frame_bytes in &all_bytes {
975 let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
976 let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
977 results.push((desc, parts));
978 }
979 Ok(results)
980 }
981
982 pub(crate) fn read_object_batch(
983 &self,
984 msg_indices: &[usize],
985 obj_idx: usize,
986 options: &DecodeOptions,
987 ) -> Result<Vec<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)>> {
988 let mut byte_ranges = Vec::with_capacity(msg_indices.len());
989 let mut metas = Vec::with_capacity(msg_indices.len());
990 {
991 let mut state = self
992 .state
993 .lock()
994 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
995 for &msg_idx in msg_indices {
996 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
997 }
998 for &msg_idx in msg_indices {
999 let layout = &state.layouts[msg_idx];
1000 if let Some(ref index) = layout.index {
1001 Self::validate_index_access(index, obj_idx)?;
1002 byte_ranges.push(Self::checked_frame_range(
1003 layout.offset,
1004 layout.length,
1005 index.offsets[obj_idx],
1006 index.lengths[obj_idx],
1007 )?);
1008 metas.push(layout.global_metadata.clone().ok_or_else(|| {
1009 TensogramError::Remote("metadata not cached".to_string())
1010 })?);
1011 } else {
1012 return Err(TensogramError::Remote(format!(
1013 "message {} has no index frame; batch decode requires indexed messages",
1014 msg_idx
1015 )));
1016 }
1017 }
1018 }
1019
1020 let store = self.store.clone();
1021 let path = self.path.clone();
1022 let all_bytes =
1023 block_on_shared(async move { store.get_ranges(&path, &byte_ranges).await })?;
1024
1025 let mut results = Vec::with_capacity(msg_indices.len());
1026 for (frame_bytes, meta) in all_bytes.iter().zip(metas) {
1027 let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
1028 let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
1029 results.push((meta, desc, decoded));
1030 }
1031 Ok(results)
1032 }
1033
1034 pub(crate) fn read_range(
1035 &self,
1036 msg_idx: usize,
1037 obj_idx: usize,
1038 ranges: &[(u64, u64)],
1039 options: &DecodeOptions,
1040 ) -> Result<(DataObjectDescriptor, Vec<Vec<u8>>)> {
1041 let layout = {
1042 let mut state = self
1043 .state
1044 .lock()
1045 .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
1046 self.ensure_layout_eager_locked(&mut state, msg_idx)?;
1047 state.layouts[msg_idx].clone()
1048 };
1049 let msg_offset = layout.offset;
1050
1051 if let Some(ref index) = layout.index {
1052 Self::validate_index_access(index, obj_idx)?;
1053
1054 let range = Self::checked_frame_range(
1055 msg_offset,
1056 layout.length,
1057 index.offsets[obj_idx],
1058 index.lengths[obj_idx],
1059 )?;
1060 let frame_bytes = self.get_range(range)?;
1061 let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1062 let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
1063 Ok((desc, parts))
1064 } else {
1065 let msg_bytes = self.read_message(msg_idx)?;
1066 crate::decode::decode_range(&msg_bytes, obj_idx, ranges, options)
1067 }
1068 }
1069
1070 fn validate_index_access(index: &IndexFrame, obj_idx: usize) -> Result<()> {
1071 if index.offsets.len() != index.lengths.len() {
1072 return Err(TensogramError::Remote(format!(
1073 "corrupt index: offsets.len()={} != lengths.len()={}",
1074 index.offsets.len(),
1075 index.lengths.len()
1076 )));
1077 }
1078 if obj_idx >= index.offsets.len() {
1079 return Err(TensogramError::Object(format!(
1080 "object index {} out of range (count={})",
1081 obj_idx,
1082 index.offsets.len()
1083 )));
1084 }
1085 Ok(())
1086 }
1087
1088 fn checked_frame_range(
1089 msg_offset: u64,
1090 msg_length: u64,
1091 frame_offset_in_msg: u64,
1092 frame_length: u64,
1093 ) -> Result<Range<u64>> {
1094 let start = msg_offset
1095 .checked_add(frame_offset_in_msg)
1096 .ok_or_else(|| TensogramError::Remote("frame offset overflow".to_string()))?;
1097 let end = start
1098 .checked_add(frame_length)
1099 .ok_or_else(|| TensogramError::Remote("frame end overflow".to_string()))?;
1100 let msg_end = msg_offset
1101 .checked_add(msg_length)
1102 .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
1103 if end > msg_end {
1104 return Err(TensogramError::Remote(format!(
1105 "indexed frame {start}..{end} exceeds message boundary {msg_end}"
1106 )));
1107 }
1108 Ok(start..end)
1109 }
1110}
1111
1112#[cfg(feature = "async")]
1115impl RemoteBackend {
    /// Fetches the bytes in `range` of the remote object by awaiting the
    /// store's `get_range` directly.
    ///
    /// Store errors are converted to `TensogramError::Remote` carrying the
    /// error's string form.
    async fn get_range_async(&self, range: Range<u64>) -> Result<Bytes> {
        self.store
            .get_range(&self.path, range)
            .await
            .map_err(|e| TensogramError::Remote(e.to_string()))
    }
1122
1123 pub(crate) async fn open_async(
1124 source: &str,
1125 storage_options: &BTreeMap<String, String>,
1126 ) -> Result<Self> {
1127 let url = Url::parse(source)
1128 .map_err(|e| TensogramError::Remote(format!("invalid URL '{source}': {e}")))?;
1129
1130 let mut opts: Vec<(String, String)> = storage_options
1131 .iter()
1132 .map(|(k, v)| (k.clone(), v.clone()))
1133 .collect();
1134 if url.scheme() == "http" && !opts.iter().any(|(k, _)| k == "allow_http") {
1135 opts.push(("allow_http".to_string(), "true".to_string()));
1136 }
1137 let (store, path) = object_store::parse_url_opts(&url, opts)
1138 .map_err(|e| TensogramError::Remote(format!("cannot open '{source}': {e}")))?;
1139
1140 let store: Arc<dyn ObjectStore> = Arc::from(store);
1141 let meta = store
1142 .head(&path)
1143 .await
1144 .map_err(|e| TensogramError::Remote(e.to_string()))?;
1145
1146 let file_size = meta.size;
1147 if file_size < (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64 {
1148 return Err(TensogramError::Remote(format!(
1149 "remote file too small ({file_size} bytes)"
1150 )));
1151 }
1152
1153 let backend = RemoteBackend {
1154 source_url: source.to_string(),
1155 store,
1156 path,
1157 file_size,
1158 state: Mutex::new(RemoteState::default()),
1159 };
1160 backend.scan_next_async().await?;
1161 {
1162 let state = backend.lock_state()?;
1163 if state.layouts.is_empty() {
1164 return Err(TensogramError::Remote(
1165 "no valid messages found in remote file".to_string(),
1166 ));
1167 }
1168 }
1169 Ok(backend)
1170 }
1171
1172 async fn scan_next_async(&self) -> Result<()> {
1173 let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
1174 let pos = {
1175 let state = self.lock_state()?;
1176 if state.scan_complete {
1177 return Ok(());
1178 }
1179 state.next_scan_offset
1180 };
1181
1182 if pos + min_message_size > self.file_size {
1183 let mut state = self.lock_state()?;
1184 if state.next_scan_offset == pos {
1185 state.scan_complete = true;
1186 }
1187 return Ok(());
1188 }
1189
1190 let preamble_bytes = self
1191 .get_range_async(pos..pos + PREAMBLE_SIZE as u64)
1192 .await?;
1193 if &preamble_bytes[..MAGIC.len()] != MAGIC {
1194 let mut state = self.lock_state()?;
1195 if state.next_scan_offset == pos {
1196 state.scan_complete = true;
1197 }
1198 return Ok(());
1199 }
1200
1201 let preamble = match Preamble::read_from(&preamble_bytes) {
1202 Ok(preamble) => preamble,
1203 Err(_) => {
1204 let mut state = self.lock_state()?;
1205 if state.next_scan_offset == pos {
1206 state.scan_complete = true;
1207 }
1208 return Ok(());
1209 }
1210 };
1211
1212 let msg_len = preamble.total_length;
1213
1214 if msg_len == 0 {
1215 let remaining = self.file_size - pos;
1216 if remaining < min_message_size {
1217 let mut state = self.lock_state()?;
1218 if state.next_scan_offset == pos {
1219 state.scan_complete = true;
1220 }
1221 return Ok(());
1222 }
1223
1224 let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
1225 let end_bytes = self
1226 .get_range_async(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)
1227 .await?;
1228 if &end_bytes[..] != crate::wire::END_MAGIC {
1229 let mut state = self.lock_state()?;
1230 if state.next_scan_offset == pos {
1231 state.scan_complete = true;
1232 }
1233 return Ok(());
1234 }
1235
1236 let mut state = self.lock_state()?;
1237 if state.scan_complete || state.next_scan_offset != pos {
1238 return Ok(());
1239 }
1240 state.layouts.push(MessageLayout {
1241 offset: pos,
1242 length: remaining,
1243 preamble,
1244 index: None,
1245 global_metadata: None,
1246 });
1247 state.scan_complete = true;
1248 return Ok(());
1249 }
1250
1251 let msg_end = match pos.checked_add(msg_len) {
1252 Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
1253 _ => {
1254 let mut state = self.lock_state()?;
1255 if state.next_scan_offset == pos {
1256 state.scan_complete = true;
1257 }
1258 return Ok(());
1259 }
1260 };
1261
1262 let mut state = self.lock_state()?;
1263 if state.scan_complete || state.next_scan_offset != pos {
1264 return Ok(());
1265 }
1266 state.layouts.push(MessageLayout {
1267 offset: pos,
1268 length: msg_len,
1269 preamble,
1270 index: None,
1271 global_metadata: None,
1272 });
1273 state.next_scan_offset = msg_end;
1274 Ok(())
1275 }
1276
1277 async fn ensure_message_async(&self, msg_idx: usize) -> Result<()> {
1278 loop {
1279 let ready = {
1280 let state = self.lock_state()?;
1281 if msg_idx < state.layouts.len() {
1282 return Ok(());
1283 }
1284 if state.scan_complete {
1285 return Err(TensogramError::Framing(format!(
1286 "message index {} out of range (count={})",
1287 msg_idx,
1288 state.layouts.len()
1289 )));
1290 }
1291 false
1292 };
1293
1294 if !ready {
1295 self.scan_next_async().await?;
1296 }
1297 }
1298 }
1299
1300 async fn scan_and_discover_next_async(&self) -> Result<()> {
1301 let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
1302 let pos = {
1303 let state = self.lock_state()?;
1304 if state.scan_complete {
1305 return Ok(());
1306 }
1307 state.next_scan_offset
1308 };
1309
1310 if pos + min_message_size > self.file_size {
1311 let mut state = self.lock_state()?;
1312 if state.next_scan_offset == pos {
1313 state.scan_complete = true;
1314 }
1315 return Ok(());
1316 }
1317
1318 let chunk_size = (self.file_size - pos).min(256 * 1024);
1319 let chunk = self.get_range_async(pos..pos + chunk_size).await?;
1320
1321 if chunk.len() < PREAMBLE_SIZE || &chunk[..MAGIC.len()] != MAGIC {
1322 let mut state = self.lock_state()?;
1323 if state.next_scan_offset == pos {
1324 state.scan_complete = true;
1325 }
1326 return Ok(());
1327 }
1328
1329 let preamble = match Preamble::read_from(&chunk[..PREAMBLE_SIZE]) {
1330 Ok(preamble) => preamble,
1331 Err(_) => {
1332 let mut state = self.lock_state()?;
1333 if state.next_scan_offset == pos {
1334 state.scan_complete = true;
1335 }
1336 return Ok(());
1337 }
1338 };
1339
1340 let msg_len = preamble.total_length;
1341
1342 if msg_len == 0 {
1343 let remaining = self.file_size - pos;
1344 if remaining < min_message_size {
1345 let mut state = self.lock_state()?;
1346 if state.next_scan_offset == pos {
1347 state.scan_complete = true;
1348 }
1349 return Ok(());
1350 }
1351
1352 let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
1353 let end_bytes = self
1354 .get_range_async(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)
1355 .await?;
1356 if &end_bytes[..] != crate::wire::END_MAGIC {
1357 let mut state = self.lock_state()?;
1358 if state.next_scan_offset == pos {
1359 state.scan_complete = true;
1360 }
1361 return Ok(());
1362 }
1363
1364 let mut state = self.lock_state()?;
1365 if state.scan_complete || state.next_scan_offset != pos {
1366 return Ok(());
1367 }
1368 state.layouts.push(MessageLayout {
1369 offset: pos,
1370 length: remaining,
1371 preamble,
1372 index: None,
1373 global_metadata: None,
1374 });
1375 state.scan_complete = true;
1376 return Ok(());
1377 }
1378
1379 let msg_end = match pos.checked_add(msg_len) {
1380 Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
1381 _ => {
1382 let mut state = self.lock_state()?;
1383 if state.next_scan_offset == pos {
1384 state.scan_complete = true;
1385 }
1386 return Ok(());
1387 }
1388 };
1389
1390 let flags = preamble.flags;
1391 let msg_idx = {
1392 let mut state = self.lock_state()?;
1393 if state.scan_complete || state.next_scan_offset != pos {
1394 return Ok(());
1395 }
1396 let msg_idx = state.layouts.len();
1397 state.layouts.push(MessageLayout {
1398 offset: pos,
1399 length: msg_len,
1400 preamble,
1401 index: None,
1402 global_metadata: None,
1403 });
1404 state.next_scan_offset = msg_end;
1405 msg_idx
1406 };
1407
1408 if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
1409 let chunk_end = (msg_len as usize).min(chunk.len());
1410 let mut state = self.lock_state()?;
1411 if msg_idx < state.layouts.len()
1412 && state.layouts[msg_idx].offset == pos
1413 && state.layouts[msg_idx].global_metadata.is_none()
1414 && state.layouts[msg_idx].index.is_none()
1415 {
1416 Self::parse_header_frames(&mut state, msg_idx, &chunk[..chunk_end])?;
1417 }
1418 } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
1419 {
1420 self.discover_footer_layout_from_suffix_async(msg_idx)
1421 .await?;
1422 }
1423
1424 Ok(())
1425 }
1426
1427 async fn discover_footer_layout_from_suffix_async(&self, msg_idx: usize) -> Result<()> {
1428 let (msg_offset, msg_len) = {
1429 let state = self.lock_state()?;
1430 let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1431 TensogramError::Framing(format!(
1432 "message index {} out of range (count={})",
1433 msg_idx,
1434 state.layouts.len()
1435 ))
1436 })?;
1437 (layout.offset, layout.length)
1438 };
1439
1440 let suffix_size = msg_len.min(256 * 1024);
1441 let msg_end = msg_offset
1442 .checked_add(msg_len)
1443 .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
1444 let suffix_start = msg_end - suffix_size;
1445 let suffix = self.get_range_async(suffix_start..msg_end).await?;
1446
1447 if suffix.len() < POSTAMBLE_SIZE {
1448 return Err(TensogramError::Remote(
1449 "suffix too short for postamble".to_string(),
1450 ));
1451 }
1452
1453 let pa_bytes = &suffix[suffix.len() - POSTAMBLE_SIZE..];
1454 let postamble = Postamble::read_from(pa_bytes)?;
1455
1456 if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
1457 return Err(TensogramError::Remote(format!(
1458 "first_footer_offset ({}) is before preamble end ({PREAMBLE_SIZE})",
1459 postamble.first_footer_offset
1460 )));
1461 }
1462
1463 let footer_abs_start = msg_offset
1464 .checked_add(postamble.first_footer_offset)
1465 .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
1466 let footer_abs_end = msg_end - POSTAMBLE_SIZE as u64;
1467
1468 if footer_abs_start >= footer_abs_end {
1469 return Err(TensogramError::Remote(
1470 "first_footer_offset points at or past postamble".to_string(),
1471 ));
1472 }
1473
1474 if footer_abs_start >= suffix_start {
1475 let local_start = (footer_abs_start - suffix_start) as usize;
1476 let local_end = suffix.len() - POSTAMBLE_SIZE;
1477 let mut state = self.lock_state()?;
1478 if state.layouts[msg_idx].global_metadata.is_some()
1479 && state.layouts[msg_idx].index.is_some()
1480 {
1481 return Ok(());
1482 }
1483 Self::parse_footer_frames(&mut state, msg_idx, &suffix[local_start..local_end])
1484 } else {
1485 let footer_bytes = self
1486 .get_range_async(footer_abs_start..footer_abs_end)
1487 .await?;
1488 let mut state = self.lock_state()?;
1489 if state.layouts[msg_idx].global_metadata.is_some()
1490 && state.layouts[msg_idx].index.is_some()
1491 {
1492 return Ok(());
1493 }
1494 Self::parse_footer_frames(&mut state, msg_idx, &footer_bytes)
1495 }
1496 }
1497
1498 async fn ensure_layout_eager_async(&self, msg_idx: usize) -> Result<()> {
1499 loop {
1500 let should_scan = {
1501 let state = self.lock_state()?;
1502 if let Some(layout) = state.layouts.get(msg_idx) {
1503 if layout.global_metadata.is_some() && layout.index.is_some() {
1504 return Ok(());
1505 }
1506 false
1507 } else if state.scan_complete {
1508 return Err(TensogramError::Framing(format!(
1509 "message index {} out of range (count={})",
1510 msg_idx,
1511 state.layouts.len()
1512 )));
1513 } else {
1514 true
1515 }
1516 };
1517
1518 if should_scan {
1519 self.scan_and_discover_next_async().await?;
1520 continue;
1521 }
1522
1523 return self.ensure_layout_async(msg_idx).await;
1524 }
1525 }
1526
1527 async fn ensure_layout_async(&self, msg_idx: usize) -> Result<()> {
1528 self.ensure_message_async(msg_idx).await?;
1529
1530 let flags = {
1531 let state = self.lock_state()?;
1532 let layout = &state.layouts[msg_idx];
1533 if layout.global_metadata.is_some() && layout.index.is_some() {
1534 return Ok(());
1535 }
1536 layout.preamble.flags
1537 };
1538
1539 if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
1540 self.discover_header_layout_async(msg_idx).await?;
1541 } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
1542 {
1543 self.discover_footer_layout_async(msg_idx).await?;
1544 } else {
1545 return Err(TensogramError::Remote(
1546 "remote access requires header-indexed or footer-indexed messages".to_string(),
1547 ));
1548 }
1549
1550 Ok(())
1551 }
1552
1553 async fn discover_header_layout_async(&self, msg_idx: usize) -> Result<()> {
1554 let (msg_offset, msg_len) = {
1555 let state = self.lock_state()?;
1556 let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1557 TensogramError::Framing(format!(
1558 "message index {} out of range (count={})",
1559 msg_idx,
1560 state.layouts.len()
1561 ))
1562 })?;
1563 (layout.offset, layout.length)
1564 };
1565
1566 let chunk_size = msg_len.min(256 * 1024);
1567 let header_bytes = self
1568 .get_range_async(msg_offset..msg_offset + chunk_size)
1569 .await?;
1570
1571 let mut state = self.lock_state()?;
1572 if state.layouts[msg_idx].global_metadata.is_some()
1573 && state.layouts[msg_idx].index.is_some()
1574 {
1575 return Ok(());
1576 }
1577 Self::parse_header_frames(&mut state, msg_idx, &header_bytes)
1578 }
1579
1580 async fn discover_footer_layout_async(&self, msg_idx: usize) -> Result<()> {
1581 let (msg_offset, msg_len) = {
1582 let state = self.lock_state()?;
1583 let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1584 TensogramError::Framing(format!(
1585 "message index {} out of range (count={})",
1586 msg_idx,
1587 state.layouts.len()
1588 ))
1589 })?;
1590 (layout.offset, layout.length)
1591 };
1592
1593 let pa_offset = msg_offset
1594 .checked_add(msg_len)
1595 .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1596 .ok_or_else(|| TensogramError::Remote("postamble offset overflow".to_string()))?;
1597 let pa_bytes = self
1598 .get_range_async(pa_offset..pa_offset + POSTAMBLE_SIZE as u64)
1599 .await?;
1600 let postamble = Postamble::read_from(&pa_bytes)?;
1601
1602 if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
1603 return Err(TensogramError::Remote(format!(
1604 "first_footer_offset ({}) is before preamble end ({})",
1605 postamble.first_footer_offset, PREAMBLE_SIZE
1606 )));
1607 }
1608 let footer_start = msg_offset
1609 .checked_add(postamble.first_footer_offset)
1610 .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
1611 let footer_end = pa_offset;
1612 if footer_start >= footer_end {
1613 return Err(TensogramError::Remote(
1614 "first_footer_offset points at or past postamble".to_string(),
1615 ));
1616 }
1617
1618 let footer_bytes = self.get_range_async(footer_start..footer_end).await?;
1619 let mut state = self.lock_state()?;
1620 if state.layouts[msg_idx].global_metadata.is_some()
1621 && state.layouts[msg_idx].index.is_some()
1622 {
1623 return Ok(());
1624 }
1625 Self::parse_footer_frames(&mut state, msg_idx, &footer_bytes)
1626 }
1627
1628 pub(crate) async fn message_count_async(&self) -> Result<usize> {
1629 loop {
1630 let done = {
1631 let state = self.lock_state()?;
1632 state.scan_complete
1633 };
1634 if done {
1635 break;
1636 }
1637 self.scan_and_discover_next_async().await?;
1638 }
1639 let state = self.lock_state()?;
1640 Ok(state.layouts.len())
1641 }
1642
1643 pub(crate) async fn read_message_async(&self, msg_idx: usize) -> Result<Vec<u8>> {
1644 self.ensure_message_async(msg_idx).await?;
1645 let (offset, length) = {
1646 let state = self.lock_state()?;
1647 let layout = &state.layouts[msg_idx];
1648 (layout.offset, layout.length)
1649 };
1650 let bytes = self.get_range_async(offset..offset + length).await?;
1651 Ok(bytes.to_vec())
1652 }
1653
1654 pub(crate) async fn read_metadata_async(&self, msg_idx: usize) -> Result<GlobalMetadata> {
1655 self.ensure_layout_eager_async(msg_idx).await?;
1656 let state = self.lock_state()?;
1657 state.layouts[msg_idx]
1658 .global_metadata
1659 .clone()
1660 .ok_or_else(|| TensogramError::Remote("metadata not found".to_string()))
1661 }
1662
1663 pub(crate) async fn read_descriptors_async(
1664 &self,
1665 msg_idx: usize,
1666 ) -> Result<(GlobalMetadata, Vec<DataObjectDescriptor>)> {
1667 self.ensure_layout_eager_async(msg_idx).await?;
1668 let layout = {
1669 let state = self.lock_state()?;
1670 state.layouts[msg_idx].clone()
1671 };
1672 let msg_offset = layout.offset;
1673
1674 if let Some(ref index) = layout.index {
1675 if index.offsets.len() != index.lengths.len() {
1676 return Err(TensogramError::Remote(format!(
1677 "corrupt index: offsets.len()={} != lengths.len()={}",
1678 index.offsets.len(),
1679 index.lengths.len()
1680 )));
1681 }
1682
1683 let meta = layout
1684 .global_metadata
1685 .clone()
1686 .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
1687
1688 let msg_length = layout.length;
1689 let mut descriptors = Vec::with_capacity(index.offsets.len());
1690 for i in 0..index.offsets.len() {
1691 let desc = self
1692 .read_descriptor_only_async(
1693 msg_offset,
1694 msg_length,
1695 index.offsets[i],
1696 index.lengths[i],
1697 )
1698 .await?;
1699 descriptors.push(desc);
1700 }
1701 Ok((meta, descriptors))
1702 } else {
1703 let msg_bytes = self.read_message_async(msg_idx).await?;
1704 crate::decode::decode_descriptors(&msg_bytes)
1705 }
1706 }
1707
1708 async fn read_descriptor_only_async(
1709 &self,
1710 msg_offset: u64,
1711 msg_length: u64,
1712 frame_offset_in_msg: u64,
1713 frame_length: u64,
1714 ) -> Result<DataObjectDescriptor> {
1715 const DESCRIPTOR_PREFIX_THRESHOLD: u64 = 64 * 1024;
1716
1717 let range =
1718 Self::checked_frame_range(msg_offset, msg_length, frame_offset_in_msg, frame_length)?;
1719
1720 if frame_length <= DESCRIPTOR_PREFIX_THRESHOLD {
1721 let frame_bytes = self.get_range_async(range.clone()).await?;
1722 let (desc, _payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1723 return Ok(desc);
1724 }
1725
1726 let frame_start = range.start;
1727 let frame_end = range.end;
1728
1729 let header_bytes = self
1730 .get_range_async(frame_start..frame_start + FRAME_HEADER_SIZE as u64)
1731 .await?;
1732 let fh = FrameHeader::read_from(&header_bytes)?;
1733
1734 if fh.frame_type != FrameType::DataObject {
1735 return Err(TensogramError::Remote(format!(
1736 "expected DataObject frame, got {:?}",
1737 fh.frame_type
1738 )));
1739 }
1740
1741 let footer_start = frame_end - DATA_OBJECT_FOOTER_SIZE as u64;
1742 let footer_bytes = self.get_range_async(footer_start..frame_end).await?;
1743
1744 if footer_bytes.len() < DATA_OBJECT_FOOTER_SIZE {
1745 return Err(TensogramError::Remote("frame footer too short".to_string()));
1746 }
1747 if &footer_bytes[8..] != FRAME_END {
1748 return Err(TensogramError::Remote(
1749 "frame missing ENDF trailer".to_string(),
1750 ));
1751 }
1752
1753 let cbor_offset = u64::from_be_bytes(
1754 footer_bytes[..8]
1755 .try_into()
1756 .map_err(|_| TensogramError::Remote("footer cbor_offset truncated".to_string()))?,
1757 );
1758
1759 if cbor_offset < FRAME_HEADER_SIZE as u64 {
1760 return Err(TensogramError::Remote(format!(
1761 "cbor_offset ({cbor_offset}) below frame header size ({FRAME_HEADER_SIZE})"
1762 )));
1763 }
1764
1765 let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
1766 let cbor_start = frame_start
1767 .checked_add(cbor_offset)
1768 .ok_or_else(|| TensogramError::Remote("cbor_start overflow".to_string()))?;
1769
1770 if cbor_after {
1771 if cbor_start >= footer_start {
1772 return Err(TensogramError::Remote(
1773 "cbor_offset points at or past footer".to_string(),
1774 ));
1775 }
1776 let cbor_bytes = self.get_range_async(cbor_start..footer_start).await?;
1777 metadata::cbor_to_object_descriptor(&cbor_bytes)
1778 } else {
1779 if cbor_start >= footer_start {
1780 return Err(TensogramError::Remote(
1781 "cbor_offset beyond frame body".to_string(),
1782 ));
1783 }
1784 let max_cbor_len = footer_start - cbor_start;
1785 let mut prefix_size: u64 = 8192;
1786 loop {
1787 let read_end = (cbor_start + prefix_size).min(footer_start);
1788 let prefix_bytes = self.get_range_async(cbor_start..read_end).await?;
1789 match metadata::cbor_to_object_descriptor(&prefix_bytes) {
1790 Ok(desc) => return Ok(desc),
1791 Err(_) if prefix_size < max_cbor_len => {
1792 prefix_size = (prefix_size * 2).min(max_cbor_len);
1793 }
1794 Err(e) => return Err(e),
1795 }
1796 }
1797 }
1798 }
1799
1800 pub(crate) async fn read_object_async(
1801 &self,
1802 msg_idx: usize,
1803 obj_idx: usize,
1804 options: &DecodeOptions,
1805 ) -> Result<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)> {
1806 self.ensure_layout_eager_async(msg_idx).await?;
1807 let layout = {
1808 let state = self.lock_state()?;
1809 state.layouts[msg_idx].clone()
1810 };
1811 let msg_offset = layout.offset;
1812
1813 if let Some(ref index) = layout.index {
1814 Self::validate_index_access(index, obj_idx)?;
1815
1816 let meta = layout
1817 .global_metadata
1818 .clone()
1819 .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
1820
1821 let range = Self::checked_frame_range(
1822 msg_offset,
1823 layout.length,
1824 index.offsets[obj_idx],
1825 index.lengths[obj_idx],
1826 )?;
1827 let frame_bytes = self.get_range_async(range).await?;
1828 let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1829 let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
1830 Ok((meta, desc, decoded))
1831 } else {
1832 let msg_bytes = self.read_message_async(msg_idx).await?;
1833 crate::decode::decode_object(&msg_bytes, obj_idx, options)
1834 }
1835 }
1836
1837 pub(crate) async fn ensure_all_layouts_batch_async(&self, msg_indices: &[usize]) -> Result<()> {
1838 if msg_indices.is_empty() {
1839 return Ok(());
1840 }
1841 let max_idx = msg_indices.iter().copied().max().unwrap_or(0);
1842 loop {
1843 let (need_scan, scan_complete) = {
1844 let state = self.lock_state()?;
1845 (state.layouts.len() <= max_idx, state.scan_complete)
1846 };
1847 if !need_scan || scan_complete {
1848 break;
1849 }
1850 self.scan_and_discover_next_async().await?;
1851 }
1852
1853 {
1854 let state = self.lock_state()?;
1855 for &idx in msg_indices {
1856 if idx >= state.layouts.len() {
1857 return Err(TensogramError::Framing(format!(
1858 "message index {} out of range (count={})",
1859 idx,
1860 state.layouts.len()
1861 )));
1862 }
1863 }
1864 }
1865
1866 let needs_layout: Vec<usize> = {
1868 let state = self.lock_state()?;
1869 msg_indices
1870 .iter()
1871 .copied()
1872 .filter(|&idx| {
1873 state
1874 .layouts
1875 .get(idx)
1876 .is_none_or(|l| l.global_metadata.is_none() || l.index.is_none())
1877 })
1878 .collect()
1879 };
1880
1881 if needs_layout.is_empty() {
1882 return Ok(());
1883 }
1884
1885 let mut fetch_ranges: Vec<Range<u64>> = Vec::new();
1887 let mut fetch_map: Vec<(usize, bool)> = Vec::new(); {
1889 let state = self.lock_state()?;
1890 for &msg_idx in &needs_layout {
1891 let layout = &state.layouts[msg_idx];
1892 let flags = layout.preamble.flags;
1893 if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX)
1894 {
1895 let chunk_size = layout.length.min(256 * 1024);
1896 fetch_ranges.push(layout.offset..layout.offset + chunk_size);
1897 fetch_map.push((msg_idx, true));
1898 } else if flags.has(MessageFlags::FOOTER_METADATA)
1899 && flags.has(MessageFlags::FOOTER_INDEX)
1900 {
1901 let pa_offset = layout
1902 .offset
1903 .checked_add(layout.length)
1904 .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1905 .ok_or_else(|| {
1906 TensogramError::Remote("postamble offset overflow".to_string())
1907 })?;
1908 fetch_ranges.push(pa_offset..pa_offset + POSTAMBLE_SIZE as u64);
1909 fetch_map.push((msg_idx, false));
1910 } else {
1911 return Err(TensogramError::Remote(
1912 "remote batch requires header-indexed or footer-indexed messages"
1913 .to_string(),
1914 ));
1915 }
1916 }
1917 }
1918
1919 let all_bytes = self
1921 .store
1922 .get_ranges(&self.path, &fetch_ranges)
1923 .await
1924 .map_err(|e| TensogramError::Remote(e.to_string()))?;
1925
1926 let mut footer_fetches: Vec<(usize, Range<u64>)> = Vec::new();
1928 {
1929 let mut state = self.lock_state()?;
1930 for (bytes, &(msg_idx, is_header)) in all_bytes.iter().zip(fetch_map.iter()) {
1931 if state.layouts[msg_idx].global_metadata.is_some()
1932 && state.layouts[msg_idx].index.is_some()
1933 {
1934 continue;
1935 }
1936 if is_header {
1937 Self::parse_header_frames(&mut state, msg_idx, bytes)?;
1938 } else {
1939 let postamble = Postamble::read_from(bytes)?;
1940 let layout = &state.layouts[msg_idx];
1941 let footer_start = layout
1942 .offset
1943 .checked_add(postamble.first_footer_offset)
1944 .ok_or_else(|| {
1945 TensogramError::Remote("footer offset overflow".to_string())
1946 })?;
1947 let pa_offset = layout
1948 .offset
1949 .checked_add(layout.length)
1950 .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1951 .ok_or_else(|| {
1952 TensogramError::Remote("postamble offset overflow".to_string())
1953 })?;
1954 footer_fetches.push((msg_idx, footer_start..pa_offset));
1955 }
1956 }
1957 }
1958
1959 if !footer_fetches.is_empty() {
1961 let footer_ranges: Vec<Range<u64>> =
1962 footer_fetches.iter().map(|(_, r)| r.clone()).collect();
1963 let footer_bytes = self
1964 .store
1965 .get_ranges(&self.path, &footer_ranges)
1966 .await
1967 .map_err(|e| TensogramError::Remote(e.to_string()))?;
1968 let mut state = self.lock_state()?;
1969 for (bytes, &(msg_idx, _)) in footer_bytes.iter().zip(footer_fetches.iter()) {
1970 if state.layouts[msg_idx].global_metadata.is_some()
1971 && state.layouts[msg_idx].index.is_some()
1972 {
1973 continue;
1974 }
1975 Self::parse_footer_frames(&mut state, msg_idx, bytes)?;
1976 }
1977 }
1978
1979 Ok(())
1980 }
1981
1982 pub(crate) async fn read_object_batch_async(
1983 &self,
1984 msg_indices: &[usize],
1985 obj_idx: usize,
1986 options: &DecodeOptions,
1987 ) -> Result<Vec<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)>> {
1988 self.ensure_all_layouts_batch_async(msg_indices).await?;
1989
1990 let mut byte_ranges = Vec::with_capacity(msg_indices.len());
1991 let mut metas = Vec::with_capacity(msg_indices.len());
1992 {
1993 let state = self.lock_state()?;
1994 for &msg_idx in msg_indices {
1995 let layout = &state.layouts[msg_idx];
1996 if let Some(ref index) = layout.index {
1997 Self::validate_index_access(index, obj_idx)?;
1998 byte_ranges.push(Self::checked_frame_range(
1999 layout.offset,
2000 layout.length,
2001 index.offsets[obj_idx],
2002 index.lengths[obj_idx],
2003 )?);
2004 metas.push(layout.global_metadata.clone().ok_or_else(|| {
2005 TensogramError::Remote("metadata not cached".to_string())
2006 })?);
2007 } else {
2008 return Err(TensogramError::Remote(format!(
2009 "message {} has no index frame; batch decode requires indexed messages",
2010 msg_idx
2011 )));
2012 }
2013 }
2014 }
2015
2016 let all_bytes = self
2017 .store
2018 .get_ranges(&self.path, &byte_ranges)
2019 .await
2020 .map_err(|e| TensogramError::Remote(e.to_string()))?;
2021
2022 let mut results = Vec::with_capacity(msg_indices.len());
2023 for (frame_bytes, meta) in all_bytes.iter().zip(metas) {
2024 let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
2025 let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
2026 results.push((meta, desc, decoded));
2027 }
2028 Ok(results)
2029 }
2030
2031 pub(crate) async fn read_range_batch_async(
2032 &self,
2033 msg_indices: &[usize],
2034 obj_idx: usize,
2035 ranges: &[(u64, u64)],
2036 options: &DecodeOptions,
2037 ) -> Result<Vec<(DataObjectDescriptor, Vec<Vec<u8>>)>> {
2038 self.ensure_all_layouts_batch_async(msg_indices).await?;
2040
2041 let mut byte_ranges = Vec::with_capacity(msg_indices.len());
2043 {
2044 let state = self.lock_state()?;
2045 for &msg_idx in msg_indices {
2046 let layout = &state.layouts[msg_idx];
2047 if let Some(ref index) = layout.index {
2048 Self::validate_index_access(index, obj_idx)?;
2049 let range = Self::checked_frame_range(
2050 layout.offset,
2051 layout.length,
2052 index.offsets[obj_idx],
2053 index.lengths[obj_idx],
2054 )?;
2055 byte_ranges.push(range);
2056 } else {
2057 return Err(TensogramError::Remote(format!(
2058 "message {} has no index frame; batch range decode requires indexed messages",
2059 msg_idx
2060 )));
2061 }
2062 }
2063 }
2064
2065 let all_bytes = self
2067 .store
2068 .get_ranges(&self.path, &byte_ranges)
2069 .await
2070 .map_err(|e| TensogramError::Remote(e.to_string()))?;
2071
2072 let mut results = Vec::with_capacity(msg_indices.len());
2074 for frame_bytes in &all_bytes {
2075 let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
2076 let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
2077 results.push((desc, parts));
2078 }
2079 Ok(results)
2080 }
2081
2082 pub(crate) async fn read_range_async(
2083 &self,
2084 msg_idx: usize,
2085 obj_idx: usize,
2086 ranges: &[(u64, u64)],
2087 options: &DecodeOptions,
2088 ) -> Result<(DataObjectDescriptor, Vec<Vec<u8>>)> {
2089 self.ensure_layout_eager_async(msg_idx).await?;
2090 let layout = {
2091 let state = self.lock_state()?;
2092 state.layouts[msg_idx].clone()
2093 };
2094 let msg_offset = layout.offset;
2095
2096 if let Some(ref index) = layout.index {
2097 Self::validate_index_access(index, obj_idx)?;
2098
2099 let range = Self::checked_frame_range(
2100 msg_offset,
2101 layout.length,
2102 index.offsets[obj_idx],
2103 index.lengths[obj_idx],
2104 )?;
2105 let frame_bytes = self.get_range_async(range).await?;
2106 let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
2107 let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
2108 Ok((desc, parts))
2109 } else {
2110 let msg_bytes = self.read_message_async(msg_idx).await?;
2111 crate::decode::decode_range(&msg_bytes, obj_idx, ranges, options)
2112 }
2113 }
2114}