Struct Pipeline

pub struct Pipeline { /* private fields */ }

A pipeline for indexing files, adding metadata, chunking, transforming, embedding, and then storing them.

The Pipeline struct orchestrates the entire file indexing process. Each stage of data transformation and storage is configured on the pipeline and executed asynchronously, with a configurable level of concurrency.

Fields

  • stream - The stream of Node items to be processed.
  • storage - Optional storage backend where the processed nodes will be stored.
  • concurrency - The level of concurrency for processing nodes.

Implementations

impl Pipeline

pub fn from_loader(loader: impl Loader + 'static) -> Self

Creates a Pipeline from a given loader.

Arguments
  • loader - A loader that implements the Loader trait.
Returns

An instance of Pipeline initialized with the provided loader.

pub fn with_default_llm_client(self, client: impl SimplePrompt + 'static) -> Self

Sets the default LLM client to be used for LLM prompts for all transformers in the pipeline.

pub fn from_stream(stream: impl Into<IndexingStream>) -> Self

Creates a Pipeline from a given stream.

Arguments
  • stream - An IndexingStream, or any value convertible into one, containing the nodes to be processed.
Returns

An instance of Pipeline initialized with the provided stream.

pub fn with_concurrency(self, concurrency: usize) -> Self

Sets the concurrency level for the pipeline. By default, the concurrency is set to the number of CPUs.

Arguments
  • concurrency - The desired level of concurrency.
Returns

An instance of Pipeline with the updated concurrency level.
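To make the concurrency setting concrete, here is a std-only sketch of bounded parallel processing: at most `concurrency` workers handle nodes at the same time, and the default falls back to the number of CPUs reported by the OS. The functions `default_concurrency` and `process_concurrently` are hypothetical, and the sketch uses scoped OS threads rather than the asynchronous machinery the pipeline itself runs on.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical default: the number of CPUs reported by the OS, or 1 if unknown.
fn default_concurrency() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Hypothetical helper: applies `f` to every item using at most `concurrency`
/// worker threads and returns the results in input order.
fn process_concurrently<T, R, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<R>
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    let len = items.len();
    // Shared queue of (index, item) pairs that workers pull from.
    let queue = Mutex::new(items.into_iter().enumerate());
    let results: Mutex<Vec<Option<R>>> = Mutex::new((0..len).map(|_| None).collect());
    // Never spawn more workers than there are items (but at least one).
    let workers = concurrency.max(1).min(len.max(1));

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Hold the queue lock only long enough to take the next item.
                let next = queue.lock().unwrap().next();
                match next {
                    Some((i, item)) => {
                        let out = f(item);
                        results.lock().unwrap()[i] = Some(out);
                    }
                    None => break,
                }
            });
        }
    });

    // Every slot was filled by exactly one worker.
    results
        .into_inner()
        .unwrap()
        .into_iter()
        .map(|r| r.unwrap())
        .collect()
}
```

Workers pull from a shared queue, so one slow node occupies a single worker while the others keep making progress; results are written back by index to preserve input order.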

pub fn with_embed_mode(self, embed_mode: EmbedMode) -> Self

Sets the embed mode for the pipeline. The embed mode controls which fields of a Node (or which combination of them) are embedded as vectors when transforming with crate::transformers::Embed.

See also swiftide_core::indexing::EmbedMode.

Arguments
  • embed_mode - The desired embed mode.
Returns

An instance of Pipeline with the updated embed mode.

pub fn filter_cached(self, cache: impl NodeCache + 'static) -> Self

Filters out cached nodes using the provided cache.

Arguments
  • cache - A cache that implements the NodeCache trait.
Returns

An instance of Pipeline with the updated stream that filters out cached nodes.
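The filtering step can be pictured as a membership check against the cache. This std-only sketch assumes set-like semantics: a node already present in the cache is dropped, and a node seen for the first time is kept and recorded. `SetCache` and `filter_cached` are hypothetical stand-ins; the real NodeCache trait is asynchronous, and its keys and backing store may differ.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory cache keyed by node content.
#[derive(Default)]
struct SetCache {
    seen: HashSet<String>,
}

impl SetCache {
    /// Returns true if the node was already cached.
    fn get(&self, node: &str) -> bool {
        self.seen.contains(node)
    }

    /// Records the node as cached.
    fn set(&mut self, node: &str) {
        self.seen.insert(node.to_string());
    }
}

/// Hypothetical filter step: keeps only nodes that are not yet cached,
/// caching each node that is kept.
fn filter_cached(nodes: Vec<String>, cache: &mut SetCache) -> Vec<String> {
    nodes
        .into_iter()
        .filter(|node| {
            if cache.get(node) {
                false // already indexed: skip it
            } else {
                cache.set(node);
                true // first time seen: keep it
            }
        })
        .collect()
}
```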

pub fn then(self, transformer: impl Transformer + WithIndexingDefaults + 'static) -> Self

Adds a transformer to the pipeline.

Closures can also be provided as transformers.

Arguments
  • transformer - A transformer that implements the Transformer trait.
Returns

An instance of Pipeline with the updated stream that applies the transformer to each node.
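Accepting closures as transformers is usually done in Rust with a blanket implementation of the trait for any matching `Fn`. The sketch below shows that pattern with a hypothetical, synchronous `Transformer` trait and a simplified `Node`; swiftide's actual Transformer trait is asynchronous and has a different signature, so treat this only as an illustration of the idea.

```rust
/// Hypothetical, simplified node type.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    chunk: String,
}

/// Hypothetical synchronous transformer trait (not swiftide's real trait).
trait Transformer {
    fn transform_node(&self, node: Node) -> Result<Node, String>;
}

/// Blanket impl: any closure or function with a matching signature is a transformer.
impl<F> Transformer for F
where
    F: Fn(Node) -> Result<Node, String>,
{
    fn transform_node(&self, node: Node) -> Result<Node, String> {
        self(node)
    }
}

/// Hypothetical helper: applies transformers in order, stopping at the first error.
fn apply_all(node: Node, transformers: &[Box<dyn Transformer>]) -> Result<Node, String> {
    transformers.iter().try_fold(node, |n, t| t.transform_node(n))
}
```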

pub fn then_in_batch(self, transformer: impl BatchableTransformer + WithBatchIndexingDefaults + 'static) -> Self

Adds a batch transformer to the pipeline.

If the transformer has a batch size set, that batch size is used; otherwise, the pipeline's default batch size (DEFAULT_BATCH_SIZE) is used.

Arguments
  • transformer - A transformer that implements the BatchableTransformer trait.
Returns

An instance of Pipeline with the updated stream that applies the batch transformer to each batch of nodes.
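The batch size rule above amounts to "the transformer's own size if present, otherwise the default". This sketch applies that rule and then groups nodes into batches. `UppercaseBatch` and `run_in_batches` are hypothetical, and the value given to `DEFAULT_BATCH_SIZE` is a placeholder, not the library's actual default.

```rust
/// Placeholder value for illustration only; swiftide's real default may differ.
const DEFAULT_BATCH_SIZE: usize = 4;

/// Hypothetical batch transformer that may or may not specify its own batch size.
struct UppercaseBatch {
    batch_size: Option<usize>,
}

impl UppercaseBatch {
    fn batch_size(&self) -> Option<usize> {
        self.batch_size
    }

    /// Transforms a whole batch at once (for example, one API call per batch).
    fn transform_batch(&self, batch: Vec<String>) -> Vec<String> {
        batch.into_iter().map(|s| s.to_uppercase()).collect()
    }
}

/// Hypothetical helper: prefers the transformer's batch size, else the default.
fn run_in_batches(nodes: Vec<String>, transformer: &UppercaseBatch) -> Vec<Vec<String>> {
    let size = transformer.batch_size().unwrap_or(DEFAULT_BATCH_SIZE).max(1);
    nodes
        .chunks(size)
        .map(|chunk| transformer.transform_batch(chunk.to_vec()))
        .collect()
}
```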

pub fn then_chunk(self, chunker: impl ChunkerTransformer + 'static) -> Self

Adds a chunker transformer to the pipeline.

Arguments
  • chunker - A transformer that implements the ChunkerTransformer trait.
Returns

An instance of Pipeline with the updated stream that applies the chunker transformer to each node.
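A chunker turns one node into zero or more nodes, which in iterator terms is a `flat_map`. The sketch below uses a hypothetical character-count chunker (`chunk_by_chars`) and a hypothetical `then_chunk` helper purely for illustration; real chunkers typically split on tokens, lines, or code structure.

```rust
/// Hypothetical chunker: splits text into pieces of at most `max_chars` characters.
fn chunk_by_chars(text: &str, max_chars: usize) -> Vec<String> {
    let chars: Vec<char> = text.chars().collect();
    chars
        .chunks(max_chars.max(1))
        .map(|piece| piece.iter().collect::<String>())
        .collect()
}

/// Hypothetical helper: applies the chunker to every node and flattens the
/// resulting pieces into a single sequence of nodes.
fn then_chunk(nodes: Vec<String>, max_chars: usize) -> Vec<String> {
    nodes
        .into_iter()
        .flat_map(|node| chunk_by_chars(&node, max_chars))
        .collect()
}
```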

pub fn then_store_with(self, storage: impl Persist + 'static) -> Self

Persists indexing nodes using the provided storage backend.

Arguments
  • storage - A storage backend that implements the Persist trait.
Returns

An instance of Pipeline with the configured storage backend.

Panics

Panics if batch storage is invoked without a batch size being set. The pipeline only invokes batch storage when a batch size is set, so this should not happen in practice.
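The panic note refers to the choice between storing nodes one at a time and storing them in batches. The std-only sketch below makes that choice explicit: the batch path is only taken when a batch size is present, so it never has to deal with a missing size. `MemoryStore` and `persist_all` are hypothetical and are not the Persist trait's actual methods.

```rust
/// Hypothetical in-memory storage backend.
struct MemoryStore {
    batch_size: Option<usize>,
    stored: Vec<String>,
    calls: usize, // number of store operations performed
}

impl MemoryStore {
    fn store(&mut self, node: String) {
        self.stored.push(node);
        self.calls += 1;
    }

    fn batch_store(&mut self, nodes: Vec<String>) {
        self.stored.extend(nodes);
        self.calls += 1;
    }
}

/// Hypothetical helper: stores nodes in batches when a batch size is set,
/// otherwise one at a time.
fn persist_all(store: &mut MemoryStore, nodes: Vec<String>) {
    let batch_size = store.batch_size;
    match batch_size {
        // Batch path: only reached when a batch size exists.
        Some(size) => {
            for chunk in nodes.chunks(size.max(1)) {
                store.batch_store(chunk.to_vec());
            }
        }
        // Single-node path: no batch size required.
        None => {
            for node in nodes {
                store.store(node);
            }
        }
    }
}
```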

pub fn split_by<P>(self, predicate: P) -> (Self, Self)
where P: Fn(&Result<Node>) -> bool + Send + Sync + 'static,

Splits the stream into two streams based on a predicate.

Note that this is not lazy. It will start consuming the stream immediately and send each item to the left or right stream based on the predicate.

The resulting streams are buffered, but they should be started as soon as possible: their channels are bounded, and the parent stream panics if sending fails.

The two resulting pipelines can be run concurrently, alternated between, or merged back together.

Panics

Panics if the receiving pipelines' buffers are full or unavailable.
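The eager, bounded behavior described above can be modeled with a background thread and `std::sync::mpsc::sync_channel`. In this sketch the splitter starts consuming immediately and, mirroring the documented behavior, panics when a buffer is full or a receiver is gone (`try_send` fails). The `split_by` function here is a hypothetical synchronous model; the real method works on asynchronous streams, and the buffer size used is an assumption of the sketch.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical model: eagerly splits `source` into two bounded channels
/// based on `predicate`. The spawned thread starts consuming immediately and
/// panics if a buffer is full or a receiver has been dropped.
fn split_by<T, I, P>(
    source: I,
    buffer: usize,
    predicate: P,
) -> (Receiver<T>, Receiver<T>, thread::JoinHandle<()>)
where
    T: Send + 'static,
    I: IntoIterator<Item = T> + Send + 'static,
    P: Fn(&T) -> bool + Send + 'static,
{
    let (left_tx, left_rx) = sync_channel(buffer);
    let (right_tx, right_rx) = sync_channel(buffer);
    let handle = thread::spawn(move || {
        for item in source {
            // true goes left, false goes right.
            let tx = if predicate(&item) { &left_tx } else { &right_tx };
            tx.try_send(item)
                .expect("split_by: receiver buffer full or closed");
        }
        // Senders drop here, which ends both receivers' iteration.
    });
    (left_rx, right_rx, handle)
}
```

Draining both receivers promptly, or sizing the buffer for the expected volume, avoids the panic; this is why both resulting pipelines should be started as soon as possible.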

pub fn merge(self, other: Self) -> Self

Merges two streams into one.

This is useful for merging two streams that have been split using the split_by method.

The full stream can then be processed using the run method.
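Merging interleaves items from both streams. With asynchronous streams the order depends on which side is ready first; this std-only sketch uses simple round-robin alternation as one possible deterministic interleaving. The `merge` function is hypothetical and is not guaranteed to match the pipeline's ordering.

```rust
/// Hypothetical helper: merges two iterators by alternating between them
/// until both are exhausted.
fn merge<T>(left: impl IntoIterator<Item = T>, right: impl IntoIterator<Item = T>) -> Vec<T> {
    // `fuse` guarantees `None` keeps being returned after exhaustion.
    let mut left = left.into_iter().fuse();
    let mut right = right.into_iter().fuse();
    let mut out = Vec::new();
    loop {
        let a = left.next();
        let b = right.next();
        if a.is_none() && b.is_none() {
            break;
        }
        // Take at most one item from each side per round.
        out.extend(a);
        out.extend(b);
    }
    out
}
```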

pub fn throttle(self, duration: impl Into<Duration>) -> Self

Throttles the stream of nodes, limiting the rate to one node per duration.

Useful for rate limiting the indexing pipeline. Uses tokio_stream::StreamExt::throttle internally, which has a granularity of 1ms.
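Throttling to one node per duration means each item is emitted no sooner than one period after the previous one. This blocking sketch uses `std::thread::sleep` to show that spacing; tokio_stream's throttle achieves the same effect without blocking a thread. The `throttle` function and its `emit` callback are hypothetical.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: emits items no faster than one per `period`,
/// sleeping between items when needed.
fn throttle<T>(items: Vec<T>, period: Duration, mut emit: impl FnMut(T)) {
    let mut last: Option<Instant> = None;
    for item in items {
        if let Some(prev) = last {
            // Wait out whatever is left of the current period.
            let elapsed = prev.elapsed();
            if elapsed < period {
                thread::sleep(period - elapsed);
            }
        }
        last = Some(Instant::now());
        emit(item);
    }
}
```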

pub fn filter_errors(self) -> Self

pub fn filter<F>(self, filter: F) -> Self
where F: Fn(&Result<Node>) -> bool + Send + Sync + 'static,

Filters nodes or errors using the provided closure.

This allows you to skip specific errors or nodes, or do ad hoc inspection.

If the closure returns true, the result is kept, otherwise it is skipped.
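As an illustration of the keep-or-skip contract, the sketch below drops one specific, known error and empty nodes while keeping everything else. Nodes and errors are simplified to `String`, and `NodeResult`, `filter`, `keep_relevant`, and the "empty file" message are all hypothetical.

```rust
/// Hypothetical simplification: nodes and errors are both strings.
type NodeResult = Result<String, String>;

/// Hypothetical helper: keeps every result for which `keep` returns true.
fn filter<F>(results: Vec<NodeResult>, keep: F) -> Vec<NodeResult>
where
    F: Fn(&NodeResult) -> bool,
{
    results.into_iter().filter(|r| keep(r)).collect()
}

/// Example policy: skip a known, harmless error and empty nodes; keep the rest.
fn keep_relevant(r: &NodeResult) -> bool {
    match r {
        Ok(node) => !node.is_empty(),
        Err(e) => e != "empty file", // hypothetical error message
    }
}
```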

pub fn log_all(self) -> Self

Logs all results processed by the pipeline.

This method logs all results processed by the pipeline at the DEBUG level.

pub fn log_errors(self) -> Self

Logs all errors encountered by the pipeline.

This method logs all errors encountered by the pipeline at the ERROR level.

pub fn log_nodes(self) -> Self

Logs all nodes processed by the pipeline.

This method logs all nodes processed by the pipeline at the DEBUG level.
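All three logging methods observe results without changing them, which maps naturally onto `Iterator::inspect`. This sketch records log lines in a `Vec<String>` instead of emitting them through a logging framework, and uses the hypothetical `LogMode` enum to stand in for log_all (DEBUG for every result), log_errors (ERROR for errors), and log_nodes (DEBUG for nodes). `NodeResult` and `log_results` are likewise hypothetical.

```rust
/// Hypothetical simplification: nodes and errors are both strings.
type NodeResult = Result<String, String>;

/// Hypothetical selector mirroring log_all, log_errors, and log_nodes.
#[derive(Clone, Copy)]
enum LogMode {
    All,
    Errors,
    Nodes,
}

/// Passes results through unchanged, recording a log line for the selected kinds.
fn log_results(results: Vec<NodeResult>, mode: LogMode, log: &mut Vec<String>) -> Vec<NodeResult> {
    results
        .into_iter()
        .inspect(|r| match (mode, r) {
            (LogMode::All, _) => log.push(format!("DEBUG {r:?}")),
            (LogMode::Errors, Err(e)) => log.push(format!("ERROR {e}")),
            (LogMode::Nodes, Ok(n)) => log.push(format!("DEBUG {n}")),
            _ => {} // this kind of result is not logged in the selected mode
        })
        .collect()
}
```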

pub async fn run(self) -> Result<()>

Runs the indexing pipeline.

This method processes the stream of nodes, applying all configured transformations and storing the results.

Returns

A Result indicating the success or failure of the pipeline execution.

Errors

Returns an error if no storage backend is configured or if any stage of the pipeline fails.
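The sketch below models the two documented failure modes of `run`: a missing storage backend and a failing stage. It assumes, as a simplification, that the run stops at the first error; the documentation does not say whether the real pipeline continues past a failed node. `run`, `NodeResult`, and the in-memory storage are hypothetical stand-ins.

```rust
/// Hypothetical simplification: nodes and errors are both strings.
type NodeResult = Result<String, String>;

/// Hypothetical model of `run`: fails up front if no storage is set and,
/// in this sketch, stops at the first error produced by an earlier stage.
fn run(stream: Vec<NodeResult>, storage: Option<&mut Vec<String>>) -> Result<(), String> {
    let storage = storage.ok_or_else(|| "no storage configured".to_string())?;
    for result in stream {
        let node = result?; // propagate a failed stage as the run's error
        storage.push(node);
    }
    Ok(())
}
```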

Trait Implementations

impl Default for Pipeline

fn default() -> Self

Creates a default Pipeline with an empty stream, no storage, and a concurrency level equal to the number of CPUs.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,