1use anyhow::Result;
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8use tokio::sync::broadcast;
9
/// A single unit of streamed tabular data: a sequence-numbered batch of
/// rows plus descriptive metadata about the batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunk {
    /// Zero-based index of this chunk within the stream.
    pub sequence: usize,
    /// Row-major cell values; each inner `Vec` is one row of string fields.
    pub data: Vec<Vec<String>>,
    /// Timestamp, source, and size information for this chunk.
    pub metadata: ChunkMetadata,
}
17
/// Descriptive metadata attached to every [`DataChunk`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Creation time of the chunk (RFC 3339 string where produced in this file).
    pub timestamp: String,
    /// Origin of the data (e.g. a file path), if known.
    pub source: Option<String>,
    /// Number of rows contained in the chunk.
    pub row_count: usize,
    /// Number of columns, taken from the first row where produced in this file.
    pub column_count: usize,
}
26
/// Pull-based source of [`DataChunk`]s for streaming pipelines.
pub trait StreamingDataReader: Send + Sync {
    /// Reads up to `chunk_size` rows; returns `Ok(None)` when the source
    /// has no more data to yield for this call.
    fn read_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>>;
    /// Whether a further `read_chunk` call may still yield data.
    fn has_more(&self) -> bool;
    /// Rewinds the source back to its beginning.
    fn reset(&mut self) -> Result<()>;
}
33
/// Sink that consumes processed [`DataChunk`]s.
pub trait StreamingDataWriter: Send + Sync {
    /// Writes a single chunk to the underlying sink.
    fn write_chunk(&mut self, chunk: &DataChunk) -> Result<()>;
    /// Flushes any buffered output so it reaches the sink.
    fn flush(&mut self) -> Result<()>;
}
39
/// Orchestrates chunked read → transform → write pipelines over the
/// [`StreamingDataReader`] / [`StreamingDataWriter`] traits.
pub struct StreamingProcessor {
    /// Number of processed chunks buffered before writing begins.
    buffer_size: usize,
    /// Number of rows requested from the reader per `read_chunk` call.
    chunk_size: usize,
}
45
46impl StreamingProcessor {
47 pub fn new(chunk_size: usize, buffer_size: usize) -> Self {
48 Self {
49 chunk_size,
50 buffer_size,
51 }
52 }
53
54 pub fn process_streaming<R, W, F>(
56 &self,
57 reader: &mut R,
58 writer: &mut W,
59 processor: F,
60 ) -> Result<usize>
61 where
62 R: StreamingDataReader,
63 W: StreamingDataWriter,
64 F: Fn(&DataChunk) -> Result<DataChunk>,
65 {
66 let mut total_chunks = 0;
67 let mut buffer = VecDeque::new();
68
69 while reader.has_more() {
70 if let Some(chunk) = reader.read_chunk(self.chunk_size)? {
71 let processed = processor(&chunk)?;
72
73 buffer.push_back(processed);
75
76 if buffer.len() >= self.buffer_size {
78 if let Some(buffered) = buffer.pop_front() {
79 writer.write_chunk(&buffered)?;
80 total_chunks += 1;
81 }
82 }
83 }
84 }
85
86 while let Some(chunk) = buffer.pop_front() {
88 writer.write_chunk(&chunk)?;
89 total_chunks += 1;
90 }
91
92 writer.flush()?;
93 Ok(total_chunks)
94 }
95
96 pub fn stream_with_callback<R, F>(&self, reader: &mut R, callback: F) -> Result<usize>
98 where
99 R: StreamingDataReader,
100 F: Fn(&DataChunk) -> Result<()>,
101 {
102 let mut total_chunks = 0;
103
104 while reader.has_more() {
105 if let Some(chunk) = reader.read_chunk(self.chunk_size)? {
106 callback(&chunk)?;
107 total_chunks += 1;
108 }
109 }
110
111 Ok(total_chunks)
112 }
113}
114
/// Broadcast-based channel for fanning [`DataChunk`]s out to subscribers.
pub struct StreamingChannel {
    /// Producer half of the tokio broadcast channel.
    sender: broadcast::Sender<DataChunk>,
    /// The channel's own subscriber, consumed by `receive`.
    /// NOTE(review): holding this receiver presumably also keeps the channel
    /// open so `send` succeeds with no external subscribers — confirm intent.
    receiver: broadcast::Receiver<DataChunk>,
}
120
121impl StreamingChannel {
122 pub fn new(buffer: usize) -> Self {
123 let (sender, receiver) = broadcast::channel(buffer);
124 Self { sender, receiver }
125 }
126
127 pub fn send(&self, chunk: DataChunk) -> Result<usize> {
128 self.sender
129 .send(chunk)
130 .map_err(|e| anyhow::anyhow!("Failed to send chunk: {}", e))
131 }
132
133 pub async fn receive(&mut self) -> Result<DataChunk> {
134 self.receiver
135 .recv()
136 .await
137 .map_err(|e| anyhow::anyhow!("Failed to receive chunk: {}", e))
138 }
139}
140
/// Streaming CSV reader that yields [`DataChunk`]s of up to a
/// caller-chosen number of records per call.
pub struct CsvStreamingReader {
    /// Path to the CSV file; kept so the reader can be reopened on `reset`.
    path: String,
    /// Number of data rows consumed so far.
    current_row: usize,
    /// Total row count, if known. Never populated in this file —
    /// TODO confirm intended use.
    total_rows: Option<usize>,
    /// The open CSV reader; `None` means it must be (re)opened lazily
    /// via `ensure_reader` before the next read.
    reader: Option<csv::Reader<std::fs::File>>,
}
148
149impl CsvStreamingReader {
150 pub fn new(path: &str) -> Result<Self> {
151 let reader = csv::Reader::from_path(path)
153 .map_err(|e| anyhow::anyhow!("Failed to open CSV: {}", e))?;
154
155 Ok(Self {
156 path: path.to_string(),
157 current_row: 0,
158 total_rows: None,
159 reader: Some(reader),
160 })
161 }
162
163 fn ensure_reader(&mut self) -> Result<&mut csv::Reader<std::fs::File>> {
164 if self.reader.is_none() {
165 self.reader = Some(
166 csv::Reader::from_path(&self.path)
167 .map_err(|e| anyhow::anyhow!("Failed to open CSV: {}", e))?,
168 );
169 }
170 self.reader
171 .as_mut()
172 .ok_or_else(|| anyhow::anyhow!("Failed to get reader reference"))
173 }
174}
175
176impl StreamingDataReader for CsvStreamingReader {
177 fn read_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>> {
178 let start_row = self.current_row;
179 let reader = self.ensure_reader()?;
180
181 let mut chunk_data: Vec<Vec<String>> = Vec::new();
182 let mut rows_read = 0;
183
184 for result in reader.records().take(chunk_size) {
185 let record = result?;
186 chunk_data.push(record.iter().map(|s| s.to_string()).collect());
187 rows_read += 1;
188 }
189
190 self.current_row = start_row + rows_read;
192
193 if chunk_data.is_empty() {
194 return Ok(None);
195 }
196
197 let column_count = chunk_data.first().map(|r| r.len()).unwrap_or(0);
198
199 let sequence = if chunk_size > 0 {
200 start_row / chunk_size
201 } else {
202 0
203 };
204
205 Ok(Some(DataChunk {
206 sequence,
207 data: chunk_data,
208 metadata: ChunkMetadata {
209 timestamp: chrono::Utc::now().to_rfc3339(),
210 source: Some(self.path.clone()),
211 row_count: rows_read,
212 column_count,
213 },
214 }))
215 }
216
217 fn has_more(&self) -> bool {
218 self.reader.is_some()
220 }
221
222 fn reset(&mut self) -> Result<()> {
223 self.reader = Some(csv::Reader::from_path(&self.path)?);
224 self.current_row = 0;
225 Ok(())
226 }
227}