1use signal_hook::SigId;
2use signal_hook::consts::signal::{SIGINT, SIGTERM};
3use signal_hook::low_level;
4use std::collections::BTreeMap;
5use std::fs::{self, File};
6use std::io::{self, Read, Write};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
10
11use super::PartialSavedError;
12
/// Options consumed by `save_or_abort_partial` when a capture ends early.
struct PartialSaveOptions {
    /// When `false`, the temporary data file is removed and the original
    /// error is returned instead of storing a partial entry.
    save_on_error: bool,
    /// Whether a capture that recorded zero bytes may still be stored as a
    /// partial entry.
    save_empty: bool,
    /// Signal number that cut the capture short, if any; forwarded into
    /// `PartialSavedError::signal`.
    signal: Option<i32>,
}
18
19pub fn push_from_reader<R: Read>(
20 reader: &mut R,
21 attrs: BTreeMap<String, String>,
22) -> io::Result<String> {
23 super::init()?;
24 let interrupted = Arc::new(AtomicBool::new(false));
25 let signal = Arc::new(AtomicI32::new(0));
26 let _signal_guard = SignalGuard::new(&interrupted, &signal)?;
27 let id = super::new_ulid()?;
28 let data_path = super::tmp_dir()?.join(format!("{id}.data"));
29 let data = File::create(&data_path)?;
30 run_read_loop(
31 reader,
32 None,
33 data,
34 data_path,
35 id,
36 attrs,
37 &interrupted,
38 &signal,
39 true,
40 )
41}
42
43pub fn tee_from_reader_partial<R: Read, W: Write>(
44 reader: &mut R,
45 stdout: &mut W,
46 attrs: BTreeMap<String, String>,
47 save_on_error: bool,
48) -> io::Result<String> {
49 super::init()?;
50 let interrupted = Arc::new(AtomicBool::new(false));
51 let signal = Arc::new(AtomicI32::new(0));
52 let _signal_guard = SignalGuard::new(&interrupted, &signal)?;
53 let id = super::new_ulid()?;
54 let data_path = super::tmp_dir()?.join(format!("{id}.data"));
55 let data = File::create(&data_path)?;
56 run_read_loop(
57 reader,
58 Some(stdout as &mut dyn Write),
59 data,
60 data_path,
61 id,
62 attrs,
63 &interrupted,
64 &signal,
65 save_on_error,
66 )
67}
68
/// Reports whether a signal has been recorded in `interrupted`.
///
/// Returns `None` while the flag is clear. Once it is set, returns an
/// `io::ErrorKind::Interrupted` error together with the signal number read
/// from `signal`. The message distinguishes `SIGTERM` from every other value.
#[inline]
fn check_interrupted(interrupted: &AtomicBool, signal: &AtomicI32) -> Option<(io::Error, i32)> {
    // Acquire on the flag: when the writer publishes the flag with Release
    // ordering after storing the signal number, that number is guaranteed to
    // be visible to the load below. Against a Relaxed writer this is never
    // weaker than a Relaxed load.
    if !interrupted.load(Ordering::Acquire) {
        return None;
    }
    let signo = signal.load(Ordering::Relaxed);
    let msg = match signo {
        SIGTERM => "terminated by signal",
        _ => "interrupted by signal",
    };
    Some((io::Error::new(io::ErrorKind::Interrupted, msg), signo))
}
82
83fn run_read_loop<R: Read>(
84 reader: &mut R,
85 mut tee: Option<&mut dyn Write>,
86 mut data: File,
87 data_path: PathBuf,
88 id: String,
89 attrs: BTreeMap<String, String>,
90 interrupted: &Arc<AtomicBool>,
91 signal: &Arc<AtomicI32>,
92 save_on_error: bool,
93) -> io::Result<String> {
94 let mut sample = Vec::with_capacity(512);
95 let mut total = 0i64;
96 let mut buf = [0u8; 65536];
97 loop {
98 if let Some((err, signo)) = check_interrupted(interrupted, signal) {
99 return save_or_abort_partial(
100 id,
101 data_path,
102 &sample,
103 total,
104 attrs,
105 err,
106 PartialSaveOptions {
107 save_on_error,
108 save_empty: true,
109 signal: Some(signo),
110 },
111 );
112 }
113 let n = match reader.read(&mut buf) {
114 Ok(n) => n,
115 Err(err) => {
116 return save_or_abort_partial(
117 id,
118 data_path,
119 &sample,
120 total,
121 attrs,
122 err,
123 PartialSaveOptions {
124 save_on_error,
125 save_empty: false,
126 signal: None,
127 },
128 );
129 }
130 };
131 if n == 0 {
132 if let Some((err, signo)) = check_interrupted(interrupted, signal) {
133 return save_or_abort_partial(
134 id,
135 data_path,
136 &sample,
137 total,
138 attrs,
139 err,
140 PartialSaveOptions {
141 save_on_error,
142 save_empty: true,
143 signal: Some(signo),
144 },
145 );
146 }
147 break;
148 }
149 let sample_len = sample.len();
150 if sample_len < 512 {
151 let need = (512 - sample_len).min(n);
152 sample.extend_from_slice(&buf[..need]);
153 }
154 if let Err(err) = data.write_all(&buf[..n]) {
155 let _ = fs::remove_file(&data_path);
156 return Err(err);
157 }
158 total += n as i64;
159 if let Some(ref mut out) = tee {
160 if let Err(err) = out.write_all(&buf[..n]) {
161 drop(data);
162 if err.kind() == io::ErrorKind::BrokenPipe {
163 return super::finalize_saved_entry(id, data_path, &sample, total, attrs);
164 }
165 return save_or_abort_partial(
166 id,
167 data_path,
168 &sample,
169 total,
170 attrs,
171 err,
172 PartialSaveOptions {
173 save_on_error,
174 save_empty: false,
175 signal: None,
176 },
177 );
178 }
179 }
180 }
181 drop(data);
182 super::finalize_saved_entry(id, data_path, &sample, total, attrs)
183}
184
185fn save_or_abort_partial(
186 id: String,
187 data_path: PathBuf,
188 sample: &[u8],
189 total: i64,
190 mut attrs: BTreeMap<String, String>,
191 err: io::Error,
192 options: PartialSaveOptions,
193) -> io::Result<String> {
194 if !options.save_on_error || (total == 0 && !options.save_empty) {
195 let _ = fs::remove_file(&data_path);
196 return Err(err);
197 }
198 attrs.insert("partial".into(), "true".into());
199 super::finalize_saved_entry(id.clone(), data_path, sample, total, attrs)?;
200 Err(io::Error::other(PartialSavedError {
201 id,
202 cause: err,
203 signal: options.signal,
204 }))
205}
206
/// Guard holding the registrations of the SIGINT and SIGTERM handlers.
///
/// Dropping the guard unregisters both handlers.
struct SignalGuard {
    /// Registration ids, stored in the order SIGINT, SIGTERM.
    ids: [SigId; 2],
}
210
211impl SignalGuard {
212 fn new(flag: &Arc<AtomicBool>, signal: &Arc<AtomicI32>) -> io::Result<Self> {
213 let id0 = register_signal(SIGINT, flag, signal)?;
214 let id1 = register_signal(SIGTERM, flag, signal)?;
215 Ok(Self { ids: [id0, id1] })
216 }
217}
218
219impl Drop for SignalGuard {
220 fn drop(&mut self) {
221 for id in &self.ids {
222 low_level::unregister(*id);
223 }
224 }
225}
226
227fn register_signal(
228 signo: i32,
229 flag: &Arc<AtomicBool>,
230 signal: &Arc<AtomicI32>,
231) -> io::Result<SigId> {
232 let flag = Arc::clone(flag);
233 let signal = Arc::clone(signal);
234 unsafe {
235 low_level::register(signo, move || {
236 signal.store(signo, Ordering::Relaxed);
237 flag.store(true, Ordering::Relaxed);
238 })
239 }
240 .map_err(io::Error::other)
241}