#![allow(dead_code)]
#[cfg(test)]
use crate::model::{Literal, NamedNode};
use crate::model::{Term, Triple, Variable};
use crate::OxirsError;
use crossbeam::channel;
use futures::stream::Stream;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
/// Tuning knobs shared by the streaming result types in this module.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Capacity used for the bounded result channel created by the builder and for
    /// the initial capacity of the SELECT batch buffer.
    pub buffer_size: usize,
    /// Memory budget in bytes. NOTE(review): not read anywhere in this module.
    pub max_memory: usize,
    /// Whether per-item progress counters are requested.
    pub track_progress: bool,
    /// Backpressure threshold as a fraction. NOTE(review): not read anywhere in this module.
    pub backpressure_threshold: f64,
    /// Maximum wait for each individual blocking receive; `None` waits indefinitely.
    pub timeout: Option<Duration>,
}

impl Default for StreamingConfig {
    /// 1024-entry buffer, 100 MiB memory budget, progress tracking enabled,
    /// a 0.8 backpressure threshold and a 30 second receive timeout.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        let receive_timeout = Duration::from_secs(30);
        StreamingConfig {
            timeout: Some(receive_timeout),
            backpressure_threshold: 0.8,
            track_progress: true,
            max_memory: 100 * MIB,
            buffer_size: 1024,
        }
    }
}
/// Snapshot of how far a streaming result set has progressed.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of items received from the producer so far (when counted).
    pub processed: usize,
    /// Expected total number of items, if known. The constructors in this module
    /// initialise it to `None` and never update it.
    pub estimated_total: Option<usize>,
    /// Rough estimate in bytes: `processed` times the shallow `size_of` of the item
    /// type; heap data owned by the items is not included.
    pub memory_used: usize,
    /// Moment the result set was created.
    pub start_time: Instant,
    /// Cleared once the result set is known to be finished (e.g. the producer side
    /// of the channel disconnected and was drained).
    pub is_running: bool,
}
/// One row of a SELECT result: a mapping from variables to optional terms.
#[derive(Debug, Clone)]
pub struct Solution {
    /// Variable bindings; a `None` value means the variable is present but unbound.
    bindings: HashMap<Variable, Option<Term>>,
    /// Optional producer-supplied information about this row.
    metadata: SolutionMetadata,
}
/// Optional, producer-supplied information attached to a [`Solution`].
/// All fields default to `None`.
#[derive(Debug, Clone, Default)]
pub struct SolutionMetadata {
    /// Identifier of where the solution came from. NOTE(review): format not defined here.
    pub source: Option<String>,
    /// Confidence score. NOTE(review): no range is enforced in this module — confirm
    /// whether callers expect 0.0..=1.0.
    pub confidence: Option<f64>,
    /// Timestamp. NOTE(review): unit and epoch are not specified in this module — confirm.
    pub timestamp: Option<u64>,
}
impl Solution {
    /// Creates a solution from `bindings` with empty (default) metadata.
    pub fn new(bindings: HashMap<Variable, Option<Term>>) -> Self {
        Self::with_metadata(bindings, SolutionMetadata::default())
    }

    /// Creates a solution from `bindings` carrying the given `metadata`.
    pub fn with_metadata(
        bindings: HashMap<Variable, Option<Term>>,
        metadata: SolutionMetadata,
    ) -> Self {
        Self { bindings, metadata }
    }

    /// Returns the metadata attached to this solution.
    ///
    /// Without this accessor, metadata passed to [`Solution::with_metadata`] could
    /// never be read back.
    pub fn metadata(&self) -> &SolutionMetadata {
        &self.metadata
    }

    /// Returns the term bound to `var`.
    ///
    /// Returns `None` both when `var` is absent and when it is present but unbound.
    pub fn get(&self, var: &Variable) -> Option<&Term> {
        self.bindings.get(var).and_then(|opt| opt.as_ref())
    }

