file_chunker/
lib.rs

//! This crate provides the `FileChunker` type, which is useful for efficiently reading a file
//! in (approximately) equally-sized parts.
//!
//! The original use case was to process a log file in chunks, one thread per chunk, and to
//! guarantee that each chunk ends with a full line of text.
//!
//! ## Example
//!
//! Split a file into at most 1024 chunks, each ending with a newline (except possibly the
//! last):
//!
//! ```rust,no_run
//! use file_chunker::FileChunker;
//!
//! let file = std::fs::File::open("/path/to/file").unwrap();
//! let chunker = FileChunker::new(&file).unwrap();
//! chunker.chunks(1024, Some('\n'))
//!     .unwrap()
//!     .iter()
//!     .for_each(|chunk| {
//!         println!("{:?}", chunk);
//!     });
//! ```
//!
22
23use anyhow::Result;
24use memmap2::Mmap;
25use std::fs::File;
26
27pub struct FileChunker {
28    mmap: Mmap,
29}
30
31impl FileChunker {
32    /// Create a new FileChunker
33    pub fn new(file: &File) -> Result<Self> {
34        let mmap = unsafe { Mmap::map(file)? };
35        Ok(Self { mmap })
36    }
37
38    /// Divide the file into chunks approximately equal size. Returns a vector of memory-mapped
39    /// slices that each correspond to a chunk.
40    ///
41    /// If a delimeter is provided, then each chunk will end with an instance of the delimeter,
42    /// assuming the delimiter exists in the file. This is useful when working with text files
43    /// that have newline characters, for example. If no delimeter is provided, then each chunk
44    /// will be the same size, except for the last chunk which may be smaller.
45    ///
46    /// It is assumed that the underlying `File` will not change while this function is running.
47    pub fn chunks(&self, count: usize, delimiter: Option<char>) -> Result<Vec<&[u8]>> {
48        let chunk_size = chunk_size(self.mmap.len(), count);
49        let mut chunks = Vec::new();
50        let mut offset = 0;
51        while offset < self.mmap.len() {
52            let mut chunk_end = offset + chunk_size;
53            if let Some(delimiter) = delimiter {
54                while (chunk_end < self.mmap.len() - 1) && (self.mmap[chunk_end] != delimiter as u8)
55                {
56                    chunk_end += 1;
57                }
58                chunk_end += 1;
59            }
60            if chunk_end > self.mmap.len() {
61                chunks.push(&self.mmap[offset..]);
62                break;
63            }
64            chunks.push(&self.mmap[offset..chunk_end]);
65            offset = chunk_end;
66        }
67
68        Ok(chunks)
69    }
70}
71
/// Returns the nominal chunk length when `file_size` bytes are split into `count` chunks:
/// `file_size / count` rounded up, so that at most `count` chunks cover the whole file.
///
/// Uses exact integer arithmetic (no `f64` round-trip, which loses precision above 2^53).
/// A `count` of zero is treated as one, i.e. a single chunk of `file_size` bytes.
fn chunk_size(file_size: usize, count: usize) -> usize {
    let count = count.max(1);
    // Quotient plus a remainder check instead of `(file_size + count - 1) / count`, which
    // could overflow for sizes near `usize::MAX`.
    file_size / count + usize::from(file_size % count != 0)
}
75
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Write;

    /// Ten haproxy-style log lines, each terminated by a newline.
    const LONG_LOG: &str = "Nov 23 06:26:40 ip-10-1-1-1 haproxy[20128]: 10.1.1.10:57305 [23/Nov/2019:06:26:40.781] public myapp/i-05fa49c0e7db8c328 0/0/0/78/78 206 913/458 - - ---- 9/9/6/0/0 0/0 {bytes=0-0} {||1|bytes 0-0/499704} \"GET /2518cb13a48bdf53b2f936f44e7042a3cc7baa06 HTTP/1.1\"
