Skip to main content

tensogram_core/
remote.rs

1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9//! Remote object store backend for `TensogramFile`.
10//!
11//! Enables reading `.tgm` files from S3, GCS, Azure, and HTTP(S) via
12//! the `object_store` crate. Feature-gated behind `remote`.
13
14use std::collections::BTreeMap;
15use std::ops::Range;
16#[cfg(feature = "async")]
17use std::sync::MutexGuard;
18use std::sync::{Arc, Mutex, OnceLock};
19
20use bytes::Bytes;
21use object_store::path::Path as ObjectPath;
22use object_store::{ObjectStore, ObjectStoreExt};
23use url::Url;
24
25use crate::decode::DecodeOptions;
26use crate::error::{Result, TensogramError};
27use crate::framing;
28use crate::metadata;
29use crate::types::{DataObjectDescriptor, GlobalMetadata, IndexFrame};
30use crate::wire::{
31    DATA_OBJECT_FOOTER_SIZE, DataObjectFlags, FRAME_END, FRAME_HEADER_SIZE, FrameHeader, FrameType,
32    MAGIC, MessageFlags, POSTAMBLE_SIZE, PREAMBLE_SIZE, Postamble, Preamble,
33};
34
35// ── URL scheme detection ─────────────────────────────────────────────────────
36
/// URL schemes that `is_remote_url` accepts as remote object-store
/// locations. The comparison there is ASCII case-insensitive.
const REMOTE_SCHEMES: &[&str] = &["s3", "s3a", "gs", "az", "azure", "http", "https"];
38
39// ── Shared tokio runtime ─────────────────────────────────────────────────────
40
/// Process-wide shared tokio runtime for remote I/O.
///
/// Wrapped in `Result` so that `get_or_init` (which runs the closure
/// exactly once, no races) can cache a build failure without panicking.
/// The error side is the already-formatted build error message;
/// `shared_runtime` turns it into `TensogramError::Remote` on every access.
static SHARED_RUNTIME: OnceLock<std::result::Result<tokio::runtime::Runtime, String>> =
    OnceLock::new();
