1use std::{
2 path::{Path, PathBuf},
3 sync::{
4 Arc,
5 atomic::{AtomicUsize, Ordering},
6 },
7};
8
9use fs2::FileExt;
10use serde::{Serialize, de::Deserialize};
11
12pub use dirtyqueue_derive::DirtyQueue;
13
/// File name, relative to the queue root, under which the `[head, tail]` pair is persisted
/// so that a queue can be reopened where it left off.
const HINT_FILE: &str = "hint";
15
/// Reference-counted atomic counter; used for the queue's head and tail positions.
type SafeUsize = Arc<AtomicUsize>;
/// Result alias whose error side is always this crate's [`Error`].
type StdResult<T> = std::result::Result<T, Error>;
18
/// Number of two-character directory levels that `hash_filename` inserts between the queue
/// root and each item file.
///
/// NOTE(review): being a `static mut`, every access requires `unsafe` and nothing in this file
/// synchronizes writes with the reads in `hash_filename`. Presumably this is meant to be set
/// once, before any queue is used - confirm.
pub static mut DIRECTORY_HASH_LEVELS: usize = 1;
20
/// Error returned by every fallible operation in this file.
///
/// The originating error (I/O, CBOR encode/decode, or a boxed error) is flattened to its
/// display text at conversion time; the text is reference-counted so the error stays cheap
/// to clone.
#[derive(Debug, Clone)]
pub struct Error(Arc<String>);

