potential/
lib.rs

1#![doc = include_str!("../README.md")]
2#![deny(unsafe_code)]
3
4use crate::sync::{Pub as Sender, Sub as Receiver};
5use notify::Notify;
6use std::fmt::Debug;
7
8pub mod future;
9//mod lock;
10mod notify;
11pub mod stream;
12pub mod sync;
13
14/// The publishing part of the pipework
15#[derive(Debug)]
16pub struct Pub<T> {
17    inner: Sender<T>,
18    wakers: Receiver<Notify>,
19}
20impl<T> Clone for Pub<T> {
21    fn clone(&self) -> Self {
22        Self {
23            inner: self.inner.clone(),
24            wakers: self.wakers.clone(),
25        }
26    }
27}
28impl<T> Default for Pub<T> {
29    fn default() -> Self {
30        Self {
31            inner: Sender::default(),
32            wakers: Receiver::default(),
33        }
34    }
35}
36impl<T> Drop for Pub<T> {
37    fn drop(&mut self) {
38        self.notify();
39    }
40}
41impl<T: Clone> Pub<T> {
42    pub fn subscribe(&self) -> Sub<T> {
43        Sub {
44            inner: self.inner.subscribe(),
45            wakers: self.wakers.publish(),
46        }
47    }
48
49    pub fn push(&mut self, value: T) -> bool {
50        let pushed = self.inner.push(value);
51        self.notify();
52        pushed
53    }
54}
55
56/// The consuming part of the pipework
57#[derive(Debug)]
58pub struct Sub<T> {
59    inner: Receiver<T>,
60    wakers: Sender<Notify>,
61}
62impl<T> Clone for Sub<T> {
63    fn clone(&self) -> Self {
64        Self {
65            inner: self.inner.clone(),
66            wakers: self.wakers.clone(),
67        }
68    }
69}
70impl<T> Default for Sub<T> {
71    fn default() -> Self {
72        Self {
73            inner: Default::default(),
74            wakers: Default::default(),
75        }
76    }
77}
78impl<T: Clone> Sub<T> {
79    pub fn publish(&self) -> Pub<T> {
80        Pub {
81            inner: self.inner.publish(),
82            wakers: self.wakers.subscribe(),
83        }
84    }
85    pub fn try_pop(&mut self) -> Result<T, TryPopError> {
86        self.inner.try_pop()
87    }
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum TryPopError {
92    Empty,
93    Finished,
94}
95impl std::error::Error for TryPopError {}
96impl std::fmt::Display for TryPopError {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        match self {
99            TryPopError::Empty => {
100                f.write_str("The subscription reached the end, but there still some publishers")
101            }
102            TryPopError::Finished => {
103                f.write_str("The subscription reached the end and there are no publishers left")
104            }
105        }
106    }
107}