grafeo_core/execution/
source.rs1use super::chunk::DataChunk;
6use super::operators::{Operator, OperatorError};
7use super::pipeline::Source;
8use super::vector::ValueVector;
9use grafeo_common::types::{NodeId, Value};
10
11pub struct OperatorSource {
16 operator: Box<dyn Operator>,
17}
18
19impl OperatorSource {
20 pub fn new(operator: Box<dyn Operator>) -> Self {
22 Self { operator }
23 }
24}
25
26impl Source for OperatorSource {
27 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
28 self.operator.next()
31 }
32
33 fn reset(&mut self) {
34 self.operator.reset();
35 }
36
37 fn name(&self) -> &'static str {
38 "OperatorSource"
39 }
40}
41
42pub struct VectorSource {
46 values: Vec<Vec<Value>>,
47 position: usize,
48 num_columns: usize,
49}
50
51impl VectorSource {
52 pub fn new(columns: Vec<Vec<Value>>) -> Self {
54 let num_columns = columns.len();
55 Self {
56 values: columns,
57 position: 0,
58 num_columns,
59 }
60 }
61
62 pub fn single_column(values: Vec<Value>) -> Self {
64 Self::new(vec![values])
65 }
66
67 pub fn from_node_ids(ids: Vec<NodeId>) -> Result<Self, OperatorError> {
73 let values: Vec<Value> = ids
74 .into_iter()
75 .map(|id| {
76 let signed = i64::try_from(id.0).map_err(|_| {
77 OperatorError::Execution(format!("NodeId {} exceeds i64 range", id.0))
78 })?;
79 Ok(Value::Int64(signed))
80 })
81 .collect::<Result<Vec<_>, OperatorError>>()?;
82 Ok(Self::single_column(values))
83 }
84}
85
86impl Source for VectorSource {
87 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
88 if self.num_columns == 0 || self.values[0].is_empty() {
89 return Ok(None);
90 }
91
92 let total_rows = self.values[0].len();
93 if self.position >= total_rows {
94 return Ok(None);
95 }
96
97 let end = (self.position + chunk_size).min(total_rows);
98 let mut columns = Vec::with_capacity(self.num_columns);
99
100 for col_values in &self.values {
101 let slice = &col_values[self.position..end];
102 let vector = ValueVector::from_values(slice);
103 columns.push(vector);
104 }
105
106 self.position = end;
107 Ok(Some(DataChunk::new(columns)))
108 }
109
110 fn reset(&mut self) {
111 self.position = 0;
112 }
113
114 fn name(&self) -> &'static str {
115 "VectorSource"
116 }
117}
118
119pub struct EmptySource;
121
122impl EmptySource {
123 pub fn new() -> Self {
125 Self
126 }
127}
128
129impl Default for EmptySource {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl Source for EmptySource {
136 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
137 Ok(None)
138 }
139
140 fn reset(&mut self) {}
141
142 fn name(&self) -> &'static str {
143 "EmptySource"
144 }
145}
146
147pub struct ChunkSource {
151 chunks: Vec<DataChunk>,
152 position: usize,
153}
154
155impl ChunkSource {
156 pub fn new(chunks: Vec<DataChunk>) -> Self {
158 Self {
159 chunks,
160 position: 0,
161 }
162 }
163
164 pub fn single(chunk: DataChunk) -> Self {
166 Self::new(vec![chunk])
167 }
168}
169
170impl Source for ChunkSource {
171 fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
172 if self.position >= self.chunks.len() {
173 return Ok(None);
174 }
175
176 let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
177 self.position += 1;
178 Ok(Some(chunk))
179 }
180
181 fn reset(&mut self) {
182 self.position = 0;
184 }
185
186 fn name(&self) -> &'static str {
187 "ChunkSource"
188 }
189}
190
191pub struct GeneratorSource<F>
195where
196 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
197{
198 generator: F,
199 row_index: usize,
200 exhausted: bool,
201}
202
203impl<F> GeneratorSource<F>
204where
205 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
206{
207 pub fn new(generator: F) -> Self {
209 Self {
210 generator,
211 row_index: 0,
212 exhausted: false,
213 }
214 }
215}
216
217impl<F> Source for GeneratorSource<F>
218where
219 F: FnMut(usize) -> Option<Vec<Value>> + Send + Sync,
220{
221 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
222 if self.exhausted {
223 return Ok(None);
224 }
225
226 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(chunk_size);
227
228 for _ in 0..chunk_size {
229 if let Some(row) = (self.generator)(self.row_index) {
230 rows.push(row);
231 self.row_index += 1;
232 } else {
233 self.exhausted = true;
234 break;
235 }
236 }
237
238 if rows.is_empty() {
239 return Ok(None);
240 }
241
242 let num_columns = rows[0].len();
244 let mut columns: Vec<ValueVector> = (0..num_columns).map(|_| ValueVector::new()).collect();
245
246 for row in rows {
247 for (col_idx, val) in row.into_iter().enumerate() {
248 if col_idx < columns.len() {
249 columns[col_idx].push(val);
250 }
251 }
252 }
253
254 Ok(Some(DataChunk::new(columns)))
255 }
256
257 fn reset(&mut self) {
258 self.row_index = 0;
259 self.exhausted = false;
260 }
261
262 fn name(&self) -> &'static str {
263 "GeneratorSource"
264 }
265}
266
#[cfg(feature = "triple-store")]
/// A [`Source`] that emits materialized `(subject, predicate, object)` triples in chunks.
///
/// `output_vars` selects which triple components become output columns. See
/// `next_chunk` for how each name is interpreted.
pub struct TripleScanSource {
    /// Triples to emit, in order.
    triples: Vec<(Value, Value, Value)>,
    /// Index of the next triple to emit.
    position: usize,
    /// Column selectors, applied in order to build each chunk's columns.
    output_vars: Vec<String>,
}