impl std::fmt::Display for Error {
    /// Writes the message captured from the originating error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

// Lets callers use this type with `Box<dyn std::error::Error>`, `?` and error-reporting crates.
impl std::error::Error for Error {}
24
25impl From<Box<dyn std::error::Error>> for Error {
26 #[inline]
27 fn from(value: Box<dyn std::error::Error>) -> Self {
28 Self(Arc::new(value.to_string()))
29 }
30}
31
32impl From<ciborium::de::Error<std::io::Error>> for Error {
33 #[inline]
34 fn from(value: ciborium::de::Error<std::io::Error>) -> Self {
35 Self(Arc::new(value.to_string()))
36 }
37}
38
39impl From<ciborium::ser::Error<std::io::Error>> for Error {
40 #[inline]
41 fn from(value: ciborium::ser::Error<std::io::Error>) -> Self {
42 Self(Arc::new(value.to_string()))
43 }
44}
45
46impl From<std::io::Error> for Error {
47 #[inline]
48 fn from(value: std::io::Error) -> Self {
49 Self(Arc::new(value.to_string()))
50 }
51}
52
/// Returns the successor of `u`, wrapping from `usize::MAX` back to `0`.
#[inline]
fn next_with_overflow(u: usize) -> usize {
    u.wrapping_add(1)
}
57
58#[inline]
59fn advance(u: &SafeUsize) -> StdResult<usize> {
60 let mut cur = u.load(Ordering::SeqCst);
61 while let Err(ret) = u.compare_exchange(
62 cur,
63 next_with_overflow(cur),
64 Ordering::SeqCst,
65 Ordering::SeqCst,
66 ) {
67 cur = ret;
68 }
69
70 Ok(next_with_overflow(cur))
71}
72
73fn hash_filename(path: &Path, count: usize) -> PathBuf {
74 let mut path = path.to_path_buf();
75
76 let s = count.to_string();
77 let mut x = 0;
78 let s_len = s.len();
79
80 let mut added = 0;
81
82 while x < s_len && added < unsafe { DIRECTORY_HASH_LEVELS } {
83 let slice = if x + 2 > s_len {
84 &format!("0{}", &s[x..s_len])
85 } else {
86 &s[x..x + 2]
87 };
88
89 path = path.join(slice);
90 x += 2;
91 added += 1;
92 }
93
94 for _ in added..unsafe { DIRECTORY_HASH_LEVELS } {
95 path = path.join("00");
96 }
97
98 path.join(s).to_path_buf()
99}
100
/// Lets the queue stamp an item with the index it is stored under.
pub trait Keyed: Sized {
    /// Returns the index currently assigned to this item.
    fn key(&self) -> usize;
    /// Assigns `key` to this item. The queue ignores the returned value.
    ///
    /// NOTE(review): presumably the return value is the previously assigned key (that is what
    /// the implementation in this file's tests returns) - confirm.
    fn set_key(&mut self, key: usize) -> usize;
    /// Reports whether a key has been assigned; `DirtyQueue::finished` leaves items that
    /// report `false` untouched.
    fn initialized(&self) -> bool;
}
106
107pub trait IO: Serialize + for<'de> Deserialize<'de> {
108 fn read_from(filename: &PathBuf) -> StdResult<Self> {
109 let mut f = std::fs::OpenOptions::new().read(true).open(filename)?;
110 Ok(ciborium::from_reader(&mut f)?)
111 }
112
113 #[inline]
114 fn finished(&self, filename: &PathBuf) -> StdResult<()> {
115 std::fs::remove_file(filename)?;
116 Ok(())
117 }
118
119 fn write_to(&self, path: &PathBuf) -> StdResult<()> {
120 let parent = path.parent().map(|x| x.to_str().unwrap()).unwrap_or("/");
121 if !std::fs::exists(parent)? {
122 std::fs::create_dir_all(parent)?;
123 }
124
125 let f = std::fs::OpenOptions::new()
126 .create(true)
127 .write(true)
128 .open(path)?;
129
130 ciborium::into_writer(self, f)?;
131 Ok(())
132 }
133}
134
/// A queue whose items live as individual files below a root directory.
///
/// `head` and `tail` are reference-counted atomics, so clones of a queue share the same
/// positions.
#[derive(Debug, Clone)]
pub struct DirtyQueue<T>
where
    T: IO + Keyed + Clone + Sync,
{
    // Directory holding the hint file and the hashed item directories.
    root: PathBuf,
    // Index of the item most recently handed out by `shift`.
    head: SafeUsize,
    // Index of the item most recently stored by `push`.
    tail: SafeUsize,
    // Ties the queue to its item type without storing a value of it.
    _t: std::marker::PhantomData<T>,
}
145
146impl<T> DirtyQueue<T>
147where
148 T: IO + Keyed + Clone + Sync,
149{
150 pub fn new(path: impl AsRef<Path>) -> StdResult<Self> {
151 if !std::fs::exists(path.as_ref())? {
152 std::fs::create_dir_all(path.as_ref())?;
153 }
154
155 let (head, tail) = Self::take_hint(path.as_ref())?;
156
157 Ok(Self {
158 root: path.as_ref().to_path_buf(),
159 head: Arc::new(AtomicUsize::from(head)),
160 tail: Arc::new(AtomicUsize::from(tail)),
161 _t: Default::default(),
162 })
163 }
164
165 #[inline]
166 pub fn head(&self) -> StdResult<usize> {
167 Ok(self.head.load(Ordering::SeqCst))
168 }
169
170 #[inline]
171 pub fn advance_head(&self) -> StdResult<usize> {
172 advance(&self.head)
173 }
174
175 #[inline]
176 pub fn advance_tail(&self) -> StdResult<usize> {
177 advance(&self.tail)
178 }
179
180 #[inline]
181 pub fn tail(&self) -> StdResult<usize> {
182 Ok(self.tail.load(Ordering::SeqCst))
183 }
184
185 #[inline]
186 pub fn queue_size(&self) -> StdResult<usize> {
187 Ok(self.tail()?.abs_diff(self.head()?))
188 }
189
190 pub fn push(&self, mut obj: T) -> StdResult<usize> {
191 let idx = self.advance_tail()?;
192 obj.set_key(idx);
193 obj.write_to(&hash_filename(&self.root, idx))?;
194 self.write_hint(self.head()?, idx)?;
195
196 Ok(idx)
197 }
198
199 pub fn shift(&self) -> StdResult<T> {
200 let idx = self.advance_head()?;
201 let mut obj = T::read_from(&hash_filename(&self.root, idx))?;
202 obj.set_key(idx);
203 self.write_hint(obj.key(), self.tail()?)?;
204
205 Ok(obj)
206 }
207
208 pub fn finished(&self, obj: T) -> StdResult<T> {
209 if !obj.initialized() {
210 return Ok(obj);
211 }
212
213 obj.finished(&hash_filename(&self.root, obj.key()))?;
214 Ok(obj)
215 }
216
217 #[inline]
218 pub fn shift_finished(&self) -> StdResult<T> {
219 self.finished(self.shift()?)
220 }
221
222 fn write_hint(&self, next: usize, last: usize) -> StdResult<()> {
223 if !std::fs::exists(&self.root)? {
224 std::fs::create_dir_all(&self.root)?;
225 }
226
227 let tmp = format!("{}.tmp", HINT_FILE);
228
229 let mut f = std::fs::OpenOptions::new()
230 .create(true)
231 .write(true)
232 .open(self.root.join(&tmp))?;
233
234 f.lock_exclusive()?;
235
236 ciborium::into_writer(&vec![next, last], &mut f)?;
239 let _ = std::fs::remove_file(self.root.join(HINT_FILE));
240 std::fs::hard_link(self.root.join(&tmp), self.root.join(HINT_FILE))?;
241
242 Ok(())
243 }
244
245 fn take_hint(path: impl AsRef<Path>) -> StdResult<(usize, usize)> {
246 match std::fs::OpenOptions::new()
247 .read(true)
248 .open(path.as_ref().to_path_buf().join(HINT_FILE))
249 {
250 Ok(mut f) => {
251 let v: Vec<usize> = ciborium::from_reader(&mut f)?;
252
253 Ok((v[0], v[1]))
254 }
255 Err(_) => Ok((0, 0)),
256 }
257 }
258}
259
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use fancy_duration::AsFancyDuration;
    use serde::{Deserialize, Serialize};

