fluxion_exec/lib.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
// Build as `no_std` unless one of the runtime features (`runtime-tokio`,
// `runtime-smol`, `runtime-async-std`) is enabled or the target is wasm32.
// The same condition gates the `subscribe_latest` module at the bottom of
// this file.
5#![cfg_attr(
6 not(any(
7 feature = "runtime-tokio",
8 feature = "runtime-smol",
9 feature = "runtime-async-std",
10 target_arch = "wasm32"
11 )),
12 no_std
13)]
// Crate-wide opt-out of `clippy::multiple_crate_versions` and
// `clippy::doc_markdown`.
// NOTE(review): the motivation is not recorded here; presumably duplicate
// transitive dependencies and the prose-heavy crate docs. Confirm.
14#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
15
16//! Async execution utilities for stream processing.
17//!
18//! This crate provides subscription-based execution patterns for consuming streams
19//! with async handlers. It focuses on the **execution** of stream processing, while
20//! `fluxion-stream` focuses on **composition** of streams.
21//!
22//! # Overview
23//!
24//! The execution utilities solve a common problem: how to process stream items with
25//! async functions while controlling concurrency and cancellation behavior.
26//!
27//! ## Key Concepts
28//!
29//! - **Subscription**: Attach an async handler to a stream and run it to completion
30//! - **Sequential execution**: Process items one at a time (no concurrent handlers)
31//! - **Cancellation**: Automatically cancel outdated work when new items arrive
32//! - **Error handling**: Propagate errors from handlers while continuing stream processing
33//!
34//! # Execution Patterns
35//!
36//! This crate provides two execution patterns:
37//!
38//! ## [`subscribe`] - Sequential Processing
39//!
40//! Process each item sequentially with an async handler. Every item is processed
41//! to completion before the next item is handled.
42//!
43//! **Use when:**
44//! - Every item must be processed
45//! - Processing order matters
46//! - Side effects must occur for each item
47//! - Work cannot be skipped
48//!
49//! **Examples:**
50//! - Writing each event to a database
51//! - Sending each notification
52//! - Processing every transaction
53//! - Logging all events
54//!
55//! ## [`subscribe_latest`] - Latest-Value Processing
56//!
57//! Process only the latest item, automatically canceling work for outdated items.
58//! When a new item arrives while processing, the current work is canceled and the
59//! new item is processed instead.
60//!
61//! **Use when:**
62//! - Only the latest value matters
63//! - Old values become irrelevant
64//! - Expensive operations should skip intermediate values
65//! - UI updates or state synchronization
66//!
67//! **Examples:**
68//! - Rendering UI based on latest state
69//! - Auto-saving the current document
70//! - Updating a preview
71//! - Recalculating derived values
72//!
73//! # Architecture
74//!
75//! ## Extension Trait Pattern
76//!
77//! Both utilities are provided as extension traits on `Stream`:
78//!
79//! ```text
80//! use fluxion_exec::SubscribeExt;
81//! use futures::StreamExt;
82//!
83//! # async fn example() {
84//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
85//! let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
86//!
87//! // Any Stream can use subscribe
88//! stream.subscribe(|value| async move {
89//! println!("Processing: {}", value);
90//! Ok::<_, Box<dyn std::error::Error>>(())
91//! }).await;
92//! # }
93//! ```
94//!
95//! ## Task Spawning
96//!
97//! Both patterns spawn tokio tasks internally:
98//!
99//! - **[`subscribe`]**: Spawns one task per stream item (sequential)
100//! - **[`subscribe_latest`]**: Spawns tasks and cancels obsolete ones
101//!
102//! This means:
103//! - Handlers must be `Send + 'static`
104//! - Processing happens on the tokio runtime
105//! - Multiple streams can be processed concurrently
106//!
107//! # Performance Characteristics
108//!
109//! ## Sequential Processing (`subscribe`)
110//!
111//! - **Latency**: Items wait for previous items to complete
112//! - **Throughput**: Limited by handler execution time
113//! - **Memory**: `O(1)` - processes one item at a time
114//! - **Ordering**: Maintains strict order
115//!
116//! **Best for**: Correctness over throughput
117//!
118//! ## Latest-Value Processing (`subscribe_latest`)
119//!
120//! - **Latency**: Immediate start on new items (cancels old work)
121//! - **Throughput**: Skips intermediate values for efficiency
122//! - **Memory**: `O(1)` - one active task at a time
123//! - **Ordering**: Processes latest available
124//!
125//! **Best for**: Responsiveness over completeness
126//!
127//! # Comparison with Other Patterns
128//!
129//! ## vs `for_each` (futures)
130//!
131//! ```text
132//! // futures::StreamExt::for_each - blocks until stream ends
133//! stream.for_each(|item| async {
134//! process(item).await;
135//! }).await;
136//!
137//! // subscribe - returns immediately, spawns background task
138//! stream.subscribe(process).await;
139//! ```
140//!
141//! ## vs `buffer_unordered` (futures)
142//!
143//! ```text
144//! // futures - processes N items concurrently
145//! stream.map(process).buffer_unordered(10).collect().await;
146//!
147//! // subscribe - strictly sequential
148//! stream.subscribe(process).await;
149//! ```
150//!
151//! ## vs Manual Task Spawning
152//!
153//! ```text
154//! // Manual - no cancellation on new items
155//! while let Some(item) = stream.next().await {
156//! tokio::spawn(async move { process(item).await });
157//! }
158//!
159//! // subscribe_latest - automatic cancellation
160//! stream.subscribe_latest(process).await;
161//! ```
162//!
163//! # Common Patterns
164//!
165//! ## Pattern: Database Writes
166//!
167//! Every item must be persisted:
168//!
169//! ```text
170//! use fluxion_exec::SubscribeExt;
171//! use futures::StreamExt;
172//!
173//! # async fn example() {
174//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
175//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
176//! stream.subscribe(|event| async move {
177//! // Save to database
178//! // database.insert(event).await?;
179//! Ok::<_, Box<dyn std::error::Error>>(())
180//! }).await;
181//! # }
182//! ```
183//!
184//! ## Pattern: UI Updates
185//!
186//! Only latest state matters:
187//!
188//! ```text
189//! use fluxion_exec::SubscribeLatestExt;
190//! use futures::StreamExt;
191//!
192//! # async fn example() {
193//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
194//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
195//! stream.subscribe_latest(|state| async move {
196//! // Render UI with latest state
197//! // update_ui(state).await?;
198//! Ok::<_, Box<dyn std::error::Error>>(())
199//! }).await;
200//! # }
201//! ```
202//!
203//! ## Pattern: Batch Processing
204//!
205//! Combine with `chunks` for batch operations:
206//!
207//! ```text
208//! use fluxion_exec::SubscribeExt;
209//! use futures::StreamExt;
210//!
211//! # async fn example() {
212//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
213//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
214//! stream
215//! .chunks(100) // Batch 100 items
216//! .subscribe(|batch| async move {
217//! // Process batch
218//! // database.insert_batch(batch).await?;
219//! Ok::<_, Box<dyn std::error::Error>>(())
220//! })
221//! .await;
222//! # }
223//! ```
224//!
225//! ## Pattern: Error Recovery
226//!
227//! Handle errors without stopping the stream:
228//!
229//! ```text
230//! use fluxion_exec::SubscribeExt;
231//! use futures::StreamExt;
232//!
233//! # async fn example() {
234//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
235//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
236//! stream.subscribe(|item| async move {
237//! match process_item(item).await {
238//! Ok(()) => Ok(()),
239//! Err(e) => {
240//! eprintln!("Error processing item: {}", e);
241//! Ok(()) // Continue processing despite error
242//! }
243//! }
244//! }).await;
245//!
246//! # async fn process_item(_item: i32) -> Result<(), Box<dyn std::error::Error>> { Ok(()) }
247//! # }
248//! ```
249//!
250//! # Anti-Patterns
251//!
252//! ## ❌ Don't: Use `subscribe_latest` for Critical Work
253//!
254//! ```text
255//! // BAD: Payment processing might be skipped!
256//! payment_stream.subscribe_latest(|payment| async move {
257//! process_payment(payment).await // Could be canceled!
258//! }).await;
259//! ```
260//!
261//! Use `subscribe` for work that must complete:
262//!
263//! ```text
264//! // GOOD: Every payment is processed
265//! payment_stream.subscribe(|payment| async move {
266//! process_payment(payment).await
267//! }).await;
268//! ```
269//!
270//! ## ❌ Don't: Block in Handlers
271//!
272//! ```text
273//! // BAD: Blocking operations stall the executor
274//! stream.subscribe(|item| async move {
275//! std::thread::sleep(Duration::from_secs(1)); // Blocks!
276//! Ok(())
277//! }).await;
278//! ```
279//!
280//! Use async operations or `spawn_blocking`:
281//!
282//! ```text
283//! // GOOD: Async sleep doesn't block
284//! stream.subscribe(|item| async move {
285//! tokio::time::sleep(Duration::from_secs(1)).await;
286//! Ok(())
287//! }).await;
288//! ```
289//!
290//! ## ❌ Don't: Use for CPU-Intensive Work
291//!
292//! ```text
293//! // BAD: CPU-intensive work on async runtime
294//! stream.subscribe(|data| async move {
295//! expensive_computation(data); // Blocks executor!
296//! Ok(())
297//! }).await;
298//! ```
299//!
300//! Offload to blocking threadpool:
301//!
302//! ```text
303//! // GOOD: CPU work on dedicated threads
304//! stream.subscribe(|data| async move {
305//! tokio::task::spawn_blocking(move || {
306//! expensive_computation(data)
307//! }).await?;
308//! Ok(())
309//! }).await;
310//! ```
311//!
312//! # Error Handling
313//!
314//! Both subscription methods return `Result`:
315//!
316//! - **`Ok(())`**: Stream completed successfully
317//! - **`Err(e)`**: Handler returned an error
318//!
319//! Errors from handlers are propagated but don't stop stream processing automatically.
320//! Design your handlers to return `Ok(())` to continue processing despite errors, or
321//! return `Err(e)` to stop on first error.
322//!
323//! # Getting Started
324//!
325//! Add to your `Cargo.toml`:
326//!
327//! ```toml
328//! [dependencies]
329//! fluxion-exec = { path = "../fluxion-exec" }
330//! tokio = { version = "1.48", features = ["rt", "sync"] }
331//! futures = "0.3"
332//! ```
333//!
334//! See individual trait documentation for detailed examples:
335//! - [`SubscribeExt`] for sequential processing
336//! - [`SubscribeLatestExt`] for latest-value processing
337//!
338//! [`subscribe`]: SubscribeExt::subscribe
339//! [`subscribe_latest`]: SubscribeLatestExt::subscribe_latest
340
// `alloc` is linked explicitly so it can be named in every build, including
// the `no_std` configuration selected by the crate-level `cfg_attr`.
341extern crate alloc;
342
// Private module. `#[macro_use]` keeps any macros it defines in scope for the
// modules declared after it, so the declaration order matters if they use them.
// NOTE(review): the contents of `logging` are not visible from this file.
343#[macro_use]
344mod logging;
// Declared unconditionally, so it is also compiled in `no_std` builds.
345pub mod subscribe;
// Only compiled when a runtime feature is enabled or the target is wasm32.
346#[cfg(any(
347 feature = "runtime-tokio",
348 feature = "runtime-smol",
349 feature = "runtime-async-std",
350 target_arch = "wasm32"
351))]
352pub mod subscribe_latest;
353
354// Re-export commonly used types
// `SubscribeExt` is always available at the crate root.
355pub use subscribe::SubscribeExt;
// Gated exactly like the `subscribe_latest` module it is re-exported from.
356#[cfg(any(
357 feature = "runtime-tokio",
358 feature = "runtime-smol",
359 feature = "runtime-async-std",
360 target_arch = "wasm32"
361))]
362pub use subscribe_latest::SubscribeLatestExt;