use std::{
cell::RefCell,
fmt::Display,
future::Future,
pin::Pin,
rc::Rc,
task::{Context as TaskContext, Poll, Waker},
};
use crate::{
context::Context,
observable::{CoreObservable, Observable, ObservableType},
observer::Observer,
};
/// Reasons an observable could not be reduced to a single future output.
///
/// These describe a violation of the "exactly one item" expectation, as
/// opposed to an error reported by the observable itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntoFutureError {
    /// The observable finished without any item having been recorded.
    Empty,
    /// A second item arrived after one had already been recorded.
    MultipleValues,
}

impl Display for IntoFutureError {
    /// Writes the fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Empty => "the observable has no values",
            Self::MultipleValues => "the observable emitted more than one value",
        };
        f.write_str(description)
    }
}

impl std::error::Error for IntoFutureError {}
/// Output type of [`ObservableFuture`].
///
/// The outer `Result` is `Err` with an [`IntoFutureError`] when no single item
/// could be produced. The inner `Result` is `Ok` with the item, or `Err` with
/// the error that was passed to the observer.
pub type IntoFutureResult<T, E> = Result<Result<T, E>, IntoFutureError>;
/// What the observer has recorded so far on behalf of the future.
pub(crate) enum State<Item, Err> {
/// Nothing recorded. Also the value left behind once the future has taken its result.
Empty,
/// Exactly one item has been recorded.
HasValue(Item),
/// A second item arrived while one was already recorded.
MultipleValues,
/// The observer was handed this error.
Error(Err),
}
/// State shared (through `Rc<RefCell<..>>`) between an [`ObservableFuture`] and its
/// [`IntoFutureObserver`].
pub(crate) struct SharedState<Item, Err> {
/// Outcome recorded by the observer so far.
pub(crate) state: State<Item, Err>,
/// Waker stored by a pending poll; taken by the observer when it wakes the future.
pub(crate) waker: Option<Waker>,
/// Set once the result is final; the future only returns `Poll::Ready` when this is true.
pub(crate) completed: bool,
}
/// Future that resolves to an [`IntoFutureResult`] once the shared state has been
/// marked as completed by the paired [`IntoFutureObserver`].
pub struct ObservableFuture<Item, Err> {
// Handle to the state that the observer writes into.
shared: Rc<RefCell<SharedState<Item, Err>>>,
}
impl<Item, Err> Future for ObservableFuture<Item, Err> {
    type Output = IntoFutureResult<Item, Err>;

    /// Resolves once the shared state has been marked completed.
    ///
    /// While pending, the task's waker is stored so the observer can wake it.
    /// On completion the recorded state is moved out and mapped as follows:
    ///
    /// * `State::Empty` -> `Err(IntoFutureError::Empty)`
    /// * `State::HasValue(v)` -> `Ok(Ok(v))`
    /// * `State::MultipleValues` -> `Err(IntoFutureError::MultipleValues)`
    /// * `State::Error(e)` -> `Ok(Err(e))`
    ///
    /// The state is reset to `State::Empty` when the result is taken, so
    /// polling again after `Poll::Ready` yields `Err(IntoFutureError::Empty)`.
    fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
        let mut shared = self.shared.borrow_mut();

        if !shared.completed {
            // Skip the clone when the stored waker already wakes this task;
            // repeated polls from the same task then cost nothing extra.
            let already_registered =
                matches!(&shared.waker, Some(registered) if registered.will_wake(cx.waker()));
            if !already_registered {
                shared.waker = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }

        let outcome = match std::mem::replace(&mut shared.state, State::Empty) {
            State::Empty => Err(IntoFutureError::Empty),
            State::HasValue(v) => Ok(Ok(v)),
            State::MultipleValues => Err(IntoFutureError::MultipleValues),
            State::Error(e) => Ok(Err(e)),
        };
        Poll::Ready(outcome)
    }
}
/// Observer that records what it receives into the state shared with an
/// [`ObservableFuture`] and wakes that future when the result becomes final.
pub struct IntoFutureObserver<Item, Err> {
// Handle to the state that the future reads from.
shared: Rc<RefCell<SharedState<Item, Err>>>,
}
impl<Item, Err> IntoFutureObserver<Item, Err> {
    /// Creates an observer that records its outcome into `shared`.
    pub(crate) fn new(shared: Rc<RefCell<SharedState<Item, Err>>>) -> Self {
        Self { shared }
    }

