1use super::chunk::DataChunk;
6use super::operators::{Operator, OperatorError};
7use super::pipeline::Source;
8use super::vector::ValueVector;
9use graphos_common::types::{NodeId, Value};
10
11pub struct OperatorSource {
16 operator: Box<dyn Operator>,
17}
18
19impl OperatorSource {
20 pub fn new(operator: Box<dyn Operator>) -> Self {
22 Self { operator }
23 }
24}
25
26impl Source for OperatorSource {
27 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
28 self.operator.next()
31 }
32
33 fn reset(&mut self) {
34 self.operator.reset();
35 }
36
37 fn name(&self) -> &'static str {
38 "OperatorSource"
39 }
40}
41
42pub struct VectorSource {
46 values: Vec<Vec<Value>>,
47 position: usize,
48 num_columns: usize,
49}
50
51impl VectorSource {
52 pub fn new(columns: Vec<Vec<Value>>) -> Self {
54 let num_columns = columns.len();
55 Self {
56 values: columns,
57 position: 0,
58 num_columns,
59 }
60 }
61
62 pub fn single_column(values: Vec<Value>) -> Self {
64 Self::new(vec![values])
65 }
66
67 pub fn from_node_ids(ids: Vec<NodeId>) -> Self {
69 let values: Vec<Value> = ids
70 .into_iter()
71 .map(|id| Value::Int64(id.0 as i64))
72 .collect();
73 Self::single_column(values)
74 }
75}
76
77impl Source for VectorSource {
78 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
79 if self.num_columns == 0 || self.values[0].is_empty() {
80 return Ok(None);
81 }
82
83 let total_rows = self.values[0].len();
84 if self.position >= total_rows {
85 return Ok(None);
86 }
87
88 let end = (self.position + chunk_size).min(total_rows);
89 let mut columns = Vec::with_capacity(self.num_columns);
90
91 for col_values in &self.values {
92 let slice = &col_values[self.position..end];
93 let vector = ValueVector::from_values(slice);
94 columns.push(vector);
95 }
96
97 self.position = end;
98 Ok(Some(DataChunk::new(columns)))
99 }
100
101 fn reset(&mut self) {
102 self.position = 0;
103 }
104
105 fn name(&self) -> &'static str {
106 "VectorSource"
107 }
108}
109
/// A source that never yields any data.
pub struct EmptySource;

impl EmptySource {
    /// Creates an empty source.
    pub fn new() -> Self {
        EmptySource
    }
}

