1use futures::future::BoxFuture;
12use std::{
13 io::{self, Write},
14 path::{Path, PathBuf},
15 pin::Pin,
16 sync::{
17 atomic::{AtomicU64, Ordering},
18 Arc,
19 },
20 task::{Context, Poll},
21};
22use tokio::{
23 fs::{remove_file, rename, File, OpenOptions},
24 io::{AsyncWrite, AsyncWriteExt},
25 sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
26 task::JoinHandle,
27};
28
29use crate::DEFAULT_LOG_MAX_SIZE;
30
31pub struct RotatingLog {
52 file: File,
54
55 path: PathBuf,
57
58 max_size: u64,
60
61 current_size: Arc<AtomicU64>,
63
64 state: State,
66
67 tx: UnboundedSender<Vec<u8>>,
69
70 _background_task: JoinHandle<()>,
72}
73
74enum State {
76 Idle,
78
79 Rotating(RotationFuture),
81
82 Writing,
84}
85
86pub struct SyncChannelWriter {
88 tx: UnboundedSender<Vec<u8>>,
89}
90
91type RotationFuture = BoxFuture<'static, io::Result<(File, PathBuf)>>;
92
93impl RotatingLog {
98 pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
113 Self::with_max_size(path, DEFAULT_LOG_MAX_SIZE).await
114 }
115
116 pub async fn with_max_size(path: impl AsRef<Path>, max_size: u64) -> io::Result<Self> {
124 let path = path.as_ref().to_path_buf();
125 let file = OpenOptions::new()
126 .create(true)
127 .append(true)
128 .open(&path)
129 .await?;
130 let metadata = file.metadata().await?;
131 let (tx, rx) = mpsc::unbounded_channel();
132
133 let current_size = Arc::new(AtomicU64::new(metadata.len()));
135
136 let bg_file = file.try_clone().await?;
138 let bg_path = path.clone();
139 let bg_max_size = max_size;
140 let bg_size = Arc::clone(¤t_size);
141
142 let background_task = tokio::spawn(async move {
144 handle_channel_data(rx, bg_file, bg_path, bg_max_size, bg_size).await
145 });
146
147 Ok(Self {
148 file,
149 path,
150 max_size,
151 current_size,
152 state: State::Idle,
153 tx,
154 _background_task: background_task,
155 })
156 }
157
158 pub fn get_sync_writer(&self) -> SyncChannelWriter {
160 SyncChannelWriter::new(self.tx.clone())
161 }
162}
163
164impl SyncChannelWriter {
165 pub fn new(tx: UnboundedSender<Vec<u8>>) -> Self {
167 Self { tx }
168 }
169}
170
171async fn do_rotation(file: File, path: PathBuf) -> io::Result<(File, PathBuf)> {
196 file.sync_all().await?;
197 let backup_path = path.with_extension("old");
198 if backup_path.exists() {
199 remove_file(&backup_path).await?;
200 }
201
202 rename(&path, &backup_path).await?;
203
204 let new_file = OpenOptions::new()
205 .create(true)
206 .append(true)
207 .open(&path)
208 .await?;
209
210 Ok((new_file, path))
211}
212
213async fn handle_channel_data(
215 mut rx: UnboundedReceiver<Vec<u8>>,
216 mut file: File,
217 path: PathBuf,
218 max_size: u64,
219 current_size: Arc<AtomicU64>,
220) {
221 while let Some(data) = rx.recv().await {
222 let data_len = data.len() as u64;
223 let size = current_size.fetch_add(data_len, Ordering::Relaxed);
224
225 if size + data_len > max_size {
226 if let Ok(file_clone) = file.try_clone().await {
228 match do_rotation(file_clone, path.clone()).await {
229 Ok((new_file, _)) => {
230 file = new_file;
231 current_size.store(0, Ordering::Relaxed);
232 }
233 Err(e) => {
234 tracing::error!("failed to rotate log file: {}", e);
235 continue;
236 }
237 }
238 } else {
239 tracing::error!("failed to clone file handle for rotation");
240 continue;
241 }
242 }
243
244 if let Err(e) = file.write_all(&data).await {
245 tracing::error!("failed to write to log file: {}", e);
246 current_size.fetch_sub(data_len, Ordering::Relaxed);
248 }
249 }
250}
251
252impl AsyncWrite for RotatingLog {
257 fn poll_write(
258 mut self: Pin<&mut Self>,
259 cx: &mut Context<'_>,
260 buf: &[u8],
261 ) -> Poll<io::Result<usize>> {
262 let this = &mut *self;
263 let buf_len = buf.len() as u64;
264
265 loop {
266 match &mut this.state {
267 State::Idle => {
268 let size = this.current_size.fetch_add(buf_len, Ordering::Relaxed);
269 if size + buf_len > this.max_size {
270 let old_file = std::mem::replace(
271 &mut this.file,
272 File::from_std(std::fs::File::open("/dev/null").unwrap()),
273 );
274 let old_path = this.path.clone();
275 let fut = Box::pin(do_rotation(old_file, old_path));
276 this.state = State::Rotating(fut);
277 } else {
278 this.state = State::Writing;
279 }
280 }
281 State::Rotating(fut) => {
282 match fut.as_mut().poll(cx) {
283 Poll::Pending => return Poll::Pending,
284 Poll::Ready(Err(e)) => {
285 this.state = State::Idle;
286 this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
288 return Poll::Ready(Err(e));
289 }
290 Poll::Ready(Ok((new_file, new_path))) => {
291 this.file = new_file;
292 this.path = new_path;
293 this.current_size.store(0, Ordering::Relaxed);
294 this.state = State::Writing;
295 }
296 }
297 }
298 State::Writing => {
299 let pinned_file = Pin::new(&mut this.file);
300 match pinned_file.poll_write(cx, buf) {
301 Poll::Ready(Ok(written)) => {
302 this.state = State::Idle;
303 return Poll::Ready(Ok(written));
304 }
305 Poll::Ready(Err(e)) => {
306 this.state = State::Idle;
307 this.current_size.fetch_sub(buf_len, Ordering::Relaxed);
309 return Poll::Ready(Err(e));
310 }
311 Poll::Pending => return Poll::Pending,
312 }
313 }
314 }
315 }
316 }
317
318 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
319 Pin::new(&mut self.file).poll_flush(cx)
320 }
321
322 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
323 Pin::new(&mut self.file).poll_shutdown(cx)
324 }
325}
326
327impl Write for SyncChannelWriter {
328 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
329 let data = buf.to_vec();
330 self.tx.send(data).map_err(|_| {
331 io::Error::new(io::ErrorKind::Other, "failed to send log data to channel")
332 })?;
333 Ok(buf.len())
334 }
335
336 fn flush(&mut self) -> io::Result<()> {
337 Ok(())
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;

    /// Pauses for 100 ms so pending file work and the background task can catch up.
    async fn settle() {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    /// A freshly created log exists on disk and starts out empty with the given limit.
    #[tokio::test]
    async fn test_create_new_log() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");

        let log = RotatingLog::with_max_size(&target, 1024).await?;

        assert!(target.exists());
        assert_eq!(log.max_size, 1024);
        assert_eq!(log.current_size.load(Ordering::Relaxed), 0);

        Ok(())
    }

    /// A write below the limit lands in the file and is reflected in the size counter.
    #[tokio::test]
    async fn test_write_to_log() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");
        let mut log = RotatingLog::with_max_size(&target, 1024).await?;

        let payload = b"test log entry\n";
        log.write_all(payload).await?;
        log.flush().await?;

        let on_disk = fs::read_to_string(&target)?;
        assert_eq!(on_disk, String::from_utf8_lossy(payload));

        let tracked = log.current_size.load(Ordering::Relaxed);
        assert_eq!(tracked, payload.len() as u64);

        Ok(())
    }

    /// The second entry does not fit, so the first ends up in the `.old` backup.
    #[tokio::test]
    async fn test_log_rotation() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");
        let limit = 20;
        let mut log = RotatingLog::with_max_size(&target, limit).await?;

        let first = b"first entry\n";
        log.write_all(first).await?;
        log.flush().await?;

        let second = b"second entry\n";
        log.write_all(second).await?;
        log.flush().await?;

        settle().await;

        let backup = target.with_extension("old");
        assert!(target.exists());
        assert!(backup.exists());

        let rotated = fs::read_to_string(&backup)?;
        assert_eq!(rotated, String::from_utf8_lossy(first));

        let live = fs::read_to_string(&target)?;
        assert_eq!(live, String::from_utf8_lossy(second));

        Ok(())
    }

    /// An entry larger than the limit is still written in full.
    #[tokio::test]
    async fn test_oversized_write() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");
        let limit = 10;
        let mut log = RotatingLog::with_max_size(&target, limit).await?;

        let huge = b"this is a very large log entry that exceeds the maximum size\n";
        log.write_all(huge).await?;
        log.flush().await?;

        settle().await;

        assert!(target.exists());
        let on_disk = fs::read_to_string(&target)?;
        assert_eq!(on_disk, String::from_utf8_lossy(huge));

        Ok(())
    }

    /// Data pushed through the synchronous writer reaches the file via the channel.
    #[tokio::test]
    async fn test_sync_writer() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");
        let log = RotatingLog::with_max_size(&target, 1024).await?;

        let mut writer = log.get_sync_writer();
        let payload = b"sync writer test\n";
        writer.write_all(payload)?;
        writer.flush()?;

        settle().await;

        let on_disk = fs::read_to_string(&target)?;
        assert_eq!(on_disk, String::from_utf8_lossy(payload));

        Ok(())
    }

    /// Several writes in a row leave both a live file and a backup behind.
    #[tokio::test]
    async fn test_multiple_rotations() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("test.log");
        let limit = 20;
        let mut log = RotatingLog::with_max_size(&target, limit).await?;

        for round in 0..3 {
            let entry = format!("rotation test {}\n", round).into_bytes();
            log.write_all(&entry).await?;
            log.flush().await?;

            settle().await;
        }

        assert!(target.exists());
        assert!(target.with_extension("old").exists());

        Ok(())
    }
}