1use std::{
6 fs::File,
7 io::{self, BufRead, Seek},
8 task::Poll,
9 time::Duration,
10};
11
12use tokio::{
13 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
14 sync::mpsc,
15 time::sleep,
16};
17
18use super::ring_buffer::RingBuffer;
19
/// Sink for progress notifications emitted while a copy is running.
pub trait ReportCopyProgress {
    /// Receives one progress update.
    ///
    /// `bytes_so_far` is the amount copied up to this point and `total_bytes` is the
    /// size of the whole transfer as stated by whoever drives the copy.
    fn report_progress(&mut self, bytes_so_far: u64, total_bytes: u64);
}
23
/// Progress reporter that discards every update it receives.
///
/// The standard derives make the type printable, copyable and constructible through
/// `Default`, in addition to the existing `SilentCopyProgress()` constructor form.
#[derive(Debug, Default, Clone, Copy)]
pub struct SilentCopyProgress();
26
27impl ReportCopyProgress for SilentCopyProgress {
28 fn report_progress(&mut self, _bytes_so_far: u64, _total_bytes: u64) {}
29}
30
31pub async fn copy_async_progress<T, R, W>(
34 mut reporter: T,
35 reader: &mut R,
36 writer: &mut W,
37 total_bytes: u64,
38) -> io::Result<u64>
39where
40 R: AsyncRead + Unpin,
41 W: AsyncWrite + Unpin,
42 T: ReportCopyProgress,
43{
44 let mut buf = vec![0; 8 * 1024];
45 let mut bytes_so_far = 0;
46 let mut bytes_last_reported = 0;
47 let report_granularity = std::cmp::min(total_bytes / 10, 2 * 1024 * 1024);
48
49 reporter.report_progress(0, total_bytes);
50
51 loop {
52 let read_buf = match reader.read(&mut buf).await {
53 Ok(0) => break,
54 Ok(n) => &buf[..n],
55 Err(e) => return Err(e),
56 };
57
58 writer.write_all(read_buf).await?;
59
60 bytes_so_far += read_buf.len() as u64;
61 if bytes_so_far - bytes_last_reported > report_granularity {
62 bytes_last_reported = bytes_so_far;
63 reporter.report_progress(bytes_so_far, total_bytes);
64 }
65 }
66
67 reporter.report_progress(bytes_so_far, total_bytes);
68
69 Ok(bytes_so_far)
70}
71
/// Holds bytes that did not fit into a read target so that a later read can resume.
///
/// The stored tuple is the byte vector together with the index of the first byte that
/// has not been handed out yet; `None` means nothing is pending.
#[derive(Debug, Default)]
pub(crate) struct ReadBuffer(Option<(Vec<u8>, usize)>);
76
77impl ReadBuffer {
78 pub fn take_data(&mut self) -> Option<(Vec<u8>, usize)> {
80 self.0.take()
81 }
82
83 pub fn put_data(
85 &mut self,
86 target: &mut tokio::io::ReadBuf<'_>,
87 bytes: Vec<u8>,
88 start: usize,
89 ) -> Poll<std::io::Result<()>> {
90 if bytes.is_empty() {
91 self.0 = None;
92 return Poll::Pending;
95 }
96
97 if target.remaining() >= bytes.len() - start {
98 target.put_slice(&bytes[start..]);
99 self.0 = None;
100 } else {
101 let end = start + target.remaining();
102 target.put_slice(&bytes[start..end]);
103 self.0 = Some((bytes, end));
104 }
105
106 Poll::Ready(Ok(()))
107 }
108}
109
/// A notification produced while tailing a file.
#[derive(Debug)]
pub enum TailEvent {
    /// One complete line read from the file, trailing `'\n'` included.
    Line(String),
    /// The file was found to be shorter than before; reading restarts from its start.
    Reset,
    /// An I/O error occurred while tailing.
    Err(io::Error),
}
119
120pub fn tailf(file: File, n: usize) -> mpsc::UnboundedReceiver<TailEvent> {
123 let (tx, rx) = mpsc::unbounded_channel();
124 let mut last_len = match file.metadata() {
125 Ok(m) => m.len(),
126 Err(e) => {
127 tx.send(TailEvent::Err(e)).ok();
128 return rx;
129 }
130 };
131
132 let mut reader = io::BufReader::new(file);
133 let mut pos = 0;
134
135 let mut initial_lines = RingBuffer::new(n);
138 loop {
139 let mut line = String::new();
140 let bytes_read = match reader.read_line(&mut line) {
141 Ok(0) => break,
142 Ok(n) => n,
143 Err(e) => {
144 tx.send(TailEvent::Err(e)).ok();
145 return rx;
146 }
147 };
148
149 if !line.ends_with('\n') {
150 break;
152 }
153
154 pos += bytes_read as u64;
155 initial_lines.push(line);
156 }
157
158 for line in initial_lines.into_iter() {
159 tx.send(TailEvent::Line(line)).ok();
160 }
161
162 tokio::spawn(async move {
164 let poll_interval = Duration::from_millis(500);
165
166 loop {
167 tokio::select! {
168 _ = sleep(poll_interval) => {},
169 _ = tx.closed() => return
170 }
171
172 match reader.get_ref().metadata() {
173 Err(e) => {
174 tx.send(TailEvent::Err(e)).ok();
175 return;
176 }
177 Ok(m) => {
178 if m.len() == last_len {
179 continue;
180 }
181
182 if m.len() < last_len {
183 tx.send(TailEvent::Reset).ok();
184 pos = 0;
185 }
186
187 last_len = m.len();
188 }
189 }
190
191 if let Err(e) = reader.seek(io::SeekFrom::Start(pos)) {
192 tx.send(TailEvent::Err(e)).ok();
193 return;
194 }
195
196 loop {
197 let mut line = String::new();
198 let n = match reader.read_line(&mut line) {
199 Ok(0) => break,
200 Ok(n) => n,
201 Err(e) => {
202 tx.send(TailEvent::Err(e)).ok();
203 return;
204 }
205 };
206
207 if n == 0 || !line.ends_with('\n') {
208 break;
209 }
210
211 pos += n as u64;
212 if tx.send(TailEvent::Line(line)).is_err() {
213 return;
214 }
215 }
216 }
217 });
218
219 rx
220}
221
#[cfg(test)]
mod tests {
    use rand::Rng;
    use std::{fs::OpenOptions, io::Write, path::Path};

