// ruvector_data_framework/ingester.rs

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use crate::{DataRecord, DataSource, FrameworkError, Result};
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct IngestionConfig {
15 pub batch_size: usize,
17
18 pub max_concurrent: usize,
20
21 pub retry_count: u32,
23
24 pub retry_delay_ms: u64,
26
27 pub deduplicate: bool,
29
30 pub rate_limit: u32,
32}
33
34impl Default for IngestionConfig {
35 fn default() -> Self {
36 Self {
37 batch_size: 1000,
38 max_concurrent: 4,
39 retry_count: 3,
40 retry_delay_ms: 1000,
41 deduplicate: true,
42 rate_limit: 10,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SourceConfig {
50 pub source_id: String,
52
53 pub base_url: String,
55
56 pub api_key: Option<String>,
58
59 pub headers: HashMap<String, String>,
61
62 pub params: HashMap<String, String>,
64}
65
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
68pub struct IngestionStats {
69 pub records_fetched: u64,
71
72 pub batches_processed: u64,
74
75 pub retries: u64,
77
78 pub errors: u64,
80
81 pub duplicates_skipped: u64,
83
84 pub bytes_downloaded: u64,
86
87 pub avg_batch_time_ms: f64,
89}
90
91pub struct DataIngester {
93 config: IngestionConfig,
94 stats: Arc<std::sync::RwLock<IngestionStats>>,
95 seen_ids: Arc<std::sync::RwLock<std::collections::HashSet<String>>>,
96}
97
98impl DataIngester {
99 pub fn new(config: IngestionConfig) -> Self {
101 Self {
102 config,
103 stats: Arc::new(std::sync::RwLock::new(IngestionStats::default())),
104 seen_ids: Arc::new(std::sync::RwLock::new(std::collections::HashSet::new())),
105 }
106 }
107
108 pub async fn ingest_all<S: DataSource>(&self, source: &S) -> Result<Vec<DataRecord>> {
110 let mut all_records = Vec::new();
111 let mut cursor: Option<String> = None;
112
113 loop {
114 let (batch, next_cursor) = self
115 .fetch_with_retry(source, cursor.clone(), self.config.batch_size)
116 .await?;
117
118 if batch.is_empty() {
119 break;
120 }
121
122 let records = if self.config.deduplicate {
124 self.deduplicate_batch(batch)
125 } else {
126 batch
127 };
128
129 all_records.extend(records);
130
131 {
132 let mut stats = self.stats.write().unwrap();
133 stats.batches_processed += 1;
134 }
135
136 cursor = next_cursor;
137 if cursor.is_none() {
138 break;
139 }
140
141 if self.config.rate_limit > 0 {
143 let delay = 1000 / self.config.rate_limit as u64;
144 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
145 }
146 }
147
148 Ok(all_records)
149 }
150
151 pub async fn stream_records<S: DataSource + 'static>(
153 &self,
154 source: Arc<S>,
155 buffer_size: usize,
156 ) -> Result<mpsc::Receiver<DataRecord>> {
157 let (tx, rx) = mpsc::channel(buffer_size);
158 let config = self.config.clone();
159 let stats = self.stats.clone();
160 let seen_ids = self.seen_ids.clone();
161
162 tokio::spawn(async move {
163 let mut cursor: Option<String> = None;
164
165 loop {
166 match source
167 .fetch_batch(cursor.clone(), config.batch_size)
168 .await
169 {
170 Ok((batch, next_cursor)) => {
171 if batch.is_empty() {
172 break;
173 }
174
175 for record in batch {
176 if config.deduplicate {
178 let mut ids = seen_ids.write().unwrap();
179 if ids.contains(&record.id) {
180 continue;
181 }
182 ids.insert(record.id.clone());
183 }
184
185 if tx.send(record).await.is_err() {
186 return; }
188
189 let mut s = stats.write().unwrap();
190 s.records_fetched += 1;
191 }
192
193 cursor = next_cursor;
194 if cursor.is_none() {
195 break;
196 }
197 }
198 Err(_) => {
199 let mut s = stats.write().unwrap();
200 s.errors += 1;
201 break;
202 }
203 }
204 }
205 });
206
207 Ok(rx)
208 }
209
210 async fn fetch_with_retry<S: DataSource>(
212 &self,
213 source: &S,
214 cursor: Option<String>,
215 batch_size: usize,
216 ) -> Result<(Vec<DataRecord>, Option<String>)> {
217 let mut last_error = None;
218
219 for attempt in 0..=self.config.retry_count {
220 if attempt > 0 {
221 let delay = self.config.retry_delay_ms * (1 << (attempt - 1));
222 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
223
224 let mut stats = self.stats.write().unwrap();
225 stats.retries += 1;
226 }
227
228 match source.fetch_batch(cursor.clone(), batch_size).await {
229 Ok(result) => return Ok(result),
230 Err(e) => {
231 last_error = Some(e);
232 }
233 }
234 }
235
236 let mut stats = self.stats.write().unwrap();
237 stats.errors += 1;
238
239 Err(last_error.unwrap_or_else(|| FrameworkError::Ingestion("Unknown error".to_string())))
240 }
241
242 fn deduplicate_batch(&self, batch: Vec<DataRecord>) -> Vec<DataRecord> {
244 let mut unique = Vec::with_capacity(batch.len());
245 let mut seen = self.seen_ids.write().unwrap();
246
247 for record in batch {
248 if !seen.contains(&record.id) {
249 seen.insert(record.id.clone());
250 unique.push(record);
251 } else {
252 let mut stats = self.stats.write().unwrap();
253 stats.duplicates_skipped += 1;
254 }
255 }
256
257 unique
258 }
259
260 pub fn stats(&self) -> IngestionStats {
262 self.stats.read().unwrap().clone()
263 }
264
265 pub fn reset_stats(&self) {
267 *self.stats.write().unwrap() = IngestionStats::default();
268 }
269}
270
271#[async_trait]
273pub trait RecordTransformer: Send + Sync {
274 async fn transform(&self, record: DataRecord) -> Result<DataRecord>;
276
277 fn filter(&self, record: &DataRecord) -> bool {
279 true
280 }
281}
282
283pub struct IdentityTransformer;
285
286#[async_trait]
287impl RecordTransformer for IdentityTransformer {
288 async fn transform(&self, record: DataRecord) -> Result<DataRecord> {
289 Ok(record)
290 }
291}
292
293pub struct BatchIngester<T: RecordTransformer> {
295 ingester: DataIngester,
296 transformer: T,
297}
298
299impl<T: RecordTransformer> BatchIngester<T> {
300 pub fn new(config: IngestionConfig, transformer: T) -> Self {
302 Self {
303 ingester: DataIngester::new(config),
304 transformer,
305 }
306 }
307
308 pub async fn ingest_all<S: DataSource>(&self, source: &S) -> Result<Vec<DataRecord>> {
310 let raw_records = self.ingester.ingest_all(source).await?;
311
312 let mut transformed = Vec::with_capacity(raw_records.len());
313 for record in raw_records {
314 if self.transformer.filter(&record) {
315 let t = self.transformer.transform(record).await?;
316 transformed.push(t);
317 }
318 }
319
320 Ok(transformed)
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults should match the documented values.
    #[test]
    fn test_default_config() {
        let cfg = IngestionConfig::default();
        assert_eq!(cfg.batch_size, 1000);
        assert!(cfg.deduplicate);
    }

    // A freshly constructed ingester starts with zeroed counters.
    #[test]
    fn test_ingester_creation() {
        let ingester = DataIngester::new(IngestionConfig::default());
        assert_eq!(ingester.stats().records_fetched, 0);
    }
}