wasmrs_rx/
lib.rs

1#![deny(
2  clippy::expect_used,
3  clippy::explicit_deref_methods,
4  clippy::option_if_let_else,
5  clippy::await_holding_lock,
6  clippy::cloned_instead_of_copied,
7  clippy::explicit_into_iter_loop,
8  clippy::flat_map_option,
9  clippy::fn_params_excessive_bools,
10  clippy::implicit_clone,
11  clippy::inefficient_to_string,
12  clippy::large_types_passed_by_value,
13  clippy::manual_ok_or,
14  clippy::map_flatten,
15  clippy::map_unwrap_or,
16  clippy::must_use_candidate,
17  clippy::needless_for_each,
18  clippy::needless_pass_by_value,
19  clippy::option_option,
20  clippy::redundant_else,
21  clippy::semicolon_if_nothing_returned,
22  clippy::too_many_lines,
23  clippy::trivially_copy_pass_by_ref,
24  clippy::unnested_or_patterns,
25  clippy::future_not_send,
26  clippy::useless_let_if_seq,
27  clippy::str_to_string,
28  clippy::inherent_to_string,
29  clippy::let_and_return,
30  clippy::string_to_string,
31  clippy::try_err,
32  clippy::unused_async,
33  clippy::missing_enforced_import_renames,
34  clippy::nonstandard_macro_braces,
35  clippy::rc_mutex,
36  clippy::unwrap_or_else_default,
37  clippy::manual_split_once,
38  clippy::derivable_impls,
39  clippy::needless_option_as_deref,
40  clippy::iter_not_returning_iterator,
41  clippy::same_name_method,
42  clippy::manual_assert,
43  clippy::non_send_fields_in_send_ty,
44  clippy::equatable_if_let,
45  bad_style,
46  clashing_extern_declarations,
47  dead_code,
48  deprecated,
49  explicit_outlives_requirements,
50  improper_ctypes,
51  invalid_value,
52  missing_copy_implementations,
53  missing_debug_implementations,
54  mutable_transmutes,
55  no_mangle_generic_items,
56  non_shorthand_field_patterns,
57  overflowing_literals,
58  path_statements,
59  patterns_in_fns_without_body,
60  private_in_public,
61  trivial_bounds,
62  trivial_casts,
63  trivial_numeric_casts,
64  type_alias_bounds,
65  unconditional_recursion,
66  unreachable_pub,
67  unsafe_code,
68  unstable_features,
69  unused,
70  unused_allocation,
71  unused_comparisons,
72  unused_import_braces,
73  unused_parens,
74  unused_qualifications,
75  while_true,
76  missing_docs
77)]
78#![doc = include_str!("../README.md")]
79
80mod error;
81mod flux;
82
83pub use flux::*;
84
85pub use error::Error;
86use futures::Stream;
87use wasmrs_runtime::ConditionallySendSync;
88
89/// A generic trait to wrap over Flux, Mono, and supporting types.
90pub trait Flux<I, E>: Stream<Item = Result<I, E>> + Unpin + ConditionallySendSync {}
91
#[cfg(target_family = "wasm")]
mod wasm {
  use std::future::Future;
  use std::pin::Pin;

  use futures::Stream;

  /// A pinned, boxed future resolving to `Result<T, E>`.
  ///
  /// This WebAssembly variant places no `Send` bound on the boxed future.
  pub type BoxMono<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + 'static>>;

  /// A pinned, boxed stream yielding `Result<T, E>` items.
  ///
  /// This WebAssembly variant places no `Send` bound on the boxed stream.
  pub type BoxFlux<T, E> = Pin<Box<dyn Stream<Item = Result<T, E>> + 'static>>;
}
100#[cfg(target_family = "wasm")]
101pub use wasm::*;
102
103#[cfg(not(target_family = "wasm"))]
104mod native {
105  /// A utility type for a boxed future.
106  pub type BoxMono<T, E> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send + 'static>>;
107
108  /// A utility type for a boxed stream.
109  pub type BoxFlux<T, E> = std::pin::Pin<Box<dyn futures::Stream<Item = Result<T, E>> + Send + 'static>>;
110}
111#[cfg(not(target_family = "wasm"))]
112pub use native::*;
113
114impl<I, E, T> Flux<I, E> for T
115where
116  T: Stream<Item = Result<I, E>> + Unpin,
117  T: ConditionallySendSync,
118{
119}
120
#[cfg(test)]
mod test {
  use super::*;
  use anyhow::Result;
  use futures::StreamExt;

  // Verifies that both a `FluxChannel` and a `Mono` can be handed to a
  // function that only asks for `impl Flux`.
  #[tokio::test]
  async fn test_basic() -> Result<()> {
    // Gathers `Ok` items until the stream ends or produces something that is
    // not `Some(Ok(_))`.
    async fn takes_any(mut stream: impl Flux<u32, u32>) -> Vec<u32> {
      let mut collected = Vec::new();
      loop {
        match stream.next().await {
          Some(Ok(value)) => collected.push(value),
          // End-of-stream and the first error both terminate collection.
          Some(Err(_)) | None => break,
        }
      }
      collected
    }

    let flux = FluxChannel::<u32, u32>::new();
    for value in 1..=4 {
      flux.send(value)?;
    }
    flux.complete();

    println!("waiting for flux results");
    let flux_items = takes_any(flux).await;
    assert_eq!(flux_items, vec![1, 2, 3, 4]);

    let mono = Mono::<u32, u32>::from_future(async move { Ok(42) });
    println!("waiting for mono results");
    let mono_items = takes_any(mono).await;
    assert_eq!(mono_items, vec![42]);

    Ok(())
  }
}