1use std::{
11 collections::{BTreeMap, BTreeSet, hash_map::RandomState},
12 fs,
13 hash::{BuildHasher, Hasher},
14 io,
15 path::{Path, PathBuf},
16 process,
17 string::ToString,
18 thread,
19 vec::Vec,
20};
21
22use log::trace;
23use thiserror::Error;
24
25use crate::{
26 coroutine::*,
27 entry::{
28 delete::*,
29 get::*,
30 list::*,
31 store::*,
32 types::{M2dirEntry, M2dirFullEntry, ParseFilenameError},
33 utils::validate_checksum,
34 },
35 flag::types::M2dirFlags,
36 flag::{add::*, remove::*, set::*},
37 m2dir::{
38 create::*,
39 delete::*,
40 list::*,
41 types::{DOT_M2DIR, LoadM2dirError, M2dir},
42 },
43 path::M2dirPath,
44 store::{DOT_M2STORE, M2dirStore, M2dirStoreError},
45};
46
47#[derive(Debug, Error)]
49pub enum M2dirClientError {
50 #[error(transparent)]
51 Store(#[from] M2dirStoreError),
52 #[error(transparent)]
53 LoadM2dir(#[from] LoadM2dirError),
54 #[error(transparent)]
55 CreateM2dir(#[from] M2dirCreateError),
56 #[error(transparent)]
57 DeleteM2dir(#[from] M2dirDeleteError),
58 #[error(transparent)]
59 ListM2dirs(#[from] M2dirListError),
60 #[error(transparent)]
61 ListEntries(#[from] M2dirEntryListError),
62 #[error(transparent)]
63 GetEntry(#[from] M2dirEntryGetError),
64 #[error(transparent)]
65 StoreEntry(#[from] M2dirEntryStoreError),
66 #[error(transparent)]
67 DeleteEntry(#[from] M2dirEntryDeleteError),
68 #[error(transparent)]
69 AddFlags(#[from] M2dirFlagAddError),
70 #[error(transparent)]
71 RemoveFlags(#[from] M2dirFlagRemoveError),
72 #[error(transparent)]
73 SetFlags(#[from] M2dirFlagSetError),
74 #[error(transparent)]
75 Parse(#[from] ParseFilenameError),
76 #[error(transparent)]
77 Io(#[from] io::Error),
78}
79
80#[derive(Debug)]
86pub struct M2dirClient {
87 root: M2dirPath,
88}
89
90impl M2dirClient {
91 pub fn new(root: impl Into<M2dirPath>) -> Self {
94 Self { root: root.into() }
95 }
96
97 pub fn root(&self) -> &M2dirPath {
99 &self.root
100 }
101
102 pub fn run<C, T, E>(&self, mut coroutine: C) -> Result<T, M2dirClientError>
106 where
107 C: M2dirCoroutine<Yield = M2dirYield, Return = Result<T, E>>,
108 M2dirClientError: From<E>,
109 {
110 let mut arg: Option<M2dirArg> = None;
111
112 loop {
113 match coroutine.resume(arg.take()) {
114 M2dirCoroutineState::Complete(Ok(out)) => return Ok(out),
115 M2dirCoroutineState::Complete(Err(err)) => return Err(err.into()),
116 M2dirCoroutineState::Yielded(M2dirYield::WantsPid) => {
117 arg = Some(M2dirArg::Pid(process::id()));
118 }
119 M2dirCoroutineState::Yielded(M2dirYield::WantsRandom { len }) => {
120 arg = Some(M2dirArg::Random(random_bytes(len)));
121 }
122 M2dirCoroutineState::Yielded(M2dirYield::WantsFileExists(paths)) => {
123 arg = Some(M2dirArg::FileExists(file_exists(paths)));
124 }
125 M2dirCoroutineState::Yielded(M2dirYield::WantsDirRead(paths)) => {
126 arg = Some(M2dirArg::DirRead(read_dirs(paths)?));
127 }
128 M2dirCoroutineState::Yielded(M2dirYield::WantsDirCreate(paths)) => {
129 create_dirs(paths)?;
130 arg = Some(M2dirArg::DirCreate);
131 }
132 M2dirCoroutineState::Yielded(M2dirYield::WantsDirRemove(paths)) => {
133 remove_dirs(paths)?;
134 arg = Some(M2dirArg::DirRemove);
135 }
136 M2dirCoroutineState::Yielded(M2dirYield::WantsFileRead(paths)) => {
137 arg = Some(M2dirArg::FileRead(read_files_tolerant(paths)?));
138 }
139 M2dirCoroutineState::Yielded(M2dirYield::WantsFileCreate(files)) => {
140 write_files(files)?;
141 arg = Some(M2dirArg::FileCreate);
142 }
143 M2dirCoroutineState::Yielded(M2dirYield::WantsFileRemove(paths)) => {
144 remove_files_tolerant(paths)?;
145 arg = Some(M2dirArg::FileRemove);
146 }
147 M2dirCoroutineState::Yielded(M2dirYield::WantsRename(pairs)) => {
148 rename_paths(pairs)?;
149 arg = Some(M2dirArg::Rename);
150 }
151 }
152 }
153 }
154
155 pub fn open_store(&self) -> Result<M2dirStore, M2dirClientError> {
158 load_store(self.root.clone()).map_err(Into::into)
159 }
160
161 pub fn init_store(&self) -> Result<M2dirStore, M2dirClientError> {
164 trace!("init m2store at {}", self.root);
165
166 fs::create_dir_all(self.root.as_str())?;
167 let marker = self.root.join(DOT_M2STORE);
168 if !Path::new(marker.as_str()).exists() {
169 fs::write(marker.as_str(), b"")?;
170 }
171
172 Ok(M2dirStore::from_path(self.root.clone()))
173 }
174
175 pub fn open_m2dir(&self, path: impl Into<M2dirPath>) -> Result<M2dir, M2dirClientError> {
178 load_m2dir(path.into()).map_err(Into::into)
179 }
180
181 pub fn create_m2dir(&self, name: &str) -> Result<M2dir, M2dirClientError> {
186 let store = self.open_store()?;
187 let coroutine = M2dirCreate::new(&store, name, M2dirCreateOptions::default())?;
188 self.run(coroutine)
189 }
190
191 pub fn delete_m2dir(&self, path: impl Into<M2dirPath>) -> Result<(), M2dirClientError> {
193 self.run(M2dirDelete::new(path, M2dirDeleteOptions::default()))
194 }
195
196 pub fn list_m2dirs(&self) -> Result<BTreeSet<M2dir>, M2dirClientError> {
198 let store = self.open_store()?;
199 self.run(M2dirList::new(&store, M2dirListOptions::default()))
200 }
201
202 pub fn list_entries(&self, m2dir: M2dir) -> Result<Vec<M2dirEntry>, M2dirClientError> {
206 self.run(M2dirEntryList::new(m2dir, M2dirEntryListOptions::default()))
207 }
208
209 pub fn read_entry(&self, entry: &M2dirEntry) -> Result<Vec<u8>, M2dirClientError> {
214 let path = entry.path();
215 trace!("read entry at {path}");
216
217 let bytes = fs::read(path.as_str())?;
218 let checksum = entry.checksum();
219
220 if !validate_checksum(checksum, &bytes) {
221 return Err(ParseFilenameError::InvalidChecksum {
222 path: path.clone(),
223 expected: checksum.to_string(),
224 got: entry.id().to_string(),
225 }
226 .into());
227 }
228
229 Ok(bytes)
230 }
231
232 pub fn read_entries(
238 &self,
239 m2dir: &M2dir,
240 entries: &[M2dirEntry],
241 ) -> Result<BTreeSet<M2dirFullEntry>, M2dirClientError> {
242 entries
243 .iter()
244 .map(|entry| self.read_full_entry(m2dir, entry))
245 .collect()
246 }
247
248 pub fn read_entries_par(
252 &self,
253 m2dir: &M2dir,
254 entries: &[M2dirEntry],
255 ) -> Result<BTreeSet<M2dirFullEntry>, M2dirClientError> {
256 if entries.len() <= 1 {
257 return entries
258 .iter()
259 .map(|entry| self.read_full_entry(m2dir, entry))
260 .collect();
261 }
262
263 let n_threads = thread::available_parallelism()
264 .map(|n| n.get())
265 .unwrap_or(8)
266 .min(entries.len());
267
268 let chunk_size = entries.len().div_ceil(n_threads);
269
270 thread::scope(|s| -> Result<BTreeSet<M2dirFullEntry>, M2dirClientError> {
271 let mut handles = Vec::with_capacity(n_threads);
272
273 for chunk in entries.chunks(chunk_size) {
274 let this = self;
275
276 handles.push(s.spawn(move || {
277 chunk
278 .iter()
279 .map(|entry| this.read_full_entry(m2dir, entry))
280 .collect::<Result<Vec<_>, _>>()
281 }));
282 }
283
284 let mut out = BTreeSet::new();
285
286 for handle in handles {
287 for full in handle.join().expect("m2dir worker thread panicked")? {
288 out.insert(full);
289 }
290 }
291
292 Ok(out)
293 })
294 }
295
296 fn read_full_entry(
297 &self,
298 m2dir: &M2dir,
299 entry: &M2dirEntry,
300 ) -> Result<M2dirFullEntry, M2dirClientError> {
301 let contents = self.read_entry(entry)?;
302 let flags = self.read_flags(m2dir, entry.id())?;
303
304 Ok(M2dirFullEntry::from_parts(entry.clone(), contents, flags))
305 }
306
307 pub fn get(
310 &self,
311 m2dir: M2dir,
312 id: impl ToString,
313 ) -> Result<(M2dirEntry, Vec<u8>), M2dirClientError> {
314 let M2dirEntryGetOutput { entry, contents } = self.run(M2dirEntryGet::new(
315 m2dir,
316 id,
317 M2dirEntryGetOptions::default(),
318 ))?;
319 Ok((entry, contents))
320 }
321
322 pub fn store(&self, m2dir: M2dir, bytes: Vec<u8>) -> Result<M2dirEntry, M2dirClientError> {
325 self.run(M2dirEntryStore::new(
326 m2dir,
327 bytes,
328 M2dirEntryStoreOptions::default(),
329 ))
330 }
331
332 pub fn delete_entry(&self, m2dir: M2dir, id: impl ToString) -> Result<(), M2dirClientError> {
334 self.run(M2dirEntryDelete::new(
335 m2dir,
336 id,
337 M2dirEntryDeleteOptions::default(),
338 ))
339 }
340
341 pub fn read_flags(
346 &self,
347 m2dir: &M2dir,
348 id: impl AsRef<str>,
349 ) -> Result<M2dirFlags, M2dirClientError> {
350 let path = m2dir.flags_path(id.as_ref());
351 trace!("read flags at {path}");
352
353 match fs::read_to_string(path.as_str()) {
354 Ok(text) => Ok(M2dirFlags::from_meta(&text)),
355 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(M2dirFlags::default()),
356 Err(err) => Err(err.into()),
357 }
358 }
359
360 pub fn add_flags(
362 &self,
363 m2dir: &M2dir,
364 id: impl AsRef<str>,
365 flags: M2dirFlags,
366 ) -> Result<(), M2dirClientError> {
367 self.run(M2dirFlagAdd::new(
368 m2dir,
369 id,
370 flags,
371 M2dirFlagAddOptions::default(),
372 ))
373 }
374
375 pub fn remove_flags(
378 &self,
379 m2dir: &M2dir,
380 id: impl AsRef<str>,
381 flags: M2dirFlags,
382 ) -> Result<(), M2dirClientError> {
383 self.run(M2dirFlagRemove::new(
384 m2dir,
385 id,
386 flags,
387 M2dirFlagRemoveOptions::default(),
388 ))
389 }
390
391 pub fn set_flags(
394 &self,
395 m2dir: &M2dir,
396 id: impl AsRef<str>,
397 flags: M2dirFlags,
398 ) -> Result<(), M2dirClientError> {
399 self.run(M2dirFlagSet::new(
400 m2dir,
401 id,
402 flags,
403 M2dirFlagSetOptions::default(),
404 ))
405 }
406}
407
408fn load_store(path: M2dirPath) -> Result<M2dirStore, M2dirStoreError> {
411 if !Path::new(path.as_str()).is_dir() {
412 return Err(M2dirStoreError::NotDir(path));
413 }
414
415 let marker = path.join(DOT_M2STORE);
416 if !Path::new(marker.as_str()).exists() {
417 return Err(M2dirStoreError::NoDotM2store(path));
418 }
419
420 Ok(M2dirStore::from_path(path))
421}
422
423fn load_m2dir(path: M2dirPath) -> Result<M2dir, LoadM2dirError> {
424 if !Path::new(path.as_str()).is_dir() {
425 return Err(LoadM2dirError::NotDir(path));
426 }
427
428 let marker = path.join(DOT_M2DIR);
429 if !Path::new(marker.as_str()).exists() {
430 return Err(LoadM2dirError::NoDotM2dir(path));
431 }
432
433 Ok(M2dir::from_path(path))
434}
435
436fn normalize_path(path: PathBuf) -> M2dirPath {
439 let s = path.to_string_lossy().into_owned();
440 #[cfg(windows)]
441 let s = s.replace('\\', "/");
442 M2dirPath::new(s)
443}
444
445fn create_dirs(paths: BTreeSet<M2dirPath>) -> Result<(), io::Error> {
448 for path in paths {
449 trace!("create_dir_all {path}");
450 fs::create_dir_all(path.as_str())?;
451 }
452 Ok(())
453}
454
455fn remove_dirs(paths: BTreeSet<M2dirPath>) -> Result<(), io::Error> {
456 for path in paths {
457 trace!("remove_dir_all {path}");
458 fs::remove_dir_all(path.as_str())?;
459 }
460 Ok(())
461}
462
463fn write_files(files: BTreeMap<M2dirPath, Vec<u8>>) -> Result<(), io::Error> {
464 for (path, contents) in files {
465 trace!("write {path} ({} bytes)", contents.len());
466
467 if let Some(parent) = Path::new(path.as_str()).parent() {
468 fs::create_dir_all(parent)?;
469 }
470 fs::write(path.as_str(), &contents)?;
471 }
472 Ok(())
473}
474
475fn remove_files_tolerant(paths: BTreeSet<M2dirPath>) -> Result<(), io::Error> {
476 for path in paths {
477 trace!("remove_file (tolerant) {path}");
478 match fs::remove_file(path.as_str()) {
479 Ok(()) => {}
480 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
481 Err(err) => return Err(err),
482 }
483 }
484 Ok(())
485}
486
487fn read_dirs(
488 paths: BTreeSet<M2dirPath>,
489) -> Result<BTreeMap<M2dirPath, BTreeSet<M2dirPath>>, io::Error> {
490 let mut entries = BTreeMap::new();
491
492 for path in paths {
493 trace!("read_dir {path}");
494
495 let mut names = BTreeSet::new();
496 match fs::read_dir(path.as_str()) {
497 Ok(iter) => {
498 for entry in iter {
499 let entry = entry?;
500 names.insert(normalize_path(entry.path()));
501 }
502 }
503 Err(err) if err.kind() == io::ErrorKind::NotFound => {}
504 Err(err) if err.kind() == io::ErrorKind::NotADirectory => {}
505 Err(err) => return Err(err),
506 }
507
508 entries.insert(path, names);
509 }
510
511 Ok(entries)
512}
513
514fn read_files_tolerant(
515 paths: BTreeSet<M2dirPath>,
516) -> Result<BTreeMap<M2dirPath, Vec<u8>>, io::Error> {
517 let mut contents = BTreeMap::new();
518
519 for path in paths {
520 trace!("read_file (tolerant) {path}");
521 match fs::read(path.as_str()) {
522 Ok(bytes) => {
523 contents.insert(path, bytes);
524 }
525 Err(err) if err.kind() == io::ErrorKind::NotFound => {
526 contents.insert(path, Vec::new());
527 }
528 Err(err) => return Err(err),
529 }
530 }
531
532 Ok(contents)
533}
534
535fn rename_paths(pairs: Vec<(M2dirPath, M2dirPath)>) -> Result<(), io::Error> {
536 for (from, to) in pairs {
537 trace!("rename {from} -> {to}");
538 fs::rename(from.as_str(), to.as_str())?;
539 }
540 Ok(())
541}
542
543fn file_exists(paths: BTreeSet<M2dirPath>) -> BTreeMap<M2dirPath, bool> {
544 let mut out = BTreeMap::new();
545 for path in paths {
546 let exists = fs::metadata(path.as_str())
547 .map(|m| m.is_file())
548 .unwrap_or(false);
549 trace!("file_exists {path}: {exists}");
550 out.insert(path, exists);
551 }
552 out
553}
554
/// Produces `len` pseudo-random bytes.
///
/// The generator is a 64-bit xorshift (shifts 13, 7, 17) seeded from a fresh
/// [`RandomState`] hasher. A zero seed is replaced by a fixed non-zero
/// constant, because xorshift never leaves the all-zero state. Every step
/// yields one 64-bit word whose bytes are emitted least-significant first.
///
/// Not suitable for cryptographic purposes.
fn random_bytes(len: usize) -> Vec<u8> {
    let seed = RandomState::new().build_hasher().finish();
    let mut word: u64 = if seed == 0 { 0xdeadbeef } else { seed };

    let mut bytes = Vec::with_capacity(len);

    while bytes.len() < len {
        word ^= word << 13;
        word ^= word >> 7;
        word ^= word << 17;

        // Take at most the bytes still missing from the current word.
        let wanted = (len - bytes.len()).min(8);
        bytes.extend_from_slice(&word.to_le_bytes()[..wanted]);
    }

    bytes
}
584
#[cfg(test)]
mod tests {
    use std::path::Path;

    use tempfile::tempdir;

    use crate::{client::*, flag::types::M2dirFlags, store::DOT_M2STORE};

    /// Returns a client whose store has been initialized inside a fresh
    /// temporary directory. The directory guard is returned as well so the
    /// directory outlives the test body.
    fn client() -> (tempfile::TempDir, M2dirClient) {
        let dir = tempdir().unwrap();
        let client = M2dirClient::new(dir.path().to_string_lossy().into_owned());
        client.init_store().unwrap();
        (dir, client)
    }

    /// Builds a flag set containing exactly `names`.
    fn flags_of(names: &[&'static str]) -> M2dirFlags {
        let mut flags = M2dirFlags::default();
        for &name in names {
            flags.insert(name);
        }
        flags
    }

    /// Tells whether something exists at `path`.
    fn exists(path: impl AsRef<Path>) -> bool {
        path.as_ref().exists()
    }

    #[test]
    fn init_store_writes_marker() {
        let (dir, _client) = client();

        let marker = dir.path().join(DOT_M2STORE);
        assert!(marker.exists());
    }

    #[test]
    fn create_m2dir_writes_marker() {
        let (_dir, client) = client();

        let inbox = client.create_m2dir("inbox").unwrap();

        assert!(Path::new(inbox.path().as_str()).is_dir());
        assert!(exists(inbox.marker_path().as_str()));
        assert!(Path::new(inbox.meta_dir().as_str()).is_dir());
    }

    #[test]
    fn list_m2dirs_finds_created_folder() {
        let (_dir, client) = client();

        for name in ["inbox", "sent"] {
            client.create_m2dir(name).unwrap();
        }

        let m2dirs = client.list_m2dirs().unwrap();
        assert_eq!(m2dirs.len(), 2);
    }

    #[test]
    fn store_and_list_entries_round_trip() {
        let (_dir, client) = client();

        let inbox = client.create_m2dir("inbox").unwrap();
        let msg = b"From: alice@example.org\r\nDate: Tue, 15 Apr 1994 08:12:31 GMT\r\nSubject: hi\r\n\r\nbody\r\n";

        // Storing creates a regular file on disk.
        let entry = client.store(inbox.clone(), msg.to_vec()).unwrap();
        assert!(Path::new(entry.path().as_str()).is_file());

        // Listing finds exactly that entry.
        let listed = client.list_entries(inbox.clone()).unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id(), entry.id());

        // Fetching by id returns the original bytes.
        let (fetched, contents) = client.get(inbox, entry.id()).unwrap();
        assert_eq!(fetched.id(), entry.id());
        assert_eq!(contents, msg);
    }

    #[test]
    fn flags_round_trip_via_meta() {
        let (_dir, client) = client();

        let inbox = client.create_m2dir("inbox").unwrap();
        let msg = b"From: a\r\n\r\nbody\r\n";
        let entry = client.store(inbox.clone(), msg.to_vec()).unwrap();

        // A freshly stored entry carries no flag.
        let initial = client.read_flags(&inbox, entry.id()).unwrap();
        assert_eq!(initial.len(), 0);

        // Adding flags.
        let to_add = flags_of(&["$seen", "$forwarded"]);
        client.add_flags(&inbox, entry.id(), to_add).unwrap();

        let after_add = client.read_flags(&inbox, entry.id()).unwrap();
        assert_eq!(after_add.len(), 2);
        assert!(after_add.contains("$seen"));
        assert!(after_add.contains("$forwarded"));

        // Removing one of them.
        let to_remove = flags_of(&["$seen"]);
        client.remove_flags(&inbox, entry.id(), to_remove).unwrap();

        let after_remove = client.read_flags(&inbox, entry.id()).unwrap();
        assert_eq!(after_remove.len(), 1);
        assert!(after_remove.contains("$forwarded"));

        // Replacing the whole set.
        let replacement = flags_of(&["custom", "$junk"]);
        client.set_flags(&inbox, entry.id(), replacement).unwrap();

        let after_set = client.read_flags(&inbox, entry.id()).unwrap();
        assert_eq!(after_set.len(), 2);
        assert!(after_set.contains("custom"));
        assert!(after_set.contains("$junk"));

        // Setting an empty set clears the flags and removes the meta file.
        client
            .set_flags(&inbox, entry.id(), M2dirFlags::default())
            .unwrap();

        let after_clear = client.read_flags(&inbox, entry.id()).unwrap();
        assert!(after_clear.is_empty());
        assert!(!exists(inbox.flags_path(entry.id()).as_str()));
    }

    #[test]
    fn delete_entry_removes_file_and_flags_meta() {
        let (_dir, client) = client();

        let inbox = client.create_m2dir("inbox").unwrap();
        let entry = client.store(inbox.clone(), b"hello".to_vec()).unwrap();

        client
            .add_flags(&inbox, entry.id(), flags_of(&["$seen"]))
            .unwrap();
        assert!(exists(inbox.flags_path(entry.id()).as_str()));

        client.delete_entry(inbox.clone(), entry.id()).unwrap();
        assert!(!exists(entry.path().as_str()));
        assert!(!exists(inbox.flags_path(entry.id()).as_str()));

        let listed = client.list_entries(inbox).unwrap();
        assert!(listed.is_empty());
    }

    #[test]
    fn delete_m2dir_removes_tree() {
        let (_dir, client) = client();

        let inbox = client.create_m2dir("inbox").unwrap();
        let path = inbox.path().clone();
        assert!(Path::new(path.as_str()).is_dir());

        client.delete_m2dir(path.clone()).unwrap();
        assert!(!exists(path.as_str()));
    }
}