ruw/
lib.rs

1//! Read-Update-Write
2
3#![no_std]
4
5use core::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures_core::{ready, FusedStream, Future};
11use pin_project::pin_project;
12
13use crate::stream_iter::StreamIter;
14
15mod stream_iter;
16
/// A read-update-write (RUW) backend.
///
/// * While a [`Ruw::write`] is in progress, two update tracks are maintained:
///     * one based on the old state, applied if the write fails;
///     * one based on the new state, applied if the write succeeds.
/// * A failed [`Ruw::read`] rejects every incoming action that has already arrived (i.e. is
///   immediately ready on [`poll_next`]).
/// * A failed [`Ruw::write`] rejects every action that went into the new [`Ruw::State`].
/// * A failed [`Ruw::update`] on either of the two update tracks rejects that one action.
/// * Updates are expected to be synchronous and in-memory.
///
/// [`poll_next`]: futures_core::Stream::poll_next
#[must_use]
pub trait Ruw {
    /// The state being read, updated and written. It is cloned frequently, so in [`std`] and
    /// [`alloc`] contexts it should rely on [`Arc`] to keep cloning cheap.
    ///
    /// [`std`]: https://doc.rust-lang.org/stable/std/
    ///
    /// [`alloc`]: https://doc.rust-lang.org/stable/alloc/
    ///
    /// [`Arc`]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
    type State: Clone;

    /// One change that [`Ruw::update`] applies to a [`Ruw::State`].
    type Delta: Clone;

    /// Either an I/O error (from [`Ruw::read`] or [`Ruw::write`]) or an update error.
    type Error;

    /// Handle used to report the outcome of a single action.
    type TrackOne;

    /// Handle used to report the outcome of zero or more actions at once.
    type TrackMany: Default + Extend<Self::TrackOne>;

    /// Asynchronously read the current state.
    ///
    /// This should almost never fail. A failure is treated as temporarily fatal: the queue is
    /// drained and everything in it is rejected.
    fn read(&self) -> impl Future<Output = Result<Self::State, Self::Error>>;

    /// Apply `delta` to `state`, producing the next state.
    ///
    /// While a [`Ruw::write`] is in progress this is run twice per action (once per update
    /// track); a failure of either run counts as a failure of both.
    fn update(state: Self::State, delta: Self::Delta) -> Result<Self::State, Self::Error>;

    /// Asynchronously write the `new` state. The `old` state is provided for
    /// audit/logging/consistency purposes.
    ///
    /// Restoring state after a failed write is out of scope for the current version.
    fn write(
        &self,
        old: Self::State,
        new: Self::State,
    ) -> impl Future<Output = Result<(), Self::Error>>;

    /// Report that every action in `track` succeeded.
    ///
    /// There is deliberately no `accept_one`: the implementation is optimistic about fitting many
    /// changes into a single state transition.
    fn accept(track: Self::TrackMany);

    /// Report that every action in `track` failed with `error`.
    fn reject(track: Self::TrackMany, error: Self::Error);

    /// Wrap a single [`Ruw::TrackOne`] into a [`Ruw::TrackMany`].
    #[must_use]
    fn many(one: Self::TrackOne) -> Self::TrackMany {
        let mut many = <Self::TrackMany as Default>::default();
        many.extend([one]);
        many
    }

