tc-tensor 0.8.0

TinyChain's Tensor collection type
Documentation
use std::mem;
use std::pin::Pin;

use futures::ready;
use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
use futures::task::{Context, Poll};
use pin_project::pin_project;

use tc_error::*;
use tc_value::Number;

use crate::bounds::{Bounds, Coords};
use crate::Coord;

#[pin_project]
pub struct SparseValueStream<S> {
    #[pin]
    filled: Fuse<S>,

    affected: Coords,
    next_coord: Option<Coord>,
    next_filled: Option<(Coord, Number)>,
    zero: Number,
}

impl<'a, S: StreamExt + 'a> SparseValueStream<S> {
    pub async fn new(filled: S, bounds: Bounds, zero: Number) -> TCResult<Self> {
        let mut affected = bounds.affected();
        let next_coord = affected.next();

        Ok(Self {
            filled: filled.fuse(),
            affected,
            next_coord,
            next_filled: None,
            zero,
        })
    }
}

impl<S: Stream<Item = TCResult<(Coord, Number)>>> Stream for SparseValueStream<S> {
    type Item = TCResult<Number>;

    fn poll_next(self: Pin<&mut Self>, cxt: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        Poll::Ready(loop {
            let next_coord = if let Some(next_coord) = this.next_coord {
                next_coord
            } else {
                break None;
            };

            let mut next = None;
            mem::swap(this.next_filled, &mut next);
            if let Some((filled_coord, value)) = next {
                break if next_coord == &filled_coord {
                    *(this.next_coord) = this.affected.next();
                    Some(Ok(value))
                } else {
                    *(this.next_coord) = this.affected.next();
                    *(this.next_filled) = Some((filled_coord, value));
                    Some(Ok(*this.zero))
                };
            } else if this.filled.is_terminated() {
                *(this.next_coord) = this.affected.next();
                break Some(Ok(*this.zero));
            } else {
                match ready!(this.filled.as_mut().poll_next(cxt)) {
                    Some(Ok((filled_coord, value))) => {
                        break if next_coord == &filled_coord {
                            *(this.next_coord) = this.affected.next();
                            Some(Ok(value))
                        } else {
                            *(this.next_coord) = this.affected.next();
                            *(this.next_filled) = Some((filled_coord, value));
                            Some(Ok(*this.zero))
                        };
                    }
                    None => {
                        *(this.next_coord) = this.affected.next();
                        break Some(Ok(*this.zero));
                    }
                    Some(Err(cause)) => {
                        *(this.next_coord) = this.affected.next();
                        break Some(Err(cause));
                    }
                }
            }
        })
    }
}