msgpack_tracing/
rotate.rs1use crate::{
2 storage::Store,
3 string_cache::{CacheInstruction, CacheInstructionSet},
4 tape::{Instruction, InstructionSet, TapeMachine},
5};
6use std::{
7 fs::File,
8 io::{self, Read, Seek},
9 path::{Path, PathBuf},
10};
11
12pub struct Rotate {
13 file: Option<File>,
14 path: PathBuf,
15 path1: Option<PathBuf>,
16 max_len: u64,
17}
18impl Rotate {
19 pub fn new<P: AsRef<Path>>(path: P, max_len: u64) -> io::Result<Self> {
20 let file = File::options().append(true).create(true).open(&path)?;
21 let path1 = Self::path1(path.as_ref());
22
23 Ok(Self {
24 file: Some(file),
25 path: path.as_ref().to_owned(),
26 path1,
27 max_len,
28 })
29 }
30
31 pub fn file_mut(&mut self) -> io::Result<&mut File> {
32 self.file
33 .as_mut()
34 .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "file closed"))
35 }
36
37 pub fn do_needs_restart(&mut self) -> io::Result<bool> {
38 let max_len = self.max_len;
39 let file = self.file_mut()?;
40
41 if file.stream_position()? <= max_len {
42 return Ok(false);
43 }
44
45 self.file = None;
46
47 if let Some(path1) = self.path1.as_ref() {
48 std::fs::rename(&self.path, path1)?;
49 }
50 self.file = Some(File::create(&self.path)?);
51
52 Ok(true)
53 }
54
55 pub fn extract_logs<P: AsRef<Path>, W>(path: P, mut to: W) -> io::Result<()>
56 where
57 W: io::Write,
58 {
59 let path1 = Self::path1(path.as_ref());
60
61 let mut buf = vec![0; 4096];
62
63 let files = [
64 path1.and_then(|path| File::open(path).ok()),
65 File::open(path).ok(),
66 ];
67 let files = files.into_iter().flatten();
68
69 for mut file in files {
70 loop {
71 let n = file.read(&mut buf)?;
72 let buf = &buf[..n];
73 if buf.is_empty() {
74 break;
75 }
76
77 to.write_all(buf)?;
78 to.flush()?;
79 }
80 }
81
82 Ok(())
83 }
84
85 fn path1(path: &Path) -> Option<PathBuf> {
86 path.to_str().map(|str| PathBuf::from(format!("{str}.1")))
87 }
88}
89impl TapeMachine<CacheInstructionSet> for Rotate {
90 fn needs_restart(&mut self) -> bool {
91 self.do_needs_restart().unwrap_or_default()
92 }
93
94 fn handle(&mut self, instruction: CacheInstruction) {
95 let Ok(file) = self.file_mut() else {
96 return;
97 };
98
99 let _ = Store::do_handle_cached(file, instruction);
100 }
101}
102impl TapeMachine<InstructionSet> for Rotate {
103 fn needs_restart(&mut self) -> bool {
104 self.do_needs_restart().unwrap_or_default()
105 }
106
107 fn handle(&mut self, instruction: Instruction) {
108 let Ok(file) = self.file_mut() else {
109 return;
110 };
111
112 let _ = Store::do_handle(file, instruction);
113 }
114}
115
116#[cfg(test)]
117pub mod tests {
118 use super::*;
119 use crate::{
120 storage::Load,
121 tape::{FieldValue, Value},
122 };
123
124 #[test]
125 fn extract_logs() {
126 let dir = tempfile::TempDir::new().unwrap();
127
128 let path = dir.path().join("log");
129 let mut rotate = Rotate::new(&path, 8).unwrap();
130 TapeMachine::<InstructionSet>::handle(&mut rotate, Instruction::Restart);
131
132 assert!(!TapeMachine::<InstructionSet>::needs_restart(&mut rotate));
133 for i in 0..8 {
134 TapeMachine::<InstructionSet>::handle(
135 &mut rotate,
136 Instruction::AddValue(FieldValue {
137 name: "name",
138 value: Value::Integer(i),
139 }),
140 );
141 }
142 assert!(TapeMachine::<InstructionSet>::needs_restart(&mut rotate));
143 for i in 8..16 {
144 TapeMachine::<InstructionSet>::handle(
145 &mut rotate,
146 Instruction::AddValue(FieldValue {
147 name: "name",
148 value: Value::Integer(i),
149 }),
150 );
151 }
152
153 let load = dir.path().join("extracted");
154 Rotate::extract_logs(&path, File::create(&load).unwrap()).unwrap();
155 let mut load = Load::new(File::open(load).unwrap());
156
157 let next = load.fetch_one().unwrap();
158 match next {
159 Some(Instruction::Restart) => {}
160 unexpected => panic!("restart: unexpected instruction {unexpected:?}"),
161 }
162
163 for i in 0..16 {
164 let next = load.fetch_one().unwrap();
165
166 match next {
167 Some(Instruction::AddValue(FieldValue {
168 name,
169 value: Value::Integer(value),
170 })) => {
171 assert_eq!(name, "name");
172 assert_eq!(value, i);
173 }
174 unexpected => panic!("{i}: unexpected instruction {unexpected:?}"),
175 }
176 }
177 }
178}