Skip to main content

rivulet/
lazy.rs

1//! Lazy-initialized streams.
2
3use crate::{View, ViewMut};
4use core::{
5    pin::Pin,
6    sync::atomic::AtomicBool,
7    task::{Context, Poll},
8};
9use pin_project::pin_project;
10
11/// A lazy-initialized view.
12///
13/// The view is only initialized when polled for a grant.
14#[pin_project]
15#[derive(Copy, Clone, Debug, Hash)]
16pub struct Lazy<V, F> {
17    view: Option<V>,
18    init: Option<F>,
19}
20
21impl<V, F> Lazy<V, F> {
22    /// Create a new lazy view.
23    pub fn new(init: F) -> Self {
24        Self {
25            view: None,
26            init: Some(init),
27        }
28    }
29
30    /// Return the inner type, if it has been initialized.
31    pub fn into_inner(self) -> Option<V> {
32        self.view
33    }
34}
35
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(all(feature = "std"))))]
impl<V> Lazy<V, Box<dyn FnOnce() -> V>> {
    /// Build a lazy view whose initializer is type-erased behind a `Box`.
    ///
    /// `init` is boxed as-is and is not called here.
    pub fn new_boxed(init: impl FnOnce() -> V + 'static) -> Self {
        let boxed: Box<dyn FnOnce() -> V> = Box::new(init);
        Self::new(boxed)
    }
}
44
45impl<V, F> View for Lazy<V, F>
46where
47    V: View,
48    F: FnOnce() -> V,
49{
50    type Item = V::Item;
51    type Error = V::Error;
52
53    fn view(&self) -> &[Self::Item] {
54        if let Some(view) = self.view.as_ref() {
55            view.view()
56        } else {
57            &[]
58        }
59    }
60
61    fn poll_grant(
62        self: Pin<&mut Self>,
63        cx: &mut Context,
64        count: usize,
65    ) -> Poll<Result<(), Self::Error>> {
66        if count > 0 {
67            let this = self.project();
68            if this.view.is_none() {
69                this.view.get_or_insert(this.init.take().unwrap()());
70            }
71            Pin::new(this.view.as_mut().unwrap()).poll_grant(cx, count)
72        } else {
73            Poll::Ready(Ok(()))
74        }
75    }
76
77    fn release(&mut self, count: usize) {
78        if count > 0 {
79            self.view
80                .as_mut()
81                .expect("attempted to release greater than grant")
82                .release(count)
83        }
84    }
85}
86
87impl<V, F> ViewMut for Lazy<V, F>
88where
89    V: ViewMut,
90    F: FnOnce() -> V,
91{
92    fn view_mut(&mut self) -> &mut [Self::Item] {
93        if let Some(view) = self.view.as_mut() {
94            view.view_mut()
95        } else {
96            &mut []
97        }
98    }
99}
100
101#[cfg(feature = "std")]
102#[cfg_attr(docsrs, doc(cfg(all(feature = "std"))))]
103mod channel {
104    use super::*;
105    use core::marker::PhantomData;
106    use std::sync::{atomic::Ordering, Arc, Mutex};
107
/// State shared by the sink and source halves of a lazy channel.
struct LazyChannelImpl<Sink, Source, F> {
    /// Becomes `true` once `take_sink` has stored the source half.
    ready: AtomicBool,
    /// The source half, parked here between `take_sink` and
    /// `try_take_source`.
    source: Mutex<Option<Source>>,
    /// The channel constructor; taken when `take_sink` runs it.
    init: Mutex<Option<F>>,
    /// Ties the `Sink` type parameter to this struct without storing a sink.
    _sink: PhantomData<Sink>,
}

impl<Sink, Source, F> LazyChannelImpl<Sink, Source, F>
where
    F: FnOnce() -> (Sink, Source),
{
    /// Create the shared state around a constructor that has not run yet.
    fn new(f: F) -> Self {
        LazyChannelImpl {
            ready: AtomicBool::new(false),
            source: Mutex::new(None),
            init: Mutex::new(Some(f)),
            _sink: PhantomData,
        }
    }

    /// Run the constructor, park the source half, and return the sink half.
    ///
    /// # Panics
    ///
    /// Panics if the constructor has already been taken or a lock is
    /// poisoned.
    fn take_sink(&self) -> Sink {
        // The `init` guard is a temporary of this statement, so the lock is
        // not held while the constructor runs.
        let pending = self.init.lock().unwrap().take();
        let build = pending.unwrap();
        let (sink, source) = build();
        *self.source.lock().unwrap() = Some(source);
        // Publish only after the source is stored; pairs with the `Acquire`
        // load in `try_take_source`.
        self.ready.store(true, Ordering::Release);
        sink
    }

    /// Take the source half if `take_sink` has already stored it.
    ///
    /// Returns `None` before initialization and after the source has been
    /// taken.
    fn try_take_source(&self) -> Option<Source> {
        if !self.ready.load(Ordering::Acquire) {
            return None;
        }
        self.source.lock().unwrap().take()
    }
}
144
145    /// A sink created by [`lazy_channel`].
146    #[pin_project]
147    #[cfg_attr(docsrs, doc(cfg(all(feature = "std"))))]
148    pub struct LazyChannelSink<Sink, Source, F> {
149        view: Option<Sink>,
150        shared: Arc<LazyChannelImpl<Sink, Source, F>>,
151    }
152
153    impl<Sink, Source, F> View for LazyChannelSink<Sink, Source, F>
154    where
155        Sink: crate::View,
156        F: FnOnce() -> (Sink, Source),
157    {
158        type Item = Sink::Item;
159        type Error = Sink::Error;
160
161        fn view(&self) -> &[Self::Item] {
162            if let Some(view) = self.view.as_ref() {
163                view.view()
164            } else {
165                &[]
166            }
167        }
168
169        fn poll_grant(
170            self: Pin<&mut Self>,
171            cx: &mut Context,
172            count: usize,
173        ) -> Poll<Result<(), Self::Error>> {
174            if count > 0 {
175                let this = self.project();
176                if this.view.is_none() {
177                    this.view.get_or_insert(this.shared.take_sink());
178                }
179                Pin::new(this.view.as_mut().unwrap()).poll_grant(cx, count)
180            } else {
181                Poll::Ready(Ok(()))
182            }
183        }
184
185        fn release(&mut self, count: usize) {
186            if count > 0 {
187                self.view
188                    .as_mut()
189                    .expect("attempted to release greater than grant")
190                    .release(count)
191            }
192        }
193    }
194
195    impl<Sink, Source, F> ViewMut for LazyChannelSink<Sink, Source, F>
196    where
197        Sink: ViewMut,
198        F: FnOnce() -> (Sink, Source),
199    {
200        fn view_mut(&mut self) -> &mut [Self::Item] {
201            if let Some(view) = self.view.as_mut() {
202                view.view_mut()
203            } else {
204                &mut []
205            }
206        }
207    }
208
209    /// A source created by [`lazy_channel`].
210    #[pin_project]
211    #[cfg_attr(docsrs, doc(cfg(all(feature = "std"))))]
212    pub struct LazyChannelSource<Sink, Source, F> {
213        view: Option<Source>,
214        shared: Arc<LazyChannelImpl<Sink, Source, F>>,
215    }
216
217    impl<Sink, Source, F> View for LazyChannelSource<Sink, Source, F>
218    where
219        Source: View,
220        F: FnOnce() -> (Sink, Source),
221    {
222        type Item = Source::Item;
223        type Error = Source::Error;
224
225        fn view(&self) -> &[Self::Item] {
226            if let Some(view) = self.view.as_ref() {
227                view.view()
228            } else {
229                &[]
230            }
231        }
232
233        fn poll_grant(
234            self: Pin<&mut Self>,
235            cx: &mut Context,
236            count: usize,
237        ) -> Poll<Result<(), Self::Error>> {
238            if count > 0 {
239                let this = self.project();
240                if this.view.is_none() {
241                    if let Some(source) = this.shared.try_take_source() {
242                        this.view.get_or_insert(source);
243                    }
244                }
245                Pin::new(this.view.as_mut().unwrap()).poll_grant(cx, count)
246            } else {
247                Poll::Ready(Ok(()))
248            }
249        }
250
251        fn release(&mut self, count: usize) {
252            if count > 0 {
253                self.view
254                    .as_mut()
255                    .expect("attempted to release greater than grant")
256                    .release(count)
257            }
258        }
259    }
260
261    impl<Sink, Source, F> ViewMut for LazyChannelSource<Sink, Source, F>
262    where
263        Source: ViewMut,
264        F: FnOnce() -> (Sink, Source),
265    {
266        fn view_mut(&mut self) -> &mut [Self::Item] {
267            if let Some(view) = self.view.as_mut() {
268                view.view_mut()
269            } else {
270                &mut []
271            }
272        }
273    }
274
275    /// Create a lazy-initialized channel.
276    ///
277    /// The channel is only initialized when first writing to the sink.
278    #[cfg_attr(docsrs, doc(cfg(all(feature = "std"))))]
279    pub fn lazy_channel<Sink, Source, F>(
280        f: F,
281    ) -> (
282        LazyChannelSink<Sink, Source, F>,
283        LazyChannelSource<Sink, Source, F>,
284    )
285    where
286        F: FnOnce() -> (Sink, Source) + 'static,
287        Sink: 'static,
288        Source: 'static,
289    {
290        let shared = Arc::new(LazyChannelImpl::new(f));
291
292        (
293            LazyChannelSink {
294                view: None,
295                shared: shared.clone(),
296            },
297            LazyChannelSource { view: None, shared },
298        )
299    }
300}
301
302#[cfg(feature = "std")]
303pub use channel::*;