    /// Returns `true` if `var` appears in this solution, even when it is unbound.
    pub fn contains(&self, var: &Variable) -> bool {
        self.bindings.contains_key(var)
    }

    /// Iterates over every variable in the solution, bound or not (arbitrary order).
    pub fn variables(&self) -> impl Iterator<Item = &Variable> {
        self.bindings.keys()
    }

    /// Iterates over the bound terms only; unbound variables are skipped.
    pub fn values(&self) -> impl Iterator<Item = &Term> {
        self.bindings.values().filter_map(|opt| opt.as_ref())
    }

    /// Iterates over `(variable, binding)` pairs, with `None` for unbound variables.
    pub fn iter(&self) -> impl Iterator<Item = (&Variable, Option<&Term>)> {
        self.bindings.iter().map(|(k, v)| (k, v.as_ref()))
    }
}
/// Pull-based streaming result set of a SELECT query.
///
/// Solutions (or errors) arrive over a crossbeam channel from a producer; a matching
/// sender can be obtained from `StreamingResultBuilder::build_select`.
pub struct SelectResults {
    /// Projected variables, shared via `Arc`.
    variables: Arc<Vec<Variable>>,
    /// Receiving end of the producer channel.
    receiver: channel::Receiver<Result<Solution, OxirsError>>,
    /// Progress counters behind a lock so snapshots can be cloned out.
    progress: Arc<RwLock<StreamingProgress>>,
    /// Cancellation flag checked before every read.
    cancel_token: Arc<AtomicBool>,
    /// Scratch buffer filled by `next_batch`.
    buffer: Vec<Solution>,
    /// Configuration the result set was created with.
    config: StreamingConfig,
}
impl SelectResults {
    /// Creates a streaming SELECT result set.
    ///
    /// * `variables` - projected variables, returned unchanged by [`variables`](Self::variables).
    /// * `receiver` - channel the producer sends solutions (or errors) on; the stream ends
    ///   once every sender has been dropped and the channel is drained.
    /// * `config` - timeout, progress-tracking and buffer settings.
    pub fn new(
        variables: Vec<Variable>,
        receiver: channel::Receiver<Result<Solution, OxirsError>>,
        config: StreamingConfig,
    ) -> Self {
        let progress = Arc::new(RwLock::new(StreamingProgress {
            processed: 0,
            estimated_total: None,
            memory_used: 0,
            start_time: Instant::now(),
            is_running: true,
        }));
        Self {
            variables: Arc::new(variables),
            receiver,
            progress,
            cancel_token: Arc::new(AtomicBool::new(false)),
            buffer: Vec::with_capacity(config.buffer_size),
            config,
        }
    }

    /// Returns the projected variables of the query.
    pub fn variables(&self) -> &[Variable] {
        &self.variables
    }

    /// Returns a snapshot of the current progress counters.
    pub fn progress(&self) -> StreamingProgress {
        self.progress.read().clone()
    }

    /// Requests cancellation; every subsequent read returns `Ok(None)`.
    ///
    /// The progress snapshot is also marked as no longer running, so observers of
    /// [`progress`](Self::progress) do not see a cancelled stream as still active.
    pub fn cancel(&self) {
        // Relaxed is enough: the flag guards no other data, it only has to become visible.
        self.cancel_token.store(true, Ordering::Relaxed);
        self.mark_completed();
    }