#[cfg(feature = "triple-store")]
impl TripleScanSource {
    /// Creates a source over `triples` that emits the columns named by `output_vars`.
    pub fn new(triples: Vec<(Value, Value, Value)>, output_vars: Vec<String>) -> Self {
        Self {
            triples,
            position: 0,
            output_vars,
        }
    }

    /// Creates a source by collecting `iter` into memory.
    pub fn from_triples<I>(iter: I, output_vars: Vec<String>) -> Self
    where
        I: IntoIterator<Item = (Value, Value, Value)>,
    {
        Self::new(iter.into_iter().collect(), output_vars)
    }

    /// Returns how many triples have not been emitted yet.
    pub fn remaining(&self) -> usize {
        self.triples.len().saturating_sub(self.position)
    }
}

#[cfg(feature = "triple-store")]
impl Source for TripleScanSource {
    /// Emits up to `chunk_size` triples, with columns chosen by `output_vars`:
    ///
    /// - `"s"`/`"subject"`, `"p"`/`"predicate"` and `"o"`/`"object"` each append
    ///   that component's column.
    /// - Any other name appends all three columns (subject, predicate, object)
    ///   if no column has been appended yet. Otherwise it is ignored.
    /// - If nothing was appended (e.g. `output_vars` is empty), all three
    ///   columns are emitted.
    ///
    /// Returns `Ok(None)` once every triple has been emitted.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
        if self.position >= self.triples.len() {
            return Ok(None);
        }

        // `saturating_add` prevents overflow when a caller passes a huge
        // `chunk_size` such as `usize::MAX`.
        let end = self
            .position
            .saturating_add(chunk_size)
            .min(self.triples.len());
        let slice = &self.triples[self.position..end];

        // Split the triples into one value list per component.
        let mut subjects = Vec::with_capacity(slice.len());
        let mut predicates = Vec::with_capacity(slice.len());
        let mut objects = Vec::with_capacity(slice.len());

        for (s, p, o) in slice {
            subjects.push(s.clone());
            predicates.push(p.clone());
            objects.push(o.clone());
        }

        // NOTE(review): the column count can differ from `output_vars.len()`
        // (an unrecognized name expands to three columns). Confirm that
        // downstream operators expect this.
        let mut columns = Vec::with_capacity(3);

        for var in &self.output_vars {
            match var.as_str() {
                "s" | "subject" => columns.push(ValueVector::from_values(&subjects)),
                "p" | "predicate" => columns.push(ValueVector::from_values(&predicates)),
                "o" | "object" => columns.push(ValueVector::from_values(&objects)),
                _ => {
                    if columns.is_empty() {
                        columns.push(ValueVector::from_values(&subjects));
                        columns.push(ValueVector::from_values(&predicates));
                        columns.push(ValueVector::from_values(&objects));
                    }
                }
            }
        }

        // Nothing selected: fall back to emitting every component.
        if columns.is_empty() {
            columns.push(ValueVector::from_values(&subjects));
            columns.push(ValueVector::from_values(&predicates));
            columns.push(ValueVector::from_values(&objects));
        }

