1use crate::error::ServerError;
7use std::fs::File;
8use std::io::{BufReader, Read};
9use std::path::Path;
10
11#[derive(Debug)]
25pub struct ChunkStream {
26 reader: BufReader<File>,
27 chunk_size: usize,
28 exhausted: bool,
29}
30
31impl ChunkStream {
32 pub fn from_file(
51 path: &Path,
52 chunk_size: usize,
53 ) -> Result<Self, ServerError> {
54 let file = File::open(path)?;
55 Ok(Self {
56 reader: BufReader::new(file),
57 chunk_size: chunk_size.max(1),
58 exhausted: false,
59 })
60 }
61}
62
63impl Iterator for ChunkStream {
64 type Item = Result<Vec<u8>, ServerError>;
65
66 fn next(&mut self) -> Option<Self::Item> {
67 read_next_chunk(
68 &mut self.reader,
69 self.chunk_size,
70 &mut self.exhausted,
71 )
72 }
73}
74
75fn read_next_chunk<R: Read>(
76 reader: &mut R,
77 chunk_size: usize,
78 exhausted: &mut bool,
79) -> Option<Result<Vec<u8>, ServerError>> {
80 if *exhausted {
81 return None;
82 }
83
84 let mut buf = vec![0_u8; chunk_size];
85 match reader.read(&mut buf) {
86 Ok(0) => {
87 *exhausted = true;
88 None
89 }
90 Ok(n) => {
91 buf.truncate(n);
92 Some(Ok(buf))
93 }
94 Err(err) => {
95 *exhausted = true;
96 Some(Err(ServerError::Io(err)))
97 }
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use tempfile::TempDir;

    /// Reader whose every `read` call fails; drives the error path of the helper.
    struct ErrReader;

    impl Read for ErrReader {
        fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
            Err(io::Error::other("boom"))
        }
    }

    /// A failing read surfaces as `ServerError::Io` and sets the exhaustion flag.
    #[test]
    fn helper_maps_read_errors_and_marks_exhausted() {
        let mut done = false;
        let outcome = read_next_chunk(&mut ErrReader, 8, &mut done);

        assert!(matches!(outcome, Some(Err(ServerError::Io(_)))));
        assert!(done);
    }

    /// With the flag already set, the helper yields nothing.
    #[test]
    fn helper_returns_none_when_already_exhausted() {
        let mut done = true;
        let next = read_next_chunk(&mut io::empty(), 4, &mut done);

        assert!(next.is_none());
    }

    /// Eight bytes read three at a time come back as 3 + 3 + 2.
    #[test]
    fn streams_file_in_chunks() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("data.txt");
        std::fs::write(&path, b"abcdefgh").expect("write");

        let stream = ChunkStream::from_file(&path, 3).expect("open");
        let collected = stream
            .collect::<Result<Vec<Vec<u8>>, _>>()
            .expect("chunks");

        let expected = vec![b"abc".to_vec(), b"def".to_vec(), b"gh".to_vec()];
        assert_eq!(collected, expected);
    }

    /// Opening a path that does not exist fails with `ServerError::Io`.
    #[test]
    fn missing_file_returns_io_error() {
        let dir = TempDir::new().expect("tmp");
        let absent = dir.path().join("does-not-exist.txt");

        assert!(matches!(
            ChunkStream::from_file(&absent, 4),
            Err(ServerError::Io(_))
        ));
    }

    /// After the only chunk has been yielded, the stream keeps returning `None`.
    #[test]
    fn returns_none_after_stream_is_exhausted() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("single-byte.txt");
        std::fs::write(&path, b"x").expect("write");
        let mut stream = ChunkStream::from_file(&path, 1).expect("stream open");

        let chunk = stream.next();
        let first_is_x = matches!(&chunk, Some(Ok(bytes)) if bytes.as_slice() == b"x");
        assert!(first_is_x, "unexpected first chunk: {chunk:?}");

        for _ in 0..2 {
            assert!(stream.next().is_none());
        }
    }
}