rust_logic_graph/streaming/
mod.rs1use crate::core::Context;
6use crate::rule::{RuleError, RuleResult};
7use async_trait::async_trait;
8use serde_json::Value;
9use std::pin::Pin;
10use tokio::sync::mpsc;
11use tokio_stream::Stream;
12
13pub mod operators;
14pub mod stream_node;
15
16pub use operators::{FilterOperator, FoldOperator, MapOperator, StreamOperator};
17pub use stream_node::StreamNode;
18
19pub type StreamItem = Result<Value, RuleError>;
21
22pub type ValueStream = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;
24
/// Settings that bound how much work a stream may buffer.
#[derive(Clone, Debug)]
pub struct BackpressureConfig {
    /// Requested capacity of the bounded channel that
    /// `create_stream_from_vec` builds.
    pub buffer_size: usize,
    /// Upper bound on concurrently running operations.
    ///
    /// NOTE(review): nothing in this module reads this field; presumably the
    /// `operators` / `stream_node` submodules consume it — confirm.
    pub max_concurrent: usize,
}
33
34impl Default for BackpressureConfig {
35 fn default() -> Self {
36 Self {
37 buffer_size: 100,
38 max_concurrent: 10,
39 }
40 }
41}
42
/// Settings that control how a data set is split into chunks.
#[derive(Clone, Debug)]
pub struct ChunkConfig {
    /// Maximum number of items placed in a single chunk.
    pub chunk_size: usize,
    /// Requested overlap between consecutive chunks; `0` asks for disjoint
    /// chunks.
    ///
    /// NOTE(review): how non-zero values are honoured depends on the function
    /// consuming this config — verify against `create_chunked_stream`.
    pub overlap: usize,
}
51
52impl Default for ChunkConfig {
53 fn default() -> Self {
54 Self {
55 chunk_size: 1000,
56 overlap: 0,
57 }
58 }
59}
60
61#[async_trait]
63pub trait StreamProcessor: Send + Sync {
64 async fn process_item(&self, item: Value, ctx: &Context) -> RuleResult;
66
67 async fn process_chunk(
69 &self,
70 items: Vec<Value>,
71 ctx: &Context,
72 ) -> Result<Vec<Value>, RuleError> {
73 let mut results = Vec::with_capacity(items.len());
74 for item in items {
75 let result = self.process_item(item, ctx).await?;
76 results.push(result);
77 }
78 Ok(results)
79 }
80}
81
82pub fn create_stream_from_vec(data: Vec<Value>, config: BackpressureConfig) -> ValueStream {
84 let (tx, rx) = mpsc::channel(config.buffer_size);
85
86 tokio::spawn(async move {
87 for item in data {
88 if tx.send(Ok(item)).await.is_err() {
89 break;
90 }
91 }
92 });
93
94 Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
95}
96
97pub fn create_chunked_stream(
99 data: Vec<Value>,
100 config: ChunkConfig,
101) -> Pin<Box<dyn Stream<Item = Result<Vec<Value>, RuleError>> + Send>> {
102 let chunks: Vec<Vec<Value>> = data
103 .chunks(config.chunk_size)
104 .map(|chunk| chunk.to_vec())
105 .collect();
106
107 Box::pin(tokio_stream::iter(chunks.into_iter().map(Ok)))
108}
109
110pub fn apply_backpressure(stream: ValueStream, _config: BackpressureConfig) -> ValueStream {
114 stream
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Every element of the input vector is delivered through the stream.
    #[tokio::test]
    async fn test_create_stream_from_vec() {
        let data: Vec<Value> = (1..=3).map(|n| Value::Number(n.into())).collect();

        let mut stream = create_stream_from_vec(data, BackpressureConfig::default());

        let mut received = 0;
        while let Some(Ok(_)) = stream.next().await {
            received += 1;
        }

        assert_eq!(received, 3);
    }

    /// Ten items in chunks of three yield four chunks, none longer than three.
    #[tokio::test]
    async fn test_chunked_stream() {
        let data: Vec<Value> = (0..10).map(|n| Value::Number(n.into())).collect();
        let config = ChunkConfig {
            chunk_size: 3,
            overlap: 0,
        };

        let mut stream = create_chunked_stream(data, config);

        let mut seen_chunks = 0;
        while let Some(Ok(chunk)) = stream.next().await {
            assert!(chunk.len() <= 3);
            seen_chunks += 1;
        }

        assert_eq!(seen_chunks, 4);
    }
}