1use async_trait::async_trait;
2use base64::engine::general_purpose::STANDARD as BASE64;
3use base64::Engine;
4use std::collections::HashMap;
5
6use kapsl_rag_sdk::types::DocumentPayload;
7
8use crate::vector::{AccessControl, EmbeddedChunk, VectorStore, VectorStoreError};
9
10#[derive(thiserror::Error, Debug)]
11pub enum IngestionError {
12 #[error("invalid input: {0}")]
13 InvalidInput(String),
14 #[error("parse error: {0}")]
15 Parse(String),
16 #[error("embedding error: {0}")]
17 Embedding(String),
18 #[error("store error: {0}")]
19 Store(String),
20}
21
22impl From<VectorStoreError> for IngestionError {
23 fn from(err: VectorStoreError) -> Self {
24 IngestionError::Store(err.to_string())
25 }
26}
27
/// Output of a [`DocumentParser`]: a document's text plus descriptive fields.
#[derive(Clone, Debug)]
pub struct ParsedDocument {
    /// Identifier of the document.
    pub id: String,
    /// Text content of the document.
    pub text: String,
    /// String key/value metadata attached to the document.
    pub metadata: HashMap<String, String>,
    /// Content type string associated with the document.
    pub content_type: String,
}
35
/// One piece of a [`ParsedDocument`] produced by a [`Chunker`].
#[derive(Clone, Debug)]
pub struct Chunk {
    /// Identifier of this chunk.
    pub id: String,
    /// Position of this chunk within the sequence produced for its document.
    pub index: i64,
    /// Text content of this chunk.
    pub text: String,
    /// String key/value metadata attached to this chunk.
    pub metadata: HashMap<String, String>,
}
43
44#[async_trait]
45pub trait DocumentParser: Send + Sync {
46 async fn parse(&self, payload: &DocumentPayload) -> Result<ParsedDocument, IngestionError>;
47}
48
49#[async_trait]
50pub trait Chunker: Send + Sync {
51 async fn chunk(&self, document: &ParsedDocument) -> Result<Vec<Chunk>, IngestionError>;
52}
53
54#[async_trait]
55pub trait Embedder: Send + Sync {
56 async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, IngestionError>;
57}
58
59pub struct IngestionContext {
60 pub tenant_id: String,
61 pub workspace_id: String,
62 pub source_id: String,
63 pub doc_id: String,
64 pub acl: AccessControl,
65}
66
67pub struct IngestionPipeline<P, C, E, V>
68where
69 P: DocumentParser,
70 C: Chunker,
71 E: Embedder,
72 V: VectorStore,
73{
74 parser: P,
75 chunker: C,
76 embedder: E,
77 vector_store: V,
78}
79
80impl<P, C, E, V> IngestionPipeline<P, C, E, V>
81where
82 P: DocumentParser,
83 C: Chunker,
84 E: Embedder,
85 V: VectorStore,
86{
87 pub fn new(parser: P, chunker: C, embedder: E, vector_store: V) -> Self {
88 Self {
89 parser,
90 chunker,
91 embedder,
92 vector_store,
93 }
94 }
95
96 pub async fn ingest(
97 &self,
98 ctx: IngestionContext,
99 payload: DocumentPayload,
100 ) -> Result<(), IngestionError> {
101 let parsed = self.parser.parse(&payload).await?;
102 let chunks = self.chunker.chunk(&parsed).await?;
103 if chunks.is_empty() {
104 return Ok(());
105 }
106 let texts: Vec<String> = chunks.iter().map(|c| c.text.clone()).collect();
107 let embeddings = self.embedder.embed(&texts).await?;
108 if embeddings.len() != chunks.len() {
109 return Err(IngestionError::Embedding(
110 "embedding count mismatch".to_string(),
111 ));
112 }
113
114 let mut embedded = Vec::with_capacity(chunks.len());
115 for (chunk, embedding) in chunks.into_iter().zip(embeddings.into_iter()) {
116 embedded.push(EmbeddedChunk {
117 id: chunk.id,
118 tenant_id: ctx.tenant_id.clone(),
119 workspace_id: ctx.workspace_id.clone(),
120 source_id: ctx.source_id.clone(),
121 doc_id: ctx.doc_id.clone(),
122 chunk_index: chunk.index,
123 text: chunk.text,
124 embedding,
125 metadata: chunk.metadata,
126 acl: ctx.acl.clone(),
127 });
128 }
129
130 self.vector_store.upsert(embedded).await?;
131 Ok(())
132 }
133}
134
135pub struct PlainTextParser;
136
137#[async_trait]
138impl DocumentParser for PlainTextParser {
139 async fn parse(&self, payload: &DocumentPayload) -> Result<ParsedDocument, IngestionError> {
140 let bytes = BASE64
141 .decode(&payload.bytes_b64)
142 .map_err(|e| IngestionError::Parse(e.to_string()))?;
143 let text = String::from_utf8(bytes).map_err(|e| IngestionError::Parse(e.to_string()))?;
144 Ok(ParsedDocument {
145 id: payload.id.clone(),
146 text,
147 metadata: payload.metadata.clone(),
148 content_type: payload.content_type.clone(),
149 })
150 }
151}
152
153pub struct SimpleChunker {
154 pub chunk_size: usize,
155 pub overlap: usize,
156}
157
158#[async_trait]
159impl Chunker for SimpleChunker {
160 async fn chunk(&self, document: &ParsedDocument) -> Result<Vec<Chunk>, IngestionError> {
161 let tokens: Vec<&str> = document.text.split_whitespace().collect();
162 if tokens.is_empty() {
163 return Ok(Vec::new());
164 }
165 let mut chunks = Vec::new();
166 let mut start = 0usize;
167 let mut index = 0i64;
168 while start < tokens.len() {
169 let end = (start + self.chunk_size).min(tokens.len());
170 let slice = tokens[start..end].join(" ");
171 let mut metadata = document.metadata.clone();
172 metadata.insert("chunk_index".to_string(), index.to_string());
173 chunks.push(Chunk {
174 id: format!("{}:{}", document.id, index),
175 index,
176 text: slice,
177 metadata,
178 });
179 if end == tokens.len() {
180 break;
181 }
182 let overlap = self.overlap.min(self.chunk_size);
183 start = end.saturating_sub(overlap);
184 index += 1;
185 }
186 Ok(chunks)
187 }
188}
189
190pub struct DummyEmbedder {
191 pub dimension: usize,
192}
193
194#[async_trait]
195impl Embedder for DummyEmbedder {
196 async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, IngestionError> {
197 let mut embeddings = Vec::with_capacity(texts.len());
198 for _ in texts {
199 embeddings.push(vec![0.0; self.dimension]);
200 }
201 Ok(embeddings)
202 }
203}