ruvector_data_framework/ingester.rs

//! Data ingestion pipeline for streaming data into RuVector
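//!
//! # Example (illustrative sketch)
//!
//! `MySource` below is a stand-in for any `DataSource` implementation,
//! not a type provided by this crate.
//!
//! ```ignore
//! let ingester = DataIngester::new(IngestionConfig::default());
//! let records = ingester.ingest_all(&MySource::new()).await?;
//! println!("ingested {} records", records.len());
//! ```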

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use crate::{DataRecord, DataSource, FrameworkError, Result};

/// Configuration for data ingestion
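///
/// # Example
///
/// A minimal sketch: override selected fields and keep the remaining
/// defaults via struct-update syntax.
///
/// ```ignore
/// let config = IngestionConfig {
///     batch_size: 500,
///     rate_limit: 0, // 0 = unlimited
///     ..Default::default()
/// };
/// ```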
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IngestionConfig {
    /// Batch size for fetching
    pub batch_size: usize,

    /// Maximum concurrent fetches (reserved; fetching is currently sequential)
    pub max_concurrent: usize,

    /// Retry count on failure
    pub retry_count: u32,

    /// Delay between retries (ms)
    pub retry_delay_ms: u64,

    /// Enable deduplication
    pub deduplicate: bool,

    /// Rate limit (requests per second, 0 = unlimited)
    pub rate_limit: u32,
}

impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            batch_size: 1000,
            max_concurrent: 4,
            retry_count: 3,
            retry_delay_ms: 1000,
            deduplicate: true,
            rate_limit: 10,
        }
    }
}

/// Configuration for a specific data source
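///
/// # Example
///
/// A sketch with placeholder values; `api.example.com` is not a real endpoint.
///
/// ```ignore
/// let source = SourceConfig {
///     source_id: "example".to_string(),
///     base_url: "https://api.example.com/v1".to_string(),
///     api_key: None,
///     headers: HashMap::new(),
///     params: HashMap::new(),
/// };
/// ```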
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceConfig {
    /// Source identifier
    pub source_id: String,

    /// API base URL
    pub base_url: String,

    /// API key (if required)
    pub api_key: Option<String>,

    /// Additional headers
    pub headers: HashMap<String, String>,

    /// Custom parameters
    pub params: HashMap<String, String>,
}

/// Statistics for ingestion process
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IngestionStats {
    /// Total records fetched
    pub records_fetched: u64,

    /// Batches processed
    pub batches_processed: u64,

    /// Retries performed
    pub retries: u64,

    /// Errors encountered
    pub errors: u64,

    /// Duplicates skipped
    pub duplicates_skipped: u64,

    /// Bytes downloaded (reserved; not yet tracked by this pipeline)
    pub bytes_downloaded: u64,

    /// Average batch fetch time in ms (reserved; not yet tracked)
    pub avg_batch_time_ms: f64,
}

/// Data ingestion pipeline
pub struct DataIngester {
    config: IngestionConfig,
    stats: Arc<std::sync::RwLock<IngestionStats>>,
    seen_ids: Arc<std::sync::RwLock<std::collections::HashSet<String>>>,
}

impl DataIngester {
    /// Create a new data ingester
    pub fn new(config: IngestionConfig) -> Self {
        Self {
            config,
            stats: Arc::new(std::sync::RwLock::new(IngestionStats::default())),
            seen_ids: Arc::new(std::sync::RwLock::new(std::collections::HashSet::new())),
        }
    }

    /// Ingest all data from a source
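    ///
    /// Follows the source's cursor pagination until it returns an empty
    /// batch or a `None` cursor, retrying failed fetches per the config.
    ///
    /// ```ignore
    /// // Sketch: `source` is any value implementing `DataSource`.
    /// let records = ingester.ingest_all(&source).await?;
    /// ```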
    pub async fn ingest_all<S: DataSource>(&self, source: &S) -> Result<Vec<DataRecord>> {
        let mut all_records = Vec::new();
        let mut cursor: Option<String> = None;

        loop {
            let (batch, next_cursor) = self
                .fetch_with_retry(source, cursor.clone(), self.config.batch_size)
                .await?;

            if batch.is_empty() {
                break;
            }

            // Deduplicate if enabled
            let records = if self.config.deduplicate {
                self.deduplicate_batch(batch)
            } else {
                batch
            };

            {
                let mut stats = self.stats.write().unwrap();
                stats.batches_processed += 1;
                stats.records_fetched += records.len() as u64;
            }

            all_records.extend(records);

            cursor = next_cursor;
            if cursor.is_none() {
                break;
            }

            // Rate limiting: space requests roughly 1000 / rate_limit ms apart
            if self.config.rate_limit > 0 {
                let delay = 1000 / self.config.rate_limit as u64;
                tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
            }
        }

        Ok(all_records)
    }

    /// Stream records with backpressure
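    ///
    /// The bounded channel provides backpressure: fetching pauses whenever
    /// the buffer is full and resumes as the consumer drains it. The stream
    /// ends on the first fetch error (no retry in streaming mode).
    ///
    /// ```ignore
    /// // Sketch: `source` is an Arc-wrapped `DataSource` implementation.
    /// let mut rx = ingester.stream_records(source, 256).await?;
    /// while let Some(record) = rx.recv().await {
    ///     // process each record as it arrives
    /// }
    /// ```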
    pub async fn stream_records<S: DataSource + 'static>(
        &self,
        source: Arc<S>,
        buffer_size: usize,
    ) -> Result<mpsc::Receiver<DataRecord>> {
        let (tx, rx) = mpsc::channel(buffer_size);
        let config = self.config.clone();
        let stats = self.stats.clone();
        let seen_ids = self.seen_ids.clone();

        tokio::spawn(async move {
            let mut cursor: Option<String> = None;

            loop {
                match source
                    .fetch_batch(cursor.clone(), config.batch_size)
                    .await
                {
                    Ok((batch, next_cursor)) => {
                        if batch.is_empty() {
                            break;
                        }

                        for record in batch {
                            // Deduplicate: `insert` returns false for an already-seen id
                            if config.deduplicate {
                                let mut ids = seen_ids.write().unwrap();
                                if !ids.insert(record.id.clone()) {
                                    drop(ids);
                                    stats.write().unwrap().duplicates_skipped += 1;
                                    continue;
                                }
                            }

                            if tx.send(record).await.is_err() {
                                return; // Receiver dropped
                            }

                            stats.write().unwrap().records_fetched += 1;
                        }

                        stats.write().unwrap().batches_processed += 1;

                        cursor = next_cursor;
                        if cursor.is_none() {
                            break;
                        }
                    }
                    Err(_) => {
                        // No retry in streaming mode: record the error and end the stream
                        stats.write().unwrap().errors += 1;
                        break;
                    }
                }
            }
        });

        Ok(rx)
    }

    /// Fetch a batch with retry logic
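    ///
    /// Waits `retry_delay_ms * 2^(attempt - 1)` before each retry
    /// (exponential backoff), so the defaults wait 1 s, 2 s, then 4 s.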
    async fn fetch_with_retry<S: DataSource>(
        &self,
        source: &S,
        cursor: Option<String>,
        batch_size: usize,
    ) -> Result<(Vec<DataRecord>, Option<String>)> {
        let mut last_error = None;

        for attempt in 0..=self.config.retry_count {
            if attempt > 0 {
                // Exponential backoff: retry_delay_ms, then 2x, 4x, ...
                let delay = self.config.retry_delay_ms * (1 << (attempt - 1));
                tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;

                let mut stats = self.stats.write().unwrap();
                stats.retries += 1;
            }

            match source.fetch_batch(cursor.clone(), batch_size).await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = Some(e);
                }
            }
        }

        let mut stats = self.stats.write().unwrap();
        stats.errors += 1;

        Err(last_error.unwrap_or_else(|| FrameworkError::Ingestion("Unknown error".to_string())))
    }

    /// Deduplicate a batch of records against all ids seen so far
    fn deduplicate_batch(&self, batch: Vec<DataRecord>) -> Vec<DataRecord> {
        let mut unique = Vec::with_capacity(batch.len());
        let mut seen = self.seen_ids.write().unwrap();

        for record in batch {
            // `insert` returns false for an already-seen id
            if seen.insert(record.id.clone()) {
                unique.push(record);
            } else {
                self.stats.write().unwrap().duplicates_skipped += 1;
            }
        }

        unique
    }

    /// Get current ingestion statistics
    pub fn stats(&self) -> IngestionStats {
        self.stats.read().unwrap().clone()
    }

    /// Reset statistics
    pub fn reset_stats(&self) {
        *self.stats.write().unwrap() = IngestionStats::default();
    }
}

/// Trait for transforming records during ingestion
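///
/// # Example (illustrative)
///
/// A sketch of a filter-only transformer; `NonEmptyId` is a hypothetical type.
///
/// ```ignore
/// struct NonEmptyId;
///
/// #[async_trait]
/// impl RecordTransformer for NonEmptyId {
///     async fn transform(&self, record: DataRecord) -> Result<DataRecord> {
///         Ok(record) // pass records through unchanged
///     }
///
///     fn filter(&self, record: &DataRecord) -> bool {
///         !record.id.is_empty() // skip records with empty ids
///     }
/// }
/// ```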
#[async_trait]
pub trait RecordTransformer: Send + Sync {
    /// Transform a record
    async fn transform(&self, record: DataRecord) -> Result<DataRecord>;

    /// Filter records (return false to skip); the default keeps everything
    fn filter(&self, _record: &DataRecord) -> bool {
        true
    }
}

/// Identity transformer (no-op)
pub struct IdentityTransformer;

#[async_trait]
impl RecordTransformer for IdentityTransformer {
    async fn transform(&self, record: DataRecord) -> Result<DataRecord> {
        Ok(record)
    }
}

/// Batched ingestion with transformations
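///
/// # Example (illustrative)
///
/// ```ignore
/// // Sketch: ingest everything, then apply the (no-op) identity transform.
/// let batch = BatchIngester::new(IngestionConfig::default(), IdentityTransformer);
/// let records = batch.ingest_all(&source).await?;
/// ```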
pub struct BatchIngester<T: RecordTransformer> {
    ingester: DataIngester,
    transformer: T,
}

impl<T: RecordTransformer> BatchIngester<T> {
    /// Create a new batch ingester with transformer
    pub fn new(config: IngestionConfig, transformer: T) -> Self {
        Self {
            ingester: DataIngester::new(config),
            transformer,
        }
    }

    /// Ingest all records, then filter and transform them
    pub async fn ingest_all<S: DataSource>(&self, source: &S) -> Result<Vec<DataRecord>> {
        let raw_records = self.ingester.ingest_all(source).await?;

        let mut transformed = Vec::with_capacity(raw_records.len());
        for record in raw_records {
            if self.transformer.filter(&record) {
                let t = self.transformer.transform(record).await?;
                transformed.push(t);
            }
        }

        Ok(transformed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = IngestionConfig::default();
        assert_eq!(config.batch_size, 1000);
        assert!(config.deduplicate);
    }

    #[test]
    fn test_ingester_creation() {
        let config = IngestionConfig::default();
        let ingester = DataIngester::new(config);
        let stats = ingester.stats();
        assert_eq!(stats.records_fetched, 0);
    }
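
    #[test]
    fn test_reset_stats() {
        let ingester = DataIngester::new(IngestionConfig::default());
        ingester.reset_stats();
        let stats = ingester.stats();
        assert_eq!(stats.records_fetched, 0);
        assert_eq!(stats.errors, 0);
    }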
}