use std::{
convert::Infallible,
future::Future,
pin::Pin,
task::{Context as TaskContext, Poll},
};
use pin_project_lite::pin_project;
use crate::context::Context;
use crate::{
observable::{CoreObservable, ObservableType},
observer::Observer,
scheduler::{Scheduler, TaskHandle},
};
/// Observable source backed by a single future.
///
/// On subscription the future is wrapped, together with the observer, in a
/// [`FromFutureTask`] and handed to `scheduler`. Once the future resolves, its
/// output is delivered through `next`, immediately followed by `complete`.
/// The error type is [`Infallible`], and the task never calls `error`.
#[derive(Clone)]
pub struct FromFuture<F, S> {
/// The future whose output becomes the single emitted item.
pub future: F,
/// The scheduler the polling task is submitted to on subscription.
pub scheduler: S,
}
/// Item and error types of [`FromFuture`]: the item is whatever the future
/// resolves to, and the error type is [`Infallible`].
impl<F, S> ObservableType for FromFuture<F, S>
where
    F: Future,
{
    /// A bare future has no failure channel, so no error value can exist.
    type Err = Infallible;
    /// The value the wrapped future resolves to.
    type Item<'a>
        = F::Output
    where
        Self: 'a;
}
pin_project! {
/// Task submitted to the scheduler by [`FromFuture`]: it drives the wrapped
/// future and forwards the output to the observer.
pub struct FromFutureTask<F, O> {
// Marked `#[pin]` so the projection yields a pinned reference that can be
// polled.
#[pin]
future: F,
// `Some` until the output has been delivered; it is taken on completion so
// the observer is notified at most once.
observer: Option<O>,
}
}
/// Drives the wrapped future and relays its output to the observer.
impl<F, O> Future for FromFutureTask<F, O>
where
    F: Future,
    O: Observer<F::Output, Infallible>,
{
    type Output = ();

    /// Polls the inner future once.
    ///
    /// Returns `Poll::Pending` while the inner future is pending. When it
    /// resolves, the observer (if still present) receives the value through
    /// `next` followed by `complete`, and the task finishes with `()`.
    fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
        let projection = self.project();
        let Poll::Ready(output) = projection.future.poll(cx) else {
            return Poll::Pending;
        };
        // `take` leaves `None` behind, so the observer is notified at most once.
        if let Some(mut sink) = projection.observer.take() {
            sink.next(output);
            sink.complete();
        }
        Poll::Ready(())
    }
}
/// Subscription logic for [`FromFuture`].
impl<F, C, S> CoreObservable<C> for FromFuture<F, S>
where
    F: Future,
    C: Context,
    C::Inner: Observer<F::Output, Infallible>,
    S: Scheduler<FromFutureTask<F, C::Inner>> + Clone,
{
    /// Handle returned by the scheduler for the submitted task.
    type Unsub = TaskHandle;

    /// Consumes the observable, pairs the future with the context's inner
    /// observer in a [`FromFutureTask`], and submits that task to the
    /// scheduler.
    ///
    /// `None` is passed as the scheduler's second argument (presumably
    /// "no delay" — confirm against `Scheduler::schedule`).
    fn subscribe(self, context: C) -> Self::Unsub {
        let Self { future, scheduler } = self;
        let task = FromFutureTask { future, observer: Some(context.into_inner()) };
        scheduler.schedule(task, None)
    }
}
/// Observable source backed by a single future that resolves to a `Result`.
///
/// On subscription the future is wrapped, together with the observer, in a
/// [`FromFutureResultTask`] and handed to `scheduler`. An `Ok(value)` outcome
/// is delivered through `next` followed by `complete`; an `Err(err)` outcome
/// is delivered through `error` only.
#[derive(Clone)]
pub struct FromFutureResult<F, S> {
/// The future whose `Result` output decides between item and error.
pub future: F,
/// The scheduler the polling task is submitted to on subscription.
pub scheduler: S,
}
/// Item and error types of [`FromFutureResult`], taken from the `Ok` and `Err`
/// sides of the future's `Result` output.
impl<F, T, E, S> ObservableType for FromFutureResult<F, S>
where
    F: Future<Output = Result<T, E>>,
{
    /// The `Err` payload of the future's output.
    type Err = E;
    /// The `Ok` payload of the future's output.
    type Item<'a>
        = T
    where
        Self: 'a;
}
pin_project! {
/// Task submitted to the scheduler by [`FromFutureResult`]: it drives the
/// wrapped future and routes its `Result` output to the observer.
pub struct FromFutureResultTask<F, O> {
// Marked `#[pin]` so the projection yields a pinned reference that can be
// polled.
#[pin]
future: F,
// `Some` until the outcome has been delivered; it is taken on completion so
// the observer is notified at most once.
observer: Option<O>,
}
}
/// Drives the wrapped future and routes its `Result` output to the observer.
impl<F, O, T, E> Future for FromFutureResultTask<F, O>
where
    F: Future<Output = Result<T, E>>,
    O: Observer<T, E>,
{
    type Output = ();

    /// Polls the inner future once.
    ///
    /// Returns `Poll::Pending` while the inner future is pending. When it
    /// resolves, the observer (if still present) receives either `next` +
    /// `complete` for an `Ok` value or `error` for an `Err` value, and the
    /// task finishes with `()`.
    fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
        let projection = self.project();
        let Poll::Ready(outcome) = projection.future.poll(cx) else {
            return Poll::Pending;
        };
        // `take` leaves `None` behind, so the observer is notified at most once.
        let Some(mut sink) = projection.observer.take() else {
            return Poll::Ready(());
        };
        match outcome {
            Ok(item) => {
                sink.next(item);
                sink.complete();
            }
            Err(failure) => {
                sink.error(failure);
            }
        }
        Poll::Ready(())
    }
}
/// Subscription logic for [`FromFutureResult`].
impl<F, C, S, T, E> CoreObservable<C> for FromFutureResult<F, S>
where
    F: Future<Output = Result<T, E>>,
    C: Context,
    C::Inner: Observer<T, E>,
    S: Scheduler<FromFutureResultTask<F, C::Inner>> + Clone,
{
    /// Handle returned by the scheduler for the submitted task.
    type Unsub = TaskHandle;

    /// Consumes the observable, pairs the future with the context's inner
    /// observer in a [`FromFutureResultTask`], and submits that task to the
    /// scheduler.
    ///
    /// `None` is passed as the scheduler's second argument (presumably
    /// "no delay" — confirm against `Scheduler::schedule`).
    fn subscribe(self, context: C) -> Self::Unsub {
        let Self { future, scheduler } = self;
        let task = FromFutureResultTask { future, observer: Some(context.into_inner()) };
        scheduler.schedule(task, None)
    }
}
#[cfg(test)]
mod tests {
    use std::{
        future,
        sync::{Arc, Mutex},
    };

