1#![allow(dead_code)]
4
5#[cfg(test)]
6use crate::model::{Literal, NamedNode};
7use crate::model::{Term, Triple, Variable};
8use crate::OxirsError;
9use crossbeam::channel;
10use futures::stream::Stream;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17use std::time::{Duration, Instant};
18use tokio::sync::mpsc;
19
/// Tuning knobs for streaming query results.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Capacity of the bounded result channel created by the builder; also used
    /// to pre-size and cap the capacity of result buffers.
    pub buffer_size: usize,
    /// Memory budget in bytes.
    /// NOTE(review): no reader of this field was found in this module — confirm
    /// the limit is enforced elsewhere.
    pub max_memory: usize,
    /// Whether progress tracking is requested.
    /// NOTE(review): progress counters in this module are updated regardless of
    /// this flag — confirm whether it is meant to gate them.
    pub track_progress: bool,
    /// Backpressure threshold.
    /// NOTE(review): presumably a fill ratio in `0.0..=1.0` (the default is 0.8);
    /// no reader of this field was found in this module — confirm.
    pub backpressure_threshold: f64,
    /// Maximum time a blocking `next()` waits for a single item. `None` makes
    /// the wait unbounded.
    pub timeout: Option<Duration>,
}
34
35impl Default for StreamingConfig {
36 fn default() -> Self {
37 Self {
38 buffer_size: 1024,
39 max_memory: 100 * 1024 * 1024, track_progress: true,
41 backpressure_threshold: 0.8,
42 timeout: Some(Duration::from_secs(30)), }
44 }
45}
46
/// Snapshot of how far a streaming result has been consumed.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of items handed to the consumer so far.
    pub processed: usize,
    /// Expected total number of items, when known. The result handles in this
    /// module initialise it to `None`.
    pub estimated_total: Option<usize>,
    /// Rough byte estimate: `processed` multiplied by the in-memory size of one
    /// item type (`size_of`), so heap data owned by items is not counted.
    pub memory_used: usize,
    /// Instant at which the owning result handle was created.
    pub start_time: Instant,
    /// `true` from creation until the consumer observes that the channel has
    /// been disconnected.
    pub is_running: bool,
}
61
/// One row of a SELECT result: a set of variable bindings plus metadata.
#[derive(Debug, Clone)]
pub struct Solution {
    /// Binding per variable; a `None` value means the variable is present in
    /// the row but has no term bound to it.
    bindings: HashMap<Variable, Option<Term>>,
    /// Optional descriptive information attached to this row.
    metadata: SolutionMetadata,
}
70
/// Optional descriptive information attached to a [`Solution`]; every field
/// defaults to `None`.
#[derive(Debug, Clone, Default)]
pub struct SolutionMetadata {
    /// Origin of the solution.
    /// NOTE(review): the expected format (e.g. endpoint or graph name) is not
    /// established in this module — confirm.
    pub source: Option<String>,
    /// Confidence score.
    /// NOTE(review): the valid range is not established in this module — confirm.
    pub confidence: Option<f64>,
    /// Timestamp associated with the solution.
    /// NOTE(review): unit and epoch are not established in this module — confirm.
    pub timestamp: Option<u64>,
}
80
81impl Solution {
82 pub fn new(bindings: HashMap<Variable, Option<Term>>) -> Self {
83 Self {
84 bindings,
85 metadata: SolutionMetadata::default(),
86 }
87 }
88
89 pub fn with_metadata(
90 bindings: HashMap<Variable, Option<Term>>,
91 metadata: SolutionMetadata,
92 ) -> Self {
93 Self { bindings, metadata }
94 }
95
96 pub fn get(&self, var: &Variable) -> Option<&Term> {
97 self.bindings.get(var).and_then(|opt| opt.as_ref())
98 }
99
100 pub fn contains(&self, var: &Variable) -> bool {
101 self.bindings.contains_key(var)
102 }
103
104 pub fn variables(&self) -> impl Iterator<Item = &Variable> {
105 self.bindings.keys()
106 }
107
108 pub fn values(&self) -> impl Iterator<Item = &Term> {
109 self.bindings.values().filter_map(|opt| opt.as_ref())
110 }
111
112 pub fn iter(&self) -> impl Iterator<Item = (&Variable, Option<&Term>)> {
113 self.bindings.iter().map(|(k, v)| (k, v.as_ref()))
114 }
115}
116
/// Consumer-side handle for the streamed solutions of a SELECT query.
pub struct SelectResults {
    /// Projected variables of the query.
    variables: Arc<Vec<Variable>>,
    /// Channel end from which solutions (or errors) are received.
    receiver: channel::Receiver<Result<Solution, OxirsError>>,
    /// Shared progress counters, updated as items are consumed.
    progress: Arc<RwLock<StreamingProgress>>,
    /// Set by `cancel()`; once set, the receive methods return `Ok(None)`.
    cancel_token: Arc<AtomicBool>,
    /// Scratch vector used by `next_batch` to assemble a batch.
    buffer: Vec<Solution>,
    /// Streaming configuration (buffer sizing and per-item timeout).
    config: StreamingConfig,
}
132
133impl SelectResults {
134 pub fn new(
135 variables: Vec<Variable>,
136 receiver: channel::Receiver<Result<Solution, OxirsError>>,
137 config: StreamingConfig,
138 ) -> Self {
139 let progress = Arc::new(RwLock::new(StreamingProgress {
140 processed: 0,
141 estimated_total: None,
142 memory_used: 0,
143 start_time: Instant::now(),
144 is_running: true,
145 }));
146
147 Self {
148 variables: Arc::new(variables),
149 receiver,
150 progress,
151 cancel_token: Arc::new(AtomicBool::new(false)),
152 buffer: Vec::with_capacity(config.buffer_size),
153 config,
154 }
155 }
156
157 pub fn variables(&self) -> &[Variable] {
159 &self.variables
160 }
161
162 pub fn progress(&self) -> StreamingProgress {
164 self.progress.read().clone()
165 }
166
167 pub fn cancel(&self) {
169 self.cancel_token.store(true, Ordering::Relaxed);
170 }
171
172 pub fn is_cancelled(&self) -> bool {
174 self.cancel_token.load(Ordering::Relaxed)
175 }
176
177 pub fn try_next(&mut self) -> Result<Option<Solution>, OxirsError> {
179 if self.is_cancelled() {
180 return Ok(None);
181 }
182
183 match self.receiver.try_recv() {
184 Ok(Ok(solution)) => {
185 self.update_progress(1);
186 Ok(Some(solution))
187 }
188 Ok(Err(e)) => Err(e),
189 Err(channel::TryRecvError::Empty) => Ok(None),
190 Err(channel::TryRecvError::Disconnected) => {
191 self.mark_completed();
192 Ok(None)
193 }
194 }
195 }
196
197 #[allow(clippy::should_implement_trait)]
199 pub fn next(&mut self) -> Result<Option<Solution>, OxirsError> {
200 if self.is_cancelled() {
201 return Ok(None);
202 }
203
204 if let Some(timeout) = self.config.timeout {
205 match self.receiver.recv_timeout(timeout) {
206 Ok(Ok(solution)) => {
207 self.update_progress(1);
208 Ok(Some(solution))
209 }
210 Ok(Err(e)) => Err(e),
211 Err(channel::RecvTimeoutError::Timeout) => {
212 Err(OxirsError::Query("Query timeout".to_string()))
213 }
214 Err(channel::RecvTimeoutError::Disconnected) => {
215 self.mark_completed();
216 Ok(None)
217 }
218 }
219 } else {
220 match self.receiver.recv() {
222 Ok(Ok(solution)) => {
223 self.update_progress(1);
224 Ok(Some(solution))
225 }
226 Ok(Err(e)) => Err(e),
227 Err(channel::RecvError) => {
228 self.mark_completed();
229 Ok(None)
230 }
231 }
232 }
233 }
234
235 pub fn next_batch(&mut self, max_size: usize) -> Result<Vec<Solution>, OxirsError> {
237 self.buffer.clear();
238
239 for _ in 0..max_size {
240 match self.try_next()? {
241 Some(solution) => self.buffer.push(solution),
242 None => break,
243 }
244 }
245
246 Ok(std::mem::take(&mut self.buffer))
247 }
248
249 pub fn skip_results(&mut self, n: usize) -> Result<(), OxirsError> {
251 for _ in 0..n {
252 if self.next()?.is_none() {
253 break;
254 }
255 }
256 Ok(())
257 }
258
259 pub fn take_results(&mut self, n: usize) -> Result<Vec<Solution>, OxirsError> {
261 let mut results = Vec::with_capacity(n.min(self.config.buffer_size));
262
263 for _ in 0..n {
264 match self.next()? {
265 Some(solution) => results.push(solution),
266 None => break,
267 }
268 }
269
270 Ok(results)
271 }
272
273 pub fn into_stream(self) -> impl Stream<Item = Result<Solution, OxirsError>> {
275 SelectResultStream::new(self)
276 }
277
278 fn update_progress(&self, count: usize) {
279 let mut progress = self.progress.write();
280 progress.processed += count;
281 progress.memory_used = progress.processed * std::mem::size_of::<Solution>();
283 }
284
285 fn mark_completed(&self) {
286 let mut progress = self.progress.write();
287 progress.is_running = false;
288 }
289}
290
291impl Iterator for SelectResults {
292 type Item = Result<Solution, OxirsError>;
293
294 fn next(&mut self) -> Option<Self::Item> {
295 match self.next() {
296 Ok(Some(solution)) => Some(Ok(solution)),
297 Ok(None) => None,
298 Err(e) => Some(Err(e)),
299 }
300 }
301}
302
/// Adapter that exposes a [`SelectResults`] handle as an asynchronous stream.
struct SelectResultStream {
    /// Underlying result handle that is polled for solutions.
    results: SelectResults,
    /// Optional tokio receiver; the constructor sets it to `None`.
    /// NOTE(review): no reader of this field was found in this module — confirm
    /// whether it is still needed.
    receiver: Option<mpsc::UnboundedReceiver<Result<Solution, OxirsError>>>,
}
308
309impl SelectResultStream {
310 fn new(results: SelectResults) -> Self {
311 Self {
312 results,
313 receiver: None,
314 }
315 }
316}
317
318impl Stream for SelectResultStream {
319 type Item = Result<Solution, OxirsError>;
320
321 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
322 match self.results.try_next() {
324 Ok(Some(solution)) => Poll::Ready(Some(Ok(solution))),
325 Ok(None) => {
326 cx.waker().wake_by_ref();
328 Poll::Pending
329 }
330 Err(e) => Poll::Ready(Some(Err(e))),
331 }
332 }
333}
334
/// Consumer-side handle for streamed triples; used by the `Construct` and
/// `Describe` variants of `StreamingQueryResults`.
pub struct ConstructResults {
    /// Channel end from which triples (or errors) are received.
    receiver: channel::Receiver<Result<Triple, OxirsError>>,
    /// Shared progress counters, updated as triples are consumed.
    progress: Arc<RwLock<StreamingProgress>>,
    /// Set by `cancel()`; once set, `next()` returns `Ok(None)`.
    cancel_token: Arc<AtomicBool>,
    /// Streaming configuration (buffer sizing and per-item timeout).
    config: StreamingConfig,
}
342
343impl ConstructResults {
344 pub fn new(
345 receiver: channel::Receiver<Result<Triple, OxirsError>>,
346 config: StreamingConfig,
347 ) -> Self {
348 let progress = Arc::new(RwLock::new(StreamingProgress {
349 processed: 0,
350 estimated_total: None,
351 memory_used: 0,
352 start_time: Instant::now(),
353 is_running: true,
354 }));
355
356 Self {
357 receiver,
358 progress,
359 cancel_token: Arc::new(AtomicBool::new(false)),
360 config,
361 }
362 }
363
364 pub fn progress(&self) -> StreamingProgress {
365 self.progress.read().clone()
366 }
367
368 pub fn cancel(&self) {
369 self.cancel_token.store(true, Ordering::Relaxed);
370 }
371
372 #[allow(clippy::should_implement_trait)]
373 pub fn next(&mut self) -> Result<Option<Triple>, OxirsError> {
374 if self.cancel_token.load(Ordering::Relaxed) {
375 return Ok(None);
376 }
377
378 if let Some(timeout) = self.config.timeout {
379 match self.receiver.recv_timeout(timeout) {
380 Ok(Ok(triple)) => {
381 self.update_progress(1);
382 Ok(Some(triple))
383 }
384 Ok(Err(e)) => Err(e),
385 Err(channel::RecvTimeoutError::Timeout) => {
386 Err(OxirsError::Query("Query timeout".to_string()))
387 }
388 Err(channel::RecvTimeoutError::Disconnected) => {
389 self.mark_completed();
390 Ok(None)
391 }
392 }
393 } else {
394 match self.receiver.recv() {
396 Ok(Ok(triple)) => {
397 self.update_progress(1);
398 Ok(Some(triple))
399 }
400 Ok(Err(e)) => Err(e),
401 Err(channel::RecvError) => {
402 self.mark_completed();
403 Ok(None)
404 }
405 }
406 }
407 }
408
409 pub fn collect_batch(&mut self, max_size: usize) -> Result<Vec<Triple>, OxirsError> {
410 let mut batch = Vec::with_capacity(max_size.min(self.config.buffer_size));
411
412 for _ in 0..max_size {
413 match self.next()? {
414 Some(triple) => batch.push(triple),
415 None => break,
416 }
417 }
418
419 Ok(batch)
420 }
421
422 fn update_progress(&self, count: usize) {
423 let mut progress = self.progress.write();
424 progress.processed += count;
425 progress.memory_used = progress.processed * std::mem::size_of::<Triple>();
426 }
427
428 fn mark_completed(&self) {
429 let mut progress = self.progress.write();
430 progress.is_running = false;
431 }
432}
433
434impl Iterator for ConstructResults {
435 type Item = Result<Triple, OxirsError>;
436
437 fn next(&mut self) -> Option<Self::Item> {
438 match self.next() {
439 Ok(Some(triple)) => Some(Ok(triple)),
440 Ok(None) => None,
441 Err(e) => Some(Err(e)),
442 }
443 }
444}
445
/// Result of a streaming query, one variant per query form.
pub enum StreamingQueryResults {
    /// Streamed solutions of a SELECT query.
    Select(SelectResults),
    /// Boolean answer of an ASK query.
    Ask(bool),
    /// Streamed triples of a CONSTRUCT query.
    Construct(ConstructResults),
    /// Streamed triples of a DESCRIBE query.
    Describe(ConstructResults),
}
457
458impl StreamingQueryResults {
459 pub fn is_select(&self) -> bool {
461 matches!(self, Self::Select(_))
462 }
463
464 pub fn is_ask(&self) -> bool {
466 matches!(self, Self::Ask(_))
467 }
468
469 pub fn is_construct(&self) -> bool {
471 matches!(self, Self::Construct(_))
472 }
473
474 pub fn as_select(&mut self) -> Option<&mut SelectResults> {
476 match self {
477 Self::Select(results) => Some(results),
478 _ => None,
479 }
480 }
481
482 pub fn as_ask(&self) -> Option<bool> {
484 match self {
485 Self::Ask(result) => Some(*result),
486 _ => None,
487 }
488 }
489
490 pub fn as_construct(&mut self) -> Option<&mut ConstructResults> {
492 match self {
493 Self::Construct(results) => Some(results),
494 _ => None,
495 }
496 }
497
498 pub fn cancel(&self) {
500 match self {
501 Self::Select(results) => results.cancel(),
502 Self::Construct(results) => results.cancel(),
503 Self::Describe(results) => results.cancel(),
504 Self::Ask(_) => {} }
506 }
507
508 pub fn progress(&self) -> Option<StreamingProgress> {
510 match self {
511 Self::Select(results) => Some(results.progress()),
512 Self::Construct(results) => Some(results.progress()),
513 Self::Describe(results) => Some(results.progress()),
514 Self::Ask(_) => None,
515 }
516 }
517}
518
/// Builder that pairs a streaming result handle with the channel sender that
/// feeds it.
pub struct StreamingResultBuilder {
    /// Configuration applied to the result handle and used to size its channel.
    config: StreamingConfig,
}
523
524impl Default for StreamingResultBuilder {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530impl StreamingResultBuilder {
531 pub fn new() -> Self {
532 Self {
533 config: StreamingConfig::default(),
534 }
535 }
536
537 pub fn with_buffer_size(mut self, size: usize) -> Self {
538 self.config.buffer_size = size;
539 self
540 }
541
542 pub fn with_max_memory(mut self, bytes: usize) -> Self {
543 self.config.max_memory = bytes;
544 self
545 }
546
547 pub fn with_progress_tracking(mut self, enable: bool) -> Self {
548 self.config.track_progress = enable;
549 self
550 }
551
552 pub fn with_timeout(mut self, timeout: Duration) -> Self {
553 self.config.timeout = Some(timeout);
554 self
555 }
556
557 pub fn build_select(
558 self,
559 variables: Vec<Variable>,
560 ) -> (SelectResults, channel::Sender<Result<Solution, OxirsError>>) {
561 let (tx, rx) = channel::bounded(self.config.buffer_size);
562 let results = SelectResults::new(variables, rx, self.config);
563 (results, tx)
564 }
565
566 pub fn build_construct(
567 self,
568 ) -> (
569 ConstructResults,
570 channel::Sender<Result<Triple, OxirsError>>,
571 ) {
572 let (tx, rx) = channel::bounded(self.config.buffer_size);
573 let results = ConstructResults::new(rx, self.config);
574 (results, tx)
575 }
576}
577
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `x` variable shared by the tests.
    fn x_variable() -> Variable {
        Variable::new("x").expect("valid variable name")
    }

    /// Sends `count` solutions that bind `var` to the literals "0", "1", ...
    fn send_literals(
        sender: &channel::Sender<Result<Solution, OxirsError>>,
        var: &Variable,
        count: i32,
    ) {
        for i in 0..count {
            let term = Term::Literal(Literal::new(i.to_string()));
            let bindings: HashMap<Variable, Option<Term>> =
                std::iter::once((var.clone(), Some(term))).collect();
            sender
                .send(Ok(Solution::new(bindings)))
                .expect("send should succeed");
        }
    }

    #[test]
    fn test_solution_creation() {
        let var = x_variable();
        let term = Term::NamedNode(NamedNode::new("http://example.org/test").expect("valid IRI"));
        let bindings: HashMap<Variable, Option<Term>> =
            std::iter::once((var.clone(), Some(term.clone()))).collect();

        let solution = Solution::new(bindings);
        assert_eq!(solution.get(&var), Some(&term));
        assert!(solution.contains(&var));
    }

    #[test]
    fn test_streaming_select_results() {
        let var = x_variable();
        let (mut results, sender) = StreamingResultBuilder::new()
            .with_buffer_size(10)
            .build_select(vec![var.clone()]);

        send_literals(&sender, &var, 5);
        // Dropping the only sender lets the consumer observe end of stream.
        drop(sender);

        // Stop at the first `Ok(None)` or `Err`, like a `while let Ok(Some(..))` loop.
        let collected: Vec<Solution> =
            std::iter::from_fn(|| results.next().ok().flatten()).collect();

        assert_eq!(collected.len(), 5);
        assert_eq!(results.progress().processed, 5);
    }

    #[test]
    fn test_batch_operations() {
        let var = x_variable();
        let (mut results, sender) = StreamingResultBuilder::new().build_select(vec![var.clone()]);

        send_literals(&sender, &var, 20);
        drop(sender);

        // 20 queued: take 10 as a batch, skip 5, leaving 5 to collect.
        let batch = results.next_batch(10).expect("operation should succeed");
        assert_eq!(batch.len(), 10);

        results.skip_results(5).expect("operation should succeed");

        let remaining = results.take_results(10).expect("operation should succeed");
        assert_eq!(remaining.len(), 5);
    }

    #[test]
    fn test_cancellation() {
        // The sender is kept alive so that only cancellation can end the stream.
        let (mut results, _sender) =
            StreamingResultBuilder::new().build_select(vec![x_variable()]);

        results.cancel();
        assert!(results.is_cancelled());
        assert!(results.next().expect("should have next item").is_none());
    }
}