1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//! A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`]
//!
//! Please note that this `Mutex` only works if the other processes in your system
//! are willing to "play nice" with you. In most systems (Unix-like), locks are mostly
//! advisory.

use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, Write};
use std::path::{Path, PathBuf};

pub use crate::sync::FileGuard;

/// A persistent mutex implementation using the atomicity of [`OpenOptions::create_new`].
/// This structure, as opposed to [`FileGuard`], holds some content in a separate file.
#[derive(Debug)]
pub struct Mutex {
    /// Folder in which the mutex is mounted. The `lock` and `contents` files are
    /// resolved relative to this path.
    path: PathBuf,
}

impl Mutex {
    /// Opens a new mutex, given the path for a folder in which the mutex will be mounted.
    /// This will create a new folder if one does not exist yet.
    ///
    /// # Errors
    ///
    /// This function fails if it cannot create the folder which is going to contain the
    /// mutex.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Mutex> {
        fs::create_dir_all(&path)?;
        Ok(Mutex {
            path: path.as_ref().to_owned(),
        })
    }

    /// Locks this mutex, awaiting for it to unlock if it is locked.
    pub async fn lock(&self) -> io::Result<MutexGuard> {
        let file_guard = FileGuard::lock(self.path.join("lock")).await?;
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(self.path.join("contents"))?;

        Ok(MutexGuard {
            _file_guard: file_guard,
            file,
        })
    }

    /// Tries to lock this mutex, returning `None` if it is locked.
    pub fn try_lock(&self) -> io::Result<Option<MutexGuard>> {
        let file_guard = FileGuard::try_lock(self.path.join("lock"))?;

        if let Some(file_guard) = file_guard {
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(self.path.join("contents"))?;

            Ok(Some(MutexGuard {
                _file_guard: file_guard,
                file,
            }))
        } else {
            Ok(None)
        }
    }
}

/// A guard to the [`Mutex`] when it is locked. This structure gives access to
/// the contents of the mutex.
pub struct MutexGuard {
    // Never read (hence the leading underscore); stored so that it lives exactly as
    // long as this guard.
    // NOTE(review): presumably the lock is released when this `FileGuard` is dropped —
    // confirm against `crate::sync`.
    _file_guard: FileGuard,
    // Handle to the mutex's content file, used by `read`, `write` and `file`.
    file: File,
}

impl MutexGuard {
    /// Reads all the contents of the content file into a vector.
    ///
    /// The file cursor is moved back to the start first, so the whole file is returned
    /// no matter where a previous read or write left the cursor.
    ///
    /// # Errors
    ///
    /// This function fails if seeking or reading the content file fails.
    pub fn read(&self) -> io::Result<Vec<u8>> {
        // `Read` and `Seek` are implemented for `&File`, so a shared handle suffices.
        let mut file = &self.file;
        file.seek(io::SeekFrom::Start(0))?;

        // Read in bulk instead of going through `bytes()`, which issues one read call
        // per byte on an unbuffered file.
        let mut contents = Vec::new();
        file.read_to_end(&mut contents)?;
        Ok(contents)
    }

    /// Writes some data to the content file, overwriting all the previous content.
    ///
    /// # Errors
    ///
    /// This function fails if seeking, truncating, writing or flushing the content
    /// file fails.
    pub fn write<D: AsRef<[u8]>>(&self, data: D) -> io::Result<()> {
        let mut file = &self.file;
        file.seek(io::SeekFrom::Start(0))?;
        // Discard the old bytes so a shorter payload does not leave a stale tail.
        file.set_len(0)?;
        file.write_all(data.as_ref())?;
        file.flush()
    }

    /// Gives direct access to the underlying content file.
    pub fn file(&self) -> &File {
        &self.file
    }
}

// The drop order doesn't matter. Therefore, no `Drop` implementation.

#[cfg(test)]
mod test {
    use super::*;

    /// Data written through one guard must be readable through a later guard.
    #[test]
    fn test_mutex() {
        let scenario = async move {
            let mutex = Mutex::open("data/mutex").unwrap();

            // Scope the writing guard so it is dropped before locking again.
            {
                let writer = mutex.lock().await.unwrap();
                writer.write(b"some data").unwrap();
            }

            let reader = mutex.lock().await.unwrap();
            let stored = reader.read().unwrap();

            assert_eq!(
                String::from_utf8_lossy(&stored),
                String::from_utf8_lossy(b"some data")
            );
        };

        futures::executor::block_on(scenario);
    }

    /// Writing a shorter payload after a longer one must not leave a stale tail.
    #[test]
    fn test_mutex_longer_data_first() {
        let scenario = async move {
            let mutex = Mutex::open("data/mutex-longer-data-first").unwrap();

            // First the longer payload...
            {
                let first = mutex.lock().await.unwrap();
                first.write(b"some long data").unwrap();
            }

            // ...then the shorter one, each under its own guard.
            {
                let second = mutex.lock().await.unwrap();
                second.write(b"some data").unwrap();
            }

            let reader = mutex.lock().await.unwrap();
            let stored = reader.read().unwrap();

            assert_eq!(stored, b"some data");
        };

        futures::executor::block_on(scenario);
    }
}