ruvector_graph_node/
streaming.rs

1//! Streaming query results using AsyncIterator pattern
2
3use crate::types::*;
4use futures::stream::Stream;
5use napi::bindgen_prelude::*;
6use napi_derive::napi;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10/// Streaming query result iterator
11#[napi]
12pub struct QueryResultStream {
13    inner: Pin<Box<dyn Stream<Item = JsQueryResult> + Send>>,
14}
15
16impl QueryResultStream {
17    /// Create a new query result stream
18    pub fn new(stream: Pin<Box<dyn Stream<Item = JsQueryResult> + Send>>) -> Self {
19        Self { inner: stream }
20    }
21}
22
23#[napi]
24impl QueryResultStream {
25    /// Get the next result from the stream
26    ///
27    /// # Example
28    /// ```javascript
29    /// const stream = await db.queryStream('MATCH (n) RETURN n');
30    /// while (true) {
31    ///   const result = await stream.next();
32    ///   if (!result) break;
33    ///   console.log(result);
34    /// }
35    /// ```
36    #[napi]
37    pub fn next(&mut self) -> Result<Option<JsQueryResult>> {
38        // This would poll the stream in a real implementation
39        Ok(None)
40    }
41}
42
43/// Streaming hyperedge result iterator
44#[napi]
45pub struct HyperedgeStream {
46    results: Vec<JsHyperedgeResult>,
47    index: usize,
48}
49
50impl HyperedgeStream {
51    /// Create a new hyperedge stream
52    pub fn new(results: Vec<JsHyperedgeResult>) -> Self {
53        Self { results, index: 0 }
54    }
55}
56
57#[napi]
58impl HyperedgeStream {
59    /// Get the next hyperedge result
60    ///
61    /// # Example
62    /// ```javascript
63    /// const stream = await db.searchHyperedgesStream(query);
64    /// for await (const result of stream) {
65    ///   console.log(result);
66    /// }
67    /// ```
68    #[napi]
69    pub fn next(&mut self) -> Result<Option<JsHyperedgeResult>> {
70        if self.index < self.results.len() {
71            let result = self.results[self.index].clone();
72            self.index += 1;
73            Ok(Some(result))
74        } else {
75            Ok(None)
76        }
77    }
78
79    /// Collect all remaining results
80    #[napi]
81    pub fn collect(&mut self) -> Vec<JsHyperedgeResult> {
82        let remaining = self.results[self.index..].to_vec();
83        self.index = self.results.len();
84        remaining
85    }
86}
87
88/// Node stream iterator
89#[napi]
90pub struct NodeStream {
91    nodes: Vec<JsNode>,
92    index: usize,
93}
94
95impl NodeStream {
96    /// Create a new node stream
97    pub fn new(nodes: Vec<JsNode>) -> Self {
98        Self { nodes, index: 0 }
99    }
100}
101
102#[napi]
103impl NodeStream {
104    /// Get the next node
105    #[napi]
106    pub fn next(&mut self) -> Result<Option<JsNode>> {
107        if self.index < self.nodes.len() {
108            let node = self.nodes[self.index].clone();
109            self.index += 1;
110            Ok(Some(node))
111        } else {
112            Ok(None)
113        }
114    }
115
116    /// Collect all remaining nodes
117    #[napi]
118    pub fn collect(&mut self) -> Vec<JsNode> {
119        let remaining = self.nodes[self.index..].to_vec();
120        self.index = self.nodes.len();
121        remaining
122    }
123}