fluxion_exec/lib.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Async execution utilities for stream processing.
6//!
7//! This crate provides subscription-based execution patterns for consuming streams
8//! with async handlers. It focuses on the **execution** of stream processing, while
9//! `fluxion-stream` focuses on **composition** of streams.
10//!
11//! # Overview
12//!
13//! The execution utilities solve a common problem: how to process stream items with
14//! async functions while controlling concurrency and cancellation behavior.
15//!
16//! ## Key Concepts
17//!
18//! - **Subscription**: Attach an async handler to a stream and run it to completion
19//! - **Sequential execution**: Process items one at a time (no concurrent handlers)
20//! - **Cancellation**: Automatically cancel outdated work when new items arrive
21//! - **Error handling**: Propagate errors from handlers while continuing stream processing
22//!
23//! # Execution Patterns
24//!
25//! This crate provides two execution patterns:
26//!
27//! ## [`subscribe`] - Sequential Processing
28//!
29//! Process each item sequentially with an async handler. Every item is processed
30//! to completion before the next item is handled.
31//!
32//! **Use when:**
33//! - Every item must be processed
34//! - Processing order matters
35//! - Side effects must occur for each item
36//! - Work cannot be skipped
37//!
38//! **Examples:**
39//! - Writing each event to a database
40//! - Sending each notification
41//! - Processing every transaction
42//! - Logging all events
43//!
44//! ## [`subscribe_latest`] - Latest-Value Processing
45//!
46//! Process only the latest item, automatically canceling work for outdated items.
47//! When a new item arrives while processing, the current work is canceled and the
48//! new item is processed instead.
49//!
50//! **Use when:**
51//! - Only the latest value matters
52//! - Old values become irrelevant
53//! - Expensive operations should skip intermediate values
54//! - UI updates or state synchronization
55//!
56//! **Examples:**
57//! - Rendering UI based on latest state
58//! - Auto-saving the current document
59//! - Updating a preview
60//! - Recalculating derived values
61//!
62//! # Architecture
63//!
64//! ## Extension Trait Pattern
65//!
66//! Both utilities are provided as extension traits on `Stream`:
67//!
68//! ```text
69//! use fluxion_exec::SubscribeExt;
70//! use futures::StreamExt;
71//!
72//! # async fn example() {
73//! let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
74//! let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
75//!
76//! // Any Stream can use subscribe
77//! stream.subscribe(|value| async move {
78//! println!("Processing: {}", value);
79//! Ok::<_, Box<dyn std::error::Error>>(())
80//! }).await;
81//! # }
82//! ```
83//!
84//! ## Task Spawning
85//!
86//! Both patterns spawn tokio tasks internally:
87//!
88//! - **[`subscribe`]**: Spawns one task per stream item (sequential)
89//! - **[`subscribe_latest`]**: Spawns tasks and cancels obsolete ones
90//!
91//! This means:
92//! - Handlers must be `Send + 'static`
93//! - Processing happens on the tokio runtime
94//! - Multiple streams can be processed concurrently
95//!
96//! # Performance Characteristics
97//!
98//! ## Sequential Processing (`subscribe`)
99//!
100//! - **Latency**: Items wait for previous items to complete
101//! - **Throughput**: Limited by handler execution time
102//! - **Memory**: $O(1)$ - processes one item at a time
103//! - **Ordering**: Maintains strict order
104//!
105//! **Best for**: Correctness over throughput
106//!
107//! ## Latest-Value Processing (`subscribe_latest`)
108//!
109//! - **Latency**: Immediate start on new items (cancels old work)
110//! - **Throughput**: Skips intermediate values for efficiency
111//! - **Memory**: $O(1)$ - one active task at a time
112//! - **Ordering**: Processes latest available
113//!
114//! **Best for**: Responsiveness over completeness
115//!
116//! # Comparison with Other Patterns
117//!
118//! ## vs `for_each` (futures)
119//!
120//! ```text
121//! // futures::StreamExt::for_each - blocks until stream ends
122//! stream.for_each(|item| async {
123//! process(item).await;
124//! }).await;
125//!
126//! // subscribe - returns immediately, spawns background task
127//! stream.subscribe(process).await;
128//! ```
129//!
130//! ## vs `buffer_unordered` (futures)
131//!
132//! ```text
133//! // futures - processes N items concurrently
134//! stream.map(process).buffer_unordered(10).collect().await;
135//!
136//! // subscribe - strictly sequential
137//! stream.subscribe(process).await;
138//! ```
139//!
140//! ## vs Manual Task Spawning
141//!
142//! ```text
143//! // Manual - no cancellation on new items
144//! while let Some(item) = stream.next().await {
145//! tokio::spawn(async move { process(item).await });
146//! }
147//!
148//! // subscribe_latest - automatic cancellation
149//! stream.subscribe_latest(process).await;
150//! ```
151//!
152//! # Common Patterns
153//!
154//! ## Pattern: Database Writes
155//!
156//! Every item must be persisted:
157//!
158//! ```text
159//! use fluxion_exec::SubscribeExt;
160//! use futures::StreamExt;
161//!
162//! # async fn example() {
163//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
164//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
165//! stream.subscribe(|event| async move {
166//! // Save to database
167//! // database.insert(event).await?;
168//! Ok::<_, Box<dyn std::error::Error>>(())
169//! }).await;
170//! # }
171//! ```
172//!
173//! ## Pattern: UI Updates
174//!
175//! Only latest state matters:
176//!
177//! ```text
178//! use fluxion_exec::SubscribeLatestExt;
179//! use futures::StreamExt;
180//!
181//! # async fn example() {
182//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
183//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
184//! stream.subscribe_latest(|state| async move {
185//! // Render UI with latest state
186//! // update_ui(state).await?;
187//! Ok::<_, Box<dyn std::error::Error>>(())
188//! }).await;
189//! # }
190//! ```
191//!
192//! ## Pattern: Batch Processing
193//!
194//! Combine with `chunks` for batch operations:
195//!
196//! ```text
197//! use fluxion_exec::SubscribeExt;
198//! use futures::StreamExt;
199//!
200//! # async fn example() {
201//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
202//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
203//! stream
204//! .chunks(100) // Batch 100 items
205//! .subscribe(|batch| async move {
206//! // Process batch
207//! // database.insert_batch(batch).await?;
208//! Ok::<_, Box<dyn std::error::Error>>(())
209//! })
210//! .await;
211//! # }
212//! ```
213//!
214//! ## Pattern: Error Recovery
215//!
216//! Handle errors without stopping the stream:
217//!
218//! ```text
219//! use fluxion_exec::SubscribeExt;
220//! use futures::StreamExt;
221//!
222//! # async fn example() {
223//! # let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
224//! # let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
225//! stream.subscribe(|item| async move {
226//! match process_item(item).await {
227//! Ok(result) => Ok(()),
228//! Err(e) => {
229//! eprintln!("Error processing item: {}", e);
230//! Ok(()) // Continue processing despite error
231//! }
232//! }
233//! }).await;
234//!
235//! # async fn process_item(_item: i32) -> Result<(), Box<dyn std::error::Error>> { Ok(()) }
236//! # }
237//! ```
238//!
239//! # Anti-Patterns
240//!
241//! ## ❌ Don't: Use `subscribe_latest` for Critical Work
242//!
243//! ```text
244//! // BAD: Payment processing might be skipped!
245//! payment_stream.subscribe_latest(|payment| async move {
246//! process_payment(payment).await // Could be canceled!
247//! }).await;
248//! ```
249//!
250//! Use `subscribe` for work that must complete:
251//!
252//! ```text
253//! // GOOD: Every payment is processed
254//! payment_stream.subscribe(|payment| async move {
255//! process_payment(payment).await
256//! }).await;
257//! ```
258//!
259//! ## ❌ Don't: Block in Handlers
260//!
261//! ```text
262//! // BAD: Blocking operations stall the executor
263//! stream.subscribe(|item| async move {
264//! std::thread::sleep(Duration::from_secs(1)); // Blocks!
265//! Ok(())
266//! }).await;
267//! ```
268//!
269//! Use async operations or `spawn_blocking`:
270//!
271//! ```text
272//! // GOOD: Async sleep doesn't block
273//! stream.subscribe(|item| async move {
274//! tokio::time::sleep(Duration::from_secs(1)).await;
275//! Ok(())
276//! }).await;
277//! ```
278//!
279//! ## ❌ Don't: Use for CPU-Intensive Work
280//!
281//! ```text
282//! // BAD: CPU-intensive work on async runtime
283//! stream.subscribe(|data| async move {
284//! expensive_computation(data); // Blocks executor!
285//! Ok(())
286//! }).await;
287//! ```
288//!
289//! Offload to blocking threadpool:
290//!
291//! ```text
292//! // GOOD: CPU work on dedicated threads
293//! stream.subscribe(|data| async move {
294//! tokio::task::spawn_blocking(move || {
295//! expensive_computation(data)
296//! }).await?;
297//! Ok(())
298//! }).await;
299//! ```
300//!
301//! # Error Handling
302//!
303//! Both subscription methods return `Result`:
304//!
305//! - **`Ok(())`**: Stream completed successfully
306//! - **`Err(e)`**: Handler returned an error
307//!
308//! Errors from handlers are propagated but don't stop stream processing automatically.
309//! Design your handlers to return `Ok(())` to continue processing despite errors, or
310//! return `Err(e)` to stop on first error.
311//!
312//! # Getting Started
313//!
314//! Add to your `Cargo.toml`:
315//!
316//! ```toml
317//! [dependencies]
318//! fluxion-exec = { path = "../fluxion-exec" }
319//! tokio = { version = "1.48", features = ["rt", "sync"] }
320//! futures = "0.3"
321//! ```
322//!
323//! See individual trait documentation for detailed examples:
324//! - [`SubscribeExt`] for sequential processing
325//! - [`SubscribeLatestExt`] for latest-value processing
326//!
327//! [`subscribe`]: SubscribeExt::subscribe
328//! [`subscribe_latest`]: SubscribeLatestExt::subscribe_latest
329
330#![allow(clippy::multiple_crate_versions, clippy::doc_markdown)]
331#[macro_use]
332mod logging;
333pub mod subscribe;
334pub mod subscribe_latest;
335
336// Re-export commonly used types
337pub use subscribe::SubscribeExt;
338pub use subscribe_latest::SubscribeLatestExt;