Skip to main content

cognee_core/
task.rs

1// expect() calls guard construction-time invariants (is::<T>() check, non-empty
2// task guarantee) and are safe by design.
3#![allow(clippy::expect_used, reason = "invariants are upheld by construction")]
4
5use std::any::Any;
6use std::sync::Arc;
7
8use crate::rate_limiter::RateLimiter;
9
10use futures::future::BoxFuture;
11use futures::stream::{BoxStream, Stream, StreamExt};
12
13use crate::task_context::TaskContext;
/// Type-erased value passed between pipeline tasks.
///
/// Automatically implemented for every `T: Any + Send + Sync + 'static`.
/// Use `value.as_any().downcast_ref::<T>()` for a borrowed `&T`, or
/// [`downcast_value`] to recover an owned `Box<T>`.
///
/// Both `Send` and `Sync` are required because the executor shares values
/// via `Arc<dyn Value>` across retry attempts and fan-out.
///
/// # Smart-pointer pitfall
///
/// `Box<dyn Value>` and `Arc<dyn Value>` are themselves
/// `Any + Send + Sync + 'static`, so the blanket impl applies to the
/// *pointer* as well as to the pointee. Calling `as_any()` directly on such
/// a pointer resolves to the pointer's own impl and reports the pointer type
/// rather than the wrapped value. Dereference down to the `dyn Value` first
/// (`(*boxed).as_any()`, `(**arc_ref).as_any()`) so the call dispatches
/// through the trait object.
pub trait Value: Any + Send + Sync + 'static {
    /// Borrow the value as `&dyn Any` for inspection or `downcast_ref`.
    fn as_any(&self) -> &dyn Any;
    /// Borrow the value as `&mut dyn Any` for `downcast_mut`.
    fn as_any_mut(&mut self) -> &mut dyn Any;
    /// Convert an owned box into `Box<dyn Any>` for an owning `downcast`.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

impl<T: Any + Send + Sync + 'static> Value for T {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}

/// Attempt to downcast a `Box<dyn Value>` to a concrete type.
///
/// Returns `Ok(Box<T>)` on success and gives the original box back as
/// `Err(Box<dyn Value>)` on type mismatch, so the caller can try another
/// type without losing the value.
pub fn downcast_value<T: Any>(value: Box<dyn Value>) -> Result<Box<T>, Box<dyn Value>> {
    // Deref explicitly: `value.as_any()` would pick the blanket impl for
    // `Box<dyn Value>` itself and report the box's type, so the check would
    // fail for every real payload type.
    if !(*value).as_any().is::<T>() {
        return Err(value);
    }

    // `into_any` takes `self: Box<Self>`, so it dispatches on the boxed
    // `dyn Value` and yields the concrete payload as `Box<dyn Any>`.
    Ok(value
        .into_any()
        .downcast::<T>()
        .expect("downcast can't fail after is::<T>() check"))
}
57
/// A value wrapper that carries arbitrary string metadata alongside the inner
/// value.
///
/// Use this to attach provenance information (e.g. `node_set`) to pipeline
/// outputs so the executor can forward it to
/// [`ExecStatusManager::stamp_provenance`](crate::exec_status::ExecStatusManager::stamp_provenance).
///
/// ```rust,ignore
/// let output = Tagged::new(my_chunk)
///     .with_meta("node_set", "entity_nodes");
/// Ok(Arc::new(output) as Arc<dyn Value>)
/// ```
///
/// NOTE(review): `extract_node_set` in this file only recognises
/// `TaggedMeta`; metadata stored in a `Tagged<T>` is not read by it. Confirm
/// how the executor picks up `Tagged` metadata before relying on the example
/// above for provenance stamping.
pub struct Tagged<T: Value> {
    /// The wrapped pipeline value.
    inner: T,
    /// Free-form key/value annotations attached to `inner`.
    metadata: std::collections::HashMap<String, String>,
}
74
75impl<T: Value> Tagged<T> {
76    pub fn new(inner: T) -> Self {
77        Self {
78            inner,
79            metadata: std::collections::HashMap::new(),
80        }
81    }
82
83    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
84        self.metadata.insert(key.into(), value.into());
85        self
86    }
87
88    pub fn inner(&self) -> &T {
89        &self.inner
90    }
91
92    pub fn into_inner(self) -> T {
93        self.inner
94    }
95
96    pub fn meta(&self, key: &str) -> Option<&str> {
97        self.metadata.get(key).map(|s| s.as_str())
98    }
99
100    pub fn metadata(&self) -> &std::collections::HashMap<String, String> {
101        &self.metadata
102    }
103}
104
105// Tagged<T>: Any + Send + Sync + 'static  →  blanket impl covers it
106// (provided T: Value, which implies T: Any + Send + Sync + 'static,
107// and HashMap<String, String> is Send + Sync + 'static).
108
109/// Try to extract a `node_set` metadata value from an `Arc<dyn Value>`.
110///
111/// Checks whether the value is a [`TaggedMeta`] and extracts its `node_set`.
112/// Returns `None` if the value is not a `TaggedMeta` or the field is absent.
113///
114/// The generic [`Tagged<T>`] is a user-facing wrapper; provenance extraction
115/// in the executor uses `TaggedMeta` which is type-erased.
116pub fn extract_node_set(value: &dyn Value) -> Option<&str> {
117    value
118        .as_any()
119        .downcast_ref::<TaggedMeta>()
120        .and_then(|m| m.node_set.as_deref())
121}
122
/// Lightweight metadata carrier that tasks can attach to any `Arc<dyn Value>`
/// when they need to propagate `node_set` without wrapping in `Tagged<T>`.
///
/// The executor checks for this type when stamping provenance.
/// `extract_node_set` in this file is the lookup that recognises it.
pub struct TaggedMeta {
    /// The wrapped value (type-erased).
    pub value: Arc<dyn Value>,
    /// Node set label for provenance stamping. `None` means no label was
    /// attached.
    pub node_set: Option<String>,
}
133
134impl TaggedMeta {
135    pub fn new(value: Arc<dyn Value>) -> Self {
136        Self {
137            value,
138            node_set: None,
139        }
140    }
141
142    pub fn with_node_set(mut self, node_set: impl Into<String>) -> Self {
143        self.node_set = Some(node_set.into());
144        self
145    }
146}
/// Boxed error returned by a failing task.
///
/// Being a boxed `dyn Error + Send + Sync`, it can be produced from string
/// literals and most error types with `.into()` / `?`.
pub type TaskError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Boxed, type-erased iterator yielded by `SyncIter` tasks.
///
/// The iterator is `Send + 'static`, so it cannot borrow from the task input.
pub type ValueIter = Box<dyn Iterator<Item = Box<dyn Value>> + Send + 'static>;

/// Boxed, type-erased async stream yielded by `AsyncStream` tasks.
///
/// The stream is `'static`, so it cannot borrow from the task input.
pub type ValueStream = BoxStream<'static, Box<dyn Value>>;
155
156//
157// Single-value flavours — calling convention:
158//   fn(input: Arc<dyn Value>, ctx: Arc<TaskContext>) -> <output>
159//
160// `Arc<dyn Value>` as input enables two things:
161//   • Retry:   the executor holds one Arc, clones it O(1) for each attempt.
162//   • Fan-out: the same Arc can be given to multiple downstream calls cheaply.
163//
164// Batch flavours — calling convention:
165//   fn(items: &[Box<dyn Value>], ctx: Arc<TaskContext>) -> <output>
166//
167// `&[Box<dyn Value>]` delivers a whole accumulated batch at once; the executor
168// decides the slice boundary based on the next task's configured batch_size.
169//
170// `Arc<dyn Fn(...)>` (not Box<dyn FnOnce>) means:
171//   • The same task object is callable multiple times without being consumed.
172
/// Sync task: one value in → one value out.
///
/// Runs to completion inside the call and returns the result directly.
pub type SyncFn = Arc<
    dyn Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<Arc<dyn Value>, TaskError> + Send + Sync,