    use crate::*;

    /// Minimal queue item used by the tests below.
    #[derive(Debug, Clone, Serialize, Deserialize, Default, DirtyQueue)]
    struct Thing {
        x: usize,
        // The key is assigned by the queue and is not part of the persisted payload.
        #[serde(skip)]
        _key: Option<usize>,
    }

    impl Keyed for Thing {
        fn initialized(&self) -> bool {
            self._key.is_some()
        }

        fn key(&self) -> usize {
            self._key.unwrap_or_default()
        }

        fn set_key(&mut self, key: usize) -> usize {
            let old = self.key();
            self._key = Some(key);
            old
        }
    }

    /// Checks the directory layout for the default of one hash level.
    #[test]
    fn test_hash_filename() {
        const LONG_PATH: &str =
            "/complicated/deep/path/like/seriously/bro/its/looooooooooooooooooong/";

        let max = usize::MAX.to_string();
        let cases = [
            (0, "00/0".to_string()),
            (8675309, "86/8675309".to_string()),
            (usize::MAX, format!("{}/{}", &max[0..2], max)),
        ];

        for root in ["/", LONG_PATH] {
            for (count, suffix) in &cases {
                assert_eq!(
                    PathBuf::from(format!("{}{}", root, suffix)),
                    hash_filename(&PathBuf::from(root), *count)
                )
            }
        }
    }

    /// Pushes and shifts a large number of items, then verifies that a reopened queue is
    /// empty.
    #[test]
    fn test_construction_big() {
        const SIZE: usize = 100000;

        let dir = tempfile::tempdir().unwrap();
        eprintln!("\nbig dir: {}", dir.path().display());

        let queue = DirtyQueue::new(dir.path()).unwrap();

        let start = std::time::Instant::now();
        for x in 0..SIZE {
            let res = queue.push(Thing {
                x,
                ..Default::default()
            });

            assert!(res.is_ok(), "{} | {:?}", x, res);
        }

        eprintln!(
            "\nPush duration: (size: {}): {}",
            SIZE,
            start.elapsed().fancy_duration()
        );

        let start = std::time::Instant::now();
        for x in 0..SIZE {
            let res = queue.shift();
            assert!(res.is_ok(), "{} | {:?}", x, res);
            assert_eq!(res.unwrap().x, x, "{}", x);
        }

        eprintln!(
            "\nShift duration (size: {}): {}",
            SIZE,
            start.elapsed().fancy_duration()
        );

        let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
        assert!(queue.shift().is_err())
    }

