// xls_rs/streaming.rs
//! Real-time data streaming support
//!
//! Provides streaming capabilities for processing large datasets incrementally.
use std::collections::VecDeque;

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
10/// Streaming data chunk
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DataChunk {
13    pub sequence: usize,
14    pub data: Vec<Vec<String>>,
15    pub metadata: ChunkMetadata,
16}
17
18/// Chunk metadata
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ChunkMetadata {
21    pub timestamp: String,
22    pub source: Option<String>,
23    pub row_count: usize,
24    pub column_count: usize,
25}
26
27/// Streaming reader trait
28pub trait StreamingDataReader: Send + Sync {
29    fn read_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>>;
30    fn has_more(&self) -> bool;
31    fn reset(&mut self) -> Result<()>;
32}
33
34/// Streaming writer trait
35pub trait StreamingDataWriter: Send + Sync {
36    fn write_chunk(&mut self, chunk: &DataChunk) -> Result<()>;
37    fn flush(&mut self) -> Result<()>;
38}
39
/// Drives chunked reading, transformation, and writing of streamed data.
pub struct StreamingProcessor {
    // Maximum number of processed chunks held in memory before writing.
    buffer_size: usize,
    // Number of rows requested from the reader per chunk.
    chunk_size: usize,
}
46impl StreamingProcessor {
47    pub fn new(chunk_size: usize, buffer_size: usize) -> Self {
48        Self {
49            chunk_size,
50            buffer_size,
51        }
52    }
53
54    /// Process data in streaming fashion
55    pub fn process_streaming<R, W, F>(
56        &self,
57        reader: &mut R,
58        writer: &mut W,
59        processor: F,
60    ) -> Result<usize>
61    where
62        R: StreamingDataReader,
63        W: StreamingDataWriter,
64        F: Fn(&DataChunk) -> Result<DataChunk>,
65    {
66        let mut total_chunks = 0;
67        let mut buffer = VecDeque::new();
68
69        while reader.has_more() {
70            if let Some(chunk) = reader.read_chunk(self.chunk_size)? {
71                let processed = processor(&chunk)?;
72
73                // Buffer chunks if needed
74                buffer.push_back(processed);
75
76                // Write when buffer is full
77                if buffer.len() >= self.buffer_size {
78                    if let Some(buffered) = buffer.pop_front() {
79                        writer.write_chunk(&buffered)?;
80                        total_chunks += 1;
81                    }
82                }
83            }
84        }
85
86        // Flush remaining chunks
87        while let Some(chunk) = buffer.pop_front() {
88            writer.write_chunk(&chunk)?;
89            total_chunks += 1;
90        }
91
92        writer.flush()?;
93        Ok(total_chunks)
94    }
95
96    /// Stream data with callback
97    pub fn stream_with_callback<R, F>(&self, reader: &mut R, callback: F) -> Result<usize>
98    where
99        R: StreamingDataReader,
100        F: Fn(&DataChunk) -> Result<()>,
101    {
102        let mut total_chunks = 0;
103
104        while reader.has_more() {
105            if let Some(chunk) = reader.read_chunk(self.chunk_size)? {
106                callback(&chunk)?;
107                total_chunks += 1;
108            }
109        }
110
111        Ok(total_chunks)
112    }
113}
114
115/// Broadcast-based streaming channel
116pub struct StreamingChannel {
117    sender: broadcast::Sender<DataChunk>,
118    receiver: broadcast::Receiver<DataChunk>,
119}
120
121impl StreamingChannel {
122    pub fn new(buffer: usize) -> Self {
123        let (sender, receiver) = broadcast::channel(buffer);
124        Self { sender, receiver }
125    }
126
127    pub fn send(&self, chunk: DataChunk) -> Result<usize> {
128        self.sender
129            .send(chunk)
130            .map_err(|e| anyhow::anyhow!("Failed to send chunk: {}", e))
131    }
132
133    pub async fn receive(&mut self) -> Result<DataChunk> {
134        self.receiver
135            .recv()
136            .await
137            .map_err(|e| anyhow::anyhow!("Failed to receive chunk: {}", e))
138    }
139}
140
141/// CSV streaming reader implementation
142pub struct CsvStreamingReader {
143    path: String,
144    current_row: usize,
145    total_rows: Option<usize>,
146    reader: Option<csv::Reader<std::fs::File>>,
147}
148
149impl CsvStreamingReader {
150    pub fn new(path: &str) -> Result<Self> {
151        // Create reader on initialization
152        let reader = csv::Reader::from_path(path)
153            .map_err(|e| anyhow::anyhow!("Failed to open CSV: {}", e))?;
154
155        Ok(Self {
156            path: path.to_string(),
157            current_row: 0,
158            total_rows: None,
159            reader: Some(reader),
160        })
161    }
162
163    fn ensure_reader(&mut self) -> Result<&mut csv::Reader<std::fs::File>> {
164        if self.reader.is_none() {
165            self.reader = Some(
166                csv::Reader::from_path(&self.path)
167                    .map_err(|e| anyhow::anyhow!("Failed to open CSV: {}", e))?,
168            );
169        }
170        self.reader
171            .as_mut()
172            .ok_or_else(|| anyhow::anyhow!("Failed to get reader reference"))
173    }
174}
175
176impl StreamingDataReader for CsvStreamingReader {
177    fn read_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>> {
178        let start_row = self.current_row;
179        let reader = self.ensure_reader()?;
180
181        let mut chunk_data: Vec<Vec<String>> = Vec::new();
182        let mut rows_read = 0;
183
184        for result in reader.records().take(chunk_size) {
185            let record = result?;
186            chunk_data.push(record.iter().map(|s| s.to_string()).collect());
187            rows_read += 1;
188        }
189
190        // Update current_row after reading
191        self.current_row = start_row + rows_read;
192
193        if chunk_data.is_empty() {
194            return Ok(None);
195        }
196
197        let column_count = chunk_data.first().map(|r| r.len()).unwrap_or(0);
198
199        let sequence = if chunk_size > 0 {
200            start_row / chunk_size
201        } else {
202            0
203        };
204
205        Ok(Some(DataChunk {
206            sequence,
207            data: chunk_data,
208            metadata: ChunkMetadata {
209                timestamp: chrono::Utc::now().to_rfc3339(),
210                source: Some(self.path.clone()),
211                row_count: rows_read,
212                column_count,
213            },
214        }))
215    }
216
217    fn has_more(&self) -> bool {
218        // Simplified - in real implementation, would check file position
219        self.reader.is_some()
220    }
221
222    fn reset(&mut self) -> Result<()> {
223        self.reader = Some(csv::Reader::from_path(&self.path)?);
224        self.current_row = 0;
225        Ok(())
226    }
227}