        self.position = end;
        Ok(Some(DataChunk::new(columns)))
    }

    /// Rewinds to the first triple; the stored triples are replayed unchanged.
    fn reset(&mut self) {
        self.position = 0;
    }

    fn name(&self) -> &'static str {
        "TripleScanSource"
    }
}
370
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_vector_source_single_chunk() {
        let values = vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)];
        let mut source = VectorSource::single_column(values);

        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 3);

        let next = source.next_chunk(10).unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_vector_source_chunked() {
        let values: Vec<Value> = (0..10).map(Value::Int64).collect();
        let mut source = VectorSource::single_column(values);

        // 10 rows in chunks of 3: three full chunks, then one partial chunk.
        for expected in [3, 3, 3, 1] {
            let chunk = source.next_chunk(3).unwrap().unwrap();
            assert_eq!(chunk.len(), expected);
        }

        let none = source.next_chunk(3).unwrap();
        assert!(none.is_none());
    }

    #[test]
    fn test_vector_source_reset() {
        let values = vec![Value::Int64(1), Value::Int64(2)];
        let mut source = VectorSource::single_column(values);

        let _ = source.next_chunk(10).unwrap();
        assert!(source.next_chunk(10).unwrap().is_none());

        source.reset();
        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 2);
    }

    #[test]
    fn test_vector_source_multiple_columns() {
        let mut source = VectorSource::new(vec![
            vec![Value::Int64(1), Value::Int64(2)],
            vec![Value::Int64(3), Value::Int64(4)],
        ]);

        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 2);
        assert!(source.next_chunk(10).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_without_columns() {
        let mut source = VectorSource::new(Vec::new());
        assert!(source.next_chunk(10).unwrap().is_none());
    }

    #[test]
    fn test_vector_source_from_node_ids() {
        let mut source = VectorSource::from_node_ids(vec![NodeId(1), NodeId(2)]).unwrap();

        let chunk = source.next_chunk(10).unwrap().unwrap();
        assert_eq!(chunk.len(), 2);
        assert!(source.next_chunk(10).unwrap().is_none());
    }

    #[test]
    fn test_empty_source() {
        let mut source = EmptySource::new();
        assert!(source.next_chunk(100).unwrap().is_none());

        source.reset();
        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_chunk_source() {
        let v1 = ValueVector::from_values(&[Value::Int64(1), Value::Int64(2)]);
        let chunk1 = DataChunk::new(vec![v1]);

        let v2 = ValueVector::from_values(&[Value::Int64(3), Value::Int64(4)]);
        let chunk2 = DataChunk::new(vec![v2]);

        let mut source = ChunkSource::new(vec![chunk1, chunk2]);

        let c1 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c1.len(), 2);

        let c2 = source.next_chunk(100).unwrap().unwrap();
        assert_eq!(c2.len(), 2);

        assert!(source.next_chunk(100).unwrap().is_none());
    }

    #[test]
    fn test_chunk_source_single() {
        let v = ValueVector::from_values(&[Value::Int64(7)]);
        let mut source = ChunkSource::single(DataChunk::new(vec![v]));

        assert_eq!(source.next_chunk(1).unwrap().unwrap().len(), 1);
        assert!(source.next_chunk(1).unwrap().is_none());
    }

    #[test]
    fn test_generator_source() {
        let mut source = GeneratorSource::new(|i| {
            if i < 5 {
                // Checked conversion instead of an `as` cast that could wrap.
                let value = i64::try_from(i).expect("small test index fits in i64");
                Some(vec![Value::Int64(value)])
            } else {
                None
            }
        });

        let chunk1 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk1.len(), 2);

        let chunk2 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk2.len(), 2);

        let chunk3 = source.next_chunk(2).unwrap().unwrap();
        assert_eq!(chunk3.len(), 1);

        assert!(source.next_chunk(2).unwrap().is_none());
    }

    #[test]
    fn test_generator_source_reset() {
        let mut source = GeneratorSource::new(|i: usize| (i < 3).then(|| vec![Value::Int64(1)]));

        assert_eq!(source.next_chunk(10).unwrap().unwrap().len(), 3);
        assert!(source.next_chunk(10).unwrap().is_none());

        source.reset();
        assert_eq!(source.next_chunk(10).unwrap().unwrap().len(), 3);
    }
}