pub struct Pipeline { /* private fields */ }
A pipeline for indexing files, adding metadata, chunking, transforming, embedding, and then storing them.
The Pipeline struct orchestrates the entire file indexing process. It is designed to be
flexible and performant, allowing for various stages of data transformation and storage to be
configured and executed asynchronously.
§Fields
stream - The stream of Node items to be processed.
storage - Optional storage backend where the processed nodes will be stored.
concurrency - The level of concurrency for processing nodes.
Implementations§
impl Pipeline
pub fn from_loader(loader: impl Loader + 'static) -> Self
pub fn with_default_llm_client(self, client: impl SimplePrompt + 'static) -> Self
Sets the default LLM client to be used for LLM prompts for all transformers in the pipeline.
pub fn from_stream(stream: impl Into<IndexingStream>) -> Self
pub fn with_concurrency(self, concurrency: usize) -> Self
pub fn with_embed_mode(self, embed_mode: EmbedMode) -> Self
Sets the embed mode for the pipeline. The embed mode controls which fields (or combination of
fields) of a Node are embedded as a vector when transforming with crate::transformers::Embed.
See also swiftide_core::indexing::EmbedMode.
§Arguments
embed_mode - The desired embed mode.
§Returns
An instance of Pipeline with the updated embed mode.
pub fn filter_cached(self, cache: impl NodeCache + 'static) -> Self
pub fn then(self, transformer: impl Transformer + WithIndexingDefaults + 'static) -> Self
pub fn then_in_batch(self, transformer: impl BatchableTransformer + WithBatchIndexingDefaults + 'static) -> Self
Adds a batch transformer to the pipeline.
If the transformer has a batch size set, that batch size is used;
otherwise the pipeline default batch size (DEFAULT_BATCH_SIZE) is used.
§Arguments
transformer - A transformer that implements the BatchableTransformer trait.
§Returns
An instance of Pipeline with the updated stream that applies the batch transformer to each
batch of nodes.
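The batch size fallback described above can be sketched with standard library types only. This is a minimal illustration, not the crate's implementation: the value of `DEFAULT_BATCH_SIZE` shown here and the helpers `effective_batch_size` and `into_batches` are assumptions made for the example.

```rust
/// Hypothetical value for illustration only; the crate defines the real `DEFAULT_BATCH_SIZE`.
const DEFAULT_BATCH_SIZE: usize = 256;

/// Prefers the transformer's own batch size, falling back to the pipeline default.
fn effective_batch_size(transformer_batch_size: Option<usize>) -> usize {
    transformer_batch_size.unwrap_or(DEFAULT_BATCH_SIZE)
}

/// Groups nodes into batches of at most `size` items (`size` must be non-zero).
fn into_batches<T: Clone>(nodes: &[T], size: usize) -> Vec<Vec<T>> {
    nodes.chunks(size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let nodes = [1, 2, 3, 4, 5];
    // The transformer sets its own batch size, so the default is ignored.
    let size = effective_batch_size(Some(2));
    println!("{:?}", into_batches(&nodes, size)); // prints [[1, 2], [3, 4], [5]]
}
```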
pub fn then_chunk(self, chunker: impl ChunkerTransformer + 'static) -> Self
pub fn then_store_with(self, storage: impl Persist + 'static) -> Self
Persists indexing nodes using the provided storage backend.
§Arguments
storage - A storage backend that implements the Persist trait.
§Returns
An instance of Pipeline with the configured storage backend.
§Panics
Panics if batch storage is invoked while no batch size is set. The pipeline only invokes batch storing when a batch size is set, so this should not occur in practice.
pub fn split_by<P>(self, predicate: P) -> (Self, Self)
Splits the stream into two streams based on a predicate.
Note that this is not lazy. It will start consuming the stream immediately and send each item to the left or right stream based on the predicate.
The resulting streams are buffered, but should be started as soon as possible. Their channels are bounded, and the parent stream will panic if sending fails.
They can be run concurrently, alternated between, or merged back together.
§Panics
Panics if the receiving pipelines' buffers are full or unavailable.
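As a rough analogy for the eager, predicate-based routing described above, the sketch below uses bounded standard library channels and a thread. It is not the crate's implementation: this free function `split_by`, its `buffer` parameter, and the use of threads instead of async streams are all assumptions made for illustration. Unlike the behavior described above, a full buffer here blocks the sender rather than panicking; only a dropped receiver causes a panic.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: eagerly routes items into two bounded channels.
fn split_by<T, I, P>(items: I, buffer: usize, predicate: P) -> (Receiver<T>, Receiver<T>)
where
    T: Send + 'static,
    I: IntoIterator<Item = T> + Send + 'static,
    P: Fn(&T) -> bool + Send + 'static,
{
    let (left_tx, left_rx) = sync_channel(buffer);
    let (right_tx, right_rx) = sync_channel(buffer);

    // Routing starts immediately, before either receiver is read.
    thread::spawn(move || {
        for item in items {
            let target = if predicate(&item) { &left_tx } else { &right_tx };
            // Sending fails (and this thread panics) only if the receiver was dropped.
            target.send(item).expect("receiving side is unavailable");
        }
    });

    (left_rx, right_rx)
}

fn main() {
    // The buffer is large enough to hold every item, so draining one side
    // completely before the other cannot stall the sender.
    let (evens, odds) = split_by(0..10, 16, |n| n % 2 == 0);
    let evens: Vec<i32> = evens.iter().collect();
    let odds: Vec<i32> = odds.iter().collect();
    println!("{evens:?} {odds:?}");
}
```

With a buffer smaller than the number of routed items, both receivers would need to be drained concurrently, which mirrors the advice to start the resulting pipelines as soon as possible.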
pub fn merge(self, other: Self) -> Self
Merges two streams into one.
This is useful for merging two streams that have been split using the split_by method.
The full stream can then be processed using the run method.
pub fn throttle(self, duration: impl Into<Duration>) -> Self
Throttles the stream of nodes, limiting the rate to 1 per duration.
Useful for rate limiting the indexing pipeline. Uses tokio_stream::StreamExt::throttle
internally, which has a granularity of 1ms.
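The effect of limiting the rate to one item per duration can be approximated with a blocking iterator adapter. This is only a sketch under stated assumptions: the helper `throttled` is hypothetical, it uses `std::thread::sleep` instead of the async tokio_stream throttle mentioned above, and the exact timing behavior of the real pipeline may differ.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: yields at most one item per `period`, sleeping in between.
fn throttled<T>(items: impl IntoIterator<Item = T>, period: Duration) -> impl Iterator<Item = T> {
    let mut next_allowed = Instant::now();
    items.into_iter().map(move |item| {
        let now = Instant::now();
        if now < next_allowed {
            // Wait out the remainder of the current period.
            thread::sleep(next_allowed - now);
        }
        next_allowed = Instant::now() + period;
        item
    })
}

fn main() {
    let start = Instant::now();
    // The first item passes immediately; each later item waits for its period.
    for node in throttled(["a", "b", "c"], Duration::from_millis(20)) {
        println!("{node} after {:?}", start.elapsed());
    }
}
```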
pub fn filter_errors(self) -> Self
pub fn filter<F>(self, filter: F) -> Self
Provide a closure to selectively filter nodes or errors.
This allows you to skip specific errors or nodes, or do ad hoc inspection.
If the closure returns true, the result is kept, otherwise it is skipped.
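The keep-or-skip rule can be illustrated with plain `Result` values. In this sketch the `Item` alias and the `apply_filter` helper are hypothetical stand-ins (a `String` plays the role of a node, and another `String` the role of an error); the crate's own closure bounds are not shown here.

```rust
/// Hypothetical stand-in for a pipeline result: a node (a `String`) or an error message.
type Item = Result<String, String>;

/// Keeps every item for which `filter` returns true and skips the rest.
fn apply_filter<F>(items: Vec<Item>, filter: F) -> Vec<Item>
where
    F: Fn(&Item) -> bool,
{
    items.into_iter().filter(|item| filter(item)).collect()
}

fn main() {
    let items: Vec<Item> = vec![
        Ok("readme.md".to_string()),
        Err("timeout".to_string()),
        Ok("main.rs".to_string()),
        Err("parse failure".to_string()),
    ];
    // Skip timeout errors; keep all nodes and every other error.
    let kept = apply_filter(items, |item| {
        !matches!(item, Err(e) if e.as_str() == "timeout")
    });
    println!("{kept:?}");
}
```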
pub fn log_all(self) -> Self
Logs all results processed by the pipeline.
This method logs all results processed by the pipeline at the DEBUG level.
pub fn log_errors(self) -> Self
Logs all errors encountered by the pipeline.
This method logs all errors encountered by the pipeline at the ERROR level.
pub fn log_nodes(self) -> Self
Logs all nodes processed by the pipeline.
This method logs all nodes processed by the pipeline at the DEBUG level.
pub async fn run(self) -> Result<()>
Runs the indexing pipeline.
This method processes the stream of nodes, applying all configured transformations and storing the results.
§Returns
A Result indicating the success or failure of the pipeline execution.
§Errors
Returns an error if no storage backend is configured or if any stage of the pipeline fails.
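The consuming builder style and the missing-storage error can be modelled with a toy type. Everything in this sketch is hypothetical: `ToyPipeline` is synchronous and eager, uses a `Vec<String>` in place of a real storage backend, and is not how the crate implements `run`.

```rust
/// Hypothetical, synchronous stand-in for the pipeline; not the crate's types.
struct ToyPipeline {
    nodes: Vec<String>,
    storage: Option<Vec<String>>,
}

impl ToyPipeline {
    fn from_nodes(nodes: Vec<String>) -> Self {
        Self { nodes, storage: None }
    }

    /// Applies a transformation to every node (eagerly, unlike a real stream).
    fn then(mut self, transform: impl Fn(String) -> String) -> Self {
        self.nodes = self.nodes.into_iter().map(transform).collect();
        self
    }

    /// Configures where the processed nodes end up.
    fn then_store_with(mut self, storage: Vec<String>) -> Self {
        self.storage = Some(storage);
        self
    }

    /// Fails when no storage was configured; otherwise stores all nodes.
    fn run(self) -> Result<Vec<String>, String> {
        let mut storage = self.storage.ok_or("no storage backend configured")?;
        storage.extend(self.nodes);
        Ok(storage)
    }
}

fn main() {
    let missing = ToyPipeline::from_nodes(vec!["a".to_string()]).run();
    println!("{missing:?}");

    let stored = ToyPipeline::from_nodes(vec!["a".to_string()])
        .then(|node| node.to_uppercase())
        .then_store_with(Vec::new())
        .run();
    println!("{stored:?}");
}
```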
Auto Trait Implementations§
impl Freeze for Pipeline
impl !RefUnwindSafe for Pipeline
impl Send for Pipeline
impl !Sync for Pipeline
impl Unpin for Pipeline
impl !UnwindSafe for Pipeline
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.