rust_logic_graph/streaming/
mod.rs1use crate::core::Context;
6use crate::rule::{RuleResult, RuleError};
7use async_trait::async_trait;
8use serde_json::Value;
9use tokio::sync::mpsc;
10use tokio_stream::Stream;
11use std::pin::Pin;
12
13pub mod operators;
14pub mod stream_node;
15
16pub use operators::{StreamOperator, MapOperator, FilterOperator, FoldOperator};
17pub use stream_node::StreamNode;
18
19pub type StreamItem = Result<Value, RuleError>;
21
22pub type ValueStream = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;
24
25#[derive(Debug, Clone)]
27pub struct BackpressureConfig {
28 pub buffer_size: usize,
30 pub max_concurrent: usize,
32}
33
34impl Default for BackpressureConfig {
35 fn default() -> Self {
36 Self {
37 buffer_size: 100,
38 max_concurrent: 10,
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
45pub struct ChunkConfig {
46 pub chunk_size: usize,
48 pub overlap: usize,
50}
51
52impl Default for ChunkConfig {
53 fn default() -> Self {
54 Self {
55 chunk_size: 1000,
56 overlap: 0,
57 }
58 }
59}
60
61#[async_trait]
63pub trait StreamProcessor: Send + Sync {
64 async fn process_item(&self, item: Value, ctx: &Context) -> RuleResult;
66
67 async fn process_chunk(&self, items: Vec<Value>, ctx: &Context) -> Result<Vec<Value>, RuleError> {
69 let mut results = Vec::with_capacity(items.len());
70 for item in items {
71 let result = self.process_item(item, ctx).await?;
72 results.push(result);
73 }
74 Ok(results)
75 }
76}
77
78pub fn create_stream_from_vec(
80 data: Vec<Value>,
81 config: BackpressureConfig,
82) -> ValueStream {
83 let (tx, rx) = mpsc::channel(config.buffer_size);
84
85 tokio::spawn(async move {
86 for item in data {
87 if tx.send(Ok(item)).await.is_err() {
88 break;
89 }
90 }
91 });
92
93 Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
94}
95
96pub fn create_chunked_stream(
98 data: Vec<Value>,
99 config: ChunkConfig,
100) -> Pin<Box<dyn Stream<Item = Result<Vec<Value>, RuleError>> + Send>> {
101 let chunks: Vec<Vec<Value>> = data
102 .chunks(config.chunk_size)
103 .map(|chunk| chunk.to_vec())
104 .collect();
105
106 Box::pin(tokio_stream::iter(chunks.into_iter().map(Ok)))
107}
108
109pub fn apply_backpressure(
113 stream: ValueStream,
114 _config: BackpressureConfig,
115) -> ValueStream {
116 stream
119}
120
121#[cfg(test)]
122mod tests {
123 use super::*;
124 use tokio_stream::StreamExt;
125
126 #[tokio::test]
127 async fn test_create_stream_from_vec() {
128 let data = vec![
129 Value::Number(1.into()),
130 Value::Number(2.into()),
131 Value::Number(3.into()),
132 ];
133
134 let config = BackpressureConfig::default();
135 let mut stream = create_stream_from_vec(data, config);
136
137 let mut count = 0;
138 while let Some(Ok(_)) = stream.next().await {
139 count += 1;
140 }
141
142 assert_eq!(count, 3);
143 }
144
145 #[tokio::test]
146 async fn test_chunked_stream() {
147 let data: Vec<Value> = (0..10)
148 .map(|i| Value::Number(i.into()))
149 .collect();
150
151 let config = ChunkConfig {
152 chunk_size: 3,
153 overlap: 0,
154 };
155
156 let mut stream = create_chunked_stream(data, config);
157
158 let mut chunk_count = 0;
159 while let Some(Ok(chunk)) = stream.next().await {
160 chunk_count += 1;
161 assert!(chunk.len() <= 3);
162 }
163
164 assert_eq!(chunk_count, 4); }
166}