dirtyqueue/
lib.rs

1use std::{
2	path::{Path, PathBuf},
3	sync::{
4		Arc,
5		atomic::{AtomicUsize, Ordering},
6	},
7};
8
9use fs2::FileExt;
10use serde::{Serialize, de::Deserialize};
11
12pub use dirtyqueue_derive::DirtyQueue;
13
14const HINT_FILE: &str = "hint";
15
16type SafeUsize = Arc<AtomicUsize>;
17type StdResult<T> = std::result::Result<T, Error>;
18
19pub static mut DIRECTORY_HASH_LEVELS: usize = 1;
20
/// Error returned by every fallible operation in this file.
///
/// Only the rendered message of the underlying failure is kept (see the `From` conversions);
/// the `Arc` keeps clones cheap.
#[derive(Debug, Clone)]
pub struct Error(Arc<String>);

impl std::fmt::Display for Error {
	/// Writes the stored message verbatim.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.write_str(&self.0)
	}
}

// Lets callers box this type or hand it to error-reporting code that expects the std trait.
impl std::error::Error for Error {}
24
25impl From<Box<dyn std::error::Error>> for Error {
26	#[inline]
27	fn from(value: Box<dyn std::error::Error>) -> Self {
28		Self(Arc::new(value.to_string()))
29	}
30}
31
32impl From<ciborium::de::Error<std::io::Error>> for Error {
33	#[inline]
34	fn from(value: ciborium::de::Error<std::io::Error>) -> Self {
35		Self(Arc::new(value.to_string()))
36	}
37}
38
39impl From<ciborium::ser::Error<std::io::Error>> for Error {
40	#[inline]
41	fn from(value: ciborium::ser::Error<std::io::Error>) -> Self {
42		Self(Arc::new(value.to_string()))
43	}
44}
45
46impl From<std::io::Error> for Error {
47	#[inline]
48	fn from(value: std::io::Error) -> Self {
49		Self(Arc::new(value.to_string()))
50	}
51}
52
/// Returns the successor of `u`, wrapping from `usize::MAX` back to `0`.
#[inline]
fn next_with_overflow(u: usize) -> usize {
	u.wrapping_add(1)
}
57
58#[inline]
59fn advance(u: &SafeUsize) -> StdResult<usize> {
60	let mut cur = u.load(Ordering::SeqCst);
61	while let Err(ret) = u.compare_exchange(
62		cur,
63		next_with_overflow(cur),
64		Ordering::SeqCst,
65		Ordering::SeqCst,
66	) {
67		cur = ret;
68	}
69
70	Ok(next_with_overflow(cur))
71}
72
73fn hash_filename(path: &Path, count: usize) -> PathBuf {
74	let mut path = path.to_path_buf();
75
76	let s = count.to_string();
77	let mut x = 0;
78	let s_len = s.len();
79
80	let mut added = 0;
81
82	while x < s_len && added < unsafe { DIRECTORY_HASH_LEVELS } {
83		let slice = if x + 2 > s_len {
84			&format!("0{}", &s[x..s_len])
85		} else {
86			&s[x..x + 2]
87		};
88
89		path = path.join(slice);
90		x += 2;
91		added += 1;
92	}
93
94	for _ in added..unsafe { DIRECTORY_HASH_LEVELS } {
95		path = path.join("00");
96	}
97
98	path.join(s).to_path_buf()
99}
100
/// Gives a queue payload a place to carry the index it is stored under.
///
/// `DirtyQueue` calls `set_key` with the slot index on `push` and `shift`, and uses
/// `initialized`/`key` in `finished` to decide whether, and which, file to remove.
pub trait Keyed: Sized {
	/// Whether a key has been assigned; `DirtyQueue::finished` leaves uninitialized values
	/// alone.
	fn initialized(&self) -> bool;

	/// The index this value is stored under.
	fn key(&self) -> usize;

