pub enum Task {
Sync(SyncFn),
Async(AsyncFn),
SyncIter(SyncIterFn),
AsyncStream(AsyncStreamFn),
SyncBatch(SyncBatchFn),
AsyncBatch(AsyncBatchFn),
SyncIterBatch(SyncIterBatchFn),
AsyncStreamBatch(AsyncStreamBatchFn),
}
A single reusable unit of work in a cognee pipeline.
| Variant | Execution | Input | Output |
|---|---|---|---|
| Task::Sync | blocking | single value | single value |
| Task::Async | non-blocking | single value | single value |
| Task::SyncIter | blocking | single value | lazy iterator |
| Task::AsyncStream | non-blocking | single value | async stream |
| Task::SyncBatch | blocking | slice of values | single value |
| Task::AsyncBatch | non-blocking | slice of values | single value |
| Task::SyncIterBatch | blocking | slice of values | lazy iterator |
| Task::AsyncStreamBatch | non-blocking | slice of values | async stream |
Single-value variants are called once per item. Batch variants receive a
&[Box<dyn Value>] slice of items accumulated up to the task’s batch_size.
The pipeline executor detects which kind the next task is and routes
accordingly.
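The routing described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `MiniTask` is a hypothetical two-variant stand-in for `Task`, items are plain `i32` values rather than `Box<dyn Value>`, and `run_stage` with its `batch_size` parameter is an invented helper, not the executor's real API.

```rust
/// Hypothetical stand-in for `Task`: one single-value and one batch variant.
enum MiniTask {
    Single(fn(i32) -> i32),
    Batch(fn(&[i32]) -> i32),
}

/// Runs one stage over `items`, routing on the kind of task.
fn run_stage(task: &MiniTask, items: &[i32], batch_size: usize) -> Vec<i32> {
    match task {
        // Single-value tasks are called once per item.
        MiniTask::Single(f) => items.iter().map(|&item| f(item)).collect(),
        // Batch tasks receive slices of at most `batch_size` items
        // (clamped to 1 because `chunks(0)` would panic).
        MiniTask::Batch(f) => items
            .chunks(batch_size.max(1))
            .map(|chunk| f(chunk))
            .collect(),
    }
}

fn double(x: i32) -> i32 {
    x * 2
}

fn sum(xs: &[i32]) -> i32 {
    xs.iter().sum()
}

fn main() {
    let items = [1, 2, 3, 4, 5];
    println!("{:?}", run_stage(&MiniTask::Single(double), &items, 2)); // [2, 4, 6, 8, 10]
    println!("{:?}", run_stage(&MiniTask::Batch(sum), &items, 2)); // [3, 7, 5]
}
```

With a batch size of 2, the five inputs reach the batch task as three slices; the last slice is shorter because fewer items remain.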
Implementations
impl Task
pub fn is_batch(&self) -> bool
Returns true if this task accepts a batch slice rather than a single value.
pub fn python_task_type(&self) -> &'static str
Python-compat label used in the ${task_type} Task Started / Completed / Errored analytics event names.
Mirrors Python’s tasks/task.py:194-207 inspect.isasyncgenfunction / iscoroutinefunction branch:
| Rust variant | Python label |
|---|---|
| Task::Sync, Task::SyncBatch | "Function" |
| Task::Async, Task::AsyncBatch | "Coroutine" |
| Task::SyncIter, Task::SyncIterBatch | "Generator" |
| Task::AsyncStream, Task::AsyncStreamBatch | "Async Generator" |
The match is intentionally exhaustive (no wildcard arm) so that
adding a new Task::* variant fails the build until the
analytics mapping is decided.
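A minimal sketch of that exhaustive-match pattern, assuming a hypothetical payload-free `TaskKind` enum in place of the real `Task` (whose variants carry function values). The `started_event` helper is likewise invented here to show how a label might be spliced into an event name.

```rust
/// Hypothetical payload-free mirror of the eight `Task` variants.
#[derive(Clone, Copy)]
enum TaskKind {
    Sync,
    Async,
    SyncIter,
    AsyncStream,
    SyncBatch,
    AsyncBatch,
    SyncIterBatch,
    AsyncStreamBatch,
}

/// Maps a kind to its Python label, following the table above.
fn python_task_type(kind: TaskKind) -> &'static str {
    // No `_ =>` arm: adding a ninth variant makes this match non-exhaustive,
    // which is a compile error until a label is chosen for it.
    match kind {
        TaskKind::Sync | TaskKind::SyncBatch => "Function",
        TaskKind::Async | TaskKind::AsyncBatch => "Coroutine",
        TaskKind::SyncIter | TaskKind::SyncIterBatch => "Generator",
        TaskKind::AsyncStream | TaskKind::AsyncStreamBatch => "Async Generator",
    }
}

/// Builds an event name in the `${task_type} Task Started` shape.
fn started_event(kind: TaskKind) -> String {
    format!("{} Task Started", python_task_type(kind))
}

fn main() {
    println!("{}", started_event(TaskKind::AsyncStreamBatch)); // Async Generator Task Started
}
```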
impl Task
pub fn sync<F>(f: F) -> Self
Create a Task::Sync from a raw closure.
pub fn async_fn<F>(f: F) -> Self
Create a Task::Async from a raw closure returning a BoxFuture.
pub fn sync_iter<F>(f: F) -> Self
Create a Task::SyncIter from a raw closure returning a ValueIter.
pub fn async_stream<F>(f: F) -> Self
where
    F: Fn(Arc<dyn Value>, Arc<TaskContext>) -> Result<ValueStream, TaskError> + Send + Sync + 'static,
Create a Task::AsyncStream from a raw closure returning a
ValueStream.
pub fn sync_batch<F>(f: F) -> Self
Create a Task::SyncBatch from a raw closure.
pub fn async_batch<F>(f: F) -> Self
Create a Task::AsyncBatch from a raw closure returning a BoxFuture.
pub fn sync_iter_batch<F>(f: F) -> Self
Create a Task::SyncIterBatch from a raw closure returning a ValueIter.
pub fn async_stream_batch<F>(f: F) -> Self
where
    F: for<'a> Fn(&'a [Box<dyn Value>], Arc<TaskContext>) -> Result<ValueStream, TaskError> + Send + Sync + 'static,
Create a Task::AsyncStreamBatch from a raw closure returning a ValueStream.
pub fn sync_typed<I, O, F>(f: F) -> Self
Create a Task::Sync from a typed closure.
Task::sync_typed(|input: &MyInput, ctx| {
    Ok(Box::new(process(input)))
})
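One plausible way a typed constructor could bridge to a type-erased task is to downcast the input before invoking the closure. The sketch below is an assumption, not the crate's implementation: `std::any::Any` stands in for the `Value` trait, a `String` error stands in for `TaskError`, the context argument is omitted, and `erase` and `ErasedFn` are hypothetical names.

```rust
use std::any::Any;

/// Hypothetical type-erased task signature (a stand-in for `SyncFn`).
type ErasedFn = Box<dyn Fn(&dyn Any) -> Result<Box<dyn Any>, String>>;

/// Wraps a typed closure so that it accepts and returns type-erased values.
fn erase<I, O, F>(f: F) -> ErasedFn
where
    I: Any,
    O: Any,
    F: Fn(&I) -> Result<O, String> + 'static,
{
    Box::new(move |input: &dyn Any| {
        // Report a wrong input type as an error instead of panicking.
        let typed = input
            .downcast_ref::<I>()
            .ok_or_else(|| "unexpected input type".to_string())?;
        let output = f(typed)?;
        Ok(Box::new(output) as Box<dyn Any>)
    })
}

fn main() {
    let task = erase(|text: &String| Ok(text.len()));
    let out = task(&"hello".to_string()).unwrap();
    println!("{:?}", out.downcast_ref::<usize>()); // Some(5)
    println!("{}", task(&42_i32).is_err()); // true
}
```

The caller writes the closure against a concrete `&String`, while the stored task keeps a uniform signature that a pipeline could hold in a collection.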
pub fn async_fn_typed<I, O, F>(f: F) -> Self
Create a Task::Async from a typed closure returning a 'static
future.
Data needed inside the async block must be copied/cloned before it:
Task::async_fn_typed(|input: &MyInput, ctx| {
    let id = input.id; // copy before async block
    Box::pin(async move {
        Ok(Box::new(fetch(id).await?))
    })
})
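The copy-before-`async` rule can be checked in isolation with the standard library alone. In this sketch the `BoxFuture` alias is a local stand-in for the one the crate uses, `double_id` is a hypothetical task body, and `block_on` is a deliberately tiny polling loop that replaces a real async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `BoxFuture`: a boxed future that borrows nothing.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

struct MyInput {
    id: u64,
}

/// Hypothetical task body: copies `id` out of the borrow, then moves it into the future.
fn double_id(input: &MyInput) -> BoxFuture<u64> {
    let id = input.id; // copy before the async block
    Box::pin(async move { id * 2 })
}

/// Waker that does nothing; sufficient for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal polling loop standing in for a real async runtime.
fn block_on<T>(mut fut: BoxFuture<T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let input = MyInput { id: 21 };
    let fut = double_id(&input);
    drop(input); // the future holds no borrow, so the input can be dropped first
    println!("{}", block_on(fut)); // prints 42
}
```

Referencing `input.id` inside the `async move` block instead would tie the future to the borrow of `input`, and the `'static` bound on the alias would reject it.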
pub fn sync_iter_typed<I, O, F, Iter>(f: F) -> Self
Create a Task::SyncIter from a typed closure returning a concrete
iterator. The iterator must be 'static (may not borrow the input).
Task::sync_iter_typed(|input: &Document, ctx| {
    let chunks = split(input.text.clone());
    Ok(chunks.into_iter().map(Box::new))
})
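To illustrate why the example clones before splitting, here is a small sketch with a hypothetical `Document` type and `chunks` function. The returned iterator owns its data, so it remains valid after the input is dropped; a boxed trait object is used for the return type here only to make the `'static` bound explicit.

```rust
struct Document {
    text: String,
}

/// Returns an iterator that owns its items, so it satisfies a `'static` bound.
fn chunks(input: &Document) -> Box<dyn Iterator<Item = Box<String>>> {
    // Collect owned `String`s first; iterating `split_whitespace` directly
    // would borrow `input.text` and could not outlive `input`.
    let owned: Vec<String> = input
        .text
        .split_whitespace()
        .map(str::to_owned)
        .collect();
    Box::new(owned.into_iter().map(Box::new))
}

fn main() {
    let doc = Document { text: "a bb ccc".to_string() };
    let iter = chunks(&doc);
    drop(doc); // the iterator does not borrow the document
    let words: Vec<String> = iter.map(|boxed| *boxed).collect();
    println!("{:?}", words); // ["a", "bb", "ccc"]
}
```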
pub fn async_stream_typed<I, O, F, S>(f: F) -> Self
Create a Task::AsyncStream from a typed closure returning a concrete
stream. The stream must be 'static.
Task::async_stream_typed(|input: &DatasetId, ctx| {
    let id = *input;
    Ok(stream_chunks(id))
})
pub fn sync_batch_typed<I, O, F>(f: F) -> Self
Create a Task::SyncBatch from a typed closure receiving &[&I].
Task::sync_batch_typed(|chunks: &[&DocumentChunk], ctx| {
    Ok(Box::new(embed_all(chunks)))
})
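A typed batch closure sees `&[&I]` even though the pipeline holds boxed, type-erased items. One way that view could be built is sketched below, under the assumption that `std::any::Any` stands in for `Value`; `downcast_all` and `total_len` are hypothetical helpers and the crate's own conversion may differ.

```rust
use std::any::Any;

struct DocumentChunk {
    text: String,
}

/// Downcasts every erased item to `&I`; yields `None` if any item has another type.
fn downcast_all<I: Any>(items: &[Box<dyn Any>]) -> Option<Vec<&I>> {
    items.iter().map(|item| item.downcast_ref::<I>()).collect()
}

/// Hypothetical typed batch body: total text length across the batch.
fn total_len(chunks: &[&DocumentChunk]) -> usize {
    chunks.iter().map(|chunk| chunk.text.len()).sum()
}

fn main() {
    let items: Vec<Box<dyn Any>> = vec![
        Box::new(DocumentChunk { text: "ab".to_string() }),
        Box::new(DocumentChunk { text: "cde".to_string() }),
    ];
    let typed = downcast_all::<DocumentChunk>(&items).expect("all items are chunks");
    println!("{}", total_len(&typed)); // prints 5
}
```

Collecting references rather than moving values keeps the accumulated batch intact for the caller.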
pub fn async_batch_typed<I, O, F>(f: F) -> Self
Create a Task::AsyncBatch from a typed closure returning a 'static future.
Data needed inside the async block must be copied/cloned before it:
Task::async_batch_typed(|chunks: &[&DocumentChunk], ctx| {
    let texts: Vec<String> = chunks.iter().map(|c| c.text.clone()).collect();
    Box::pin(async move {
        Ok(Box::new(embed_batch(texts).await?))
    })
})
pub fn sync_iter_batch_typed<I, O, F, Iter>(f: F) -> Self
Create a Task::SyncIterBatch from a typed closure returning a concrete iterator.
pub fn async_stream_batch_typed<I, O, F, S>(f: F) -> Self
Create a Task::AsyncStreamBatch from a typed closure returning a concrete stream.
pub fn call(&self, input: Arc<dyn Value>, ctx: Arc<TaskContext>) -> TaskCall
Call this task with a single input value.
Panics if called on a batch variant — use Task::call_batch for those.
The task is Fn, so &self suffices — the same task object handles
every input in a fan-out scenario and every retry attempt.
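The point about `Fn` and `&self` can be demonstrated with a small stand-in. Everything named here (`MiniTask`, `flaky_task`, `call_with_retry`) is hypothetical and exists only for illustration; the real `Task::call` takes `Arc<dyn Value>` and a context and returns a `TaskCall`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a single-value task: a shared, type-erased `Fn` closure.
struct MiniTask {
    f: Arc<dyn Fn(u32) -> Result<u32, String> + Send + Sync>,
}

impl MiniTask {
    /// `&self` is enough: invoking an `Fn` closure neither consumes nor mutably borrows it.
    fn call(&self, input: u32) -> Result<u32, String> {
        (self.f)(input)
    }
}

/// Builds a task that fails its first `fail_first` calls, plus a handle to its call counter.
fn flaky_task(fail_first: usize) -> (MiniTask, Arc<AtomicUsize>) {
    let calls = Arc::new(AtomicUsize::new(0));
    let seen = Arc::clone(&calls);
    let f = move |x: u32| {
        // An atomic counter gives interior mutability, so the closure stays `Fn`.
        let n = seen.fetch_add(1, Ordering::SeqCst);
        if n < fail_first {
            Err(format!("attempt {} failed", n + 1))
        } else {
            Ok(x * 10)
        }
    };
    (MiniTask { f: Arc::new(f) }, calls)
}

/// Retries through the same `&MiniTask` until a call succeeds or `attempts` runs out.
fn call_with_retry(task: &MiniTask, input: u32, attempts: usize) -> Result<u32, String> {
    let mut last = Err("no attempts made".to_string());
    for _ in 0..attempts {
        last = task.call(input);
        if last.is_ok() {
            break;
        }
    }
    last
}

fn main() {
    let (task, calls) = flaky_task(2);
    println!("{:?}", call_with_retry(&task, 4, 5)); // prints Ok(40)
    println!("{}", calls.load(Ordering::SeqCst)); // prints 3
}
```

Because the closure is never consumed, the same task value serves all three attempts and could go on to handle further inputs.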
pub fn parallel(tasks: Vec<Task>) -> Self
Build a task that runs multiple sub-tasks concurrently on the same input.
Semantics (matching Python run_tasks_parallel):
- Each sub-task receives Arc::clone(&input) and the shared context.
- All sub-tasks run concurrently via futures::future::join_all.
- If any sub-task fails, the whole parallel task fails with that error.
- On success, returns the result of the last sub-task (by position).
Only single-value (Sync / Async) sub-tasks are supported. Iter/stream
sub-tasks inside a parallel group don’t have well-defined “last result”
semantics and will panic at call time.
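The semantics listed above can be approximated with the standard library alone. Note the substitutions: this sketch uses scoped threads from `std::thread` in place of `futures::future::join_all`, plain function pointers in place of `Task` values, and omits the shared context. `run_parallel` and `SubTask` are hypothetical names. Two behaviours are assumptions of the sketch rather than documented facts: when several sub-tasks fail, the first error by position is returned, and an empty task list yields an error.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical sub-task signature: shared input in, count or error out.
type SubTask = fn(Arc<String>) -> Result<usize, String>;

/// Runs every sub-task on a clone of the same `Arc` input and waits for all of them.
fn run_parallel(tasks: &[SubTask], input: Arc<String>) -> Result<usize, String> {
    let results: Vec<Result<usize, String>> = thread::scope(|scope| {
        // Spawn everything first so the sub-tasks actually overlap.
        let handles: Vec<_> = tasks
            .iter()
            .map(|task| {
                let input = Arc::clone(&input);
                scope.spawn(move || task(input))
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("sub-task panicked"))
            .collect()
    });
    // Assumption: an empty group is an error; the first error by position wins.
    let mut last = Err("no sub-tasks".to_string());
    for result in results {
        last = Ok(result?);
    }
    last
}

fn count_chars(s: Arc<String>) -> Result<usize, String> {
    Ok(s.chars().count())
}

fn count_words(s: Arc<String>) -> Result<usize, String> {
    Ok(s.split_whitespace().count())
}

fn always_fails(_s: Arc<String>) -> Result<usize, String> {
    Err("boom".to_string())
}

fn main() {
    let input = Arc::new("one two three".to_string());
    let ok: [SubTask; 2] = [count_chars, count_words];
    let bad: [SubTask; 2] = [always_fails, count_words];
    println!("{:?}", run_parallel(&ok, Arc::clone(&input))); // Ok(3), the last sub-task's result
    println!("{:?}", run_parallel(&bad, input)); // Err("boom")
}
```

Only the last positional result survives on success, which is why sub-tasks that yield many values have no obvious result to return here.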
pub fn call_batch(
    &self,
    items: &[Box<dyn Value>],
    ctx: Arc<TaskContext>,
) -> TaskCall
Call this batch task with a slice of accumulated values.
Panics if called on a single-value variant — use Task::call for those.
Auto Trait Implementations
impl !RefUnwindSafe for Task
impl !UnwindSafe for Task
impl Freeze for Task
impl Send for Task
impl Sync for Task
impl Unpin for Task
impl UnsafeUnpin for Task