>;

/// Async task: one value in → one value out (via a future).
///
/// The returned future is `'static`, so it must own everything it needs.
pub type AsyncFn = Arc<
    dyn Fn(
            Arc<dyn Value>,
            Arc<TaskContext>,
        ) -> BoxFuture<'static, Result<Arc<dyn Value>, TaskError>>
        + Send
        + Sync,
>;

/// Sync task: one value in → lazy iterator of values out.
///
/// The outer `Result` reports a failure to start; items are yielded as
/// `Box<dyn Value>`.
pub type SyncIterFn =
    Arc<dyn Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<ValueIter, TaskError> + Send + Sync>;

/// Async task: one value in → async stream of values out.
///
/// The outer `Result` reports a failure to start; items are yielded as
/// `Box<dyn Value>`.
pub type AsyncStreamFn =
    Arc<dyn Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<ValueStream, TaskError> + Send + Sync>;

/// Sync batch task: slice of values in → one value out.
///
/// The slice is only borrowed for the duration of the call (`for<'a>`).
pub type SyncBatchFn = Arc<
    dyn for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<Arc<dyn Value>, TaskError>
        + Send
        + Sync,
>;

/// Async batch task: slice of values in → one value out (via a future).
///
/// The future is `'static` while the slice is not, so anything the future
/// needs from the batch must be copied or cloned before it is created.
pub type AsyncBatchFn = Arc<
    dyn for<'a> Fn(
            &'a [Box<dyn Value>],
            Arc<TaskContext>,
        ) -> BoxFuture<'static, Result<Arc<dyn Value>, TaskError>>
        + Send
        + Sync,
>;

/// Sync batch task: slice of values in → lazy iterator of values out.
///
/// The returned iterator is `'static` and therefore cannot borrow the slice.
pub type SyncIterBatchFn = Arc<
    dyn for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<ValueIter, TaskError>
        + Send
        + Sync,
>;

/// Async batch task: slice of values in → async stream of values out.
///
/// The returned stream is `'static` and therefore cannot borrow the slice.
pub type AsyncStreamBatchFn = Arc<
    dyn for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<ValueStream, TaskError>
        + Send
        + Sync,
>;
/// A single reusable unit of work in a cognee pipeline.
///
/// | Variant | Execution | Input | Output |
/// |---------|-----------|-------|--------|
/// | [`Task::Sync`] | blocking | single value | single value |
/// | [`Task::Async`] | non-blocking | single value | single value |
/// | [`Task::SyncIter`] | blocking | single value | lazy iterator |
/// | [`Task::AsyncStream`] | non-blocking | single value | async stream |
/// | [`Task::SyncBatch`] | blocking | slice of values | single value |
/// | [`Task::AsyncBatch`] | non-blocking | slice of values | single value |
/// | [`Task::SyncIterBatch`] | blocking | slice of values | lazy iterator |
/// | [`Task::AsyncStreamBatch`] | non-blocking | slice of values | async stream |
///
/// Single-value variants are called once per item. Batch variants receive a
/// `&[Box<dyn Value>]` slice of items accumulated up to the task's `batch_size`.
/// The pipeline executor detects which kind the next task is and routes
/// accordingly.
///
/// Every variant wraps an `Arc<dyn Fn…>`, so the same task can be invoked
/// repeatedly through a shared reference.
pub enum Task {
    /// Blocking, one value in → one value out.
    Sync(SyncFn),
    /// Future-returning, one value in → one value out.
    Async(AsyncFn),
    /// Blocking, one value in → iterator of values out.
    SyncIter(SyncIterFn),
    /// One value in → async stream of values out.
    AsyncStream(AsyncStreamFn),
    /// Blocking, batch slice in → one value out.
    SyncBatch(SyncBatchFn),
    /// Future-returning, batch slice in → one value out.
    AsyncBatch(AsyncBatchFn),
    /// Blocking, batch slice in → iterator of values out.
    SyncIterBatch(SyncIterBatchFn),
    /// Batch slice in → async stream of values out.
    AsyncStreamBatch(AsyncStreamBatchFn),
}
253
254impl Task {
255    /// Returns `true` if this task accepts a batch slice rather than a single value.
256    pub fn is_batch(&self) -> bool {
257        matches!(
258            self,
259            Task::SyncBatch(_)
260                | Task::AsyncBatch(_)
261                | Task::SyncIterBatch(_)
262                | Task::AsyncStreamBatch(_)
263        )
264    }
265
266    /// Python-compat label used in the `${task_type} Task Started/
267    /// Completed/Errored` analytics event names.
268    ///
269    /// Mirrors Python's
270    /// [`tasks/task.py:194-207`](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/tasks/task.py#L194-L207)
271    /// `inspect.isasyncgenfunction` / `iscoroutinefunction` branch:
272    ///
273    /// | Rust variant | Python label |
274    /// |---|---|
275    /// | `Task::Sync`, `Task::SyncBatch` | `"Function"` |
276    /// | `Task::Async`, `Task::AsyncBatch` | `"Coroutine"` |
277    /// | `Task::SyncIter`, `Task::SyncIterBatch` | `"Generator"` |
278    /// | `Task::AsyncStream`, `Task::AsyncStreamBatch` | `"Async Generator"` |
279    ///
280    /// The match is intentionally exhaustive (no wildcard arm) so that
281    /// adding a new `Task::*` variant fails the build until the
282    /// analytics mapping is decided.
283    pub fn python_task_type(&self) -> &'static str {
284        match self {
285            Task::Sync(_) | Task::SyncBatch(_) => "Function",
286            Task::Async(_) | Task::AsyncBatch(_) => "Coroutine",
287            Task::SyncIter(_) | Task::SyncIterBatch(_) => "Generator",
288            Task::AsyncStream(_) | Task::AsyncStreamBatch(_) => "Async Generator",
289        }
290    }
291}
292
293impl Task {
294    // ── Raw constructors (type-erased Arc<dyn Value> in/out) ──────────────────
295
296    /// Create a [`Task::Sync`] from a raw closure.
297    pub fn sync<F>(f: F) -> Self
298    where
299        F: Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<Arc<dyn Value>, TaskError>
300            + Send
301            + Sync
302            + 'static,
303    {
304        Task::Sync(Arc::new(f))
305    }
306
307    /// Create a [`Task::Async`] from a raw closure returning a [`BoxFuture`].
308    pub fn async_fn<F>(f: F) -> Self
309    where
310        F: Fn(
311                Arc<dyn Value>,
312                Arc<TaskContext>,
313            ) -> BoxFuture<'static, Result<Arc<dyn Value>, TaskError>>
314            + Send
315            + Sync
316            + 'static,
317    {
318        Task::Async(Arc::new(f))
319    }
320
321    /// Create a [`Task::SyncIter`] from a raw closure returning a [`ValueIter`].
322    pub fn sync_iter<F>(f: F) -> Self
323    where
324        F: Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<ValueIter, TaskError>
325            + Send
326            + Sync
327            + 'static,
328    {
329        Task::SyncIter(Arc::new(f))
330    }
331
332    /// Create a [`Task::AsyncStream`] from a raw closure returning a
333    /// [`ValueStream`].
334    pub fn async_stream<F>(f: F) -> Self
335    where
336        F: Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<ValueStream, TaskError>
337            + Send
338            + Sync
339            + 'static,
340    {
341        Task::AsyncStream(Arc::new(f))
342    }
343
344    // ── Raw batch constructors (type-erased &[Box<dyn Value>] in) ─────────────
345
346    /// Create a [`Task::SyncBatch`] from a raw closure.
347    pub fn sync_batch<F>(f: F) -> Self
348    where
349        F: for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<Arc<dyn Value>, TaskError>
350            + Send
351            + Sync
352            + 'static,
353    {
354        Task::SyncBatch(Arc::new(f))
355    }
356
357    /// Create a [`Task::AsyncBatch`] from a raw closure returning a [`BoxFuture`].
358    pub fn async_batch<F>(f: F) -> Self
359    where
360        F: for<'a> Fn(
361                &'a [Box<dyn Value>],
362                Arc<TaskContext>,
363            ) -> BoxFuture<'static, Result<Arc<dyn Value>, TaskError>>
364            + Send
365            + Sync
366            + 'static,
367    {
368        Task::AsyncBatch(Arc::new(f))
369    }
370
371    /// Create a [`Task::SyncIterBatch`] from a raw closure returning a [`ValueIter`].
372    pub fn sync_iter_batch<F>(f: F) -> Self
373    where
374        F: for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<ValueIter, TaskError>
375            + Send
376            + Sync
377            + 'static,
378    {
379        Task::SyncIterBatch(Arc::new(f))
380    }
381
382    /// Create a [`Task::AsyncStreamBatch`] from a raw closure returning a [`ValueStream`].
383    pub fn async_stream_batch<F>(f: F) -> Self
384    where
385        F: for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<ValueStream, TaskError>
386            + Send
387            + Sync
388            + 'static,
389    {
390        Task::AsyncStreamBatch(Arc::new(f))
391    }
392
393    // ── Typed constructors ────────────────────────────────────────────────────
394    //
395    // These accept closures over concrete `&I` / `Box<O>` types and generate all
396    // downcast / coercion boilerplate automatically.
397    //
398    // The input is presented as `&I` — a borrowed view obtained via
399    // `downcast_ref` on the shared `Arc<dyn Value>`.  The `Arc` stays alive
400    // for the duration of the call, so the reference is valid.
401    //
402    // For async variants the closure must return `BoxFuture<'static, ...>`,
403    // which means the future may NOT borrow `&I`.  Any data needed inside the
404    // async block must be owned (copied/cloned) before `Box::pin(async move {
405    // ... })`.
406    //
    // The concrete output type `O: Sized` allows the wrapper to convert
    // `Box<O>` → `Arc<dyn Value>` via `Arc::new(*box_o)`, which moves the value
    // out of the box into a fresh `Arc` allocation (no `Clone` bound needed).
