// rust_logic_graph/streaming/mod.rs

//! Streaming processing module
//!
//! Provides stream-based node execution with backpressure handling.
5use crate::core::Context;
6use crate::rule::{RuleResult, RuleError};
7use async_trait::async_trait;
8use serde_json::Value;
9use tokio::sync::mpsc;
10use tokio_stream::Stream;
11use std::pin::Pin;
12
13pub mod operators;
14pub mod stream_node;
15
16pub use operators::{StreamOperator, MapOperator, FilterOperator, FoldOperator};
17pub use stream_node::StreamNode;
18
/// Stream item type: either a successfully produced value or a rule error.
pub type StreamItem = Result<Value, RuleError>;

/// Stream type alias: a pinned, boxed, `Send` stream of [`StreamItem`]s,
/// suitable for returning from async functions and storing across tasks.
pub type ValueStream = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;
24
/// Backpressure configuration
///
/// Controls how producers are throttled when consumers fall behind; the
/// bounded-channel capacity makes senders wait once the buffer is full.
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Buffer size for bounded channels (senders await once the buffer is full)
    pub buffer_size: usize,
    /// Maximum concurrent operations
    // NOTE(review): not read anywhere in this module — presumably consumed
    // by `StreamNode`; confirm against stream_node.rs.
    pub max_concurrent: usize,
}
33
34impl Default for BackpressureConfig {
35    fn default() -> Self {
36        Self {
37            buffer_size: 100,
38            max_concurrent: 10,
39        }
40    }
41}
42
/// Chunk configuration for large datasets
///
/// Describes how a dataset is split into chunks before streaming.
#[derive(Debug, Clone)]
pub struct ChunkConfig {
    /// Size of each chunk (number of items per emitted `Vec`)
    pub chunk_size: usize,
    /// Overlap between consecutive chunks (for sliding windows)
    pub overlap: usize,
}
51
52impl Default for ChunkConfig {
53    fn default() -> Self {
54        Self {
55            chunk_size: 1000,
56            overlap: 0,
57        }
58    }
59}
60
/// Stream processor trait
///
/// Implementors define per-item processing; chunk processing has a
/// sequential default implementation built on top of it.
#[async_trait]
pub trait StreamProcessor: Send + Sync {
    /// Process a single item from the stream
    async fn process_item(&self, item: Value, ctx: &Context) -> RuleResult;

    /// Process a chunk of items (for batch operations)
    ///
    /// Default implementation: processes items one at a time via
    /// [`StreamProcessor::process_item`], preserving input order, and
    /// short-circuits on the first error (items after the failure are
    /// not processed).
    async fn process_chunk(&self, items: Vec<Value>, ctx: &Context) -> Result<Vec<Value>, RuleError> {
        let mut results = Vec::with_capacity(items.len());
        for item in items {
            let result = self.process_item(item, ctx).await?;
            results.push(result);
        }
        Ok(results)
    }
}
77
78/// Create a stream from a vector with backpressure
79pub fn create_stream_from_vec(
80    data: Vec<Value>,
81    config: BackpressureConfig,
82) -> ValueStream {
83    let (tx, rx) = mpsc::channel(config.buffer_size);
84
85    tokio::spawn(async move {
86        for item in data {
87            if tx.send(Ok(item)).await.is_err() {
88                break;
89            }
90        }
91    });
92
93    Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
94}
95
96/// Create a chunked stream from a large dataset
97pub fn create_chunked_stream(
98    data: Vec<Value>,
99    config: ChunkConfig,
100) -> Pin<Box<dyn Stream<Item = Result<Vec<Value>, RuleError>> + Send>> {
101    let chunks: Vec<Vec<Value>> = data
102        .chunks(config.chunk_size)
103        .map(|chunk| chunk.to_vec())
104        .collect();
105
106    Box::pin(tokio_stream::iter(chunks.into_iter().map(Ok)))
107}
108
/// Apply backpressure to a stream.
///
/// Note: This is a placeholder — the stream is returned unchanged and
/// `_config` is ignored. Backpressure is already enforced by the bounded
/// channels used at stream-creation time; for full backpressure control,
/// use `StreamNode` with `BackpressureConfig`.
pub fn apply_backpressure(
    stream: ValueStream,
    _config: BackpressureConfig,
) -> ValueStream {
    // Intentionally a no-op: the bounded mpsc channel upstream already
    // limits how far producers can run ahead of consumers.
    stream
}
120
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// The stream should yield every source element and then terminate.
    #[tokio::test]
    async fn test_create_stream_from_vec() {
        let input: Vec<Value> = (1..=3).map(|n| Value::Number(n.into())).collect();

        let mut stream = create_stream_from_vec(input, BackpressureConfig::default());

        let mut seen = 0;
        while let Some(Ok(_)) = stream.next().await {
            seen += 1;
        }

        assert_eq!(seen, 3);
    }

    /// Ten items with chunk_size = 3 should arrive as chunks of 3, 3, 3, 1.
    #[tokio::test]
    async fn test_chunked_stream() {
        let input: Vec<Value> = (0..10).map(|n| Value::Number(n.into())).collect();
        let cfg = ChunkConfig {
            chunk_size: 3,
            overlap: 0,
        };

        let mut stream = create_chunked_stream(input, cfg);

        let mut chunks_seen = 0;
        while let Some(Ok(chunk)) = stream.next().await {
            chunks_seen += 1;
            assert!(chunk.len() <= 3);
        }

        assert_eq!(chunks_seen, 4); // 3, 3, 3, 1
    }
}