Skip to main content

kapsl_rag/ingestion/
mod.rs

1use async_trait::async_trait;
2use base64::engine::general_purpose::STANDARD as BASE64;
3use base64::Engine;
4use std::collections::HashMap;
5
6use kapsl_rag_sdk::types::DocumentPayload;
7
8use crate::vector::{AccessControl, EmbeddedChunk, VectorStore, VectorStoreError};
9
10#[derive(thiserror::Error, Debug)]
11pub enum IngestionError {
12    #[error("invalid input: {0}")]
13    InvalidInput(String),
14    #[error("parse error: {0}")]
15    Parse(String),
16    #[error("embedding error: {0}")]
17    Embedding(String),
18    #[error("store error: {0}")]
19    Store(String),
20}
21
22impl From<VectorStoreError> for IngestionError {
23    fn from(err: VectorStoreError) -> Self {
24        IngestionError::Store(err.to_string())
25    }
26}
27
/// A document after parsing: its full text plus the descriptive fields carried from the payload.
#[derive(Clone, Debug)]
pub struct ParsedDocument {
    /// Identifier of the source payload.
    pub id: String,
    /// Decoded document text.
    pub text: String,
    /// Key/value metadata copied from the payload; chunkers may extend it per chunk.
    pub metadata: HashMap<String, String>,
    /// Content type string copied from the payload.
    pub content_type: String,
}
35
/// One contiguous piece of a parsed document, ready to be embedded.
#[derive(Clone, Debug)]
pub struct Chunk {
    /// Chunk identifier (`SimpleChunker` uses `"{document id}:{index}"`).
    pub id: String,
    /// Position of the chunk within its document (`SimpleChunker` numbers from 0).
    pub index: i64,
    /// Text content of the chunk.
    pub text: String,
    /// Per-chunk metadata, typically derived from the document's metadata.
    pub metadata: HashMap<String, String>,
}
43
44#[async_trait]
45pub trait DocumentParser: Send + Sync {
46    async fn parse(&self, payload: &DocumentPayload) -> Result<ParsedDocument, IngestionError>;
47}
48
49#[async_trait]
50pub trait Chunker: Send + Sync {
51    async fn chunk(&self, document: &ParsedDocument) -> Result<Vec<Chunk>, IngestionError>;
52}
53
54#[async_trait]
55pub trait Embedder: Send + Sync {
56    async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, IngestionError>;
57}
58
59pub struct IngestionContext {
60    pub tenant_id: String,
61    pub workspace_id: String,
62    pub source_id: String,
63    pub doc_id: String,
64    pub acl: AccessControl,
65}
66
67pub struct IngestionPipeline<P, C, E, V>
68where
69    P: DocumentParser,
70    C: Chunker,
71    E: Embedder,
72    V: VectorStore,
73{
74    parser: P,
75    chunker: C,
76    embedder: E,
77    vector_store: V,
78}
79
80impl<P, C, E, V> IngestionPipeline<P, C, E, V>
81where
82    P: DocumentParser,
83    C: Chunker,
84    E: Embedder,
85    V: VectorStore,
86{
87    pub fn new(parser: P, chunker: C, embedder: E, vector_store: V) -> Self {
88        Self {
89            parser,
90            chunker,
91            embedder,
92            vector_store,
93        }
94    }
95
96    pub async fn ingest(
97        &self,
98        ctx: IngestionContext,
99        payload: DocumentPayload,
100    ) -> Result<(), IngestionError> {
101        let parsed = self.parser.parse(&payload).await?;
102        let chunks = self.chunker.chunk(&parsed).await?;
103        if chunks.is_empty() {
104            return Ok(());
105        }
106        let texts: Vec<String> = chunks.iter().map(|c| c.text.clone()).collect();
107        let embeddings = self.embedder.embed(&texts).await?;
108        if embeddings.len() != chunks.len() {
109            return Err(IngestionError::Embedding(
110                "embedding count mismatch".to_string(),
111            ));
112        }
113
114        let mut embedded = Vec::with_capacity(chunks.len());
115        for (chunk, embedding) in chunks.into_iter().zip(embeddings.into_iter()) {
116            embedded.push(EmbeddedChunk {
117                id: chunk.id,
118                tenant_id: ctx.tenant_id.clone(),
119                workspace_id: ctx.workspace_id.clone(),
120                source_id: ctx.source_id.clone(),
121                doc_id: ctx.doc_id.clone(),
122                chunk_index: chunk.index,
123                text: chunk.text,
124                embedding,
125                metadata: chunk.metadata,
126                acl: ctx.acl.clone(),
127            });
128        }
129
130        self.vector_store.upsert(embedded).await?;
131        Ok(())
132    }
133}
134
/// Parser that treats the payload as base64-encoded UTF-8 plain text.
///
/// Stateless; it derives `Copy` and `Default` so it can be constructed and shared freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct PlainTextParser;
136
137#[async_trait]
138impl DocumentParser for PlainTextParser {
139    async fn parse(&self, payload: &DocumentPayload) -> Result<ParsedDocument, IngestionError> {
140        let bytes = BASE64
141            .decode(&payload.bytes_b64)
142            .map_err(|e| IngestionError::Parse(e.to_string()))?;
143        let text = String::from_utf8(bytes).map_err(|e| IngestionError::Parse(e.to_string()))?;
144        Ok(ParsedDocument {
145            id: payload.id.clone(),
146            text,
147            metadata: payload.metadata.clone(),
148            content_type: payload.content_type.clone(),
149        })
150    }
151}
152
/// Chunker that splits text into fixed-size windows of whitespace-separated tokens.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimpleChunker {
    /// Maximum number of tokens per chunk.
    pub chunk_size: usize,
    /// Number of tokens shared between consecutive chunks.
    pub overlap: usize,
}
157
158#[async_trait]
159impl Chunker for SimpleChunker {
160    async fn chunk(&self, document: &ParsedDocument) -> Result<Vec<Chunk>, IngestionError> {
161        let tokens: Vec<&str> = document.text.split_whitespace().collect();
162        if tokens.is_empty() {
163            return Ok(Vec::new());
164        }
165        let mut chunks = Vec::new();
166        let mut start = 0usize;
167        let mut index = 0i64;
168        while start < tokens.len() {
169            let end = (start + self.chunk_size).min(tokens.len());
170            let slice = tokens[start..end].join(" ");
171            let mut metadata = document.metadata.clone();
172            metadata.insert("chunk_index".to_string(), index.to_string());
173            chunks.push(Chunk {
174                id: format!("{}:{}", document.id, index),
175                index,
176                text: slice,
177                metadata,
178            });
179            if end == tokens.len() {
180                break;
181            }
182            let overlap = self.overlap.min(self.chunk_size);
183            start = end.saturating_sub(overlap);
184            index += 1;
185        }
186        Ok(chunks)
187    }
188}
189
/// Embedder that ignores its input and returns all-zero vectors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DummyEmbedder {
    /// Length of every vector returned.
    pub dimension: usize,
}
193
194#[async_trait]
195impl Embedder for DummyEmbedder {
196    async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, IngestionError> {
197        let mut embeddings = Vec::with_capacity(texts.len());
198        for _ in texts {
199            embeddings.push(vec![0.0; self.dimension]);
200        }
201        Ok(embeddings)
202    }
203}