impl Default for EmptySource {
    /// Equivalent to [`EmptySource::new`].
    fn default() -> Self {
        EmptySource::new()
    }
}
125
126impl Source for EmptySource {
127 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
128 Ok(None)
129 }
130
131 fn reset(&mut self) {}
132
133 fn name(&self) -> &'static str {
134 "EmptySource"
135 }
136}
137
138pub struct ChunkSource {
142 chunks: Vec<DataChunk>,
143 position: usize,
144}
145
146impl ChunkSource {
147 pub fn new(chunks: Vec<DataChunk>) -> Self {
149 Self {
150 chunks,
151 position: 0,
152 }
153 }
154
155 pub fn single(chunk: DataChunk) -> Self {
157 Self::new(vec![chunk])
158 }
159}
160
161impl Source for ChunkSource {
162 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
163 if self.position >= self.chunks.len() {
164 return Ok(None);
165 }
166
167 let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
168 self.position += 1;
169 Ok(Some(chunk))
170 }
171
172 fn reset(&mut self) {
173 self.position = 0;
175 }
176
177 fn name(&self) -> &'static str {
178 "ChunkSource"
179 }
180}
181
182pub struct GeneratorSource<F>
186where
187 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
188{
189 generator: F,
190 row_index: usize,
191 exhausted: bool,
192}
193
194impl<F> GeneratorSource<F>
195where
196 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
197{
198 pub fn new(generator: F) -> Self {
200 Self {
201 generator,
202 row_index: 0,
203 exhausted: false,
204 }
205 }
206}
207
208impl<F> Source for GeneratorSource<F>
209where
210 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
211{
212 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
213 if self.exhausted {
214 return Ok(None);
215 }
216
217 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(chunk_size);
218
219 for _ in 0..chunk_size {
220 if let Some(row) = (self.generator)(self.row_index) {
221 rows.push(row);
222 self.row_index += 1;
223 } else {
224 self.exhausted = true;
225 break;
226 }
227 }
228
229 if rows.is_empty() {
230 return Ok(None);
231 }
232
233 let num_columns = rows[0].len();
235 let mut columns: Vec<ValueVector> = (0..num_columns).map(|_| ValueVector::new()).collect();
236
237 for row in rows {
238 for (col_idx, val) in row.into_iter().enumerate() {
239 if col_idx < columns.len() {
240 columns[col_idx].push(val);
241 }
242 }
243 }
244
245 Ok(Some(DataChunk::new(columns)))
246 }
247
248 fn reset(&mut self) {
249 self.row_index = 0;
250 self.exhausted = false;
251 }
252
253 fn name(&self) -> &'static str {
254 "GeneratorSource"
255 }
256}
257
/// A source that scans an in-memory list of (subject, predicate, object)
/// triples. Only compiled with the `rdf` feature.
#[cfg(feature = "rdf")]
pub struct TripleScanSource {
    /// The triples to scan, as `(subject, predicate, object)` tuples.
    triples: Vec<(Value, Value, Value)>,
    /// Index of the next triple to emit.
    position: usize,
    /// Names selecting which components become output columns, in order.
    output_vars: Vec<String>,
}

#[cfg(feature = "rdf")]
impl TripleScanSource {
    /// Creates a scan over `triples`, positioned at the first triple.
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        TripleScanSource {
            position: 0,
            triples,
            output_vars,
        }
    }

    /// Collects any iterable of triples into a new scan source.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        let triples: Vec<(Value, Value, Value)> = iter.into_iter().collect();
        Self::new(triples, output_vars)
    }

    /// Number of triples that have not been emitted yet.
    pub fn remaining(&self) -> usize {
        let total = self.triples.len();
        total.saturating_sub(self.position)
    }
}
299
#[cfg(feature = "rdf")]
impl Source for TripleScanSource {
    /// Emits the next run of at most `chunk_size` triples as a chunk, or
    /// `None` once every triple has been emitted.
    ///
    /// Columns follow `output_vars` in order: `"s"`/`"subject"`,
    /// `"p"`/`"predicate"` and `"o"`/`"object"` each add the matching
    /// component. An unrecognized name adds all three components, but only if
    /// no column has been added yet; otherwise it is skipped. If no column was
    /// selected at all, the chunk holds subject, predicate and object.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        let total = self.triples.len();
        if self.position >= total {
            return Ok(None);
        }

        // Saturate instead of `+` so a very large `chunk_size` clamps to the
        // number of triples rather than overflowing.
        let end = self.position.saturating_add(chunk_size).min(total);
        let batch = &self.triples[self.position..end];

