Skip to main content

oxirs_core/query/
streaming_results.rs

1//! Streaming result sets for large query results with minimal memory overhead
2
3#![allow(dead_code)]
4
5#[cfg(test)]
6use crate::model::{Literal, NamedNode};
7use crate::model::{Term, Triple, Variable};
8use crate::OxirsError;
9use crossbeam::channel;
10use futures::stream::Stream;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use std::time::{Duration, Instant};
18use tokio::sync::mpsc;
19
/// Tuning knobs shared by the streaming result types and their builder.
#[derive(Debug)]
#[derive(Clone)]
pub struct StreamingConfig {
    /// Capacity of the bounded channel between producer and consumer; also
    /// used as a capacity hint for internal buffers.
    pub buffer_size: usize,
    /// Memory budget in bytes.
    pub max_memory: usize,
    /// Whether progress tracking is requested.
    pub track_progress: bool,
    /// Backpressure threshold, expressed as a fraction in the range 0.0 - 1.0.
    pub backpressure_threshold: f64,
    /// Per-receive timeout for blocking reads; `None` waits indefinitely.
    pub timeout: Option<Duration>,
}

impl Default for StreamingConfig {
    /// 1024-slot buffer, 100 MiB budget, progress tracking on, 0.8 backpressure
    /// threshold and a 30-second receive timeout.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        let thirty_seconds = Duration::from_secs(30);

