1use std::io::{self, Read};
12
13use crate::crypto::{CommitReader, ContentKey};
14use crate::metadata::ShardReference;
15use crate::store::ObjectStoreExt;
16use crate::{cid, Result, VoidError};
17
18pub struct ShardStreamReader<'a, S> {
32 store: &'a S,
33 reader: &'a CommitReader,
34 ancestor_keys: &'a [ContentKey],
35 shard_refs: &'a [ShardReference],
36 current_shard: usize,
38 current_body: Vec<u8>,
40 current_pos: usize,
42 loaded: bool,
44}
45
46impl<'a, S: ObjectStoreExt> ShardStreamReader<'a, S> {
47 pub fn new(
53 store: &'a S,
54 reader: &'a CommitReader,
55 ancestor_keys: &'a [ContentKey],
56 shard_refs: &'a [ShardReference],
57 ) -> Self {
58 Self {
59 store,
60 reader,
61 ancestor_keys,
62 shard_refs,
63 current_shard: 0,
64 current_body: Vec::new(),
65 current_pos: 0,
66 loaded: false,
67 }
68 }
69
70 fn load_next_shard(&mut self) -> io::Result<bool> {
72 if self.current_shard >= self.shard_refs.len() {
73 return Ok(false); }
75
76 let shard_ref = &self.shard_refs[self.current_shard];
77 let shard_cid = cid::from_bytes(shard_ref.cid.as_bytes())
78 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
79
80 let encrypted: void_crypto::EncryptedShard = self.store.get_blob(&shard_cid)
81 .map_err(|e| io::Error::new(io::ErrorKind::NotFound, e.to_string()))?;
82
83 let decrypted = self.reader.decrypt_shard(
84 &encrypted,
85 shard_ref.wrapped_key.as_ref(),
86 self.ancestor_keys,
87 ).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
88
89 let body = decrypted.decompress()
90 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
91
92 self.current_body = body.as_bytes().to_vec();
93 self.current_pos = 0;
94 self.loaded = true;
95
96 Ok(true)
97 }
98}
99
100impl<S: ObjectStoreExt> Read for ShardStreamReader<'_, S> {
101 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102 loop {
103 if !self.loaded {
105 if !self.load_next_shard()? {
106 return Ok(0); }
108 }
109
110 let remaining = &self.current_body[self.current_pos..];
112 if !remaining.is_empty() {
113 let n = remaining.len().min(buf.len());
114 buf[..n].copy_from_slice(&remaining[..n]);
115 self.current_pos += n;
116 return Ok(n);
117 }
118
119 self.current_shard += 1;
121 self.loaded = false;
122 self.current_body = Vec::new(); }
124 }
125}
126
127pub fn stream_file<'a, S: ObjectStoreExt>(
132 store: &'a S,
133 reader: &'a CommitReader,
134 ancestor_keys: &'a [ContentKey],
135 manifest: &'a crate::metadata::manifest_tree::TreeManifest,
136 path: &str,
137) -> Result<ShardStreamReader<'a, S>> {
138 let entry = manifest.lookup(path)?;
139 let shards = manifest.shards();
140
141 let start = entry.shard_index as usize;
142 let end = start + entry.shard_count.max(1) as usize;
143
144 if end > shards.len() {
145 return Err(VoidError::Shard(format!(
146 "shard range {}..{} out of bounds for '{}' (manifest has {} shards)",
147 start, end, path, shards.len()
148 )));
149 }
150
151 Ok(ShardStreamReader::new(store, reader, ancestor_keys, &shards[start..end]))
152}