409
410    /// Create a [`Task::Sync`] from a typed closure.
411    ///
412    /// ```rust,ignore
413    /// Task::sync_typed(|input: &MyInput, ctx| {
414    ///     Ok(Box::new(process(input)))
415    /// })
416    /// ```
417    pub fn sync_typed<I, O, F>(f: F) -> Self
418    where
419        I: Value,
420        O: Value,
421        F: Fn(&I, Arc<TaskContext>) -> Result<Box<O>, TaskError> + Send + Sync + 'static,
422    {
423        Task::Sync(Arc::new(move |input: Arc<dyn Value>, ctx| {
424            let typed = Self::borrow_input::<I>(&input);
425            f(typed, ctx).map(|v| Arc::new(*v) as Arc<dyn Value>)
426        }))
427    }
428
429    /// Create a [`Task::Async`] from a typed closure returning a `'static`
430    /// future.
431    ///
432    /// Data needed inside the async block must be copied/cloned before it:
433    ///
434    /// ```rust,ignore
435    /// Task::async_fn_typed(|input: &MyInput, ctx| {
436    ///     let id = input.id;  // copy before async block
437    ///     Box::pin(async move {
438    ///         Ok(Box::new(fetch(id).await?))
439    ///     })
440    /// })
441    /// ```
442    pub fn async_fn_typed<I, O, F>(f: F) -> Self
443    where
444        I: Value,
445        O: Value,
446        F: Fn(&I, Arc<TaskContext>) -> BoxFuture<'static, Result<Box<O>, TaskError>>
447            + Send
448            + Sync
449            + 'static,
450    {
451        Task::Async(Arc::new(move |input: Arc<dyn Value>, ctx| {
452            let typed = Self::borrow_input::<I>(&input);
453            // `f(typed, ctx)` produces a 'static future and must not borrow
454            // from `typed` (ensured by the BoxFuture<'static> bound).
455            let fut = f(typed, ctx);
456            Box::pin(async move { fut.await.map(|v| Arc::new(*v) as Arc<dyn Value>) })
457        }))
458    }
459
460    /// Create a [`Task::SyncIter`] from a typed closure returning a concrete
461    /// iterator.  The iterator must be `'static` (may not borrow the input).
462    ///
463    /// ```rust,ignore
464    /// Task::sync_iter_typed(|input: &Document, ctx| {
465    ///     let chunks = split(input.text.clone());
466    ///     Ok(chunks.into_iter().map(Box::new))
467    /// })
468    /// ```
469    pub fn sync_iter_typed<I, O, F, Iter>(f: F) -> Self
470    where
471        I: Value,
472        O: Value,
473        F: Fn(&I, Arc<TaskContext>) -> Result<Iter, TaskError> + Send + Sync + 'static,
474        Iter: Iterator<Item = Box<O>> + Send + 'static,
475    {
476        Task::SyncIter(Arc::new(move |input: Arc<dyn Value>, ctx| {
477            let typed = Self::borrow_input::<I>(&input);
478            f(typed, ctx).map(|iter| Box::new(iter.map(|v| v as Box<dyn Value>)) as ValueIter)
479        }))
480    }
481
482    /// Create a [`Task::AsyncStream`] from a typed closure returning a concrete
483    /// stream.  The stream must be `'static`.
484    ///
485    /// ```rust,ignore
486    /// Task::async_stream_typed(|input: &DatasetId, ctx| {
487    ///     let id = *input;
488    ///     Ok(stream_chunks(id))
489    /// })
490    /// ```
491    pub fn async_stream_typed<I, O, F, S>(f: F) -> Self
492    where
493        I: Value,
494        O: Value,
495        F: Fn(&I, Arc<TaskContext>) -> Result<S, TaskError> + Send + Sync + 'static,
496        S: Stream<Item = Box<O>> + Send + 'static,
497    {
498        Task::AsyncStream(Arc::new(move |input: Arc<dyn Value>, ctx| {
499            let typed = Self::borrow_input::<I>(&input);
500            f(typed, ctx).map(|s| Box::pin(s.map(|v| v as Box<dyn Value>)) as ValueStream)
501        }))
502    }
503
504    // ── Typed batch constructors ──────────────────────────────────────────────
505    //
506    // Same ergonomics as the single-value typed constructors, but the closure
507    // receives `&[&I]` — a slice of borrow-downcast references.  Any data
508    // needed inside an async block must be owned (copied/cloned) before
509    // `Box::pin(async move { ... })`.
510
511    /// Create a [`Task::SyncBatch`] from a typed closure receiving `&[&I]`.
512    ///
513    /// ```rust,ignore
514    /// Task::sync_batch_typed(|chunks: &[&DocumentChunk], ctx| {
515    ///     Ok(Box::new(embed_all(chunks)))
516    /// })
517    /// ```
518    pub fn sync_batch_typed<I, O, F>(f: F) -> Self
519    where
520        I: Value,
521        O: Value,
522        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<Box<O>, TaskError>
523            + Send
524            + Sync
525            + 'static,
526    {
527        Task::SyncBatch(Arc::new(move |items: &[Box<dyn Value>], ctx| {
528            let typed: Vec<&I> = items.iter().map(|v| Self::borrow_item::<I>(v)).collect();
529            f(&typed, ctx).map(|v| Arc::new(*v) as Arc<dyn Value>)
530        }))
531    }
532
533    /// Create a [`Task::AsyncBatch`] from a typed closure returning a `'static` future.
534    ///
535    /// Data needed inside the async block must be copied/cloned before it:
536    ///
537    /// ```rust,ignore
538    /// Task::async_batch_typed(|chunks: &[&DocumentChunk], ctx| {
539    ///     let texts: Vec<String> = chunks.iter().map(|c| c.text.clone()).collect();
540    ///     Box::pin(async move {
541    ///         Ok(Box::new(embed_batch(texts).await?))
542    ///     })
543    /// })
544    /// ```
545    pub fn async_batch_typed<I, O, F>(f: F) -> Self
546    where
547        I: Value,
548        O: Value,
549        F: for<'a> Fn(
550                &'a [&'a I],
551                Arc<TaskContext>,
552            ) -> BoxFuture<'static, Result<Box<O>, TaskError>>
553            + Send
554            + Sync
555            + 'static,
556    {
557        Task::AsyncBatch(Arc::new(move |items: &[Box<dyn Value>], ctx| {
558            let typed: Vec<&I> = items.iter().map(|v| Self::borrow_item::<I>(v)).collect();
559            let fut = f(&typed, ctx);
560            Box::pin(async move { fut.await.map(|v| Arc::new(*v) as Arc<dyn Value>) })
561        }))
562    }
563
564    /// Create a [`Task::SyncIterBatch`] from a typed closure returning a concrete iterator.
565    pub fn sync_iter_batch_typed<I, O, F, Iter>(f: F) -> Self
566    where
567        I: Value,
568        O: Value,
569        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<Iter, TaskError>
570            + Send
571            + Sync
572            + 'static,
573        Iter: Iterator<Item = Box<O>> + Send + 'static,
574    {
575        Task::SyncIterBatch(Arc::new(move |items: &[Box<dyn Value>], ctx| {
576            let typed: Vec<&I> = items.iter().map(|v| Self::borrow_item::<I>(v)).collect();
577            f(&typed, ctx).map(|iter| Box::new(iter.map(|v| v as Box<dyn Value>)) as ValueIter)
578        }))
579    }
580
581    /// Create a [`Task::AsyncStreamBatch`] from a typed closure returning a concrete stream.
582    pub fn async_stream_batch_typed<I, O, F, S>(f: F) -> Self
583    where
584        I: Value,
585        O: Value,
586        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<S, TaskError>
587            + Send
588            + Sync
589            + 'static,
590        S: Stream<Item = Box<O>> + Send + 'static,
591    {
592        Task::AsyncStreamBatch(Arc::new(move |items: &[Box<dyn Value>], ctx| {
593            let typed: Vec<&I> = items.iter().map(|v| Self::borrow_item::<I>(v)).collect();
594            f(&typed, ctx).map(|s| Box::pin(s.map(|v| v as Box<dyn Value>)) as ValueStream)
595        }))
596    }
597
598    // ── Helpers ───────────────────────────────────────────────────────────────
599
600    /// Borrow-downcast the input to `&I`.
601    ///
602    /// Panics on type mismatch — a mismatch means the pipeline was assembled
603    /// with incompatible task types (programming error).
604    fn borrow_input<I: Value>(input: &Arc<dyn Value>) -> &I {
605        let type_name = std::any::type_name::<I>();
606        // Explicit deref through Arc to reach the inner `dyn Value`, then call
607        // `as_any` via vtable dispatch. Without this, method resolution finds
608        // `<Arc<dyn Value> as Value>::as_any()` (via the blanket impl) instead
609        // of dispatching through the trait object.
610        (**input)
611            .as_any()
612            .downcast_ref::<I>()
613            .unwrap_or_else(|| panic!("Task input type mismatch: expected {type_name}"))
614    }
615
616    /// Borrow-downcast a `Box<dyn Value>` item to `&I`.
617    ///
618    /// Used inside typed batch constructors to downcast each slice element.
619    fn borrow_item<I: Value>(item: &dyn Value) -> &I {
620        let type_name = std::any::type_name::<I>();
621        item.as_any()
622            .downcast_ref::<I>()
623            .unwrap_or_else(|| panic!("Batch item type mismatch: expected {type_name}"))
624    }
625
626    /// Call this task with a single input value.
627    ///
628    /// Panics if called on a batch variant — use [`Task::call_batch`] for those.
629    /// The task is `Fn`, so `&self` suffices — the same task object handles
630    /// every input in a fan-out scenario and every retry attempt.
631    pub fn call(&self, input: Arc<dyn Value>, ctx: Arc<TaskContext>) -> TaskCall {
632        match self {
633            Task::Sync(f) => TaskCall::Sync(f(input, ctx)),
634            Task::Async(f) => TaskCall::Async(f(input, ctx)),
635            Task::SyncIter(f) => TaskCall::SyncIter(f(input, ctx)),
636            Task::AsyncStream(f) => TaskCall::AsyncStream(f(input, ctx)),
637            Task::SyncBatch(_)
638            | Task::AsyncBatch(_)
639            | Task::SyncIterBatch(_)
640            | Task::AsyncStreamBatch(_) => {
641                panic!("call() used on a batch task variant — use call_batch() instead")
642            }
643        }
644    }
645
646    /// Build a task that runs multiple sub-tasks concurrently on the same input.
647    ///
648    /// Semantics (matching Python `run_tasks_parallel`):
649    /// - Each sub-task receives `Arc::clone(&input)` and the shared context.
650    /// - All sub-tasks run concurrently via `futures::future::join_all`.
651    /// - If any sub-task fails, the whole parallel task fails with that error.
652    /// - On success, returns the result of the **last** sub-task (by position).
653    ///
654    /// Only single-value (`Sync` / `Async`) sub-tasks are supported. Iter/stream
655    /// sub-tasks inside a parallel group don't have well-defined "last result"
656    /// semantics and will panic at call time.
657    pub fn parallel(tasks: Vec<Task>) -> Self {
658        let tasks = Arc::new(tasks);
659        Task::Async(Arc::new(move |input, ctx| {
660            let tasks = Arc::clone(&tasks);
661            Box::pin(async move {
662                if tasks.is_empty() {
663                    return Ok(input);
664                }
665
666                let futs: Vec<_> = tasks
667                    .iter()
668                    .map(|t| {
669                        let call = t.call(Arc::clone(&input), Arc::clone(&ctx));
670                        async move {
671                            match call {
672                                TaskCall::Sync(result) => result,
673                                TaskCall::Async(fut) => fut.await,
674                                TaskCall::SyncIter(_) | TaskCall::AsyncStream(_) => {
675                                    Err("iter/stream tasks are not supported inside Task::parallel"
676                                        .into())
677                                }
678                            }
679                        }
680                    })
681                    .collect();
682
683                let results = futures::future::join_all(futs).await;
684
685                // Collect: if any failed, return the first error.
686                // Otherwise return the last successful result.
687                let mut last_ok: Option<Arc<dyn Value>> = None;
688                for r in results {
689                    match r {
690                        Err(e) => return Err(e),
691                        Ok(v) => last_ok = Some(v),
692                    }
693                }
694
695                Ok(last_ok.expect("non-empty tasks guaranteed above"))
696            })
697        }))
698    }
699
700    /// Call this batch task with a slice of accumulated values.
701    ///
702    /// Panics if called on a single-value variant — use [`Task::call`] for those.
703    pub fn call_batch(&self, items: &[Box<dyn Value>], ctx: Arc<TaskContext>) -> TaskCall {
704        match self {
705            Task::SyncBatch(f) => TaskCall::Sync(f(items, ctx)),
706            Task::AsyncBatch(f) => TaskCall::Async(f(items, ctx)),
707            Task::SyncIterBatch(f) => TaskCall::SyncIter(f(items, ctx)),
708            Task::AsyncStreamBatch(f) => TaskCall::AsyncStream(f(items, ctx)),
709            Task::Sync(_) | Task::Async(_) | Task::SyncIter(_) | Task::AsyncStream(_) => {
710                panic!("call_batch() used on a single-value task variant — use call() instead")
711            }
712        }
713    }
714}
/// A [`Task`] bundled with optional per-task configuration.
///
/// Use [`TaskInfo::new`] to wrap a task and then chain `.with_name` /
/// `.with_batch_size` to override the pipeline-level defaults:
///
/// ```rust,ignore
/// TaskInfo::new(my_task)
///     .with_name("embed-chunks")
///     .with_batch_size(16)
/// ```
///
/// All fields are public; the `with_*` builders are conveniences, and
/// `with_batch_size` additionally rejects a size of zero.
pub struct TaskInfo {
    /// The unit of work this configuration applies to.
    pub task: Task,
    /// Human-readable label used in watcher events and status logs.
    pub name: Option<String>,
    /// Overrides the pipeline-level `batch_size` for this task.
    /// `None` → inherit `pipeline.batch_size`.
    pub batch_size: Option<usize>,
    /// Template for a human-readable result summary recorded as a tracing span
    /// attribute when the `telemetry` feature is enabled.
    ///
    /// Use `{n}` as a placeholder for the result count.
    /// E.g. `"Classified {n} document(s)"`.
    pub summary_template: Option<String>,
    /// Relative weight for progress allocation. The executor normalizes weights
    /// across all tasks to determine what fraction of overall progress each
    /// task owns. Default: 1.
    // NOTE(review): nothing here rejects a weight of 0 — confirm the
    // executor's normalization tolerates it.
    pub weight: u32,
    /// If `true`, a returned `PassthroughSentinel` forwards the task's input
    /// unchanged to the next task instead of being an error. Default `false`.
    ///
    /// Note: pass-through is defined for **single-value** outputs only. For
    /// iterator/stream tasks there is no single "original input" to forward;
    /// those paths treat `PassthroughSentinel` as a regular value. Use
    /// `DroppedSentinel` to skip individual items in iterator/stream tasks.
    pub enriches: bool,
    /// Per-task rate limiter. Overrides the pipeline-level limiter when set.
    /// `None` inherits the pipeline limiter (or no throttling if that is also
    /// `None`). See [`RateLimiter`] for details.
    pub rate_limiter: Option<Arc<dyn RateLimiter>>,
}
755
756impl TaskInfo {
757    pub fn new(task: Task) -> Self {
758        Self {
759            task,
760            name: None,
761            batch_size: None,
762            summary_template: None,
763            weight: 1,
764            enriches: false,
765            rate_limiter: None,
766        }
767    }
768
769    pub fn with_name(mut self, name: impl Into<String>) -> Self {
770        self.name = Some(name.into());
771        self
772    }
773
774    pub fn with_batch_size(mut self, size: usize) -> Self {
775        assert!(size > 0, "batch_size must be > 0");
776        self.batch_size = Some(size);
777        self
778    }
779
780    /// Set a summary template for telemetry.
781    ///
782    /// `{n}` is replaced with the result count at runtime.
783    /// E.g. `"Classified {n} document(s)"`.
784    pub fn with_summary(mut self, template: impl Into<String>) -> Self {
785        self.summary_template = Some(template.into());
786        self
787    }
788
789    pub fn with_weight(mut self, weight: u32) -> Self {
790        self.weight = weight;
791        self
792    }
793
794    /// Mark this task as an enrichment step: returning [`PassthroughSentinel`]
795    /// forwards the input unchanged rather than failing.
796    ///
797    /// Pass-through is defined for **single-value** outputs only — it forwards
798    /// the one input of a 1-in/1-out task. Iterator/stream tasks that want to
799    /// skip individual items should yield [`DroppedSentinel`] instead.
800    ///
801    /// [`PassthroughSentinel`]: crate::sentinels::PassthroughSentinel
802    /// [`DroppedSentinel`]: crate::sentinels::DroppedSentinel
803    pub fn with_enriches(mut self) -> Self {
804        self.enriches = true;
805        self
806    }
807
808    /// Set a per-task rate limiter, overriding the pipeline-level one.
809    ///
810    /// When set, every attempt inside `call_with_retry` (and every batch call
811    /// in `dispatch_batch`) acquires a token from this limiter before calling
812    /// the task. `None` inherits the pipeline-level limiter.
813    pub fn with_rate_limiter(mut self, rl: Arc<dyn RateLimiter>) -> Self {
814        self.rate_limiter = Some(rl);
815        self
816    }
817
818    /// Build a parallel task from multiple `TaskInfo`s.
819    ///
820    /// Extracts the inner [`Task`]s, delegates to [`Task::parallel`], and
821    /// auto-generates a name like `"parallel([name1, name2, …])"`.
822    pub fn parallel(infos: Vec<TaskInfo>) -> Self {
823        let names: Vec<String> = infos
824            .iter()
825            .enumerate()
826            .map(|(i, ti)| ti.name.clone().unwrap_or_else(|| format!("task_{i}")))
827            .collect();
828
829        let tasks: Vec<Task> = infos.into_iter().map(|ti| ti.task).collect();
830
831        TaskInfo {
832            task: Task::parallel(tasks),
833            name: Some(format!("parallel([{}])", names.join(", "))),
834            batch_size: None,
835            summary_template: None,
836            weight: 1,
837            enriches: false,
838            rate_limiter: None,
839        }
840    }
841}
842
843impl From<Task> for TaskInfo {
844    fn from(task: Task) -> Self {
845        TaskInfo::new(task)
846    }
847}
848
849/// Typed sync single-value fn: `&I → Result<Box<O>, TaskError>`.
850type TypedSyncFn<I, O> = dyn Fn(&I, Arc<TaskContext>) -> Result<Box<O>, TaskError> + Send + Sync;
851/// Typed async single-value fn: `&I → BoxFuture<Result<Box<O>, TaskError>>`.
852type TypedAsyncFn<I, O> =
853    dyn Fn(&I, Arc<TaskContext>) -> BoxFuture<'static, Result<Box<O>, TaskError>> + Send + Sync;
854/// Typed sync iterator fn: `&I → Result<Box<dyn Iterator<Item=Box<O>>>, TaskError>`.
855type TypedSyncIterFn<I, O> = dyn Fn(&I, Arc<TaskContext>) -> Result<Box<dyn Iterator<Item = Box<O>> + Send + 'static>, TaskError>
856    + Send
857    + Sync;
858/// Typed async stream fn: `&I → Result<BoxStream<Box<O>>, TaskError>`.
859type TypedAsyncStreamFn<I, O> =
860    dyn Fn(&I, Arc<TaskContext>) -> Result<BoxStream<'static, Box<O>>, TaskError> + Send + Sync;
861/// Typed sync batch fn: `&[&I] → Result<Box<O>, TaskError>`.
862type TypedSyncBatchFn<I, O> =
863    dyn for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<Box<O>, TaskError> + Send + Sync;
864/// Typed async batch fn: `&[&I] → BoxFuture<Result<Box<O>, TaskError>>`.
865type TypedAsyncBatchFn<I, O> = dyn for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> BoxFuture<'static, Result<Box<O>, TaskError>>
866    + Send
867    + Sync;
868/// Typed sync batch iterator fn: `&[&I] → Result<Box<dyn Iterator<Item=Box<O>>>, TaskError>`.
869type TypedSyncIterBatchFn<I, O> = dyn for<'a> Fn(
870        &'a [&'a I],
871        Arc<TaskContext>,
872    ) -> Result<Box<dyn Iterator<Item = Box<O>> + Send + 'static>, TaskError>
873    + Send
874    + Sync;
875/// Typed async batch stream fn: `&[&I] → Result<BoxStream<Box<O>>, TaskError>`.
876type TypedAsyncStreamBatchFn<I, O> = dyn for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<BoxStream<'static, Box<O>>, TaskError>
877    + Send
878    + Sync;
879
880/// A typed pipeline task whose input and output types are tracked at the type level.
881///
882/// Unlike [`Task`], which erases all types to [`Value`] trait objects, `TypedTask<I, O>`
883/// carries the concrete input type `I` and output type `O` in its variant signatures.
884/// This allows [`PipelineBuilder`](crate::pipeline::PipelineBuilder) to enforce at
885/// compile time that the output type of each task matches the input type of the next.
886///
887/// Type erasure occurs only when the task is converted to [`Task`] or [`TaskInfo`] via
888/// the [`From`] impls, which delegate to the corresponding [`Task::sync_typed`] /
889/// [`Task::async_fn_typed`] / … constructors.
890///
891/// # Constructors
892///
893/// | Method | Task variant |
894/// |---|---|
895/// | [`sync`](TypedTask::sync) | `Sync` — blocking, `&I → Box<O>` |
896/// | [`async_fn`](TypedTask::async_fn) | `Async` — non-blocking, `&I → Box<O>` |
897/// | [`sync_iter`](TypedTask::sync_iter) | `SyncIter` — blocking, `&I → Iterator<Box<O>>` |
898/// | [`async_stream`](TypedTask::async_stream) | `AsyncStream` — non-blocking, `&I → Stream<Box<O>>` |
899/// | [`sync_batch`](TypedTask::sync_batch) | `SyncBatch` — blocking, `&[&I] → Box<O>` |
900/// | [`async_batch`](TypedTask::async_batch) | `AsyncBatch` — non-blocking, `&[&I] → Box<O>` |
901/// | [`sync_iter_batch`](TypedTask::sync_iter_batch) | `SyncIterBatch` — blocking, `&[&I] → Iterator<Box<O>>` |
902/// | [`async_stream_batch`](TypedTask::async_stream_batch) | `AsyncStreamBatch` — non-blocking, `&[&I] → Stream<Box<O>>` |
903pub enum TypedTask<I: Value, O: Value> {
904    /// Blocking single-value task: `&I → Result<Box<O>, TaskError>`.
905    Sync(Arc<TypedSyncFn<I, O>>),
906    /// Non-blocking single-value task: `&I → BoxFuture<Result<Box<O>, TaskError>>`.
907    Async(Arc<TypedAsyncFn<I, O>>),
908    /// Blocking iterator task: `&I → Result<Box<dyn Iterator<Item=Box<O>>>, TaskError>`.
909    SyncIter(Arc<TypedSyncIterFn<I, O>>),
910    /// Non-blocking stream task: `&I → Result<BoxStream<Box<O>>, TaskError>`.
911    AsyncStream(Arc<TypedAsyncStreamFn<I, O>>),
912    /// Blocking batch task: `&[&I] → Result<Box<O>, TaskError>`.
913    SyncBatch(Arc<TypedSyncBatchFn<I, O>>),
914    /// Non-blocking batch task: `&[&I] → BoxFuture<Result<Box<O>, TaskError>>`.
915    AsyncBatch(Arc<TypedAsyncBatchFn<I, O>>),
916    /// Blocking batch iterator task: `&[&I] → Result<Box<dyn Iterator<Item=Box<O>>>, TaskError>`.
917    SyncIterBatch(Arc<TypedSyncIterBatchFn<I, O>>),
918    /// Non-blocking batch stream task: `&[&I] → Result<BoxStream<Box<O>>, TaskError>`.
919    AsyncStreamBatch(Arc<TypedAsyncStreamBatchFn<I, O>>),
920}
921
922impl<I: Value, O: Value> TypedTask<I, O> {
923    /// Create a [`TypedTask::Sync`] from a typed closure `&I → Result<Box<O>, TaskError>`.
924    pub fn sync<F>(f: F) -> Self
925    where
926        F: Fn(&I, Arc<TaskContext>) -> Result<Box<O>, TaskError> + Send + Sync + 'static,
927    {
928        TypedTask::Sync(Arc::new(f))
929    }
930
931    /// Create a [`TypedTask::Async`] from a typed closure returning a `'static` future.
932    ///
933    /// Any data needed inside the async block must be owned (copied/cloned) before
934    /// `Box::pin(async move { ... })`.
935    pub fn async_fn<F>(f: F) -> Self
936    where
937        F: Fn(&I, Arc<TaskContext>) -> BoxFuture<'static, Result<Box<O>, TaskError>>
938            + Send
939            + Sync
940            + 'static,
941    {
942        TypedTask::Async(Arc::new(f))
943    }
944
945    /// Create a [`TypedTask::SyncIter`] from a typed closure returning a concrete iterator.
946    ///
947    /// The iterator is boxed into `Box<dyn Iterator<Item=Box<O>>>` at construction time.
948    pub fn sync_iter<F, Iter>(f: F) -> Self
949    where
950        F: Fn(&I, Arc<TaskContext>) -> Result<Iter, TaskError> + Send + Sync + 'static,
951        Iter: Iterator<Item = Box<O>> + Send + 'static,
952    {
953        TypedTask::SyncIter(Arc::new(move |i, ctx| {
954            f(i, ctx)
955                .map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Box<O>> + Send + 'static>)
956        }))
957    }
958
959    /// Create a [`TypedTask::AsyncStream`] from a typed closure returning a concrete stream.
960    ///
961    /// The stream is pinned into a `BoxStream` at construction time.
962    pub fn async_stream<F, S>(f: F) -> Self
963    where
964        F: Fn(&I, Arc<TaskContext>) -> Result<S, TaskError> + Send + Sync + 'static,
965        S: Stream<Item = Box<O>> + Send + 'static,
966    {
967        TypedTask::AsyncStream(Arc::new(move |i, ctx| {
968            f(i, ctx).map(|s| Box::pin(s) as BoxStream<'static, Box<O>>)
969        }))
970    }
971
972    /// Create a [`TypedTask::SyncBatch`] from a typed closure `&[&I] → Result<Box<O>, TaskError>`.
973    pub fn sync_batch<F>(f: F) -> Self
974    where
975        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<Box<O>, TaskError>
976            + Send
977            + Sync
978            + 'static,
979    {
980        TypedTask::SyncBatch(Arc::new(f))
981    }
982
983    /// Create a [`TypedTask::AsyncBatch`] from a typed closure returning a `'static` future.
984    pub fn async_batch<F>(f: F) -> Self
985    where
986        F: for<'a> Fn(
987                &'a [&'a I],
988                Arc<TaskContext>,
989            ) -> BoxFuture<'static, Result<Box<O>, TaskError>>
990            + Send
991            + Sync
992            + 'static,
993    {
994        TypedTask::AsyncBatch(Arc::new(f))
995    }
996
997    /// Create a [`TypedTask::SyncIterBatch`] from a typed closure returning a concrete iterator.
998    pub fn sync_iter_batch<F, Iter>(f: F) -> Self
999    where
1000        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<Iter, TaskError>
1001            + Send
1002            + Sync
1003            + 'static,
1004        Iter: Iterator<Item = Box<O>> + Send + 'static,
1005    {
1006        TypedTask::SyncIterBatch(Arc::new(move |items, ctx| {
1007            f(items, ctx)
1008                .map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Box<O>> + Send + 'static>)
1009        }))
1010    }
1011
1012    /// Create a [`TypedTask::AsyncStreamBatch`] from a typed closure returning a concrete stream.
1013    pub fn async_stream_batch<F, S>(f: F) -> Self
1014    where
1015        F: for<'a> Fn(&'a [&'a I], Arc<TaskContext>) -> Result<S, TaskError>
1016            + Send
1017            + Sync
1018            + 'static,
1019        S: Stream<Item = Box<O>> + Send + 'static,
1020    {
1021        TypedTask::AsyncStreamBatch(Arc::new(move |items, ctx| {
1022            f(items, ctx).map(|s| Box::pin(s) as BoxStream<'static, Box<O>>)
1023        }))
1024    }
1025}
1026
1027impl<I: Value, O: Value> From<TypedTask<I, O>> for Task {
1028    /// Erase `I` and `O`, producing the type-erased [`Task`].
1029    ///
1030    /// Delegates to the corresponding [`Task::sync_typed`] / [`Task::async_fn_typed`] / …
1031    /// constructor, reusing their downcast logic.
1032    fn from(typed: TypedTask<I, O>) -> Self {
1033        match typed {
1034            TypedTask::Sync(f) => Task::sync_typed(move |i: &I, ctx| f(i, ctx)),
1035            TypedTask::Async(f) => Task::async_fn_typed(move |i: &I, ctx| f(i, ctx)),
1036            TypedTask::SyncIter(f) => Task::sync_iter_typed(move |i: &I, ctx| f(i, ctx)),
1037            TypedTask::AsyncStream(f) => Task::async_stream_typed(move |i: &I, ctx| f(i, ctx)),
1038            TypedTask::SyncBatch(f) => {
1039                Task::sync_batch_typed(move |items: &[&I], ctx| f(items, ctx))
1040            }
1041            TypedTask::AsyncBatch(f) => {
1042                Task::async_batch_typed(move |items: &[&I], ctx| f(items, ctx))
1043            }
1044            TypedTask::SyncIterBatch(f) => {
1045                Task::sync_iter_batch_typed(move |items: &[&I], ctx| f(items, ctx))
1046            }
1047            TypedTask::AsyncStreamBatch(f) => {
1048                Task::async_stream_batch_typed(move |items: &[&I], ctx| f(items, ctx))
1049            }
1050        }
1051    }
1052}
1053
1054impl<I: Value, O: Value> From<TypedTask<I, O>> for TaskInfo {
1055    fn from(t: TypedTask<I, O>) -> TaskInfo {
1056        TaskInfo::new(Task::from(t))
1057    }
1058}
1059
1060/// The pending (or already-resolved) output of [`Task::call`].
1061pub enum TaskCall {
1062    /// Already-computed single value (or an error).
1063    Sync(Result<Arc<dyn Value>, TaskError>),
1064
1065    /// Future resolving to a single value (or an error).
1066    Async(BoxFuture<'static, Result<Arc<dyn Value>, TaskError>>),
1067
1068    /// Lazy iterator of values (or a setup error).
1069    SyncIter(Result<ValueIter, TaskError>),
1070
1071    /// Async stream of values (or a setup error).
1072    AsyncStream(Result<ValueStream, TaskError>),
1073}
1074
1075#[cfg(test)]
1076#[allow(
1077    clippy::unwrap_used,
1078    clippy::expect_used,
1079    reason = "test code — panics are acceptable failures"
1080)]
1081mod tests {
1082    use super::*;
1083    use std::future::Future;
1084    use std::pin::Pin;
1085
1086    use crate::cancellation::cancellation_pair;
1087    use crate::exec_status::NoopExecStatusManager;
1088    use crate::progress::ProgressToken;
1089    use crate::task_context::TaskContext;
1090    use crate::thread_pool::CpuPool;
1091
1092    // ── Minimal stub for CpuPool (no mock crate needed) ─────────────────────
1093
1094    struct StubPool;
1095    impl CpuPool for StubPool {
1096        fn spawn_raw(
1097            &self,
1098            _task: Box<dyn FnOnce() + Send + 'static>,
1099        ) -> Pin<Box<dyn Future<Output = Result<(), crate::error::CoreError>> + Send + 'static>>
1100        {
1101            Box::pin(async { Ok(()) })
1102        }
1103    }
1104
1105    async fn stub_ctx() -> Arc<TaskContext> {
1106        let db = cognee_database::connect("sqlite::memory:").await.unwrap();
1107        cognee_database::initialize(&db).await.unwrap();
1108        let (_handle, token) = cancellation_pair();
1109        Arc::new(TaskContext {
1110            thread_pool: Arc::new(StubPool),
1111            database: Arc::new(db),
1112            graph_db: Arc::new(cognee_graph::MockGraphDB::new()),
1113            vector_db: Arc::new(cognee_vector::MockVectorDB::new()),
1114            cancellation: token,
1115            progress: ProgressToken::new(),
1116            pipeline_ctx: None,
1117            exec_status: Arc::new(NoopExecStatusManager),
1118            pipeline_watcher: None,
1119        })
1120    }
1121
1122    // ── Tests ────────────────────────────────────────────────────────────────
1123
1124    #[tokio::test]
1125    async fn parallel_runs_sync_tasks_concurrently() {
1126        // Two sync tasks: one doubles, one triples. Last result (triple) wins.
1127        let double = Task::sync_typed(|x: &i32, _ctx| Ok(Box::new(*x * 2)));
1128        let triple = Task::sync_typed(|x: &i32, _ctx| Ok(Box::new(*x * 3)));
1129
1130        let par = Task::parallel(vec![double, triple]);
1131        let input: Arc<dyn Value> = Arc::new(5_i32);
1132        let ctx = stub_ctx().await;
1133
1134        let call = par.call(input, ctx);
1135        let result = match call {
1136            TaskCall::Async(fut) => fut.await.unwrap(),
1137            _ => panic!("parallel should produce Async variant"),
1138        };
1139
1140        // Last task (triple) result: 5 * 3 = 15
1141        assert_eq!(*(*result).as_any().downcast_ref::<i32>().unwrap(), 15);
1142    }
1143
1144    #[tokio::test]
1145    async fn parallel_runs_async_tasks() {
1146        let add_ten = Task::async_fn_typed(|x: &i32, _ctx| {
1147            let v = *x + 10;
1148            Box::pin(async move { Ok(Box::new(v)) })
1149        });
1150        let add_twenty = Task::async_fn_typed(|x: &i32, _ctx| {
1151            let v = *x + 20;
1152            Box::pin(async move { Ok(Box::new(v)) })
1153        });
1154
1155        let par = Task::parallel(vec![add_ten, add_twenty]);
1156        let input: Arc<dyn Value> = Arc::new(100_i32);
1157        let ctx = stub_ctx().await;
1158
1159        let result = match par.call(input, ctx) {
1160            TaskCall::Async(fut) => fut.await.unwrap(),
1161            _ => panic!("expected Async"),
1162        };
1163
1164        // Last task: 100 + 20 = 120
1165        assert_eq!(*(*result).as_any().downcast_ref::<i32>().unwrap(), 120);
1166    }
1167
1168    #[tokio::test]
1169    async fn parallel_propagates_first_error() {
1170        let ok_task = Task::sync_typed(|x: &i32, _ctx| Ok(Box::new(*x)));
1171        let err_task = Task::Sync(Arc::new(|_input, _ctx| Err("boom".into())));
1172
1173        let par = Task::parallel(vec![ok_task, err_task]);
1174        let input: Arc<dyn Value> = Arc::new(42_i32);
1175        let ctx = stub_ctx().await;
1176
1177        let result = match par.call(input, ctx) {
1178            TaskCall::Async(fut) => fut.await,
1179            _ => panic!("expected Async"),
1180        };
1181
1182        let err = result.err().expect("should be an error");
1183        assert!(err.to_string().contains("boom"));
1184    }
1185
1186    #[tokio::test]
1187    async fn parallel_empty_returns_input() {
1188        let par = Task::parallel(vec![]);
1189        let input: Arc<dyn Value> = Arc::new(99_i32);
1190        let ctx = stub_ctx().await;
1191
1192        let result = match par.call(Arc::clone(&input), ctx) {
1193            TaskCall::Async(fut) => fut.await.unwrap(),
1194            _ => panic!("expected Async"),
1195        };
1196
1197        assert_eq!(*(*result).as_any().downcast_ref::<i32>().unwrap(), 99);
1198    }
1199
1200    #[tokio::test]
1201    async fn test_typed_task_panics_on_type_mismatch() {
1202        use std::panic::{AssertUnwindSafe, catch_unwind};
1203
1204        let task = Task::sync_typed(|_x: &String, _ctx| Ok(Box::new("ok".to_string())));
1205        let input: Arc<dyn Value> = Arc::new(42_i32); // wrong type
1206        let ctx = stub_ctx().await;
1207
1208        let result = catch_unwind(AssertUnwindSafe(|| task.call(input, ctx)));
1209
1210        let err = match result {
1211            Err(e) => e,
1212            Ok(_) => panic!("should have panicked on type mismatch"),
1213        };
1214        let msg = err
1215            .downcast_ref::<String>()
1216            .map(|s| s.as_str())
1217            .or_else(|| err.downcast_ref::<&str>().copied())
1218            .expect("panic payload should be a string");
1219        assert!(
1220            msg.contains("type mismatch"),
1221            "expected 'type mismatch' in panic message, got: {msg}"
1222        );
1223    }
1224
1225    #[test]
1226    fn test_taskinfo_weight_default() {
1227        let info = TaskInfo::new(Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32))));
1228        assert_eq!(info.weight, 1);
1229    }
1230
1231    #[test]
1232    fn test_taskinfo_with_weight() {
1233        let info = TaskInfo::new(Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32)))).with_weight(5);
1234        assert_eq!(info.weight, 5);
1235    }
1236
1237    #[test]
1238    fn task_info_parallel_generates_name() {
1239        let t1 =
1240            TaskInfo::new(Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32)))).with_name("classify");
1241        let t2 =
1242            TaskInfo::new(Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32)))).with_name("embed");
1243        let t3 = TaskInfo::new(Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32))));
1244
1245        let par = TaskInfo::parallel(vec![t1, t2, t3]);
1246        assert_eq!(
1247            par.name.as_deref(),
1248            Some("parallel([classify, embed, task_2])")
1249        );
1250    }
1251
1252    /// Tests for [`Task::python_task_type`] — the 8-variant Rust → 4-string
1253    /// Python label mapping consumed by the analytics emitters in
1254    /// `crates/core/src/pipeline.rs::call_with_retry()`.
1255    mod python_task_type {
1256        use super::*;
1257        use futures::stream;
1258
1259        #[test]
1260        fn sync_variant_maps_to_function() {
1261            let t = Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32)));
1262            assert_eq!(t.python_task_type(), "Function");
1263        }
1264
1265        #[test]
1266        fn sync_batch_variant_maps_to_function() {
1267            let t = Task::sync_batch_typed(|_: &[&i32], _| Ok(Box::new(0_i32)));
1268            assert_eq!(t.python_task_type(), "Function");
1269        }
1270
1271        #[test]
1272        fn async_variant_maps_to_coroutine() {
1273            let t = Task::async_fn_typed(|_: &i32, _| Box::pin(async move { Ok(Box::new(0_i32)) }));
1274            assert_eq!(t.python_task_type(), "Coroutine");
1275        }
1276
1277        #[test]
1278        fn async_batch_variant_maps_to_coroutine() {
1279            let t = Task::async_batch_typed(|_: &[&i32], _| {
1280                Box::pin(async move { Ok(Box::new(0_i32)) })
1281            });
1282            assert_eq!(t.python_task_type(), "Coroutine");
1283        }
1284
1285        #[test]
1286        fn sync_iter_variant_maps_to_generator() {
1287            let t = Task::sync_iter_typed(|_: &i32, _| Ok(std::iter::empty::<Box<i32>>()));
1288            assert_eq!(t.python_task_type(), "Generator");
1289        }
1290
1291        #[test]
1292        fn sync_iter_batch_variant_maps_to_generator() {
1293            let t = Task::sync_iter_batch_typed(|_: &[&i32], _| Ok(std::iter::empty::<Box<i32>>()));
1294            assert_eq!(t.python_task_type(), "Generator");
1295        }
1296
1297        #[test]
1298        fn async_stream_variant_maps_to_async_generator() {
1299            let t = Task::async_stream_typed(|_: &i32, _| Ok(stream::empty::<Box<i32>>()));
1300            assert_eq!(t.python_task_type(), "Async Generator");
1301        }
1302
1303        #[test]
1304        fn async_stream_batch_variant_maps_to_async_generator() {
1305            let t = Task::async_stream_batch_typed(|_: &[&i32], _| Ok(stream::empty::<Box<i32>>()));
1306            assert_eq!(t.python_task_type(), "Async Generator");
1307        }
1308
1309        #[test]
1310        fn covers_all_eight_variants_with_four_distinct_labels() {
1311            let labels: std::collections::HashSet<&'static str> = [
1312                Task::sync_typed(|_: &i32, _| Ok(Box::new(0_i32))).python_task_type(),
1313                Task::sync_batch_typed(|_: &[&i32], _| Ok(Box::new(0_i32))).python_task_type(),
1314                Task::async_fn_typed(|_: &i32, _| Box::pin(async move { Ok(Box::new(0_i32)) }))
1315                    .python_task_type(),
1316                Task::async_batch_typed(|_: &[&i32], _| {
1317                    Box::pin(async move { Ok(Box::new(0_i32)) })
1318                })
1319                .python_task_type(),
1320                Task::sync_iter_typed(|_: &i32, _| Ok(std::iter::empty::<Box<i32>>()))
1321                    .python_task_type(),
1322                Task::sync_iter_batch_typed(|_: &[&i32], _| Ok(std::iter::empty::<Box<i32>>()))
1323                    .python_task_type(),
1324                Task::async_stream_typed(|_: &i32, _| Ok(stream::empty::<Box<i32>>()))
1325                    .python_task_type(),
1326                Task::async_stream_batch_typed(|_: &[&i32], _| Ok(stream::empty::<Box<i32>>()))
1327                    .python_task_type(),
1328            ]
1329            .into_iter()
1330            .collect();
1331
1332            assert_eq!(
1333                labels.len(),
1334                4,
1335                "expected exactly 4 distinct Python task-type labels, got {labels:?}"
1336            );
1337            assert!(labels.contains("Function"));
1338            assert!(labels.contains("Coroutine"));
1339            assert!(labels.contains("Generator"));
1340            assert!(labels.contains("Async Generator"));
1341        }
1342    }
1343}