    /// Starts both counters at `usize::MAX` so that every index used wraps around.
    #[test]
    fn test_construction_overflow() {
        let dir = tempfile::tempdir().unwrap();

        let queue = DirtyQueue::new(dir.path()).unwrap();
        queue.head.store(usize::MAX, Ordering::SeqCst);
        queue.tail.store(usize::MAX, Ordering::SeqCst);

        for x in 0..100 {
            let res = queue.push(Thing {
                x,
                ..Default::default()
            });

            assert!(res.is_ok(), "{} | {:?}", x, res);
        }

        for x in 0..100 {
            let res = queue.shift();
            assert!(res.is_ok(), "{} | {:?}", x, res);
            assert_eq!(res.unwrap().x, x, "{}", x);
        }
    }

    /// Basic push-then-shift round trip preserving order.
    #[test]
    fn test_construction_use() {
        let dir = tempfile::tempdir().unwrap();

        let queue = DirtyQueue::new(dir.path()).unwrap();

        for x in 0..100 {
            let res = queue.push(Thing {
                x,
                ..Default::default()
            });

            assert!(res.is_ok(), "{} | {:?}", x, res);
        }

        for x in 0..100 {
            let res = queue.shift();
            assert!(res.is_ok(), "{} | {:?}", x, res);
            assert_eq!(res.unwrap().x, x, "{}", x);
        }
    }

    /// Pushes from a spawned tokio task through a clone, then shifts from the test task.
    #[tokio::test]
    async fn test_tokio() {
        const SIZE: usize = 100000;
        let dir = tempfile::tempdir().unwrap();
        eprintln!("\ntokio dir: {}", dir.path().display());

        let queue = DirtyQueue::new(dir.path()).unwrap();

        let q = queue.clone();
        let producer = tokio::spawn(async move {
            let start = std::time::Instant::now();
            for x in 0..SIZE {
                let res = q.push(Thing {
                    x,
                    ..Default::default()
                });
                assert!(res.is_ok(), "{} | {:?}", x, res);
            }
            eprintln!(
                "\ntokio push round trip: {}",
                start.elapsed().fancy_duration()
            );
        });

        // Join the task instead of polling a flag: a failed push then fails the test
        // rather than leaving it waiting forever for a flag that is never set.
        producer.await.expect("producer task panicked");

        let mut count = 1;
        let start = std::time::Instant::now();
        while let Ok(res) = queue.shift() {
            assert_eq!(res.key(), count);
            count += 1;
        }

        eprintln!(
            "\ntokio shift round trip: {}",
            start.elapsed().fancy_duration()
        );

        assert_eq!(count - 1, SIZE);

        let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
        assert!(queue.shift().is_err())
    }

    /// Pushes from a second OS thread through a clone, then shifts from the test thread.
    #[test]
    fn test_thread() {
        const SIZE: usize = 100000;
        let dir = tempfile::tempdir().unwrap();
        eprintln!("\nthread dir: {}", dir.path().display());

        let queue = DirtyQueue::new(dir.path()).unwrap();

        let q = queue.clone();
        let producer = std::thread::spawn(move || {
            let start = std::time::Instant::now();
            for x in 0..SIZE {
                let res = q.push(Thing {
                    x,
                    ..Default::default()
                });
                assert!(res.is_ok(), "{} | {:?}", x, res);
            }
            eprintln!(
                "\nthread push round trip: {}",
                start.elapsed().fancy_duration()
            );
        });

        // Join the thread instead of spinning on a flag: a failed push then fails the test
        // rather than leaving it spinning forever.
        producer.join().expect("producer thread panicked");

        let mut count = 1;
        let start = std::time::Instant::now();
        while let Ok(res) = queue.shift() {
            assert_eq!(res.key(), count);
            count += 1;
        }

        eprintln!(
            "\nthread shift round trip: {}",
            start.elapsed().fancy_duration()
        );

        assert_eq!(count - 1, SIZE);

        let queue: DirtyQueue<Thing> = DirtyQueue::new(dir.path()).unwrap();
        assert!(queue.shift().is_err())
    }
}