iroh_blobs/store/fs/
util.rs

1use std::{
2    fs::OpenOptions,
3    io::{self, Write},
4    path::Path,
5};
6
7/// overwrite a file with the given data.
8///
9/// This is almost like `std::fs::write`, but it does not truncate the file.
10///
11/// So if you overwrite a file with less data than it had before, the file will
12/// still have the same size as before.
13///
14/// Also, if you overwrite a file with the same data as it had before, the
15/// file will be unchanged even if the overwrite operation is interrupted.
16pub fn overwrite_and_sync(path: &Path, data: &[u8]) -> io::Result<std::fs::File> {
17    tracing::trace!(
18        "overwriting file {} with {} bytes",
19        path.display(),
20        data.len()
21    );
22    // std::fs::create_dir_all(path.parent().unwrap()).unwrap();
23    // tracing::error!("{}", path.parent().unwrap().display());
24    // tracing::error!("{}", path.parent().unwrap().metadata().unwrap().is_dir());
25    let mut file = OpenOptions::new()
26        .write(true)
27        .create(true)
28        .truncate(false)
29        .open(path)?;
30    file.write_all(data)?;
31    // todo: figure out if it is safe to not sync here
32    file.sync_all()?;
33    Ok(file)
34}
35
36/// Read a file into memory and then delete it.
37pub fn read_and_remove(path: &Path) -> io::Result<Vec<u8>> {
38    let data = std::fs::read(path)?;
39    // todo: should we fail here or just log a warning?
40    // remove could fail e.g. on windows if the file is still open
41    std::fs::remove_file(path)?;
42    Ok(data)
43}
44
45/// A wrapper for a flume receiver that allows peeking at the next message.
46#[derive(Debug)]
47pub(super) struct PeekableFlumeReceiver<T> {
48    msg: Option<T>,
49    recv: async_channel::Receiver<T>,
50}
51
52#[allow(dead_code)]
53impl<T> PeekableFlumeReceiver<T> {
54    pub fn new(recv: async_channel::Receiver<T>) -> Self {
55        Self { msg: None, recv }
56    }
57
58    /// Receive the next message.
59    ///
60    /// Will block if there are no messages.
61    /// Returns None only if there are no more messages (sender is dropped).
62    pub async fn recv(&mut self) -> Option<T> {
63        if let Some(msg) = self.msg.take() {
64            return Some(msg);
65        }
66        self.recv.recv().await.ok()
67    }
68
69    /// Push back a message. This will only work if there is room for it.
70    /// Otherwise, it will fail and return the message.
71    pub fn push_back(&mut self, msg: T) -> std::result::Result<(), T> {
72        if self.msg.is_none() {
73            self.msg = Some(msg);
74            Ok(())
75        } else {
76            Err(msg)
77        }
78    }
79}