1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
use std::{
pin::Pin,
task::{Context, Poll},
};
use anyhow::Result;
use futures::{Stream, TryStreamExt};
use pin_project::pin_project;
/// A [`Stream`] optimized for parallel processing of ordered data.
///
/// The optimization is achieved by associating each item in the stream with
/// its order. This can be leveraged by consumers of [`IndexedStream`]s to
/// reason about the original ordering of the items as they become available.
/// This can obviate the need to drive a stream to completion before operating
/// on the results.
///
/// # From an [`IntoIterator`]
///
/// A [`From`] implementation is provided for any `IntoIterator`, allowing for
/// easy conversion from an arbitrary collection into an [`IndexedStream`].
///
/// ## Example
/// ```
/// # use paladin::directive::IndexedStream;
/// let stream = IndexedStream::from(vec![1, 2, 3]);
/// ```
///
/// # As a [`Stream`]
///
/// A [`Stream`] implementation is also provided for [`IndexedStream`], so it
/// can be consumed like any other stream. Each item it yields is a `Result`
/// holding a pair of the element's original index and its value.
///
/// ## Example
/// ```
/// # use futures::StreamExt;
/// # use paladin::directive::IndexedStream;
/// #
/// # async fn example() {
/// let mut stream = IndexedStream::from(vec![1, 2, 3]);
/// while let Some(Ok((idx, item))) = stream.next().await {
///     println!("{}: {}", idx, item);
/// }
/// # }
/// ```
///
/// # [`Directive`](crate::directive::Directive), [`Functor`](crate::directive::Functor), and [`Foldable`](crate::directive::Foldable)
///
/// [`IndexedStream`] implements [`Directive`](crate::directive::Directive),
/// [`Functor`](crate::directive::Functor), and
/// [`Foldable`](crate::directive::Foldable), allowing it to be used directly to
/// form a [`Directive`](crate::directive::Directive) chain. Note that because
/// the stream is indexed, the [`Foldable`](crate::directive::Foldable)
/// implementation satisfies associativity of combination while folding in
/// parallel.
///
/// ## Example
/// ```
/// # use paladin::{
/// #     operation::{Operation, Monoid, Result},
/// #     directive::{Directive, IndexedStream},
/// #     opkind_derive::OpKind,
/// #     runtime::Runtime,
/// # };
/// # use serde::{Deserialize, Serialize};
/// #
/// # #[derive(Clone, Copy, Debug, Deserialize, Serialize)]
/// struct Multiply;
/// impl Monoid for Multiply {
///     type Elem = i32;
///     type Kind = MyOps;
///
///     fn combine(&self, a: i32, b: i32) -> Result<i32> {
///         Ok(a * b)
///     }
///
///     fn empty(&self) -> i32 {
///         1
///     }
/// }
///
/// # #[derive(Clone, Copy, Debug, Deserialize, Serialize)]
/// struct MultiplyBy(i32);
/// impl Operation for MultiplyBy {
///     type Input = i32;
///     type Output = i32;
///     type Kind = MyOps;
///
///     fn execute(&self, input: i32) -> Result<i32> {
///         Ok(self.0 * input)
///     }
/// }
/// #
/// # #[derive(OpKind, Copy, Clone, Debug, Deserialize, Serialize)]
/// # enum MyOps {
/// #     Multiply(Multiply),
/// #     MultiplyBy(MultiplyBy),
/// # }
///
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let runtime = Runtime::in_memory().await?;
/// // [1, 2, 3, 4, 5] doubled is [2, 4, 6, 8, 10], whose product is 3840.
/// let computation = IndexedStream::from([1, 2, 3, 4, 5])
///     .map(MultiplyBy(2))
///     .fold(Multiply);
/// let result = computation.run(&runtime).await?;
/// assert_eq!(result, 3840);
/// # Ok(())
/// # }
/// ```
#[pin_project]
pub struct IndexedStream<Item> {
    // Type-erased source stream of `(index, item)` results; every constructor
    // funnels through `IndexedStream::new`, which boxes the concrete stream.
    #[pin]
    inner: Box<dyn Stream<Item = Result<(usize, Item)>> + Send + Unpin>,
}
// NOTE(review): `impl_lit!` and `impl_hkt!` are macros defined elsewhere in the
// crate (they are not imported or defined in this file). Going by their names,
// they presumably generate the crate's "literal" and "higher-kinded type"
// support impls for `IndexedStream` — confirm against the macro definitions.
impl_lit!(IndexedStream<Item>);
impl_hkt!(IndexedStream);
impl<Item> IndexedStream<Item> {
    /// Wrap a stream of already-indexed `(index, item)` results in an
    /// [`IndexedStream`].
    ///
    /// The stream is boxed and type-erased, which is why it has to be
    /// `Send`, `Unpin`, and `'static`.
    pub fn new(inner: impl Stream<Item = Result<(usize, Item)>> + Send + Unpin + 'static) -> Self {
        let boxed: Box<dyn Stream<Item = Result<(usize, Item)>> + Send + Unpin> = Box::new(inner);
        Self { inner: boxed }
    }

