1#![no_std]
4
5use core::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_core::{ready, FusedStream, Future};
11use pin_project::pin_project;
12
13use crate::stream_iter::StreamIter;
14
15mod stream_iter;
16
17#[must_use]
29pub trait Ruw {
30 type State: Clone;
39
40 type Delta: Clone;
42
43 type Error;
45
46 type TrackOne;
48
49 type TrackMany: Default + Extend<Self::TrackOne>;
51
52 fn read(&self) -> impl Future<Output = Result<Self::State, Self::Error>>;
57
58 fn update(state: Self::State, delta: Self::Delta) -> Result<Self::State, Self::Error>;
63
64 fn write(
68 &self,
69 old: Self::State,
70 new: Self::State,
71 ) -> impl Future<Output = Result<(), Self::Error>>;
72
73 fn accept(track: Self::TrackMany);
78
79 fn reject(track: Self::TrackMany, error: Self::Error);
81
82 #[must_use]
84 fn many(one: Self::TrackOne) -> Self::TrackMany {
85 let mut track: Self::TrackMany = Default::default();
86 track.extend(Some(one));
87 track
88 }
89
90 fn reject_one(one: Self::TrackOne, error: Self::Error) {
94 Self::reject(Self::many(one), error);
95 }
96}
97
98pub async fn ruw<R: Ruw>(ruw: &R, incoming: impl FusedStream<Item = (R::Delta, R::TrackOne)>) {
100 Ruwing::<R, _, _, _> {
101 incoming,
102 state: Default::default(),
103 read: || ruw.read(),
104 write: |old, new| ruw.write(old, new),
105 }
106 .await
107}
108
109fn update_or_reject<R: Ruw>(
110 state: R::State,
111 delta: R::Delta,
112 track: R::TrackOne,
113) -> Option<(R::State, R::TrackOne)> {
114 match R::update(state, delta) {
115 Ok(state) => Some((state, track)),
116 Err(error) => {
117 R::reject_one(track, error);
118 None
119 }
120 }
121}
122
123#[pin_project]
124#[must_use]
125struct Reading<R: Ruw, Rf> {
126 #[pin]
127 future: Rf,
128 item: Option<(R::Delta, R::TrackOne)>,
129}
130
131#[must_use]
132struct HeadState<R: Ruw> {
133 fallback: R::State,
134 success: R::State,
135}
136
137#[must_use]
138struct HsIter<'a, R: Ruw, I> {
139 state: &'a mut HeadState<R>,
140 iter: I,
141}
142
143impl<'a, R: Ruw, I: Iterator<Item = (R::Delta, R::TrackOne)>> Iterator for HsIter<'a, R, I> {
144 type Item = R::TrackOne;
145
146 fn next(&mut self) -> Option<Self::Item> {
147 loop {
148 let (delta, track) = self.iter.next()?;
149 let Some((fallback, track)) =
150 update_or_reject::<R>(self.state.fallback.clone(), delta.clone(), track)
151 else {
152 continue;
153 };
154 let Some((success, track)) =
155 update_or_reject::<R>(self.state.success.clone(), delta, track)
156 else {
157 continue;
158 };
159 self.state.fallback = fallback;
160 self.state.success = success;
161 break Some(track);
162 }
163 }
164}
165
166#[must_use]
167struct Head<R: Ruw> {
168 state: HeadState<R>,
169 track: R::TrackMany,
170}
171
172impl<R: Ruw> Head<R> {
173 fn fallback_tail(self, prev: R::State) -> Tail<R> {
174 Tail {
175 prev,
176 state: TailState {
177 next: self.state.fallback,
178 },
179 track: self.track,
180 }
181 }
182
183 fn success_tail(self, next: R::State) -> Tail<R> {
184 Tail {
185 prev: next,
186 state: TailState {
187 next: self.state.success,
188 },
189 track: self.track,
190 }
191 }
192}
193
194impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for Head<R> {
195 fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
196 self.track.extend(HsIter::<R, _> {
197 state: &mut self.state,
198 iter: iter.into_iter(),
199 })
200 }
201}
202
203#[must_use]
204struct TailState<R: Ruw> {
205 next: R::State,
206}
207
208#[must_use]
209struct TsIter<'a, R: Ruw, I> {
210 state: &'a mut TailState<R>,
211 iter: I,
212}
213
214impl<'a, R: Ruw, I: Iterator<Item = (R::Delta, R::TrackOne)>> Iterator for TsIter<'a, R, I> {
215 type Item = R::TrackOne;
216
217 fn next(&mut self) -> Option<Self::Item> {
218 loop {
219 let (delta, track) = self.iter.next()?;
220 let Some((state, track)) =
221 update_or_reject::<R>(self.state.next.clone(), delta.clone(), track)
222 else {
223 continue;
224 };
225 self.state.next = state;
226 break Some(track);
227 }
228 }
229}
230
231#[must_use]
232struct Tail<R: Ruw> {
233 prev: R::State,
234 state: TailState<R>,
235 track: R::TrackMany,
236}
237
238impl<R: Ruw> Tail<R> {
239 fn new(prev: R::State, next: R::State, track: R::TrackOne) -> Self {
240 Self {
241 prev,
242 state: TailState { next },
243 track: R::many(track),
244 }
245 }
246
247 fn into_write_state(self) -> WriteState<R> {
248 WriteState {
249 tail: Some(self),
250 head: None,
251 }
252 }
253
254 fn write<Write: WriteFn<R>>(&self, write: &Write) -> Write::Wf {
255 write(self.prev.clone(), self.state.next.clone())
256 }
257
258 fn writing<Write: WriteFn<R>>(self, write: &Write) -> Writing<R, Write::Wf> {
259 Writing {
260 future: self.write(write),
261 state: self.into_write_state(),
262 }
263 }
264
265 fn into_state<Write: WriteFn<R>, Rf>(self, write: &Write) -> State<R, Rf, Write::Wf> {
266 State::Write(self.writing(write))
267 }
268}
269
270impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for Tail<R> {
271 fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
272 self.track.extend(TsIter::<R, _> {
273 state: &mut self.state,
274 iter: iter.into_iter(),
275 })
276 }
277}
278
279#[must_use]
280struct WriteState<R: Ruw> {
281 tail: Option<Tail<R>>,
282 head: Option<Head<R>>,
283}
284
285impl<R: Ruw> WriteState<R> {
286 fn next_tail(&mut self, r: Result<(), R::Error>) -> Option<Tail<R>> {
287 let tail = self.tail.take()?;
288 Some(match r {
289 Ok(()) => {
290 R::accept(tail.track);
291 self.head.take()?.success_tail(tail.prev)
292 }
293 Err(error) => {
294 R::reject(tail.track, error);
295 self.head.take()?.fallback_tail(tail.prev)
296 }
297 })
298 }
299}
300
301impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for WriteState<R> {
302 fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
303 let Some(tail) = &self.tail else {
304 return;
305 };
306 let mut iter = iter.into_iter();
307 loop {
308 match &mut self.head {
309 Some(head) => {
310 head.extend(iter);
311 break;
312 }
313 None => match iter.next() {
314 Some((delta, track)) => {
315 let Some((fallback, track)) =
316 update_or_reject::<R>(tail.prev.clone(), delta.clone(), track)
317 else {
318 continue;
319 };
320 let Some((success, track)) =
321 update_or_reject::<R>(tail.state.next.clone(), delta, track)
322 else {
323 continue;
324 };
325 self.head = Some(Head {
326 state: HeadState { fallback, success },
327 track: R::many(track),
328 });
329 }
330 None => {
331 break;
332 }
333 },
334 }
335 }
336 }
337}
338
339#[pin_project]
340#[must_use]
341struct Writing<R: Ruw, Wf> {
342 #[pin]
343 future: Wf,
344 state: WriteState<R>,
345}
346
347#[derive(Default)]
348#[pin_project(project = StateProj)]
349#[must_use]
350enum State<R: Ruw, Rf, Wf> {
351 #[default]
352 Stale,
353 Read(#[pin] Reading<R, Rf>),
354 Write(#[pin] Writing<R, Wf>),
355}
356
357trait ReadFn<R: Ruw>: Fn() -> Self::Rf {
358 type Rf: Future<Output = Result<R::State, R::Error>>;
359}
360
361impl<R: Ruw, Rf: Future<Output = Result<R::State, R::Error>>, Read: Fn() -> Rf> ReadFn<R> for Read {
362 type Rf = Rf;
363}
364
365trait WriteFn<R: Ruw>: Fn(R::State, R::State) -> Self::Wf {
366 type Wf: Future<Output = Result<(), R::Error>>;
367}
368
369impl<R: Ruw, Wf: Future<Output = Result<(), R::Error>>, Write: Fn(R::State, R::State) -> Wf>
370 WriteFn<R> for Write
371{
372 type Wf = Wf;
373}
374
375#[pin_project]
376#[must_use]
377struct Ruwing<R: Ruw, Read: ReadFn<R>, Write: WriteFn<R>, S> {
378 #[pin]
379 incoming: S,
380 #[pin]
381 state: State<R, Read::Rf, Write::Wf>,
382 read: Read,
383 write: Write,
384}
385
386#[must_use]
387struct RejectMany<'a, R: Ruw> {
388 track: &'a mut R::TrackMany,
389}
390
391impl<R: Ruw> Extend<(R::Delta, R::TrackOne)> for RejectMany<'_, R> {
392 fn extend<T: IntoIterator<Item = (R::Delta, R::TrackOne)>>(&mut self, iter: T) {
393 self.track.extend(iter.into_iter().map(|(_, track)| track))
394 }
395}
396
397impl<
398 R: Ruw,
399 Read: ReadFn<R>,
400 Write: WriteFn<R>,
401 S: FusedStream<Item = (R::Delta, R::TrackOne)>,
402 > Future for Ruwing<R, Read, Write, S>
403{
404 type Output = ();
405
406 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
407 let this = self.project();
408 let mut incoming = this.incoming;
409 let mut state = this.state;
410 loop {
411 match state.as_mut().project() {
412 StateProj::Stale if incoming.is_terminated() => break Poll::Ready(()),
413 StateProj::Stale => {
414 let Some(item) = ready!(incoming.as_mut().poll_next(cx)) else {
415 break Poll::Ready(());
416 };
417 let reading = Reading {
418 future: (this.read)(),
419 item: Some(item),
420 };
421 state.as_mut().set(State::Read(reading));
422 }
423 StateProj::Read(reading) => {
424 let reading = reading.project();
425 match ready!(reading.future.poll(cx)) {
426 Ok(prev) => {
427 let mut item = reading.item.take();
428 loop {
429 match item.take() {
430 Some((delta, track)) => {
431 if let Some((next, track)) =
432 update_or_reject::<R>(prev.clone(), delta, track)
433 {
434 let mut tail =
435 Tail::<R>::new(prev.clone(), next.clone(), track);
436 StreamIter::new(incoming.as_mut(), cx)
437 .extend_into(&mut tail);
438 state.as_mut().set(tail.into_state(this.write));
439 break;
440 }
441 }
442 None if incoming.is_terminated() => {
443 state.as_mut().set(State::Stale);
444 return Poll::Ready(());
445 }
446 None => match incoming.as_mut().poll_next(cx) {
447 Poll::Ready(Some(next)) => {
448 item = Some(next);
449 }
450 Poll::Ready(None) => {
451 state.as_mut().set(State::Stale);
452 return Poll::Ready(());
453 }
454 Poll::Pending => {
455 state.as_mut().set(State::Stale);
456 return Poll::Pending;
457 }
458 },
459 }
460 }
461 }
462 Err(error) => {
463 let mut track = reading
464 .item
465 .take()
466 .map(|(_, track)| R::many(track))
467 .unwrap_or_default();
468 StreamIter::new(incoming.as_mut(), cx).extend_into(&mut RejectMany::<
469 R,
470 > {
471 track: &mut track,
472 });
473 R::reject(track, error);
474 state.as_mut().set(State::Stale);
475 }
476 }
477 }
478 StateProj::Write(writing) => {
479 let writing = writing.project();
480 let wstate = writing.state;
481 StreamIter::new(incoming.as_mut(), cx).extend_into(wstate);
482 let new = match wstate.next_tail(ready!(writing.future.poll(cx))) {
483 Some(tail) => tail.into_state(this.write),
484 None => State::Stale,
485 };
486 state.as_mut().set(new);
487 }
488 }
489 }
490 }
491}