    /// Report that a single action failed with `error`.
    ///
    /// This is what gets called when [`Ruw::update`] fails.
    fn reject_one(one: Self::TrackOne, error: Self::Error) {
        let track = Self::many(one);
        Self::reject(track, error);
    }
}
97
98/// Run [`Ruw`] daemon until the provided [`FusedStream`] is done.
99pub async fn ruw<R: Ruw>(ruw: &R, incoming: impl FusedStream<Item = (R::Delta, R::TrackOne)>) {
100    Ruwing::<R, _, _, _> {
101        incoming,
102        state: Default::default(),
103        read: || ruw.read(),
104        write: |old, new| ruw.write(old, new),
105    }
106    .await
107}
108
109fn update_or_reject<R: Ruw>(
110    state: R::State,
111    delta: R::Delta,
112    track: R::TrackOne,
113) -> Option<(R::State, R::TrackOne)> {
114    match R::update(state, delta) {
115        Ok(state) => Some((state, track)),
116        Err(error) => {
117            R::reject_one(track, error);
118            None
119        }
120    }
121}
122
123#[pin_project]
124#[must_use]
125struct Reading<R: Ruw, Rf> {
126    #[pin]
127    future: Rf,
128    item: Option<(R::Delta, R::TrackOne)>,
129}
130
131#[must_use]
132struct HeadState<R: Ruw> {
133    fallback: R::State,
134    success: R::State,
135}
136
137#[must_use]
138struct HsIter<'a, R: Ruw, I> {
139    state: &'a mut HeadState<R>,
140    iter: I,
141}
142
143impl<'a, R: Ruw, I: Iterator<Item = (R::Delta, R::TrackOne)>> Iterator for HsIter<'a, R, I> {
144    type Item = R::TrackOne;
145
146    fn next(&mut self) -> Option<Self::Item> {
147        loop {
148            let (delta, track) = self.iter.next()?;
149            let Some((fallback, track)) =
150                update_or_reject::<R>(self.state.fallback.clone(), delta.clone(), track)
151            else {
152                continue;
153            };
154            let Some((success, track)) =
155                update_or_reject::<R>(self.state.success.clone(), delta, track)
156            else {
157                continue;
158            };
159            self.state.fallback = fallback;
160            self.state.success = success;
161            break Some(track);
162        }
163    }
164}
165
166#[must_use]
167struct Head<R: Ruw> {
168    state: HeadState<R>,
169    track: R::TrackMany,
170}
171
172impl<R: Ruw> Head<R> {
173    fn fallback_tail(self, prev: R::State) -> Tail<R> {
174        Tail {
175            prev,
176            state: TailState {
177                next: self.state.fallback,
178            },
179            track: self.track,
180        }
181    }
182
183    fn success_tail(self, next: R::State) -> Tail<R> {
184        Tail {
185            prev: next,
186            state: TailState {
187                next: self.state.success,
188            },
189            track: self.track,
190        }
191    }
192}
193
194impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for Head<R> {
195    fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
196        self.track.extend(HsIter::<R, _> {
197            state: &mut self.state,
198            iter: iter.into_iter(),
199        })
200    }
201}
202
203#[must_use]
204struct TailState<R: Ruw> {
205    next: R::State,
206}
207
208#[must_use]
209struct TsIter<'a, R: Ruw, I> {
210    state: &'a mut TailState<R>,
211    iter: I,
212}
213
214impl<'a, R: Ruw, I: Iterator<Item = (R::Delta, R::TrackOne)>> Iterator for TsIter<'a, R, I> {
215    type Item = R::TrackOne;
216
217    fn next(&mut self) -> Option<Self::Item> {
218        loop {
219            let (delta, track) = self.iter.next()?;
220            let Some((state, track)) =
221                update_or_reject::<R>(self.state.next.clone(), delta.clone(), track)
222            else {
223                continue;
224            };
225            self.state.next = state;
226            break Some(track);
227        }
228    }
229}
230
231#[must_use]
232struct Tail<R: Ruw> {
233    prev: R::State,
234    state: TailState<R>,
235    track: R::TrackMany,
236}
237
238impl<R: Ruw> Tail<R> {
239    fn new(prev: R::State, next: R::State, track: R::TrackOne) -> Self {
240        Self {
241            prev,
242            state: TailState { next },
243            track: R::many(track),
244        }
245    }
246
247    fn into_write_state(self) -> WriteState<R> {
248        WriteState {
249            tail: Some(self),
250            head: None,
251        }
252    }
253
254    fn write<Write: WriteFn<R>>(&self, write: &Write) -> Write::Wf {
255        write(self.prev.clone(), self.state.next.clone())
256    }
257
258    fn writing<Write: WriteFn<R>>(self, write: &Write) -> Writing<R, Write::Wf> {
259        Writing {
260            future: self.write(write),
261            state: self.into_write_state(),
262        }
263    }
264
265    fn into_state<Write: WriteFn<R>, Rf>(self, write: &Write) -> State<R, Rf, Write::Wf> {
266        State::Write(self.writing(write))
267    }
268}
269
270impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for Tail<R> {
271    fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
272        self.track.extend(TsIter::<R, _> {
273            state: &mut self.state,
274            iter: iter.into_iter(),
275        })
276    }
277}
278
279#[must_use]
280struct WriteState<R: Ruw> {
281    tail: Option<Tail<R>>,
282    head: Option<Head<R>>,
283}
284
285impl<R: Ruw> WriteState<R> {
286    fn next_tail(&mut self, r: Result<(), R::Error>) -> Option<Tail<R>> {
287        let tail = self.tail.take()?;
288        Some(match r {
289            Ok(()) => {
290                R::accept(tail.track);
291                self.head.take()?.success_tail(tail.prev)
292            }
293            Err(error) => {
294                R::reject(tail.track, error);
295                self.head.take()?.fallback_tail(tail.prev)
296            }
297        })
298    }
299}
300
301impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for WriteState<R> {
302    fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
303        let Some(tail) = &self.tail else {
304            return;
305        };
306        let mut iter = iter.into_iter();
307        loop {
308            match &mut self.head {
309                Some(head) => {
310                    head.extend(iter);
311                    break;
312                }
313                None => match iter.next() {
314                    Some((delta, track)) => {
315                        let Some((fallback, track)) =
316                            update_or_reject::<R>(tail.prev.clone(), delta.clone(), track)
317                        else {
318                            continue;
319                        };
320                        let Some((success, track)) =
321                            update_or_reject::<R>(tail.state.next.clone(), delta, track)
322                        else {
323                            continue;
324                        };
325                        self.head = Some(Head {
326                            state: HeadState { fallback, success },
327                            track: R::many(track),
328                        });
329                    }
330                    None => {
331                        break;
332                    }
333                },
334            }
335        }
336    }
337}
338
339#[pin_project]
340#[must_use]
341struct Writing<R: Ruw, Wf> {
342    #[pin]
343    future: Wf,
344    state: WriteState<R>,
345}
346
347#[derive(Default)]
348#[pin_project(project = StateProj)]
349#[must_use]
350enum State<R: Ruw, Rf, Wf> {
351    #[default]
352    Stale,
353    Read(#[pin] Reading<R, Rf>),
354    Write(#[pin] Writing<R, Wf>),
355}
356
357trait ReadFn<R: Ruw>: Fn() -> Self::Rf {
358    type Rf: Future<Output = Result<R::State, R::Error>>;
359}
360
361impl<R: Ruw, Rf: Future<Output = Result<R::State, R::Error>>, Read: Fn() -> Rf> ReadFn<R> for Read {
362    type Rf = Rf;
363}
364
365trait WriteFn<R: Ruw>: Fn(R::State, R::State) -> Self::Wf {
366    type Wf: Future<Output = Result<(), R::Error>>;
367}
368
369impl<R: Ruw, Wf: Future<Output = Result<(), R::Error>>, Write: Fn(R::State, R::State) -> Wf>
370    WriteFn<R> for Write
371{
372    type Wf = Wf;
373}
374
375#[pin_project]
376#[must_use]
377struct Ruwing<R: Ruw, Read: ReadFn<R>, Write: WriteFn<R>, S> {
378    #[pin]
379    incoming: S,
380    #[pin]
381    state: State<R, Read::Rf, Write::Wf>,
382    read: Read,
383    write: Write,
384}
385
386#[must_use]
387struct RejectMany<'a, R: Ruw> {
388    track: &'a mut R::TrackMany,
389}
390
391impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for RejectMany<'_, R> {
392    fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
393        self.track.extend(iter.into_iter().map(|(_, track)| track))
394    }
395}
396
397impl<
398        R: Ruw,
399        Read: ReadFn<R>,
400        Write: WriteFn<R>,
401        S: FusedStream<Item = (R::Delta, R::TrackOne)>,
402    > Future for Ruwing<R, Read, Write, S>
403{
404    type Output = ();
405
406    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
407        let this = self.project();
408        let mut incoming = this.incoming;
409        let mut state = this.state;
410        loop {
411            match state.as_mut().project() {
412                StateProj::Stale if incoming.is_terminated() => break Poll::Ready(()),
413                StateProj::Stale => {
414                    let Some(item) = ready!(incoming.as_mut().poll_next(cx)) else {
415                        break Poll::Ready(());
416                    };
417                    let reading = Reading {
418                        future: (this.read)(),
419                        item: Some(item),
420                    };
421                    state.as_mut().set(State::Read(reading));
422                }
423                StateProj::Read(reading) => {
424                    let reading = reading.project();
425                    match ready!(reading.future.poll(cx)) {
426                        Ok(prev) => {
427                            let mut item = reading.item.take();
428                            loop {
429                                match item.take() {
430                                    Some((delta, track)) => {
431                                        if let Some((next, track)) =
432                                            update_or_reject::<R>(prev.clone(), delta, track)
433                                        {
434                                            let mut tail =
435                                                Tail::<R>::new(prev.clone(), next.clone(), track);
436                                            StreamIter::new(incoming.as_mut(), cx)
437                                                .extend_into(&mut tail);
438                                            state.as_mut().set(tail.into_state(this.write));
439                                            break;
440                                        }
441                                    }
442                                    None if incoming.is_terminated() => {
443                                        state.as_mut().set(State::Stale);
444                                        return Poll::Ready(());
445                                    }
446                                    None => match incoming.as_mut().poll_next(cx) {
447                                        Poll::Ready(Some(next)) => {
448                                            item = Some(next);
449                                        }
450                                        Poll::Ready(None) => {
451                                            state.as_mut().set(State::Stale);
452                                            return Poll::Ready(());
453                                        }
454                                        Poll::Pending => {
455                                            state.as_mut().set(State::Stale);
456                                            return Poll::Pending;
457                                        }
458                                    },
459                                }
460                            }
461                        }
462                        Err(error) => {
463                            let mut track = reading
464                                .item
465                                .take()
466                                .map(|(_, track)| R::many(track))
467                                .unwrap_or_default();
468                            StreamIter::new(incoming.as_mut(), cx).extend_into(&mut RejectMany::<
469                                R,
470                            > {
471                                track: &mut track,
472                            });
473                            R::reject(track, error);
474                            state.as_mut().set(State::Stale);
475                        }
476                    }
477                }
478                StateProj::Write(writing) => {
479                    let writing = writing.project();
480                    let wstate = writing.state;
481                    StreamIter::new(incoming.as_mut(), cx).extend_into(wstate);
482                    let new = match wstate.next_tail(ready!(writing.future.poll(cx))) {
483                        Some(tail) => tail.into_state(this.write),
484                        None => State::Stale,
485                    };
486                    state.as_mut().set(new);
487                }
488            }
489        }
490    }
491}