    /// Drain this [`IndexedStream`] and return its values ordered by index.
    ///
    /// The whole stream is consumed before anything is returned. If the stream
    /// yields an error, collection stops and that error is returned.
    ///
    /// # Example
    /// ```
    /// # use paladin::directive::IndexedStream;
    /// # async fn example() -> anyhow::Result<()> {
    /// let stream = IndexedStream::from(vec![1, 2, 3]);
    /// let values = stream.into_values_sorted().await?
    ///     .into_iter()
    ///     .collect::<Vec<_>>();
    /// assert_eq!(vec![1, 2, 3], values);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn into_values_sorted(self) -> Result<impl IntoIterator<Item = Item>> {
        // Gather every `(index, value)` pair; `?` propagates the first error.
        let mut indexed: Vec<(usize, Item)> = self.try_collect().await?;
        // Items may arrive out of order, so restore source order via the index.
        indexed.sort_unstable_by_key(|pair| pair.0);
        Ok(indexed.into_iter().map(|(_, value)| value))
    }
}
/// Create an [`IndexedStream`] from an [`IntoIterator`].
///
/// Each element is paired with its zero-based position in the iterator and
/// wrapped in `Ok`, so the stream yields `Ok((0, first))`, `Ok((1, second))`,
/// and so on.
///
/// Only the iterator produced by `into_iter()` is stored inside the stream,
/// so that iterator must be `Send`. Neither it nor the collection needs to be
/// `Sync`.
///
/// # Example
/// ```
/// # use paladin::directive::indexed_stream::from_into_iterator;
/// let stream = from_into_iterator(vec![1, 2, 3]);
///
/// // Items that are `Send` but not `Sync` are accepted too.
/// let cells = from_into_iterator(vec![std::cell::Cell::new(1)]);
/// ```
pub fn from_into_iterator<Item, IntoIter>(iter: IntoIter) -> IndexedStream<Item>
where
    Item: 'static,
    IntoIter: IntoIterator<Item = Item> + 'static,
    // Only the concrete iterator ends up in the boxed stream, which
    // `IndexedStream::new` requires to be `Send` (not `Sync`).
    <IntoIter as IntoIterator>::IntoIter: Send,
{
    IndexedStream::new(futures::stream::iter(iter.into_iter().enumerate().map(Ok)))
}
/// Create an [`IndexedStream`] from an [`IntoIterator`] containing `Result`s.
///
/// Indices are assigned before errors are inspected, so every element
/// (whether `Ok` or `Err`) consumes an index. Each `Ok(value)` therefore
/// becomes `Ok((position, value))`, where `position` is its place in the
/// original iterator. Each `Err` is passed through unchanged as a stream error.
///
/// Only the iterator produced by `into_iter()` is stored inside the stream,
/// so that iterator must be `Send`. Neither it nor the collection needs to be
/// `Sync`.
///
/// # Example
/// ```
/// # use paladin::directive::indexed_stream::try_from_into_iterator;
/// let stream = try_from_into_iterator(vec![Ok(1), Ok(2), Ok(3)]);
/// ```
pub fn try_from_into_iterator<Item, IntoIter>(iter: IntoIter) -> IndexedStream<Item>
where
    Item: 'static,
    IntoIter: IntoIterator<Item = Result<Item>> + 'static,
    // Only the concrete iterator ends up in the boxed stream, which
    // `IndexedStream::new` requires to be `Send` (not `Sync`).
    <IntoIter as IntoIterator>::IntoIter: Send,
{
    IndexedStream::new(futures::stream::iter(
        iter.into_iter()
            .enumerate()
            .map(|(idx, item)| item.map(|item| (idx, item))),
    ))
}
/// Convert any suitable [`IntoIterator`] into an [`IndexedStream`], tagging
/// each element with its zero-based position.
///
/// This is a thin wrapper that delegates to [`from_into_iterator`].
impl<Item, IntoIter> From<IntoIter> for IndexedStream<Item>
where
    Item: 'static,
    IntoIter: IntoIterator<Item = Item> + Send + Sync + 'static,
    <IntoIter as IntoIterator>::IntoIter: Send + Sync,
{
    fn from(iter: IntoIter) -> Self {
        from_into_iterator::<Item, IntoIter>(iter)
    }
}
/// [`Stream`] implementation for [`IndexedStream`].
///
/// This is a passthrough implementation that delegates to the inner stream.
/// It forwards both `poll_next` and `size_hint`, so any length information
/// the source stream provides (e.g. from `futures::stream::iter`) reaches
/// consumers instead of being replaced by the default `(0, None)`.
impl<Item> Stream for IndexedStream<Item> {
    type Item = Result<(usize, Item)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // `Box<S>` implements `Stream` by delegating to `S`, so this reports
        // the wrapped stream's own hint.
        self.inner.size_hint()
    }
}
// Submodules holding further trait implementations for `IndexedStream`.
// NOTE(review): presumably these are the `Foldable` and `Functor` impls the
// type-level docs mention — confirm in `foldable.rs` / `functor.rs`.
mod foldable;
mod functor;