    /// Returns `true` once [`cancel`](Self::cancel) has been called.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.load(Ordering::Relaxed)
    }

    /// Returns the next solution if one is already queued, without blocking.
    ///
    /// `Ok(None)` is returned when the results were cancelled, when nothing is queued
    /// yet, or when the producer has disconnected and the channel is drained.
    ///
    /// # Errors
    /// Returns any error the producer sent through the channel.
    pub fn try_next(&mut self) -> Result<Option<Solution>, OxirsError> {
        if self.is_cancelled() {
            return Ok(None);
        }
        match self.receiver.try_recv() {
            Ok(Ok(solution)) => {
                self.update_progress(1);
                Ok(Some(solution))
            }
            Ok(Err(e)) => Err(e),
            Err(channel::TryRecvError::Empty) => Ok(None),
            Err(channel::TryRecvError::Disconnected) => {
                self.mark_completed();
                Ok(None)
            }
        }
    }

    /// Blocks until the next solution arrives; `Ok(None)` means end of stream or cancelled.
    ///
    /// # Errors
    /// Returns any error the producer sent, or `OxirsError::Query("Query timeout")` when
    /// `config.timeout` is set and no item arrives within it (the limit applies per call).
    // Deliberately not `Iterator::next`: this returns `Result<Option<_>>`; the
    // `Iterator` impl adapts it.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<Option<Solution>, OxirsError> {
        if self.is_cancelled() {
            return Ok(None);
        }
        let received = match self.config.timeout {
            Some(timeout) => self.receiver.recv_timeout(timeout),
            // A plain blocking `recv` can only fail because the channel disconnected.
            None => self
                .receiver
                .recv()
                .map_err(|_| channel::RecvTimeoutError::Disconnected),
        };
        match received {
            Ok(Ok(solution)) => {
                self.update_progress(1);
                Ok(Some(solution))
            }
            Ok(Err(e)) => Err(e),
            Err(channel::RecvTimeoutError::Timeout) => {
                Err(OxirsError::Query("Query timeout".to_string()))
            }
            Err(channel::RecvTimeoutError::Disconnected) => {
                self.mark_completed();
                Ok(None)
            }
        }
    }

    /// Collects up to `max_size` solutions that are already available, without blocking.
    ///
    /// The batch may be shorter than `max_size` (or empty) while the producer is still
    /// running, so an empty batch alone does not mean end of stream.
    ///
    /// # Errors
    /// Returns the first error read from the channel; solutions gathered earlier in the
    /// same call are discarded in that case.
    pub fn next_batch(&mut self, max_size: usize) -> Result<Vec<Solution>, OxirsError> {
        self.buffer.clear();
        for _ in 0..max_size {
            match self.try_next()? {
                Some(solution) => self.buffer.push(solution),
                None => break,
            }
        }
        Ok(std::mem::take(&mut self.buffer))
    }

    /// Discards up to `n` solutions using the blocking [`next`](Self::next), stopping
    /// early at end of stream.
    pub fn skip_results(&mut self, n: usize) -> Result<(), OxirsError> {
        for _ in 0..n {
            if self.next()?.is_none() {
                break;
            }
        }
        Ok(())
    }

    /// Returns up to `n` solutions using the blocking [`next`](Self::next), stopping
    /// early at end of stream.
    pub fn take_results(&mut self, n: usize) -> Result<Vec<Solution>, OxirsError> {
        let mut results = Vec::with_capacity(n.min(self.config.buffer_size));
        for _ in 0..n {
            match self.next()? {
                Some(solution) => results.push(solution),
                None => break,
            }
        }
        Ok(results)
    }

    /// Converts the result set into an asynchronous `Stream` of solutions.
    pub fn into_stream(self) -> impl Stream<Item = Result<Solution, OxirsError>> {
        SelectResultStream::new(self)
    }

    /// Records `count` received solutions, unless progress tracking is disabled
    /// (which also avoids taking the write lock for every item).
    fn update_progress(&self, count: usize) {
        if !self.config.track_progress {
            return;
        }
        let mut progress = self.progress.write();
        progress.processed = progress.processed.saturating_add(count);
        // Shallow-size estimate only; saturate rather than wrap on overflow.
        progress.memory_used = progress
            .processed
            .saturating_mul(std::mem::size_of::<Solution>());
    }

    /// Marks the stream as finished. Always runs, even with tracking disabled, because
    /// consumers use `is_running` to detect the end of the stream.
    fn mark_completed(&self) {
        let mut progress = self.progress.write();
        progress.is_running = false;
    }
}
impl Iterator for SelectResults {
type Item = Result<Solution, OxirsError>;
fn next(&mut self) -> Option<Self::Item> {
match self.next() {
Ok(Some(solution)) => Some(Ok(solution)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
/// `Stream` adapter over [`SelectResults`], created by `SelectResults::into_stream`.
struct SelectResultStream {
    /// The wrapped pull-based result set that is polled on every `poll_next`.
    results: SelectResults,
    /// NOTE(review): initialised to `None` and never read in this module; appears unused.
    receiver: Option<mpsc::UnboundedReceiver<Result<Solution, OxirsError>>>,
}
impl SelectResultStream {
fn new(results: SelectResults) -> Self {
Self {
results,
receiver: None,
}
}
}
impl Stream for SelectResultStream {
    type Item = Result<Solution, OxirsError>;

    /// Polls the wrapped [`SelectResults`] without blocking.
    ///
    /// Yields `Ready(Some(..))` for each solution or error. Yields `Ready(None)` once the
    /// results were cancelled, or once the producer disconnected and the channel was
    /// drained. Previously the stream returned `Pending` forever in those cases and never
    /// terminated.
    ///
    /// The underlying channel is synchronous and has no async notification. When it is
    /// merely empty, the task therefore re-schedules itself immediately (a busy poll)
    /// instead of registering a real wake-up.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.results.try_next() {
            Ok(Some(solution)) => Poll::Ready(Some(Ok(solution))),
            Ok(None) => {
                // `try_next` returns `Ok(None)` for three reasons. Only an empty but
                // still-connected channel should keep the stream alive.
                let finished =
                    self.results.is_cancelled() || !self.results.progress.read().is_running;
                if finished {
                    Poll::Ready(None)
                } else {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
/// Pull-based streaming result set of triples (used for CONSTRUCT and DESCRIBE).
///
/// Triples (or errors) arrive over a crossbeam channel; a matching sender can be
/// obtained from `StreamingResultBuilder::build_construct`.
pub struct ConstructResults {
    /// Receiving end of the producer channel.
    receiver: channel::Receiver<Result<Triple, OxirsError>>,
    /// Progress counters behind a lock so snapshots can be cloned out.
    progress: Arc<RwLock<StreamingProgress>>,
    /// Cancellation flag checked before every read.
    cancel_token: Arc<AtomicBool>,
    /// Configuration the result set was created with.
    config: StreamingConfig,
}
impl ConstructResults {
    /// Creates a streaming triple result set reading from `receiver`.
    ///
    /// The stream ends once every sender has been dropped and the channel is drained.
    pub fn new(
        receiver: channel::Receiver<Result<Triple, OxirsError>>,
        config: StreamingConfig,
    ) -> Self {
        let progress = Arc::new(RwLock::new(StreamingProgress {
            processed: 0,
            estimated_total: None,
            memory_used: 0,
            start_time: Instant::now(),
            is_running: true,
        }));
        Self {
            receiver,
            progress,
            cancel_token: Arc::new(AtomicBool::new(false)),
            config,
        }
    }

    /// Returns a snapshot of the current progress counters.
    pub fn progress(&self) -> StreamingProgress {
        self.progress.read().clone()
    }

    /// Requests cancellation; every subsequent read returns `Ok(None)`.
    ///
    /// The progress snapshot is also marked as no longer running.
    pub fn cancel(&self) {
        // Relaxed is enough: the flag guards no other data, it only has to become visible.
        self.cancel_token.store(true, Ordering::Relaxed);
        self.mark_completed();
    }

    /// Returns `true` once [`cancel`](Self::cancel) has been called.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.load(Ordering::Relaxed)
    }

    /// Blocks until the next triple arrives; `Ok(None)` means end of stream or cancelled.
    ///
    /// # Errors
    /// Returns any error the producer sent, or `OxirsError::Query("Query timeout")` when
    /// `config.timeout` is set and no item arrives within it (the limit applies per call).
    // Deliberately not `Iterator::next`: this returns `Result<Option<_>>`; the
    // `Iterator` impl adapts it.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<Option<Triple>, OxirsError> {
        if self.is_cancelled() {
            return Ok(None);
        }
        let received = match self.config.timeout {
            Some(timeout) => self.receiver.recv_timeout(timeout),
            // A plain blocking `recv` can only fail because the channel disconnected.
            None => self
                .receiver
                .recv()
                .map_err(|_| channel::RecvTimeoutError::Disconnected),
        };
        match received {
            Ok(Ok(triple)) => {
                self.update_progress(1);
                Ok(Some(triple))
            }
            Ok(Err(e)) => Err(e),
            Err(channel::RecvTimeoutError::Timeout) => {
                Err(OxirsError::Query("Query timeout".to_string()))
            }
            Err(channel::RecvTimeoutError::Disconnected) => {
                self.mark_completed();
                Ok(None)
            }
        }
    }

    /// Returns up to `max_size` triples using the blocking [`next`](Self::next), stopping
    /// early at end of stream.
    pub fn collect_batch(&mut self, max_size: usize) -> Result<Vec<Triple>, OxirsError> {
        let mut batch = Vec::with_capacity(max_size.min(self.config.buffer_size));
        for _ in 0..max_size {
            match self.next()? {
                Some(triple) => batch.push(triple),
                None => break,
            }
        }
        Ok(batch)
    }

    /// Records `count` received triples, unless progress tracking is disabled
    /// (which also avoids taking the write lock for every item).
    fn update_progress(&self, count: usize) {
        if !self.config.track_progress {
            return;
        }
        let mut progress = self.progress.write();
        progress.processed = progress.processed.saturating_add(count);
        // Shallow-size estimate only; saturate rather than wrap on overflow.
        progress.memory_used = progress
            .processed
            .saturating_mul(std::mem::size_of::<Triple>());
    }

    /// Marks the stream as finished. Always runs, even with tracking disabled.
    fn mark_completed(&self) {
        let mut progress = self.progress.write();
        progress.is_running = false;
    }
}
impl Iterator for ConstructResults {
type Item = Result<Triple, OxirsError>;
fn next(&mut self) -> Option<Self::Item> {
match self.next() {
Ok(Some(triple)) => Some(Ok(triple)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
/// Streaming result of a query, with one variant per query form.
pub enum StreamingQueryResults {
    /// Solution sequence of a SELECT query.
    Select(SelectResults),
    /// Boolean answer of an ASK query (no streaming needed).
    Ask(bool),
    /// Triples produced by a CONSTRUCT query.
    Construct(ConstructResults),
    /// Triples produced by a DESCRIBE query.
    Describe(ConstructResults),
}
impl StreamingQueryResults {
    /// Returns `true` for SELECT results.
    pub fn is_select(&self) -> bool {
        matches!(self, Self::Select(_))
    }

    /// Returns `true` for ASK results.
    pub fn is_ask(&self) -> bool {
        matches!(self, Self::Ask(_))
    }

    /// Returns `true` for CONSTRUCT results only; DESCRIBE results are reported by
    /// [`is_describe`](Self::is_describe).
    pub fn is_construct(&self) -> bool {
        matches!(self, Self::Construct(_))
    }

    /// Returns `true` for DESCRIBE results.
    pub fn is_describe(&self) -> bool {
        matches!(self, Self::Describe(_))
    }

    /// Returns the solution stream of a SELECT result, or `None` for other variants.
    pub fn as_select(&mut self) -> Option<&mut SelectResults> {
        match self {
            Self::Select(results) => Some(results),
            _ => None,
        }
    }

    /// Returns the answer of an ASK result, or `None` for other variants.
    pub fn as_ask(&self) -> Option<bool> {
        match self {
            Self::Ask(result) => Some(*result),
            _ => None,
        }
    }

    /// Returns the triple stream of a CONSTRUCT result, or `None` for other variants
    /// (including DESCRIBE; see [`as_describe`](Self::as_describe)).
    pub fn as_construct(&mut self) -> Option<&mut ConstructResults> {
        match self {
            Self::Construct(results) => Some(results),
            _ => None,
        }
    }

    /// Returns the triple stream of a DESCRIBE result, or `None` for other variants.
    pub fn as_describe(&mut self) -> Option<&mut ConstructResults> {
        match self {
            Self::Describe(results) => Some(results),
            _ => None,
        }
    }

    /// Cancels the underlying stream; a no-op for ASK results, which are not streamed.
    pub fn cancel(&self) {
        match self {
            Self::Select(results) => results.cancel(),
            Self::Construct(results) => results.cancel(),
            Self::Describe(results) => results.cancel(),
            Self::Ask(_) => {}
        }
    }

    /// Returns a progress snapshot for streamed variants, or `None` for ASK results.
    pub fn progress(&self) -> Option<StreamingProgress> {
        match self {
            Self::Select(results) => Some(results.progress()),
            Self::Construct(results) => Some(results.progress()),
            Self::Describe(results) => Some(results.progress()),
            Self::Ask(_) => None,
        }
    }
}
/// Builder that configures a [`StreamingConfig`] and creates connected
/// (result set, sender) pairs.
pub struct StreamingResultBuilder {
    /// Configuration handed to the result sets this builder creates.
    config: StreamingConfig,
}
impl Default for StreamingResultBuilder {
fn default() -> Self {
Self::new()
}
}
impl StreamingResultBuilder {
    /// Creates a builder starting from [`StreamingConfig::default`].
    pub fn new() -> Self {
        Self {
            config: StreamingConfig::default(),
        }
    }

    /// Sets the capacity of the bounded result channel (and of the SELECT batch buffer).
    ///
    /// A size of 0 creates a zero-capacity channel, where each send waits for a
    /// matching receive.
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.config.buffer_size = size;
        self
    }

    /// Sets the memory budget in bytes stored in the configuration.
    pub fn with_max_memory(mut self, bytes: usize) -> Self {
        self.config.max_memory = bytes;
        self
    }

    /// Enables or disables progress tracking.
    pub fn with_progress_tracking(mut self, enable: bool) -> Self {
        self.config.track_progress = enable;
        self
    }

    /// Sets the maximum wait for each blocking receive.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.config.timeout = Some(timeout);
        self
    }

    /// Removes the per-receive timeout, so blocking reads wait until an item arrives or
    /// the producer disconnects.
    ///
    /// Without this, the default 30 second timeout could not be lifted through the builder.
    pub fn without_timeout(mut self) -> Self {
        self.config.timeout = None;
        self
    }

    /// Creates a SELECT result set and the sender its producer should use.
    ///
    /// Dropping every clone of the returned sender ends the stream.
    pub fn build_select(
        self,
        variables: Vec<Variable>,
    ) -> (SelectResults, channel::Sender<Result<Solution, OxirsError>>) {
        let (tx, rx) = channel::bounded(self.config.buffer_size);
        let results = SelectResults::new(variables, rx, self.config);
        (results, tx)
    }

    /// Creates a triple result set (CONSTRUCT/DESCRIBE) and the sender its producer
    /// should use.
    ///
    /// Dropping every clone of the returned sender ends the stream.
    pub fn build_construct(
        self,
    ) -> (
        ConstructResults,
        channel::Sender<Result<Triple, OxirsError>>,
    ) {
        let (tx, rx) = channel::bounded(self.config.buffer_size);
        let results = ConstructResults::new(rx, self.config);
        (results, tx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends `count` solutions binding `var` to the literals "0", "1", ... and then
    /// drops the sender, so the receiving side observes end of stream.
    fn send_literal_solutions(
        sender: channel::Sender<Result<Solution, OxirsError>>,
        var: &Variable,
        count: usize,
    ) {
        for i in 0..count {
            let mut bindings = HashMap::new();
            let term = Term::Literal(Literal::new(i.to_string()));
            bindings.insert(var.clone(), Some(term));
            sender
                .send(Ok(Solution::new(bindings)))
                .expect("send should succeed");
        }
    }

    #[test]
    fn test_solution_creation() {
        let mut bindings = HashMap::new();
        let var = Variable::new("x").expect("valid variable name");
        let term = Term::NamedNode(NamedNode::new("http://example.org/test").expect("valid IRI"));
        bindings.insert(var.clone(), Some(term.clone()));
        let solution = Solution::new(bindings);
        assert_eq!(solution.get(&var), Some(&term));
        assert!(solution.contains(&var));
    }

    #[test]
    fn test_unbound_variable() {
        let var = Variable::new("y").expect("valid variable name");
        let mut bindings = HashMap::new();
        bindings.insert(var.clone(), None);
        let solution = Solution::new(bindings);
        // Present in the solution, but with no term bound.
        assert!(solution.contains(&var));
        assert!(solution.get(&var).is_none());
        assert_eq!(solution.values().count(), 0);
        assert_eq!(solution.variables().count(), 1);
    }

    #[test]
    fn test_streaming_select_results() {
        let builder = StreamingResultBuilder::new().with_buffer_size(10);
        let variables = vec![Variable::new("x").expect("valid variable name")];
        let (mut results, sender) = builder.build_select(variables.clone());
        send_literal_solutions(sender, &variables[0], 5);
        let mut collected = Vec::new();
        while let Ok(Some(solution)) = results.next() {
            collected.push(solution);
        }
        assert_eq!(collected.len(), 5);
        assert_eq!(results.progress().processed, 5);
    }

    #[test]
    fn test_batch_operations() {
        let builder = StreamingResultBuilder::new();
        let variables = vec![Variable::new("x").expect("valid variable name")];
        let (mut results, sender) = builder.build_select(variables.clone());
        send_literal_solutions(sender, &variables[0], 20);
        let batch = results.next_batch(10).expect("operation should succeed");
        assert_eq!(batch.len(), 10);
        results.skip_results(5).expect("operation should succeed");
        let remaining = results.take_results(10).expect("operation should succeed");
        assert_eq!(remaining.len(), 5);
    }

    #[test]
    fn test_iterator_adapter() {
        let variables = vec![Variable::new("x").expect("valid variable name")];
        let (results, sender) = StreamingResultBuilder::new().build_select(variables.clone());
        send_literal_solutions(sender, &variables[0], 3);
        let collected: Result<Vec<_>, _> = results.collect();
        assert_eq!(collected.expect("no errors were sent").len(), 3);
    }

    #[test]
    fn test_cancellation() {
        let builder = StreamingResultBuilder::new();
        let variables = vec![Variable::new("x").expect("valid variable name")];
        let (mut results, _sender) = builder.build_select(variables);
        results.cancel();
        assert!(results.is_cancelled());
        assert!(results
            .next()
            .expect("next should not fail after cancellation")
            .is_none());
    }

    #[test]
    fn test_construct_results_end_of_stream() {
        let (mut results, sender) = StreamingResultBuilder::new().build_construct();
        drop(sender);
        assert!(results
            .next()
            .expect("disconnection is not an error")
            .is_none());
        let progress = results.progress();
        assert!(!progress.is_running);
        assert_eq!(progress.processed, 0);
    }

    #[test]
    fn test_ask_results() {
        let mut ask = StreamingQueryResults::Ask(true);
        assert!(ask.is_ask());
        assert!(!ask.is_select());
        assert_eq!(ask.as_ask(), Some(true));
        assert!(ask.as_select().is_none());
        assert!(ask.progress().is_none());
        // Cancelling a non-streamed result is a no-op.
        ask.cancel();
    }
}