47
48fn shared_runtime() -> Result<&'static tokio::runtime::Runtime> {
49    SHARED_RUNTIME
50        .get_or_init(|| {
51            tokio::runtime::Builder::new_multi_thread()
52                .worker_threads(2)
53                .enable_all()
54                .thread_name("tensogram-remote-io")
55                .build()
56                .map_err(|e| format!("tokio runtime: {e}"))
57        })
58        .as_ref()
59        .map_err(|e| TensogramError::Remote(e.clone()))
60}
61
62/// Run an async operation synchronously using the shared runtime.
63///
64/// Three strategies based on the calling context:
65///
66/// - **Not in async context** (Python, CLI): direct `handle.block_on()`,
67///   no extra thread creation.
68/// - **Inside multi-thread tokio runtime** (`#[tokio::test]`, server
69///   handler): `block_in_place` + `handle.block_on()`, which tells
70///   tokio to spawn a replacement worker so the current one can block
71///   without causing runtime starvation.
72/// - **Inside current-thread tokio runtime**: scoped thread fallback,
73///   since `block_in_place` is not supported on single-threaded
74///   runtimes.
75///
76/// In all cases the shared runtime's event loop and connection pool
77/// are reused, eliminating the per-call runtime creation overhead of
78/// the old `block_on_thread` pattern.
79fn block_on_shared<T, Fut>(future: Fut) -> Result<T>
80where
81    T: Send,
82    Fut: std::future::Future<Output = std::result::Result<T, object_store::Error>> + Send,
83{
84    let rt = shared_runtime()?;
85    let handle = rt.handle().clone();
86
87    match tokio::runtime::Handle::try_current() {
88        Err(_) => {
89            // Not in an async context — drive the future directly.
90            handle
91                .block_on(future)
92                .map_err(|e| TensogramError::Remote(e.to_string()))
93        }
94        Ok(current) if current.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread => {
95            // Multi-thread runtime: block_in_place lets tokio spawn a
96            // replacement worker so this one can block safely.
97            tokio::task::block_in_place(|| {
98                handle
99                    .block_on(future)
100                    .map_err(|e| TensogramError::Remote(e.to_string()))
101            })
102        }
103        Ok(_) => {
104            // Current-thread runtime: block_in_place is unsupported,
105            // fall back to a scoped thread.
106            std::thread::scope(|s| {
107                match s
108                    .spawn(|| {
109                        handle
110                            .block_on(future)
111                            .map_err(|e| TensogramError::Remote(e.to_string()))
112                    })
113                    .join()
114                {
115                    Ok(result) => result,
116                    Err(_) => Err(TensogramError::Remote(
117                        "remote I/O thread panicked".to_string(),
118                    )),
119                }
120            })
121        }
122    }
123}
124
125pub fn is_remote_url(source: &str) -> bool {
126    match source.find("://") {
127        Some(pos) => {
128            let scheme = &source[..pos];
129            REMOTE_SCHEMES
130                .iter()
131                .any(|s| s.eq_ignore_ascii_case(scheme))
132        }
133        None => false,
134    }
135}
136
137// ── Cached per-message layout ────────────────────────────────────────────────
138
/// Cached position of one message inside the remote object, plus the
/// metadata and index once they have been discovered.
#[derive(Debug, Clone)]
struct MessageLayout {
    /// Absolute byte offset of the message start within the remote object.
    offset: u64,
    /// Message length in bytes. When the preamble's `total_length` is zero
    /// the scanners store the distance from `offset` to the end of the file.
    length: u64,
    /// Preamble parsed from the first `PREAMBLE_SIZE` bytes of the message.
    preamble: Preamble,
    /// Index frame; `None` until parsed from the header or footer region.
    index: Option<IndexFrame>,
    /// Global metadata; `None` until parsed from the header or footer region.
    global_metadata: Option<GlobalMetadata>,
}
147
148// ── Remote backend ───────────────────────────────────────────────────────────
149
/// Backend that reads a `.tgm` file from an object store via range requests.
pub(crate) struct RemoteBackend {
    /// The URL string exactly as it was passed to `open`.
    source_url: String,
    /// Object store resolved from the URL.
    store: Arc<dyn ObjectStore>,
    /// Path of the object inside `store`.
    path: ObjectPath,
    /// Object size in bytes, taken from the `head` request made in `open`.
    file_size: u64,
    /// Scan progress and cached per-message layouts.
    state: Mutex<RemoteState>,
}
157
/// Mutable scan state kept behind `RemoteBackend::state`.
#[derive(Debug, Default)]
struct RemoteState {
    /// Layouts of the messages discovered so far, in file order.
    layouts: Vec<MessageLayout>,
    /// Absolute offset at which the next message scan starts.
    next_scan_offset: u64,
    /// Set once a scan reached the end of the file or hit bytes that do not
    /// form a valid message; the scanners return immediately afterwards.
    scan_complete: bool,
}
164
165impl std::fmt::Debug for RemoteBackend {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("RemoteBackend")
168            .field("source", &self.source_url)
169            .field("file_size", &self.file_size)
170            .field(
171                "messages",
172                &self
173                    .state
174                    .lock()
175                    .map(|state| state.layouts.len())
176                    .unwrap_or(0),
177            )
178            .finish()
179    }
180}
181
182impl RemoteBackend {
183    pub(crate) fn source_url(&self) -> &str {
184        &self.source_url
185    }
186
187    #[cfg(feature = "async")]
188    fn lock_state(&self) -> Result<MutexGuard<'_, RemoteState>> {
189        self.state
190            .lock()
191            .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))
192    }
193
194    pub(crate) fn open(source: &str, storage_options: &BTreeMap<String, String>) -> Result<Self> {
195        let url = Url::parse(source)
196            .map_err(|e| TensogramError::Remote(format!("invalid URL '{source}': {e}")))?;
197
198        let mut opts: Vec<(String, String)> = storage_options
199            .iter()
200            .map(|(k, v)| (k.clone(), v.clone()))
201            .collect();
202        if url.scheme() == "http" && !opts.iter().any(|(k, _)| k == "allow_http") {
203            opts.push(("allow_http".to_string(), "true".to_string()));
204        }
205        let (store, path) = object_store::parse_url_opts(&url, opts)
206            .map_err(|e| TensogramError::Remote(format!("cannot open '{source}': {e}")))?;
207
208        let store: Arc<dyn ObjectStore> = Arc::from(store);
209
210        let head_store = store.clone();
211        let head_path = path.clone();
212        let meta = block_on_shared(async move { head_store.head(&head_path).await })?;
213
214        let file_size = meta.size;
215        if file_size < (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64 {
216            return Err(TensogramError::Remote(format!(
217                "remote file too small ({file_size} bytes)"
218            )));
219        }
220
221        let backend = RemoteBackend {
222            source_url: source.to_string(),
223            store,
224            path,
225            file_size,
226            state: Mutex::new(RemoteState::default()),
227        };
228        {
229            let mut state = backend
230                .state
231                .lock()
232                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
233            backend.scan_next_locked(&mut state)?;
234            if state.layouts.is_empty() {
235                return Err(TensogramError::Remote(
236                    "no valid messages found in remote file".to_string(),
237                ));
238            }
239        }
240        Ok(backend)
241    }
242
243    // ── Range reads ──────────────────────────────────────────────────────
244
245    fn get_range(&self, range: Range<u64>) -> Result<Bytes> {
246        let store = self.store.clone();
247        let path = self.path.clone();
248        block_on_shared(async move { store.get_range(&path, range).await })
249    }
250
251    // ── Message scanning ─────────────────────────────────────────────────
252
253    fn scan_next_locked(&self, state: &mut RemoteState) -> Result<()> {
254        if state.scan_complete {
255            return Ok(());
256        }
257        let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
258        let pos = state.next_scan_offset;
259
260        if pos + min_message_size > self.file_size {
261            state.scan_complete = true;
262            return Ok(());
263        }
264
265        let preamble_bytes = self.get_range(pos..pos + PREAMBLE_SIZE as u64)?;
266        if &preamble_bytes[..MAGIC.len()] != MAGIC {
267            state.scan_complete = true;
268            return Ok(());
269        }
270
271        let preamble = match Preamble::read_from(&preamble_bytes) {
272            Ok(p) => p,
273            Err(_) => {
274                state.scan_complete = true;
275                return Ok(());
276            }
277        };
278
279        let msg_len = preamble.total_length;
280
281        if msg_len == 0 {
282            let remaining = self.file_size - pos;
283            if remaining < min_message_size {
284                state.scan_complete = true;
285                return Ok(());
286            }
287            let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
288            let end_bytes =
289                self.get_range(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)?;
290            if &end_bytes[..] != crate::wire::END_MAGIC {
291                state.scan_complete = true;
292                return Ok(());
293            }
294            state.layouts.push(MessageLayout {
295                offset: pos,
296                length: remaining,
297                preamble,
298                index: None,
299                global_metadata: None,
300            });
301            state.scan_complete = true;
302            return Ok(());
303        }
304
305        let msg_end = match pos.checked_add(msg_len) {
306            Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
307            _ => {
308                state.scan_complete = true;
309                return Ok(());
310            }
311        };
312
313        state.layouts.push(MessageLayout {
314            offset: pos,
315            length: msg_len,
316            preamble,
317            index: None,
318            global_metadata: None,
319        });
320        state.next_scan_offset = msg_end;
321        Ok(())
322    }
323
324    fn ensure_message_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
325        while msg_idx >= state.layouts.len() && !state.scan_complete {
326            self.scan_next_locked(state)?;
327        }
328        if msg_idx >= state.layouts.len() {
329            return Err(TensogramError::Framing(format!(
330                "message index {} out of range (count={})",
331                msg_idx,
332                state.layouts.len()
333            )));
334        }
335        Ok(())
336    }
337
338    fn scan_all_locked(&self, state: &mut RemoteState) -> Result<()> {
339        while !state.scan_complete {
340            self.scan_next_locked(state)?;
341        }
342        Ok(())
343    }
344
345    fn scan_and_discover_next_locked(&self, state: &mut RemoteState) -> Result<()> {
346        if state.scan_complete {
347            return Ok(());
348        }
349        let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
350        let pos = state.next_scan_offset;
351
352        if pos + min_message_size > self.file_size {
353            state.scan_complete = true;
354            return Ok(());
355        }
356
357        let chunk_size = (self.file_size - pos).min(256 * 1024);
358        let chunk = self.get_range(pos..pos + chunk_size)?;
359
360        if chunk.len() < PREAMBLE_SIZE || &chunk[..MAGIC.len()] != MAGIC {
361            state.scan_complete = true;
362            return Ok(());
363        }
364
365        let preamble = match Preamble::read_from(&chunk[..PREAMBLE_SIZE]) {
366            Ok(p) => p,
367            Err(_) => {
368                state.scan_complete = true;
369                return Ok(());
370            }
371        };
372
373        let msg_len = preamble.total_length;
374
375        if msg_len == 0 {
376            let remaining = self.file_size - pos;
377            if remaining < min_message_size {
378                state.scan_complete = true;
379                return Ok(());
380            }
381            let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
382            let end_bytes =
383                self.get_range(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)?;
384            if &end_bytes[..] != crate::wire::END_MAGIC {
385                state.scan_complete = true;
386                return Ok(());
387            }
388            state.layouts.push(MessageLayout {
389                offset: pos,
390                length: remaining,
391                preamble,
392                index: None,
393                global_metadata: None,
394            });
395            state.scan_complete = true;
396            return Ok(());
397        }
398
399        let msg_end = match pos.checked_add(msg_len) {
400            Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
401            _ => {
402                state.scan_complete = true;
403                return Ok(());
404            }
405        };
406
407        let flags = preamble.flags;
408        let msg_idx = state.layouts.len();
409
410        state.layouts.push(MessageLayout {
411            offset: pos,
412            length: msg_len,
413            preamble,
414            index: None,
415            global_metadata: None,
416        });
417        state.next_scan_offset = msg_end;
418
419        if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
420            let chunk_end = (msg_len as usize).min(chunk.len());
421            Self::parse_header_frames(state, msg_idx, &chunk[..chunk_end])?;
422        } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
423        {
424            self.discover_footer_layout_from_suffix_locked(state, msg_idx)?;
425        }
426
427        Ok(())
428    }
429
430    /// Discover footer layout by reading a single suffix chunk from the
431    /// message end.  The suffix covers both the postamble and the footer
432    /// region (metadata + index frames) in one GET, halving the round
433    /// trips compared to separate postamble + footer reads.
434    ///
435    /// The suffix size is capped at 256 KB — footer regions are typically
436    /// a few KB.  If `first_footer_offset` points outside the suffix,
437    /// falls back to a separate read for the footer region.
438    fn discover_footer_layout_from_suffix_locked(
439        &self,
440        state: &mut RemoteState,
441        msg_idx: usize,
442    ) -> Result<()> {
443        let msg_offset = state.layouts[msg_idx].offset;
444        let msg_len = state.layouts[msg_idx].length;
445
446        let suffix_size = msg_len.min(256 * 1024);
447        let msg_end = msg_offset
448            .checked_add(msg_len)
449            .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
450        let suffix_start = msg_end - suffix_size;
451        let suffix = self.get_range(suffix_start..msg_end)?;
452
453        if suffix.len() < POSTAMBLE_SIZE {
454            return Err(TensogramError::Remote(
455                "suffix too short for postamble".to_string(),
456            ));
457        }
458
459        let pa_bytes = &suffix[suffix.len() - POSTAMBLE_SIZE..];
460        let postamble = Postamble::read_from(pa_bytes)?;
461
462        if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
463            return Err(TensogramError::Remote(format!(
464                "first_footer_offset ({}) is before preamble end ({PREAMBLE_SIZE})",
465                postamble.first_footer_offset
466            )));
467        }
468
469        let footer_abs_start = msg_offset
470            .checked_add(postamble.first_footer_offset)
471            .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
472        let footer_abs_end = msg_end - POSTAMBLE_SIZE as u64;
473
474        if footer_abs_start >= footer_abs_end {
475            return Err(TensogramError::Remote(
476                "first_footer_offset points at or past postamble".to_string(),
477            ));
478        }
479
480        if footer_abs_start >= suffix_start {
481            let local_start = (footer_abs_start - suffix_start) as usize;
482            let local_end = suffix.len() - POSTAMBLE_SIZE;
483            Self::parse_footer_frames(state, msg_idx, &suffix[local_start..local_end])
484        } else {
485            let footer_bytes = self.get_range(footer_abs_start..footer_abs_end)?;
486            Self::parse_footer_frames(state, msg_idx, &footer_bytes)
487        }
488    }
489
490    fn ensure_layout_eager_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
491        while msg_idx >= state.layouts.len() && !state.scan_complete {
492            self.scan_and_discover_next_locked(state)?;
493        }
494        if msg_idx >= state.layouts.len() {
495            return Err(TensogramError::Framing(format!(
496                "message index {} out of range (count={})",
497                msg_idx,
498                state.layouts.len()
499            )));
500        }
501        if state.layouts[msg_idx].global_metadata.is_some()
502            && state.layouts[msg_idx].index.is_some()
503        {
504            return Ok(());
505        }
506        self.ensure_layout_locked(state, msg_idx)
507    }
508
509    // ── Layout discovery (metadata + index for a single message) ─────────
510
511    fn ensure_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
512        self.ensure_message_locked(state, msg_idx)?;
513        if state.layouts[msg_idx].global_metadata.is_some()
514            && state.layouts[msg_idx].index.is_some()
515        {
516            return Ok(());
517        }
518
519        let flags = state.layouts[msg_idx].preamble.flags;
520
521        if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
522            self.discover_header_layout_locked(state, msg_idx)?;
523        } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
524        {
525            self.discover_footer_layout_locked(state, msg_idx)?;
526        } else {
527            return Err(TensogramError::Remote(
528                "remote access requires header-indexed or footer-indexed messages".to_string(),
529            ));
530        }
531
532        Ok(())
533    }
534
535    fn discover_footer_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
536        let msg_offset = state.layouts[msg_idx].offset;
537        let msg_len = state.layouts[msg_idx].length;
538
539        let pa_offset = msg_offset
540            .checked_add(msg_len)
541            .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
542            .ok_or_else(|| TensogramError::Remote("postamble offset overflow".to_string()))?;
543        let pa_bytes = self.get_range(pa_offset..pa_offset + POSTAMBLE_SIZE as u64)?;
544        let postamble = Postamble::read_from(&pa_bytes)?;
545
546        if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
547            return Err(TensogramError::Remote(format!(
548                "first_footer_offset ({}) is before preamble end ({})",
549                postamble.first_footer_offset, PREAMBLE_SIZE
550            )));
551        }
552        let footer_start = msg_offset
553            .checked_add(postamble.first_footer_offset)
554            .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
555        let footer_end = pa_offset;
556        if footer_start >= footer_end {
557            return Err(TensogramError::Remote(
558                "first_footer_offset points at or past postamble".to_string(),
559            ));
560        }
561        let footer_bytes = self.get_range(footer_start..footer_end)?;
562
563        Self::parse_footer_frames(state, msg_idx, &footer_bytes)
564    }
565
566    fn discover_header_layout_locked(&self, state: &mut RemoteState, msg_idx: usize) -> Result<()> {
567        let layout = &state.layouts[msg_idx];
568        let msg_offset = layout.offset;
569        let msg_len = layout.length;
570
571        // Read a generous initial chunk: up to 256KB or the message size.
572        // Header metadata + index are typically a few KB.
573        let chunk_size = msg_len.min(256 * 1024);
574        let header_bytes = self.get_range(msg_offset..msg_offset + chunk_size)?;
575
576        Self::parse_header_frames(state, msg_idx, &header_bytes)
577    }
578
579    fn parse_header_frames(state: &mut RemoteState, msg_idx: usize, buf: &[u8]) -> Result<()> {
580        let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
581        let mut pos = PREAMBLE_SIZE;
582
583        while pos + FRAME_HEADER_SIZE <= buf.len() {
584            if &buf[pos..pos + 2] != b"FR" {
585                pos += 1;
586                continue;
587            }
588            let fh = FrameHeader::read_from(&buf[pos..])?;
589            let frame_total = usize::try_from(fh.total_length).map_err(|_| {
590                TensogramError::Remote("frame total_length does not fit in usize".to_string())
591            })?;
592
593            if frame_total < min_frame_size {
594                return Err(TensogramError::Remote(format!(
595                    "frame total_length ({frame_total}) smaller than minimum ({min_frame_size})"
596                )));
597            }
598            let frame_end = match pos.checked_add(frame_total) {
599                Some(end) if end <= buf.len() => end,
600                _ => break,
601            };
602
603            if &buf[frame_end - FRAME_END.len()..frame_end] != FRAME_END {
604                return Err(TensogramError::Remote(
605                    "frame missing ENDF trailer".to_string(),
606                ));
607            }
608
609            let payload = &buf[pos + FRAME_HEADER_SIZE..frame_end - FRAME_END.len()];
610
611            match fh.frame_type {
612                FrameType::HeaderMetadata => {
613                    let meta = metadata::cbor_to_global_metadata(payload)?;
614                    state.layouts[msg_idx].global_metadata = Some(meta);
615                }
616                FrameType::HeaderIndex => {
617                    let idx = metadata::cbor_to_index(payload)?;
618                    state.layouts[msg_idx].index = Some(idx);
619                }
620                FrameType::DataObject | FrameType::PrecederMetadata => {
621                    break;
622                }
623                _ => {}
624            }
625
626            let aligned = (frame_end.saturating_add(7)) & !7;
627            pos = aligned.min(buf.len());
628        }
629
630        if state.layouts[msg_idx].global_metadata.is_none() {
631            return Err(TensogramError::Remote(
632                "header region did not contain a metadata frame".to_string(),
633            ));
634        }
635        if state.layouts[msg_idx].index.is_none() {
636            return Err(TensogramError::Remote(
637                "header region did not contain an index frame (header chunk may be too small)"
638                    .to_string(),
639            ));
640        }
641
642        Ok(())
643    }
644
645    fn parse_footer_frames(state: &mut RemoteState, msg_idx: usize, buf: &[u8]) -> Result<()> {
646        let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
647        let mut pos = 0;
648
649        while pos + FRAME_HEADER_SIZE <= buf.len() {
650            if &buf[pos..pos + 2] != b"FR" {
651                pos += 1;
652                continue;
653            }
654            let fh = FrameHeader::read_from(&buf[pos..])?;
655            let frame_total = usize::try_from(fh.total_length).map_err(|_| {
656                TensogramError::Remote(
657                    "footer frame total_length does not fit in usize".to_string(),
658                )
659            })?;
660
661            if frame_total < min_frame_size {
662                return Err(TensogramError::Remote(format!(
663                    "footer frame total_length ({frame_total}) smaller than minimum ({min_frame_size})"
664                )));
665            }
666            let frame_end = match pos.checked_add(frame_total) {
667                Some(end) if end <= buf.len() => end,
668                _ => break,
669            };
670
671            if &buf[frame_end - FRAME_END.len()..frame_end] != FRAME_END {
672                return Err(TensogramError::Remote(
673                    "footer frame missing ENDF trailer".to_string(),
674                ));
675            }
676
677            let payload = &buf[pos + FRAME_HEADER_SIZE..frame_end - FRAME_END.len()];
678
679            match fh.frame_type {
680                FrameType::FooterMetadata => {
681                    let meta = metadata::cbor_to_global_metadata(payload)?;
682                    state.layouts[msg_idx].global_metadata = Some(meta);
683                }
684                FrameType::FooterIndex => {
685                    let idx = metadata::cbor_to_index(payload)?;
686                    state.layouts[msg_idx].index = Some(idx);
687                }
688                _ => {}
689            }
690
691            let aligned = (frame_end.saturating_add(7)) & !7;
692            pos = aligned.min(buf.len());
693        }
694
695        if state.layouts[msg_idx].global_metadata.is_none() {
696            return Err(TensogramError::Remote(
697                "footer region did not contain a metadata frame".to_string(),
698            ));
699        }
700        if state.layouts[msg_idx].index.is_none() {
701            return Err(TensogramError::Remote(
702                "footer region did not contain an index frame".to_string(),
703            ));
704        }
705
706        Ok(())
707    }
708
709    // ── Public API used by TensogramFile ─────────────────────────────────
710
711    pub(crate) fn message_count(&self) -> Result<usize> {
712        let mut state = self
713            .state
714            .lock()
715            .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
716        self.scan_all_locked(&mut state)?;
717        Ok(state.layouts.len())
718    }
719
720    pub(crate) fn read_message(&self, msg_idx: usize) -> Result<Vec<u8>> {
721        let (offset, length) = {
722            let mut state = self
723                .state
724                .lock()
725                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
726            self.ensure_message_locked(&mut state, msg_idx)?;
727            let layout = &state.layouts[msg_idx];
728            (layout.offset, layout.length)
729        };
730        let bytes = self.get_range(offset..offset + length)?;
731        Ok(bytes.to_vec())
732    }
733
734    pub(crate) fn read_metadata(&self, msg_idx: usize) -> Result<GlobalMetadata> {
735        let mut state = self
736            .state
737            .lock()
738            .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
739        self.ensure_layout_eager_locked(&mut state, msg_idx)?;
740        state.layouts[msg_idx]
741            .global_metadata
742            .clone()
743            .ok_or_else(|| TensogramError::Remote("metadata not found".to_string()))
744    }
745
746    pub(crate) fn read_descriptors(
747        &self,
748        msg_idx: usize,
749    ) -> Result<(GlobalMetadata, Vec<DataObjectDescriptor>)> {
750        let layout = {
751            let mut state = self
752                .state
753                .lock()
754                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
755            self.ensure_layout_eager_locked(&mut state, msg_idx)?;
756            state.layouts[msg_idx].clone()
757        };
758        let msg_offset = layout.offset;
759
760        if let Some(ref index) = layout.index {
761            if index.offsets.len() != index.lengths.len() {
762                return Err(TensogramError::Remote(format!(
763                    "corrupt index: offsets.len()={} != lengths.len()={}",
764                    index.offsets.len(),
765                    index.lengths.len()
766                )));
767            }
768
769            let meta = layout
770                .global_metadata
771                .clone()
772                .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
773
774            let msg_length = layout.length;
775            let mut descriptors = Vec::with_capacity(index.offsets.len());
776            for i in 0..index.offsets.len() {
777                let desc = self.read_descriptor_only(
778                    msg_offset,
779                    msg_length,
780                    index.offsets[i],
781                    index.lengths[i],
782                )?;
783                descriptors.push(desc);
784            }
785            Ok((meta, descriptors))
786        } else {
787            let msg_bytes = self.read_message(msg_idx)?;
788            crate::decode::decode_descriptors(&msg_bytes)
789        }
790    }
791
792    /// Read only the CBOR descriptor from a data object frame, without
793    /// downloading the full payload.
794    ///
795    /// For frames below `DESCRIPTOR_PREFIX_THRESHOLD` bytes, falls back
796    /// to reading the entire frame (fewer round-trips).  For large frames,
797    /// reads just the header, footer, and CBOR region — typically < 10 KB
798    /// even when the payload is hundreds of megabytes.
799    fn read_descriptor_only(
800        &self,
801        msg_offset: u64,
802        msg_length: u64,
803        frame_offset_in_msg: u64,
804        frame_length: u64,
805    ) -> Result<DataObjectDescriptor> {
806        const DESCRIPTOR_PREFIX_THRESHOLD: u64 = 64 * 1024;
807
808        let range =
809            Self::checked_frame_range(msg_offset, msg_length, frame_offset_in_msg, frame_length)?;
810
811        if frame_length <= DESCRIPTOR_PREFIX_THRESHOLD {
812            let frame_bytes = self.get_range(range.clone())?;
813            let (desc, _payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
814            return Ok(desc);
815        }
816
817        let frame_start = range.start;
818        let frame_end = range.end;
819
820        let header_bytes = self.get_range(frame_start..frame_start + FRAME_HEADER_SIZE as u64)?;
821        let fh = FrameHeader::read_from(&header_bytes)?;
822
823        if fh.frame_type != FrameType::DataObject {
824            return Err(TensogramError::Remote(format!(
825                "expected DataObject frame, got {:?}",
826                fh.frame_type
827            )));
828        }
829
830        let footer_start = frame_end - DATA_OBJECT_FOOTER_SIZE as u64;
831        let footer_bytes = self.get_range(footer_start..frame_end)?;
832
833        if footer_bytes.len() < DATA_OBJECT_FOOTER_SIZE {
834            return Err(TensogramError::Remote("frame footer too short".to_string()));
835        }
836        if &footer_bytes[8..] != FRAME_END {
837            return Err(TensogramError::Remote(
838                "frame missing ENDF trailer".to_string(),
839            ));
840        }
841
842        let cbor_offset = u64::from_be_bytes(
843            footer_bytes[..8]
844                .try_into()
845                .map_err(|_| TensogramError::Remote("footer cbor_offset truncated".to_string()))?,
846        );
847
848        if cbor_offset < FRAME_HEADER_SIZE as u64 {
849            return Err(TensogramError::Remote(format!(
850                "cbor_offset ({cbor_offset}) below frame header size ({FRAME_HEADER_SIZE})"
851            )));
852        }
853
854        let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
855        let cbor_start = frame_start
856            .checked_add(cbor_offset)
857            .ok_or_else(|| TensogramError::Remote("cbor_start overflow".to_string()))?;
858
859        if cbor_after {
860            if cbor_start >= footer_start {
861                return Err(TensogramError::Remote(
862                    "cbor_offset points at or past footer".to_string(),
863                ));
864            }
865            let cbor_bytes = self.get_range(cbor_start..footer_start)?;
866            metadata::cbor_to_object_descriptor(&cbor_bytes)
867        } else {
868            if cbor_start >= footer_start {
869                return Err(TensogramError::Remote(
870                    "cbor_offset beyond frame body".to_string(),
871                ));
872            }
873            let max_cbor_len = footer_start - cbor_start;
874            let mut prefix_size: u64 = 8192;
875            loop {
876                let read_end = (cbor_start + prefix_size).min(footer_start);
877                let prefix_bytes = self.get_range(cbor_start..read_end)?;
878                match metadata::cbor_to_object_descriptor(&prefix_bytes) {
879                    Ok(desc) => return Ok(desc),
880                    Err(_) if prefix_size < max_cbor_len => {
881                        prefix_size = (prefix_size * 2).min(max_cbor_len);
882                    }
883                    Err(e) => return Err(e),
884                }
885            }
886        }
887    }
888
889    pub(crate) fn read_object(
890        &self,
891        msg_idx: usize,
892        obj_idx: usize,
893        options: &DecodeOptions,
894    ) -> Result<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)> {
895        let layout = {
896            let mut state = self
897                .state
898                .lock()
899                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
900            self.ensure_layout_eager_locked(&mut state, msg_idx)?;
901            state.layouts[msg_idx].clone()
902        };
903        let msg_offset = layout.offset;
904
905        if let Some(ref index) = layout.index {
906            Self::validate_index_access(index, obj_idx)?;
907
908            let meta = layout
909                .global_metadata
910                .clone()
911                .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
912
913            let range = Self::checked_frame_range(
914                msg_offset,
915                layout.length,
916                index.offsets[obj_idx],
917                index.lengths[obj_idx],
918            )?;
919            let frame_bytes = self.get_range(range)?;
920
921            let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
922
923            let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
924
925            Ok((meta, desc, decoded))
926        } else {
927            // No index — fall back to full message download
928            let msg_bytes = self.read_message(msg_idx)?;
929            crate::decode::decode_object(&msg_bytes, obj_idx, options)
930        }
931    }
932
933    pub(crate) fn read_range_batch(
934        &self,
935        msg_indices: &[usize],
936        obj_idx: usize,
937        ranges: &[(u64, u64)],
938        options: &DecodeOptions,
939    ) -> Result<Vec<(DataObjectDescriptor, Vec<Vec<u8>>)>> {
940        let mut byte_ranges = Vec::with_capacity(msg_indices.len());
941        {
942            let mut state = self
943                .state
944                .lock()
945                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
946            for &msg_idx in msg_indices {
947                self.ensure_layout_eager_locked(&mut state, msg_idx)?;
948            }
949            for &msg_idx in msg_indices {
950                let layout = &state.layouts[msg_idx];
951                if let Some(ref index) = layout.index {
952                    Self::validate_index_access(index, obj_idx)?;
953                    byte_ranges.push(Self::checked_frame_range(
954                        layout.offset,
955                        layout.length,
956                        index.offsets[obj_idx],
957                        index.lengths[obj_idx],
958                    )?);
959                } else {
960                    return Err(TensogramError::Remote(format!(
961                        "message {} has no index frame; batch requires indexed messages",
962                        msg_idx
963                    )));
964                }
965            }
966        }
967
968        let store = self.store.clone();
969        let path = self.path.clone();
970        let all_bytes =
971            block_on_shared(async move { store.get_ranges(&path, &byte_ranges).await })?;
972
973        let mut results = Vec::with_capacity(msg_indices.len());
974        for frame_bytes in &all_bytes {
975            let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
976            let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
977            results.push((desc, parts));
978        }
979        Ok(results)
980    }
981
982    pub(crate) fn read_object_batch(
983        &self,
984        msg_indices: &[usize],
985        obj_idx: usize,
986        options: &DecodeOptions,
987    ) -> Result<Vec<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)>> {
988        let mut byte_ranges = Vec::with_capacity(msg_indices.len());
989        let mut metas = Vec::with_capacity(msg_indices.len());
990        {
991            let mut state = self
992                .state
993                .lock()
994                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
995            for &msg_idx in msg_indices {
996                self.ensure_layout_eager_locked(&mut state, msg_idx)?;
997            }
998            for &msg_idx in msg_indices {
999                let layout = &state.layouts[msg_idx];
1000                if let Some(ref index) = layout.index {
1001                    Self::validate_index_access(index, obj_idx)?;
1002                    byte_ranges.push(Self::checked_frame_range(
1003                        layout.offset,
1004                        layout.length,
1005                        index.offsets[obj_idx],
1006                        index.lengths[obj_idx],
1007                    )?);
1008                    metas.push(layout.global_metadata.clone().ok_or_else(|| {
1009                        TensogramError::Remote("metadata not cached".to_string())
1010                    })?);
1011                } else {
1012                    return Err(TensogramError::Remote(format!(
1013                        "message {} has no index frame; batch decode requires indexed messages",
1014                        msg_idx
1015                    )));
1016                }
1017            }
1018        }
1019
1020        let store = self.store.clone();
1021        let path = self.path.clone();
1022        let all_bytes =
1023            block_on_shared(async move { store.get_ranges(&path, &byte_ranges).await })?;
1024
1025        let mut results = Vec::with_capacity(msg_indices.len());
1026        for (frame_bytes, meta) in all_bytes.iter().zip(metas) {
1027            let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
1028            let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
1029            results.push((meta, desc, decoded));
1030        }
1031        Ok(results)
1032    }
1033
1034    pub(crate) fn read_range(
1035        &self,
1036        msg_idx: usize,
1037        obj_idx: usize,
1038        ranges: &[(u64, u64)],
1039        options: &DecodeOptions,
1040    ) -> Result<(DataObjectDescriptor, Vec<Vec<u8>>)> {
1041        let layout = {
1042            let mut state = self
1043                .state
1044                .lock()
1045                .map_err(|_| TensogramError::Remote("remote state lock poisoned".to_string()))?;
1046            self.ensure_layout_eager_locked(&mut state, msg_idx)?;
1047            state.layouts[msg_idx].clone()
1048        };
1049        let msg_offset = layout.offset;
1050
1051        if let Some(ref index) = layout.index {
1052            Self::validate_index_access(index, obj_idx)?;
1053
1054            let range = Self::checked_frame_range(
1055                msg_offset,
1056                layout.length,
1057                index.offsets[obj_idx],
1058                index.lengths[obj_idx],
1059            )?;
1060            let frame_bytes = self.get_range(range)?;
1061            let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1062            let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
1063            Ok((desc, parts))
1064        } else {
1065            let msg_bytes = self.read_message(msg_idx)?;
1066            crate::decode::decode_range(&msg_bytes, obj_idx, ranges, options)
1067        }
1068    }
1069
1070    fn validate_index_access(index: &IndexFrame, obj_idx: usize) -> Result<()> {
1071        if index.offsets.len() != index.lengths.len() {
1072            return Err(TensogramError::Remote(format!(
1073                "corrupt index: offsets.len()={} != lengths.len()={}",
1074                index.offsets.len(),
1075                index.lengths.len()
1076            )));
1077        }
1078        if obj_idx >= index.offsets.len() {
1079            return Err(TensogramError::Object(format!(
1080                "object index {} out of range (count={})",
1081                obj_idx,
1082                index.offsets.len()
1083            )));
1084        }
1085        Ok(())
1086    }
1087
1088    fn checked_frame_range(
1089        msg_offset: u64,
1090        msg_length: u64,
1091        frame_offset_in_msg: u64,
1092        frame_length: u64,
1093    ) -> Result<Range<u64>> {
1094        let start = msg_offset
1095            .checked_add(frame_offset_in_msg)
1096            .ok_or_else(|| TensogramError::Remote("frame offset overflow".to_string()))?;
1097        let end = start
1098            .checked_add(frame_length)
1099            .ok_or_else(|| TensogramError::Remote("frame end overflow".to_string()))?;
1100        let msg_end = msg_offset
1101            .checked_add(msg_length)
1102            .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
1103        if end > msg_end {
1104            return Err(TensogramError::Remote(format!(
1105                "indexed frame {start}..{end} exceeds message boundary {msg_end}"
1106            )));
1107        }
1108        Ok(start..end)
1109    }
1110}
1111
1112// ── Native async path (remote + async) ───────────────────────────────────────
1113
1114#[cfg(feature = "async")]
1115impl RemoteBackend {
1116    async fn get_range_async(&self, range: Range<u64>) -> Result<Bytes> {
1117        self.store
1118            .get_range(&self.path, range)
1119            .await
1120            .map_err(|e| TensogramError::Remote(e.to_string()))
1121    }
1122
1123    pub(crate) async fn open_async(
1124        source: &str,
1125        storage_options: &BTreeMap<String, String>,
1126    ) -> Result<Self> {
1127        let url = Url::parse(source)
1128            .map_err(|e| TensogramError::Remote(format!("invalid URL '{source}': {e}")))?;
1129
1130        let mut opts: Vec<(String, String)> = storage_options
1131            .iter()
1132            .map(|(k, v)| (k.clone(), v.clone()))
1133            .collect();
1134        if url.scheme() == "http" && !opts.iter().any(|(k, _)| k == "allow_http") {
1135            opts.push(("allow_http".to_string(), "true".to_string()));
1136        }
1137        let (store, path) = object_store::parse_url_opts(&url, opts)
1138            .map_err(|e| TensogramError::Remote(format!("cannot open '{source}': {e}")))?;
1139
1140        let store: Arc<dyn ObjectStore> = Arc::from(store);
1141        let meta = store
1142            .head(&path)
1143            .await
1144            .map_err(|e| TensogramError::Remote(e.to_string()))?;
1145
1146        let file_size = meta.size;
1147        if file_size < (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64 {
1148            return Err(TensogramError::Remote(format!(
1149                "remote file too small ({file_size} bytes)"
1150            )));
1151        }
1152
1153        let backend = RemoteBackend {
1154            source_url: source.to_string(),
1155            store,
1156            path,
1157            file_size,
1158            state: Mutex::new(RemoteState::default()),
1159        };
1160        backend.scan_next_async().await?;
1161        {
1162            let state = backend.lock_state()?;
1163            if state.layouts.is_empty() {
1164                return Err(TensogramError::Remote(
1165                    "no valid messages found in remote file".to_string(),
1166                ));
1167            }
1168        }
1169        Ok(backend)
1170    }
1171
1172    async fn scan_next_async(&self) -> Result<()> {
1173        let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
1174        let pos = {
1175            let state = self.lock_state()?;
1176            if state.scan_complete {
1177                return Ok(());
1178            }
1179            state.next_scan_offset
1180        };
1181
1182        if pos + min_message_size > self.file_size {
1183            let mut state = self.lock_state()?;
1184            if state.next_scan_offset == pos {
1185                state.scan_complete = true;
1186            }
1187            return Ok(());
1188        }
1189
1190        let preamble_bytes = self
1191            .get_range_async(pos..pos + PREAMBLE_SIZE as u64)
1192            .await?;
1193        if &preamble_bytes[..MAGIC.len()] != MAGIC {
1194            let mut state = self.lock_state()?;
1195            if state.next_scan_offset == pos {
1196                state.scan_complete = true;
1197            }
1198            return Ok(());
1199        }
1200
1201        let preamble = match Preamble::read_from(&preamble_bytes) {
1202            Ok(preamble) => preamble,
1203            Err(_) => {
1204                let mut state = self.lock_state()?;
1205                if state.next_scan_offset == pos {
1206                    state.scan_complete = true;
1207                }
1208                return Ok(());
1209            }
1210        };
1211
1212        let msg_len = preamble.total_length;
1213
1214        if msg_len == 0 {
1215            let remaining = self.file_size - pos;
1216            if remaining < min_message_size {
1217                let mut state = self.lock_state()?;
1218                if state.next_scan_offset == pos {
1219                    state.scan_complete = true;
1220                }
1221                return Ok(());
1222            }
1223
1224            let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
1225            let end_bytes = self
1226                .get_range_async(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)
1227                .await?;
1228            if &end_bytes[..] != crate::wire::END_MAGIC {
1229                let mut state = self.lock_state()?;
1230                if state.next_scan_offset == pos {
1231                    state.scan_complete = true;
1232                }
1233                return Ok(());
1234            }
1235
1236            let mut state = self.lock_state()?;
1237            if state.scan_complete || state.next_scan_offset != pos {
1238                return Ok(());
1239            }
1240            state.layouts.push(MessageLayout {
1241                offset: pos,
1242                length: remaining,
1243                preamble,
1244                index: None,
1245                global_metadata: None,
1246            });
1247            state.scan_complete = true;
1248            return Ok(());
1249        }
1250
1251        let msg_end = match pos.checked_add(msg_len) {
1252            Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
1253            _ => {
1254                let mut state = self.lock_state()?;
1255                if state.next_scan_offset == pos {
1256                    state.scan_complete = true;
1257                }
1258                return Ok(());
1259            }
1260        };
1261
1262        let mut state = self.lock_state()?;
1263        if state.scan_complete || state.next_scan_offset != pos {
1264            return Ok(());
1265        }
1266        state.layouts.push(MessageLayout {
1267            offset: pos,
1268            length: msg_len,
1269            preamble,
1270            index: None,
1271            global_metadata: None,
1272        });
1273        state.next_scan_offset = msg_end;
1274        Ok(())
1275    }
1276
1277    async fn ensure_message_async(&self, msg_idx: usize) -> Result<()> {
1278        loop {
1279            let ready = {
1280                let state = self.lock_state()?;
1281                if msg_idx < state.layouts.len() {
1282                    return Ok(());
1283                }
1284                if state.scan_complete {
1285                    return Err(TensogramError::Framing(format!(
1286                        "message index {} out of range (count={})",
1287                        msg_idx,
1288                        state.layouts.len()
1289                    )));
1290                }
1291                false
1292            };
1293
1294            if !ready {
1295                self.scan_next_async().await?;
1296            }
1297        }
1298    }
1299
1300    async fn scan_and_discover_next_async(&self) -> Result<()> {
1301        let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
1302        let pos = {
1303            let state = self.lock_state()?;
1304            if state.scan_complete {
1305                return Ok(());
1306            }
1307            state.next_scan_offset
1308        };
1309
1310        if pos + min_message_size > self.file_size {
1311            let mut state = self.lock_state()?;
1312            if state.next_scan_offset == pos {
1313                state.scan_complete = true;
1314            }
1315            return Ok(());
1316        }
1317
1318        let chunk_size = (self.file_size - pos).min(256 * 1024);
1319        let chunk = self.get_range_async(pos..pos + chunk_size).await?;
1320
1321        if chunk.len() < PREAMBLE_SIZE || &chunk[..MAGIC.len()] != MAGIC {
1322            let mut state = self.lock_state()?;
1323            if state.next_scan_offset == pos {
1324                state.scan_complete = true;
1325            }
1326            return Ok(());
1327        }
1328
1329        let preamble = match Preamble::read_from(&chunk[..PREAMBLE_SIZE]) {
1330            Ok(preamble) => preamble,
1331            Err(_) => {
1332                let mut state = self.lock_state()?;
1333                if state.next_scan_offset == pos {
1334                    state.scan_complete = true;
1335                }
1336                return Ok(());
1337            }
1338        };
1339
1340        let msg_len = preamble.total_length;
1341
1342        if msg_len == 0 {
1343            let remaining = self.file_size - pos;
1344            if remaining < min_message_size {
1345                let mut state = self.lock_state()?;
1346                if state.next_scan_offset == pos {
1347                    state.scan_complete = true;
1348                }
1349                return Ok(());
1350            }
1351
1352            let end_magic_pos = self.file_size - crate::wire::END_MAGIC.len() as u64;
1353            let end_bytes = self
1354                .get_range_async(end_magic_pos..end_magic_pos + crate::wire::END_MAGIC.len() as u64)
1355                .await?;
1356            if &end_bytes[..] != crate::wire::END_MAGIC {
1357                let mut state = self.lock_state()?;
1358                if state.next_scan_offset == pos {
1359                    state.scan_complete = true;
1360                }
1361                return Ok(());
1362            }
1363
1364            let mut state = self.lock_state()?;
1365            if state.scan_complete || state.next_scan_offset != pos {
1366                return Ok(());
1367            }
1368            state.layouts.push(MessageLayout {
1369                offset: pos,
1370                length: remaining,
1371                preamble,
1372                index: None,
1373                global_metadata: None,
1374            });
1375            state.scan_complete = true;
1376            return Ok(());
1377        }
1378
1379        let msg_end = match pos.checked_add(msg_len) {
1380            Some(end) if msg_len >= min_message_size && end <= self.file_size => end,
1381            _ => {
1382                let mut state = self.lock_state()?;
1383                if state.next_scan_offset == pos {
1384                    state.scan_complete = true;
1385                }
1386                return Ok(());
1387            }
1388        };
1389
1390        let flags = preamble.flags;
1391        let msg_idx = {
1392            let mut state = self.lock_state()?;
1393            if state.scan_complete || state.next_scan_offset != pos {
1394                return Ok(());
1395            }
1396            let msg_idx = state.layouts.len();
1397            state.layouts.push(MessageLayout {
1398                offset: pos,
1399                length: msg_len,
1400                preamble,
1401                index: None,
1402                global_metadata: None,
1403            });
1404            state.next_scan_offset = msg_end;
1405            msg_idx
1406        };
1407
1408        if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
1409            let chunk_end = (msg_len as usize).min(chunk.len());
1410            let mut state = self.lock_state()?;
1411            if msg_idx < state.layouts.len()
1412                && state.layouts[msg_idx].offset == pos
1413                && state.layouts[msg_idx].global_metadata.is_none()
1414                && state.layouts[msg_idx].index.is_none()
1415            {
1416                Self::parse_header_frames(&mut state, msg_idx, &chunk[..chunk_end])?;
1417            }
1418        } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
1419        {
1420            self.discover_footer_layout_from_suffix_async(msg_idx)
1421                .await?;
1422        }
1423
1424        Ok(())
1425    }
1426
1427    async fn discover_footer_layout_from_suffix_async(&self, msg_idx: usize) -> Result<()> {
1428        let (msg_offset, msg_len) = {
1429            let state = self.lock_state()?;
1430            let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1431                TensogramError::Framing(format!(
1432                    "message index {} out of range (count={})",
1433                    msg_idx,
1434                    state.layouts.len()
1435                ))
1436            })?;
1437            (layout.offset, layout.length)
1438        };
1439
1440        let suffix_size = msg_len.min(256 * 1024);
1441        let msg_end = msg_offset
1442            .checked_add(msg_len)
1443            .ok_or_else(|| TensogramError::Remote("message end overflow".to_string()))?;
1444        let suffix_start = msg_end - suffix_size;
1445        let suffix = self.get_range_async(suffix_start..msg_end).await?;
1446
1447        if suffix.len() < POSTAMBLE_SIZE {
1448            return Err(TensogramError::Remote(
1449                "suffix too short for postamble".to_string(),
1450            ));
1451        }
1452
1453        let pa_bytes = &suffix[suffix.len() - POSTAMBLE_SIZE..];
1454        let postamble = Postamble::read_from(pa_bytes)?;
1455
1456        if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
1457            return Err(TensogramError::Remote(format!(
1458                "first_footer_offset ({}) is before preamble end ({PREAMBLE_SIZE})",
1459                postamble.first_footer_offset
1460            )));
1461        }
1462
1463        let footer_abs_start = msg_offset
1464            .checked_add(postamble.first_footer_offset)
1465            .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
1466        let footer_abs_end = msg_end - POSTAMBLE_SIZE as u64;
1467
1468        if footer_abs_start >= footer_abs_end {
1469            return Err(TensogramError::Remote(
1470                "first_footer_offset points at or past postamble".to_string(),
1471            ));
1472        }
1473
1474        if footer_abs_start >= suffix_start {
1475            let local_start = (footer_abs_start - suffix_start) as usize;
1476            let local_end = suffix.len() - POSTAMBLE_SIZE;
1477            let mut state = self.lock_state()?;
1478            if state.layouts[msg_idx].global_metadata.is_some()
1479                && state.layouts[msg_idx].index.is_some()
1480            {
1481                return Ok(());
1482            }
1483            Self::parse_footer_frames(&mut state, msg_idx, &suffix[local_start..local_end])
1484        } else {
1485            let footer_bytes = self
1486                .get_range_async(footer_abs_start..footer_abs_end)
1487                .await?;
1488            let mut state = self.lock_state()?;
1489            if state.layouts[msg_idx].global_metadata.is_some()
1490                && state.layouts[msg_idx].index.is_some()
1491            {
1492                return Ok(());
1493            }
1494            Self::parse_footer_frames(&mut state, msg_idx, &footer_bytes)
1495        }
1496    }
1497
1498    async fn ensure_layout_eager_async(&self, msg_idx: usize) -> Result<()> {
1499        loop {
1500            let should_scan = {
1501                let state = self.lock_state()?;
1502                if let Some(layout) = state.layouts.get(msg_idx) {
1503                    if layout.global_metadata.is_some() && layout.index.is_some() {
1504                        return Ok(());
1505                    }
1506                    false
1507                } else if state.scan_complete {
1508                    return Err(TensogramError::Framing(format!(
1509                        "message index {} out of range (count={})",
1510                        msg_idx,
1511                        state.layouts.len()
1512                    )));
1513                } else {
1514                    true
1515                }
1516            };
1517
1518            if should_scan {
1519                self.scan_and_discover_next_async().await?;
1520                continue;
1521            }
1522
1523            return self.ensure_layout_async(msg_idx).await;
1524        }
1525    }
1526
1527    async fn ensure_layout_async(&self, msg_idx: usize) -> Result<()> {
1528        self.ensure_message_async(msg_idx).await?;
1529
1530        let flags = {
1531            let state = self.lock_state()?;
1532            let layout = &state.layouts[msg_idx];
1533            if layout.global_metadata.is_some() && layout.index.is_some() {
1534                return Ok(());
1535            }
1536            layout.preamble.flags
1537        };
1538
1539        if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX) {
1540            self.discover_header_layout_async(msg_idx).await?;
1541        } else if flags.has(MessageFlags::FOOTER_METADATA) && flags.has(MessageFlags::FOOTER_INDEX)
1542        {
1543            self.discover_footer_layout_async(msg_idx).await?;
1544        } else {
1545            return Err(TensogramError::Remote(
1546                "remote access requires header-indexed or footer-indexed messages".to_string(),
1547            ));
1548        }
1549
1550        Ok(())
1551    }
1552
1553    async fn discover_header_layout_async(&self, msg_idx: usize) -> Result<()> {
1554        let (msg_offset, msg_len) = {
1555            let state = self.lock_state()?;
1556            let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1557                TensogramError::Framing(format!(
1558                    "message index {} out of range (count={})",
1559                    msg_idx,
1560                    state.layouts.len()
1561                ))
1562            })?;
1563            (layout.offset, layout.length)
1564        };
1565
1566        let chunk_size = msg_len.min(256 * 1024);
1567        let header_bytes = self
1568            .get_range_async(msg_offset..msg_offset + chunk_size)
1569            .await?;
1570
1571        let mut state = self.lock_state()?;
1572        if state.layouts[msg_idx].global_metadata.is_some()
1573            && state.layouts[msg_idx].index.is_some()
1574        {
1575            return Ok(());
1576        }
1577        Self::parse_header_frames(&mut state, msg_idx, &header_bytes)
1578    }
1579
1580    async fn discover_footer_layout_async(&self, msg_idx: usize) -> Result<()> {
1581        let (msg_offset, msg_len) = {
1582            let state = self.lock_state()?;
1583            let layout = state.layouts.get(msg_idx).ok_or_else(|| {
1584                TensogramError::Framing(format!(
1585                    "message index {} out of range (count={})",
1586                    msg_idx,
1587                    state.layouts.len()
1588                ))
1589            })?;
1590            (layout.offset, layout.length)
1591        };
1592
1593        let pa_offset = msg_offset
1594            .checked_add(msg_len)
1595            .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1596            .ok_or_else(|| TensogramError::Remote("postamble offset overflow".to_string()))?;
1597        let pa_bytes = self
1598            .get_range_async(pa_offset..pa_offset + POSTAMBLE_SIZE as u64)
1599            .await?;
1600        let postamble = Postamble::read_from(&pa_bytes)?;
1601
1602        if postamble.first_footer_offset < PREAMBLE_SIZE as u64 {
1603            return Err(TensogramError::Remote(format!(
1604                "first_footer_offset ({}) is before preamble end ({})",
1605                postamble.first_footer_offset, PREAMBLE_SIZE
1606            )));
1607        }
1608        let footer_start = msg_offset
1609            .checked_add(postamble.first_footer_offset)
1610            .ok_or_else(|| TensogramError::Remote("footer offset overflow".to_string()))?;
1611        let footer_end = pa_offset;
1612        if footer_start >= footer_end {
1613            return Err(TensogramError::Remote(
1614                "first_footer_offset points at or past postamble".to_string(),
1615            ));
1616        }
1617
1618        let footer_bytes = self.get_range_async(footer_start..footer_end).await?;
1619        let mut state = self.lock_state()?;
1620        if state.layouts[msg_idx].global_metadata.is_some()
1621            && state.layouts[msg_idx].index.is_some()
1622        {
1623            return Ok(());
1624        }
1625        Self::parse_footer_frames(&mut state, msg_idx, &footer_bytes)
1626    }
1627
1628    pub(crate) async fn message_count_async(&self) -> Result<usize> {
1629        loop {
1630            let done = {
1631                let state = self.lock_state()?;
1632                state.scan_complete
1633            };
1634            if done {
1635                break;
1636            }
1637            self.scan_and_discover_next_async().await?;
1638        }
1639        let state = self.lock_state()?;
1640        Ok(state.layouts.len())
1641    }
1642
1643    pub(crate) async fn read_message_async(&self, msg_idx: usize) -> Result<Vec<u8>> {
1644        self.ensure_message_async(msg_idx).await?;
1645        let (offset, length) = {
1646            let state = self.lock_state()?;
1647            let layout = &state.layouts[msg_idx];
1648            (layout.offset, layout.length)
1649        };
1650        let bytes = self.get_range_async(offset..offset + length).await?;
1651        Ok(bytes.to_vec())
1652    }
1653
1654    pub(crate) async fn read_metadata_async(&self, msg_idx: usize) -> Result<GlobalMetadata> {
1655        self.ensure_layout_eager_async(msg_idx).await?;
1656        let state = self.lock_state()?;
1657        state.layouts[msg_idx]
1658            .global_metadata
1659            .clone()
1660            .ok_or_else(|| TensogramError::Remote("metadata not found".to_string()))
1661    }
1662
1663    pub(crate) async fn read_descriptors_async(
1664        &self,
1665        msg_idx: usize,
1666    ) -> Result<(GlobalMetadata, Vec<DataObjectDescriptor>)> {
1667        self.ensure_layout_eager_async(msg_idx).await?;
1668        let layout = {
1669            let state = self.lock_state()?;
1670            state.layouts[msg_idx].clone()
1671        };
1672        let msg_offset = layout.offset;
1673
1674        if let Some(ref index) = layout.index {
1675            if index.offsets.len() != index.lengths.len() {
1676                return Err(TensogramError::Remote(format!(
1677                    "corrupt index: offsets.len()={} != lengths.len()={}",
1678                    index.offsets.len(),
1679                    index.lengths.len()
1680                )));
1681            }
1682
1683            let meta = layout
1684                .global_metadata
1685                .clone()
1686                .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
1687
1688            let msg_length = layout.length;
1689            let mut descriptors = Vec::with_capacity(index.offsets.len());
1690            for i in 0..index.offsets.len() {
1691                let desc = self
1692                    .read_descriptor_only_async(
1693                        msg_offset,
1694                        msg_length,
1695                        index.offsets[i],
1696                        index.lengths[i],
1697                    )
1698                    .await?;
1699                descriptors.push(desc);
1700            }
1701            Ok((meta, descriptors))
1702        } else {
1703            let msg_bytes = self.read_message_async(msg_idx).await?;
1704            crate::decode::decode_descriptors(&msg_bytes)
1705        }
1706    }
1707
1708    async fn read_descriptor_only_async(
1709        &self,
1710        msg_offset: u64,
1711        msg_length: u64,
1712        frame_offset_in_msg: u64,
1713        frame_length: u64,
1714    ) -> Result<DataObjectDescriptor> {
1715        const DESCRIPTOR_PREFIX_THRESHOLD: u64 = 64 * 1024;
1716
1717        let range =
1718            Self::checked_frame_range(msg_offset, msg_length, frame_offset_in_msg, frame_length)?;
1719
1720        if frame_length <= DESCRIPTOR_PREFIX_THRESHOLD {
1721            let frame_bytes = self.get_range_async(range.clone()).await?;
1722            let (desc, _payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1723            return Ok(desc);
1724        }
1725
1726        let frame_start = range.start;
1727        let frame_end = range.end;
1728
1729        let header_bytes = self
1730            .get_range_async(frame_start..frame_start + FRAME_HEADER_SIZE as u64)
1731            .await?;
1732        let fh = FrameHeader::read_from(&header_bytes)?;
1733
1734        if fh.frame_type != FrameType::DataObject {
1735            return Err(TensogramError::Remote(format!(
1736                "expected DataObject frame, got {:?}",
1737                fh.frame_type
1738            )));
1739        }
1740
1741        let footer_start = frame_end - DATA_OBJECT_FOOTER_SIZE as u64;
1742        let footer_bytes = self.get_range_async(footer_start..frame_end).await?;
1743
1744        if footer_bytes.len() < DATA_OBJECT_FOOTER_SIZE {
1745            return Err(TensogramError::Remote("frame footer too short".to_string()));
1746        }
1747        if &footer_bytes[8..] != FRAME_END {
1748            return Err(TensogramError::Remote(
1749                "frame missing ENDF trailer".to_string(),
1750            ));
1751        }
1752
1753        let cbor_offset = u64::from_be_bytes(
1754            footer_bytes[..8]
1755                .try_into()
1756                .map_err(|_| TensogramError::Remote("footer cbor_offset truncated".to_string()))?,
1757        );
1758
1759        if cbor_offset < FRAME_HEADER_SIZE as u64 {
1760            return Err(TensogramError::Remote(format!(
1761                "cbor_offset ({cbor_offset}) below frame header size ({FRAME_HEADER_SIZE})"
1762            )));
1763        }
1764
1765        let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
1766        let cbor_start = frame_start
1767            .checked_add(cbor_offset)
1768            .ok_or_else(|| TensogramError::Remote("cbor_start overflow".to_string()))?;
1769
1770        if cbor_after {
1771            if cbor_start >= footer_start {
1772                return Err(TensogramError::Remote(
1773                    "cbor_offset points at or past footer".to_string(),
1774                ));
1775            }
1776            let cbor_bytes = self.get_range_async(cbor_start..footer_start).await?;
1777            metadata::cbor_to_object_descriptor(&cbor_bytes)
1778        } else {
1779            if cbor_start >= footer_start {
1780                return Err(TensogramError::Remote(
1781                    "cbor_offset beyond frame body".to_string(),
1782                ));
1783            }
1784            let max_cbor_len = footer_start - cbor_start;
1785            let mut prefix_size: u64 = 8192;
1786            loop {
1787                let read_end = (cbor_start + prefix_size).min(footer_start);
1788                let prefix_bytes = self.get_range_async(cbor_start..read_end).await?;
1789                match metadata::cbor_to_object_descriptor(&prefix_bytes) {
1790                    Ok(desc) => return Ok(desc),
1791                    Err(_) if prefix_size < max_cbor_len => {
1792                        prefix_size = (prefix_size * 2).min(max_cbor_len);
1793                    }
1794                    Err(e) => return Err(e),
1795                }
1796            }
1797        }
1798    }
1799
1800    pub(crate) async fn read_object_async(
1801        &self,
1802        msg_idx: usize,
1803        obj_idx: usize,
1804        options: &DecodeOptions,
1805    ) -> Result<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)> {
1806        self.ensure_layout_eager_async(msg_idx).await?;
1807        let layout = {
1808            let state = self.lock_state()?;
1809            state.layouts[msg_idx].clone()
1810        };
1811        let msg_offset = layout.offset;
1812
1813        if let Some(ref index) = layout.index {
1814            Self::validate_index_access(index, obj_idx)?;
1815
1816            let meta = layout
1817                .global_metadata
1818                .clone()
1819                .ok_or_else(|| TensogramError::Remote("metadata not cached".to_string()))?;
1820
1821            let range = Self::checked_frame_range(
1822                msg_offset,
1823                layout.length,
1824                index.offsets[obj_idx],
1825                index.lengths[obj_idx],
1826            )?;
1827            let frame_bytes = self.get_range_async(range).await?;
1828            let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
1829            let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
1830            Ok((meta, desc, decoded))
1831        } else {
1832            let msg_bytes = self.read_message_async(msg_idx).await?;
1833            crate::decode::decode_object(&msg_bytes, obj_idx, options)
1834        }
1835    }
1836
1837    pub(crate) async fn ensure_all_layouts_batch_async(&self, msg_indices: &[usize]) -> Result<()> {
1838        if msg_indices.is_empty() {
1839            return Ok(());
1840        }
1841        let max_idx = msg_indices.iter().copied().max().unwrap_or(0);
1842        loop {
1843            let (need_scan, scan_complete) = {
1844                let state = self.lock_state()?;
1845                (state.layouts.len() <= max_idx, state.scan_complete)
1846            };
1847            if !need_scan || scan_complete {
1848                break;
1849            }
1850            self.scan_and_discover_next_async().await?;
1851        }
1852
1853        {
1854            let state = self.lock_state()?;
1855            for &idx in msg_indices {
1856                if idx >= state.layouts.len() {
1857                    return Err(TensogramError::Framing(format!(
1858                        "message index {} out of range (count={})",
1859                        idx,
1860                        state.layouts.len()
1861                    )));
1862                }
1863            }
1864        }
1865
1866        // Phase 2: find which messages still need layout discovery.
1867        let needs_layout: Vec<usize> = {
1868            let state = self.lock_state()?;
1869            msg_indices
1870                .iter()
1871                .copied()
1872                .filter(|&idx| {
1873                    state
1874                        .layouts
1875                        .get(idx)
1876                        .is_none_or(|l| l.global_metadata.is_none() || l.index.is_none())
1877                })
1878                .collect()
1879        };
1880
1881        if needs_layout.is_empty() {
1882            return Ok(());
1883        }
1884
1885        // Phase 3: compute header/footer byte ranges for all cold messages.
1886        let mut fetch_ranges: Vec<Range<u64>> = Vec::new();
1887        let mut fetch_map: Vec<(usize, bool)> = Vec::new(); // (msg_idx, is_header)
1888        {
1889            let state = self.lock_state()?;
1890            for &msg_idx in &needs_layout {
1891                let layout = &state.layouts[msg_idx];
1892                let flags = layout.preamble.flags;
1893                if flags.has(MessageFlags::HEADER_METADATA) && flags.has(MessageFlags::HEADER_INDEX)
1894                {
1895                    let chunk_size = layout.length.min(256 * 1024);
1896                    fetch_ranges.push(layout.offset..layout.offset + chunk_size);
1897                    fetch_map.push((msg_idx, true));
1898                } else if flags.has(MessageFlags::FOOTER_METADATA)
1899                    && flags.has(MessageFlags::FOOTER_INDEX)
1900                {
1901                    let pa_offset = layout
1902                        .offset
1903                        .checked_add(layout.length)
1904                        .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1905                        .ok_or_else(|| {
1906                            TensogramError::Remote("postamble offset overflow".to_string())
1907                        })?;
1908                    fetch_ranges.push(pa_offset..pa_offset + POSTAMBLE_SIZE as u64);
1909                    fetch_map.push((msg_idx, false));
1910                } else {
1911                    return Err(TensogramError::Remote(
1912                        "remote batch requires header-indexed or footer-indexed messages"
1913                            .to_string(),
1914                    ));
1915                }
1916            }
1917        }
1918
1919        // Phase 4: batched HTTP fetch for all layout headers.
1920        let all_bytes = self
1921            .store
1922            .get_ranges(&self.path, &fetch_ranges)
1923            .await
1924            .map_err(|e| TensogramError::Remote(e.to_string()))?;
1925
1926        // Phase 5: parse header layouts.
1927        let mut footer_fetches: Vec<(usize, Range<u64>)> = Vec::new();
1928        {
1929            let mut state = self.lock_state()?;
1930            for (bytes, &(msg_idx, is_header)) in all_bytes.iter().zip(fetch_map.iter()) {
1931                if state.layouts[msg_idx].global_metadata.is_some()
1932                    && state.layouts[msg_idx].index.is_some()
1933                {
1934                    continue;
1935                }
1936                if is_header {
1937                    Self::parse_header_frames(&mut state, msg_idx, bytes)?;
1938                } else {
1939                    let postamble = Postamble::read_from(bytes)?;
1940                    let layout = &state.layouts[msg_idx];
1941                    let footer_start = layout
1942                        .offset
1943                        .checked_add(postamble.first_footer_offset)
1944                        .ok_or_else(|| {
1945                            TensogramError::Remote("footer offset overflow".to_string())
1946                        })?;
1947                    let pa_offset = layout
1948                        .offset
1949                        .checked_add(layout.length)
1950                        .and_then(|end| end.checked_sub(POSTAMBLE_SIZE as u64))
1951                        .ok_or_else(|| {
1952                            TensogramError::Remote("postamble offset overflow".to_string())
1953                        })?;
1954                    footer_fetches.push((msg_idx, footer_start..pa_offset));
1955                }
1956            }
1957        }
1958
1959        // Phase 6: batched footer fetch if any footer-indexed messages.
1960        if !footer_fetches.is_empty() {
1961            let footer_ranges: Vec<Range<u64>> =
1962                footer_fetches.iter().map(|(_, r)| r.clone()).collect();
1963            let footer_bytes = self
1964                .store
1965                .get_ranges(&self.path, &footer_ranges)
1966                .await
1967                .map_err(|e| TensogramError::Remote(e.to_string()))?;
1968            let mut state = self.lock_state()?;
1969            for (bytes, &(msg_idx, _)) in footer_bytes.iter().zip(footer_fetches.iter()) {
1970                if state.layouts[msg_idx].global_metadata.is_some()
1971                    && state.layouts[msg_idx].index.is_some()
1972                {
1973                    continue;
1974                }
1975                Self::parse_footer_frames(&mut state, msg_idx, bytes)?;
1976            }
1977        }
1978
1979        Ok(())
1980    }
1981
1982    pub(crate) async fn read_object_batch_async(
1983        &self,
1984        msg_indices: &[usize],
1985        obj_idx: usize,
1986        options: &DecodeOptions,
1987    ) -> Result<Vec<(GlobalMetadata, DataObjectDescriptor, Vec<u8>)>> {
1988        self.ensure_all_layouts_batch_async(msg_indices).await?;
1989
1990        let mut byte_ranges = Vec::with_capacity(msg_indices.len());
1991        let mut metas = Vec::with_capacity(msg_indices.len());
1992        {
1993            let state = self.lock_state()?;
1994            for &msg_idx in msg_indices {
1995                let layout = &state.layouts[msg_idx];
1996                if let Some(ref index) = layout.index {
1997                    Self::validate_index_access(index, obj_idx)?;
1998                    byte_ranges.push(Self::checked_frame_range(
1999                        layout.offset,
2000                        layout.length,
2001                        index.offsets[obj_idx],
2002                        index.lengths[obj_idx],
2003                    )?);
2004                    metas.push(layout.global_metadata.clone().ok_or_else(|| {
2005                        TensogramError::Remote("metadata not cached".to_string())
2006                    })?);
2007                } else {
2008                    return Err(TensogramError::Remote(format!(
2009                        "message {} has no index frame; batch decode requires indexed messages",
2010                        msg_idx
2011                    )));
2012                }
2013            }
2014        }
2015
2016        let all_bytes = self
2017            .store
2018            .get_ranges(&self.path, &byte_ranges)
2019            .await
2020            .map_err(|e| TensogramError::Remote(e.to_string()))?;
2021
2022        let mut results = Vec::with_capacity(msg_indices.len());
2023        for (frame_bytes, meta) in all_bytes.iter().zip(metas) {
2024            let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
2025            let decoded = crate::decode::decode_single_object(&desc, payload, options)?;
2026            results.push((meta, desc, decoded));
2027        }
2028        Ok(results)
2029    }
2030
2031    pub(crate) async fn read_range_batch_async(
2032        &self,
2033        msg_indices: &[usize],
2034        obj_idx: usize,
2035        ranges: &[(u64, u64)],
2036        options: &DecodeOptions,
2037    ) -> Result<Vec<(DataObjectDescriptor, Vec<Vec<u8>>)>> {
2038        // 1. Batch-discover all layouts (scan + batched header/footer fetch).
2039        self.ensure_all_layouts_batch_async(msg_indices).await?;
2040
2041        // 2. Compute byte ranges for each message's data object frame.
2042        let mut byte_ranges = Vec::with_capacity(msg_indices.len());
2043        {
2044            let state = self.lock_state()?;
2045            for &msg_idx in msg_indices {
2046                let layout = &state.layouts[msg_idx];
2047                if let Some(ref index) = layout.index {
2048                    Self::validate_index_access(index, obj_idx)?;
2049                    let range = Self::checked_frame_range(
2050                        layout.offset,
2051                        layout.length,
2052                        index.offsets[obj_idx],
2053                        index.lengths[obj_idx],
2054                    )?;
2055                    byte_ranges.push(range);
2056                } else {
2057                    return Err(TensogramError::Remote(format!(
2058                        "message {} has no index frame; batch range decode requires indexed messages",
2059                        msg_idx
2060                    )));
2061                }
2062            }
2063        }
2064
2065        // 3. Single batched HTTP fetch for all frames.
2066        let all_bytes = self
2067            .store
2068            .get_ranges(&self.path, &byte_ranges)
2069            .await
2070            .map_err(|e| TensogramError::Remote(e.to_string()))?;
2071
2072        // 4. Decode each frame locally.
2073        let mut results = Vec::with_capacity(msg_indices.len());
2074        for frame_bytes in &all_bytes {
2075            let (desc, payload, _consumed) = framing::decode_data_object_frame(frame_bytes)?;
2076            let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
2077            results.push((desc, parts));
2078        }
2079        Ok(results)
2080    }
2081
2082    pub(crate) async fn read_range_async(
2083        &self,
2084        msg_idx: usize,
2085        obj_idx: usize,
2086        ranges: &[(u64, u64)],
2087        options: &DecodeOptions,
2088    ) -> Result<(DataObjectDescriptor, Vec<Vec<u8>>)> {
2089        self.ensure_layout_eager_async(msg_idx).await?;
2090        let layout = {
2091            let state = self.lock_state()?;
2092            state.layouts[msg_idx].clone()
2093        };
2094        let msg_offset = layout.offset;
2095
2096        if let Some(ref index) = layout.index {
2097            Self::validate_index_access(index, obj_idx)?;
2098
2099            let range = Self::checked_frame_range(
2100                msg_offset,
2101                layout.length,
2102                index.offsets[obj_idx],
2103                index.lengths[obj_idx],
2104            )?;
2105            let frame_bytes = self.get_range_async(range).await?;
2106            let (desc, payload, _consumed) = framing::decode_data_object_frame(&frame_bytes)?;
2107            let parts = crate::decode::decode_range_from_payload(&desc, payload, ranges, options)?;
2108            Ok((desc, parts))
2109        } else {
2110            let msg_bytes = self.read_message_async(msg_idx).await?;
2111            crate::decode::decode_range(&msg_bytes, obj_idx, ranges, options)
2112        }
2113    }
2114}