cli/util/
io.rs

1/*---------------------------------------------------------------------------------------------
2 *  Copyright (c) Microsoft Corporation. All rights reserved.
3 *  Licensed under the MIT License. See License.txt in the project root for license information.
4 *--------------------------------------------------------------------------------------------*/
5use std::{
6	fs::File,
7	io::{self, BufRead, Seek},
8	task::Poll,
9	time::Duration,
10};
11
12use tokio::{
13	io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
14	sync::mpsc,
15	time::sleep,
16};
17
18use super::ring_buffer::RingBuffer;
19
20pub trait ReportCopyProgress {
21	fn report_progress(&mut self, bytes_so_far: u64, total_bytes: u64);
22}
23
24/// Type that doesn't emit anything for download progress.
25pub struct SilentCopyProgress();
26
27impl ReportCopyProgress for SilentCopyProgress {
28	fn report_progress(&mut self, _bytes_so_far: u64, _total_bytes: u64) {}
29}
30
31/// Copies from the reader to the writer, reporting progress to the provided
32/// reporter every so often.
33pub async fn copy_async_progress<T, R, W>(
34	mut reporter: T,
35	reader: &mut R,
36	writer: &mut W,
37	total_bytes: u64,
38) -> io::Result<u64>
39where
40	R: AsyncRead + Unpin,
41	W: AsyncWrite + Unpin,
42	T: ReportCopyProgress,
43{
44	let mut buf = vec![0; 8 * 1024];
45	let mut bytes_so_far = 0;
46	let mut bytes_last_reported = 0;
47	let report_granularity = std::cmp::min(total_bytes / 10, 2 * 1024 * 1024);
48
49	reporter.report_progress(0, total_bytes);
50
51	loop {
52		let read_buf = match reader.read(&mut buf).await {
53			Ok(0) => break,
54			Ok(n) => &buf[..n],
55			Err(e) => return Err(e),
56		};
57
58		writer.write_all(read_buf).await?;
59
60		bytes_so_far += read_buf.len() as u64;
61		if bytes_so_far - bytes_last_reported > report_granularity {
62			bytes_last_reported = bytes_so_far;
63			reporter.report_progress(bytes_so_far, total_bytes);
64		}
65	}
66
67	reporter.report_progress(bytes_so_far, total_bytes);
68
69	Ok(bytes_so_far)
70}
71
72/// Helper used when converting Future interfaces to poll-based interfaces.
73/// Stores excess data that can be reused on future polls.
74#[derive(Default)]
75pub(crate) struct ReadBuffer(Option<(Vec<u8>, usize)>);
76
77impl ReadBuffer {
78	/// Removes any data stored in the read buffer
79	pub fn take_data(&mut self) -> Option<(Vec<u8>, usize)> {
80		self.0.take()
81	}
82
83	/// Writes as many bytes as possible to the readbuf, stashing any extra.
84	pub fn put_data(
85		&mut self,
86		target: &mut tokio::io::ReadBuf<'_>,
87		bytes: Vec<u8>,
88		start: usize,
89	) -> Poll<std::io::Result<()>> {
90		if bytes.is_empty() {
91			self.0 = None;
92			// should not return Ok(), since if nothing is written to the target
93			// it signals EOF. Instead wait for more data from the source.
94			return Poll::Pending;
95		}
96
97		if target.remaining() >= bytes.len() - start {
98			target.put_slice(&bytes[start..]);
99			self.0 = None;
100		} else {
101			let end = start + target.remaining();
102			target.put_slice(&bytes[start..end]);
103			self.0 = Some((bytes, end));
104		}
105
106		Poll::Ready(Ok(()))
107	}
108}
109
110#[derive(Debug)]
111pub enum TailEvent {
112	/// A new line was read from the file. The line includes its trailing newline character.
113	Line(String),
114	/// The file appears to have been rewritten (size shrunk)
115	Reset,
116	/// An error was encountered with the file.
117	Err(io::Error),
118}
119
120/// Simple, naive implementation of `tail -f -n <n> <path>`. Uses polling, so
121/// it's not the fastest, but simple and working for easy cases.
122pub fn tailf(file: File, n: usize) -> mpsc::UnboundedReceiver<TailEvent> {
123	let (tx, rx) = mpsc::unbounded_channel();
124	let mut last_len = match file.metadata() {
125		Ok(m) => m.len(),
126		Err(e) => {
127			tx.send(TailEvent::Err(e)).ok();
128			return rx;
129		}
130	};
131
132	let mut reader = io::BufReader::new(file);
133	let mut pos = 0;
134
135	// Read the initial "n" lines back from the request. initial_lines
136	// is a small ring buffer.
137	let mut initial_lines = RingBuffer::new(n);
138	loop {
139		let mut line = String::new();
140		let bytes_read = match reader.read_line(&mut line) {
141			Ok(0) => break,
142			Ok(n) => n,
143			Err(e) => {
144				tx.send(TailEvent::Err(e)).ok();
145				return rx;
146			}
147		};
148
149		if !line.ends_with('\n') {
150			// EOF
151			break;
152		}
153
154		pos += bytes_read as u64;
155		initial_lines.push(line);
156	}
157
158	for line in initial_lines.into_iter() {
159		tx.send(TailEvent::Line(line)).ok();
160	}
161
162	// now spawn the poll process to keep reading new lines
163	tokio::spawn(async move {
164		let poll_interval = Duration::from_millis(500);
165
166		loop {
167			tokio::select! {
168				_ = sleep(poll_interval) => {},
169				_ = tx.closed() => return
170			}
171
172			match reader.get_ref().metadata() {
173				Err(e) => {
174					tx.send(TailEvent::Err(e)).ok();
175					return;
176				}
177				Ok(m) => {
178					if m.len() == last_len {
179						continue;
180					}
181
182					if m.len() < last_len {
183						tx.send(TailEvent::Reset).ok();
184						pos = 0;
185					}
186
187					last_len = m.len();
188				}
189			}
190
191			if let Err(e) = reader.seek(io::SeekFrom::Start(pos)) {
192				tx.send(TailEvent::Err(e)).ok();
193				return;
194			}
195
196			loop {
197				let mut line = String::new();
198				let n = match reader.read_line(&mut line) {
199					Ok(0) => break,
200					Ok(n) => n,
201					Err(e) => {
202						tx.send(TailEvent::Err(e)).ok();
203						return;
204					}
205				};
206
207				if n == 0 || !line.ends_with('\n') {
208					break;
209				}
210
211				pos += n as u64;
212				if tx.send(TailEvent::Line(line)).is_err() {
213					return;
214				}
215			}
216		}
217	});
218
219	rx
220}
221
222#[cfg(test)]
223mod tests {
224	use rand::Rng;
225	use std::{fs::OpenOptions, io::Write};
226
227	use super::*;
228
229	#[tokio::test]
230	async fn test_tailf_empty() {
231		let dir = tempfile::tempdir().unwrap();
232		let file_path = dir.path().join("tmp");
233
234		let read_file = OpenOptions::new()
235			.write(true)
236			.read(true)
237			.create(true)
238			.open(&file_path)
239			.unwrap();
240
241		let mut rx = tailf(read_file, 32);
242		assert!(rx.try_recv().is_err());
243
244		let mut append_file = OpenOptions::new()
245			.write(true)
246			.append(true)
247			.open(&file_path)
248			.unwrap();
249		writeln!(&mut append_file, "some line").unwrap();
250
251		let recv = rx.recv().await;
252		if let Some(TailEvent::Line(l)) = recv {
253			assert_eq!("some line\n".to_string(), l);
254		} else {
255			unreachable!("expect a line event, got {:?}", recv)
256		}
257
258		write!(&mut append_file, "partial ").unwrap();
259		writeln!(&mut append_file, "line").unwrap();
260
261		let recv = rx.recv().await;
262		if let Some(TailEvent::Line(l)) = recv {
263			assert_eq!("partial line\n".to_string(), l);
264		} else {
265			unreachable!("expect a line event, got {:?}", recv)
266		}
267	}
268
269	#[tokio::test]
270	async fn test_tailf_resets() {
271		let dir = tempfile::tempdir().unwrap();
272		let file_path = dir.path().join("tmp");
273
274		let mut read_file = OpenOptions::new()
275			.write(true)
276			.read(true)
277			.create(true)
278			.open(&file_path)
279			.unwrap();
280
281		writeln!(&mut read_file, "some existing content").unwrap();
282		let mut rx = tailf(read_file, 0);
283		assert!(rx.try_recv().is_err());
284
285		let mut append_file = File::create(&file_path).unwrap(); // truncates
286		writeln!(&mut append_file, "some line").unwrap();
287
288		let recv = rx.recv().await;
289		if let Some(TailEvent::Reset) = recv {
290			// ok
291		} else {
292			unreachable!("expect a reset event, got {:?}", recv)
293		}
294
295		let recv = rx.recv().await;
296		if let Some(TailEvent::Line(l)) = recv {
297			assert_eq!("some line\n".to_string(), l);
298		} else {
299			unreachable!("expect a line event, got {:?}", recv)
300		}
301	}
302
303	#[tokio::test]
304	async fn test_tailf_with_data() {
305		let dir = tempfile::tempdir().unwrap();
306		let file_path = dir.path().join("tmp");
307
308		let mut read_file = OpenOptions::new()
309			.write(true)
310			.read(true)
311			.create(true)
312			.open(&file_path)
313			.unwrap();
314		let mut rng = rand::thread_rng();
315
316		let mut written = vec![];
317		let base_line = "Elit ipsum cillum ex cillum. Adipisicing consequat cupidatat do proident ut in sunt Lorem ipsum tempor. Eiusmod ipsum Lorem labore exercitation sunt pariatur excepteur fugiat cillum velit cillum enim. Nisi Lorem cupidatat ad enim velit officia eiusmod esse tempor aliquip. Deserunt pariatur tempor in duis culpa esse sit nulla irure ullamco ipsum voluptate non laboris. Occaecat officia nulla officia mollit do aliquip reprehenderit ad incididunt.";
318		for i in 0..100 {
319			let line = format!("{}: {}", i, &base_line[..rng.gen_range(0..base_line.len())]);
320			writeln!(&mut read_file, "{}", line).unwrap();
321			written.push(line);
322		}
323		write!(&mut read_file, "partial line").unwrap();
324		read_file.seek(io::SeekFrom::Start(0)).unwrap();
325
326		let last_n = 32;
327		let mut rx = tailf(read_file, last_n);
328		for i in 0..last_n {
329			let recv = rx.try_recv().unwrap();
330			if let TailEvent::Line(l) = recv {
331				let mut expected = written[written.len() - last_n + i].to_string();
332				expected.push('\n');
333				assert_eq!(expected, l);
334			} else {
335				unreachable!("expect a line event, got {:?}", recv)
336			}
337		}
338
339		assert!(rx.try_recv().is_err());
340
341		let mut append_file = OpenOptions::new()
342			.write(true)
343			.append(true)
344			.open(&file_path)
345			.unwrap();
346		writeln!(append_file, " is now complete").unwrap();
347
348		let recv = rx.recv().await;
349		if let Some(TailEvent::Line(l)) = recv {
350			assert_eq!("partial line is now complete\n".to_string(), l);
351		} else {
352			unreachable!("expect a line event, got {:?}", recv)
353		}
354	}
355}