Skip to main content

git_internal/internal/pack/
channel_reader.rs

1//! Simple channel-backed `BufRead` adapter so pack decoding can consume data streamed from async
2//! tasks or background threads.
3
4use std::{
5    io::{self, BufRead, Read},
6    sync::mpsc::Receiver,
7};
8
9/// Custom BufRead implementation that reads from the channel
10pub(crate) struct StreamBufReader {
11    receiver: Receiver<Vec<u8>>,
12    buffer: io::Cursor<Vec<u8>>,
13}
14
15impl StreamBufReader {
16    pub(crate) fn new(receiver: Receiver<Vec<u8>>) -> Self {
17        StreamBufReader {
18            receiver,
19            buffer: io::Cursor::new(Vec::new()),
20        }
21    }
22}
23
24impl Read for StreamBufReader {
25    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
26        if self.buffer.position() as usize == self.buffer.get_ref().len() {
27            // buffer has been read completely
28            match self.receiver.recv() {
29                Ok(data) => {
30                    self.buffer = io::Cursor::new(data);
31                }
32                Err(_) => return Ok(0), // Channel is closed
33            }
34        }
35        self.buffer.read(buf)
36    }
37}
38
39impl BufRead for StreamBufReader {
40    fn fill_buf(&mut self) -> io::Result<&[u8]> {
41        if self.buffer.position() as usize == self.buffer.get_ref().len() {
42            match self.receiver.recv() {
43                Ok(data) => {
44                    self.buffer = io::Cursor::new(data);
45                }
46                Err(_) => return Ok(&[]), // Channel is closed
47            }
48        }
49        self.buffer.fill_buf()
50    }
51
52    fn consume(&mut self, amt: usize) {
53        self.buffer.consume(amt);
54    }
55}