crux_core/command/
builder.rs

1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
5//! Chaining streams with streams is currently not supported, as the semantics
6//! of the composition are unclear. If you need to compose streams, use the async
7//! API and tools from the `futures` crate.
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{Command, context::CommandContext};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23    Effect: Send + 'static,
24    Event: Send + 'static,
25    Task: Future<Output = ()> + Send + 'static,
26{
27    pub fn new<F>(make_task: F) -> Self
28    where
29        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30    {
31        let make_task = Box::new(make_task);
32
33        NotificationBuilder { make_task }
34    }
35
36    /// Convert the [`NotificationBuilder`] into a future to use in an async context
37    #[must_use]
38    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39        let make_task = self.make_task;
40        make_task(ctx)
41    }
42
43    /// Convert the [`NotificationBuilder`] into a [`Command`] to use in an sync context
44    pub fn build(self) -> Command<Effect, Event> {
45        Command::new(|ctx| async move {
46            self.into_future(ctx.clone()).await;
47        })
48    }
49}
50
51impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
52where
53    Effect: Send + 'static,
54    Event: Send + 'static,
55    Task: Future<Output = ()> + Send + 'static,
56{
57    fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
58        Command::new(|ctx| value.into_future(ctx))
59    }
60}
61
62/// A builder of one-off request command
63// Task is a future which does the shell talking and returns an output
64pub struct RequestBuilder<Effect, Event, Task> {
65    make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
66}
67
68impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
69where
70    Effect: Send + 'static,
71    Event: Send + 'static,
72    Task: Future<Output = T> + Send + 'static,
73{
74    pub fn new<F>(make_task: F) -> Self
75    where
76        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
77    {
78        let make_task = Box::new(make_task);
79
80        RequestBuilder { make_task }
81    }
82
83    pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
84    where
85        F: FnOnce(T) -> U + Send + 'static,
86    {
87        RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
88    }
89
90    /// Chain a [`NotificationBuilder`] to run after completion of this one,
91    /// passing the result to the provided closure `make_next_builder`.
92    ///
93    /// The returned value of the closure must be a `NotificationBuilder`, which
94    /// can represent the notification to be sent before the composed future
95    /// is finished.
96    ///
97    /// If you want to chain a request, use [`Self::then_request`] instead.
98    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
99    ///
100    /// The closure `make_next_builder` is only run *after* successful completion
101    /// of the `self` future.
102    ///
103    /// Note that this function consumes the receiving `RequestBuilder`
104    /// and returns a [`NotificationBuilder`] that represents the composition.
105    ///
106    /// # Example
107    ///
108    /// ```
109    /// # use crux_core::{Command, Request};
110    /// # use crux_core::capability::Operation;
111    /// # use serde::{Deserialize, Serialize};
112    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
113    /// # enum AnOperation {
114    /// #     Request(u8),
115    /// #     Notify,
116    /// # }
117    /// #
118    /// # #[derive(Debug, PartialEq, Deserialize)]
119    /// # enum AnOperationOutput {
120    /// #     Response(String),
121    /// # }
122    /// #
123    /// # impl Operation for AnOperation {
124    /// #     type Output = AnOperationOutput;
125    /// # }
126    /// #
127    /// # #[derive(Debug)]
128    /// # enum Effect {
129    /// #     AnEffect(Request<AnOperation>),
130    /// # }
131    /// #
132    /// # impl From<Request<AnOperation>> for Effect {
133    /// #     fn from(request: Request<AnOperation>) -> Self {
134    /// #         Self::AnEffect(request)
135    /// #     }
136    /// # }
137    /// #
138    /// # #[derive(Debug, PartialEq)]
139    /// # enum Event {
140    /// #     Response(AnOperationOutput),
141    /// # }
142    /// let mut cmd: Command<Effect, Event> =
143    ///     Command::request_from_shell(AnOperation::Request(10))
144    ///     .then_notify(|response| {
145    ///         let AnOperationOutput::Response(_response) = response else {
146    ///             panic!("Invalid output!")
147    ///         };
148    ///
149    ///         // possibly do something with the response
150    ///
151    ///         Command::notify_shell(AnOperation::Notify)
152    ///     })
153    ///     .build();
154    ///
155    /// let effect = cmd.effects().next().unwrap();
156    /// let Effect::AnEffect(mut request) = effect;
157    ///
158    /// assert_eq!(request.operation, AnOperation::Request(10));
159    ///
160    /// request
161    ///     .resolve(AnOperationOutput::Response("ten".to_string()))
162    ///     .expect("should work");
163    ///
164    /// assert!(cmd.events().next().is_none());
165    /// let effect = cmd.effects().next().unwrap();
166    /// let Effect::AnEffect(request) = effect;
167    ///
168    /// assert_eq!(request.operation, AnOperation::Notify);
169    /// assert!(cmd.is_done());
170    /// ```
171    pub fn then_notify<F, NextTask>(
172        self,
173        make_next_builder: F,
174    ) -> NotificationBuilder<Effect, Event, impl Future<Output = ()>>
175    where
176        F: FnOnce(T) -> NotificationBuilder<Effect, Event, NextTask> + Send + 'static,
177        NextTask: Future<Output = ()> + Send + 'static,
178    {
179        NotificationBuilder::new(|ctx| {
180            self.into_future(ctx.clone())
181                .then(|out| make_next_builder(out).into_future(ctx))
182        })
183    }
184
185    /// Chain another [`RequestBuilder`] to run after completion of this one,
186    /// passing the result to the provided closure `make_next_builder`.
187    ///
188    /// The returned value of the closure must be a `RequestBuilder`, which
189    /// can represent some more work to be done before the composed future
190    /// is finished.
191    ///
192    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
193    ///
194    /// The closure `make_next_builder` is only run *after* successful completion
195    /// of the `self` future.
196    ///
197    /// Note that this function consumes the receiving `RequestBuilder` and returns a
198    /// new one that represents the composition.
199    ///
200    /// # Example
201    ///
202    /// ```
203    /// # use crux_core::{Command, Request};
204    /// # use crux_core::capability::Operation;
205    /// # use serde::{Deserialize, Serialize};
206    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
207    /// # enum AnOperation {
208    /// #     One,
209    /// #     Two,
210    /// #     More(u8),
211    /// # }
212    /// #
213    /// # #[derive(Debug, PartialEq, Deserialize)]
214    /// # enum AnOperationOutput {
215    /// #     One,
216    /// #     Two,
217    /// #     Other(u8),
218    /// # }
219    /// #
220    /// # impl Operation for AnOperation {
221    /// #     type Output = AnOperationOutput;
222    /// # }
223    /// #
224    /// # #[derive(Debug)]
225    /// # enum Effect {
226    /// #     AnEffect(Request<AnOperation>),
227    /// # }
228    /// #
229    /// # impl From<Request<AnOperation>> for Effect {
230    /// #     fn from(request: Request<AnOperation>) -> Self {
231    /// #         Self::AnEffect(request)
232    /// #     }
233    /// # }
234    /// #
235    /// # #[derive(Debug, PartialEq)]
236    /// # enum Event {
237    /// #     Completed(AnOperationOutput),
238    /// # }
239    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
240    ///     .then_request(|first| {
241    ///         let AnOperationOutput::Other(first) = first else {
242    ///             panic!("Invalid output!")
243    ///         };
244    ///
245    ///         let second = first + 1;
246    ///         Command::request_from_shell(AnOperation::More(second))
247    ///     })
248    ///     .then_send(Event::Completed);
249    ///
250    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
251    /// assert_eq!(request.operation, AnOperation::More(1));
252    ///
253    /// request
254    ///    .resolve(AnOperationOutput::Other(1))
255    ///    .expect("to resolve");
256    ///
257    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
258    /// assert_eq!(request.operation, AnOperation::More(2));
259    /// ```
260    pub fn then_request<F, U, NextTask>(
261        self,
262        make_next_builder: F,
263    ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
264    where
265        F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
266        NextTask: Future<Output = U> + Send + 'static,
267    {
268        RequestBuilder::new(|ctx| {
269            self.into_future(ctx.clone())
270                .then(|out| make_next_builder(out).into_future(ctx))
271        })
272    }
273
274    /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
275    /// passing the result to the provided closure `make_next_builder`.
276    ///
277    /// The returned value of the closure must be a `StreamBuilder`, which
278    /// can represent some more work to be done before the composed future
279    /// is finished.
280    ///
281    /// If you want to chain a request, use [`Self::then_request`] instead.
282    ///
283    /// The closure `make_next_builder` is only run *after* successful completion
284    /// of the `self` future.
285    ///
286    /// Note that this function consumes the receiving `RequestBuilder` and returns a
287    /// [`StreamBuilder`] that represents the composition.
288    ///
289    /// # Example
290    ///
291    /// ```
292    /// # use crux_core::{Command, Request};
293    /// # use crux_core::capability::Operation;
294    /// # use serde::{Deserialize, Serialize};
295    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
296    /// # enum AnOperation {
297    /// #     One,
298    /// #     Two,
299    /// #     More(u8),
300    /// # }
301    /// #
302    /// # #[derive(Debug, PartialEq, Deserialize)]
303    /// # enum AnOperationOutput {
304    /// #     One,
305    /// #     Two,
306    /// #     Other(u8),
307    /// # }
308    /// #
309    /// # impl Operation for AnOperation {
310    /// #     type Output = AnOperationOutput;
311    /// # }
312    /// #
313    /// # #[derive(Debug)]
314    /// # enum Effect {
315    /// #     AnEffect(Request<AnOperation>),
316    /// # }
317    /// #
318    /// # impl From<Request<AnOperation>> for Effect {
319    /// #     fn from(request: Request<AnOperation>) -> Self {
320    /// #         Self::AnEffect(request)
321    /// #     }
322    /// # }
323    /// #
324    /// # #[derive(Debug, PartialEq)]
325    /// # enum Event {
326    /// #     Completed(AnOperationOutput),
327    /// # }
328    /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
329    ///    .then_stream(|first| {
330    ///       let AnOperationOutput::Other(first) = first else {
331    ///          panic!("Invalid output!")
332    ///      };
333    ///
334    ///      let second = first + 1;
335    ///      Command::stream_from_shell(AnOperation::More(second))
336    ///    })
337    ///    .then_send(Event::Completed);
338    ///
339    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
340    /// assert_eq!(request.operation, AnOperation::More(1));
341    ///
342    /// request
343    ///   .resolve(AnOperationOutput::Other(1))
344    ///   .expect("to resolve");
345    ///
346    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
347    /// assert_eq!(request.operation, AnOperation::More(2));
348    pub fn then_stream<F, U, NextTask>(
349        self,
350        make_next_builder: F,
351    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
352    where
353        F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
354        NextTask: Stream<Item = U> + Send + 'static,
355    {
356        StreamBuilder::new(|ctx| {
357            self.into_future(ctx.clone())
358                .map(make_next_builder)
359                .into_stream()
360                .flat_map(move |builder| builder.into_stream(ctx.clone()))
361        })
362    }
363
364    /// Convert the [`RequestBuilder`] into a future to use in an async context
365    #[must_use]
366    pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
367        let make_task = self.make_task;
368        make_task(ctx)
369    }
370
371    /// Create the command in an evented context
372    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
373    where
374        E: FnOnce(T) -> Event + Send + 'static,
375        Task: Future<Output = T> + Send + 'static,
376    {
377        Command::new(|ctx| async move {
378            let out = self.into_future(ctx.clone()).await;
379            ctx.send_event(event(out));
380        })
381    }
382
383    /// Convert the [`RequestBuilder`] into a [`Command`] to use in an sync context
384    ///
385    /// Note: You might be looking for [`then_send`](Self::then_send)
386    /// instead, which will send the output back into the app with an event.
387    ///
388    /// The command created in this function will *ignore* the output
389    /// of the request so may not be very useful.
390    /// It might be useful when using a 3rd party capability and you don't
391    /// care about the request's response.
392    pub fn build(self) -> Command<Effect, Event> {
393        Command::new(|ctx| async move {
394            self.into_future(ctx.clone()).await;
395        })
396    }
397}
398
399/// A builder of stream command
400pub struct StreamBuilder<Effect, Event, Task> {
401    make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
402}
403
404impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
405where
406    Effect: Send + 'static,
407    Event: Send + 'static,
408    Task: Stream<Item = T> + Send + 'static,
409{
410    pub fn new<F>(make_task: F) -> Self
411    where
412        F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
413    {
414        let make_task = Box::new(make_task);
415
416        StreamBuilder {
417            make_stream: make_task,
418        }
419    }
420
421    pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
422    where
423        F: FnMut(T) -> U + Send + 'static,
424    {
425        StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
426    }
427
428    /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
429    /// passing the result to the provided closure `make_next_builder`.
430    ///
431    /// The returned value of the closure must be a [`StreamBuilder`], which
432    /// can represent some more work to be done before the composed future
433    /// is finished.
434    ///
435    /// If you want to chain a subscription, use [`Self::then_stream`] instead.
436    ///
437    /// The closure `make_next_builder` is only run *after* successful completion
438    /// of the `self` future.
439    ///
440    /// Note that this function consumes the receiving `StreamBuilder` and returns a
441    /// new one that represents the composition.
442    ///
443    /// # Example
444    ///
445    /// ```
446    /// # use crux_core::{Command, Request};
447    /// # use crux_core::capability::Operation;
448    /// # use serde::{Deserialize, Serialize};
449    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
450    /// # enum AnOperation {
451    /// #     One,
452    /// #     Two,
453    /// #     More(u8),
454    /// # }
455    /// #
456    /// # #[derive(Debug, PartialEq, Deserialize)]
457    /// # enum AnOperationOutput {
458    /// #     One,
459    /// #     Two,
460    /// #     Other(u8),
461    /// # }
462    /// #
463    /// # impl Operation for AnOperation {
464    /// #     type Output = AnOperationOutput;
465    /// # }
466    /// #
467    /// # #[derive(Debug)]
468    /// # enum Effect {
469    /// #     AnEffect(Request<AnOperation>),
470    /// # }
471    /// #
472    /// # impl From<Request<AnOperation>> for Effect {
473    /// #     fn from(request: Request<AnOperation>) -> Self {
474    /// #         Self::AnEffect(request)
475    /// #     }
476    /// # }
477    /// #
478    /// # #[derive(Debug, PartialEq)]
479    /// # enum Event {
480    /// #     Completed(AnOperationOutput),
481    /// # }
482    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
483    ///     .then_request(|first| {
484    ///         let AnOperationOutput::Other(first) = first else {
485    ///             panic!("Invalid output!")
486    ///         };
487    ///
488    ///         let second = first + 1;
489    ///         Command::request_from_shell(AnOperation::More(second))
490    ///     })
491    ///     .then_send(Event::Completed);
492    ///
493    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
494    /// assert_eq!(request.operation, AnOperation::More(1));
495    ///
496    /// request
497    ///    .resolve(AnOperationOutput::Other(1))
498    ///    .expect("to resolve");
499    ///
500    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
501    /// assert_eq!(request.operation, AnOperation::More(2));
502    /// ```
503    pub fn then_request<F, U, NextTask>(
504        self,
505        make_next_builder: F,
506    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
507    where
508        F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
509        NextTask: Future<Output = U> + Send + 'static,
510    {
511        StreamBuilder::new(|ctx| {
512            self.into_stream(ctx.clone())
513                .then(move |item| make_next_builder(item).into_future(ctx.clone()))
514        })
515    }
516
517    /// Chain another [`StreamBuilder`] to run after completion of this one,
518    /// passing the result to the provided closure `make_next_builder`.
519    ///
520    /// The returned value of the closure must be a `StreamBuilder`, which
521    /// can represent some more work to be done before the composed future
522    /// is finished.
523    ///
524    /// If you want to chain a request, use [`Self::then_request`] instead.
525    ///
526    /// The closure `make_next_builder` is only run *after* successful completion
527    /// of the `self` future.
528    ///
529    /// Note that this function consumes the receiving `StreamBuilder` and returns a
530    /// new one that represents the composition.
531    ///
532    /// # Example
533    ///
534    /// ```
535    /// # use crux_core::{Command, Request};
536    /// # use crux_core::capability::Operation;
537    /// # use serde::{Deserialize, Serialize};
538    /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
539    /// # enum AnOperation {
540    /// #     One,
541    /// #     Two,
542    /// #     More(u8),
543    /// # }
544    /// #
545    /// # #[derive(Debug, PartialEq, Deserialize)]
546    /// # enum AnOperationOutput {
547    /// #     One,
548    /// #     Two,
549    /// #     Other(u8),
550    /// # }
551    /// #
552    /// # impl Operation for AnOperation {
553    /// #     type Output = AnOperationOutput;
554    /// # }
555    /// #
556    /// # #[derive(Debug)]
557    /// # enum Effect {
558    /// #     AnEffect(Request<AnOperation>),
559    /// # }
560    /// #
561    /// # impl From<Request<AnOperation>> for Effect {
562    /// #     fn from(request: Request<AnOperation>) -> Self {
563    /// #         Self::AnEffect(request)
564    /// #     }
565    /// # }
566    /// #
567    /// # #[derive(Debug, PartialEq)]
568    /// # enum Event {
569    /// #     Completed(AnOperationOutput),
570    /// # }
571    /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
572    ///    .then_stream(|first| {
573    ///       let AnOperationOutput::Other(first) = first else {
574    ///          panic!("Invalid output!")
575    ///      };
576    ///
577    ///      let second = first + 1;
578    ///      Command::stream_from_shell(AnOperation::More(second))
579    ///    })
580    ///    .then_send(Event::Completed);
581    ///
582    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
583    /// assert_eq!(request.operation, AnOperation::More(1));
584    ///
585    /// request
586    ///   .resolve(AnOperationOutput::Other(1))
587    ///   .expect("to resolve");
588    ///
589    /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
590    /// assert_eq!(request.operation, AnOperation::More(2));
591    pub fn then_stream<F, U, NextTask>(
592        self,
593        make_next_builder: F,
594    ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
595    where
596        F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
597        NextTask: Stream<Item = U> + Send + 'static,
598    {
599        StreamBuilder::new(move |ctx| {
600            self.into_stream(ctx.clone())
601                .map(move |item| {
602                    let next_builder = make_next_builder(item);
603                    Box::pin(next_builder.into_stream(ctx.clone()))
604                })
605                .flatten_unordered(None)
606        })
607    }
608
609    /// Create the command in an evented context
610    pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
611    where
612        E: Fn(T) -> Event + Send + 'static,
613    {
614        Command::new(|ctx| async move {
615            let mut stream = pin!(self.into_stream(ctx.clone()));
616
617            while let Some(out) = stream.next().await {
618                ctx.send_event(event(out));
619            }
620        })
621    }
622
623    /// Convert the [`StreamBuilder`] into a stream to use in an async context
624    #[must_use]
625    pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
626        let make_stream = self.make_stream;
627
628        make_stream(ctx)
629    }
630
631    /// Convert the [`StreamBuilder`] into a [`Command`] to use in an sync context
632    ///
633    /// Note: You might be looking for [`then_send`](Self::then_send)
634    /// instead, which will send each item in the stream back into the
635    /// app with an event.
636    ///
637    /// The command created in this function will *ignore* the output
638    /// of the stream so may not be very useful.
639    /// It may be useful when using a 3rd party capability and you don't
640    /// care about the stream output.
641    pub fn build(self) -> Command<Effect, Event> {
642        Command::new(|ctx| async move {
643            let mut stream = pin!(self.into_stream(ctx.clone()));
644
645            while (stream.next().await).is_some() {}
646        })
647    }
648}