    /// Wakes the task that last polled the future, if a waker is stored.
    ///
    /// The stored waker is taken, so each registration is woken at most once.
    fn wake(&self) {
        // Take the waker in its own statement so the `RefMut` temporary is
        // dropped before `wake()` runs. Written as
        // `if let Some(w) = self.shared.borrow_mut().waker.take()`, the borrow
        // would stay alive for the whole `if let` body, and a waker that
        // touches the shared state synchronously (for example by polling the
        // future inline) would panic with "already borrowed".
        let waker = self.shared.borrow_mut().waker.take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}
impl<Item, Err> Observer<Item, Err> for IntoFutureObserver<Item, Err> {
    /// Records an item.
    ///
    /// The first item is stored. A second item settles the result as
    /// `State::MultipleValues`, marks it completed and wakes the future.
    /// Anything received after the result is settled is ignored.
    fn next(&mut self, value: Item) {
        let mut shared = self.shared.borrow_mut();
        // Once settled, the result must not change (and a value must not be
        // stored again after the future has already taken its output).
        if shared.completed {
            return;
        }
        match &shared.state {
            State::Empty => shared.state = State::HasValue(value),
            State::HasValue(_) => {
                shared.state = State::MultipleValues;
                shared.completed = true;
                // Release the borrow before waking the future.
                drop(shared);
                self.wake();
            }
            State::MultipleValues | State::Error(_) => {}
        }
    }

    /// Settles the result with `err` and wakes the future.
    ///
    /// If the result was already settled (a second item arrived earlier), the
    /// error is dropped. The first terminal outcome wins, so the future's
    /// output does not depend on whether it is polled before or after a late
    /// error.
    fn error(self, err: Err) {
        {
            let mut shared = self.shared.borrow_mut();
            if shared.completed {
                return;
            }
            shared.state = State::Error(err);
            shared.completed = true;
        }
        self.wake();
    }

    /// Marks the result as final, keeping whatever state was recorded, and
    /// wakes the future.
    fn complete(self) {
        self.shared.borrow_mut().completed = true;
        self.wake();
    }