	/// Stores `key` on the value.
	///
	/// The queue in this file ignores the returned `usize`; the in-file test implementation
	/// returns the previous key — presumably that is the intended contract (confirm).
	fn set_key(&mut self, key: usize) -> usize;
}
106
107pub trait IO: Serialize + for<'de> Deserialize<'de> {
108	fn read_from(filename: &PathBuf) -> StdResult<Self> {
109		let mut f = std::fs::OpenOptions::new().read(true).open(filename)?;
110		Ok(ciborium::from_reader(&mut f)?)
111	}
112
113	#[inline]
114	fn finished(&self, filename: &PathBuf) -> StdResult<()> {
115		std::fs::remove_file(filename)?;
116		Ok(())
117	}
118
119	fn write_to(&self, path: &PathBuf) -> StdResult<()> {
120		let parent = path.parent().map(|x| x.to_str().unwrap()).unwrap_or("/");
121		if !std::fs::exists(parent)? {
122			std::fs::create_dir_all(parent)?;
123		}
124
125		let f = std::fs::OpenOptions::new()
126			.create(true)
127			.write(true)
128			.open(path)?;
129
130		ciborium::into_writer(self, f)?;
131		Ok(())
132	}
133}
134
135#[derive(Debug, Clone)]
136pub struct DirtyQueue<T>
137where
138	T: IO + Keyed + Clone + Sync,
139{
140	root: PathBuf,
141	head: SafeUsize,
142	tail: SafeUsize,
143	_t: std::marker::PhantomData<T>,
144}
145
146impl<T> DirtyQueue<T>
147where
148	T: IO + Keyed + Clone + Sync,
149{
150	pub fn new(path: impl AsRef<Path>) -> StdResult<Self> {
151		if !std::fs::exists(path.as_ref())? {
152			std::fs::create_dir_all(path.as_ref())?;
153		}
154
155		let (head, tail) = Self::take_hint(path.as_ref())?;
156
157		Ok(Self {
158			root: path.as_ref().to_path_buf(),
159			head: Arc::new(AtomicUsize::from(head)),
160			tail: Arc::new(AtomicUsize::from(tail)),
161			_t: Default::default(),
162		})
163	}
164
165	#[inline]
166	pub fn head(&self) -> StdResult<usize> {
167		Ok(self.head.load(Ordering::SeqCst))
168	}
169
170	#[inline]
171	pub fn advance_head(&self) -> StdResult<usize> {
172		advance(&self.head)
173	}
174
175	#[inline]
176	pub fn advance_tail(&self) -> StdResult<usize> {
177		advance(&self.tail)
178	}
179
180	#[inline]
181	pub fn tail(&self) -> StdResult<usize> {
182		Ok(self.tail.load(Ordering::SeqCst))
183	}
184
185	#[inline]
186	pub fn queue_size(&self) -> StdResult<usize> {
187		Ok(self.tail()?.abs_diff(self.head()?))
188	}
189
190	pub fn push(&self, mut obj: T) -> StdResult<usize> {
191		let idx = self.advance_tail()?;
192		obj.set_key(idx);
193		obj.write_to(&hash_filename(&self.root, idx))?;
194		self.write_hint(self.head()?, idx)?;
195
196		Ok(idx)
197	}
198
199	pub fn shift(&self) -> StdResult<T> {
200		let idx = self.advance_head()?;
201		let mut obj = T::read_from(&hash_filename(&self.root, idx))?;
202		obj.set_key(idx);
203		self.write_hint(obj.key(), self.tail()?)?;
204
205		Ok(obj)
206	}
207
208	pub fn finished(&self, obj: T) -> StdResult<T> {
209		if !obj.initialized() {
210			return Ok(obj);
211		}
212
213		obj.finished(&hash_filename(&self.root, obj.key()))?;
214		Ok(obj)
215	}
216
217	#[inline]
218	pub fn shift_finished(&self) -> StdResult<T> {
219		self.finished(self.shift()?)
220	}
221
222	fn write_hint(&self, next: usize, last: usize) -> StdResult<()> {
223		if !std::fs::exists(&self.root)? {
224			std::fs::create_dir_all(&self.root)?;
225		}
226
227		let tmp = format!("{}.tmp", HINT_FILE);
228
229		let mut f = std::fs::OpenOptions::new()
230			.create(true)
231			.write(true)
232			.open(self.root.join(&tmp))?;
233
234		f.lock_exclusive()?;
235
236		// if this check fails, we've raced waiting for a lock and someone else won; exit cleanly so
237		// we don't cause more trouble.
238		ciborium::into_writer(&vec![next, last], &mut f)?;
239		let _ = std::fs::remove_file(self.root.join(HINT_FILE));
240		std::fs::hard_link(self.root.join(&tmp), self.root.join(HINT_FILE))?;
241
242		Ok(())
243	}
244
245	fn take_hint(path: impl AsRef<Path>) -> StdResult<(usize, usize)> {
246		match std::fs::OpenOptions::new()
247			.read(true)
248			.open(path.as_ref().to_path_buf().join(HINT_FILE))
249		{
250			Ok(mut f) => {
251				let v: Vec<usize> = ciborium::from_reader(&mut f)?;
252
253				Ok((v[0], v[1]))
254			}
255			Err(_) => Ok((0, 0)),
256		}
257	}
258}
259
#[cfg(test)]
mod tests {
	use std::path::PathBuf;

	use fancy_duration::AsFancyDuration;
	use serde::{Deserialize, Serialize};

	use crate::*;

	// struct used for payload in tests
	#[derive(Debug, Clone, Serialize, Deserialize, Default, DirtyQueue)]
	struct Thing {
		x: usize,
		#[serde(skip)]
		_key: Option<usize>,
	}

	impl Keyed for Thing {
		fn initialized(&self) -> bool {
			self._key.is_some()
		}

		fn key(&self) -> usize {
			self._key.unwrap_or_default()
		}