        StreamingConfig {
            buffer_size: 1024,
            max_memory: 100 * MIB,
            track_progress: true,
            backpressure_threshold: 0.8,
            timeout: Some(thirty_seconds),
        }
    }
}
46
/// Snapshot of how far a streaming query has progressed.
#[derive(Debug)]
#[derive(Clone)]
pub struct StreamingProgress {
    /// Number of results handed to the consumer so far.
    pub processed: usize,
    /// Expected total number of results, when the producer can estimate it.
    pub estimated_total: Option<usize>,
    /// Estimated memory usage in bytes.
    pub memory_used: usize,
    /// Moment the result set was created.
    pub start_time: Instant,
    /// `true` until the result channel has been observed as disconnected.
    pub is_running: bool,
}
61
62/// A single solution (row) in a SELECT query result
63#[derive(Debug, Clone)]
64pub struct Solution {
65    /// Variable bindings for this solution
66    bindings: HashMap<Variable, Option<Term>>,
67    /// Metadata about this solution
68    metadata: SolutionMetadata,
69}
70
/// Optional provenance and quality information attached to a [`Solution`].
#[derive(Debug)]
#[derive(Clone)]
#[derive(Default)]
pub struct SolutionMetadata {
    /// Where the solution came from (e.g. a federated endpoint).
    pub source: Option<String>,
    /// Confidence score, for fuzzy/approximate queries.
    pub confidence: Option<f64>,
    /// Timestamp associated with the solution.
    pub timestamp: Option<u64>,
}
80
81impl Solution {
82    pub fn new(bindings: HashMap<Variable, Option<Term>>) -> Self {
83        Self {
84            bindings,
85            metadata: SolutionMetadata::default(),
86        }
87    }
88
89    pub fn with_metadata(
90        bindings: HashMap<Variable, Option<Term>>,
91        metadata: SolutionMetadata,
92    ) -> Self {
93        Self { bindings, metadata }
94    }
95
96    pub fn get(&self, var: &Variable) -> Option<&Term> {
97        self.bindings.get(var).and_then(|opt| opt.as_ref())
98    }
99
100    pub fn contains(&self, var: &Variable) -> bool {
101        self.bindings.contains_key(var)
102    }
103
104    pub fn variables(&self) -> impl Iterator<Item = &Variable> {
105        self.bindings.keys()
106    }
107
108    pub fn values(&self) -> impl Iterator<Item = &Term> {
109        self.bindings.values().filter_map(|opt| opt.as_ref())
110    }
111
112    pub fn iter(&self) -> impl Iterator<Item = (&Variable, Option<&Term>)> {
113        self.bindings.iter().map(|(k, v)| (k, v.as_ref()))
114    }
115}
116
117/// Streaming iterator for SELECT query results
118pub struct SelectResults {
119    /// Variables in the result set
120    variables: Arc<Vec<Variable>>,
121    /// Channel receiver for solutions
122    receiver: channel::Receiver<Result<Solution, OxirsError>>,
123    /// Progress tracker
124    progress: Arc<RwLock<StreamingProgress>>,
125    /// Cancellation token
126    cancel_token: Arc<AtomicBool>,
127    /// Current buffer for batch operations
128    buffer: Vec<Solution>,
129    /// Configuration
130    config: StreamingConfig,
131}
132
133impl SelectResults {
134    pub fn new(
135        variables: Vec<Variable>,
136        receiver: channel::Receiver<Result<Solution, OxirsError>>,
137        config: StreamingConfig,
138    ) -> Self {
139        let progress = Arc::new(RwLock::new(StreamingProgress {
140            processed: 0,
141            estimated_total: None,
142            memory_used: 0,
143            start_time: Instant::now(),
144            is_running: true,
145        }));
146
147        Self {
148            variables: Arc::new(variables),
149            receiver,
150            progress,
151            cancel_token: Arc::new(AtomicBool::new(false)),
152            buffer: Vec::with_capacity(config.buffer_size),
153            config,
154        }
155    }
156
157    /// Get the variables in the result set
158    pub fn variables(&self) -> &[Variable] {
159        &self.variables
160    }
161
162    /// Get current progress information
163    pub fn progress(&self) -> StreamingProgress {
164        self.progress.read().clone()
165    }
166
167    /// Cancel the query execution
168    pub fn cancel(&self) {
169        self.cancel_token.store(true, Ordering::Relaxed);
170    }
171
172    /// Check if query was cancelled
173    pub fn is_cancelled(&self) -> bool {
174        self.cancel_token.load(Ordering::Relaxed)
175    }
176
177    /// Try to get the next solution without blocking
178    pub fn try_next(&mut self) -> Result<Option<Solution>, OxirsError> {
179        if self.is_cancelled() {
180            return Ok(None);
181        }
182
183        match self.receiver.try_recv() {
184            Ok(Ok(solution)) => {
185                self.update_progress(1);
186                Ok(Some(solution))
187            }
188            Ok(Err(e)) => Err(e),
189            Err(channel::TryRecvError::Empty) => Ok(None),
190            Err(channel::TryRecvError::Disconnected) => {
191                self.mark_completed();
192                Ok(None)
193            }
194        }
195    }
196
197    /// Get the next solution, blocking if necessary
198    #[allow(clippy::should_implement_trait)]
199    pub fn next(&mut self) -> Result<Option<Solution>, OxirsError> {
200        if self.is_cancelled() {
201            return Ok(None);
202        }
203
204        if let Some(timeout) = self.config.timeout {
205            match self.receiver.recv_timeout(timeout) {
206                Ok(Ok(solution)) => {
207                    self.update_progress(1);
208                    Ok(Some(solution))
209                }
210                Ok(Err(e)) => Err(e),
211                Err(channel::RecvTimeoutError::Timeout) => {
212                    Err(OxirsError::Query("Query timeout".to_string()))
213                }
214                Err(channel::RecvTimeoutError::Disconnected) => {
215                    self.mark_completed();
216                    Ok(None)
217                }
218            }
219        } else {
220            // No timeout - block indefinitely
221            match self.receiver.recv() {
222                Ok(Ok(solution)) => {
223                    self.update_progress(1);
224                    Ok(Some(solution))
225                }
226                Ok(Err(e)) => Err(e),
227                Err(channel::RecvError) => {
228                    self.mark_completed();
229                    Ok(None)
230                }
231            }
232        }
233    }
234
235    /// Collect next batch of solutions
236    pub fn next_batch(&mut self, max_size: usize) -> Result<Vec<Solution>, OxirsError> {
237        self.buffer.clear();
238
239        for _ in 0..max_size {
240            match self.try_next()? {
241                Some(solution) => self.buffer.push(solution),
242                None => break,
243            }
244        }
245
246        Ok(std::mem::take(&mut self.buffer))
247    }
248
249    /// Skip n solutions
250    pub fn skip_results(&mut self, n: usize) -> Result<(), OxirsError> {
251        for _ in 0..n {
252            if self.next()?.is_none() {
253                break;
254            }
255        }
256        Ok(())
257    }
258
259    /// Take up to n solutions
260    pub fn take_results(&mut self, n: usize) -> Result<Vec<Solution>, OxirsError> {
261        let mut results = Vec::with_capacity(n.min(self.config.buffer_size));
262
263        for _ in 0..n {
264            match self.next()? {
265                Some(solution) => results.push(solution),
266                None => break,
267            }
268        }
269
270        Ok(results)
271    }
272
273    /// Convert to a stream for async iteration
274    pub fn into_stream(self) -> impl Stream<Item = Result<Solution, OxirsError>> {
275        SelectResultStream::new(self)
276    }
277
278    fn update_progress(&self, count: usize) {
279        let mut progress = self.progress.write();
280        progress.processed += count;
281        // Update memory usage estimate (rough approximation)
282        progress.memory_used = progress.processed * std::mem::size_of::<Solution>();
283    }
284
285    fn mark_completed(&self) {
286        let mut progress = self.progress.write();
287        progress.is_running = false;
288    }
289}
290
291impl Iterator for SelectResults {
292    type Item = Result<Solution, OxirsError>;
293
294    fn next(&mut self) -> Option<Self::Item> {
295        match self.next() {
296            Ok(Some(solution)) => Some(Ok(solution)),
297            Ok(None) => None,
298            Err(e) => Some(Err(e)),
299        }
300    }
301}
302
303/// Async stream wrapper for SelectResults
304struct SelectResultStream {
305    results: SelectResults,
306    receiver: Option<mpsc::UnboundedReceiver<Result<Solution, OxirsError>>>,
307}
308
309impl SelectResultStream {
310    fn new(results: SelectResults) -> Self {
311        Self {
312            results,
313            receiver: None,
314        }
315    }
316}
317
318impl Stream for SelectResultStream {
319    type Item = Result<Solution, OxirsError>;
320
321    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
322        // Try non-blocking first
323        match self.results.try_next() {
324            Ok(Some(solution)) => Poll::Ready(Some(Ok(solution))),
325            Ok(None) => {
326                // Would block, need to wait
327                cx.waker().wake_by_ref();
328                Poll::Pending
329            }
330            Err(e) => Poll::Ready(Some(Err(e))),
331        }
332    }
333}
334
335/// Streaming iterator for CONSTRUCT query results
336pub struct ConstructResults {
337    receiver: channel::Receiver<Result<Triple, OxirsError>>,
338    progress: Arc<RwLock<StreamingProgress>>,
339    cancel_token: Arc<AtomicBool>,
340    config: StreamingConfig,
341}
342
343impl ConstructResults {
344    pub fn new(
345        receiver: channel::Receiver<Result<Triple, OxirsError>>,
346        config: StreamingConfig,
347    ) -> Self {
348        let progress = Arc::new(RwLock::new(StreamingProgress {
349            processed: 0,
350            estimated_total: None,
351            memory_used: 0,
352            start_time: Instant::now(),
353            is_running: true,
354        }));
355
356        Self {
357            receiver,
358            progress,
359            cancel_token: Arc::new(AtomicBool::new(false)),
360            config,
361        }
362    }
363
364    pub fn progress(&self) -> StreamingProgress {
365        self.progress.read().clone()
366    }
367
368    pub fn cancel(&self) {
369        self.cancel_token.store(true, Ordering::Relaxed);
370    }
371
372    #[allow(clippy::should_implement_trait)]
373    pub fn next(&mut self) -> Result<Option<Triple>, OxirsError> {
374        if self.cancel_token.load(Ordering::Relaxed) {
375            return Ok(None);
376        }
377
378        if let Some(timeout) = self.config.timeout {
379            match self.receiver.recv_timeout(timeout) {
380                Ok(Ok(triple)) => {
381                    self.update_progress(1);
382                    Ok(Some(triple))
383                }
384                Ok(Err(e)) => Err(e),
385                Err(channel::RecvTimeoutError::Timeout) => {
386                    Err(OxirsError::Query("Query timeout".to_string()))
387                }
388                Err(channel::RecvTimeoutError::Disconnected) => {
389                    self.mark_completed();
390                    Ok(None)
391                }
392            }
393        } else {
394            // No timeout - block indefinitely
395            match self.receiver.recv() {
396                Ok(Ok(triple)) => {
397                    self.update_progress(1);
398                    Ok(Some(triple))
399                }
400                Ok(Err(e)) => Err(e),
401                Err(channel::RecvError) => {
402                    self.mark_completed();
403                    Ok(None)
404                }
405            }
406        }
407    }
408
409    pub fn collect_batch(&mut self, max_size: usize) -> Result<Vec<Triple>, OxirsError> {
410        let mut batch = Vec::with_capacity(max_size.min(self.config.buffer_size));
411
412        for _ in 0..max_size {
413            match self.next()? {
414                Some(triple) => batch.push(triple),
415                None => break,
416            }
417        }
418
419        Ok(batch)
420    }
421
422    fn update_progress(&self, count: usize) {
423        let mut progress = self.progress.write();
424        progress.processed += count;
425        progress.memory_used = progress.processed * std::mem::size_of::<Triple>();
426    }
427
428    fn mark_completed(&self) {
429        let mut progress = self.progress.write();
430        progress.is_running = false;
431    }
432}
433
434impl Iterator for ConstructResults {
435    type Item = Result<Triple, OxirsError>;
436
437    fn next(&mut self) -> Option<Self::Item> {
438        match self.next() {
439            Ok(Some(triple)) => Some(Ok(triple)),
440            Ok(None) => None,
441            Err(e) => Some(Err(e)),
442        }
443    }
444}
445
446/// Streaming query results
447pub enum StreamingQueryResults {
448    /// SELECT query results
449    Select(SelectResults),
450    /// ASK query results
451    Ask(bool),
452    /// CONSTRUCT query results  
453    Construct(ConstructResults),
454    /// DESCRIBE query results
455    Describe(ConstructResults),
456}
457
458impl StreamingQueryResults {
459    /// Check if results are from a SELECT query
460    pub fn is_select(&self) -> bool {
461        matches!(self, Self::Select(_))
462    }
463
464    /// Check if results are from an ASK query
465    pub fn is_ask(&self) -> bool {
466        matches!(self, Self::Ask(_))
467    }
468
469    /// Check if results are from a CONSTRUCT query
470    pub fn is_construct(&self) -> bool {
471        matches!(self, Self::Construct(_))
472    }
473
474    /// Get SELECT results if applicable
475    pub fn as_select(&mut self) -> Option<&mut SelectResults> {
476        match self {
477            Self::Select(results) => Some(results),
478            _ => None,
479        }
480    }
481
482    /// Get ASK result if applicable
483    pub fn as_ask(&self) -> Option<bool> {
484        match self {
485            Self::Ask(result) => Some(*result),
486            _ => None,
487        }
488    }
489
490    /// Get CONSTRUCT results if applicable
491    pub fn as_construct(&mut self) -> Option<&mut ConstructResults> {
492        match self {
493            Self::Construct(results) => Some(results),
494            _ => None,
495        }
496    }
497
498    /// Cancel the query execution
499    pub fn cancel(&self) {
500        match self {
501            Self::Select(results) => results.cancel(),
502            Self::Construct(results) => results.cancel(),
503            Self::Describe(results) => results.cancel(),
504            Self::Ask(_) => {} // ASK queries complete immediately
505        }
506    }
507
508    /// Get progress information
509    pub fn progress(&self) -> Option<StreamingProgress> {
510        match self {
511            Self::Select(results) => Some(results.progress()),
512            Self::Construct(results) => Some(results.progress()),
513            Self::Describe(results) => Some(results.progress()),
514            Self::Ask(_) => None,
515        }
516    }
517}
518
519/// Builder for creating streaming result sets
520pub struct StreamingResultBuilder {
521    config: StreamingConfig,
522}
523
524impl Default for StreamingResultBuilder {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530impl StreamingResultBuilder {
531    pub fn new() -> Self {
532        Self {
533            config: StreamingConfig::default(),
534        }
535    }
536
537    pub fn with_buffer_size(mut self, size: usize) -> Self {
538        self.config.buffer_size = size;
539        self
540    }
541
542    pub fn with_max_memory(mut self, bytes: usize) -> Self {
543        self.config.max_memory = bytes;
544        self
545    }
546
547    pub fn with_progress_tracking(mut self, enable: bool) -> Self {
548        self.config.track_progress = enable;
549        self
550    }
551
552    pub fn with_timeout(mut self, timeout: Duration) -> Self {
553        self.config.timeout = Some(timeout);
554        self
555    }
556
557    pub fn build_select(
558        self,
559        variables: Vec<Variable>,
560    ) -> (SelectResults, channel::Sender<Result<Solution, OxirsError>>) {
561        let (tx, rx) = channel::bounded(self.config.buffer_size);
562        let results = SelectResults::new(variables, rx, self.config);
563        (results, tx)
564    }
565
566    pub fn build_construct(
567        self,
568    ) -> (
569        ConstructResults,
570        channel::Sender<Result<Triple, OxirsError>>,
571    ) {
572        let (tx, rx) = channel::bounded(self.config.buffer_size);
573        let results = ConstructResults::new(rx, self.config);
574        (results, tx)
575    }
576}
577
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a solution binding `var` to a plain literal holding `value`'s decimal text.
    fn literal_solution(var: &Variable, value: usize) -> Solution {
        let term = Term::Literal(Literal::new(value.to_string()));
        let mut bindings = HashMap::new();
        bindings.insert(var.clone(), Some(term));
        Solution::new(bindings)
    }

    /// The single-variable projection `?x` shared by the streaming tests.
    fn x_variables() -> Vec<Variable> {
        vec![Variable::new("x").expect("valid variable name")]
    }

    #[test]
    fn test_solution_creation() {
        let var = Variable::new("x").expect("valid variable name");
        let iri = NamedNode::new("http://example.org/test").expect("valid IRI");
        let term = Term::NamedNode(iri);

        let mut bindings = HashMap::new();
        bindings.insert(var.clone(), Some(term.clone()));
        let solution = Solution::new(bindings);

        assert!(solution.contains(&var));
        assert_eq!(solution.get(&var), Some(&term));
    }

    #[test]
    fn test_streaming_select_results() {
        let variables = x_variables();
        let (mut results, sender) = StreamingResultBuilder::new()
            .with_buffer_size(10)
            .build_select(variables.clone());

        for value in 0..5 {
            sender
                .send(Ok(literal_solution(&variables[0], value)))
                .expect("send should succeed");
        }
        // Dropping the only sender disconnects the channel, so `next` ends with `Ok(None)`.
        drop(sender);

        // Pull one solution at a time so every receive is counted by the progress tracker.
        let mut collected = Vec::new();
        while let Ok(Some(solution)) = results.next() {
            collected.push(solution);
        }

        assert_eq!(collected.len(), 5);
        assert_eq!(results.progress().processed, 5);
    }

    #[test]
    fn test_batch_operations() {
        let variables = x_variables();
        let (mut results, sender) = StreamingResultBuilder::new().build_select(variables.clone());

        // 20 solutions fit in the default 1024-slot channel, so no send blocks.
        for value in 0..20 {
            sender
                .send(Ok(literal_solution(&variables[0], value)))
                .expect("send should succeed");
        }
        drop(sender);

        let batch = results.next_batch(10).expect("operation should succeed");
        assert_eq!(batch.len(), 10);

        results.skip_results(5).expect("operation should succeed");

        let remaining = results.take_results(10).expect("operation should succeed");
        assert_eq!(remaining.len(), 5);
    }

    #[test]
    fn test_cancellation() {
        // Keep the sender alive so the empty result comes from cancellation, not disconnection.
        let (mut results, _sender) = StreamingResultBuilder::new().build_select(x_variables());

        results.cancel();
        assert!(results.is_cancelled());
        assert!(results.next().expect("should have next item").is_none());
    }
}