    /// Returns `true` once the result is final or a terminal state
    /// (`MultipleValues` / `Error`) has been recorded.
    fn is_closed(&self) -> bool {
        let shared = self.shared.borrow();
        shared.completed || matches!(shared.state, State::MultipleValues | State::Error(_))
    }
}
/// Builds an [`ObservableFuture`] and hands its paired observer to `subscribe_fn`.
///
/// `subscribe_fn` is invoked exactly once, before this function returns, with
/// an [`IntoFutureObserver`] that shares state with the returned future.
/// Whatever that observer records determines the future's output.
pub fn observable_into_future<T, E, F>(subscribe_fn: F) -> ObservableFuture<T, E>
where
    F: FnOnce(IntoFutureObserver<T, E>),
{
    let initial = SharedState { state: State::Empty, waker: None, completed: false };
    let future = ObservableFuture { shared: Rc::new(RefCell::new(initial)) };
    subscribe_fn(IntoFutureObserver::new(Rc::clone(&future.shared)));
    future
}
/// Conversion of an observable, wrapped in a context `C`, into an
/// [`ObservableFuture`].
///
/// Excluded from the generated documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub trait SupportsIntoFuture<'a, C>: ObservableType + Sized
where
C: Context<Inner = Self> + Observable + 'a,
Self: 'a,
{
/// Consumes `ctx` and returns a future yielding `C::Item<'a>` or `C::Err`,
/// wrapped in an `IntoFutureResult`.
fn into_future(ctx: C) -> ObservableFuture<C::Item<'a>, C::Err>;
}
/// Blanket implementation for every observable type `T` that can be subscribed
/// with an [`IntoFutureObserver`] wrapped by the context `C`.
impl<'a, C, T> SupportsIntoFuture<'a, C> for T
where
    C: Context<Inner = T> + Observable + 'a,
    T: ObservableType + 'a,
    T: CoreObservable<C::With<IntoFutureObserver<C::Item<'a>, C::Err>>>,
{
    /// Swaps the future's observer into `ctx` and subscribes the inner
    /// observable with the resulting wrapped observer. The value returned by
    /// `subscribe` is discarded.
    fn into_future(ctx: C) -> ObservableFuture<C::Item<'a>, C::Err> {
        observable_into_future(move |sink| {
            let (source, downstream) = ctx.swap(sink);
            source.subscribe(downstream);
        })
    }
}
// Tests for the observable-to-future conversion: one test per possible output
// shape, plus two tests that poll the future by hand.
#[cfg(test)]
mod tests {
use futures::task::noop_waker;
use super::*;
use crate::prelude::*;
// A source built with `of(42)` must resolve to `Ok(Ok(42))`.
#[rxrust_macro::test(local)]
async fn test_into_future_single_value() {
let fut = Local::of(42).into_future();
let value = fut.await;
assert_eq!(value, Ok(Ok(42)));
}
// A source built from an empty iterator must resolve to `Err(Empty)`.
#[rxrust_macro::test(local)]
async fn test_into_future_empty_observable() {
let fut = Local::from_iter(std::iter::empty::<i32>()).into_future();
let value = fut.await;
assert_eq!(value, Err(IntoFutureError::Empty));
}
// A source built from three items must resolve to `Err(MultipleValues)`.
#[rxrust_macro::test(local)]
async fn test_into_future_multiple_values() {
let fut = Local::from_iter([1, 2, 3]).into_future();
let value = fut.await;
assert_eq!(value, Err(IntoFutureError::MultipleValues));
}
// The conversion composes with `map`: the future yields the mapped item.
#[rxrust_macro::test(local)]
async fn test_into_future_with_map() {
let fut = Local::of(4)
.map(|x| format!("Number {x}"))
.into_future();
let value = fut.await.unwrap().ok().unwrap();
assert_eq!(value, "Number 4");
}
// An error from the source surfaces in the inner `Result`: `Ok(Err(..))`.
#[rxrust_macro::test(local)]
async fn test_into_future_error() {
let fut = Local::throw_err("test error".to_string()).into_future();
let value = fut.await;
assert_eq!(value, Ok(Err("test error".to_string())));
}
// Polls by hand around a delayed source: pending before the test scheduler is
// advanced, ready with the value afterwards.
// NOTE(review): `Duration` is not imported here explicitly; presumably it comes
// from `crate::prelude::*` — confirm.
#[rxrust_macro::test]
fn test_into_future_with_delay() {
TestScheduler::init();
let mut fut = TestCtx::of(42)
.delay(Duration::from_millis(100))
.into_future();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
// First poll happens before any virtual time has passed.
let poll_result = Pin::new(&mut fut).poll(&mut cx);
assert!(poll_result.is_pending());
TestScheduler::advance_by(Duration::from_millis(100));
let poll_result = Pin::new(&mut fut).poll(&mut cx);
assert_eq!(poll_result, Poll::Ready(Ok(Ok(42))));
}
// Drives the observer by hand instead of going through an observable: one
// `next`, then the `completed` flag is set directly on the shared state. The
// very first poll must then be ready.
#[rxrust_macro::test]
fn test_into_future_sync_observable_completes_immediately() {
let shared = Rc::new(RefCell::new(SharedState::<i32, ()> {
state: State::Empty,
waker: None,
completed: false,
}));
let observer = IntoFutureObserver::new(shared.clone());
// Rebound as mutable because `next` takes `&mut self`.
let mut observer_mut = observer;
observer_mut.next(42);
{
// Scoped so the mutable borrow is released before the future is polled.
let mut s = shared.borrow_mut();
s.completed = true;
}
let mut fut = ObservableFuture { shared };
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
let poll_result = Pin::new(&mut fut).poll(&mut cx);
assert_eq!(
poll_result,
Poll::Ready(Ok(Ok(42))),
"Synchronous observable should complete immediately"
);
}
}