msgpack_tracing/
rotate.rs

1use crate::{
2    storage::Store,
3    string_cache::{CacheInstruction, CacheInstructionSet},
4    tape::{Instruction, InstructionSet, TapeMachine},
5};
6use std::{
7    fs::File,
8    io::{self, Read, Seek},
9    path::{Path, PathBuf},
10};
11
/// Size-limited log file with a single backup generation.
///
/// Instructions are written to the file at `path`. Once the write position
/// passes `max_len` bytes, [`Rotate::do_needs_restart`] renames the file to
/// `<path>.1` and starts a fresh, empty file at `path`.
#[derive(Debug)]
pub struct Rotate {
    /// Currently open log file; `None` only while rotating or after a rotation
    /// that could neither create the new file nor reopen the old one.
    file: Option<File>,
    /// Location of the active log file.
    path: PathBuf,
    /// Location of the backup file (`path` with `.1` appended).
    path1: PathBuf,
    /// Write position, in bytes, above which the file is rotated.
    max_len: u64,
}
impl Rotate {
    /// Opens the log file at `path` for appending, creating it if it does not exist.
    ///
    /// `max_len` is the write position in bytes above which
    /// [`Rotate::do_needs_restart`] rotates the file.
    ///
    /// # Errors
    /// Returns the I/O error produced while opening the file.
    pub fn new<P: AsRef<Path>>(path: P, max_len: u64) -> io::Result<Self> {
        let path = path.as_ref();
        let file = Self::open_append(path)?;

        Ok(Self {
            file: Some(file),
            path: path.to_owned(),
            path1: Self::path1(path),
            max_len,
        })
    }

    /// Returns the currently open log file.
    ///
    /// # Errors
    /// Returns a [`io::ErrorKind::BrokenPipe`] error when no file is open.
    pub fn file_mut(&mut self) -> io::Result<&mut File> {
        self.file
            .as_mut()
            .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "file closed"))
    }

    /// Rotates the log file if its write position has passed `max_len`.
    ///
    /// Returns `Ok(false)` when the file is still within the limit and
    /// `Ok(true)` after the file was moved to the backup path and a fresh,
    /// empty file was opened in its place.
    ///
    /// # Errors
    /// Returns an error when no file is open, when the position cannot be
    /// queried, or when renaming or re-creating the file fails. If the rename
    /// fails, the current file is reopened for appending so that writes keep
    /// working and rotation can be retried on a later call.
    pub fn do_needs_restart(&mut self) -> io::Result<bool> {
        let max_len = self.max_len;
        let file = self.file_mut()?;

        if file.stream_position()? <= max_len {
            return Ok(false);
        }

        // Close our handle before moving the file on disk.
        self.file = None;

        if let Err(err) = std::fs::rename(&self.path, &self.path1) {
            // The old file is still in place: keep writing to it instead of
            // leaving the rotator permanently closed.
            self.file = Self::open_append(&self.path).ok();
            return Err(err);
        }
        self.file = Some(File::create(&self.path)?);

        Ok(true)
    }

    /// Copies the backup file (if any) followed by the active file at `path`
    /// into `to`, so the output is in the order the data was written.
    ///
    /// Files that cannot be opened are skipped. Both files are opened before
    /// any data is copied.
    ///
    /// # Errors
    /// Returns the first error from reading a file or from writing to or
    /// flushing `to`. Interrupted reads are retried.
    pub fn extract_logs<P: AsRef<Path>, W>(path: P, mut to: W) -> io::Result<()>
    where
        W: io::Write,
    {
        let path = path.as_ref();

        let backup = File::open(Self::path1(path)).ok();
        let current = File::open(path).ok();

        let mut buf = vec![0; 4096];

        for mut file in backup.into_iter().chain(current) {
            loop {
                let n = match file.read(&mut buf) {
                    Ok(0) => break,
                    Ok(n) => n,
                    // A read interrupted by a signal is not a failure; retry it.
                    Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                    Err(err) => return Err(err),
                };

                to.write_all(&buf[..n])?;
            }
        }

        // One flush at the end is enough; flushing per chunk defeats buffered writers.
        to.flush()
    }

    /// Opens `path` in append mode, creating the file if it is missing.
    fn open_append(path: &Path) -> io::Result<File> {
        File::options().append(true).create(true).open(path)
    }

    /// Builds the backup path by appending `.1` to `path`.
    ///
    /// Works on the raw OS string so that non-UTF-8 paths get a backup too.
    fn path1(path: &Path) -> PathBuf {
        let mut name = path.as_os_str().to_owned();
        name.push(".1");
        PathBuf::from(name)
    }
}
89impl TapeMachine<CacheInstructionSet> for Rotate {
90    fn needs_restart(&mut self) -> bool {
91        self.do_needs_restart().unwrap_or_default()
92    }
93
94    fn handle(&mut self, instruction: CacheInstruction) {
95        let Ok(file) = self.file_mut() else {
96            return;
97        };
98
99        let _ = Store::do_handle_cached(file, instruction);
100    }
101}
102impl TapeMachine<InstructionSet> for Rotate {
103    fn needs_restart(&mut self) -> bool {
104        self.do_needs_restart().unwrap_or_default()
105    }
106
107    fn handle(&mut self, instruction: Instruction) {
108        let Ok(file) = self.file_mut() else {
109            return;
110        };
111
112        let _ = Store::do_handle(file, instruction);
113    }
114}
115
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::{
        storage::Load,
        tape::{FieldValue, Value},
    };

    /// Writes a restart marker and sixteen integer fields through a `Rotate`
    /// with an 8-byte limit, expecting a rotation after the first eight, then
    /// checks that `extract_logs` yields every instruction in write order.
    #[test]
    fn extract_logs() {
        let tmp = tempfile::TempDir::new().unwrap();
        let log_path = tmp.path().join("log");

        let mut rotate = Rotate::new(&log_path, 8).unwrap();
        TapeMachine::<InstructionSet>::handle(&mut rotate, Instruction::Restart);

        // Right after the restart marker no rotation is expected.
        let rotated = TapeMachine::<InstructionSet>::needs_restart(&mut rotate);
        assert!(!rotated);

        for i in 0..8 {
            let record = Instruction::AddValue(FieldValue {
                name: "name",
                value: Value::Integer(i),
            });
            TapeMachine::<InstructionSet>::handle(&mut rotate, record);
        }

        // After the first eight fields the file is expected to rotate.
        let rotated = TapeMachine::<InstructionSet>::needs_restart(&mut rotate);
        assert!(rotated);

        for i in 8..16 {
            let record = Instruction::AddValue(FieldValue {
                name: "name",
                value: Value::Integer(i),
            });
            TapeMachine::<InstructionSet>::handle(&mut rotate, record);
        }

        // Merge backup and active file into one stream and read it back.
        let extracted = tmp.path().join("extracted");
        let sink = File::create(&extracted).unwrap();
        Rotate::extract_logs(&log_path, sink).unwrap();
        let mut load = Load::new(File::open(extracted).unwrap());

        let first = load.fetch_one().unwrap();
        if !matches!(first, Some(Instruction::Restart)) {
            panic!("restart: unexpected instruction {first:?}");
        }

        for i in 0..16 {
            match load.fetch_one().unwrap() {
                Some(Instruction::AddValue(FieldValue {
                    name,
                    value: Value::Integer(value),
                })) => {
                    assert_eq!(name, "name");
                    assert_eq!(value, i);
                }
                unexpected => panic!("{i}: unexpected instruction {unexpected:?}"),
            }
        }
    }
}