use futures::{
FutureExt, Stream, StreamExt,
stream::{self, BoxStream, select_all},
};
/// A single item produced by a [`Command`]'s underlying stream.
///
/// The common standard traits are derived so that actions can be logged with
/// `{:?}`, compared in assertions, and duplicated whenever `Msg` supports the
/// same operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Action<Msg> {
    /// Carries an application message of type `Msg`.
    Message(Msg),
    /// Signals a quit request; carries no payload.
    Quit,
}
/// A description of side effects whose results are delivered as a stream of
/// [`Action`]s.
///
/// A command either carries no work at all (`stream` is `None`) or owns a boxed,
/// `'static` stream that yields `Action<Msg>` items. The type is `#[must_use]`:
/// building a command does nothing by itself, so dropping one silently discards
/// the side effect it describes.
#[must_use = "Commands represent side effects in the Elm Architecture and must be handled by the runtime. Ignoring a command means the intended side effect will not occur."]
pub struct Command<Msg: Send + 'static> {
/// The actions this command will produce, or `None` for a command that does nothing.
/// Visible to the parent module only (`pub(super)`).
pub(super) stream: Option<BoxStream<'static, Action<Msg>>>,
}
impl<Msg: Send + 'static> Command<Msg> {
    /// Creates a command that performs no side effect.
    pub const fn none() -> Self {
        Self { stream: None }
    }

    /// Returns `true` if this command carries no work (it has no underlying stream).
    #[must_use]
    pub fn is_none(&self) -> bool {
        self.stream.is_none()
    }

    /// Returns `true` if this command carries an underlying stream of actions.
    ///
    /// Note that the stream itself may still turn out to be empty once polled.
    #[must_use]
    pub fn is_some(&self) -> bool {
        self.stream.is_some()
    }

    /// Creates a command that awaits `future` and converts its output into a
    /// message with `f`.
    ///
    /// The resulting command yields exactly one [`Action::Message`].
    pub fn perform<A>(
        future: impl Future<Output = A> + Send + 'static,
        f: impl FnOnce(A) -> Msg + Send + 'static,
    ) -> Self {
        Self::future(future.map(f))
    }

    /// Creates a command that awaits `future` and emits its output as a single
    /// [`Action::Message`].
    pub fn future(future: impl Future<Output = Msg> + Send + 'static) -> Self {
        Self {
            stream: Some(future.into_stream().map(Action::Message).boxed()),
        }
    }

    /// Creates a command that emits `msg` as a single [`Action::Message`].
    pub fn message(msg: Msg) -> Self {
        Self::effect(Action::Message(msg))
    }

    /// Creates a command that emits exactly the given `action` and then completes.
    pub fn effect(action: Action<Msg>) -> Self {
        Self {
            stream: Some(stream::once(async move { action }).boxed()),
        }
    }

    /// Combines several commands into one whose actions are interleaved as the
    /// underlying streams produce them.
    ///
    /// Commands without a stream are skipped. If none of the commands carries a
    /// stream, the result is equivalent to [`Command::none`].
    pub fn batch(commands: impl IntoIterator<Item = Self>) -> Self {
        let mut streams: Vec<_> = commands.into_iter().filter_map(|cmd| cmd.stream).collect();
        let stream = match streams.len() {
            // Zero streams pops to `None`; a single stream is handed back as-is
            // instead of paying for a `select_all` wrapper and a second box.
            0 | 1 => streams.pop(),
            _ => Some(select_all(streams).boxed()),
        };
        Self { stream }
    }

    /// Creates a command that emits every item of `stream` as an
    /// [`Action::Message`].
    pub fn stream(stream: impl Stream<Item = Msg> + Send + 'static) -> Self {
        Self {
            stream: Some(stream.map(Action::Message).boxed()),
        }
    }

    /// Creates a command that converts every item of `stream` into a message
    /// with `f` and emits it as an [`Action::Message`].
    ///
    /// `f` is invoked once per item, so it may be any `FnMut` (stateful closures
    /// included); plain `Fn` closures and function items are accepted as before.
    pub fn run<A>(
        stream: impl Stream<Item = A> + Send + 'static,
        f: impl FnMut(A) -> Msg + Send + 'static,
    ) -> Self {
        Self::stream(stream.map(f))
    }

    /// Transforms the message type of this command by applying `f` to every
    /// message it produces.
    ///
    /// [`Action::Quit`] items pass through unchanged, and a command without a
    /// stream maps to a command without a stream. `f` may be any `FnMut`; plain
    /// `Fn` closures and function items are accepted as before.
    pub fn map<T>(self, mut f: impl FnMut(Msg) -> T + Send + 'static) -> Command<T>
    where
        T: Send + 'static,
    {
        let stream = self.stream.map(|actions| {
            actions
                .map(move |action| match action {
                    Action::Message(msg) => Action::Message(f(msg)),
                    Action::Quit => Action::Quit,
                })
                .boxed()
        });
        Command { stream }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use futures::stream;
    use tokio::time::{Duration, sleep};

    /// Drives `cmd` to completion and returns every message it produced together
    /// with a flag telling whether an `Action::Quit` was observed.
    ///
    /// The stream is always drained fully, so assertions never depend on the
    /// order in which a batched command happens to interleave its sources.
    async fn drain<Msg: Send + 'static>(cmd: Command<Msg>) -> (Vec<Msg>, bool) {
        let mut stream = cmd.stream.expect("stream should exist");
        let mut messages = Vec::new();
        let mut saw_quit = false;
        while let Some(action) = stream.next().await {
            match action {
                Action::Message(msg) => messages.push(msg),
                Action::Quit => saw_quit = true,
            }
        }
        (messages, saw_quit)
    }

    /// Drains `cmd` and returns its messages, failing if it emitted `Action::Quit`.
    async fn collect_messages<Msg: Send + 'static>(cmd: Command<Msg>) -> Vec<Msg> {
        let (messages, saw_quit) = drain(cmd).await;
        assert!(!saw_quit, "command unexpectedly emitted a quit action");
        messages
    }

    /// Returns the first action produced by `cmd`, failing if there is none.
    async fn first_action<Msg: Send + 'static>(cmd: Command<Msg>) -> Action<Msg> {
        let mut stream = cmd.stream.expect("stream should exist");
        stream.next().await.expect("should have action")
    }

    #[tokio::test]
    async fn test_batch_empty() {
        let cmd: Command<i32> = Command::batch(vec![]);
        assert!(cmd.is_none());
    }

    #[tokio::test]
    async fn test_batch_single_command() {
        let cmd = Command::batch(vec![Command::future(async { 1 })]);
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Message(1)));
    }

    #[tokio::test]
    async fn test_batch_multiple_commands() {
        let cmd = Command::batch(vec![
            Command::future(async { 1 }),
            Command::future(async { 2 }),
            Command::future(async { 3 }),
        ]);
        let mut results = collect_messages(cmd).await;
        results.sort_unstable();
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_batch_with_none_commands() {
        let cmd = Command::batch(vec![
            Command::future(async { 1 }),
            Command::<i32>::none(),
            Command::future(async { 3 }),
        ]);
        let mut results = collect_messages(cmd).await;
        results.sort_unstable();
        assert_eq!(results, vec![1, 3]);
    }

    #[tokio::test]
    async fn test_batch_all_none() {
        let cmd = Command::batch(vec![Command::<i32>::none(), Command::<i32>::none()]);
        assert!(cmd.is_none());
    }

    #[tokio::test]
    async fn test_batch_with_quit_action() {
        let cmd = Command::batch(vec![
            Command::future(async { 1 }),
            Command::effect(Action::Quit),
            Command::future(async { 3 }),
        ]);
        // Drain everything rather than stopping at `Quit`: the position of the
        // quit action relative to the messages is an interleaving detail of the
        // batch, not something this test should rely on.
        let (mut messages, saw_quit) = drain(cmd).await;
        messages.sort_unstable();
        assert!(saw_quit, "should receive quit action");
        assert_eq!(messages, vec![1, 3]);
    }

    #[tokio::test]
    async fn test_stream() {
        let cmd = Command::stream(stream::iter(vec![1, 2, 3]));
        assert_eq!(collect_messages(cmd).await, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_run() {
        let cmd = Command::run(stream::iter(vec![1, 2, 3]), |x| x * 2);
        assert_eq!(collect_messages(cmd).await, vec![2, 4, 6]);
    }

    #[tokio::test]
    async fn test_run_with_conversion() {
        #[derive(Debug, PartialEq)]
        enum Message {
            Number(i32),
        }

        let cmd = Command::run(stream::iter(vec![1, 2, 3]), |x| Message::Number(x * 10));
        assert_eq!(
            collect_messages(cmd).await,
            vec![
                Message::Number(10),
                Message::Number(20),
                Message::Number(30)
            ]
        );
    }

    #[tokio::test]
    async fn test_run_with_empty_stream() {
        let cmd = Command::run(stream::iter(Vec::<i32>::new()), |x| x * 2);
        let messages = collect_messages(cmd).await;
        assert!(
            messages.is_empty(),
            "empty stream should produce no messages"
        );
    }

    #[tokio::test]
    async fn test_none() {
        let cmd: Command<i32> = Command::none();
        assert!(cmd.is_none());
    }

    #[tokio::test]
    async fn test_future() {
        // Exactly one message, after which the stream ends.
        let cmd = Command::future(async { 42 });
        assert_eq!(collect_messages(cmd).await, vec![42]);
    }

    #[tokio::test]
    async fn test_perform() {
        // The helper is `async` on purpose: it stands in for real asynchronous work.
        #[allow(clippy::unused_async)]
        async fn fetch_value() -> i32 {
            42
        }

        let action = first_action(Command::perform(fetch_value(), |x| x * 2)).await;
        assert!(matches!(action, Action::Message(84)));
    }

    #[tokio::test]
    async fn test_perform_with_result() {
        // The helper is `async` on purpose: it stands in for real asynchronous work.
        #[allow(clippy::unused_async)]
        async fn fallible_operation() -> Result<String, String> {
            Ok("success".to_string())
        }

        let cmd = Command::perform(fallible_operation(), |result| match result {
            Ok(s) => format!("Got: {s}"),
            Err(e) => format!("Error: {e}"),
        });
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Message(msg) if msg == "Got: success"));
    }

    #[tokio::test]
    async fn test_message() {
        // Exactly one message, after which the stream ends.
        let cmd = Command::message(42);
        assert_eq!(collect_messages(cmd).await, vec![42]);
    }

    #[tokio::test]
    async fn test_message_with_string() {
        let action = first_action(Command::message("hello".to_string())).await;
        assert!(matches!(action, Action::Message(msg) if msg == "hello"));
    }

    #[tokio::test]
    async fn test_effect_with_message() {
        let action = first_action(Command::effect(Action::Message(100))).await;
        assert!(matches!(action, Action::Message(100)));
    }

    #[tokio::test]
    async fn test_effect_with_quit() {
        let cmd: Command<i32> = Command::effect(Action::Quit);
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Quit));
    }

    #[tokio::test]
    async fn test_stream_empty() {
        let cmd = Command::stream(stream::iter(Vec::<i32>::new()));
        assert!(collect_messages(cmd).await.is_empty());
    }

    #[tokio::test]
    async fn test_batch_nested() {
        let batch1 = Command::batch(vec![
            Command::future(async { 1 }),
            Command::future(async { 2 }),
        ]);
        let batch2 = Command::batch(vec![
            Command::future(async { 3 }),
            Command::future(async { 4 }),
        ]);
        let final_batch = Command::batch(vec![batch1, batch2]);
        let mut results = collect_messages(final_batch).await;
        results.sort_unstable();
        assert_eq!(results, vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_future_with_delay() {
        let cmd = Command::future(async {
            sleep(Duration::from_millis(10)).await;
            "delayed".to_string()
        });
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Message(msg) if msg == "delayed"));
    }

    #[tokio::test]
    async fn test_perform_with_error_handling() {
        // The helper is `async` on purpose: it stands in for real asynchronous work.
        #[allow(clippy::unused_async)]
        async fn may_fail(should_fail: bool) -> Result<i32, &'static str> {
            if should_fail {
                Err("operation failed")
            } else {
                Ok(42)
            }
        }

        let cmd = Command::perform(may_fail(false), |result| result.unwrap_or(-1));
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Message(42)));

        let cmd = Command::perform(may_fail(true), |result| result.unwrap_or(-1));
        let action = first_action(cmd).await;
        assert!(matches!(action, Action::Message(-1)));
    }

    #[tokio::test]
    async fn test_batch_execution_order_independence() {
        let cmd = Command::batch(vec![
            Command::future(async {
                sleep(Duration::from_millis(30)).await;
                1
            }),
            Command::future(async {
                sleep(Duration::from_millis(10)).await;
                2
            }),
            Command::future(async {
                sleep(Duration::from_millis(20)).await;
                3
            }),
        ]);
        let mut results = collect_messages(cmd).await;
        results.sort_unstable();
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_map() {
        let mapped = Command::future(async { 42 }).map(|x| x * 2);
        let action = first_action(mapped).await;
        assert!(matches!(action, Action::Message(84)));
    }

    #[tokio::test]
    async fn test_map_with_type_conversion() {
        #[derive(Debug, PartialEq)]
        enum Message {
            Number(i32),
        }

        let cmd: Command<i32> = Command::future(async { 42 });
        let action = first_action(cmd.map(Message::Number)).await;
        assert!(matches!(action, Action::Message(Message::Number(42))));
    }

    #[tokio::test]
    async fn test_map_with_result() {
        #[derive(Debug, PartialEq)]
        enum Message {
            Success(String),
            Error(String),
        }

        let cmd: Command<Result<String, String>> =
            Command::future(async { Ok("data".to_string()) });
        let mapped = cmd.map(|result| match result {
            Ok(s) => Message::Success(s),
            Err(e) => Message::Error(e),
        });
        let action = first_action(mapped).await;
        assert!(matches!(action, Action::Message(Message::Success(ref s)) if s == "data"));
    }

    #[tokio::test]
    async fn test_map_none() {
        let cmd: Command<i32> = Command::none();
        let mapped = cmd.map(|x| x * 2);
        assert!(mapped.is_none());
    }

    #[tokio::test]
    async fn test_map_preserves_quit() {
        let cmd: Command<i32> = Command::effect(Action::Quit);
        let action = first_action(cmd.map(|x| x * 2)).await;
        assert!(matches!(action, Action::Quit));
    }

    #[test]
    fn test_is_none() {
        let cmd: Command<i32> = Command::none();
        assert!(cmd.is_none());
        assert!(!cmd.is_some());
    }

    #[test]
    fn test_is_some() {
        let cmd = Command::perform(async { 42 }, |x| x);
        assert!(cmd.is_some());
        assert!(!cmd.is_none());
    }

    #[test]
    fn test_is_some_with_future() {
        let cmd = Command::future(async { 100 });
        assert!(cmd.is_some());
        assert!(!cmd.is_none());
    }

    #[test]
    fn test_is_some_with_message() {
        let cmd = Command::message("test");
        assert!(cmd.is_some());
        assert!(!cmd.is_none());
    }

    #[test]
    fn test_is_some_with_effect() {
        let cmd: Command<i32> = Command::effect(Action::Quit);
        assert!(cmd.is_some());
        assert!(!cmd.is_none());
    }

    #[test]
    fn test_is_none_after_batch_empty() {
        let cmd: Command<i32> = Command::batch(vec![]);
        assert!(cmd.is_none());
        assert!(!cmd.is_some());
    }

    #[test]
    fn test_is_none_after_batch_all_none() {
        let cmd = Command::batch(vec![
            Command::<i32>::none(),
            Command::<i32>::none(),
            Command::<i32>::none(),
        ]);
        assert!(cmd.is_none());
        assert!(!cmd.is_some());
    }

    #[test]
    fn test_is_some_after_batch_with_some() {
        let cmd = Command::batch(vec![
            Command::<i32>::none(),
            Command::future(async { 42 }),
            Command::<i32>::none(),
        ]);
        assert!(cmd.is_some());
        assert!(!cmd.is_none());
    }

    #[test]
    fn test_is_none_after_map_none() {
        let cmd: Command<i32> = Command::none();
        let mapped = cmd.map(|x| x * 2);
        assert!(mapped.is_none());
        assert!(!mapped.is_some());
    }

    #[test]
    fn test_is_some_after_map_some() {
        let cmd = Command::future(async { 42 });
        let mapped = cmd.map(|x| x * 2);
        assert!(mapped.is_some());
        assert!(!mapped.is_none());
    }
}