// rust_logic_graph/streaming/mod.rs

1//! Streaming processing module
2//!
3//! Provides stream-based node execution with backpressure handling
4
use crate::core::Context;
use crate::rule::{RuleError, RuleResult};
use async_trait::async_trait;
use serde_json::Value;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio_stream::{Stream, StreamExt};
12
13pub mod operators;
14pub mod stream_node;
15
16pub use operators::{FilterOperator, FoldOperator, MapOperator, StreamOperator};
17pub use stream_node::StreamNode;
18
/// A single element flowing through a stream: either a successfully
/// produced JSON value or a `RuleError` raised mid-stream.
pub type StreamItem = Result<Value, RuleError>;

/// Boxed, pinned, `Send` stream of [`StreamItem`]s — the common currency
/// passed between streaming nodes and operators in this module.
pub type ValueStream = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;
24
/// Backpressure configuration.
///
/// Controls how much work may be in flight between a producer and a
/// slow consumer. Plain-old-data (two `usize` fields), so it derives
/// `Copy`/`PartialEq`/`Eq` in addition to `Debug`/`Clone` for cheap
/// pass-by-value and easy comparison in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackpressureConfig {
    /// Capacity of the bounded mpsc channel used to buffer stream items;
    /// a full buffer blocks the producer until the consumer catches up.
    pub buffer_size: usize,
    /// Maximum number of concurrent operations.
    pub max_concurrent: usize,
}
33
34impl Default for BackpressureConfig {
35    fn default() -> Self {
36        Self {
37            buffer_size: 100,
38            max_concurrent: 10,
39        }
40    }
41}
42
/// Chunk configuration for large datasets.
///
/// Plain-old-data (two `usize` fields), so it derives
/// `Copy`/`PartialEq`/`Eq` in addition to `Debug`/`Clone` for cheap
/// pass-by-value and easy comparison in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChunkConfig {
    /// Number of elements in each chunk.
    pub chunk_size: usize,
    /// Elements shared between consecutive chunks (for sliding windows).
    /// Should be less than `chunk_size` for the chunking to make progress.
    pub overlap: usize,
}
51
52impl Default for ChunkConfig {
53    fn default() -> Self {
54        Self {
55            chunk_size: 1000,
56            overlap: 0,
57        }
58    }
59}
60
61/// Stream processor trait
62#[async_trait]
63pub trait StreamProcessor: Send + Sync {
64    /// Process a single item from the stream
65    async fn process_item(&self, item: Value, ctx: &Context) -> RuleResult;
66
67    /// Process a chunk of items (for batch operations)
68    async fn process_chunk(
69        &self,
70        items: Vec<Value>,
71        ctx: &Context,
72    ) -> Result<Vec<Value>, RuleError> {
73        let mut results = Vec::with_capacity(items.len());
74        for item in items {
75            let result = self.process_item(item, ctx).await?;
76            results.push(result);
77        }
78        Ok(results)
79    }
80}
81
82/// Create a stream from a vector with backpressure
83pub fn create_stream_from_vec(data: Vec<Value>, config: BackpressureConfig) -> ValueStream {
84    let (tx, rx) = mpsc::channel(config.buffer_size);
85
86    tokio::spawn(async move {
87        for item in data {
88            if tx.send(Ok(item)).await.is_err() {
89                break;
90            }
91        }
92    });
93
94    Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
95}
96
97/// Create a chunked stream from a large dataset
98pub fn create_chunked_stream(
99    data: Vec<Value>,
100    config: ChunkConfig,
101) -> Pin<Box<dyn Stream<Item = Result<Vec<Value>, RuleError>> + Send>> {
102    let chunks: Vec<Vec<Value>> = data
103        .chunks(config.chunk_size)
104        .map(|chunk| chunk.to_vec())
105        .collect();
106
107    Box::pin(tokio_stream::iter(chunks.into_iter().map(Ok)))
108}
109
110/// Apply backpressure to a stream
111/// Note: This is a placeholder. For full backpressure,
112/// use StreamNode with BackpressureConfig
113pub fn apply_backpressure(stream: ValueStream, _config: BackpressureConfig) -> ValueStream {
114    // Simply return the stream as-is
115    // Backpressure is handled by the bounded channels
116    stream
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// A bounded stream built from a vec yields every item, in order, as Ok.
    #[tokio::test]
    async fn test_create_stream_from_vec() {
        let data: Vec<Value> = (1..=3).map(|i| Value::Number(i.into())).collect();

        let mut stream = create_stream_from_vec(data, BackpressureConfig::default());

        let mut seen = 0;
        while let Some(item) = stream.next().await {
            assert!(item.is_ok());
            seen += 1;
        }
        assert_eq!(seen, 3);
    }

    /// Chunking 10 items by 3 produces four chunks, none longer than 3.
    #[tokio::test]
    async fn test_chunked_stream() {
        let data: Vec<Value> = (0..10).map(|i| Value::Number(i.into())).collect();
        let config = ChunkConfig {
            chunk_size: 3,
            overlap: 0,
        };

        let mut stream = create_chunked_stream(data, config);

        let mut chunk_count = 0;
        while let Some(result) = stream.next().await {
            let chunk = result.expect("chunk should be Ok");
            assert!(chunk.len() <= 3);
            chunk_count += 1;
        }
        assert_eq!(chunk_count, 4); // 3, 3, 3, 1
    }
}