crux_core/command/builder.rs
1//! Command builders are an abstraction allowing chaining effects,
2//! where outputs of one effect can serve as inputs to further effects,
3//! without requiring an async context.
4//!
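//! Streams can be chained with further streams using `StreamBuilder::then_stream`:
//! every item of the first stream starts a new stream, and the items of all the
//! started streams are yielded as they arrive. If you need different composition
//! semantics, use the async API and tools from the `futures` crate.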
8
9use std::{future::Future, pin::pin};
10
11use futures::{FutureExt, Stream, StreamExt};
12
13use super::{Command, context::CommandContext};
14
15/// A builder of one-off notify command
16// Task is a future which does the shell talking and returns an output
17pub struct NotificationBuilder<Effect, Event, Task> {
18 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
19}
20
21impl<Effect, Event, Task> NotificationBuilder<Effect, Event, Task>
22where
23 Effect: Send + 'static,
24 Event: Send + 'static,
25 Task: Future<Output = ()> + Send + 'static,
26{
27 pub fn new<F>(make_task: F) -> Self
28 where
29 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
30 {
31 let make_task = Box::new(make_task);
32
33 NotificationBuilder { make_task }
34 }
35
36 /// Convert the [`NotificationBuilder`] into a future to use in an async context
37 #[must_use]
38 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
39 let make_task = self.make_task;
40 make_task(ctx)
41 }
42
43 /// Convert the [`NotificationBuilder`] into a [`Command`] to use in an sync context
44 pub fn build(self) -> Command<Effect, Event> {
45 Command::new(|ctx| async move {
46 self.into_future(ctx.clone()).await;
47 })
48 }
49}
50
51impl<Effect, Event, Task> From<NotificationBuilder<Effect, Event, Task>> for Command<Effect, Event>
52where
53 Effect: Send + 'static,
54 Event: Send + 'static,
55 Task: Future<Output = ()> + Send + 'static,
56{
57 fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
58 Command::new(|ctx| value.into_future(ctx))
59 }
60}
61
62/// A builder of one-off request command
63// Task is a future which does the shell talking and returns an output
64pub struct RequestBuilder<Effect, Event, Task> {
65 make_task: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
66}
67
68impl<Effect, Event, Task, T> RequestBuilder<Effect, Event, Task>
69where
70 Effect: Send + 'static,
71 Event: Send + 'static,
72 Task: Future<Output = T> + Send + 'static,
73{
74 pub fn new<F>(make_task: F) -> Self
75 where
76 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
77 {
78 let make_task = Box::new(make_task);
79
80 RequestBuilder { make_task }
81 }
82
83 pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
84 where
85 F: FnOnce(T) -> U + Send + 'static,
86 {
87 RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
88 }
89
90 /// Chain a [`NotificationBuilder`] to run after completion of this one,
91 /// passing the result to the provided closure `make_next_builder`.
92 ///
93 /// The returned value of the closure must be a `NotificationBuilder`, which
94 /// can represent the notification to be sent before the composed future
95 /// is finished.
96 ///
97 /// If you want to chain a request, use [`Self::then_request`] instead.
98 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
99 ///
100 /// The closure `make_next_builder` is only run *after* successful completion
101 /// of the `self` future.
102 ///
103 /// Note that this function consumes the receiving `RequestBuilder`
104 /// and returns a [`NotificationBuilder`] that represents the composition.
105 ///
106 /// # Example
107 ///
108 /// ```
109 /// # use crux_core::{Command, Request};
110 /// # use crux_core::capability::Operation;
111 /// # use serde::{Deserialize, Serialize};
112 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
113 /// # enum AnOperation {
114 /// # Request(u8),
115 /// # Notify,
116 /// # }
117 /// #
118 /// # #[derive(Debug, PartialEq, Deserialize)]
119 /// # enum AnOperationOutput {
120 /// # Response(String),
121 /// # }
122 /// #
123 /// # impl Operation for AnOperation {
124 /// # type Output = AnOperationOutput;
125 /// # }
126 /// #
127 /// # #[derive(Debug)]
128 /// # enum Effect {
129 /// # AnEffect(Request<AnOperation>),
130 /// # }
131 /// #
132 /// # impl From<Request<AnOperation>> for Effect {
133 /// # fn from(request: Request<AnOperation>) -> Self {
134 /// # Self::AnEffect(request)
135 /// # }
136 /// # }
137 /// #
138 /// # #[derive(Debug, PartialEq)]
139 /// # enum Event {
140 /// # Response(AnOperationOutput),
141 /// # }
142 /// let mut cmd: Command<Effect, Event> =
143 /// Command::request_from_shell(AnOperation::Request(10))
144 /// .then_notify(|response| {
145 /// let AnOperationOutput::Response(_response) = response else {
146 /// panic!("Invalid output!")
147 /// };
148 ///
149 /// // possibly do something with the response
150 ///
151 /// Command::notify_shell(AnOperation::Notify)
152 /// })
153 /// .build();
154 ///
155 /// let effect = cmd.effects().next().unwrap();
156 /// let Effect::AnEffect(mut request) = effect;
157 ///
158 /// assert_eq!(request.operation, AnOperation::Request(10));
159 ///
160 /// request
161 /// .resolve(AnOperationOutput::Response("ten".to_string()))
162 /// .expect("should work");
163 ///
164 /// assert!(cmd.events().next().is_none());
165 /// let effect = cmd.effects().next().unwrap();
166 /// let Effect::AnEffect(request) = effect;
167 ///
168 /// assert_eq!(request.operation, AnOperation::Notify);
169 /// assert!(cmd.is_done());
170 /// ```
171 pub fn then_notify<F, NextTask>(
172 self,
173 make_next_builder: F,
174 ) -> NotificationBuilder<Effect, Event, impl Future<Output = ()>>
175 where
176 F: FnOnce(T) -> NotificationBuilder<Effect, Event, NextTask> + Send + 'static,
177 NextTask: Future<Output = ()> + Send + 'static,
178 {
179 NotificationBuilder::new(|ctx| {
180 self.into_future(ctx.clone())
181 .then(|out| make_next_builder(out).into_future(ctx))
182 })
183 }
184
185 /// Chain another [`RequestBuilder`] to run after completion of this one,
186 /// passing the result to the provided closure `make_next_builder`.
187 ///
188 /// The returned value of the closure must be a `RequestBuilder`, which
189 /// can represent some more work to be done before the composed future
190 /// is finished.
191 ///
192 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
193 ///
194 /// The closure `make_next_builder` is only run *after* successful completion
195 /// of the `self` future.
196 ///
197 /// Note that this function consumes the receiving `RequestBuilder` and returns a
198 /// new one that represents the composition.
199 ///
200 /// # Example
201 ///
202 /// ```
203 /// # use crux_core::{Command, Request};
204 /// # use crux_core::capability::Operation;
205 /// # use serde::{Deserialize, Serialize};
206 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
207 /// # enum AnOperation {
208 /// # One,
209 /// # Two,
210 /// # More(u8),
211 /// # }
212 /// #
213 /// # #[derive(Debug, PartialEq, Deserialize)]
214 /// # enum AnOperationOutput {
215 /// # One,
216 /// # Two,
217 /// # Other(u8),
218 /// # }
219 /// #
220 /// # impl Operation for AnOperation {
221 /// # type Output = AnOperationOutput;
222 /// # }
223 /// #
224 /// # #[derive(Debug)]
225 /// # enum Effect {
226 /// # AnEffect(Request<AnOperation>),
227 /// # }
228 /// #
229 /// # impl From<Request<AnOperation>> for Effect {
230 /// # fn from(request: Request<AnOperation>) -> Self {
231 /// # Self::AnEffect(request)
232 /// # }
233 /// # }
234 /// #
235 /// # #[derive(Debug, PartialEq)]
236 /// # enum Event {
237 /// # Completed(AnOperationOutput),
238 /// # }
239 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
240 /// .then_request(|first| {
241 /// let AnOperationOutput::Other(first) = first else {
242 /// panic!("Invalid output!")
243 /// };
244 ///
245 /// let second = first + 1;
246 /// Command::request_from_shell(AnOperation::More(second))
247 /// })
248 /// .then_send(Event::Completed);
249 ///
250 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
251 /// assert_eq!(request.operation, AnOperation::More(1));
252 ///
253 /// request
254 /// .resolve(AnOperationOutput::Other(1))
255 /// .expect("to resolve");
256 ///
257 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
258 /// assert_eq!(request.operation, AnOperation::More(2));
259 /// ```
260 pub fn then_request<F, U, NextTask>(
261 self,
262 make_next_builder: F,
263 ) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
264 where
265 F: FnOnce(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
266 NextTask: Future<Output = U> + Send + 'static,
267 {
268 RequestBuilder::new(|ctx| {
269 self.into_future(ctx.clone())
270 .then(|out| make_next_builder(out).into_future(ctx))
271 })
272 }
273
274 /// Chain a [`StreamBuilder`] to run after completion of this [`RequestBuilder`],
275 /// passing the result to the provided closure `make_next_builder`.
276 ///
277 /// The returned value of the closure must be a `StreamBuilder`, which
278 /// can represent some more work to be done before the composed future
279 /// is finished.
280 ///
281 /// If you want to chain a request, use [`Self::then_request`] instead.
282 ///
283 /// The closure `make_next_builder` is only run *after* successful completion
284 /// of the `self` future.
285 ///
286 /// Note that this function consumes the receiving `RequestBuilder` and returns a
287 /// [`StreamBuilder`] that represents the composition.
288 ///
289 /// # Example
290 ///
291 /// ```
292 /// # use crux_core::{Command, Request};
293 /// # use crux_core::capability::Operation;
294 /// # use serde::{Deserialize, Serialize};
295 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
296 /// # enum AnOperation {
297 /// # One,
298 /// # Two,
299 /// # More(u8),
300 /// # }
301 /// #
302 /// # #[derive(Debug, PartialEq, Deserialize)]
303 /// # enum AnOperationOutput {
304 /// # One,
305 /// # Two,
306 /// # Other(u8),
307 /// # }
308 /// #
309 /// # impl Operation for AnOperation {
310 /// # type Output = AnOperationOutput;
311 /// # }
312 /// #
313 /// # #[derive(Debug)]
314 /// # enum Effect {
315 /// # AnEffect(Request<AnOperation>),
316 /// # }
317 /// #
318 /// # impl From<Request<AnOperation>> for Effect {
319 /// # fn from(request: Request<AnOperation>) -> Self {
320 /// # Self::AnEffect(request)
321 /// # }
322 /// # }
323 /// #
324 /// # #[derive(Debug, PartialEq)]
325 /// # enum Event {
326 /// # Completed(AnOperationOutput),
327 /// # }
328 /// let mut cmd: Command<Effect, Event> = Command::request_from_shell(AnOperation::More(1))
329 /// .then_stream(|first| {
330 /// let AnOperationOutput::Other(first) = first else {
331 /// panic!("Invalid output!")
332 /// };
333 ///
334 /// let second = first + 1;
335 /// Command::stream_from_shell(AnOperation::More(second))
336 /// })
337 /// .then_send(Event::Completed);
338 ///
339 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
340 /// assert_eq!(request.operation, AnOperation::More(1));
341 ///
342 /// request
343 /// .resolve(AnOperationOutput::Other(1))
344 /// .expect("to resolve");
345 ///
346 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
347 /// assert_eq!(request.operation, AnOperation::More(2));
348 pub fn then_stream<F, U, NextTask>(
349 self,
350 make_next_builder: F,
351 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
352 where
353 F: FnOnce(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
354 NextTask: Stream<Item = U> + Send + 'static,
355 {
356 StreamBuilder::new(|ctx| {
357 self.into_future(ctx.clone())
358 .map(make_next_builder)
359 .into_stream()
360 .flat_map(move |builder| builder.into_stream(ctx.clone()))
361 })
362 }
363
364 /// Convert the [`RequestBuilder`] into a future to use in an async context
365 #[must_use]
366 pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> Task {
367 let make_task = self.make_task;
368 make_task(ctx)
369 }
370
371 /// Create the command in an evented context
372 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
373 where
374 E: FnOnce(T) -> Event + Send + 'static,
375 Task: Future<Output = T> + Send + 'static,
376 {
377 Command::new(|ctx| async move {
378 let out = self.into_future(ctx.clone()).await;
379 ctx.send_event(event(out));
380 })
381 }
382
383 /// Convert the [`RequestBuilder`] into a [`Command`] to use in an sync context
384 ///
385 /// Note: You might be looking for [`then_send`](Self::then_send)
386 /// instead, which will send the output back into the app with an event.
387 ///
388 /// The command created in this function will *ignore* the output
389 /// of the request so may not be very useful.
390 /// It might be useful when using a 3rd party capability and you don't
391 /// care about the request's response.
392 pub fn build(self) -> Command<Effect, Event> {
393 Command::new(|ctx| async move {
394 self.into_future(ctx.clone()).await;
395 })
396 }
397}
398
399/// A builder of stream command
400pub struct StreamBuilder<Effect, Event, Task> {
401 make_stream: Box<dyn FnOnce(CommandContext<Effect, Event>) -> Task + Send>,
402}
403
404impl<Effect, Event, Task, T> StreamBuilder<Effect, Event, Task>
405where
406 Effect: Send + 'static,
407 Event: Send + 'static,
408 Task: Stream<Item = T> + Send + 'static,
409{
410 pub fn new<F>(make_task: F) -> Self
411 where
412 F: FnOnce(CommandContext<Effect, Event>) -> Task + Send + 'static,
413 {
414 let make_task = Box::new(make_task);
415
416 StreamBuilder {
417 make_stream: make_task,
418 }
419 }
420
421 pub fn map<F, U>(self, map: F) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
422 where
423 F: FnMut(T) -> U + Send + 'static,
424 {
425 StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
426 }
427
428 /// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
429 /// passing the result to the provided closure `make_next_builder`.
430 ///
431 /// The returned value of the closure must be a [`StreamBuilder`], which
432 /// can represent some more work to be done before the composed future
433 /// is finished.
434 ///
435 /// If you want to chain a subscription, use [`Self::then_stream`] instead.
436 ///
437 /// The closure `make_next_builder` is only run *after* successful completion
438 /// of the `self` future.
439 ///
440 /// Note that this function consumes the receiving `StreamBuilder` and returns a
441 /// new one that represents the composition.
442 ///
443 /// # Example
444 ///
445 /// ```
446 /// # use crux_core::{Command, Request};
447 /// # use crux_core::capability::Operation;
448 /// # use serde::{Deserialize, Serialize};
449 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
450 /// # enum AnOperation {
451 /// # One,
452 /// # Two,
453 /// # More(u8),
454 /// # }
455 /// #
456 /// # #[derive(Debug, PartialEq, Deserialize)]
457 /// # enum AnOperationOutput {
458 /// # One,
459 /// # Two,
460 /// # Other(u8),
461 /// # }
462 /// #
463 /// # impl Operation for AnOperation {
464 /// # type Output = AnOperationOutput;
465 /// # }
466 /// #
467 /// # #[derive(Debug)]
468 /// # enum Effect {
469 /// # AnEffect(Request<AnOperation>),
470 /// # }
471 /// #
472 /// # impl From<Request<AnOperation>> for Effect {
473 /// # fn from(request: Request<AnOperation>) -> Self {
474 /// # Self::AnEffect(request)
475 /// # }
476 /// # }
477 /// #
478 /// # #[derive(Debug, PartialEq)]
479 /// # enum Event {
480 /// # Completed(AnOperationOutput),
481 /// # }
482 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
483 /// .then_request(|first| {
484 /// let AnOperationOutput::Other(first) = first else {
485 /// panic!("Invalid output!")
486 /// };
487 ///
488 /// let second = first + 1;
489 /// Command::request_from_shell(AnOperation::More(second))
490 /// })
491 /// .then_send(Event::Completed);
492 ///
493 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
494 /// assert_eq!(request.operation, AnOperation::More(1));
495 ///
496 /// request
497 /// .resolve(AnOperationOutput::Other(1))
498 /// .expect("to resolve");
499 ///
500 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
501 /// assert_eq!(request.operation, AnOperation::More(2));
502 /// ```
503 pub fn then_request<F, U, NextTask>(
504 self,
505 make_next_builder: F,
506 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
507 where
508 F: Fn(T) -> RequestBuilder<Effect, Event, NextTask> + Send + 'static,
509 NextTask: Future<Output = U> + Send + 'static,
510 {
511 StreamBuilder::new(|ctx| {
512 self.into_stream(ctx.clone())
513 .then(move |item| make_next_builder(item).into_future(ctx.clone()))
514 })
515 }
516
517 /// Chain another [`StreamBuilder`] to run after completion of this one,
518 /// passing the result to the provided closure `make_next_builder`.
519 ///
520 /// The returned value of the closure must be a `StreamBuilder`, which
521 /// can represent some more work to be done before the composed future
522 /// is finished.
523 ///
524 /// If you want to chain a request, use [`Self::then_request`] instead.
525 ///
526 /// The closure `make_next_builder` is only run *after* successful completion
527 /// of the `self` future.
528 ///
529 /// Note that this function consumes the receiving `StreamBuilder` and returns a
530 /// new one that represents the composition.
531 ///
532 /// # Example
533 ///
534 /// ```
535 /// # use crux_core::{Command, Request};
536 /// # use crux_core::capability::Operation;
537 /// # use serde::{Deserialize, Serialize};
538 /// # #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
539 /// # enum AnOperation {
540 /// # One,
541 /// # Two,
542 /// # More(u8),
543 /// # }
544 /// #
545 /// # #[derive(Debug, PartialEq, Deserialize)]
546 /// # enum AnOperationOutput {
547 /// # One,
548 /// # Two,
549 /// # Other(u8),
550 /// # }
551 /// #
552 /// # impl Operation for AnOperation {
553 /// # type Output = AnOperationOutput;
554 /// # }
555 /// #
556 /// # #[derive(Debug)]
557 /// # enum Effect {
558 /// # AnEffect(Request<AnOperation>),
559 /// # }
560 /// #
561 /// # impl From<Request<AnOperation>> for Effect {
562 /// # fn from(request: Request<AnOperation>) -> Self {
563 /// # Self::AnEffect(request)
564 /// # }
565 /// # }
566 /// #
567 /// # #[derive(Debug, PartialEq)]
568 /// # enum Event {
569 /// # Completed(AnOperationOutput),
570 /// # }
571 /// let mut cmd: Command<Effect, Event> = Command::stream_from_shell(AnOperation::More(1))
572 /// .then_stream(|first| {
573 /// let AnOperationOutput::Other(first) = first else {
574 /// panic!("Invalid output!")
575 /// };
576 ///
577 /// let second = first + 1;
578 /// Command::stream_from_shell(AnOperation::More(second))
579 /// })
580 /// .then_send(Event::Completed);
581 ///
582 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
583 /// assert_eq!(request.operation, AnOperation::More(1));
584 ///
585 /// request
586 /// .resolve(AnOperationOutput::Other(1))
587 /// .expect("to resolve");
588 ///
589 /// let Effect::AnEffect(mut request) = cmd.effects().next().unwrap();
590 /// assert_eq!(request.operation, AnOperation::More(2));
591 pub fn then_stream<F, U, NextTask>(
592 self,
593 make_next_builder: F,
594 ) -> StreamBuilder<Effect, Event, impl Stream<Item = U>>
595 where
596 F: Fn(T) -> StreamBuilder<Effect, Event, NextTask> + Send + 'static,
597 NextTask: Stream<Item = U> + Send + 'static,
598 {
599 StreamBuilder::new(move |ctx| {
600 self.into_stream(ctx.clone())
601 .map(move |item| {
602 let next_builder = make_next_builder(item);
603 Box::pin(next_builder.into_stream(ctx.clone()))
604 })
605 .flatten_unordered(None)
606 })
607 }
608
609 /// Create the command in an evented context
610 pub fn then_send<E>(self, event: E) -> Command<Effect, Event>
611 where
612 E: Fn(T) -> Event + Send + 'static,
613 {
614 Command::new(|ctx| async move {
615 let mut stream = pin!(self.into_stream(ctx.clone()));
616
617 while let Some(out) = stream.next().await {
618 ctx.send_event(event(out));
619 }
620 })
621 }
622
623 /// Convert the [`StreamBuilder`] into a stream to use in an async context
624 #[must_use]
625 pub fn into_stream(self, ctx: CommandContext<Effect, Event>) -> Task {
626 let make_stream = self.make_stream;
627
628 make_stream(ctx)
629 }
630
631 /// Convert the [`StreamBuilder`] into a [`Command`] to use in an sync context
632 ///
633 /// Note: You might be looking for [`then_send`](Self::then_send)
634 /// instead, which will send each item in the stream back into the
635 /// app with an event.
636 ///
637 /// The command created in this function will *ignore* the output
638 /// of the stream so may not be very useful.
639 /// It may be useful when using a 3rd party capability and you don't
640 /// care about the stream output.
641 pub fn build(self) -> Command<Effect, Event> {
642 Command::new(|ctx| async move {
643 let mut stream = pin!(self.into_stream(ctx.clone()));
644
645 while (stream.next().await).is_some() {}
646 })
647 }
648}