Nov 23 06:26:41 ip-10-1-1-1 haproxy[20128]: 10.1.1.11:51819 [23/Nov/2019:06:27:41.780] public myapp/i-059c225b48702964a 0/0/0/80/80 200 802/142190 - - ---- 8/8/5/0/0 0/0 {} {||141752|} \"GET /2043f2eb9e2691edcc0c8084d1ffce8bd70bc6e7 HTTP/1.1\"
Nov 23 06:26:42 ip-10-1-1-1 haproxy[20128]: 10.1.1.12:38870 [23/Nov/2019:06:28:42.773] public myapp/i-048088fd46abe7ed0 0/0/0/77/100 200 823/512174 - - ---- 8/8/5/0/0 0/0 {} {||511736|} \"GET /eb59c0b5dad36f080f3d261c6257ce0e21ef1a01 HTTP/1.1\"
Nov 23 06:26:43 ip-10-1-1-1 haproxy[20128]: 10.1.1.13:35528 [23/Nov/2019:06:29:43.775] public myapp/i-05e9315b035d50f62 0/0/0/103/105 200 869/431481 - - ---- 8/8/1/0/0 0/0 {} {|||} \"GET /164672c9d75c76a8fa237c24f9cbfd2222554f6d HTTP/1.1\"
Nov 23 06:26:44 ip-10-1-1-1 haproxy[20128]: 10.1.1.14:48553 [23/Nov/2019:06:30:44.808] public myapp/i-0008bfe6b1c98e964 0/0/0/72/73 200 840/265518 - - ---- 7/7/5/0/0 0/0 {} {||265080|} \"GET /e3b526928196d19ab3419d433f3de0ceb71e62b5 HTTP/1.1\"
Nov 23 06:26:45 ip-10-1-1-1 haproxy[20128]: 10.1.1.15:60969 [23/Nov/2019:06:31:45.727] public myapp/i-005a2bfdba4c405a8 0/0/0/146/167 200 852/304622 - - ---- 7/7/5/0/0 0/0 {} {||304184|} \"GET /52f5edb4a46276defe54ead2fae3a19fb8cafdb6 HTTP/1.1\"
Nov 23 06:26:46 ip-10-1-1-1 haproxy[20128]: 10.1.1.14:48539 [23/Nov/2019:06:32:46.730] public myapp/i-03b180605be4fa176 0/0/0/171/171 200 889/124142 - - ---- 6/6/4/0/0 0/0 {} {||123704|} \"GET /ef9e0c85cc1c76d7dc777f5b19d7cb85478496e4 HTTP/1.1\"
Nov 23 06:26:47 ip-10-1-1-1 haproxy[20128]: 10.1.1.11:51847 [23/Nov/2019:06:33:47.886] public myapp/i-0aa566420409956d6 0/0/0/28/28 206 867/458 - - ---- 6/6/4/0/0 0/0 {bytes=0-0} {} \"GET /3c7ace8c683adcad375a4d14995734ac0db08bb3 HTTP/1.1\"
Nov 23 06:26:48 ip-10-1-1-1 haproxy[20128]: 10.1.1.13:35554 [23/Nov/2019:06:34:48.866] public myapp/i-07f4205f35b4774b6 0/0/0/23/49 200 816/319662 - - ---- 5/5/3/0/0 0/0 {} {||319224|} \"GET /b95db0578977cd32658fa28b386c0db67ab23ee7 HTTP/1.1\"
Nov 23 06:26:49 ip-10-1-1-1 haproxy[20128]: 10.1.1.12:38899 [23/Nov/2019:06:35:49.879] public myapp/i-08cb5309afd22e8c0 0/0/0/59/59 200 1000/112110 - - ---- 5/5/3/0/0 0/0 {} {||111672|} \"GET /5314ca870ed0f5e48a71adca185e4ff7f1d9d80f HTTP/1.1\"
";

    /// Writes `contents` to a fresh anonymous temporary file and returns it.
    ///
    /// Tests keep the returned `File` alive for as long as the `FileChunker` built from it is
    /// in use.
    fn temp_file_with(contents: &str) -> File {
        let mut file = tempfile::tempfile().unwrap();
        file.write_all(contents.as_bytes()).unwrap();
        file.flush().unwrap();
        file
    }

    #[test]
    fn chunks_with_delimiter() {
        let log = "01\n23\n45\n67\n89";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(2, Some('\n')).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks.concat(), log.as_bytes());
        assert_eq!(String::from_utf8_lossy(chunks[0]), "01\n23\n45\n");
        assert_eq!(String::from_utf8_lossy(chunks[1]), "67\n89");
    }

    #[test]
    fn chunks_without_delimiter() {
        let log = "0123456789";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(10, None).unwrap();
        assert_eq!(chunks.len(), 10);
        assert_eq!(chunks.concat(), log.as_bytes());
        // Check every chunk, including the last one.
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(String::from_utf8_lossy(chunk), i.to_string());
        }
    }

    #[test]
    fn chunks_with_delimiter_empty() {
        let file = temp_file_with("");

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(2, Some('\n')).unwrap();
        assert!(chunks.is_empty());
    }

    #[test]
    fn chunks_without_delimiter_empty() {
        let file = temp_file_with("");

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(2, None).unwrap();
        assert!(chunks.is_empty());
    }

    #[test]
    fn chunks_with_delimiter_start_and_end() {
        let log = "\n01\n23\n45\n67\n89\n";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(2, Some('\n')).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks.concat(), log.as_bytes());
        assert_eq!(String::from_utf8_lossy(chunks[0]), "\n01\n23\n45\n");
        assert_eq!(String::from_utf8_lossy(chunks[1]), "67\n89\n");
    }

    #[test]
    fn chunks_with_delimiter_only() {
        let log = "\n\n\n\n\n\n\n\n\n\n";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(2, Some('\n')).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks.concat(), log.as_bytes());
        assert_eq!(String::from_utf8_lossy(chunks[0]), "\n\n\n\n\n\n");
        assert_eq!(String::from_utf8_lossy(chunks[1]), "\n\n\n\n");
    }

    #[test]
    fn chunks_with_nonexistent_delimiter() {
        let log = "0123456789";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(10, Some('\n')).unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], b"0123456789");
    }

    #[test]
    fn chunks_with_delimiter_long_text() {
        let log_lines: Vec<&str> = LONG_LOG.lines().collect();
        let file = temp_file_with(LONG_LOG);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(5, Some('\n')).unwrap();
        // N.B.: This is fewer than requested because each chunk is extended to the next
        // delimiter.
        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks.concat(), LONG_LOG.as_bytes());

        // Re-joins a run of lines, each followed by its terminating newline.
        let joined = |lines: &[&str]| -> String {
            lines.iter().map(|line| format!("{}\n", line)).collect()
        };
        assert_eq!(String::from_utf8_lossy(chunks[0]), joined(&log_lines[0..2]));
        assert_eq!(String::from_utf8_lossy(chunks[1]), joined(&log_lines[2..5]));
        assert_eq!(String::from_utf8_lossy(chunks[2]), joined(&log_lines[5..8]));
        assert_eq!(String::from_utf8_lossy(chunks[3]), joined(&log_lines[8..10]));
    }

    #[test]
    fn chunks_without_delimiter_long_text() {
        let file = temp_file_with(LONG_LOG);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(5, None).unwrap();

        // Without a delimiter every chunk but the last is exactly `ceil(len / 5)` bytes long.
        let expected_size = (LONG_LOG.len() + 4) / 5;
        let expected: Vec<&[u8]> = LONG_LOG.as_bytes().chunks(expected_size).collect();
        assert_eq!(chunks.len(), 5);
        assert_eq!(chunks, expected);
    }

    #[test]
    fn chunks_with_delimiter_single_chunk() {
        let log = "Why, sometimes I've believed as many as six impossible things before breakfast.";
        let file = temp_file_with(log);

        let chunker = FileChunker::new(&file).unwrap();
        let chunks = chunker.chunks(1, Some('\n')).unwrap();

        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), log.len());
    }
}
292}