Skip to main content

io_m2dir/entry/
store.rs

1//! I/O-free coroutine to store an entry in an m2dir.
2//!
3//! Follows the m2dir delivery protocol: write to a temporary file
4//! in the same directory first, then atomically rename to the final
5//! `<date>,<checksum>.<nonce>` filename.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use std::fs;
11//!
12//! use io_m2dir::{
13//!     coroutine::{M2dirArg, M2dirCoroutine, M2dirCoroutineState, M2dirYield},
14//!     m2dir::types::M2dir,
15//!     entry::store::{M2dirEntryStore, M2dirEntryStoreOptions},
16//! };
17//!
18//! let m2dir = M2dir::from_path("/tmp/inbox");
19//! let opts = M2dirEntryStoreOptions::default();
20//! let bytes = b"From: a\r\n\r\nhi\r\n".to_vec();
21//! let mut coroutine = M2dirEntryStore::new(m2dir, bytes, opts);
22//! let mut arg = None;
23//!
24//! let entry = loop {
25//!     match coroutine.resume(arg.take()) {
26//!         M2dirCoroutineState::Yielded(M2dirYield::WantsPid) => {
27//!             arg = Some(M2dirArg::Pid(std::process::id()));
28//!         }
29//!         M2dirCoroutineState::Yielded(M2dirYield::WantsRandom { len }) => {
30//!             let mut bytes = vec![0u8; len];
31//!             // ... fill via OS RNG ...
32//!             arg = Some(M2dirArg::Random(bytes));
33//!         }
34//!         M2dirCoroutineState::Yielded(M2dirYield::WantsFileCreate(files)) => {
35//!             for (path, bytes) in files {
36//!                 fs::write(path.as_str(), bytes).unwrap();
37//!             }
38//!             arg = Some(M2dirArg::FileCreate);
39//!         }
40//!         M2dirCoroutineState::Yielded(M2dirYield::WantsRename(pairs)) => {
41//!             for (from, to) in pairs {
42//!                 fs::rename(from.as_str(), to.as_str()).unwrap();
43//!             }
44//!             arg = Some(M2dirArg::Rename);
45//!         }
46//!         M2dirCoroutineState::Complete(Ok(entry)) => break entry,
47//!         M2dirCoroutineState::Complete(Err(err)) => panic!("{err}"),
48//!         state => panic!("unexpected state {state:?}"),
49//!     }
50//! };
51//!
52//! println!("delivered {}", entry.id());
53//! ```
54
55use core::{
56    fmt, mem,
57    sync::atomic::{AtomicU32, Ordering},
58};
59
60use alloc::{collections::BTreeMap, string::String, vec::Vec};
61
62use log::trace;
63use thiserror::Error;
64
65use crate::{coroutine::*, entry::types::M2dirEntry, m2dir::types::M2dir, path::M2dirPath};
66
67const NONCE_LEN: usize = 4;
68
69static TMP_COUNTER: AtomicU32 = AtomicU32::new(0);
70
71/// Failure causes during the m2dir STORE flow.
72#[derive(Clone, Debug, Error)]
73pub enum M2dirEntryStoreError {
74    #[error("M2DIR STORE failed: unexpected coroutine arg")]
75    UnexpectedArg,
76    #[error("M2DIR STORE failed: missing coroutine arg")]
77    MissingArg,
78}
79
80/// Options for [`M2dirEntryStore::new`].
81#[derive(Clone, Debug, Default, Eq, PartialEq)]
82pub struct M2dirEntryStoreOptions {}
83
84/// I/O-free m2dir entry STORE coroutine.
85pub struct M2dirEntryStore {
86    m2dir: M2dir,
87    bytes: Vec<u8>,
88    state: State,
89    #[allow(dead_code)]
90    opts: M2dirEntryStoreOptions,
91}
92
93impl M2dirEntryStore {
94    /// Creates a new coroutine that will store `bytes` as a new
95    /// entry in `m2dir`.
96    pub fn new(m2dir: M2dir, bytes: Vec<u8>, opts: M2dirEntryStoreOptions) -> Self {
97        Self {
98            m2dir,
99            bytes,
100            state: State::Start,
101            opts,
102        }
103    }
104}
105
106impl M2dirCoroutine for M2dirEntryStore {
107    type Yield = M2dirYield;
108    type Return = Result<M2dirEntry, M2dirEntryStoreError>;
109
110    fn resume(&mut self, arg: Option<M2dirArg>) -> M2dirCoroutineState<Self::Yield, Self::Return> {
111        trace!("store entry: {}", self.state);
112
113        match (&mut self.state, arg) {
114            (State::Start, None) => {
115                trace!("wants pid");
116                self.state = State::AwaitingPid;
117                M2dirCoroutineState::Yielded(M2dirYield::WantsPid)
118            }
119            (State::AwaitingPid, Some(M2dirArg::Pid(pid))) => {
120                trace!("wants {NONCE_LEN} random bytes");
121                self.state = State::AwaitingRandom { pid };
122                M2dirCoroutineState::Yielded(M2dirYield::WantsRandom { len: NONCE_LEN })
123            }
124            (State::AwaitingRandom { pid }, Some(M2dirArg::Random(nonce))) => {
125                let pid = *pid;
126                let bytes = mem::take(&mut self.bytes);
127
128                let (id, final_path) = self.m2dir.entry_path(&bytes, &nonce);
129                let counter = TMP_COUNTER.fetch_add(1, Ordering::AcqRel);
130                let tmp_path = self.m2dir.tmp_path(pid, counter);
131
132                trace!("wants tmp file create at {tmp_path}");
133
134                let files = BTreeMap::from_iter([(tmp_path.clone(), bytes)]);
135                self.state = State::Created {
136                    tmp_path,
137                    final_path,
138                    id,
139                };
140                M2dirCoroutineState::Yielded(M2dirYield::WantsFileCreate(files))
141            }
142            (
143                State::Created {
144                    tmp_path,
145                    final_path,
146                    id,
147                },
148                Some(M2dirArg::FileCreate),
149            ) => {
150                let tmp_path = mem::take(tmp_path);
151                let final_path = mem::take(final_path);
152                let id = mem::take(id);
153                trace!("created tmp file, wants rename to {final_path}");
154
155                let pairs = vec![(tmp_path, final_path.clone())];
156                self.state = State::Renamed { final_path, id };
157                M2dirCoroutineState::Yielded(M2dirYield::WantsRename(pairs))
158            }
159            (State::Renamed { final_path, id }, Some(M2dirArg::Rename)) => {
160                let final_path = mem::take(final_path);
161                let id = mem::take(id);
162                trace!("renamed tmp file to {final_path}");
163
164                let entry = M2dirEntry::from_parts(id, final_path);
165                M2dirCoroutineState::Complete(Ok(entry))
166            }
167            (_, Some(_)) => {
168                let err = M2dirEntryStoreError::UnexpectedArg;
169                M2dirCoroutineState::Complete(Err(err))
170            }
171            (_, None) => {
172                let err = M2dirEntryStoreError::MissingArg;
173                M2dirCoroutineState::Complete(Err(err))
174            }
175        }
176    }
177}
178
179enum State {
180    Start,
181    AwaitingPid,
182    AwaitingRandom {
183        pid: u32,
184    },
185    Created {
186        tmp_path: M2dirPath,
187        final_path: M2dirPath,
188        id: String,
189    },
190    Renamed {
191        final_path: M2dirPath,
192        id: String,
193    },
194}
195
196impl fmt::Display for State {
197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198        match self {
199            Self::Start => f.write_str("start"),
200            Self::AwaitingPid => f.write_str("awaiting pid"),
201            Self::AwaitingRandom { .. } => f.write_str("awaiting random nonce"),
202            Self::Created { .. } => f.write_str("tmp file created"),
203            Self::Renamed { .. } => f.write_str("renamed"),
204        }
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211
212    #[test]
213    fn happy_path_yields_full_delivery_sequence() {
214        let m2dir = M2dir::from_path("/tmp/inbox");
215        let mut store =
216            M2dirEntryStore::new(m2dir, b"hi".to_vec(), M2dirEntryStoreOptions::default());
217
218        match store.resume(None) {
219            M2dirCoroutineState::Yielded(M2dirYield::WantsPid) => {}
220            state => panic!("expected WantsPid, got {state:?}"),
221        }
222        match store.resume(Some(M2dirArg::Pid(42))) {
223            M2dirCoroutineState::Yielded(M2dirYield::WantsRandom { len }) => {
224                assert_eq!(len, NONCE_LEN);
225            }
226            state => panic!("expected WantsRandom, got {state:?}"),
227        }
228        let files = match store.resume(Some(M2dirArg::Random(b"abcd".to_vec()))) {
229            M2dirCoroutineState::Yielded(M2dirYield::WantsFileCreate(files)) => files,
230            state => panic!("expected WantsFileCreate, got {state:?}"),
231        };
232        assert_eq!(files.len(), 1);
233        let (tmp_path, _) = files.into_iter().next().unwrap();
234        assert!(tmp_path.as_str().contains(".m2dir.tmp."));
235
236        let pairs = match store.resume(Some(M2dirArg::FileCreate)) {
237            M2dirCoroutineState::Yielded(M2dirYield::WantsRename(pairs)) => pairs,
238            state => panic!("expected WantsRename, got {state:?}"),
239        };
240        assert_eq!(pairs.len(), 1);
241
242        let entry = match store.resume(Some(M2dirArg::Rename)) {
243            M2dirCoroutineState::Complete(Ok(entry)) => entry,
244            state => panic!("expected Complete(Ok), got {state:?}"),
245        };
246        assert!(entry.path().as_str().starts_with("/tmp/inbox/"));
247    }
248
249    #[test]
250    fn unexpected_arg_at_start_returns_unexpected_arg_error() {
251        let m2dir = M2dir::from_path("/tmp/inbox");
252        let mut store =
253            M2dirEntryStore::new(m2dir, b"hi".to_vec(), M2dirEntryStoreOptions::default());
254
255        let err = match store.resume(Some(M2dirArg::Pid(42))) {
256            M2dirCoroutineState::Complete(Err(err)) => err,
257            state => panic!("expected Complete(Err), got {state:?}"),
258        };
259        assert!(matches!(err, M2dirEntryStoreError::UnexpectedArg));
260    }
261
262    #[test]
263    fn missing_arg_at_awaiting_pid_returns_missing_arg_error() {
264        let m2dir = M2dir::from_path("/tmp/inbox");
265        let mut store =
266            M2dirEntryStore::new(m2dir, b"hi".to_vec(), M2dirEntryStoreOptions::default());
267        let _ = store.resume(None);
268
269        let err = match store.resume(None) {
270            M2dirCoroutineState::Complete(Err(err)) => err,
271            state => panic!("expected Complete(Err), got {state:?}"),
272        };
273        assert!(matches!(err, M2dirEntryStoreError::MissingArg));
274    }
275
276    #[test]
277    fn wrong_arg_kind_at_awaiting_random_returns_unexpected_arg_error() {
278        let m2dir = M2dir::from_path("/tmp/inbox");
279        let mut store =
280            M2dirEntryStore::new(m2dir, b"hi".to_vec(), M2dirEntryStoreOptions::default());
281        let _ = store.resume(None);
282        let _ = store.resume(Some(M2dirArg::Pid(42)));
283
284        let err = match store.resume(Some(M2dirArg::Pid(0))) {
285            M2dirCoroutineState::Complete(Err(err)) => err,
286            state => panic!("expected Complete(Err), got {state:?}"),
287        };
288        assert!(matches!(err, M2dirEntryStoreError::UnexpectedArg));
289    }
290
291    #[test]
292    fn wrong_arg_kind_at_renamed_returns_unexpected_arg_error() {
293        let m2dir = M2dir::from_path("/tmp/inbox");
294        let mut store =
295            M2dirEntryStore::new(m2dir, b"hi".to_vec(), M2dirEntryStoreOptions::default());
296
297        let _ = store.resume(None);
298        let _ = store.resume(Some(M2dirArg::Pid(42)));
299        let _ = store.resume(Some(M2dirArg::Random(b"abcd".to_vec())));
300        let _ = store.resume(Some(M2dirArg::FileCreate));
301
302        let err = match store.resume(Some(M2dirArg::FileCreate)) {
303            M2dirCoroutineState::Complete(Err(err)) => err,
304            state => panic!("expected Complete(Err), got {state:?}"),
305        };
306        assert!(matches!(err, M2dirEntryStoreError::UnexpectedArg));
307    }
308}