    use crate::{
        prelude::*,
        scheduler::{Duration, LocalScheduler, SleepProvider},
    };

    // An already-resolved future: the value and the completion signal must both
    // have arrived once the scheduler has had a chance to run.
    #[rxrust_macro::test(local)]
    async fn test_from_future_ready() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        Local::from_future(future::ready(42))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .subscribe(move |v| {
                *result_clone.lock().unwrap() = Some(v);
            });
        LocalScheduler
            .sleep(Duration::from_millis(0))
            .await;
        assert_eq!(*result.lock().unwrap(), Some(42));
        assert!(*completed.lock().unwrap());
    }

    // A future that suspends before resolving: nothing may be delivered while
    // it is pending, and the value plus completion must arrive afterwards.
    // (This test previously duplicated `test_from_future_ready` and never
    // exercised the `Poll::Pending` path.)
    #[rxrust_macro::test(local)]
    async fn test_from_future_async() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        Local::from_future(async {
            // Force at least one `Poll::Pending` before the value is ready.
            LocalScheduler
                .sleep(Duration::from_millis(10))
                .await;
            42
        })
        .on_complete(move || *completed_clone.lock().unwrap() = true)
        .subscribe(move |v| {
            *result_clone.lock().unwrap() = Some(v);
        });
        // The inner future is still waiting on its timer at this point.
        assert_eq!(*result.lock().unwrap(), None);
        assert!(!*completed.lock().unwrap());
        // Wait comfortably longer than the inner timer.
        LocalScheduler
            .sleep(Duration::from_millis(50))
            .await;
        assert_eq!(*result.lock().unwrap(), Some(42));
        assert!(*completed.lock().unwrap());
    }

    // Shared flavour: awaiting the returned handle is enough to observe the
    // value and the completion signal.
    #[cfg(not(target_arch = "wasm32"))]
    #[rxrust_macro::test]
    async fn test_from_future_shared() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        let handle = Shared::from_future(future::ready("hello"))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .subscribe(move |v| {
                *result_clone.lock().unwrap() = Some(v);
            });
        handle.await;
        assert_eq!(*result.lock().unwrap(), Some("hello"));
        assert!(*completed.lock().unwrap());
    }

    // The emitted value flows through downstream operators.
    #[rxrust_macro::test(local)]
    async fn test_from_future_with_map() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        Local::from_future(future::ready(10))
            .map(|v| v * 2)
            .subscribe(move |v| {
                *result_clone.lock().unwrap() = Some(v);
            });
        LocalScheduler
            .sleep(Duration::from_millis(0))
            .await;
        assert_eq!(*result.lock().unwrap(), Some(20));
    }

    // `Ok` outcome: value + completion, and no error notification.
    #[rxrust_macro::test(local)]
    async fn test_from_future_result_ok() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        let error_received = Arc::new(Mutex::new(false));
        let error_clone = error_received.clone();
        Local::from_future_result(future::ready(Ok::<_, String>(42)))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .on_error(move |_| *error_clone.lock().unwrap() = true)
            .subscribe(move |v| {
                *result_clone.lock().unwrap() = Some(v);
            });
        LocalScheduler
            .sleep(Duration::from_millis(0))
            .await;
        assert_eq!(*result.lock().unwrap(), Some(42));
        assert!(*completed.lock().unwrap());
        assert!(!*error_received.lock().unwrap());
    }

    // `Err` outcome: only the error notification, no value and no completion.
    #[rxrust_macro::test(local)]
    async fn test_from_future_result_err() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        let error_received = Arc::new(Mutex::new(None));
        let error_clone = error_received.clone();
        Local::from_future_result(future::ready(Err::<i32, _>("test error".to_string())))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .on_error(move |e| *error_clone.lock().unwrap() = Some(e))
            .subscribe(move |v| *result_clone.lock().unwrap() = Some(v));
        LocalScheduler
            .sleep(Duration::from_millis(0))
            .await;
        assert_eq!(*result.lock().unwrap(), None);
        assert!(!*completed.lock().unwrap());
        assert_eq!(*error_received.lock().unwrap(), Some("test error".to_string()));
    }

    // Shared flavour, `Ok` outcome.
    #[cfg(not(target_arch = "wasm32"))]
    #[rxrust_macro::test]
    async fn test_from_future_result_shared_ok() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        let error_received = Arc::new(Mutex::new(false));
        let error_clone = error_received.clone();
        let handle = Shared::from_future_result(future::ready(Ok::<_, String>("success")))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .on_error(move |_| *error_clone.lock().unwrap() = true)
            .subscribe(move |v| *result_clone.lock().unwrap() = Some(v));
        handle.await;
        assert_eq!(*result.lock().unwrap(), Some("success"));
        assert!(*completed.lock().unwrap());
        assert!(!*error_received.lock().unwrap());
    }

    // Shared flavour, `Err` outcome.
    #[cfg(not(target_arch = "wasm32"))]
    #[rxrust_macro::test]
    async fn test_from_future_result_shared_err() {
        let error_received = Arc::new(Mutex::new(None));
        let error_clone = error_received.clone();
        let completed = Arc::new(Mutex::new(false));
        let completed_clone = completed.clone();
        let handle = Shared::from_future_result(future::ready(Err::<i32, _>("shared error")))
            .on_complete(move || *completed_clone.lock().unwrap() = true)
            .on_error(move |e| *error_clone.lock().unwrap() = Some(e))
            .subscribe(|_| {});
        handle.await;
        assert!(!*completed.lock().unwrap());
        assert_eq!(*error_received.lock().unwrap(), Some("shared error"));
    }

    // The `Ok` value flows through downstream operators.
    #[rxrust_macro::test(local)]
    async fn test_from_future_result_with_map() {
        let result = Arc::new(Mutex::new(None));
        let result_clone = result.clone();
        Local::from_future_result(future::ready(Ok::<_, String>(10)))
            .map(|v| v * 2)
            .on_error(|_| {})
            .subscribe(move |v| *result_clone.lock().unwrap() = Some(v));
        LocalScheduler
            .sleep(Duration::from_millis(0))
            .await;
        assert_eq!(*result.lock().unwrap(), Some(20));
    }
}