    use super::*;

    /// Creates the file at `path` (truncating it if present) and opens it read/write.
    fn create_rw(path: &Path) -> File {
        OpenOptions::new()
            .write(true)
            .read(true)
            .create(true)
            .truncate(true)
            .open(path)
            .unwrap()
    }

    /// Opens the existing file at `path` in append mode.
    fn open_append(path: &Path) -> File {
        OpenOptions::new().append(true).open(path).unwrap()
    }

    /// Fails the test unless `event` is a line event carrying exactly `expected`.
    #[track_caller]
    fn assert_line(event: Option<TailEvent>, expected: &str) {
        match event {
            Some(TailEvent::Line(line)) => assert_eq!(expected, line),
            other => panic!("expect a line event, got {:?}", other),
        }
    }

    /// An initially empty file yields nothing, then complete lines as they are appended.
    #[tokio::test]
    async fn test_tailf_empty() {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("tmp");

        let read_file = create_rw(&file_path);

        let mut rx = tailf(read_file, 32);
        assert!(rx.try_recv().is_err());

        let mut append_file = open_append(&file_path);
        writeln!(&mut append_file, "some line").unwrap();

        assert_line(rx.recv().await, "some line\n");

        // A line written in two pieces must arrive as a single event.
        write!(&mut append_file, "partial ").unwrap();
        writeln!(&mut append_file, "line").unwrap();

        assert_line(rx.recv().await, "partial line\n");
    }

    /// Truncating the file produces a reset followed by the new content.
    #[tokio::test]
    async fn test_tailf_resets() {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("tmp");

        let mut read_file = create_rw(&file_path);

        writeln!(&mut read_file, "some existing content").unwrap();
        let mut rx = tailf(read_file, 0);
        assert!(rx.try_recv().is_err());

        // `File::create` truncates the existing file before the new line is written.
        let mut append_file = File::create(&file_path).unwrap();
        writeln!(&mut append_file, "some line").unwrap();

        let recv = rx.recv().await;
        assert!(
            matches!(recv, Some(TailEvent::Reset)),
            "expect a reset event, got {:?}",
            recv
        );

        assert_line(rx.recv().await, "some line\n");
    }

    /// Only the last `n` complete lines are replayed; a partial line waits for its end.
    #[tokio::test]
    async fn test_tailf_with_data() {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("tmp");

        let mut read_file = create_rw(&file_path);
        let mut rng = rand::thread_rng();

        let mut written = vec![];
        let base_line = "Elit ipsum cillum ex cillum. Adipisicing consequat cupidatat do proident ut in sunt Lorem ipsum tempor. Eiusmod ipsum Lorem labore exercitation sunt pariatur excepteur fugiat cillum velit cillum enim. Nisi Lorem cupidatat ad enim velit officia eiusmod esse tempor aliquip. Deserunt pariatur tempor in duis culpa esse sit nulla irure ullamco ipsum voluptate non laboris. Occaecat officia nulla officia mollit do aliquip reprehenderit ad incididunt.";
        for i in 0..100 {
            let line = format!("{}: {}", i, &base_line[..rng.gen_range(0..base_line.len())]);
            writeln!(&mut read_file, "{}", line).unwrap();
            written.push(line);
        }
        write!(&mut read_file, "partial line").unwrap();
        // Rewind so the tailer reads the content written above.
        read_file.seek(io::SeekFrom::Start(0)).unwrap();

        let last_n = 32;
        let mut rx = tailf(read_file, last_n);
        for i in 0..last_n {
            let mut expected = written[written.len() - last_n + i].to_string();
            expected.push('\n');
            assert_line(rx.try_recv().ok(), &expected);
        }

        assert!(rx.try_recv().is_err());

        let mut append_file = open_append(&file_path);
        writeln!(append_file, " is now complete").unwrap();

        assert_line(rx.recv().await, "partial line is now complete\n");
    }
}