Skip to main content

stash_cli/store/
push.rs

1use signal_hook::SigId;
2use signal_hook::consts::signal::{SIGINT, SIGTERM};
3use signal_hook::low_level;
4use std::collections::BTreeMap;
5use std::fs::{self, File};
6use std::io::{self, Read, Write};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
10
11use super::PartialSavedError;
12
13struct PartialSaveOptions {
14    save_on_error: bool,
15    save_empty: bool,
16    signal: Option<i32>,
17}
18
19pub fn push_from_reader<R: Read>(
20    reader: &mut R,
21    attrs: BTreeMap<String, String>,
22) -> io::Result<String> {
23    super::init()?;
24    let interrupted = Arc::new(AtomicBool::new(false));
25    let signal = Arc::new(AtomicI32::new(0));
26    let _signal_guard = SignalGuard::new(&interrupted, &signal)?;
27    let id = super::new_ulid()?;
28    let data_path = super::tmp_dir()?.join(format!("{id}.data"));
29    let data = File::create(&data_path)?;
30    run_read_loop(
31        reader,
32        None,
33        data,
34        data_path,
35        id,
36        attrs,
37        &interrupted,
38        &signal,
39        true,
40    )
41}
42
43pub fn tee_from_reader_partial<R: Read, W: Write>(
44    reader: &mut R,
45    stdout: &mut W,
46    attrs: BTreeMap<String, String>,
47    save_on_error: bool,
48) -> io::Result<String> {
49    super::init()?;
50    let interrupted = Arc::new(AtomicBool::new(false));
51    let signal = Arc::new(AtomicI32::new(0));
52    let _signal_guard = SignalGuard::new(&interrupted, &signal)?;
53    let id = super::new_ulid()?;
54    let data_path = super::tmp_dir()?.join(format!("{id}.data"));
55    let data = File::create(&data_path)?;
56    run_read_loop(
57        reader,
58        Some(stdout as &mut dyn Write),
59        data,
60        data_path,
61        id,
62        attrs,
63        &interrupted,
64        &signal,
65        save_on_error,
66    )
67}
68
69#[inline]
70fn check_interrupted(interrupted: &AtomicBool, signal: &AtomicI32) -> Option<(io::Error, i32)> {
71    if interrupted.load(Ordering::Relaxed) {
72        let signo = signal.load(Ordering::Relaxed);
73        let msg = match signo {
74            SIGTERM => "terminated by signal",
75            _ => "interrupted by signal",
76        };
77        Some((io::Error::new(io::ErrorKind::Interrupted, msg), signo))
78    } else {
79        None
80    }
81}
82
83fn run_read_loop<R: Read>(
84    reader: &mut R,
85    mut tee: Option<&mut dyn Write>,
86    mut data: File,
87    data_path: PathBuf,
88    id: String,
89    attrs: BTreeMap<String, String>,
90    interrupted: &Arc<AtomicBool>,
91    signal: &Arc<AtomicI32>,
92    save_on_error: bool,
93) -> io::Result<String> {
94    let mut sample = Vec::with_capacity(512);
95    let mut total = 0i64;
96    let mut buf = [0u8; 65536];
97    loop {
98        if let Some((err, signo)) = check_interrupted(interrupted, signal) {
99            return save_or_abort_partial(
100                id,
101                data_path,
102                &sample,
103                total,
104                attrs,
105                err,
106                PartialSaveOptions {
107                    save_on_error,
108                    save_empty: true,
109                    signal: Some(signo),
110                },
111            );
112        }
113        let n = match reader.read(&mut buf) {
114            Ok(n) => n,
115            Err(err) => {
116                return save_or_abort_partial(
117                    id,
118                    data_path,
119                    &sample,
120                    total,
121                    attrs,
122                    err,
123                    PartialSaveOptions {
124                        save_on_error,
125                        save_empty: false,
126                        signal: None,
127                    },
128                );
129            }
130        };
131        if n == 0 {
132            if let Some((err, signo)) = check_interrupted(interrupted, signal) {
133                return save_or_abort_partial(
134                    id,
135                    data_path,
136                    &sample,
137                    total,
138                    attrs,
139                    err,
140                    PartialSaveOptions {
141                        save_on_error,
142                        save_empty: true,
143                        signal: Some(signo),
144                    },
145                );
146            }
147            break;
148        }
149        let sample_len = sample.len();
150        if sample_len < 512 {
151            let need = (512 - sample_len).min(n);
152            sample.extend_from_slice(&buf[..need]);
153        }
154        if let Err(err) = data.write_all(&buf[..n]) {
155            let _ = fs::remove_file(&data_path);
156            return Err(err);
157        }
158        total += n as i64;
159        if let Some(ref mut out) = tee {
160            if let Err(err) = out.write_all(&buf[..n]) {
161                drop(data);
162                if err.kind() == io::ErrorKind::BrokenPipe {
163                    return super::finalize_saved_entry(id, data_path, &sample, total, attrs);
164                }
165                return save_or_abort_partial(
166                    id,
167                    data_path,
168                    &sample,
169                    total,
170                    attrs,
171                    err,
172                    PartialSaveOptions {
173                        save_on_error,
174                        save_empty: false,
175                        signal: None,
176                    },
177                );
178            }
179        }
180    }
181    drop(data);
182    super::finalize_saved_entry(id, data_path, &sample, total, attrs)
183}
184
185fn save_or_abort_partial(
186    id: String,
187    data_path: PathBuf,
188    sample: &[u8],
189    total: i64,
190    mut attrs: BTreeMap<String, String>,
191    err: io::Error,
192    options: PartialSaveOptions,
193) -> io::Result<String> {
194    if !options.save_on_error || (total == 0 && !options.save_empty) {
195        let _ = fs::remove_file(&data_path);
196        return Err(err);
197    }
198    attrs.insert("partial".into(), "true".into());
199    super::finalize_saved_entry(id.clone(), data_path, sample, total, attrs)?;
200    Err(io::Error::other(PartialSavedError {
201        id,
202        cause: err,
203        signal: options.signal,
204    }))
205}
206
207struct SignalGuard {
208    ids: [SigId; 2],
209}
210
211impl SignalGuard {
212    fn new(flag: &Arc<AtomicBool>, signal: &Arc<AtomicI32>) -> io::Result<Self> {
213        let id0 = register_signal(SIGINT, flag, signal)?;
214        let id1 = register_signal(SIGTERM, flag, signal)?;
215        Ok(Self { ids: [id0, id1] })
216    }
217}
218
219impl Drop for SignalGuard {
220    fn drop(&mut self) {
221        for id in &self.ids {
222            low_level::unregister(*id);
223        }
224    }
225}
226
227fn register_signal(
228    signo: i32,
229    flag: &Arc<AtomicBool>,
230    signal: &Arc<AtomicI32>,
231) -> io::Result<SigId> {
232    let flag = Arc::clone(flag);
233    let signal = Arc::clone(signal);
234    unsafe {
235        low_level::register(signo, move || {
236            signal.store(signo, Ordering::Relaxed);
237            flag.store(true, Ordering::Relaxed);
238        })
239    }
240    .map_err(io::Error::other)
241}