        // Split the batch into one value list per triple component.
        let mut subjects = Vec::with_capacity(batch.len());
        let mut predicates = Vec::with_capacity(batch.len());
        let mut objects = Vec::with_capacity(batch.len());
        for (s, p, o) in batch {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        // Appends the full (subject, predicate, object) column set.
        let push_all = |columns: &mut Vec<ValueVector>| {
            columns.push(ValueVector::from_values(&subjects));
            columns.push(ValueVector::from_values(&predicates));
            columns.push(ValueVector::from_values(&objects));
        };

        let mut columns = Vec::with_capacity(3);
        for var in &self.output_vars {
            match var.as_str() {
                "s" | "subject" => columns.push(ValueVector::from_values(&subjects)),
                "p" | "predicate" => columns.push(ValueVector::from_values(&predicates)),
                "o" | "object" => columns.push(ValueVector::from_values(&objects)),
                // Unrecognized name: fall back to all three components, but
                // only when nothing has been selected so far.
                _ => {
                    if columns.is_empty() {
                        push_all(&mut columns);
                    }
                }
            }
        }

        // No variables at all: emit every component.
        if columns.is_empty() {
            push_all(&mut columns);
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    /// Rewinds the scan to the first triple.
    fn reset(&mut self) {
        self.position = 0;
    }

    /// Static identifier of this source kind.
    fn name(&self) -> &'static str {
        "TripleScanSource"
    }
}
361
/// State holder for a join between two triple inputs. Only compiled with the
/// `rdf` feature.
///
/// Only the constructor is defined alongside this type here; the fields are
/// stored but not read, hence the `dead_code` allowance.
#[cfg(feature = "rdf")]
#[allow(dead_code)]
pub struct TripleJoinOperator {
    /// Name of the join variable.
    join_var: String,
    /// Key column index on the left input.
    left_key_col: usize,
    /// Key column index on the right input.
    right_key_col: usize,
    /// Chunks buffered from the right input; starts empty.
    right_buffer: Vec<DataChunk>,
    /// Completion flag for the right input; starts `false`.
    right_complete: bool,
}

#[cfg(feature = "rdf")]
impl TripleJoinOperator {
    /// Creates a join operator with an empty right-side buffer and the
    /// right side marked as not yet complete.
    pub fn new(join_var: String, left_key_col: usize, right_key_col: usize) -> Self {
        TripleJoinOperator {
            join_var,
            left_key_col,
            right_key_col,
            right_complete: false,
            right_buffer: Vec::new(),
        }
    }
}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// A chunk size larger than the data yields everything in one chunk.
    #[test]
    fn test_vector_source_single_chunk() {
        let mut source =
            VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]);

        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 3);

        assert!(source.next_chunk(10).unwrap().is_none());
    }

    /// Ten rows in chunks of three: 3, 3, 3, then the remaining 1.
    #[test]
    fn test_vector_source_chunked() {
        let values: Vec<Value> = (0..10).map(Value::Int64).collect();
        let mut source = VectorSource::single_column(values);

        for expected_len in [3, 3, 3, 1] {
            let chunk = source.next_chunk(3).unwrap().unwrap();
            assert_eq!(chunk.len(), expected_len);
        }

        assert!(source.next_chunk(3).unwrap().is_none());
    }

    /// After `reset`, the same rows are produced again.
    #[test]
    fn test_vector_source_reset() {
        let mut source = VectorSource::single_column(vec![Value::Int64(1), Value::Int64(2)]);

        let _ = source.next_chunk(10).unwrap();
        assert!(source.next_chunk(10).unwrap().is_none());

        source.reset();
        let replayed = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(replayed.len(), 2);
    }

    /// An empty source is exhausted from the start.
    #[test]
    fn test_empty_source() {
        let mut source = EmptySource::new();
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    /// Pre-built chunks come back one per call, then the source is exhausted.
    #[test]
    fn test_chunk_source() {
        let first = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(1),
            Value::Int64(2),
        ])]);
        let second = DataChunk::new(vec![ValueVector::from_values(&[
            Value::Int64(3),
            Value::Int64(4),
        ])]);

        let mut source = ChunkSource::new(vec![first, second]);

        let c1 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c1.len(), 2);

        let c2 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c2.len(), 2);

        assert!(source.next_chunk(100).unwrap().is_none());
    }

    /// Five generated rows in chunks of two: 2, 2, then the remaining 1.
    #[test]
    fn test_generator_source() {
        let mut source = GeneratorSource::new(|i| {
            if i < 5 {
                Some(vec![Value::Int64(i as i64)])
            } else {
                None
            }
        });

        for expected_len in [2, 2, 1] {
            let chunk = source.next_chunk(2).unwrap().unwrap();
            assert_eq!(chunk.len(), expected_len);
        }

        assert!(source.next_chunk(2).unwrap().is_none());
    }
}