		fn set_key(&mut self, key: usize) -> usize {
			let old = self.key();
			self._key = Some(key);
			old
		}
	}

	// Pushes `Thing { x }` for every `x` in `0..count`, failing on the first error.
	fn push_things(queue: &DirtyQueue<Thing>, count: usize) {
		for x in 0..count {
			let res = queue.push(Thing {
				x,
				..Default::default()
			});

			assert!(res.is_ok(), "{} | {:?}", x, res);
		}
	}

	// Shifts `count` entries and checks the payloads come back in push order.
	fn shift_things(queue: &DirtyQueue<Thing>, count: usize) {
		for x in 0..count {
			let res = queue.shift();
			assert!(res.is_ok(), "{} | {:?}", x, res);
			assert_eq!(res.unwrap().x, x, "{}", x);
		}
	}

	// Shifts until the first error, checking the keys count upwards; returns how many
	// entries were shifted.
	fn drain_by_key(queue: &DirtyQueue<Thing>) -> usize {
		// internally, the queue's keys start counting at 1, not 0.
		let mut expected = 1;
		while let Ok(res) = queue.shift() {
			assert_eq!(res.key(), expected);
			expected += 1;
		}

		expected - 1
	}

	#[test]
	fn test_hash_filename() {
		const LONG_PATH: &str =
			"/complicated/deep/path/like/seriously/bro/its/looooooooooooooooooong/";

		let biggest = usize::MAX.to_string();

		// (entry index, expected two-digit bucket directory)
		let cases = [(0, "00"), (8675309, "86"), (usize::MAX, &biggest[0..2])];

		for root in ["/", LONG_PATH] {
			for (count, bucket) in cases {
				assert_eq!(
					PathBuf::from(format!("{root}{bucket}/{count}")),
					hash_filename(&PathBuf::from(root), count)
				)
			}
		}
	}

	#[test]
	fn test_construction_big() {
		const SIZE: usize = 100000;

		let dir = tempfile::tempdir().unwrap();
		eprintln!("\nbig dir: {}", dir.path().display());

		let queue = DirtyQueue::new(dir.path()).unwrap();

		let start = std::time::Instant::now();
		push_things(&queue, SIZE);

		eprintln!(
			"\nPush duration: (size: {}): {}",
			SIZE,
			start.elapsed().fancy_duration()
		);

		let start = std::time::Instant::now();
		shift_things(&queue, SIZE);

		eprintln!(
			"\nShift duration (size: {}): {}",
			SIZE,
			start.elapsed().fancy_duration()
		);

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
		assert!(queue.shift().is_err())
	}

	#[test]
	fn test_construction_overflow() {
		let dir = tempfile::tempdir().unwrap();

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
		queue.head.store(usize::MAX, Ordering::SeqCst);
		queue.tail.store(usize::MAX, Ordering::SeqCst);

		push_things(&queue, 100);
		shift_things(&queue, 100);
	}

	#[test]
	fn test_construction_use() {
		let dir = tempfile::tempdir().unwrap();

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();

		push_things(&queue, 100);
		shift_things(&queue, 100);
	}

	#[tokio::test]
	async fn test_tokio() {
		const SIZE: usize = 100000;
		let dir = tempfile::tempdir().unwrap();
		eprintln!("\ntokio dir: {}", dir.path().display());

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();

		let q = queue.clone();
		let producer = tokio::spawn(async move {
			let start = std::time::Instant::now();
			push_things(&q, SIZE);
			eprintln!(
				"\ntokio push round trip: {}",
				start.elapsed().fancy_duration()
			);
		});

		// Await the handle rather than polling a flag: a panicking producer then fails the
		// test instead of leaving it waiting forever.
		producer.await.expect("producer task panicked");

		let start = std::time::Instant::now();
		let shifted = drain_by_key(&queue);

		eprintln!(
			"\ntokio shift round trip: {}",
			start.elapsed().fancy_duration()
		);

		assert_eq!(shifted, SIZE);

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
		assert!(queue.shift().is_err())
	}

	#[test]
	fn test_thread() {
		const SIZE: usize = 100000;
		let dir = tempfile::tempdir().unwrap();
		eprintln!("\nthread dir: {}", dir.path().display());

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();

		let q = queue.clone();
		let producer = std::thread::spawn(move || {
			let start = std::time::Instant::now();
			push_things(&q, SIZE);
			eprintln!(
				"\nthread push round trip: {}",
				start.elapsed().fancy_duration()
			);
		});

		// Join rather than polling a flag: a panicking producer then fails the test instead
		// of leaving it waiting forever.
		producer.join().expect("producer thread panicked");

		let start = std::time::Instant::now();
		let shifted = drain_by_key(&queue);

		eprintln!(
			"\nthread shift round trip: {}",
			start.elapsed().fancy_duration()
		);

		assert_eq!(shifted, SIZE);

		let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
		assert!(queue.shift().is_err())
	}
}