Skip to main content

void_core/
stream.rs

1//! Streaming file reader over encrypted shards.
2//!
3//! Provides [`ShardStreamReader`], a `std::io::Read` implementation that
4//! fetches, decrypts, and decompresses shards on demand. Only one shard
5//! is held in memory at a time regardless of total file size.
6//!
7//! This is the core read primitive for the void SDK — consumers (soma nodes,
8//! web servers, CLI tools) call `read()` and get bytes without managing
9//! shards, encryption, or manifest lookups.
10
11use std::io::{self, Read};
12
13use crate::crypto::{CommitReader, ContentKey};
14use crate::metadata::ShardReference;
15use crate::store::ObjectStoreExt;
16use crate::{cid, Result, VoidError};
17
18/// A streaming reader over one or more encrypted shards.
19///
20/// Implements `std::io::Read`. Fetches shards lazily — the first `read()`
21/// triggers decryption of the first shard, subsequent reads advance through
22/// shards as needed. Peak memory: one decompressed shard body.
23///
24/// # Example
25///
26/// ```ignore
27/// let reader = ShardStreamReader::new(&store, &commit_reader, &ancestor_keys, &shard_refs);
28/// let mut buf = Vec::new();
29/// reader.read_to_end(&mut buf)?; // streams through all shards
30/// ```
31pub struct ShardStreamReader<'a, S> {
32    store: &'a S,
33    reader: &'a CommitReader,
34    ancestor_keys: &'a [ContentKey],
35    shard_refs: &'a [ShardReference],
36    /// Index of the current shard being read (into shard_refs)
37    current_shard: usize,
38    /// Decompressed body of the current shard
39    current_body: Vec<u8>,
40    /// Read position within current_body
41    current_pos: usize,
42    /// Whether the current shard has been loaded
43    loaded: bool,
44}
45
46impl<'a, S: ObjectStoreExt> ShardStreamReader<'a, S> {
47    /// Create a new streaming reader over the given shard references.
48    ///
49    /// For single-shard files, pass a slice of one ShardReference.
50    /// For chunked files, pass the consecutive slice from the manifest.
51    /// Shards are fetched lazily on first `read()`.
52    pub fn new(
53        store: &'a S,
54        reader: &'a CommitReader,
55        ancestor_keys: &'a [ContentKey],
56        shard_refs: &'a [ShardReference],
57    ) -> Self {
58        Self {
59            store,
60            reader,
61            ancestor_keys,
62            shard_refs,
63            current_shard: 0,
64            current_body: Vec::new(),
65            current_pos: 0,
66            loaded: false,
67        }
68    }
69
70    /// Load the next shard into memory, replacing the current one.
71    fn load_next_shard(&mut self) -> io::Result<bool> {
72        if self.current_shard >= self.shard_refs.len() {
73            return Ok(false); // no more shards
74        }
75
76        let shard_ref = &self.shard_refs[self.current_shard];
77        let shard_cid = cid::from_bytes(shard_ref.cid.as_bytes())
78            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
79
80        let encrypted: void_crypto::EncryptedShard = self.store.get_blob(&shard_cid)
81            .map_err(|e| io::Error::new(io::ErrorKind::NotFound, e.to_string()))?;
82
83        let decrypted = self.reader.decrypt_shard(
84            &encrypted,
85            shard_ref.wrapped_key.as_ref(),
86            self.ancestor_keys,
87        ).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
88
89        let body = decrypted.decompress()
90            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
91
92        self.current_body = body.as_bytes().to_vec();
93        self.current_pos = 0;
94        self.loaded = true;
95
96        Ok(true)
97    }
98}
99
100impl<S: ObjectStoreExt> Read for ShardStreamReader<'_, S> {
101    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102        loop {
103            // Load the current shard if not yet loaded
104            if !self.loaded {
105                if !self.load_next_shard()? {
106                    return Ok(0); // EOF — all shards consumed
107                }
108            }
109
110            // Read from the current shard body
111            let remaining = &self.current_body[self.current_pos..];
112            if !remaining.is_empty() {
113                let n = remaining.len().min(buf.len());
114                buf[..n].copy_from_slice(&remaining[..n]);
115                self.current_pos += n;
116                return Ok(n);
117            }
118
119            // Current shard exhausted — advance to next
120            self.current_shard += 1;
121            self.loaded = false;
122            self.current_body = Vec::new(); // free memory before loading next
123        }
124    }
125}
126
127/// Open a streaming reader for a file in a commit.
128///
129/// Looks up the file in the manifest, determines which shards it spans,
130/// and returns a `Read` implementation that streams through them.
131pub fn stream_file<'a, S: ObjectStoreExt>(
132    store: &'a S,
133    reader: &'a CommitReader,
134    ancestor_keys: &'a [ContentKey],
135    manifest: &'a crate::metadata::manifest_tree::TreeManifest,
136    path: &str,
137) -> Result<ShardStreamReader<'a, S>> {
138    let entry = manifest.lookup(path)?;
139    let shards = manifest.shards();
140
141    let start = entry.shard_index as usize;
142    let end = start + entry.shard_count.max(1) as usize;
143
144    if end > shards.len() {
145        return Err(VoidError::Shard(format!(
146            "shard range {}..{} out of bounds for '{}' (manifest has {} shards)",
147            start, end, path, shards.len()
148        )));
149    }
150
151    Ok(ShardStreamReader::new(store, reader, ancestor_keys, &shards[start..end]))
152}